# 简介
编解码器分为两部分,解码器负责入站(可以理解为将字节流转为原来的格式);编码器负责出站。
- 解码器负责入站操作,需要实现
ChannelInboundHandler
接口,解码器本质上也是ChannelInboundHandler
。 - 编码器负责出站操作,需要实现
ChannelOutboundhandler
接口,编码器本质上也是ChannelOutboundHandler
。
# 解码器
Netty 提供了一些常用的解码器,他们都继承了 ByteToMessageDecoder
类 :
RedisDecoder
基于 Redis 协议的解码器。XmlDecoder
基于 XML 格式的解码器。JosnObjectDecoder
基于 json 数据格式的解码器HttpObjectDecoder
: 基于 http 协议的解码器
Netty 也提供了 MessageToMeMessageDecoder
,将一种格式转化为另一种格式的解码器。下面的类都继承了该抽象类。
StringDecoder
将接收到ByteBuf
转化字符串ByteArrayDecoder
将接收到ByteBuf
转化为字节数组Base64Decoder
将由ByteBuf
或US-ASCII
字符串编码的 Base64 解码为ByteBuf
。
解码器本质就是 ChannelInboundHandler
接口,所以上面所有的类其实都是实现了该接口。
所以我们在流水线上就可以直接将解码器(编码器也是)当成 ChannelHandler
来使用
@Override | |
protected void initChannel(SocketChannel channel) { | |
channel.pipeline() | |
// 解码器本质上也算是一种 ChannelInboundHandlerAdapter,用于处理入站请求 | |
.addLast(new StringDecoder()) | |
.addLast(new ChannelInboundHandlerAdapter(){ | |
@Override | |
public void channelRead(ChannelHandlerContext ctx, Object msg) | |
throws Exception { | |
// 经过 StringDecoder 转换后,msg 直接就是一个字符串,所以打印就行了 | |
System.out.println(msg); | |
} | |
}); | |
} |
看一下 StringDecoder
类图
要使用自定义的解码器,我们可以继承 MessageToMeMessage
类,重写 decode
方法即可。
public class TestDecoder extends MessageToMessageDecoder<ByteBuf> { | |
@Override | |
protected void decode(ChannelHandlerContext channelHandlerContext, | |
ByteBuf buf, List<Object> list) throws Exception { | |
System.out.println("数据已收到,正在进行解码..."); | |
String text = buf.toString(StandardCharsets.UTF_8); // 直接转换为 UTF8 字符串 | |
list.add(text); | |
// 解码后需要将解析后的数据丢进 List 中,如果丢进去多个数据, | |
// 相当于数据被分成了多个,后面的 Handler 就需要每个都处理一次 | |
} | |
} |
# 编码器
Netty 提供了一些常用编码器类型,它们继承了 MessageToByteEncoder
类:
ObjectEncoder
:对象(需要实现Serializable
接口)编码成字节流。SocketMessageEncoder
:将SocketMessage
编码为字节流。HAProxyMessageEncoder
:将HAProxyMessage
编码成字节流。
Netty 也提供了 MessageToMessageEncoder
,将一种格式转化为另一种格式的编码器,下面类都是继承此类:
RedisEncoder
:将 Redis 协议的对象进行编码。StringEncoder
:将字符串进行编码操作。Base64Encoder
:将 Base64 字符串进行编码操作。
我们编写一下客户端的代码
public static void main(String[] args) { | |
Bootstrap bootstrap = new Bootstrap(); | |
bootstrap.group(new NioEventLoopGroup()) | |
.channel(NioSocketChannel.class) | |
.handler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
protected void initChannel(SocketChannel socketChannel) throws Exception { | |
socketChannel.pipeline() | |
.addLast(new StringDecoder()) // 解码器 | |
.addLast(new ChannelInboundHandlerAdapter() { | |
@Override | |
public void channelRead(ChannelHandlerContext channelHandlerContext, | |
Object o) throws Exception { | |
System.out.println(">> 接收到客户端发送的数据:" + o); | |
} | |
}) | |
.addLast(new StringEncoder()); | |
} | |
}); | |
Channel channel = bootstrap.connect("localhost",8888).channel(); | |
try(Scanner in = new Scanner(System.in)) { | |
while (true) { | |
System.out.println("<< 请输入要发送给服务端的内容:"); | |
String text = in.nextLine(); | |
if(text.isEmpty()) continue; | |
channel.writeAndFlush(text); // 直接发送字符串就行 | |
} | |
} finally { | |
System.out.println("客户端断开连接"); | |
} | |
} |
# 编解码器
它是既继承了 ChannelInboundHandlerAdapter
也实现了 ChannelOutboundHandler
接口,又能处理出站也能处理入站请求,实际上就是将之前的给组合到一起了,比如我们也可以实现一个缝合在一起的 StringCodec
类:
// 需要指定两个泛型,第一个是入站的消息类型,一个是出站的消息类型,出站是 String 类型,我们要转成 ByteBuf | |
public class StringCodec extends MessageToMessageCodec<ByteBuf, String> { | |
@Override | |
protected void encode(ChannelHandlerContext ctx, String buf, List<Object> list) throws Exception { | |
System.out.println("正在处理出站数据..."); | |
list.add(Unpooled.wrappedBuffer(buf.getBytes())); // 同样的,添加的数量就是出站的消息数量 | |
} | |
@Override | |
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> list) throws Exception { | |
System.out.println("正在处理入站数据..."); | |
list.add(buf.toString(StandardCharsets.UTF_8)); // 和之前一样,直接一行解决 | |
} | |
} |
# 常用的解码器
# FixedLengthFrameDecoder
固定长度解码器:通过构造函数设置固定长度的大小 frameLength
,无论接收方依次获取多大的数据,都会严格按照该大小进行解码,如果积累读取到长度大小 frameLength
的消息,解码器才会处理。
// 有效解决粘包拆包问题 | |
channel.pipeline().addLast(new FixedLengthFrameDecoder(10)); |
# DelimiterBasedFrameDecoder
特殊分隔符解码器,其重要属性:
delimiters
:指定特殊分隔符,通过写入ByteBuf
作为参数传入。delimiters
的类型是ByteBuf
数组,所以可以同时指定多个分隔符。maxLength
:报文最大长度限制,如果超过该长度还没有检测到分隔符,就会抛出TooLongFrameException
异常,是对程序在极端情况下的一种保护。failFast
:设置failFast
可以控制抛出TooLongFrameException
的时机。如果failFast = true
,就会在超过maxLength
立即抛出。反之,就会等到解码出一个完整的消息才会抛出异常。StripDelimiter
:是判断解码后得到的消息是否去除分隔符
channel.pipeline() | |
.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer("!".getBytes()))) | |
// 指定一个特定的分隔符,比如我们这里以感叹号为分隔符 | |
// 在收到分隔符之前的所有数据,都作为同一个数据包的内容 |
也可以这么写
ByteBuf delimiter = Unpooled.copiedBuffer("&".getBytes()); | |
new DelimiterBasedFrameDecoder(10, true, true, delimiter); |
# LengthFieldBasedFrameDecoder
长度域解码器:解决 TCP 拆包 / 粘包问题最常用的解码器、基本可以覆盖大部分基于长度拆包场景。
属性:
// 长度字段的偏移量,也就是存放长度数据的起始位置 | |
private final int lengthFieldOffset; | |
// 长度字段所占用的字节数 | |
private final int lengthFieldLength; | |
/* 消息长度的修正值 | |
在很多较为复杂一些的协议设计中,长度域不仅仅包含消息的长度,而且包含其他的数据, | |
如版本号、数据类型、数据状态等,那么这时候我们需要使用 lengthAdjustment 进行修正 | |
lengthAdjustment = 包体的长度值 - 长度域的值 | |
*/ | |
private final int lengthAdjustment; | |
// 解码后需要跳过的初始字节数,也就是消息内容字段的起始位置 | |
private final int initialBytesToStrip; | |
// 长度字段结束的偏移量, | |
// lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength | |
private final int lengthFieldEndOffset; |
也有一些和上面两个解码器相似的属性。
其实,如果对计算机网络的 TCP 协议和报文段传输比较熟悉,就知道该解码器的工作机制
# 参考
语雀・青空の霞光:https://www.yuque.com/qingkongxiaguang/javase/ibx6ug#91071a42
51CTO:https://www.51cto.com/article/636349.html
技术文章摘抄
开箱即用:Netty 支持哪些常用的解码器?