传统的网络编程如下
public void server(int port) throws Exception {
//将服务器绑定到指定端口
ServerSocket socket = new ServerSocket(port);
try {
for (;;){
//接受连接
Socket accept = socket.accept();
//创建一个线程来处理这个连接 并启动线程
new Thread(() -> {
OutputStream out;
try {
//回写消息给客户端
out = accept.getOutputStream();
out.write("XXXXXX".getBytes(Charset.forName("UTF-8")));
out.flush();
accept.close();
} catch (Exception e){
e.printStackTrace();
} finally {
try {
accept.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
} catch (Exception e){
e.printStackTrace();
}
}
socket.accept()一直阻塞,直到连接建立成功,就创建一个新的线程处理此次连接事件。这样是不是很浪费资源呢?来一起看下Netty是怎么做的吧
public static void main(String[] args) throws Exception {
//netty模型 一个bossgroup 和一个workergroup
//bossgroup处理accept连接
//workergroup处理read和warit
//创建的线程组内的子线程数默认为cpu内核数*2. 子线程为NioEventLoop
// 每个NioEventLoop都有自己的selector
NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
NioEventLoopGroup childGroup = new NioEventLoopGroup();
//创建服务器端启动器,并设置参数, worker子线程默认使用轮询的方式进行负载
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(parentGroup, childGroup)//加入两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道(channel), 如果使用OIO传输的话就使用OioServerSocketChannel.class
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//一直保持连接
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道对象
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ServerHandler());
}
});
//绑定端口 同时启动服务器
ChannelFuture cf = bootstrap.bind(8881).sync();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isDone()){
System.out.println("do ....");
}
}
});
//监听关闭事件
cf.channel().closeFuture().sync();
} finally {
//最终关机
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
netty使用线程池,一个管理连接,一个复杂读写。并且为每种传输的实现都暴露了相同的API,所以无论选择哪一种传输,所写的代码基本上是不会受影响的。之前传统的IO如果切换成NIO的话,代码基本上就等于重构了。接下来让我们一起看下传输的API吧
1.传输API
每个Channel都会分配给一个ChannelPipeline和ChannelConfig,ChannelConfig包含了该Channel的所有配置,并且支持热更新。
由于每个Channel都是独一无二的,所以将Channel声明为Comparable的一个子接口,当两个Channel返回的HashCode一样时,那么AbstractChannel的compleTo方法就会报错。
ChannelPipeline拥有所有处理初入站的ChannelHandler的实例,这些实例的组成实现了应用程序处理数据的逻辑。其中ChannelHandler的典型用途如下:
- 加解码
- 提供异常通知
- 提供channel变为活动或者非活动状态通知
- 提供当channel注册到EventLoop或者从EventLoop注销时的通知
- 提供有关用户自定义事件的通知
2.内置的传输
netty内置了一些开箱即用的传输,但是这些传输并不是支持每一种协议;比如内置的Epoll就只能在Linux上支持,并且不支持SCTP协议。
2.1.NIO——非阻塞I/O
OIO这里就不再说明了。
NIO是jdk1.4版本引入的,在Netty中是支持的。是基于Selector的API。
选择器背后的基本概念是充当一个注册表,在那里我们可以得到请求在channel的状态发送改变时的通知。状态的变化有:
- 新的channel被接受并且就绪
- channel连接成功
- channel有已经就绪的可供读取的数据
- channel可用于写数据
选择器运行在一个简称状态变化并对其做出相应处理的线程上,在应用程序对状态的改变做出处理后,选择器会被重置,并重复这个过程。
名称 | 描述 |
OP_ACCPET | 请求在接受新连接并创建channel时获得通知 |
OP_CONNNECT | 请求在建议一个新连接是获得通知 |
OP_READ | 请求当数据已经就绪,可以从channel中读取数据时获取通知 |
OP_WRITE | 请求当可以向channel中写数据时获得通知 |
内部细节处理流程如下:
2.2.Epoll——用于Linux的本地非阻塞传输
这目前只能用于Linux机器上,且内核版本要高于2.5.44。代码层面只需要将NioEventLoopGroup缓存EpollEventLoopGroup、NIOServerSocketChannel.class缓存EpollServerSocketChannel.class即可;在性能和速度方面要优于Nio
2.3.用于JVM内部通信的Local传输
就是客户端没有绑定服务器地址,和服务器在同一台机器上且在一个JVM上。传输不受网络流量的控,所以它并不能够和其他传输实现进行互操作。
@Test
public void testEncoder(){
byte[] bytes = "test Embedded".getBytes(CharsetUtil.UTF_8);
MyProtocl myProtocl = new MyProtocl(bytes.length, bytes);
EmbeddedChannel channel = new EmbeddedChannel(new MyEncoder());
channel.writeOutbound(myProtocl);
ByteBuf byteBuf = (ByteBuf) channel.readOutbound();
byte[] bytes1 = new byte[byteBuf.readInt()];
byteBuf.readBytes(bytes1);
System.out.println(new String(bytes, CharsetUtil.UTF_8));
}
2.4.Embedded传输
这是Netty提供的一种额外传输,主要用于ChannelHandler实现测试,也就是所谓的单元测试
3.总结
应用程序的需求 | 推荐的传输 |
非阻塞代码库或者一个常规的 | NIO或者在Linux上使用epoll |
阻塞代码库 | OIO |
在同一个jvm内部通信 | Local |
测试ChannelHandler的实现 | Embedded |