在springboot工程中集成netty框架实现UDP通信,在CommandLineRunner的初始化启动时启动两个UDP监听服务。
实例如下
1、两个NettyUdpServer
@Slf4j
@Component
@EnableAsync
public class NettyUdpServer {
@Autowired
NettyServerHandlerInitializer nettyServerHandlerInitializer;
@Async
public void init(int port) {
//表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接
EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
try {
//1、创建netty bootstrap 启动类
Bootstrap serverBootstrap = new Bootstrap();
//2、设置boostrap 的eventLoopGroup线程组
serverBootstrap.group(bossLoopGroup)
//3、设置NIO UDP连接通道
.channel(NioDatagramChannel.class)
// 设置 Netty Server 的端口
.localAddress(new InetSocketAddress(port))
//4、设置通道参数 SO_BROADCAST广播形式
.option(ChannelOption.SO_BROADCAST, true)
//5、设置处理类 装配流水线
.handler(nettyServerHandlerInitializer);
//6、绑定server,通过调用sync()方法异步阻塞,直到绑定成功
ChannelFuture channelFuture = serverBootstrap.bind().sync();
log.info("started and listened on " + channelFuture.channel().localAddress());
//7、监听通道关闭事件,应用程序会一直等待,直到channel关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
log.info("netty udp close!", "info");
//8 关闭EventLoopGroup,
bossLoopGroup.shutdownGracefully();
}
}
}
@Slf4j
@Component
@EnableAsync
public class NettyUdpServer2 {
@Autowired
NettyServerHandlerInitializer2 nettyServerHandlerInitializer2;
@Async
public void init(int port) {
//表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接
EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
try {
//1、创建netty bootstrap 启动类
Bootstrap serverBootstrap = new Bootstrap();
//2、设置boostrap 的eventLoopGroup线程组
serverBootstrap.group(bossLoopGroup)
//3、设置NIO UDP连接通道
.channel(NioDatagramChannel.class)
// 设置 Netty Server 的端口
.localAddress(new InetSocketAddress(port))
//4、设置通道参数 SO_BROADCAST广播形式
.option(ChannelOption.SO_BROADCAST, true)
//5、设置处理类 装配流水线
.handler(nettyServerHandlerInitializer2);
//6、绑定server,通过调用sync()方法异步阻塞,直到绑定成功
ChannelFuture channelFuture = serverBootstrap.bind().sync();
log.info("started and listened on " + channelFuture.channel().localAddress());
//7、监听通道关闭事件,应用程序会一直等待,直到channel关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
log.info("netty udp close!", "info");
//8 关闭EventLoopGroup,
bossLoopGroup.shutdownGracefully();
}
}
}
2、CommandLineRunner启动
...
private void initNettyUdpServer() {
nettyUdpServer.init(port);
nettyUdpServer2.init(port2);
}
启动监听两个端口。
3、ChannelInitializer
@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
@Autowired
NettyHandler nettyHandler;
@Override
protected void initChannel(Channel channel) {
ChannelPipeline channelPipeline = channel.pipeline();
// 添加一堆 NettyServerHandler 到 ChannelPipeline 中
channelPipeline.addLast("nettyHandler", nettyHandler);
}
}
4、SimpleChannelInboundHandler
@Slf4j
@Component
public class NettyHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
ByteBuf byteBuf = datagramPacket.content();
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
//数据处理
// res
String resStr = "ok";
byte[] resBytes = resStr.getBytes(StandardCharsets.UTF_8);
DatagramPacket resData = new DatagramPacket(Unpooled.copiedBuffer(resBytes), datagramPacket.sender());
channelHandlerContext.writeAndFlush(resData);
}
}
5、测试
启动服务,结果类似如下
可见启动了两个upd端口监听。
6、UDP测试的服务端和客户端
下面为基于netty的UDP服务端用于监听和基于netty的UDP客户端用于发送
public class UdpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
// 主线程处理
.channel(NioDatagramChannel.class)
// 广播
.option(ChannelOption.SO_BROADCAST, true)
// 设置读缓冲区为2M
.option(ChannelOption.SO_RCVBUF, 2048 * 1024)
// 设置写缓冲区为1M
.option(ChannelOption.SO_SNDBUF, 1024 * 1024)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NioEventLoopGroup(), new UdpServerHandler());
}
});
ChannelFuture f = bootstrap.bind(8000).sync();
System.out.println("服务器正在监听......");
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
System.out.println("服务端接收到消息:" + packet.content().toString(StandardCharsets.UTF_8));
// 向客户端回复消息
ByteBuf byteBuf = Unpooled.copiedBuffer("已经接收到消息!".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(new DatagramPacket(byteBuf, packet.sender()));
}
}
public class UdpClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new UdpClientHandler());
}
});
Channel channel = bootstrap.bind(8089).sync().channel();
InetSocketAddress address = new InetSocketAddress("localhost", 8088);
ByteBuf byteBuf = Unpooled.copiedBuffer("你好".getBytes(StandardCharsets.UTF_8));
channel.writeAndFlush(new DatagramPacket(byteBuf, address)).sync();
channel.closeFuture().await();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
public class UdpClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
System.out.println("客户端接收到消息:" + packet.content().toString(StandardCharsets.UTF_8));
// 向服务端回复消息
ByteBuf byteBuf = Unpooled.copiedBuffer("已经接收到消息!".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(new DatagramPacket(byteBuf, packet.sender()));
}
}
注:启动两个端口监听时需要采用@Async注解,使其异步执行,否则会出现只能启动第一个,第二个无法执行的问题。从日志中也只能看到第一个端口的监听,第二个没有执行
版权声明:本文为leijie0322原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。