Netty的多协议开发和粘包拆包

  • Post author:
  • Post category:其他


Http协议概述


什么是HTTP协议

HTTP是一个属于应用层的面向对象的协议,由于其简捷、快速的方式,适用于分布式超媒体信息系统


HTTP协议的主要特点可概括如下:

  • 支持Client/Server模式。
  • 简单快速:客户向服务器请求服务时,只需传送请求方法和路径。请求方法常用的有GET、HEAD、POST。每种方法规定了客户与服务器联系的类型不同。由于HTTP协议简单,使得HTTP服务器的程序规模小,因而通信速度很快。
  • 灵活:HTTP允许传输任意类型的数据对象。正在传输的类型由Content-Type加以标记。
  • 无连接:无连接的含义是限制每次连接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开连接。采用这种方式可以节省传输时间。
  • 无状态:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。

Http协议交互过程

协议交互本质是指协议两端(客户端、服务端)如何传输数据?如何交换数据?


传输数据:


传输数据一般基于TCP/IP 实现,体现到开发语言上就是我们所熟悉的Socket 编程。


交换数据:


交换数据本质是指,两端(客户端、服务端)能各自识别对方所发送的数据。那么这就需要制定一套报文编码格式,双方以该格式编码数据发送给对方。Http 对应的Request 与Response报文格式如下图:

报文约定好以后两端都需要对其进行解码和编码操作,其过程如下图:

Http协议内容组成


请求方法

  • GET: 请求指定的页面信息,并返回实体主体。
  • HEAD: 类似于get请求,只不过返回的响应中没有具体的内容,用于获取报头
  • POST:向指定资源提交数据进行处理请求(例如提交表单或者上传文件)。数据被包含在请求体中,POST请求可能会导致新的资源的建立和/或已有资源的修改。
  • PUT: 从客户端向服务器传送的数据取代指定的文档的内容。
  • DELETE: 请求服务器删除指定的页面。
  • CONNECT:HTTP/1.1协议中预留给能够将连接改为管道方式的代理服务器。
  • OPTIONS: 允许客户端查看服务器的性能。
  • TRACE: 回显服务器收到的请求,主要用于测试或诊断。



部分请求头:

  • Host: 接受请求的服务器地址,可以是IP:端口号,也可以是域名
  • User-Agent:发送请求的应用程序名称
  • Connection: 指定与连接相关的属性,如Connection:Keep-Alive
  • Accept-Charset: 通知服务端可以发送的编码格式
  • Accept-Encoding: 通知服务端可以发送的数据压缩格式
  • Accept-Language: 通知服务端可以发送的语言


部分响应头:

  • Server: 服务器应用程序软件的名称和版本
  • Content-Type: 响应正文的类型(是图片还是二进制字符串)
  • Content-Length:实体报头域用于指明实体正文的长度,以字节方式存储的十进制数字来表示响应正文长度
  • Content-Charset: 响应正文使用的编码
  • Content-Encoding: 响应正文使用的数据压缩格式
  • Content-Language: 响应正文使用的语言


部分响应状态:

  • 200 响应成功
  • 302 跳转,跳转地址通过响应头中的Location属性指定(JSP中Forward和Redirect之间的区别)
  • 400 客户端请求有语法错误,不能被服务器识别
  • 403 服务器接收到请求,但是拒绝提供服务(认证失败)
  • 404 请求资源不存在
  • 500 服务器内部错误

Netty的Http协议开发

由于Netty天生是异步事件驱动的架构,因此基于NIO TCP协议栈开发的HTTP协议栈也是异步非阻塞的,Netty的HTTP协议栈无论在性能上还是可靠性上相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性更好。

Http服务端开发

public class HttpServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();//处理连接请求
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();//处理I/O 读写事件和业务逻辑
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //1. 给pipeline设置httpServerCodec处理器,httpServerCodec是netty提供的处理http的编解码器
                            socketChannel.pipeline().addLast("myHttpServerCodec", new HttpServerCodec());
                            //2. SimpleChannelInboundHandler是ChannelInboundHandlerAdapter的子类;HttpObject表示客户端和服务器端互相通讯的数据被封装成HttpObject类型
                            //而不是之前的Object类型
                            socketChannel.pipeline().addLast("httpServerHandler", new SimpleChannelInboundHandler<HttpObject>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                                    //不同浏览器对应不同的pipeline和handler
                                    if(msg instanceof HttpRequest){
                                        HttpRequest request = (HttpRequest)msg;
                                        //获取URI过滤特定资源
                                        URI uri = new URI(request.uri());
                                        if("/favicon.ico".equals(uri.toString())){
                                            System.out.println("请求图标不做响应");
                                            return;
                                        }
                                        System.out.println("msg类型=" + msg.getClass());
                                        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
                                        //回复消息给浏览器
                                        ByteBuf content = Unpooled.copiedBuffer("hello,我是服务器", CharsetUtil.UTF_8);
                                        //构造一个Http响应,即HttpResponse
                                        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
                                        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
                                        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                                        //将构建好的response返回
                                        ctx.writeAndFlush(response);
                                    }
                                }
                            });
                        }
                    });//给我们workGroup的eventLoop的对应的管道设置处理器
            ChannelFuture channelFuture = bootstrap.bind(7788).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
            channelFuture.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

上面的HttpServerCodec是Netty提供的编解码器,相当于HttpRequestDecoder, HttpResponseEncoder,也可以在服务端使用HttpRequestDecoder来解码请求并用HttpResponseEncoder来编码响应,如下:

public class HttpSimpleServer {
    //open 启动服务
    public void openServer() {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.channel(NioServerSocketChannel.class);
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup(8);
        bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());//添加Http请求消息解码器,解码Request
                //将多个消息转换为单一的FullHttpRequest或者FullHttpResponse,因为Http解码器在每个Http消息中会生成多个消息对象
                ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());//添加Http消息编码器,编码response
                ch.pipeline().addLast("http-server", new HttpServerHandler());
            }
        });
        bootstrap.group(boss, work);
        try {
            ChannelFuture future = bootstrap.bind(8833).sync();
            System.out.println("服务启动:8833");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    private static class HttpServerHandler extends SimpleChannelInboundHandler {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
            String html = "<!DOCTYPE html>\n" +
                    "<html lang=\"en\">\n" +
                    "<head>\n" +
                    "    <meta charset=\"UTF-8\">\n" +
                    "    <title>hello word</title>\n" +
                    "</head>\n" +
                    "<body>\n" +
                    "hello word\n" +
                    "</body>\n" +
                    "</html>";
            response.content().writeBytes(html.getBytes("UTF-8"));
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    public static void main(String[] args) {
        HttpSimpleServer simpleServer = new HttpSimpleServer();
        simpleServer.openServer();
    }
}

WebSocket协议开发

Http协议的弊端

  • Http协议为半双工协议,半双工指的是数据可以在客户端和服务端两个方向上传输,但是不能同时传输,同一时刻只有一个方向上的数据传送
  • Http消息冗长而繁琐,Http消息包含消息头、消息体、换行符等,文本方式传输。

为了解决Http协议效率抵消的问题,HTML5开始提供了一种浏览器和服务器之间进行全双工通信的网络技术-WebSocket,WebSocket API中浏览器和服务器只需要做一个握手的动作,然后服务器和浏览器之间就形成一条快速通道,两者就可以互相传递数据了,WebSocket基于TCP双向全双工进行消息传递,同一时刻既可以发送消息可以接受消息,相比HTTP的半双工协议,性能得到很大的提升。

WebSocket的特点

  • 单一的TCP连接,采用全双工模式通信
  • 对代理、防火墙和路由器透明
  • 无头部信息、Cookie和身份验证
  • 无安全开销
  • 通过“ping/pong”帧保持链路激活
  • 服务器可主动传递消息给客户端,不在需要客户端轮训

WebSocket 协议报文格式:

任何应用协议都有其特有的报文格式,比如Http协议通过 空格 换行组成其报文。如http 协议不同在于WebSocket属于二进制协议,通过规范进二进位来组成其报文。具体组成如下图:


报文说明:



FIN:

标识是否为此消息的最后一个数据包,占 1 bit


RSV1, RSV2, RSV3

:用于扩展协议,一般为0,各占1bit


Opcode

:数据包类型(frame type),占4bits

0x0:标识一个中间数据包

0x1:标识一个text类型数据包

0x2:标识一个binary类型数据包

0x3-7:保留

0x8:标识一个断开连接类型数据包

0x9:标识一个ping类型数据包

0xA:表示一个pong类型数据包

0xB-F:保留


MASK

:占1bits,用于标识PayloadData是否经过掩码处理。如果是1,Masking-key域的数据即是掩码密钥,用于解码PayloadData。客户端发出的数据帧需要进行掩码处理,所以此位是1。


Payload length:


Payload data的长度,占7bits,7+16bits,7+64bits:

如果其值在0-125,则是payload的真实长度。

如果值是126,则后面2个字节形成的16bits无符号整型数的值是payload的真实长度。注意,网络字节序,需要转换。

如果值是127,则后面8个字节形成的64bits无符号整型数的值是payload的真实长度。注意,网络字节序,需要转换。


Payload data


应用层数据

WebSocket连接的建立

建立webSocket连接时,需要通过客户端或浏览器发出握手请求,请求消息示例如下图:

为了建立一个WebSocket连接,客户端浏览器首先向服务器发起一个Http请求,这个Http请求和通常的Http请求不同,包含一些附加头信息,其中附加头信息“Upgrade :Websocket”表明这是一个申请协议升级的Http请求,服务器解析附加头信息并生成应答报文返回给客户端,客户端和服务端的WebSocket连接就建立起来了,双方可以通过这个连接自由传递消息,直到某一方主动关闭该连接,服务端返回给客户端的应答报文如下:

其中请求消息中的“Sec-WebSocket-key”是随机的,服务器端会用这些数据来构造出一个SHA-1的信息摘要,并进行BASE-64编码,作为“Sec-WebSocket-Accept”头的值返回给客户端。

Websocket协议开发

主要流程如下:


服务端:


pipeline添加 聚合器 HttpObjectAggregator

pipeline添加分包器ChunkedWriteHandler

pipeline添加WebSocket协议处理器 WebSocketServerProtocolHandler

pipeline编写WebSocketServerHandler


客户端:


编写客户端js脚本


服务端代码如下:

public class WebSocketServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //第一次握手请求时由Http协议承载,使用Http的编码器和解码器
                            pipeline.addLast(new HttpServerCodec());
                            //是以块方式写,支持异步发送大的码流(如大文件的传输),但不占用过多的内存
                            pipeline.addLast(new ChunkedWriteHandler());
                            /**
                             * Http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合
                             * 这就是为什么当浏览器发送大量数据时就会发送多次Http请求
                             */
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            /**
                             * 对于websocket数据是以帧(frame)的形式传递
                             * 可以看到WebSocketFrame下面有6个子类
                             * 浏览器请求时:ws://localhost:7000/hello 表示请求的uri,(与页面中websocket中url一样)
                             * WebSocketServerProtocolHandler的核心功能是将Http协议升级成ws协议(是通过状态码101),保持长连接
                             */
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                            pipeline.addLast(new WebSocketHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(7000).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    static class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            System.out.println("服务器收到消息" + msg.text());
            //响应
            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + msg.text()));
        }

        /**
         * 当web客户端连接后,触发方法
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            //id表示唯一的值,LongText是唯一的,ShortText不是唯一的
            System.out.println("handlerAdd被调用" + ctx.channel().id().asLongText());
            System.out.println("handlerAdd被调用" + ctx.channel().id().asShortText());
        }
        /**
         * 当web客户端连接断开后,触发方法
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerRemoved被调用" + ctx.channel().id().asLongText());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("异常发生" + cause.getMessage());
            ctx.close();

        }
    }
}

通过javaScript 中的API可以直接操作WebSocket 对象,其示例如下,客户端的hello.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    <form onsubmit="return false">
        <textarea name="message" style="height:300px; width: 300px"></textarea>
        <input type="button" value="发送消息" onclick="send(this.form.message.value)">
        <textarea id="responseText" style="height:300px; width: 300px"></textarea>
        <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
    </form>
</body>
<script>

    var socket;
    if(window.WebSocket){
        socket = new WebSocket("ws://localhost:7000/hello");
        //这个方法相当于channel中read方法收到服务器端会送的消息
        socket.onmessage = function (msg) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + msg.data;
        }
        //相当于连接开启
        socket.onopen = function (msg) {
            var rt = document.getElementById("responseText");
            rt.value = "连接开启了"
        }
        //相当于连接断开
        socket.onclose = function (msg) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" +"连接断开了"
        }
    }else{
        alert("不支持websocket")
    }

    function send(message) {
        if(!window.socket){//先判断websocket是否创建好
            return
        }
        if(socket.readyState == WebSocket.OPEN){
            //通过socket发送消息
            socket.send(message);
        }else{
            alert("连接没有开启")
        }
    }
</script>
</html>

打开hello.html

在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;他可能会发生在启动时,也可能会发生在请求了某个特定的URL之后。

我们的应用程序将采用下面的约定:如果被请求的URL以/hello结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP/S。在连接已经升级完成之后,所有数据都将会使用WebSocket进行传输。

在页面中发送消息,客户端和服务端基于长连接,可以发送消息,效率高:

websocket的常用API如下:

var ws = new WebSocket(“ws://localhost:8080”);
ws.onopen = function()// 建⽴成功之后触发的事件
{
console.log(“打开连接”);
ws.send("message"); // 发送消息
};
ws.onmessage = function(evt) { // 接收服务器消息
console.log(evt.data);
};
ws.onclose = function(evt) {
console.log(“WebSocketClosed!”); // 关闭连接
};
ws.onerror = function(evt) {
console.log(“WebSocketError!”); // 连接异常
};

首先:申请创建一个WebSocket对象,并传入WebSocket地址信息,这时client会通过Http先发起握手请求。消息格式如下:



GET /chat HTTP/1.1

Host: server.example.com

Upgrade: websocket    //告诉服务端需要将通信协议升级到websocket

Connection: Upgrade

Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==     //浏览器base64加密的密钥,server端收到后需要提取

Sec-WebSocket-Key 信息,然后加密。

Origin: http://example.com

Sec-WebSocket-Protocol: chat, superchat     //表⽰客⼾端请求提供的可供选择的⼦协议

Sec-WebSocket-Version: 13      //版本标识

其次:服务端响应、并建立连接


HTTP/1.1 101 Switching Protocols

Upgrade: websocket

Connection: Upgrade

Sec-WebSocket-Accept: SIEylb7zRYJAEgiqJXaOW3V+ZWQ=

然后:握手成功促发客户端 onOpen 事件

连接状态查看:

通过ws.readyState 可查看当前连接状态可选值如下:

  • CONNECTING (0):表示还没建立连接;
  • OPEN (1): 已经建立连接,可以进行通讯;
  • CLOSING (2):通过关闭握手,正在关闭连接;
  • CLOSED (3):连接已经关闭或无法打开;


WEBSOCKET帧

WebSocket以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

IETF发布的WebSocket RFC,定义了6种帧,Netty为它们都提供了一个POJO实现。WebSocketFrame类型的6种子类型:

BinaryWebSocketFrame——包含了二进制数据

TextWebSocketFrame——包含了文本数据

ContinuationWebSocketFrame——包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame的文本数据或者二进制数据

CloseWebSocketFrame——表示一个关闭链路的请求,包含一个关闭的状态码和关闭的原因

PingWebSocketFrame——请求传输一个PongWebSocketFrame

PongWebSocketFrame——作为一个对于PingWebSocketFrame的响应被发送

TextWebSocketFrame是我们需要处理的帧类型。为了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler来处理其他类型的帧。

webSocket+Netty实现弹幕系统


编码和解码

编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码,Codec编解码器由两部分:Decoder(解码器)和Encoder(编码器)组成,Encoder负责把业务数据转换成字节码数据,Decoder负责把字节码数据转换成业务数据。

Netty中提供了一些Codec编解码器,如StringEncoder和StringDecoder对字符串进行编解码,ObjectEncoder和ObjectDecoder对java对象进行编解码。ObjectEncoder和ObjectDecoder对Pojo对象序列化操作的时候底层仍是使用Java序列化技术,而Java序列化技术本身存在以下问题:

  • 无法跨语言
  • 序列化后码流太大
  • 序列化性能比较低

Google的Protobuf

ProtoBuf全称是Google Protocol Buffers,由谷歌开源而来,是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,适合做数据存储或RPC数据交换格式(目前很多公司的数据交换由Http+Json向Tcp+Protobuf转换,),它将数据结构以.proto文件进行描述,通过代码生成工具(protobuf.exe编译器)可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性,特点如下:

结构化的数据存储格式(xml或者Json)

  • 高效的编解码性能
  • 语言无关、平台无关、扩展性好
  • 官方支持Java、C++、Python三种语言

Protobuf的入门

Protobuf是一个灵活的、高效、结构化的数据序列化框架,支持将数据结构化一次而可以到处使用,甚至跨语言使用,通过代码生成工具可以自动生产不通语言版本的源代码,甚至可以使用不同的版本的数据结构进程间进行传递,实现数据结构的前后兼容

准备工作:引入protobuf的依赖和下载protoc-3.13.0-win32.zip(protoc.exe工具主要是将.protp文件生成相应的代码)

 <dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>3.13.0</version>
</dependency>

编写.proto文件,demo1如下:

syntax = "proto3"; //使用proto3协议
option java_outer_classname = "StudentPOJO"; //生成的外部类名,同时也是文件名
message Student { //会在StudentPOJO外部类中生成一个内部类Student,Student才是真正发送的POJO对象

    int32 id = 1; //Student类中有一个属性名字为id,类型为int32(protobuf类型), 1表示属性的序号,而不是值
    string name = 2;
}

demo2如下:

syntax = "proto3";
option optimize_for = SPEED;//加快解析
option java_package = "com.geeron.authdemo.nio.codec2";//指定生成在那个包下
option java_outer_classname = "DataPOJO"; //生成的外部类名,同时也是文件名
//protobuf可以使用message 管理其他message
message MultiMessage {
    enum MessageType{
        StudentType = 0; //在proto3中要求enum的编号从0开始
        WorkerType = 1;
    }
    //哪一个枚举的类型
    MessageType type = 1;
    //oneof 表示每次枚举的类型最多只能出现其中之一,节省空间
    oneof dataBody{
        Student student = 2;
        Worker worker = 3;
    }
}

message Student { //会在StudentPOJO外部类中生成一个内部类Student
    int32 id = 1; //Student类中有一个属性名字为id,类型为int32(protobuf类型), 1表示属性的序号,而不是值
    string name = 2;
}

message Worker { //会在StudentPOJO外部类中生成一个内部类Student
    string name = 1;
    int32 age = 2;
}

用protoc.exe命令生产java代码,如下,生成的java类名是由proto文件中

option java_outer_classname

指定的名称。


protoc.exe –java_out=. xxx.proto

代码中引入生成的protobuf java类,在Netty中引入protobuf的编解码器并发送protobuf相关

服务器端:

   public static void main(String[] args) throws InterruptedException {
      
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();//处理连接请求
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();//处理I/O 读写事件和业务逻辑
        try {
            //ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类,设置启动参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//设置服务器的通道类型:NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG, 1024)//设置NioServerSocketChannel的TCP参数,线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {//绑定I/O事件处理类
                        //给pipeline设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                      
                            System.out.println("客户的channel的hashCode" +socketChannel.hashCode());
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //在pipeline加入ProtobufDecoder,并指定对那种对象进行解码
                            pipeline.addLast("decoder", new ProtobufDecoder(DataPOJO.MultiMessage.getDefaultInstance()));
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });//给我们workGroup的eventLoop的对应的管道设置处理器
            ChannelFuture channelFuture = bootstrap.bind(9911).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
            channelFuture.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    //自定义一个Handler继续Netty规定的某个Handler适配器
    static class NettyServerHandler extends SimpleChannelInboundHandler<DataPOJO.MultiMessage> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, DataPOJO.MultiMessage msg) throws Exception {
            DataPOJO.MultiMessage.DataBodyCase type = msg.getDataBodyCase();
            if(type == DataPOJO.MultiMessage.DataBodyCase.WORKER){
                System.out.println("客户端发送消息age::"+msg.getWorker().getAge() +"名字" + msg.getWorker().getName());

            }else if(type == DataPOJO.MultiMessage.DataBodyCase.STUDENT){
                System.out.println("客户端发送消息id:"+msg.getStudent().getId() +"名字" + msg.getStudent().getName());
            }
        }
    }
}

客户端:

public static void main(String[] args) throws InterruptedException {
        //客户端需要一个事件循环组
        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();//客户端使用BootStrap
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //在pipeline加入protobufEncoder
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9911);//连接服务端
            channelFuture.channel().closeFuture().sync();
        }finally {
            loopGroup.shutdownGracefully();
        }

    }
    static class NettyClientHandler extends ChannelInboundHandlerAdapter{
        /**
         * 当通道就绪就会触发该方法,发送消息给服务器端(写入通道)
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            DataPOJO.MultiMessage message = null;
            if(new Random(10).nextInt() %2 == 0){
                message = DataPOJO.MultiMessage.newBuilder().setType(DataPOJO.MultiMessage.MessageType.StudentType).setStudent(DataPOJO.Student.newBuilder().setId(35).setName("dachun").build()).build();
            }else{
                message = DataPOJO.MultiMessage.newBuilder().setType(DataPOJO.MultiMessage.MessageType.WorkerType).setWorker(DataPOJO.Worker.newBuilder().setAge(35).setName("saoxiu").build()).build();
            }
            System.out.println("Server ctx: " +ctx);
            ctx.writeAndFlush(message);

        }
    }

Netty的ChannelHandler的调用机制

客户端发送消息(如:Long型),经过(出栈)编码器编码,发送到服务端,服务端收到后,解码消息(入栈),消费后响应一个消息(Long型)给服务端,经过编码器编码(出栈)后响应给服务端,服务端收到后通过解码器解码(入栈)响应消息。

服务端的开发

public class MyServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //添加入栈的Handler进行解码,对客户端的消息进行解码
                            pipeline.addLast(new MyByteToLongDecoder());
                            //添加出栈的编码器handler,对客户端做出响应的数据进行编码
                            pipeline.addLast(new Myclient.MyLongToByteEncoder());
                            //添加业务逻辑处理
                            pipeline.addLast(new MyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(7009).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    //自定义byte-Long解码器
    static class MyByteToLongDecoder extends ByteToMessageDecoder {
        /**
         * decoder方法 会根据接受的数据被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多的可读字节为止
         * 如果list 不为空,就会将list的内容传递给下一个ChannelInboundHandler处理,该处理器的方法也会被调用多次
         * @param ctx 上下文
         * @param in 入站的Bytebuf
         * @param out 将解码后的数据传给下一个Handler处理(也就是MyServerHandler)
         * @throws Exception
         */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("decoder被调用");
            //Long 是8个字节,所以必须满足8个字节才能读取1个Long
            if(in.readableBytes() >= 8){
                out.add(in.readLong());//添加到list中被下个handler获取
            }
        }
    }

    static class MyServerHandler extends SimpleChannelInboundHandler<Long> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
            System.out.println("从客户端:" +ctx.channel().remoteAddress() + "读取到long:" + msg);
            //给客户端做出响应
            ctx.writeAndFlush(985L);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

服务端的开发

public class Myclient {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //添加出栈的Handler,对发送服务端的数据进行编码
                            pipeline.addLast(new MyLongToByteEncoder());
                            //添加入栈的handler,对服务端响应的数据进行解码
                            pipeline.addLast(new MyServer.MyByteToLongDecoder());
                            //加入一个自定义的handler,处理业务
                            pipeline.addLast(new MyClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7009).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }

    }
    static class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
            System.out.println("encoder方法被调用,msg" + msg);
            out.writeLong(msg);//写入到ByteBuf中
        }
    }

    static class MyClientHandler extends SimpleChannelInboundHandler<Long>{

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
            System.out.println("服务器IP=" + ctx.channel().remoteAddress());
            System.out.println("收到服务器的消息:" + msg);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client 发送数据");
            ctx.writeAndFlush(211L);
            //该处理器的前一个Handler是MessageToByteEncoder的 实现,在write方法中有如下判断:类型与解码器的泛型是否匹配
            //因此我们在编写encoder时要主要传入的数据类型和处理的数据类型一致
            /**
             *   if (acceptOutboundMessage(msg)) { 判断当前msg是不是应该处理的类型
             *                 @SuppressWarnings("unchecked")
             *                 I cast = (I) msg;
             *                 buf = allocateBuffer(ctx, cast, preferDirect);
             *                 try {
             *                     encode(ctx, cast, buf);
             *                 } finally {
             *                     ReferenceCountUtil.release(cast);
             *                 }
             *
             *                 if (buf.isReadable()) {
             *                     ctx.write(buf, promise);
             *                 } else {
             *                     buf.release();
             *                     ctx.write(Unpooled.EMPTY_BUFFER, promise);
             *                 }
             *                 buf = null;
             *             } else {
             *                 ctx.write(msg, promise);
             *             }
             */
            // 如果按照这种形式发送, 服务端在接受消息的时候可能会解码多次
            //ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcd", CharsetUtil.UTF_8));
        }
    }
}

结论:不论是解码器Handler还是编码器Handler,即接受的消息的类型必须与待处理的消息类型一致,否则该Handler不会被执行

在解码器在数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会和期望的可能不一致。


TCP的粘包和拆包

TCP编程中无论是服务端还是客户端,读取和发送消息时都要考虑TCP底层的粘包和拆包机制,TCP是面向连接(数据是没有界限的)、面向流的、提供高可靠性服务,服务器和客户端都要有对应的socket,因此发送端为了将多个发送给接收端的包更有效的发给对方,使用了优化算法(Nagle算法),将多次间隔较小且数据量小的数据合并成一个大的数据块,然后进行封包,这样虽然提高了发送的效率,但是接收端就难以分辨完整的数据包,因此面向流的通信是无消息保护边界的。所以相对于业务来说,一个完整的包可能会被TCP拆分多个包进行发送 ,也有可能把许多小的包封装成一个大的数据包发送,这就是TCP的粘包和拆包的问题


粘包、拆包问题图解

假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,由于服务端一次性读取的字节数是不确定的,因此服务端收到的数据可以分为三种,如下所示:

第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不考虑。

第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以接收端不知道如何处理。

第三种情况,接收端收到了两个数据包,但是这两个数据包要么是不完整的,第一次读取到了完整Packet2包和一部分的Packet1包,第二次读取到Packet2包的剩余内容,(或者第一次读取一部分的Packet2包,第二次读取剩下的Packet2包和完整的Packet1包),这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。

粘包和拆包的演示

服务端核心代码:接受服务端发送的消息并做出响应

 static class TcpServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            //服务端接受数据
            byte[] buffer = new byte[msg.readableBytes()];
            msg.readBytes(buffer);//将msg消息读取到buffer字节数组中
            String message = new String(buffer, Charset.forName("UTF-8"));
            System.out.println("服务器接收到的数据" + message);
            //做出响应
            ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(), Charset.forName("UTF-8"));
            ctx.writeAndFlush(byteBuf);

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

客户端核心代码:发送10个消息并接受服务端的响应消息

 static class TcpClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //发送10个数据
            for (int i=0; i< 10; i++){
                ByteBuf byteBuf = Unpooled.copiedBuffer("Hello,Server" + i, Charset.forName("UTF-8"));
                ctx.writeAndFlush(byteBuf);
            }
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            byte[] buffer = new byte[msg.readableBytes()];
            msg.readBytes(buffer);//将msg消息读取到buffer字节数组中
            String message = new String(buffer, Charset.forName("UTF-8"));
            System.out.println("客户端接收到的数据" + message);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            ctx.close();
        }
    }

执行可以看出:服务端接受数据时候发生粘包和拆包

服务器接收到的数据Hello,Server0

服务器接收到的数据Hello,Server1

服务器接收到的数据Hello,Server2Hello,Server3Hello,Server4

服务器接收到的数据Hello,Server5Hello,Server6

服务器接收到的数据Hello,Server7Hello,Server8

服务器接收到的数据Hello,Server9

TCP粘包拆包发生的原因有很多,主要包括如下:

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。

  • 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

  • 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。

  • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。


粘包和拆包的解决策略


使用自定义协议+编解码器来解决


关键是要解决服务端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或者少读数据的问题,从而避免了TCP粘包和拆包的问题

自定义协议MessageProtocol

@Data
public class MessageProtocol {

    private int len; //消息长度
    private byte[] content;//消息内容

}

在服务端和客户端指定编码器

服务端代码:添加编解码器

public class ProtocolTcpServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //添加解码器
                            pipeline.addLast(new MessageDecoder());
                            //添加编码器
                            pipeline.addLast(new ProtocolTcpClient.MessageEncoder());
                            //添加业务逻辑处理
                            pipeline.addLast(new TcpServerHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(7010).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    static class MessageDecoder extends ReplayingDecoder<MessageProtocol>{

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            System.out.println("decoder被调用");
            //需要将二进制字节码转成MessageProtocol对象
            int length = in.readInt();
            byte[] bytes = new byte[length];
            in.readBytes(bytes);
            //封装成MessageProtocol对象,放入out,传递下一个handler业务处理
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(bytes);
            out.add(messageProtocol);


        }
    }

    static class TcpServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
        private int count;
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
            int len = msg.getLen();
            byte[] content = msg.getContent();
            System.out.println("服务器接收到信息:长度" + len + "内容:" +
                    new String(content, Charset.forName("utf-8")));
            System.out.println("服务器接收到的消息包数量" + (++ this.count));
            //回复消息
            String resp = "海底捞";
            int respLen =resp.getBytes(Charset.forName("utf-8")).length;
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setContent(resp.getBytes(Charset.forName("utf-8")));
            messageProtocol.setLen(respLen);
            ctx.writeAndFlush(messageProtocol);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

客户端代码:

public class ProtocolTcpClient {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //添加编码器,对对象进行编码
                            pipeline.addLast(new MessageEncoder());
                            //添加解码器,对服务端的响应进行解码
                            pipeline.addLast(new ProtocolTcpServer.MessageDecoder());
                            pipeline.addLast(new TcpClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7010).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }

    }

    static class MessageEncoder extends MessageToByteEncoder<MessageProtocol>{

        @Override
        protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
            System.out.println("messageEncoder encoder被调用");
            out.writeInt(msg.getLen());
            out.writeBytes(msg.getContent());
        }
    }

    static class TcpClientHandler extends SimpleChannelInboundHandler<MessageProtocol>{
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //发送10个数据
            for (int i=0; i< 5; i++){
                String msg = "吃火锅";
                byte[] content = msg.getBytes(Charset.forName("utf-8"));
                int length = msg.getBytes(Charset.forName("utf-8")).length;
                //创建协议包
                MessageProtocol messageProtocol = new MessageProtocol();
                messageProtocol.setContent(content);
                messageProtocol.setLen(length);
                ctx.writeAndFlush(messageProtocol);
            }
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
            byte[] content = msg.getContent();
            int len = msg.getLen();
            System.out.println("客户端接受的消息 长度:" +len + "内容:"
                    +new String(content, Charset.forName("utf-8")
            ));


        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("异常消息" + cause.getMessage());
            ctx.close();
        }
    }
}

由于TCP无法知道上层业务数据,所以TCP底层无法保证数据包不会被拆分和重组,所以我们只能利用上层的应用协议栈设计来解决,归纳如下:

  1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格

  2. 在包尾增加回车换行符进行分割,例如FTP协议

  3. 将消息分为消息头和消息体,消息头包含消息的总长度(或者消息体长度)

以上3种方式,客户端接受到包的时候就可以根据这些约束区分出来不同的包。

Netty中解决TCP粘包拆包问题

为了解决TCP中粘包、拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包,直接使用这些类库,TCP粘包拆包问题就变得非常容易


LineBasedFremeDecoder解决TCP粘包问题

LineBasedFremeDecoder改造服务端代码

public class NettyServer {

    public void bind(int port){
        //NioEventLoopGroup是一个线程组,包含一组NIO线程
        EventLoopGroup bossGroup = new NioEventLoopGroup();//用于服务端接受客户端的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();//用于SocketChannel的网络读写
        try{
            //ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类
            ServerBootstrap bs = new ServerBootstrap();
            bs.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)//设置创建的channel
                    .option(ChannelOption.SO_BACKLOG, 1024)//设置NioServerSocketChannel的TCP参数
                    .childHandler(new ChildChannelHandler());//绑定I/O事件处理类
            ChannelFuture sync = bs.bind(port).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
            sync.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new TimeServerHandler());
        }
    }

    class TimeServerHandler extends ChannelHandlerAdapter{
        private int counter;
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String)msg;
            System.out.println("The time server received order :" + body +";the counter is:" + ++counter);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
            //响应的消息也添加回车换行符
            currentTime = currentTime + System.getProperty("line.separator");
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
            ctx.writeAndFlush(resp);
            System.out.println("done" +currentTime);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //Netty把write方法并不直接将消息写入到SocketChannel中,调用write方法只是把待发送的消息放到缓冲数组中,
            // 调用flush方法才将消息全部写道SocketChanel
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //释放相关句柄等资源
            ctx.close();
        }
    }

    public static void main(String[] args) {
        new NettyServer().bind(9988);
    }
}

LineBasedFremeDecoder改造客户端代码

public class NetttClient {

    public void connect(String host, int port){
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            //创建客户端辅助启动类Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,Boolean.TRUE)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        //创建NioSocketChannel成功之后,进行初始化
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            ChannelFuture sync = bootstrap.connect(host, port).sync();
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //释放NIO线程组资源
            group.shutdownGracefully();
        }
    }

    class TimeServerHandler extends ChannelHandlerAdapter {
        //private final ByteBuf firstMessage;
        private byte[] req;
        private int counter;
        public TimeServerHandler() {
            //给消息添加回车换行符
            req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
        }
        //当客户端和服务端TCP链路建立成功之后,Netty的NI线程会调用channelActive方法,发送查询指定给服务端
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //将请求消息发送给服务端
            ByteBuf message;
            for (int i =0; i <100; i++){
                message = Unpooled.buffer(req.length);
                message.writeBytes(req);
                ctx.writeAndFlush(message);
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String) msg;
            System.out.println("now is :" + body +";the couter is :" + ++counter);

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.info("Unexpected exception frm downstream:" + cause.getMessage());
            ctx.close();
        }
    }

    public static void main(String[] args){
        new NetttClient().connect("127.0.0.1",9988);
    }
}

在改造的代码中,新增了2个解码器LineBasedFrameDecoder和StringDecoder,发送的带有回车换行符的消息在被接收后msg就是删除了回车换行符的消息,不需要再对消息进行编码解码。LineBasedFrameDecoder的工作原理就是一次遍历ByteBUF中可读字节,判断看是否有“\n”或者“\r\n”,如有,就以此位置为结束位置,这样可以读到一行一行的息,LineBasedFrameDecoder是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度,如果连续读取到最大长度仍然没有发现换行符就会抛出异常, 同时忽略之前读取的异常码流。StringDecoder的功能就是将接收到的对象转成字符串,然后继续调用Handler,LineBasedFrameDecoder+StringDecoder组合就是按行切换的文本解码器。

当然,基于LineBasedFrameDecoder+StringDecoder组合是针对回车换行符,如果消息没有回车换行符的消息就需要使用其他的半包解码器,Netty提供了支持多种TCP粘包/拆包的解码器,用来满足不同需求

分隔符和定长解码器


分隔符解码器:DelimiterBasedFrameDecoder的应用开发


DelimiterBasedFrameDecoder的服务端开发

public class EchoServer {

    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            //1024 表示单条消息的最大长度,当达到该长度后仍然没有查找到分隔符则抛出TooLongFrameException异常
                            //第二个参数就是分隔符缓冲对象
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture sync = bootstrap.bind(port).sync();
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    class EchoServerHandler extends ChannelHandlerAdapter {

        private int counter;
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();//发生异常,关闭链路
        }


        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String body = (String)msg;
            System.out.println("This is " + ++counter + "times received cline :{" + body +"}");
            body += "$_";
            ByteBuf byteBuf = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(byteBuf);
        }
    }


    public static void main(String[] args) {
        new EchoServer().bind(9999);
    }
}


DelimiterBasedFrameDecoder的客户端开发

public class EchoClient {

    public void connect(String host, int port){
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimeter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimeter));
                            socketChannel.pipeline().addLast(new StringEncoder());
                            socketChannel.pipeline().addLast(new EchoClientHandler());

                        }
                    });
            //发起异步连接操作
            ChannelFuture sync = bootstrap.connect(host, port).sync();
            //等待客户端链路关闭
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //释放NIO线程组
            group.shutdownGracefully();
        }
    }

    class EchoClientHandler extends ChannelHandlerAdapter {
        private int counter;
        static final String ECHO_REQ = "HI,this is netty.$_";
        public EchoClientHandler() {
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
           for (int i =0; i < 100; i ++){
               ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
           }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("This is "+ ++counter + "times receive server :{" + msg +"}");
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    }

    public static void main(String[] args) {
        new EchoClient().connect("127.0.0.1", 9999);
    }
}

分别启动服务端和客户端

This is 1times received cline :{HI,this is netty.}

This is 2times received cline :{HI,this is netty.}

This is 3times received cline :{HI,this is netty.}

This is 4times received cline :{HI,this is netty.}

This is 5times received cline :{HI,this is netty.}

This is 6times received cline :{HI,this is netty.}

DelimiterBasedFrameDecoder是一个支持自定义分隔符来实现解码,创建的DelimiterBasedFrameDecoder加入到ChannelPipeline中,构造方法中的第一个参数1024 表示单条消息的最大长度,当达到最大长度仍然没有找到分隔符的话就会抛出TooLongFrameExeception异常。防止由于缺失分隔符导致的内存溢出,第二个参数是分隔符缓冲对象,DelimiterBasedFrameDecoder对消息进行解码,后续的ChannelHandler收到的msg就是一个完整的消息包,StringDecoder的作用就是将Bytebuf解码成字符串对象。


FixedLengthFrameDecoder的应用开发

FixedLengthFrameDecoder解码器是固定长度的解码器,能够按照指定的长度对消息进行自动解码,使用的方法和上面的几种解码器是一致的,主要不通在于如下

 public void bind(int port){
        //NioEventLoopGroup是一个线程组,包含一组NIO线程
        EventLoopGroup bossGroup = new NioEventLoopGroup();//用于服务端接受客户端的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();//用于SocketChannel的网络读写
        try{
            //ServerBootstrap对象是Netty用于启动NIO服务端的辅助启动类
            ServerBootstrap bs = new ServerBootstrap();
            bs.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)//设置创建的channel
                    .option(ChannelOption.SO_BACKLOG, 1024)//设置NioServerSocketChannel的TCP参数
                    .childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(15));
                    socketChannel.pipeline().addLast(new StringDecoder());
                    socketChannel.pipeline().addLast(new TimeServerHandler());
                }
            });//绑定I/O事件处理类
            ChannelFuture sync = bs.bind(port).sync();//绑定监听端口并调用同步阻塞方法等待绑定操作完成
            sync.channel().closeFuture().sync();//等待服务器链路关闭之后main函数才退出
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

通过自定义长度的解码器,无论一次接受多少数据包,都会按照构造设置的固定长度进行解码,如果是半包消息,会缓存宝宝消息并等待下一个包到达后进行拼包,知道读取一个完整的包



版权声明:本文为weixin_37598682原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。