1 环境配置
要运行起Netty Demo,主要依赖于JDK
和Maven
,具体怎么安装可自行Google。
本机环境列表:
JDK: 1.8.0_112
Maven: 3.5.3
IDE: IntelliJ IDEA 2021.3 (Ultimate Edition)
OS: MacOS 10.15
2 Demo
2.1 Server
Server端至少需要以下组件:
- 引导:
ServerBootstrap
- 至少一个
ChannelHandler
2.1.2 ServerBootstrap
先上代码。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* @author zyxelva
* @date 2022/08/25
* @description 基于Netty的服务器
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
int port = 9999;
EchoServer echoServer = new EchoServer(port);
System.out.println("服务器即将启动");
echoServer.start();
System.out.println("服务器关闭");
}
public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/*服务端启动必备*/
ServerBootstrap b = new ServerBootstrap();
b.group(group)
/*指定使用NIO的通信模式*/
.channel(NioServerSocketChannel.class)
/*指定监听端口*/
.localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(serverHandler);
}
});
/*异步绑定到服务器,sync()会阻塞到完成*/
ChannelFuture f = b.bind().sync();
/*阻塞当前线程,直到服务器的ServerChannel被关闭*/
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
- 新建
ServerBootstrap
实例 - 注册
EventLoopGroup
上一章节中,我们提到Netty Server端一般是有两个EventLoopGroup
,一个是Boss
,一个是Worker
,我们可以查看ServerBootstrap
的group
方法:public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { public ServerBootstrap group(EventLoopGroup group) { return this.group(group, group); } public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { //此处省略几行代码... } //此处省略一万个字符 }
可以看出,group
只有一个参数时,其实Boss
和Work
共用一个EventLoopGroup
。
注册IO的通信模式
前面介绍IO模型
时介绍过,这里Netty提供了以下几种:EpollServerSocketChannel
NioServerSocketChannel
OioServerSocketChannel
KQueueServerSocketChannel
:这个主要是在Unix
操作系统中,比如MacOS
。
绑定用于客户端连接使用的端口
- 注册
ChannelHandler
。两种方式:- 直接注册实现了
ChannelHandler
的实例 - 通过实现
ChannelInitializer
接口的实例注册
- 直接注册实现了
总结下,ServerBootstrap
要干的事儿:
new
一个ServerBootstrap
实例
绑定一个Channel
,Channel
可以是OIO
类型的,也可以是NIO
类型的
绑定端口,提供给客户端链接
注册服务于Channel
的所有请求的ChannelHandler
开启监听
2.1.3 ChannelHandler
ChannelHandler
是一个接口族的父接口
,它的实现负责接收并响应事件通知
。 在Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。
因为你的Echo 服务器
会响应
传入的消息,所以它需要实现ChannelInboundHandler
接口,用来定义响应入站事件
的方法。这个简单的应用程序只需要用到少量的
这些方法,所以继承 ChannelInboundHandlerAdapter
类也就足够了,它提供了ChannelInboundHandler
的默认实现。
样例代码如下:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author zyxelva
*/
//EchoServerHandler 被标注为@Shareable,所以我们可以总是使用同样的实例
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server accept: " + in.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
我们感兴趣的方法是:
channelRead()
:对于每个传入的消息都要调用;channelReadComplete()
:通知ChannelInboundHandler
最后一次对channelRead()
的调用是当前批量读取中的最后一条
消息;exceptionCaught()
:在读取
操作期间,有异常抛出时会调用。
如果不捕获异常,会发生什么呢?
每个Channel
都拥有一个与之相关联的ChannelPipeline
,其持有一个ChannelHandler
的实例链
。在默认的情况下,ChannelHandler
会把对它的方法的调用转发给链中的下一个ChannelHandler
。因此,如果exceptionCaught()
方法没有被该链中的某处实现,那么所接收的异常将会被传递到ChannelPipeline 的尾端
并被记录。为此,你的应用程序应该提供至少有一个
实现了exceptionCaught()
方法的ChannelHandler
。
2.2 Client
老样子,先上代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* @author zyxelva
* @date 2022/08/25
* @description 基于Netty的客户端
*/
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
//线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//客户端启动必备
Bootstrap b = new Bootstrap();
b.group(group)
//指定使用NIO的通信模式
.channel(NioSocketChannel.class)
//指定服务器的IP地址和端口
.remoteAddress(new InetSocketAddress(host, port))
//注册ChannelHandler
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
//异步连接到服务器,sync()会阻塞到完成
ChannelFuture f = b.connect().sync();
//阻塞当前线程,直到客户端的Channel被关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient(9999, "127.0.0.1").start();
}
}
Client端启动与Server端类似,需要以下组件:
- 引导:
Bootstrap
- 至少一个
ChannelHandler
2.2.1 Bootstrap
- 新建
Bootstrap
实例
与Server端主要区别在于引导类是Bootstrap
,Client端需要的是Bootstrap实例
,且需要指定Server端
提供的地址
和端口
,以便于连接
。 - 注册IO的通信模式
- 指定服务器的
IP地址
和端口
- 注册
ChannelHandler
- 异步
连接
到服务器
2.2.2 ChannelHandler
如同服务器,客户端将拥有一个用来处理数据的ChannelInboundHandler
。在这个场景下,你将扩展SimpleChannelInboundHandler 类
以处理所有必须的任务,如下代码清单所示。这要求重写下面的方法:
channelActive()
:在到服务器的连接已经建立之后
将被调用;channelRead0()
:当从服务器接收
到一条消息时被调用;exceptionCaught()
:在处理过程中引发异常
时被调用。
实现代码:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @author zyxelva
* @date 2022/08/25
* @description EchoClientHandler
*/
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 读取到网络数据后进行业务处理
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept" + msg.toString(CharsetUtil.UTF_8));
}
/**
* channel活跃后,做业务处理
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Netty", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
SimpleChannelInboundHandler
与ChannelInboundHandler
为什么我们在客户端
使用的是SimpleChannelInboundHandler
,而不是在EchoServerHandler
中所使用的ChannelInboundHandlerAdapter
呢?这和两个因素的相互作用有关:业务逻辑如何处理消息以及Netty 如何管理资源。
在客户端
,当channelRead0()
方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler
负责释放指向保存该消息的ByteBuf
的内存引用。
在EchoServerHandler
中,你仍然需要将传入消息回送给发送者,而write()
操作是异步的
,直到channelRead()
方法返回后可能仍然没有完成。为此,EchoServerHandler
扩展了ChannelInboundHandlerAdapter
,其在这个时间点上不会释放消息。
消息在EchoServerHandler
的channelReadComplete()
方法中,当writeAndFlush()
方法被调用时被释放。
总结下Client启动流程:
- 为初始化客户端,创建了一个
Bootstrap 实例
; - 为进行事件处理分配了一个
NioEventLoopGroup 实例
,其中事件处理包括创建新的连接
以及处理入站和出站数据
; - 为服务器连接创建了一个
InetSocketAddress 实例
; - 当连接被建立时,一个
EchoClientHandler 实例
会被安装到(该Channel 的)ChannelPipeline
中; - 在一切都设置完成后,调用
Bootstrap.connect()
方法连接
到远程节点;
完成了客户端,你便可以着手构建并测试该系统了。