springboot+netty实现UDP监听两个端口

  • Post author:
  • Post category:其他


在springboot工程中集成netty框架实现UDP通信,在CommandLineRunner的初始化启动时启动两个UDP监听服务。

实例如下

1、两个NettyUdpServer

@Slf4j
@Component
@EnableAsync
public class NettyUdpServer {

    @Autowired
    NettyServerHandlerInitializer nettyServerHandlerInitializer;

    @Async
    public void init(int port) {
        //表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        try {
            //1、创建netty bootstrap 启动类
            Bootstrap serverBootstrap = new Bootstrap();
            //2、设置boostrap 的eventLoopGroup线程组
            serverBootstrap.group(bossLoopGroup)
                    //3、设置NIO UDP连接通道
                    .channel(NioDatagramChannel.class)
                    // 设置 Netty Server 的端口
                    .localAddress(new InetSocketAddress(port))
                    //4、设置通道参数 SO_BROADCAST广播形式
                    .option(ChannelOption.SO_BROADCAST, true)
                    //5、设置处理类 装配流水线
                    .handler(nettyServerHandlerInitializer);
            //6、绑定server,通过调用sync()方法异步阻塞,直到绑定成功
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            log.info("started and listened on " + channelFuture.channel().localAddress());
            //7、监听通道关闭事件,应用程序会一直等待,直到channel关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
        } finally {
            log.info("netty udp close!", "info");
            //8 关闭EventLoopGroup,
            bossLoopGroup.shutdownGracefully();
        }
    }
}
@Slf4j
@Component
@EnableAsync
public class NettyUdpServer2 {

    @Autowired
    NettyServerHandlerInitializer2 nettyServerHandlerInitializer2;

    @Async
    public void init(int port) {
        //表示服务器连接监听线程组,专门接受 accept 新的客户端client 连接
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        try {
            //1、创建netty bootstrap 启动类
            Bootstrap serverBootstrap = new Bootstrap();
            //2、设置boostrap 的eventLoopGroup线程组
            serverBootstrap.group(bossLoopGroup)
                    //3、设置NIO UDP连接通道
                    .channel(NioDatagramChannel.class)
                    // 设置 Netty Server 的端口
                    .localAddress(new InetSocketAddress(port))
                    //4、设置通道参数 SO_BROADCAST广播形式
                    .option(ChannelOption.SO_BROADCAST, true)
                    //5、设置处理类 装配流水线
                    .handler(nettyServerHandlerInitializer2);
            //6、绑定server,通过调用sync()方法异步阻塞,直到绑定成功
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            log.info("started and listened on " + channelFuture.channel().localAddress());
            //7、监听通道关闭事件,应用程序会一直等待,直到channel关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
        } finally {
            log.info("netty udp close!", "info");
            //8 关闭EventLoopGroup,
            bossLoopGroup.shutdownGracefully();
        }
    }
}

2、CommandLineRunner启动

	...
	private void initNettyUdpServer() {
        nettyUdpServer.init(port);
        nettyUdpServer2.init(port2);
    }

启动监听两个端口。

3、ChannelInitializer

@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    @Autowired
    NettyHandler nettyHandler;

    @Override
    protected void initChannel(Channel channel) {
        ChannelPipeline channelPipeline = channel.pipeline();
        // 添加一堆 NettyServerHandler 到 ChannelPipeline 中
        channelPipeline.addLast("nettyHandler", nettyHandler);
    }
}

4、SimpleChannelInboundHandler

@Slf4j
@Component
public class NettyHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
        ByteBuf byteBuf = datagramPacket.content();
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        //数据处理

        // res
        String resStr = "ok";
        byte[] resBytes = resStr.getBytes(StandardCharsets.UTF_8);
        DatagramPacket resData = new DatagramPacket(Unpooled.copiedBuffer(resBytes), datagramPacket.sender());
        channelHandlerContext.writeAndFlush(resData);
    }
}

5、测试

启动服务,结果类似如下

在这里插入图片描述

可见启动了两个upd端口监听。

6、UDP测试的服务端和客户端

下面为基于netty的UDP服务端用于监听和基于netty的UDP客户端用于发送

public class UdpServer {
 
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    // 主线程处理
                    .channel(NioDatagramChannel.class)
                    // 广播
                    .option(ChannelOption.SO_BROADCAST, true)
                    // 设置读缓冲区为2M
                    .option(ChannelOption.SO_RCVBUF, 2048 * 1024)
                    // 设置写缓冲区为1M
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NioEventLoopGroup(), new UdpServerHandler());
                        }
                    });
            ChannelFuture f = bootstrap.bind(8000).sync();
            System.out.println("服务器正在监听......");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
 
    }
}
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
        System.out.println("服务端接收到消息:" + packet.content().toString(StandardCharsets.UTF_8));
        // 向客户端回复消息
        ByteBuf byteBuf = Unpooled.copiedBuffer("已经接收到消息!".getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(new DatagramPacket(byteBuf, packet.sender()));
    }
}
public class UdpClient {
 
    public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioDatagramChannel.class)
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new UdpClientHandler());
                        }
                    });
            Channel channel = bootstrap.bind(8089).sync().channel();
            InetSocketAddress address = new InetSocketAddress("localhost", 8088);
            ByteBuf byteBuf = Unpooled.copiedBuffer("你好".getBytes(StandardCharsets.UTF_8));
            channel.writeAndFlush(new DatagramPacket(byteBuf, address)).sync();
            channel.closeFuture().await();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
public class UdpClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
 
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
        System.out.println("客户端接收到消息:" + packet.content().toString(StandardCharsets.UTF_8));
        // 向服务端回复消息
        ByteBuf byteBuf = Unpooled.copiedBuffer("已经接收到消息!".getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(new DatagramPacket(byteBuf, packet.sender()));
    }
 
}

注:启动两个端口监听时需要采用@Async注解,使其异步执行,否则会出现只能启动第一个,第二个无法执行的问题。从日志中也只能看到第一个端口的监听,第二个没有执行



版权声明:本文为leijie0322原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。