Netty之传输(3)

  • Post author:
  • Post category:其他


传统的网络编程如下

public void  server(int port) throws Exception {
        //将服务器绑定到指定端口
        ServerSocket socket = new ServerSocket(port);
        try {
            for (;;){
                //接受连接
                Socket accept = socket.accept();
                //创建一个线程来处理这个连接 并启动线程
                new Thread(() -> {
                    OutputStream out;
                    try {
                        //回写消息给客户端
                        out = accept.getOutputStream();
                        out.write("XXXXXX".getBytes(Charset.forName("UTF-8")));
                        out.flush();
                        accept.close();
                    } catch (Exception e){
                        e.printStackTrace();
                    } finally {
                        try {
                            accept.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }

socket.accept()一直阻塞,直到连接建立成功,就创建一个新的线程处理此次连接事件。这样是不是很浪费资源呢?来一起看下Netty是怎么做的吧

public static void main(String[] args) throws Exception {
        //netty模型  一个bossgroup 和一个workergroup
        //bossgroup处理accept连接
        //workergroup处理read和warit
        //创建的线程组内的子线程数默认为cpu内核数*2. 子线程为NioEventLoop
        // 每个NioEventLoop都有自己的selector
        NioEventLoopGroup parentGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup childGroup = new NioEventLoopGroup();

        //创建服务器端启动器,并设置参数, worker子线程默认使用轮询的方式进行负载
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(parentGroup, childGroup)//加入两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道(channel), 如果使用OIO传输的话就使用OioServerSocketChannel.class
                    .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true)//一直保持连接
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道对象

                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(new ServerHandler());
                        }
                    });
            //绑定端口 同时启动服务器
            ChannelFuture cf = bootstrap.bind(8881).sync();

            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isDone()){
                        System.out.println("do ....");
                    }
                }
            });

            //监听关闭事件
            cf.channel().closeFuture().sync();
        } finally {
            //最终关机
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }

    }

netty使用线程池,一个管理连接,一个复杂读写。并且为每种传输的实现都暴露了相同的API,所以无论选择哪一种传输,所写的代码基本上是不会受影响的。之前传统的IO如果切换成NIO的话,代码基本上就等于重构了。接下来让我们一起看下传输的API吧

1.传输API

每个Channel都会分配给一个ChannelPipeline和ChannelConfig,ChannelConfig包含了该Channel的所有配置,并且支持热更新。

由于每个Channel都是独一无二的,所以将Channel声明为Comparable的一个子接口,当两个Channel返回的HashCode一样时,那么AbstractChannel的compleTo方法就会报错。

ChannelPipeline拥有所有处理初入站的ChannelHandler的实例,这些实例的组成实现了应用程序处理数据的逻辑。其中ChannelHandler的典型用途如下:

  • 加解码
  • 提供异常通知
  • 提供channel变为活动或者非活动状态通知
  • 提供当channel注册到EventLoop或者从EventLoop注销时的通知
  • 提供有关用户自定义事件的通知

2.内置的传输

netty内置了一些开箱即用的传输,但是这些传输并不是支持每一种协议;比如内置的Epoll就只能在Linux上支持,并且不支持SCTP协议。

2.1.NIO——非阻塞I/O

OIO这里就不再说明了。

NIO是jdk1.4版本引入的,在Netty中是支持的。是基于Selector的API。

选择器背后的基本概念是充当一个注册表,在那里我们可以得到请求在channel的状态发送改变时的通知。状态的变化有:

  • 新的channel被接受并且就绪
  • channel连接成功
  • channel有已经就绪的可供读取的数据
  • channel可用于写数据

选择器运行在一个简称状态变化并对其做出相应处理的线程上,在应用程序对状态的改变做出处理后,选择器会被重置,并重复这个过程。

名称 描述
OP_ACCPET 请求在接受新连接并创建channel时获得通知
OP_CONNNECT 请求在建议一个新连接是获得通知
OP_READ 请求当数据已经就绪,可以从channel中读取数据时获取通知
OP_WRITE 请求当可以向channel中写数据时获得通知

内部细节处理流程如下:

2.2.Epoll——用于Linux的本地非阻塞传输

这目前只能用于Linux机器上,且内核版本要高于2.5.44。代码层面只需要将NioEventLoopGroup缓存EpollEventLoopGroup、NIOServerSocketChannel.class缓存EpollServerSocketChannel.class即可;在性能和速度方面要优于Nio

2.3.用于JVM内部通信的Local传输

就是客户端没有绑定服务器地址,和服务器在同一台机器上且在一个JVM上。传输不受网络流量的控,所以它并不能够和其他传输实现进行互操作。

@Test
    public void testEncoder(){
        byte[] bytes = "test Embedded".getBytes(CharsetUtil.UTF_8);
        MyProtocl myProtocl = new MyProtocl(bytes.length, bytes);
        EmbeddedChannel channel = new EmbeddedChannel(new MyEncoder());
        channel.writeOutbound(myProtocl);
        ByteBuf byteBuf = (ByteBuf) channel.readOutbound();
        byte[] bytes1 = new byte[byteBuf.readInt()];
        byteBuf.readBytes(bytes1);
        System.out.println(new String(bytes, CharsetUtil.UTF_8));
    }

2.4.Embedded传输

这是Netty提供的一种额外传输,主要用于ChannelHandler实现测试,也就是所谓的单元测试

3.总结

应用程序的需求 推荐的传输
非阻塞代码库或者一个常规的 NIO或者在Linux上使用epoll
阻塞代码库 OIO
在同一个jvm内部通信 Local
测试ChannelHandler的实现 Embedded



版权声明:本文为shehuinibingge原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。