Netty之实现自定义简单的编解码器二(MessageToMessageEncoder和MessageToMessageDecoder

  • Post author:
  • Post category:其他



1、对于MessageToMessageEncoder的理解


MessageToMessageEncoder编码器,这里的第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和之前的MessageToByte的原理是一样的了。所以要在MessageToMessageDecoder<ByteBuf>的解码器里面,手动的指定,是对ByteBuf类型的对象进行解码的操作。



2、编写MyStringEncoder编码器和MyStringDecoder解码器,



以便于,Netty中可以直接发送和接收String类型的数据




2.1

MyStringEncoder编码器的代码







import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.List;

public class MyStringEncoder extends MessageToMessageEncoder<CharSequence> {
	private final Charset charset;

	public MyStringEncoder() {
		this(Charset.defaultCharset());
	}

	public MyStringEncoder(Charset charset) {
		if (charset == null) {
			throw new NullPointerException("charset");
		}
		this.charset = charset;
	}

	protected void encode(ChannelHandlerContext ctx, CharSequence msg,
			List<Object> out) throws Exception {
		if (msg.length() == 0) {
			return;
		}

		out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg),
				this.charset));
	}
}




2.2

MyStringDecoder解码器的代码










import java.nio.charset.Charset;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

public class MyStringDecoder extends MessageToMessageDecoder<ByteBuf> {
	private final Charset charset;

	public MyStringDecoder() {
		this(Charset.defaultCharset());
	}

	public MyStringDecoder(Charset charset) {
		if (charset == null) {
			throw new NullPointerException("charset");
		}
		this.charset = charset;
	}

	protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
			List<Object> out) throws Exception {
		out.add(msg.toString(this.charset));
	}
}


3、服务端的实现


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
	public void bind(int port) throws Exception {
		// 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接
		// 另一个线程组用于处理SocketChannel的网络读写
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			// NIO服务器端的辅助启动类 降低服务器开发难度
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)// 类似NIO中serverSocketChannel
					.option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP参数
					.option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区
					.option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小
					.option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小
					.option(ChannelOption.SO_KEEPALIVE, true) // 保持连接
					.childHandler(new ChildChannelHandler());// 最后绑定I/O事件的处理类
																// 处理网络IO事件

			// 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler
			ChannelFuture f = serverBootstrap.bind(port).sync();
			System.out.println("Server启动");
			// 等待服务端监听端口关闭
			f.channel().closeFuture().sync();

		} finally {
			// 优雅退出 释放线程池资源
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
			System.out.println("服务器优雅的释放了线程资源...");
		}

	}

	/**
	 * 网络事件处理器
	 */
	private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 增加自定义的编码器和解码器
			ch.pipeline().addLast(new MyStringEncoder());
			ch.pipeline().addLast(new MyStringDecoder());
			// 服务端的处理器
			ch.pipeline().addLast(new ServerHandler());
		}
	}

	public static void main(String[] args) throws Exception {
		int port = 9998;
		new Server().bind(port);
	}
}


4、服务端Handler的实现




import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		// 接受客户端的数据
		String body = (String) msg;
		System.out.println("Client :" + body);
		// 服务端,回写数据给客户端
		// 直接回写整形的数据
		String data = "Hello ,I am Server ...";
		ctx.writeAndFlush(data);

	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}
}


5、


客户端的实现






import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
	/**
	 * 连接服务器
	 * 
	 * @param port
	 * @param host
	 * @throws Exception
	 */
	public void connect(int port, String host) throws Exception {
		// 配置客户端NIO线程组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			// 客户端辅助启动类 对客户端配置
			Bootstrap b = new Bootstrap();
			b.group(group)//
					.channel(NioSocketChannel.class)//
					.option(ChannelOption.TCP_NODELAY, true)//
					.handler(new MyChannelHandler());//
			// 异步链接服务器 同步等待链接成功
			ChannelFuture f = b.connect(host, port).sync();
			System.out.println(f);
			// 发送消息
			Thread.sleep(1000);
			f.channel().writeAndFlush("777");
			f.channel().writeAndFlush("666");
			Thread.sleep(2000);
			f.channel().writeAndFlush("888");

			// 等待链接关闭
			f.channel().closeFuture().sync();

		} finally {
			group.shutdownGracefully();
			System.out.println("客户端优雅的释放了线程资源...");
		}

	}

	/**
	 * 网络事件处理器
	 */
	private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			// 增加自定义的编码器和解码器
			ch.pipeline().addLast(new MyStringEncoder());
			ch.pipeline().addLast(new MyStringDecoder());
			// 客户端的处理器
			ch.pipeline().addLast(new ClientHandler());
		}

	}

	public static void main(String[] args) throws Exception {
		new Client().connect(9998, "127.0.0.1");

	}
}


6、客户端Handler的实现


import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
		try {
			String body = (String) msg;
			System.out.println("Client :" + body);

			// 只是读数据,没有写数据的话
			// 需要自己手动的释放的消息

		} finally {
			ReferenceCountUtil.release(msg);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}

}


7、关于直接发送和接收String类型的编码


7.1 直接发送String类型的数据





7.2 直接接收String类型的数据














版权声明:本文为zbw18297786698原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。