springboot集成kafka消息中间件

  • Post author:
  • Post category:其他




前言

目前消息中间件有很多,比如rabbitmq、rocketmq、activemq、kafka,集成方法和使用方法大同小异,消息中间件主要是解决三方面的问题:削峰、异步、解耦。曾用过rabbitmq上传机器人状态信息、用rocketmq进行订单超时取消功能。



集成说明

kafka原生并不支持传输对象,具体可用传输类型可参照下图



以Serializer后缀的为序列化类,以Deserializer后缀的为反序列化类。若需要让kafka能够传输对象,一是实体类需要实现序列化接口,二是实现Serializer和Deserializer接口。

为方便了解具体的集成步骤,本文生产者和消费者在不同的工程集成



集成步骤

1、定义实体转换类

/**
 * @ClassName BeanConvertUtils
 * @Description 实体转换工具类
 * @Author yuxk
 * @Date 2020/11/11 10:47
 * @Version 3.0
 **/
public class BeanConvertUtil {
   

    private static final Logger logger = LoggerFactory.getLogger(BeanConvertUtil.class);

    private BeanConvertUtil() {
   

    }

    public static byte[] ObjectToBytes(Object obj) {
   
        byte[] bytes = null;
        ByteArrayOutputStream bo = null;
        ObjectOutputStream oo = null;
        try {
   
            bo = new ByteArrayOutputStream();
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            bytes = bo.toByteArray();

        } catch (IOException e) {
   
            logger.warn(e.getMessage(), e);
        } finally {
   
            try {
   
                if (bo != null) {
   
                    bo.close();
                }
                if (oo != null) {
   
                    oo.close();
                }
            } catch (IOException e) {
   
                logger.warn(e.getMessage(), e);
            }
        }
        return bytes;
    }

    /**
     * 字节数组转对象
     *
     * @param bytes
     * @return
     */
    public static Object BytesToObject(byte[] bytes) {
   
        Object obj = null;
        ByteArrayInputStream bi = null;
        



版权声明:本文为zhutian0991原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。