【手把手】深挖IO(补充篇)

  • Post author:
  • Post category:其他


在上一篇【深挖 IO】中已经将各大主流的 IO 网络模型介绍完了(还没看过的小伙伴墙裂推荐去瞅一眼 → https://blog.csdn.net/FeenixOne/article/details/129157665 → 不然对这篇的内容可能会有那么一乃乃的影响),那么废话不多说,趁着 IO 的热乎劲儿还没过去,咱们直接来手写个 Netty 青春版玩玩。

首先声明,上一篇中的某些概念这里就不重复解释了,譬如:channel、bytebuffer、selector 等,在 netty 中,bytebuffer 就抽象成了 bytebuf 类,其实这就是一个缓冲区,有了缓冲区之后可以大大的提高数据的读写效率,这个缓冲区可以是堆内的,也可以是堆外的。那咱们就从这个 bytebuffer 开始着手,先来看看 bytebuffer 的内部是怎样的。

NIO客户端与服务端通信


bytebuffer内部状态

客户端和服务端进行数据交互的时候,都是从 bytebuffer 中去读写数据,那么这个 buffer 到底内部是怎么工作的,写段代码运行一下看看:

public class MyNetty {

    @Test
    public void myBytebuf() {
        // 使用 Java 的 ByteBuffer 只能传一个参数,但是 Netty 的 ByteBuf 需要传两个参数
        // 需要指定初始的大小和最大的大小,趋向于慢慢平滑过渡的一个过程
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(8, 20);
        print(buf);

        // read/write 会控制索引指针的移动;get/set 则不会
        buf.writeBytes(new byte[]{1, 2, 3, 4});
        print(buf);

        buf.writeBytes(new byte[]{1, 2, 3, 4});
        print(buf);

        buf.writeBytes(new byte[]{1, 2, 3, 4});
        print(buf);

        buf.writeBytes(new byte[]{1, 2, 3, 4});
        print(buf);

        // 第 5 次写入则达到了最大容量的边界
        buf.writeBytes(new byte[]{1, 2, 3, 4});
        print(buf);

        // 第 6 次写入会不会报错?
        // buf.writeBytes(new byte[]{1, 2, 3, 4});
        // print(buf);
    }

    public static void print(ByteBuf buf) {
        // 是否可读,里面有没有东西给你读
        System.out.println("buf.isReadable >> " + buf.isReadable());

        // 读取下标,从哪个位置开始读
        System.out.println("buf.readerIndex >> " + buf.readerIndex());

        // 能读多少字节出来
        System.out.println("buf.readableBytes >> " + buf.readableBytes());

        // 是否可写
        System.out.println("buf.isWritable >> " + buf.isWritable());

        // 写入下标,从哪个位置开始写
        System.out.println("buf.writerIndex >> " + buf.writerIndex());

        // 能写多少字节进去
        System.out.println("buf.writableBytes >> " + buf.writableBytes());

        // 实际动态边界有多大
        System.out.println("buf.capacity >> " + buf.capacity());

        // 最大动态边界有多大
        System.out.println("buf.maxCapacity >> " + buf.maxCapacity());

        // true-堆外分配;false-堆内分配
        System.out.println("buf.isDirect >> " + buf.isDirect());

        System.out.println("------------------------------------------");
    }


}

代码中对 buffer 进行了 5 次的读写,每一次都打印出 buffer 的各种状态,看下每次的结果如何

初始创建完成之后,buffer 中的状态为:

buf.isReadable >> false —- 不可读

buf.readerIndex >> 0 ——- 因为里面没东西,所以读取的下标从 0 开始

buf.readableBytes >> 0 —– 能读的字节内容也是 0

buf.isWritable >> true —– 可以写

buf.writerIndex >> 0 ——- 写入的下标也是从 0 开始

buf.writableBytes >> 8 —– 可写的容量是 8

buf.capacity >> 8 ———- 这个仅代表初始化的空间是 8

buf.maxCapacity >> 20 —— 可膨胀到最大 20 的空间

buf.isDirect >> true ——- 现在是堆外分配

开始第 1 次写入,写入后再来看看此时 buffer 的状态:

buf.isReadable >> true —– 刚刚写入了内容,现在可以读

buf.readerIndex >> 0 ——- 因为还没有读取过,所以指针起始还是从 0 开始读

buf.readableBytes >> 4 —– 写入了 4 bytes,所以能读出 4 bytes

buf.isWritable >> true —– 此时还可以往里面写入数据

buf.writerIndex >> 4 ——- 写入了 4 bytes,写入指针的位置移到 4,再写只能从 4 开始往后追加

buf.writableBytes >> 4 —– 还能再往里写 4 bytes,基于初始化后的 8 bytes 总空间而言

buf.capacity >> 8 ———- 这个仅代表初始化的空间是 8

buf.maxCapacity >> 20 —— 可膨胀到最大 20 的空间

buf.isDirect >> true ——- 现在是堆外分配

第 2 次写入之后,刚好将初始化的 8 bytes 填满,所以第 3 次写入之后,总数据即便超过了 8 bytes 有还是可读 12 bytes 数据。但是有一点很有意思的是,buf.capacity >> 16,也就是说从原先的 8 bytes 扩容到了 16 bytes,默认只是扩容原先 1 倍的空间,也不是直接扩容到 20 bytes,当然最大空间还是 20 bytes。

继续写 ~ 继续写 ~ 会一直动态将空间分配到 20 bytes,这个没问题比较简单,但是前 5 次将空间全部填满之后,第 6 次写入的时候会不会报错呢?

答案是肯定的, 会直接抛出越界异常,提醒已经超出最大空间:java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(4) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)

当然除了以上的 ByteBufAllocator.DEFAULT.buffer(8, 20); 这种分配方式之外,Netty 还引入了池化的概念,可以手动指定是否通过需要池化来提高 bytebuffer 的概念:

PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);

UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);

NIO客户端

在之前先简单介绍一个类:NioEventLoopGroup,这个类是 NIO 客户端模式的基础。这个类在构造的时候需要传入一个参数用来指定这个组中可用的线程数量:

NioEventLoopGroup selector = new NioEventLoopGroup(1);

当指定组中只有 1 个线程的时候,即使代码中写了两个线程分别去执行不同的任务,后面的的任务也因为没有线程而无法执行:

而当组中有 2 个线程的时候,就可以调用足够多的线程分别去执行任务:

有了这个知识点的铺垫之后,现在来正式的写 NIO 客户端代码:

    @Test
    public void clientMode() {
        // 创建循环事件组
        NioEventLoopGroup selector = new NioEventLoopGroup(1);

        // 创建客户端
        NioSocketChannel client = new NioSocketChannel();

        // 将客户端注册到循环事件组中
        // 类似 epoll_ctl(5, ADD, 3) 这么个系统调用
        selector.register(client);

        // 连接到服务端
        client.connect(new InetSocketAddress("192.168.126.129", 9090));

        // 将数据的字节流缓冲进 bytebuf 中
        ByteBuf buf = Unpooled.copiedBuffer("zhangsanfeng".getBytes());

        // 将 buf 中的数据刷写
        client.writeAndFlush(buf);


        System.out.println("client over");
    }

客户端有了,还差个服务端。使用虚拟机作为服务器,通过 nc -l 命令来监听 9090 端口:

nc -l 192.168.126.129 9090

nc 是 netcat 的简写,是一个功能强大的网络工具,有着网络界的瑞士军刀美誉。nc 命令在 Linux系统中实际命令是 ncat,nc 是软连接到 ncat。nc 命令的主要作用如下:

1、实现任意 TCP / UDP 端口的侦听,nc 可以作为 server 以 TCP 或 UDP 方式侦听指定端口

2、机器之间传输文件

3、端口的扫描,nc 可以作为 client 发起 TCP / UDP 连接

4、机器之间网络测速

运行客户端,给服务端发个数据,可以看到客户端的数据发了,也没报错,但是服务端这边原先好好的监听的状态已经退出:


这其实是响应式编程中最重要的一个特点,大部分的操作每一步都是异步的。也就是说在上面那版的代码中,连接也是异步的,连接完就会立刻返回一个ChannelFuture,通过这个返回结果设置等待双方连接成功;那么同样的,在 buf 刷写的时候也要设置保证数据书写成功:

    /**
     * NIO 客户端
     */
    @Test
    public void clientMode() throws InterruptedException {
        // 创建循环事件组
        NioEventLoopGroup selector = new NioEventLoopGroup(1);

        // 创建客户端
        NioSocketChannel client = new NioSocketChannel();

        // 将客户端注册到循环事件组中
        // 类似 epoll_ctl(5, ADD, 3) 这么个系统调用
        selector.register(client);

        // 连接到服务端
        ChannelFuture connect = client.connect(new InetSocketAddress("192.168.126.129", 9090));
        ChannelFuture sync = connect.sync();

        // 将数据的字节流缓冲进 bytebuf 中
        ByteBuf buf = Unpooled.copiedBuffer("zhangsanfeng".getBytes());

        // 将 buf 中的数据刷写到服务端
        ChannelFuture send = client.writeAndFlush(buf);
        send.sync();

        // 等待关闭,同步关闭
        sync.channel().closeFuture().sync();
        System.out.println("client over");
    }

这么一设置之后,服务端就可以正常接受到客户端发送过来的数据:

为什么这里花大篇幅来讲需要控制同步异步的事情,因为基于 Netty 向上构建 RPC 的话,有时候需要主动的取到客户端去发送数据,并且连接需要一直保持阻塞住不能断。除非服务端自己主动断开连接,这样客户端才会继续往下走,打印 “client over”。

解决了客户端主动发送数据之后,现在需要处理接收服务端数据。原理同上面的发送基本一致,首先应该要获取一个服务端给客户端发送数据的事件,然后将读取的处理注册到这个事件上。框架上已经封装好了方法,可以通过客户端取到相应的管道,然后这个管道中的数据具体的读取后处理,需要自定义一个 Handler 去继承它提供的 ChannelInboundHandlerAdapter,重写其方法

public class MyInHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client registed >> ");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client active >> ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);
        System.out.println(str);
        System.out.println("---------------------------------------------------------");
    }

}

如此一来,客户端就可以接收到服务端传来的数据:


现在是实现了客户端单方面的数据接收,如果客户端在接收到数据之后,还想要给对方回复的话,那么客户端在读取数据的时候,就不可以使用 buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8); 这个方法,这个方法会将 buf 中缓冲区的指针进行位移,这样再写入数据的时候由于指针在读取的时候已经位移了,就只能写入位移后指针的位置后面的数据。所以在读取数据的时候应该换成 buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);

这样便可以将服务端发送过来的数据,再反向给它回复回去

NIO服务端

服务端的原理和客户端基本一致,也是创建一个事件组,然后将服务端的 Channel 注册到事件中,保持同步连接,等待同步关闭等等,直接看代码:

    /**
     * Netty - NIO - 服务端
     */
    @Test
    public void serverMode() throws InterruptedException {
        NioEventLoopGroup selector = new NioEventLoopGroup(1);

        NioServerSocketChannel server = new NioServerSocketChannel();

        selector.register(server);

        // 服务端和客户端在这里有着本质的不同
        // 对于客户端来说,连接中读取到的是服务端发送来的数据
        // 对于服务端来说,监听中读取到的就是 Socket,也就是一个个的client
        // 所以服务端接收到客户端之后,还要将这些客户端注册到事件中
        ChannelPipeline pipeline = server.pipeline();
        pipeline.addLast(new MyAcceptHandler(selector, new MyInHandler()));

        ChannelFuture bind = server.bind(new InetSocketAddress("192.168.31.134", 9090));
        bind.sync().channel().closeFuture().sync();
        System.out.println("server over");
    }

不过服务端和客户端在这里有着本质的不同:对于客户端来说,连接中读取到的是服务端发送来的数据;而对于服务端来说,监听中读取到的就是 Socket,也就是一个个的客户端,所以服务端接收到客户端之后,还要将这些客户端注册到事件中

public class MyAcceptHandler extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup selector;
    private final ChannelHandler handler;

    public MyAcceptHandler(NioEventLoopGroup selector, ChannelHandler myInHandler) {
        this.selector = selector;
        this.handler = myInHandler;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server registed >> ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 得到连接过来的客户端
        SocketChannel client = (SocketChannel) msg;

        // 注册
        selector.register((Channel) client);

        // 响应式 Handler
        ChannelPipeline clientPipeline = ((Channel) client).pipeline();
        clientPipeline.addLast(handler);
    }

}

这样有了服务端之后,使用虚拟机作为客户端,通过 nc 命令来连接到服务端的 9090 端口:

nc 192.168.31.134 9090

客户端给服务端发送数据之后,被服务端接受,并返回给了客户端

那么既然能接受一个客户端,能不能接受多个客户端呢?很遗憾,此时在开一个窗口,使用 nc 模拟客户端再来连接服务端,会看到直接报错:

核心点在于这句:MyInHandler is not a @Sharable handler, so can’t be added or removed multiple times.

自定义的 Handler 并不可以用于共享,也就是说不允许在多个地方注册给多个连接共享使用。在服务端启动之后,MyInHandler 只被 new 了一次。然后每连接进来一个客户端,都是把这唯一的 MyInHandler 作为参数传递进去,单例是很香没错,但是如果在设计 MyInHandler 的时候定义了很多的属性,那这些属性的值就会乱掉。

这个问题解决起来也很简单,根据提示给 MyInHandler 加上 @ChannelHandler.Sharable 注解即可

很明显,这些处理数据的 Handler 肯定是由用户自定义去实现,就无法强制性要求用户非得按照单例模式来设计,所以 @ChannelHandler.Sharable 注解不应该强压给用户。如果每次传递参数的时候,都 new 一个 MyInHandler 传递进去,但是代码没法写啊…..

所以,是不是可以设计一个没有业务功能的 Handler 作为一个包装外壳,在这个壳里面再去真正的 new 这个 MyInHandler

@ChannelHandler.Sharable
public class ChanneInit extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        Channel client = ctx.channel();
        ChannelPipeline pipeline = client.pipeline();
        pipeline.addLast(new MyInHandler());
        ctx.pipeline().remove(this);
    }

}

原先 MyInHandler 上面的注解直接挪到这个壳上,那么服务端得到连接过来的客户端先接收到的就是 ChanneInit,注册会调起 ChannelInit 的注册事件,new 新的 MyInHandler,remove 掉 ChannelInit,所以在 ChanneInit 中最后有 ctx.pipeline().remove(this); 这么一句代码

这也是为什么 Netty 框架中也设计了一个 ChannelInitializer 抽象类的原因:

并且这个抽象类中还定义了一个 initChannel 方法:

也就是说当用户需要给客户端添加 Handler 的时候,就得先 new 一个 ChannelInitializer,然后去实现其中的 initChannel 方法,其实就是将自定义的 Handler 作为参数传进去。

Netty-客户端/服务端

前面已经将 NIO 的客户端与服务端间的通信手写了一遍,对其中的原理也认识的较为清晰。那么接下来,使用 Netty 官方的手法重新梳理一遍客户端和服务端。

    @Test
    public void nettyClient() throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup(1);

        Bootstrap bs = new Bootstrap();
        ChannelFuture channelFuture = bs.group(group)
                                        .channel(NioSocketChannel.class)
                                        .handler(new ChanneInit())
                                        .connect(new InetSocketAddress("192.168.126.129", 9090));
        Channel client = channelFuture.sync().channel();

        ByteBuf buf = Unpooled.copiedBuffer("qufeiyan".getBytes());
        ChannelFuture send = client.writeAndFlush(buf);
        send.sync();

        client.closeFuture().sync();
    }

Netty 的写法上较为更加的清晰,而且在 handler 那一步也可以采用 Netty 官方提供的 ChannelInitializer :

同样的服务端也可以这么写:

    @Test
    public void nettyServer() throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup(1);

        ServerBootstrap bs = new ServerBootstrap();
        ChannelFuture bind = bs.group(group, group)
                               .channel(NioServerSocketChannel.class)
                               .childHandler(new ChannelInitializer<NioSocketChannel>() {
                                   @Override
                                   protected void initChannel(NioSocketChannel ch) throws Exception {
                                       ChannelPipeline pipeline = ch.pipeline();
                                       pipeline.addLast(new MyInHandler());
                                   }
                               })
                               .bind(new InetSocketAddress("192.168.31.134", 9090));

        bind.sync().channel().closeFuture().sync();
    }



版权声明:本文为FeenixOne原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。