# Netty 基础代码

我们先来看一下 Netty 搭建服务端和客户端一个简单的例子

服务端:

public static void main(String[] args) {
    // 使用 NioEventLoopGroup 实现类,创建 BossGroup 和 WorkerGroup
    /** 其他的还有 EpollEventLoopGroup,但是仅支持 Linux,这是 Netty 基于 Linux 底层
    	Epoll 单独编写的一套本地实现,没有使用 NIO 那套 */
    
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
    // 创建服务端启动引导类
    ServerBootstrap bootstrap = new ServerBootstrap();
    // 可链式
    bootstrap
            .group(bossGroup, workerGroup)   // 指定事件循环组
            .channel(NioServerSocketChannel.class)   // 指定为 NIO 的 ServerSocketChannel
            .childHandler(new ChannelInitializer<SocketChannel>() {   
                //SocketChannel 不是 NIO 里面的,是 Netty 的
                @Override
                protected void initChannel(SocketChannel channel) {
                    channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){   
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            //ctx 是上下文,msg 是收到的消息,默认以 ByteBuf 形式(也可以是其他形式,后面再说)
                            ByteBuf buf = (ByteBuf) msg;   // 类型转换一下
                            System.out.println(Thread.currentThread().getName() + 
                                               " >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                            // 通过上下文可以直接发送数据回去,注意要 writeAndFlush 才能让客户端立即收到
                            ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                        }
                    });
                }
            });
    // 最后绑定端口,启动
    bootstrap.bind(8080);
}

客户端:

public static void main(String[] args) {
    try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
         Scanner scanner = new Scanner(System.in)){
        System.out.println("已连接到服务端!");
        while (true) {
            System.out.println("请输入要发送给服务端的内容:");
            String text = scanner.nextLine();
            if(text.isEmpty()) continue;
            channel.write(ByteBuffer.wrap(text.getBytes()));
            System.out.println("已发送!");
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);   // 直接从通道中读取数据
            buffer.flip();
            System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

服务端很多陌生的类,这就是我们之后要讲解的,客户端的代码应该很熟悉了。

服务端前两行代码,一个是创建 bossGroup 和 workerGroup,另一个是创建服务端启动引导类 ServerBootstrap。(bootstrap 翻译为引导程序)。

看后面的链式代码就知道,我们通过 ServerBootstrap 配置相关信息。比如设置 group,channel,handler 等。

# Channel 通道

Netty 的 Channel 没有直接使用 NIO 的 Channel,通道支持 IO 操作,但是所有的 IO 操作都是异步的(Netty)。意味着任何 IO 调用都将立即返回,但不能保证请求的 IO 操作在调用时已经完成。它会返回一个 ChannelFuture 实例(在 JUC 里面我们就将结果关于这种异步获取结果的 FutureTask 类),该实例将在请求的 IO 操作成功,失败或取消时通知你。

关于 Channel ,在 Netty 中,还有 ChannelHandlerChannelPipeline 两个组件。

  • ChannelHandler 负责 Channel 的逻辑处理,其实就是 Reactor 里面的 Handler。
  • ChannelPipeline 负责管理 ChannelHandler,他是一个流水线容器,存储结构为双向链表。
  • 从上面的代码可以看出,通过链式代码,我们不断将 ChannelInboundHandlerAdapter (实现了 ChannelHandler)加入到 Pipe 中

一个 Channel 包含一个 ChannelPipeline ,所有 ChannelHandler 都会顺序加入到 ChannelPipeline 中,创建 Channel 时会自动创建一个 ChannelPipeline ;每个 Channel 都会有一个 ChannelPipeline ,这个关联关系是永久性的。

关系图如下:

其实 ChannelPipeline 维护的是由 ChannelHandlerContext 组成的双向链表,只是每一个 ChannelHandlerContext 又关联着一个 ChannelHandler

我们现在来看一下源码

Channel 源码,里面方法比较多,这里我们只看几个重要的

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    ChannelId id();   // 通道 ID
    
    // 获取此通道所属的 EventLoop,因为一个 Channel 在它的
    // 生命周期内只能注册到一个 EventLoop 中
    EventLoop eventLoop();  
    
    //Channel 是具有层级关系的,这里是返回父 Channel
    Channel parent();   
    // 通道当前的相关状态
    boolean isOpen();   
    boolean isRegistered();
    boolean isActive();
    
    // 关闭通道,但是会用到 ChannelFuture
    ChannelFuture closeFuture();
    
    // 流水线,例子中我们就是通过 pipeline 拿到管道,
    // 然后加入 ChannelHandler
    ChannelPipeline pipeline();   
    
     // 可以直接从 Channel 拿到 ByteBufAllocator 的实例,来分配 ByteBuf
    ByteBufAllocator alloc();  
    Channel read();
    Channel flush();   // 刷新,基操
}

其中 pipeline() 方法在例子中使用到了,我们再来看一下 ChannelHandler 。该接口定义了一些负责完成具体请求的方法,例子中使用到的 ChannelInboundHandlerAdapter 抽象类是 ChannelInboundHandler 的接口实现,用于处理入站数据(Inbound 表示入站,Outbound 表示出站),抽象类重写对应方法进行请求处理,这些方法会在合适的时间被调用。

channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {  
      	//ctx 是上下文,msg 是收到的消息,以 ByteBuf 形式
        ByteBuf buf = (ByteBuf) msg;   // 类型转换一下
        System.out.println(Thread.currentThread().getName()
                           +" >> 接收到客户端发送的数据:"
                           +buf.toString(StandardCharsets.UTF_8));
        
        // 通过上下文可以直接发送数据回去,注意要 writeAndFlush 才能让客户端立即收到
        ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
    }
});

我们先看顶层接口 ChannelHandler :只有一些流水线相关的回调方法

public interface ChannelHandler {
  	// 当 ChannelHandler 被添加到流水线中时调用
    void handlerAdded(ChannelHandlerContext var1) throws Exception;
	// 当 ChannelHandler 从流水线中移除时调用
    void handlerRemoved(ChannelHandlerContext var1) throws Exception;
	
    // 还有一个过时方法,不管
}

看下一级接口 ChannelInboundHandler

//ChannelInboundHandler 用于处理入站相关事件
public interface ChannelInboundHandler extends ChannelHandler {
  	/** 当 Channel 已经注册到自己的 EventLoop 上时调用,前面我们说了,
    一个 Channel 只会注册到一个 EventLoop 上,注册到 EventLoop 后,
    这样才会在发生对应事件时被通知。*/
    void channelRegistered(ChannelHandlerContext var1) throws Exception;
    
	// 从 EventLoop 上取消注册时
    void channelUnregistered(ChannelHandlerContext var1) throws Exception;
	// 当 Channel 已经处于活跃状态时被调用,此时 Channel 已经连接 / 绑定,并且已经就绪
    void channelActive(ChannelHandlerContext var1) throws Exception;
	// 跟上面相反,不再活跃了,并且不在连接它的远程节点
    void channelInactive(ChannelHandlerContext var1) throws Exception;
    
	// 当从 Channel 读取数据时被调用,可以看到数据被自动包装成了一个 Object(默认是 ByteBuf)
    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
	
    // 上一个读取操作完成后调用
    void channelReadComplete(ChannelHandlerContext var1) throws Exception;
	// 暂时不介绍
    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
	// 当 Channel 的可写状态发生改变时被调用
    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
	
    // 出现异常时被调用
    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

现在我们就可以看到, ChannelInboundHandler 负责处理请求,是事件驱动的。上面用到的 ChannelInboundHandlerAdapter 实际上就是对这些方法实现的抽象类,相比直接用接口,我们可以只重写我们需要的方法,没有重写的方法会默认向流水线下一个 ChannelHandler 发送。

与之对应的还有 ChannelOutboundHandler ,用于处理出站相关操作,此处不再讲解

最后我们看一下 ChannelPipeline ,这个双向链表容器保存著 ChannelHandler ,里面的 ChannelHandler 顺序存储,从客户端发送过来的请求,入站会依次经过 ChannelHandler ,直到它被某一个 handler 完成请求,然后出站时会经过处理出站的 ChannelHandler

比如我们希望创建两个入站的 ChannelHandler ,一个用于接收请求并处理,一个用于处理当前接收请求过程中出现的异常。

.childHandler(new ChannelInitializer<SocketChannel>() {   
    @Override
    protected void initChannel(SocketChannel channel) {
    	channel.pipeline()
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("接收到客户端发送数据:"+
                                       buf.toString(StandardCharsets.UTF_8));
                    // 此处故意人为抛出异常
                    throw new RuntimeException("我是异常");
                }
            })
            .addLast(new ChannelInboundHandlerAdapter(){
               @Override
                public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
                    System.out.println("处理异常:" + cause);
                }
            });
    }
}

如果我们不在 ChannelHandler 重写对应方法,他就会默认传播到流水线下一个 ChannelHandler 。也有人为传递

public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
    // 通过 ChannelHandlerContext 来向下传递,ChannelHandlerContext
    // 是在 Handler 添加进 Pipeline 中时就被自动创建的
    ctx.fireExceptionCaught(cause);
}

关于通道,最后我们看一下出站的 ChannelHandler 相关的机制

@Override
protected void initChannel(SocketChannel channel) {
    channel.pipeline()   // 直接获取 pipeline,然后添加两个 Handler
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                    ctx.fireChannelRead(msg);
                }
            })
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                    ctx.channel().writeAndFlush("伞兵一号卢本伟");  // 这里我们使用 channel 的 write
                }
            })
            .addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("1号出站:"+msg);
                }
            })
            .addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("2号出站:"+msg);
                    
                    // 继续 write 给其他的出站 Handler,不然到这里就断了
                    ctx.write(msg);  
                }
            });
}

重点就是按照顺序存储,但是第一个 ChannelOutbindHandlerAdapter 并没有 write 方法。这是因为当我们入站的 ChannelHandler 完成操作后(比如写请求,就需要有一个 ChannelHandler 调用 write 方法),就会从当前位置倒着找 ChannelOutbindHandler ,找到第一个,再调用。

# 参考

知一码园:https://www.zhiyi.zone/netty/Channel.html

Netty Channel 组件作用:https://www.programminghunter.com/article/12712042451/

语雀・青空の霞光:https://www.yuque.com/qingkongxiaguang/javase/ibx6ug#f443a934