1 定义
每个网络应用程序都必须定义如何解析在两个节点之间
来回传输的原始字节
,以及如何将其和目标应用程序的数据格式
做相互转换
。这种转换逻辑由编解码器
处理。
编解码器由编码器
和解码器
组成,它们每种都可以将字节流
从一种格式转换为另一种格式。
2 解码器
负责将入站数据
从一种格式转换到另一种格式的,所以Netty的解码器实现了ChannelInboundHandler
。
- 将字节解码为消息:
ByteToMessageDecoder
和ReplayingDecoder
; - 将一种消息类型解码为另一种:
MessageToMessageDecoder
。
每当需要为
ChannelPipeline
中的下一个ChannelInboundHandler
转换入站数据
时会用到。
2.1 抽象类 ByteToMessageDecoder
ByteToMessageDecoder
是Netty将字节
解码为消息
的基类。下表给出了它最重要的两个方法。
方法 | 描述 |
---|---|
decode(ChannelHandlerContext ctx, ByteBuf in, List | decode() 方法被调用时将会传入一个包含了传入数据的 ByteBuf ,以及一个用来添加解码消息的 List 。 |
decodeLast(ChannelHandlerContext ctx, ByteBuf in, List | Netty提供的这个默认实现只是简单地调用了decode() 方法。 当Channel的状态变为非活动时,这个方法将会被调用一次。 可以重写该方法以提供特殊的处理 |
我自己实现聊天应用时,利用Protocol Buffer
实现了消息的序列化。对于消息头,我进行了一些加工,比如添加了一个魔数(short),版本(short),然后才是消息的长度(int)。因此,对于消息头处理时,需要判断及读取特定长度的数据。
public class ProtocolBufferDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 标记一下当前的readIndex的位置
in.markReaderIndex();
// 判断包头长度
if (in.readableBytes() < 8) {
// 不够包头
return;
}
//读取魔数
short magic = in.readShort();
if (magic != ProtoInstant.MAGIC_CODE) {
String error = "客户端口令不对:" + ctx.channel().remoteAddress();
throw new BusinessException(error);
}
//读取版本
short version = in.readShort();
// 读取传送过来的消息的长度。
int length = in.readInt();
//此处省略一万字符......
}
}
虽然 ByteToMessageDecoder 使得可以很简单地实现这种模式,但是你可能会发现,在调用 readInt()
方法前不得不验证所输入的 ByteBuf 是否具有足够的
数据有点繁琐。因而,Netty也提供了更加简便的解码器。
2.2 抽象类 ReplayingDecoder
ReplayingDecoder
扩展了ByteToMessageDecoder类
,使得我们不必调用readableBytes()方法
。它通过使用一个自定义的ByteBuf实现
,ReplayingDecoderByteBuf
,包装传入的ByteBuf实现了这一点,其将在内部执行调用readableBytes()方法
。
这个类的完整声明是:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
类型参数
S
指定了用于状态管理
的类型,其中 Void
代表不需要状态管理。请注意 ReplayingDecoderByteBuf 的下面这些方面:
- 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个
UnsupportedOperationException
; ReplayingDecoder
稍慢于ByteToMessageDecoder
。
在真实的、更加复杂的情况下,使用一种或者另一种作为基类所带来的差异可能是很显著的。这里有一个简单的准则:如果使用
ByteToMessageDecoder
不会引入太多的复杂性,那么请使用它;否则,请使用ReplayingDecoder
。
2.3 抽象类 MessageToMessageDecoder
完整声明如下:
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
类型参数 I
指定了 decode()方法
的输入参数 msg
的类型,它是你必须实现的唯一方法。如下表所示:
方法 | 描述 |
---|---|
decode(ChannelHandlerContext ctx, I msg, List | 对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。 |
比如IntegerToStringDecoder
用于将integer
类型转化为String
类型。如下所示:
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
out.add(String.valueOf(msg));
}
}
2.4 TooLongFrameException 类
由于 Netty 是一个异步框架
,所以需要在字节可以解码之前在内存中缓冲
它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException
类,其将由解码器在帧超出指定的大小限制时抛出。
为了避免这种情况,你可以设置一个最大字节数的阈值
,如果超出该阈值,则会导致抛出一个 TooLongFrameException
(随后会被 ChannelHandler.exceptionCaught()
方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP)可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接
。
下面代码展示了 ByteToMessageDecoder
是如何使用 TooLongFrameException
来通知 ChannelPipeline
中的其他 ChannelHandler
发生了帧大小溢出的。需要注意的是,如果你正在使用一个可变帧大小的协议
,那么这种保护措施将是尤为重要
的。
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
private static final int MAX_FRAME_SIZE = 1024;
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
if (readable > MAX_FRAME_SIZE) {
in.skipBytes(readable);
}
throw new TooLongFrameException("Frame too big!");
// do something ...
}
}
3 编码器
与解码器相反,其实现了 ChannelOutboundHandler
,并将出站数据从一种格式转换为另一种格式。Netty 提供了一组类,用于帮助你编写具有以下功能的编码器:
- 将消息编码为字节
- 将消息编码为消息
3.1 抽象类 MessageToByteEncoder
其API如下表所示:
方法 | 描述 |
---|---|
encode(ChannelHandlerContext ctx, I msg, ByteBuf out) | encode() 方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为 ByteBuf 的(类型为 I 的)出站消息 。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler |
这个类只有一个方法,而解码器有两个。原因是解码器
通常需要在 Channel 关闭之后
产生最后一个消息(因此也就有了 decodeLast()
方法)。这显然不适用
于编码器
的场景——在连接被关闭之后仍然产生一个消息是毫无意义的
。
3.2 抽象类 MessageToMessageEncoder
方法 | 描述 |
---|---|
encode(ChannelHandlerContext ctx, I msg, List | 这是你需要实现的唯一方法。每个通过 write() 方法写入的消息都将会被传递给 encode() 方法,以编码为一个或者多个出站消息 。随后,这些出站消息将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler |
4 抽象的编解码器类
我们一直将解码器和编码器作为单独的实体讨论,但是你有时将会发现在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对。这些类同时实现了 ChannelInboundHandler
和 ChannelOutboundHandler
接口。
为什么我们并没有一直优先于单独的解码器和编码器使用这些复合类呢? 因为通过尽可能地将这两种功能分开,最大化了代码的可重用性
和可扩展性
,这是 Netty 设计的一个基本原则。
相关的类:
- 抽象类
ByteToMessageCodec
- 抽象类
MessageToMessageCodec
4.1 ByteToMessageCodec
方法名称 | 描述 |
---|---|
decode(ChannelHandlerContext ctx, ByteBuf in, List | 只要有字节可以被消费,这个方法就将会被调用。它将入站 ByteBuf 转换为指定的消息格式,并将其转发给 ChannelPipeline 中的下一个 ChannelInboundHandler |
decodeLast(ChannelHandlerContext ctx, ByteBuf in, List | 这个方法的默认实现委托给了 decode() 方法。它只会在 Channel 的状态变为非活动时被调用一次。它可以被重写以实现特殊的处理 |
encode(ChannelHandlerContext ctx, I msg, ByteBuf out) | 对于每个将被编码并写入出站 ByteBuf 的(类型为 I 的)消息来说,这个方法都将会被调用 |
4.2 MessageToMessageCodec
MessageToMessageCodec
是一个参数化的类,定义如下:
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
方法名称 | 描述 |
---|---|
protected abstract decode( ChannelHandlerContext ctx, INBOUND_IN msg, List | 这个方法被调用时会被传入 INBOUND_IN 类型的消息。 它将把它们解码为 OUTBOUND_IN 类型的消息,这些消息将被转发给 ChannelPipeline 中的下一个 ChannelInboundHandler |
protected abstract encode( ChannelHandlerContext ctx, OUTBOUND_IN msg, List | 对于每个 OUTBOUND_IN 类型的消息,这个方法都将会被调用。这些消息将会被编码为 INBOUND_IN 类型的消息,然后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler |
5 内置的编解码器
5.1 通过SSL/TLS 保护Netty 应用程序
为了支持 SSL/TLS
,Java 提供了 javax.net.ssl
包,它的 SSLContext
和 SSLEngine
类使得实现解密和加密相当简单直接。Netty 通过一个名为 SslHandler
的 ChannelHandler
实现利用了这个 API
,其中 SslHandler
在内部使用 SSLEngine
来完成实际的工作。
在大多数情况下,SslHandler
将是ChannelPipeline
中的第一个ChannelHandler
。
5.2 HTTP 系列
HTTP 是基于请求/响应
模式的:客户端向服务器发送一个 HTTP 请求
,然后服务器将会返回一个 HTTP 响应
。Netty 提供了多种编码器和解码器以简化对这个协议的使用。
一个HTTP请求/响应
可能由多个数据部分组成,FullHttpRequest
和FullHttpResponse
消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequest
、LastHttpContent
等等)都实现了 HttpObject 接口
。
下表概要地介绍了处理和生成这些消息的 HTTP 解码器和编码器。
名称 | 描述 |
---|---|
HttpRequestEncoder | 将 HttpRequest 、HttpContent 和 LastHttpContent 消息编码为字节 |
HttpResponseEncoder | 将 HttpResponse 、HttpContent 和 LastHttpContent 消息编码为字节 |
HttpRequestDecoder | 将字节 解码为 HttpRequest 、HttpContent 和 LastHttpContent 消息 |
HttpResponseDecoder | 将字节 解码为 HttpResponse 、HttpContent 和 LastHttpContent 消息 |
5.3 WebSocket
5.4 空闲的连接和超时
检测空闲连接
以及超时
对于及时释放资源
来说是至关重要的。由于这是一项常见的任务,Netty 特地为它提供了几个 ChannelHandler
实现。下表给出了它们的概述。
名称 | 描述 |
---|---|
IdleStateHandler | 当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过在你的 ChannelInboundHandler 中重写 userEventTriggered() 方法来处理该 IdleStateEvent 事件。 |
ReadTimeoutHandler | 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个 ReadTimeoutException 并关闭对应的 Channel 。可以通过重写 你的 ChannelHandler 中的 exceptionCaught() 方法来检测该 ReadTimeoutException 。 |
WriteTimeoutHandler | 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个 WriteTimeoutException 并关闭对应的 Channel 。可以通过重写 你的 ChannelHandler 的 exceptionCaught() 方法检测该 WriteTimeoutException 。 |
5.5 序列化
5.5.1 JDK 序列化
JDK 提供了 ObjectOutputStream
和 ObjectInputStream
,用于通过网络对 POJO
的基本数据类型
和图
进行序列化
和反序列化
。
下表中列出了 Netty提供的用于和JDK进行互操作的序列化类。
名称 | 描述 |
---|---|
CompatibleObjectDecoder | 和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的解码器 。 |
CompatibleObjectEncoder | 和使用 JDK 序列化的非基于 Netty 的远程节点进行互操作的编码器 。 |
ObjectDecoder | 构建于 JDK 序列化之上的使用自定义的序列化来解码 的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取。 |
ObjectEncoder | 构建于 JDK 序列化之上的使用自定义的序列化来编码 的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取。 |
5.5.2 JBoss Marshalling 序列化
如果你可以自由地使用外部依赖,那么JBoss Marshalling
将是个理想的选择:它比JDK序列化最多快 3 倍
,而且也更加紧凑
。
JBoss Marshalling 是一种可选的序列化 API,它修复了在 JDK 序列化 API 中所发现的许多问题,同时保留了与
java.io.Serializable
及其相关类的兼容性,并添加 了几个新的可调优参数以及额外的特性,所有的这些都是可以通过工厂配置(如外部序列化器、类/实例查找表、类解析以及对象替换等)实现可插拔的。
Netty 通过下表所示的两组解码器/编码器对为 Boss Marshalling
提供了支持。
名称 | 描述 |
---|---|
CompatibleMarshallingDecoder/CompatibleMarshallingEncoder | 与只使用 JDK 序列化的远程节点兼容 |
MarshallingDecoder/MarshallingEncoder | 适用于使用 JBoss Marshalling 的节点。这些类必须一起使用 |
5.5.3 Protocol Buffers
Protocol Buffers
是一种由Google
公司开发的、现在已经开源的数据交换格式。它以一种紧凑
而高效
的方式对结构化的数据进行编码以及解码。它具有许多的编程语言绑定,使得它很适合跨语言的项目。
下表展示了 Netty 为支持 protobuf
所提供的 ChannelHandler 实现
。
名称 | 描述 |
---|---|
ProtobufDecoder | 使用 protobuf 对消息进行解码 |
ProtobufEncoder | 使用 protobuf 对消息进行编码 |
ProtobufVarint32FrameDecoder | 根据消息中的 Google Protocol Buffers 的Base 128 Varints整型长度字段值 动态地分割 所接收到的 ByteBuf |
ProtobufVarint32LengthFieldPrepender | 向ByteBuf 前追加一个Google Protocol Buffers 的Base 128 Varints整型 的长度字段值 |