# EventLoop

我们在上一章节的例子中,首先创建 bossGroup 和 workerGroup 两个 EventLoopGroup

// 因为博客显示问题,我这里写成两行
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

我们之前也提到,这两个类似于主从 Reactor。之前在编写 NIO 代码时,就是用了 while 循环,不断使用 selector 监听新的事件,EventLoop 也是这种思想,本质是事件等待 / 处理线程。

但是这与 NIO 又有不同:

  • EvetnLoopGroup 包含多个 EventLoop
  • 一个 EventLoop 绑定一个线程,只有第一次执行时才会通过 ThreadFactory 创建线程
  • 一个 Channel 只会绑定一个 EventLoop ,绑定后不会修改。但是一个 EventLoop 可以被多个 Channel 绑定(多对一的关系)

先看一下示意图:

我们之前写的代码,多个 Channel 注册到同一个 EventLoop 中,本身没有什么问题。但是我们将数据的读写和数据的处理(其他操作)放在了一起,如果某一个 Channel 的数据处理阻塞了,就会导致其他 Channel 也阻塞。所以使用 Netty 进行程序开发时,我们需要对 ChannelHandler 的实现逻辑有充分的风险意识

protected void initChannel(SocketChannel channel) {
    channel.pipeline()
        .addLast(new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                System.out.println("接收到客户端发送的数据:"+
                                   buf.toString(StandardCharsets.UTF_8));
                // 这里我们直接卡 10 秒假装在处理任务
                // 就会导致阻塞,其他 Channel 不能处理
                Thread.sleep(10000);   
                ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
            }
        });
}

在 IO 多路复用详解就说过,Reactor 模型是事件驱动,在完成读写请求时不会阻塞,因为只有客户端真的发送了读写请求,服务端才会进行读写操作。所以会出现阻塞的只有数据处理那一部分。

我们回忆一下多线程 Reactor 模型中,Handler 负责处理读写,其他任务(解码,编码等)都交给线程池处理

public void run() {
    try {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        buffer.flip();
        POOL.submit(() -> {
            try {
                System.out.println("接收到客户端数据:"+
                                   new String(buffer.array(), 0, 
                                              buffer.remaining()));
                channel.write(ByteBuffer.wrap("已收到!".getBytes()));
            }catch (IOException e){
                e.printStackTrace();
            }
        });
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

Netty 处理这种情况,可以再创建一个 EventLoopGroup 来专门处理阻塞事件

EventLoopGroup handlerGroup = new DefaultEventLoopGroup();
//...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
    handlerGroup.submit(() -> {   
        // 由于继承自 ScheduledExecutorService,直接提交任务
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
    });
}

当然也可以写成流水线,一个 ChannelHandler 处理读写事件,然后将任务传给下一个 ChannelHandler,处理阻塞时间

最后顺便给出客户端代码

public static void main(String[] args) {
    // 客户端也通过 Bootstrap 启动
    Bootstrap bootstrap = new Bootstrap();
    // 客户端只需要一个 EventLoop 即可
    bootstrap.group(new NioEventLoopGroup())
    		 .channel(NioSocketChannel.class)
        	 .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel channel) throws Exception {
                     channel.pipeline().addLast(new ChannelHandlerContextAdapter() {
                         @Override
                         public void ChannelRead(ChannelHandlerContext ctx,Object msg) thorws Exception {
                             ByteBuf buf = (Byte) msg;
                             System.out.println(">> 接收到客户端发送数据:" + 
                                               buf,toString(StandardCharsets.UTF_8));
                         }
                     });
                 }
             });
    
    Channel channel = bootstrap.connect("localhost",8888).channel();
    try(Scanner in = new Scanner(System.in)) {
        while(true) {
			System.out.println("<< 请输入要发送给服务端的内容");
            String text = in.nextLine();
            if(text,isEmpty()) continue;
            channel.writeAndFlush(unPooled.wrappedBuffer(text.getBuffer()));
        }
    }
}

# 异步获得结果

# ChannelFuture

Netty 中的 Channel 相关操作都是异步进行的,并不是在当前线程同步执行,我们不能立即得到执行结果。所以想要得到结果就需要使用 Future

// 先看一下使用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buff = (ByteBuf) msg;
    System.out.println("接收到客户端发送的数据:" + 
                      buff.toString(StandardCharsets.UTF_8));
    
    ChannelFuture future = ctx.writeAndFlush(Unpool.wrappedBuffer("已收到".getBytes()));
    
    System.out.println("任务完成状态:" + funture.isDone());
}

ChannelFuture 继承 Netty 自定义的 Future ,但是这个自定义的 Future 接口也是继承了 j.u.c 里面的 Future 接口。而我们也是主要讲解 ChannelFuture

包括服务端启动也是返回的 ChannelFuture

ChannelFuture future = bootstrap.bind(8888);
System.out.println(future.isDone());// false

服务端启动比较慢,所以一开始直接获取 isDone 的结果会返回 false 。如果需要当服务端启动后我们才能进行下一步操作,相当于把异步恢复成同步,有两种方案

  • 方案一:使用 sync 方法
ChannelFuture future = bootstrap.bind(8888);
future.sync();// 让线程同步等待任务返回
System.out.println(future.isDone());// false
  • 方案二:设置监听器
ChannelFuture future = bootstrap.bind(8888);
// 直接添加监听器,当任务完成时自动执行,但是注意执行也是异步的,不是在当前线程
future.addListener((ChannelFutureListener) channelFuture -> 
                   System.out.println("我是服务端启动完成之后要做的事情!"));

ChannelFuture 中定义了监听方法

public interface ChannelFuture extends Future<Void> {
    Channel channel();    // 我们可以直接获取此任务的 Channel
    ChannelFuture addListener(GenericFutureListener<? extends 
                              Future<? super Void>> var1);  
    // 当任务完成时,会直接执行 GenericFutureListener 的任务,注意执行的位置也是在 EventLoop 中
    boolean isVoid();   // 返回类型是否为 void
}

GenericFutureListener 是一个函数接口,只有一个 operationComplete 方法,可以使用 Lambda 表达式。而且,执行任务的线程在完成任务后就会执行事件监听里的任务(而不是在定义事件监听的线程中执行)。

# Promise

该接口支持手动设定成功和失败的结果:

// 此接口也是继承自 Netty 中的 Future 接口
public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V var1);    // 手动设定成功
    boolean trySuccess(V var1);
    Promise<V> setFailure(Throwable var1);  // 手动设定失败
    boolean tryFailure(Throwable var1);
    boolean setUncancellable();
    
	// 这些就和之前的 Future 是一样的了
    Promise<V> addListener(GenericFutureListener<? extends 
                           Future<? super V>> var1);
    
    Promise<V> addListeners(GenericFutureListener<? extends 
                            Future<? super V>>... var1);
    
    Promise<V> removeListener(GenericFutureListener<? extends 
                              Future<? super V>> var1);
    
    Promise<V> removeListeners(GenericFutureListener<? extends 
                               Future<? super V>>... var1);
    
    Promise<V> await() throws InterruptedException;
    Promise<V> awaitUninterruptibly();
    Promise<V> sync() throws InterruptedException;
    Promise<V> syncUninterruptibly();
}

该接口也是可以获取异步执行结果情况

.addLast(new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        String text = buf.toString(StandardCharsets.UTF_8);
        System.out.println("接收到客户端发送的数据:"+text);
        
        ChannelPromise promise = new DefaultChannelPromise(channel);
        System.out.println(promise.isSuccess());
        ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()), promise);
        
        promise.sync();  // 同步等待一下
        System.out.println(promise.isSuccess());
    }
});

# 参考

Netty 源码之 EventLoopGroup:https://www.zhiyi.zone/netty/EventLoopGroup.html#multithreadeventloopgroup

语雀・青空の霞光:https://www.yuque.com/qingkongxiaguang/javase/ibx6ug#b7dc6f87

技术文章摘抄