非阻塞的NIO可以开发出性能很不错的网络应用程序,而Netty和Mina这类NIO框架对Java NIO做了封装,可以让高性能IO程序的开发变得容易。

Netty是一个基于Java NIO的网络通信框架,它有两个特点

  • 基于事件
  • 全异步

话不多说先上代码,从代码感受Netty的思想是最直接的。要想实现一个服务器,需要三个类

  • Server类

    public class SimpleChatServer {
    private int port;
     
    public SimpleChatServer(int port){
        this.port = port;
    }
     
    public void run() throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new SimpleChatServerInitializer())
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
     
            System.out.println(">>>>>>>>>>Server start<<<<<<<<<<");
     
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        }
        finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println(">>>>>>>>>>Server stop<<<<<<<<<<");
            System.out.println();
        }
    }
     
    public static void main(String[] args) throws Exception{
        int port = Host.SERVER_PORT;
        new SimpleChatServer(port).run();
    }
    }
    

服务器的任务是接受来自客户端的连接,以及处理客户端发来的数据,这就依靠EventLoopGroup来实现,Group实际上就是线程组,它包含了一些EventLoop(线程)。我们需要2个Group,是因为第一个用来接受连接(boss),另一个用来处理收到的信息(worker)。

因为启动配置服务器操作繁杂,Netty提供的Bootstrap类(快速引导类)给我们带来方便,新建一个ServerBootstrap实例,配置好线程组,以及服务器Channel(一般都会选择NIO实现的NioServerSocketChannel.class)以及childHandler、option、childOption

Netty里的child就可以理解为worker,parent是boss。所以childHandler意思就是处理收到的信息的处理器,因为我们暂时不需要对服务器新接受的连接做额外的处理,所以不配置.handler,只需要配置.childHandler

.option和.childOption和上面同理,一个是服务器的TCP选项,另一个是来自客户端的连接的TCP选项。

配置好这项项目,Bootstrap.bind方法绑定端口,并开始监听连接,服务器就启动了。

  • Initializer类

    public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {
       
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("idle", new IdleStateHandler(10,10,30, TimeUnit.SECONDS));
        pipeline.addLast("decoder", new MyDecoder());
        pipeline.addLast("encoder", new MyEncoder());
        pipeline.addLast("heartbeat", new HeartbeatServerHandler());
        pipeline.addLast("handler", new SimpleChatServerHandler());
    }
    }
    

bootstrap的childHandler配置,就是这个类。这个类可以理解为许多Handler 的容器,作为基于事件的框架,最重要的就是处理事件(比如新的连接,收到的消息等等),pipeline就像一个管道,里面流通着许多的事件,通过addLast的方式给pipeline添加各种各样的handler,然后netty通过pipeline来把各个事件分发给handler

我添加了5个handler,他们的作用分别是

idlestatehandler 它是netty提供的一个工具型handler,它能够检查channel的读、写是否空闲,我们可以通过实现userEventTriggered方法来处理这些情况。我们主要通过他来实现心跳功能,如果channel长时间读写空闲,它可能已经掉线,我们就需要发心跳包来确定它的状态。

myDecoder 是我的解码器,它继承Netty的BytesToMessageDecoder。因为TCP的数据传输是基于流的传输,也就是说我们发的信息“Hi, Shawn!”传输时会被分成很多字节并且可能分到多个数据包中,解码器的作用就是把字节流翻译成需要的数据。

myEncoder是我的编码器,它继承Netty的MessageToBytesEncoder。作用和解码器相反。

heartbeatServerHandler是我的心跳处理器,针对客户端发来的心跳包做特定的处理。

SimpleChatServerHandler就是聊天的核心处理器了,客户端发送的信息,可以在channelRead0方法里得到,并且做一些业务逻辑,比如持久化到数据库,或者广播给其他channel等等。

注意initializer类其实也是个Handler,加载了我们的Handler以后,它会从pipeline里移除,所以他的作用就是加载其他handler。

  • Handler类

    public class SimpleChatServerHandler extends SimpleChannelInboundHandler<MyProtocol> {
    //begin
     
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
     
     
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        channels.add(incoming);
        channels.writeAndFlush(DateUtil.getTime()+" "+incoming.remoteAddress()+" online\n");
        System.out.println(DateUtil.getTime()+" "+ incoming.remoteAddress()+" online "+Conter.oncnt++);
     
    }
     
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        channels.remove(incoming);
        channels.writeAndFlush(DateUtil.getTime()+" "+incoming.remoteAddress()+" offline\n");
        System.out.println(DateUtil.getTime()+" "+ incoming.remoteAddress()+" offline "+Conter.offcnt++);
    }
     
     
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //Channel incoming = ctx.channel();
        //System.out.println(DateUtil.getTime()+" "+ incoming.remoteAddress()+" exception");
        System.out.println("ERROR: "+cause.getMessage());
        ctx.close();
    }
     
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyProtocol myProtocol) throws Exception {
        if(myProtocol.getContentType() == Constant.TYPE_HEARTBEAT){
            Heartbeat heartbeat = (Heartbeat) ConvertUtil.getObject(myProtocol.getContent());
            System.out.println("received heartbeat "+heartbeat.getInfo()+" from "+heartbeat.getOrigin());
        } else {
            Book book = (Book) ConvertUtil.getObject(myProtocol.getContent());
            Channel incoming = ctx.channel();
            System.out.println(DateUtil.getTime()+" "+incoming.remoteAddress()+" "+book.toString());
            channels.writeAndFlush(DateUtil.getTime()+" "+incoming.remoteAddress()+" "+book.toString()+"\n");
        }
     
    }
    }
    

(这是其中一个handler的代码)

new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);得到所有的channel,也就是所有的连接

channelActive 当有一个新的连接,触发这个方法,我在里面写了channels.writeAndFlush就是给所有channel发一个广播,当有人连接成功,所有人都会收到一条消息 xxx.xxx.xxx online

channelInactive 某个连接断开,触发这个方法

exceptionCaught 当遇到异常,触发这个方法。比如某个客户端强制断开连接。

channelRead0 读取信息

源码 Open source