# EventLoop
我们在上一章节的例子中,首先创建 bossGroup 和 workerGroup 两个 EventLoopGroup
:
// 因为博客显示问题,我这里写成两行 | |
EventLoopGroup bossGroup = new NioEventLoopGroup(); | |
EventLoopGroup workerGroup = new NioEventLoopGroup(); |
我们之前也提到,这两个类似于主从 Reactor。之前在编写 NIO 代码时,就是用了 while
循环,不断使用 selector
监听新的事件,EventLoop 也是这种思想,本质是事件等待 / 处理线程。
但是这与 NIO 又有不同:
EvetnLoopGroup
包含多个EventLoop
。- 一个
EventLoop
绑定一个线程,只有第一次执行时才会通过ThreadFactory
创建线程 - 一个
Channel
只会绑定一个EventLoop
,绑定后不会修改。但是一个EventLoop
可以被多个Channel
绑定(多对一的关系)
先看一下示意图:
我们之前写的代码,多个 Channel
注册到同一个 EventLoop 中,本身没有什么问题。但是我们将数据的读写和数据的处理(其他操作)放在了一起,如果某一个 Channel
的数据处理阻塞了,就会导致其他 Channel
也阻塞。所以使用 Netty 进行程序开发时,我们需要对 ChannelHandler 的实现逻辑有充分的风险意识。
protected void initChannel(SocketChannel channel) { | |
channel.pipeline() | |
.addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
ByteBuf buf = (ByteBuf) msg; | |
System.out.println("接收到客户端发送的数据:"+ | |
buf.toString(StandardCharsets.UTF_8)); | |
// 这里我们直接卡 10 秒假装在处理任务 | |
// 就会导致阻塞,其他 Channel 不能处理 | |
Thread.sleep(10000); | |
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes())); | |
} | |
}); | |
} |
在 IO 多路复用详解就说过,Reactor 模型是事件驱动,在完成读写请求时不会阻塞,因为只有客户端真的发送了读写请求,服务端才会进行读写操作。所以会出现阻塞的只有数据处理那一部分。
我们回忆一下多线程 Reactor 模型中,Handler 负责处理读写,其他任务(解码,编码等)都交给线程池处理
public void run() { | |
try { | |
ByteBuffer buffer = ByteBuffer.allocate(1024); | |
channel.read(buffer); | |
buffer.flip(); | |
POOL.submit(() -> { | |
try { | |
System.out.println("接收到客户端数据:"+ | |
new String(buffer.array(), 0, | |
buffer.remaining())); | |
channel.write(ByteBuffer.wrap("已收到!".getBytes())); | |
}catch (IOException e){ | |
e.printStackTrace(); | |
} | |
}); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} |
Netty 处理这种情况,可以再创建一个 EventLoopGroup
来专门处理阻塞事件
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); | |
//... | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
ByteBuf buf = (ByteBuf) msg; | |
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); | |
handlerGroup.submit(() -> { | |
// 由于继承自 ScheduledExecutorService,直接提交任务 | |
try { | |
Thread.sleep(10000); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes())); | |
}); | |
} |
当然也可以写成流水线,一个 ChannelHandler 处理读写事件,然后将任务传给下一个 ChannelHandler,处理阻塞时间
最后顺便给出客户端代码
public static void main(String[] args) { | |
// 客户端也通过 Bootstrap 启动 | |
Bootstrap bootstrap = new Bootstrap(); | |
// 客户端只需要一个 EventLoop 即可 | |
bootstrap.group(new NioEventLoopGroup()) | |
.channel(NioSocketChannel.class) | |
.handler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
protected void initChannel(SocketChannel channel) throws Exception { | |
channel.pipeline().addLast(new ChannelHandlerContextAdapter() { | |
@Override | |
public void ChannelRead(ChannelHandlerContext ctx,Object msg) thorws Exception { | |
ByteBuf buf = (Byte) msg; | |
System.out.println(">> 接收到客户端发送数据:" + | |
buf,toString(StandardCharsets.UTF_8)); | |
} | |
}); | |
} | |
}); | |
Channel channel = bootstrap.connect("localhost",8888).channel(); | |
try(Scanner in = new Scanner(System.in)) { | |
while(true) { | |
System.out.println("<< 请输入要发送给服务端的内容"); | |
String text = in.nextLine(); | |
if(text,isEmpty()) continue; | |
channel.writeAndFlush(unPooled.wrappedBuffer(text.getBuffer())); | |
} | |
} | |
} |
# 异步获得结果
# ChannelFuture
Netty 中的 Channel 相关操作都是异步进行的,并不是在当前线程同步执行,我们不能立即得到执行结果。所以想要得到结果就需要使用 Future
// 先看一下使用 | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
ByteBuf buff = (ByteBuf) msg; | |
System.out.println("接收到客户端发送的数据:" + | |
buff.toString(StandardCharsets.UTF_8)); | |
ChannelFuture future = ctx.writeAndFlush(Unpool.wrappedBuffer("已收到".getBytes())); | |
System.out.println("任务完成状态:" + funture.isDone()); | |
} |
ChannelFuture
继承Netty
自定义的Future
,但是这个自定义的Future
接口也是继承了j.u.c
里面的Future
接口。而我们也是主要讲解ChannelFuture
。
包括服务端启动也是返回的 ChannelFuture
:
ChannelFuture future = bootstrap.bind(8888); | |
System.out.println(future.isDone());// false |
服务端启动比较慢,所以一开始直接获取 isDone
的结果会返回 false
。如果需要当服务端启动后我们才能进行下一步操作,相当于把异步恢复成同步,有两种方案
- 方案一:使用
sync
方法
ChannelFuture future = bootstrap.bind(8888); | |
future.sync();// 让线程同步等待任务返回 | |
System.out.println(future.isDone());// false |
- 方案二:设置监听器
ChannelFuture future = bootstrap.bind(8888); | |
// 直接添加监听器,当任务完成时自动执行,但是注意执行也是异步的,不是在当前线程 | |
future.addListener((ChannelFutureListener) channelFuture -> | |
System.out.println("我是服务端启动完成之后要做的事情!")); |
ChannelFuture
中定义了监听方法
public interface ChannelFuture extends Future<Void> { | |
Channel channel(); // 我们可以直接获取此任务的 Channel | |
ChannelFuture addListener(GenericFutureListener<? extends | |
Future<? super Void>> var1); | |
// 当任务完成时,会直接执行 GenericFutureListener 的任务,注意执行的位置也是在 EventLoop 中 | |
boolean isVoid(); // 返回类型是否为 void | |
} |
GenericFutureListener
是一个函数接口,只有一个operationComplete
方法,可以使用 Lambda 表达式。而且,执行任务的线程在完成任务后就会执行事件监听里的任务(而不是在定义事件监听的线程中执行)。
# Promise
该接口支持手动设定成功和失败的结果:
// 此接口也是继承自 Netty 中的 Future 接口 | |
public interface Promise<V> extends Future<V> { | |
Promise<V> setSuccess(V var1); // 手动设定成功 | |
boolean trySuccess(V var1); | |
Promise<V> setFailure(Throwable var1); // 手动设定失败 | |
boolean tryFailure(Throwable var1); | |
boolean setUncancellable(); | |
// 这些就和之前的 Future 是一样的了 | |
Promise<V> addListener(GenericFutureListener<? extends | |
Future<? super V>> var1); | |
Promise<V> addListeners(GenericFutureListener<? extends | |
Future<? super V>>... var1); | |
Promise<V> removeListener(GenericFutureListener<? extends | |
Future<? super V>> var1); | |
Promise<V> removeListeners(GenericFutureListener<? extends | |
Future<? super V>>... var1); | |
Promise<V> await() throws InterruptedException; | |
Promise<V> awaitUninterruptibly(); | |
Promise<V> sync() throws InterruptedException; | |
Promise<V> syncUninterruptibly(); | |
} |
该接口也是可以获取异步执行结果情况
.addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
ByteBuf buf = (ByteBuf) msg; | |
String text = buf.toString(StandardCharsets.UTF_8); | |
System.out.println("接收到客户端发送的数据:"+text); | |
ChannelPromise promise = new DefaultChannelPromise(channel); | |
System.out.println(promise.isSuccess()); | |
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()), promise); | |
promise.sync(); // 同步等待一下 | |
System.out.println(promise.isSuccess()); | |
} | |
}); |
# 参考
Netty 源码之 EventLoopGroup:https://www.zhiyi.zone/netty/EventLoopGroup.html#multithreadeventloopgroup
语雀・青空の霞光:https://www.yuque.com/qingkongxiaguang/javase/ibx6ug#b7dc6f87
技术文章摘抄