前言
目前消息中间件有很多,比如rabbitmq、rocketmq、activemq、kafka,集成方法和使用方法大同小异,消息中间件主要是解决三方面的问题:削峰、异步、解耦。曾用过rabbitmq上传机器人状态信息、用rocketmq进行订单超时取消功能。
集成说明
kafka原生并不支持传输对象,具体可用传输类型可参照下图
以Serializer后缀的为序列化类,以Deserializer后缀的为反序列化类。若需要让kafka能够传输对象,一是实体类需要实现序列化接口,二是实现Serializer和Deserializer接口。
为方便了解具体的集成步骤,本文生产者和消费者在不同的工程集成
集成步骤
1、定义实体转换类
/**
* @ClassName BeanConvertUtils
* @Description 实体转换工具类
* @Author yuxk
* @Date 2020/11/11 10:47
* @Version 3.0
**/
public class BeanConvertUtil {
private static final Logger logger = LoggerFactory.getLogger(BeanConvertUtil.class);
private BeanConvertUtil() {
}
public static byte[] ObjectToBytes(Object obj) {
byte[] bytes = null;
ByteArrayOutputStream bo = null;
ObjectOutputStream oo = null;
try {
bo = new ByteArrayOutputStream();
oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
bytes = bo.toByteArray();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
} finally {
try {
if (bo != null) {
bo.close();
}
if (oo != null) {
oo.close();
}
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
return bytes;
}
/**
* 字节数组转对象
*
* @param bytes
* @return
*/
public static Object BytesToObject(byte[] bytes) {
Object obj = null;
ByteArrayInputStream bi = null;
版权声明:本文为zhutian0991原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。