Netty接收UDP组播数据

  • Post author:
  • Post category:其他


针对多网卡情况,绑定其中的一个网卡,接收UDP组播中的数据。下面的示例由三个源文件组成:UdpReceiver.java(启动入口)、UdpHandler.java(数据处理器)和 ByteUtil.java(字节转换工具类)。

package com.demo.udp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.net.*;
import java.util.Enumeration;

/**
 * Receives UDP multicast datagrams through one specific network interface on a multi-homed host.
 *
 * <p>The socket is bound to the wildcard address on the group port, the multicast interface is
 * pinned to the NIC that owns {@code localIp}, and the channel then joins the multicast group
 * on that NIC. Received datagrams are handed to {@link UdpHandler}.
 */
public class UdpReceiver {
    // Multicast group address
    private static final String groupIp = "225.1.2.2";
    // Multicast group port
    private static final int groupPort = 1234;
    // Address of the local NIC that should receive the multicast traffic
    private static final String localIp = "192.168.1.110";


    /**
     * Starts the receiver and blocks until the datagram channel is closed.
     *
     * @param args unused
     * @throws IllegalStateException if no network interface owns {@code localIp}
     * @throws RuntimeException      wrapping any socket, host-resolution or interruption failure
     */
    public static void main(String[] args) {
        // Multicast group socket address
        InetSocketAddress groupAddress = new InetSocketAddress(groupIp, groupPort);

        // I/O threads for the datagram channel
        EventLoopGroup group = new NioEventLoopGroup();
        // Threads that run UdpHandler. Created once and shut down in the finally block;
        // creating a new group inside initChannel would leak its threads on every channel.
        final EventLoopGroup handlerGroup = new NioEventLoopGroup();

        try {
            NetworkInterface ni = NetworkInterface.getByInetAddress(InetAddress.getByName(localIp));
            if (ni == null) {
                // getByInetAddress returns null when no interface carries the address
                throw new IllegalStateException("No network interface is bound to " + localIp);
            }

            // Print the IPv4 addresses of the selected interface for diagnostics
            Enumeration<InetAddress> addresses = ni.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress address = addresses.nextElement();
                if (address instanceof Inet4Address) {
                    System.out.println("网络接口名称为:" + ni.getName());
                    System.out.println("网卡接口地址:" + address.getHostAddress());
                }
            }

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channelFactory(new ChannelFactory<NioDatagramChannel>() {
                        @Override
                        public NioDatagramChannel newChannel() {
                            // Force an IPv4 socket so the IPv4 group can be joined
                            return new NioDatagramChannel(InternetProtocolFamily.IPv4);
                        }
                    })
                    // No localAddress(...) here: bind(int) below supplies its own
                    // wildcard address, so a configured local address would be ignored.
                    .option(ChannelOption.IP_MULTICAST_IF, ni)
                    .option(ChannelOption.IP_MULTICAST_ADDR, InetAddress.getByName(localIp))
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_RCVBUF, 2048 * 1024)
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024)
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Run the handler on the dedicated group instead of the I/O loop
                            pipeline.addLast(handlerGroup, new UdpHandler());
                        }
                    });

            NioDatagramChannel channel = (NioDatagramChannel) bootstrap.bind(groupAddress.getPort()).sync().channel();
            channel.joinGroup(groupAddress, ni).sync();
            channel.closeFuture().await();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            handlerGroup.shutdownGracefully();
            group.shutdownGracefully();
        }

    }

}

package com.demo.udp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

/**
 * Inbound handler that prints the payload of every received UDP datagram as a hex string.
 */
public class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /**
     * Copies the readable bytes of the datagram into an array and prints them, hex-encoded
     * by {@code ByteUtil.bytesToHexString}, to standard output.
     *
     * @param ctx handler context (unused)
     * @param msg the received datagram
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
        ByteBuf payload = msg.content();
        byte[] raw = new byte[payload.readableBytes()];
        payload.readBytes(raw);

        System.out.println("接收到的数据 " + ByteUtil.bytesToHexString(raw));
    }
}

package com.demo.udp;


import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Conversion helpers between byte arrays, primitive values and their textual forms.
 */
public class ByteUtil {

    /** Upper-case hexadecimal digits, indexed by nibble value. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    /**
     * Copies {@code length} bytes out of {@code buffer}, starting at index {@code from}.
     *
     * @param buffer source array
     * @param from   index of the first byte to copy
     * @param length number of bytes to copy
     * @return a new array of {@code length} bytes
     * @throws ArrayIndexOutOfBoundsException if the requested range is not fully inside
     *                                        {@code buffer}
     */
    public static byte[] readBytesFromTo(byte[] buffer, int from, int length) {
        byte[] sub = new byte[length];
        if (length == 0) {
            // Nothing to copy; an empty request is accepted for any 'from'
            return sub;
        }
        // Long arithmetic so that from + length cannot overflow and slip past the check
        if (from < 0 || (long) from + length > buffer.length) {
            throw new ArrayIndexOutOfBoundsException("该报文长度不符合要求!");
        }
        System.arraycopy(buffer, from, sub, 0, length);
        return sub;
    }

    /**
     * Decodes the first four bytes of {@code bytes} as a little-endian {@code int}.
     *
     * @param bytes source array, least significant byte first
     * @return the decoded value, or {@code -1} if fewer than four bytes are supplied
     *         (note that {@code -1} is also a valid decoded value)
     */
    public static int bytes2Int_LE(byte[] bytes) {
        if (bytes.length < 4) {
            return -1;
        }
        int iRst = (bytes[0] & 0xFF);
        iRst |= (bytes[1] & 0xFF) << 8;
        iRst |= (bytes[2] & 0xFF) << 16;
        iRst |= (bytes[3] & 0xFF) << 24;

        return iRst;
    }

    /**
     * Decodes the first two bytes of {@code bytes} as a little-endian {@code char}.
     *
     * @param bytes source array, least significant byte first
     * @return the decoded value, or {@code (char) -1} if fewer than two bytes are supplied
     */
    public static char bytes2Char_LE(byte[] bytes) {
        if (bytes.length < 2) {
            return (char) -1;
        }
        int iRst = (bytes[0] & 0xFF);
        iRst |= (bytes[1] & 0xFF) << 8;

        return (char) iRst;
    }

    /**
     * Decodes a two-byte array as a little-endian {@code short}.
     *
     * @param data exactly two bytes, least significant byte first
     * @return the decoded value
     * @throws UnsupportedOperationException if {@code data} is not exactly two bytes long
     */
    public static short littleByteToShort(byte[] data) {
        if (data.length != 2) {
            throw new UnsupportedOperationException("the byte length is not 2");
        }
        // Assemble by hand: ByteBuffer defaults to big-endian, which would swap the bytes
        return (short) ((data[0] & 0xFF) | ((data[1] & 0xFF) << 8));
    }

    /**
     * Decodes a four-byte array as a big-endian {@code int}.
     *
     * @param data exactly four bytes, most significant byte first
     * @return the decoded value
     * @throws UnsupportedOperationException if {@code data} is not exactly four bytes long
     */
    public static int bytes2Int(byte[] data) {
        if (data.length != 4) {
            throw new UnsupportedOperationException("the byte length is not 4");
        }
        return ByteBuffer.wrap(data).getInt();
    }

    /**
     * Encodes a byte array as an upper-case hexadecimal string, two characters per byte.
     *
     * @param bArray bytes to encode
     * @return the hex string; empty for an empty array
     */
    public static String bytesToHexString(byte[] bArray) {
        StringBuilder sb = new StringBuilder(bArray.length * 2);
        for (int i = 0; i < bArray.length; i++) {
            int unsigned = bArray[i] & 0xFF;
            sb.append(HEX_DIGITS[unsigned >>> 4]).append(HEX_DIGITS[unsigned & 0x0F]);
        }
        return sb.toString();
    }

    /**
     * Decodes a hexadecimal string into bytes and interprets those bytes as GBK text.
     *
     * <p>Both upper- and lower-case hex digits are accepted. If the input has an odd number
     * of characters, the trailing character is ignored.
     *
     * @param hex hexadecimal text, two characters per byte
     * @return the decoded string, or an empty string if the GBK charset is unavailable
     * @throws IllegalArgumentException if a decoded character is not a hex digit
     */
    public static String hexStr2Str(String hex) {
        int byteCount = hex.length() / 2;
        byte[] bytes = new byte[byteCount];
        for (int i = 0; i < byteCount; i++) {
            int position = i * 2; // two hex characters -> one byte
            int high = hexDigitValue(hex.charAt(position));
            int low = hexDigitValue(hex.charAt(position + 1));
            bytes[i] = (byte) ((high << 4) | low);
        }
        try {
            return new String(bytes, "GBK");
        } catch (UnsupportedEncodingException e) {
            // Best-effort fallback kept for existing callers: report and return empty text
            e.printStackTrace();
            return "";
        }
    }

    /**
     * Returns the numeric value (0-15) of an ASCII hex digit.
     */
    private static int hexDigitValue(char c) {
        if (c >= '0' && c <= '9') {
            return c - '0';
        }
        if (c >= 'A' && c <= 'F') {
            return c - 'A' + 10;
        }
        if (c >= 'a' && c <= 'f') {
            return c - 'a' + 10;
        }
        throw new IllegalArgumentException("Not a hexadecimal digit: '" + c + "'");
    }

    /**
     * Converts a string into the comma-separated decimal code units of its characters,
     * e.g. {@code "AB"} becomes {@code "65,66"}.
     *
     * @param value text to convert
     * @return comma-separated decimal values; empty for an empty string
     */
    public static String stringToAscii(String value) {
        StringBuilder sbu = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            if (i > 0) {
                sbu.append(",");
            }
            sbu.append((int) value.charAt(i));
        }
        return sbu.toString();
    }

    /**
     * Converts comma-separated decimal character codes back into a string; the inverse of
     * {@link #stringToAscii(String)}.
     *
     * @param value comma-separated decimal values, e.g. {@code "65,66"}
     * @return the decoded string; empty for an empty input
     * @throws NumberFormatException if an element is not a decimal integer
     */
    public static String asciiToString(String value) {
        if (value.isEmpty()) {
            // "".split(",") yields one empty element, which parseInt would reject
            return "";
        }
        StringBuilder sbu = new StringBuilder();
        String[] chars = value.split(",");
        for (int i = 0; i < chars.length; i++) {
            sbu.append((char) Integer.parseInt(chars[i]));
        }
        return sbu.toString();
    }

    /**
     * Encodes an {@code int} as four bytes in little-endian order.
     *
     * @param res value to encode
     * @return four bytes, least significant byte first
     */
    public static byte[] int2ByteLe(int res) {
        byte[] targets = new byte[4];

        targets[0] = (byte) (res & 0xff); // least significant byte
        targets[1] = (byte) ((res >> 8) & 0xff);
        targets[2] = (byte) (res >> 16 & 0xff);
        targets[3] = (byte) (res >> 24 & 0xff); // most significant byte

        return targets;
    }

}




版权声明:本文为weixin_43823832原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。