Netty 教程 – Netty粘包/拆包解决之道

  • Post author:
  • Post category:其他


异常情况



上一章

的代码,我们改造

TimeServerHandler

中的

channelRead

方法

private static class TimeServerHandler extends ChannelHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
        System.out.println("TimeServer 接收到的消息 :" + body + "; 当前统计:" + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? String.valueOf(System.currentTimeMillis()) : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将消息队列中信息写入到SocketChannel中去,解决了频繁唤醒Selector所带来不必要的性能开销
        //Netty的 write 只是将消息放入缓冲数组,再通过调用 flush 才会把缓冲区的数据写入到 SocketChannel
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();//发生异常时候,执行重写后的 exceptionCaught 进行资源关闭
    }
}

改造

TimeClientHandler

代码

private static class TimeClientHandler extends ChannelHandlerAdapter {
    private byte[] req;
    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();

    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    private int counter;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("TimeClient 接收到的消息 :" + body + "; 当前统计:" + ++counter);
        ctx.close();//接受完消息关闭连接
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("释放资源:" + cause.getMessage());//不重写将会看到堆栈信息以及资源无法关闭
        ctx.close();
    }
}

分别启动

TimeServer



TimeClient

两个程序

绑定端口,同步等待成功......
TimeServer 接收到的消息 :QUERY TIME ORDER
QUERY TIME ORDER
......省略部分 QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORD; 当前统计:1
QUERY TIME ORDER
TimeServer 接收到的消息 :
......省略部分 QUERY TIME ORDER
QUERY TIME ORDER; 当前统计:2
TimeClient 接收到的消息 :BAD ORDER
BAD ORDER
; 当前统计:1

从上面的日志中,我们可以发现

服务端

发生

TCP

粘包的情况,正确情况应该是服务端输出

100

条含

TimeServer 接收到的消息 :QUERY TIME ORDER; 当前统计:counter

的日志,而且客户端只接收了部分断断续续的数据,说明返回时也发生了粘包…

解决之道

我们在上文说了

LineBasedFrameDecoder

是一个基于行的解码器,从源码中可以看到它是根据

\n

或者

\r\n

判断的,当

ByteBuf

存在这样的字符就认为是一个完整的数据包,这样可以有效的避免数据粘包或者拆包的情况,从而保证我们消息的有效传输,接下来我们就玩一玩

Netty



LineBasedFrameDecoder

…..

/**
 * A decoder that splits the received {@link ByteBuf}s on line endings.
 * <p>
 * Both {@code "\n"} and {@code "\r\n"} are handled.
 * For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}.
 */
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);
        if (!discarding) {
            if (eol >= 0) {
                final ByteBuf frame;
                final int length = eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
            }
        }
        ......
    }
}

修改

TimeServer



ChildChannelHandler



TimeServerHandler

内部类,添加了

LineBasedFrameDecoder



StringDecoder

两个解码器,同时向客户端回写系统当前时间戳,记得我们这里是用

\n

做换行处理的

private static class ChildChannelHandler extends ChannelInitializer {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast(new LineBasedFrameDecoder(1024));//划重点了,拿笔记下
        channel.pipeline().addLast(new StringDecoder());
        channel.pipeline().addLast(new TimeServerHandler());
    }

    private static class TimeServerHandler extends ChannelHandlerAdapter {
        private int counter;

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("TimeServer 接收到的消息 :" + body + "; 当前统计:" + ++counter);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? String.valueOf(System.currentTimeMillis())+"\n"  : "BAD ORDER";
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.write(resp);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //将消息队列中信息写入到SocketChannel中去,解决了频繁唤醒Selector所带来不必要的性能开销
            //Netty的 write 只是将消息放入缓冲数组,再通过调用 flush 才会把缓冲区的数据写入到 SocketChannel
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();//发生异常时候,执行重写后的 exceptionCaught 进行资源关闭
        }
    }
}

修改

TimeClient



connect



handler

内部类

protected void initChannel(SocketChannel channel) throws Exception {
    channel.pipeline().addLast(new LineBasedFrameDecoder(1024));//划重点了,拿笔记下
    channel.pipeline().addLast(new StringDecoder());
    channel.pipeline().addLast(new TimeClientHandler());
}

修改

TimeClientHandler

中的

channelRead

读取数据的方法

private static class TimeClientHandler extends ChannelHandlerAdapter {
    private byte[] req;
    private int counter;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER\n").getBytes();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("TimeClient 接收到的消息 :" + body + "; 当前统计:" + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("释放资源:" + cause.getMessage());//不重写将会看到堆栈信息以及资源无法关闭
        ctx.close();
    }
}

分别启动

TimeServer



TimeClient

两个程序



版权声明:本文为weixin_37948564原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。