# Netty 基础代码
我们先来看一下 Netty 搭建服务端和客户端一个简单的例子
服务端:
public static void main(String[] args) { | |
// 使用 NioEventLoopGroup 实现类,创建 BossGroup 和 WorkerGroup | |
/** 其他的还有 EpollEventLoopGroup,但是仅支持 Linux,这是 Netty 基于 Linux 底层 | |
Epoll 单独编写的一套本地实现,没有使用 NIO 那套 */ | |
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(); | |
// 创建服务端启动引导类 | |
ServerBootstrap bootstrap = new ServerBootstrap(); | |
// 可链式 | |
bootstrap | |
.group(bossGroup, workerGroup) // 指定事件循环组 | |
.channel(NioServerSocketChannel.class) // 指定为 NIO 的 ServerSocketChannel | |
.childHandler(new ChannelInitializer<SocketChannel>() { | |
//SocketChannel 不是 NIO 里面的,是 Netty 的 | |
@Override | |
protected void initChannel(SocketChannel channel) { | |
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) { | |
//ctx 是上下文,msg 是收到的消息,默认以 ByteBuf 形式(也可以是其他形式,后面再说) | |
ByteBuf buf = (ByteBuf) msg; // 类型转换一下 | |
System.out.println(Thread.currentThread().getName() + | |
" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); | |
// 通过上下文可以直接发送数据回去,注意要 writeAndFlush 才能让客户端立即收到 | |
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes())); | |
} | |
}); | |
} | |
}); | |
// 最后绑定端口,启动 | |
bootstrap.bind(8080); | |
} |
客户端:
public static void main(String[] args) { | |
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080)); | |
Scanner scanner = new Scanner(System.in)){ | |
System.out.println("已连接到服务端!"); | |
while (true) { | |
System.out.println("请输入要发送给服务端的内容:"); | |
String text = scanner.nextLine(); | |
if(text.isEmpty()) continue; | |
channel.write(ByteBuffer.wrap(text.getBytes())); | |
System.out.println("已发送!"); | |
ByteBuffer buffer = ByteBuffer.allocate(128); | |
channel.read(buffer); // 直接从通道中读取数据 | |
buffer.flip(); | |
System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining())); | |
} | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} |
服务端很多陌生的类,这就是我们之后要讲解的,客户端的代码应该很熟悉了。
服务端前两行代码,一个是创建 bossGroup 和 workerGroup,另一个是创建服务端启动引导类 ServerBootstrap。(bootstrap 翻译为引导程序)。
看后面的链式代码就知道,我们通过 ServerBootstrap 配置相关信息。比如设置 group,channel,handler 等。
# Channel 通道
Netty 的 Channel 没有直接使用 NIO 的 Channel,通道支持 IO 操作,但是所有的 IO 操作都是异步的(Netty)。意味着任何 IO 调用都将立即返回,但不能保证请求的 IO 操作在调用时已经完成。它会返回一个 ChannelFuture 实例(在 JUC 里面我们就将结果关于这种异步获取结果的 FutureTask 类),该实例将在请求的 IO 操作成功,失败或取消时通知你。
关于 Channel
,在 Netty 中,还有 ChannelHandler
和 ChannelPipeline
两个组件。
- ChannelHandler 负责 Channel 的逻辑处理,其实就是 Reactor 里面的 Handler。
- ChannelPipeline 负责管理 ChannelHandler,他是一个流水线容器,存储结构为双向链表。
- 从上面的代码可以看出,通过链式代码,我们不断将
ChannelInboundHandlerAdapter
(实现了 ChannelHandler)加入到 Pipe 中
一个
Channel
包含一个ChannelPipeline
,所有ChannelHandler
都会顺序加入到ChannelPipeline
中,创建Channel
时会自动创建一个ChannelPipeline
;每个Channel
都会有一个ChannelPipeline
,这个关联关系是永久性的。
关系图如下:
其实
ChannelPipeline
维护的是由ChannelHandlerContext
组成的双向链表,只是每一个ChannelHandlerContext
又关联着一个ChannelHandler
。
我们现在来看一下源码
Channel
源码,里面方法比较多,这里我们只看几个重要的
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { | |
ChannelId id(); // 通道 ID | |
// 获取此通道所属的 EventLoop,因为一个 Channel 在它的 | |
// 生命周期内只能注册到一个 EventLoop 中 | |
EventLoop eventLoop(); | |
//Channel 是具有层级关系的,这里是返回父 Channel | |
Channel parent(); | |
// 通道当前的相关状态 | |
boolean isOpen(); | |
boolean isRegistered(); | |
boolean isActive(); | |
// 关闭通道,但是会用到 ChannelFuture | |
ChannelFuture closeFuture(); | |
// 流水线,例子中我们就是通过 pipeline 拿到管道, | |
// 然后加入 ChannelHandler | |
ChannelPipeline pipeline(); | |
// 可以直接从 Channel 拿到 ByteBufAllocator 的实例,来分配 ByteBuf | |
ByteBufAllocator alloc(); | |
Channel read(); | |
Channel flush(); // 刷新,基操 | |
} |
其中 pipeline()
方法在例子中使用到了,我们再来看一下 ChannelHandler
。该接口定义了一些负责完成具体请求的方法,例子中使用到的 ChannelInboundHandlerAdapter
抽象类是 ChannelInboundHandler
的接口实现,用于处理入站数据(Inbound 表示入站,Outbound 表示出站),抽象类重写对应方法进行请求处理,这些方法会在合适的时间被调用。
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) { | |
//ctx 是上下文,msg 是收到的消息,以 ByteBuf 形式 | |
ByteBuf buf = (ByteBuf) msg; // 类型转换一下 | |
System.out.println(Thread.currentThread().getName() | |
+" >> 接收到客户端发送的数据:" | |
+buf.toString(StandardCharsets.UTF_8)); | |
// 通过上下文可以直接发送数据回去,注意要 writeAndFlush 才能让客户端立即收到 | |
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes())); | |
} | |
}); |
我们先看顶层接口 ChannelHandler
:只有一些流水线相关的回调方法
public interface ChannelHandler { | |
// 当 ChannelHandler 被添加到流水线中时调用 | |
void handlerAdded(ChannelHandlerContext var1) throws Exception; | |
// 当 ChannelHandler 从流水线中移除时调用 | |
void handlerRemoved(ChannelHandlerContext var1) throws Exception; | |
// 还有一个过时方法,不管 | |
} |
看下一级接口 ChannelInboundHandler
:
//ChannelInboundHandler 用于处理入站相关事件 | |
public interface ChannelInboundHandler extends ChannelHandler { | |
/** 当 Channel 已经注册到自己的 EventLoop 上时调用,前面我们说了, | |
一个 Channel 只会注册到一个 EventLoop 上,注册到 EventLoop 后, | |
这样才会在发生对应事件时被通知。*/ | |
void channelRegistered(ChannelHandlerContext var1) throws Exception; | |
// 从 EventLoop 上取消注册时 | |
void channelUnregistered(ChannelHandlerContext var1) throws Exception; | |
// 当 Channel 已经处于活跃状态时被调用,此时 Channel 已经连接 / 绑定,并且已经就绪 | |
void channelActive(ChannelHandlerContext var1) throws Exception; | |
// 跟上面相反,不再活跃了,并且不在连接它的远程节点 | |
void channelInactive(ChannelHandlerContext var1) throws Exception; | |
// 当从 Channel 读取数据时被调用,可以看到数据被自动包装成了一个 Object(默认是 ByteBuf) | |
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception; | |
// 上一个读取操作完成后调用 | |
void channelReadComplete(ChannelHandlerContext var1) throws Exception; | |
// 暂时不介绍 | |
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception; | |
// 当 Channel 的可写状态发生改变时被调用 | |
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception; | |
// 出现异常时被调用 | |
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; | |
} |
现在我们就可以看到, ChannelInboundHandler
负责处理请求,是事件驱动的。上面用到的 ChannelInboundHandlerAdapter
实际上就是对这些方法实现的抽象类,相比直接用接口,我们可以只重写我们需要的方法,没有重写的方法会默认向流水线下一个 ChannelHandler
发送。
与之对应的还有
ChannelOutboundHandler
,用于处理出站相关操作,此处不再讲解
最后我们看一下 ChannelPipeline
,这个双向链表容器保存著 ChannelHandler
,里面的 ChannelHandler
顺序存储,从客户端发送过来的请求,入站会依次经过 ChannelHandler
,直到它被某一个 handler
完成请求,然后出站时会经过处理出站的 ChannelHandler
。
比如我们希望创建两个入站的 ChannelHandler
,一个用于接收请求并处理,一个用于处理当前接收请求过程中出现的异常。
.childHandler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
protected void initChannel(SocketChannel channel) { | |
channel.pipeline() | |
.addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
ByteBuf buf = (ByteBuf) msg; | |
System.out.println("接收到客户端发送数据:"+ | |
buf.toString(StandardCharsets.UTF_8)); | |
// 此处故意人为抛出异常 | |
throw new RuntimeException("我是异常"); | |
} | |
}) | |
.addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { | |
System.out.println("处理异常:" + cause); | |
} | |
}); | |
} | |
} |
如果我们不在 ChannelHandler
重写对应方法,他就会默认传播到流水线下一个 ChannelHandler
。也有人为传递
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { | |
// 通过 ChannelHandlerContext 来向下传递,ChannelHandlerContext | |
// 是在 Handler 添加进 Pipeline 中时就被自动创建的 | |
ctx.fireExceptionCaught(cause); | |
} |
关于通道,最后我们看一下出站的 ChannelHandler
相关的机制
@Override | |
protected void initChannel(SocketChannel channel) { | |
channel.pipeline() // 直接获取 pipeline,然后添加两个 Handler | |
.addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
ByteBuf buf = (ByteBuf) msg; | |
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); | |
ctx.fireChannelRead(msg); | |
} | |
}) | |
.addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
ByteBuf buf = (ByteBuf) msg; | |
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); | |
ctx.channel().writeAndFlush("伞兵一号卢本伟"); // 这里我们使用 channel 的 write | |
} | |
}) | |
.addLast(new ChannelOutboundHandlerAdapter(){ | |
@Override | |
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { | |
System.out.println("1号出站:"+msg); | |
} | |
}) | |
.addLast(new ChannelOutboundHandlerAdapter(){ | |
@Override | |
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { | |
System.out.println("2号出站:"+msg); | |
// 继续 write 给其他的出站 Handler,不然到这里就断了 | |
ctx.write(msg); | |
} | |
}); | |
} |
重点就是按照顺序存储,但是第一个 ChannelOutbindHandlerAdapter
并没有 write
方法。这是因为当我们入站的 ChannelHandler
完成操作后(比如写请求,就需要有一个 ChannelHandler
调用 write
方法),就会从当前位置倒着找 ChannelOutbindHandler
,找到第一个,再调用。
# 参考
知一码园:https://www.zhiyi.zone/netty/Channel.html
Netty Channel 组件作用:https://www.programminghunter.com/article/12712042451/
语雀・青空の霞光:https://www.yuque.com/qingkongxiaguang/javase/ibx6ug#f443a934