handler链调用机制示例
https://www.bilibili.com/video/BV1jK4y1s7GV?p=79
https://www.bilibili.com/video/BV1jK4y1s7GV?p=80
https://www.bilibili.com/video/BV1jK4y1s7GV?p=81
socket –> channel –>入栈
channel –> socket –> 出栈
(server)解码器–ProtobufDecoder : MessageToMessageDecoder : ChannelInboundHandlerAdapter
(client)编码器–ProtobufEncoder : MessageToMessageEncoder : ChannelOutboundHandlerAdapter : ChannelHandlerAdapter : ChannelHandler (I)
ByteToMessageDecoder
ByteToMessageDecoder 会对入栈数据进行缓冲,直到它准备好处理。
服务端的channelpipeline接收到客户端通过socket传过来的数据,解析
ToIntegerDecoder
当没有更多元素添加到List的时候,它的内容将会被发送给下一个ChannelInBoundHandler.
读数据(encoder)的顺序
socket channelPipeline
二进制数据 <– ToIntegerDecoder –> 业务处理器 ChannelInboundHandlerAdapter —>—|
二进制数据 <– ToIntegerEncoder <– 业务处理器 ChannelOutboundHandlerAdapter —<—|
==========================客户端代码 ======================================
package com.atguigu.inboundhandlerandoutboundhandler.client;
import com.sun.corba.se.impl.orbutil.concurrent.Sync;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Author: tz_wl
* Date: 2020/9/28 15:50
* Content:
*/
public class MyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new MyClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8087).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
package com.atguigu.inboundhandlerandoutboundhandler.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* Author: tz_wl
* Date: 2020/9/28 15:52
* Content:
*/
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
//(再走这个)加入一个出站的 Encoder Handler
pipeline.addLast(new MyLongToByteEncoder());
//(先走这个)加入一个自定义的handler ,处理业务
pipeline.addLast(new MyClientHandler());
}
}
package com.atguigu.inboundhandlerandoutboundhandler.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Author: tz_wl
* Date: 2020/9/28 16:00
* Content:
*/
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Long aLong) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" === step01 ==== MyClientHandler 发送数据");
ctx.writeAndFlush(123456L);
}
}
package com.atguigu.inboundhandlerandoutboundhandler.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Author: tz_wl
* Date: 2020/9/28 15:54
* Content:
*/
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("=== step02 ==== MyLogToByteEncoder encode 被调用 msg="+msg);
out.writeLong(msg);
}
}
=
服务端代码
=========
package com.atguigu.inboundhandlerandoutboundhandler.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Author: tz_wl
* Date: 2020/9/28 14:30
* Content:
*/
public class MyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture future = serverBootstrap.bind(8087).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.atguigu.inboundhandlerandoutboundhandler.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
* Author: tz_wl
* Date: 2020/9/28 14:34
* Content:
*/
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
//入站的handler进行解码 MyByteToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
//自定义handler 处理业务逻辑
pipeline.addLast(new MyServerHandler());
}
}
package com.atguigu.inboundhandlerandoutboundhandler.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* Author: tz_wl
* Date: 2020/9/28 14:37
* Content:
*/
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/** ByteBuf in : 待解码数据(入站的ByteBuf )
* List<object> out : 输出 供 链式调用的下一步 ChannelInBoundHandler 调用的数据 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println(" === step03 ==== MyByteToLongDecoder 被调用!");
//需要判断8个字节才能为一个long
if(in.readableBytes()>=8){
out.add(in.readLong());
}
}
}
package com.atguigu.inboundhandlerandoutboundhandler.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Author: tz_wl
* Date: 2020/9/28 14:41
* Content:
*/
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(" === step04 ==== 从客户端" + ctx.channel().remoteAddress() + "读取到 Long : " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
https://www.bilibili.com/video/BV1jK4y1s7GV?p=83
关于发送参数 一致性的问题