spring cloud stream 3.1.x 整合集成rocketMq 4.9.2
最近做项目升级时候,项目使用了spring cloud alibaba ,其中使用spring cloud stream 集成rocketMq 最新版本4.9.2 ,现将具体几个区别点和使用写进来,给后续其他人少走弯路
1、交代以下具体使用的版本
spring cloud alibaba 使用 2021.0.1.0
<dependencyManagement>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.6.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2021.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2021.0.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencyManagement>
2、pom.xml 引用配置,其他公共引用我就没贴了,自行查看spring cloud alibaba文档即可
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3、配置文件配置,采用yml方式配置,只截取重点
stream:
rocketmq:
binder:
# RocketMQ 服务器地址
name-server: 172.16.233.22:9876
group: ttbossDbData
bindings:
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
inBlacklistOrderEvent-in-0: { consumer: { subscription: addBlackListOrder, broadcasting: false } }
inBlacklistConfigEvent-in-0: { consumer: { subscription: upBlackListConfig||addBlackListSafeIp||delBlackListSafeIp||addWhiteMobile||delWhiteMobile, messageModel: BROADCASTING } }
bindings:
# 这里是个 Map 类型参数,{} 为 YAML 中 Map 的行内写法
inBlacklistConfigEvent-in-0: { destination: blacklist_config, contentType: text/plain, group: group_blacklist_config, consumer: { max-attempts: 1 } }
inBlacklistOrderEvent-in-0: { destination: blacklist_order, contentType: text/plain, group: group_blacklist_order, consumer: { max-attempts: 1 } }
outBlacklistConfigEvent-out-0: { destination: blacklist_config, contentType: application/json,group: group_blacklist_config}
# 单个可不指定,多个使用`;`分号隔开
function:
definition: inBlacklistConfigEvent;inBlacklistOrderEvent;outBlacklistConfigEvent
4、监听消息队列事件代码
@Bean
public Function<Flux<Message<String>>, Mono<Void>> inBlacklistConfigEvent() {
return flux -> flux.map(message -> {
logger.info(message.getHeaders().toString());
String ROCKET_TAGS = message.getHeaders().containsKey("ROCKET_TAGS") ? Objects.requireNonNull(message.getHeaders().get("ROCKET_TAGS")).toString() : "";
String ROCKET_MQ_MESSAGE_ID = message.getHeaders().containsKey("ROCKET_MQ_MESSAGE_ID") ? Objects.requireNonNull(message.getHeaders().get("ROCKET_MQ_MESSAGE_ID")).toString() : "";
String id = message.getHeaders().containsKey("id") ? Objects.requireNonNull(message.getHeaders().get("id")).toString() : "";
logger.info("监听黑名单拦截配置事件#消息队列主机头{},{},", ROCKET_TAGS, message.getPayload());
//更新当前jvm配置文件信息
return message;
}).then();
}
5、发送消息代码
//mq监听处理
@Service
public class MqSendMessageServiceImpl implements MqSendMessageService {
private static final Logger logger = LoggerFactory.getLogger(MqSendMessageServiceImpl.class);
private static final String blacklistOrderOut = "utBlacklistOrderEvent-out-0";
private static final String blacklistConfigOut = "outBlacklistConfigEvent-out-0";
@Autowired
private StreamBridge streamBridge;
//执行 发送订单信息
public void sendBlackListOrderData(BlackListOrderDto blackListOrderDto) {
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS, "addBlackListOrder");
Message<String> message = MessageBuilder.createMessage(JSONObject.toJSONString(blackListOrderDto), new MessageHeaders(headers));
streamBridge.send(blacklistOrderOut, message);
}
//第二种方式
public void sendAddWhiteMobile(BlackListUserWhiteListDataEntity blackListUserWhiteListDataEntity) {
logger.info("发送同步新增用户白名单命令#{}" , blackListUserWhiteListDataEntity.getCaller());
final Message<String> message = MessageBuilder.withPayload(JSONObject.toJSONString(blackListUserWhiteListDataEntity))
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader(MessageConst.PROPERTY_TAGS, "addWhiteMobile")
.build();
streamBridge.send(blacklistConfigOut, message);
}
特别注意事项:
-
如果从老版本升级到当前版本,有几个yml参数要修改,tag过滤字段由
tags
变为subscription,rockemq广播模式配置由 broadcasting: false 变为 messageModel: BROADCASTING
-
以上修改都是自己查看源代码找出来,网上还没相关教程,怎么看源码等等问题,这里就不献丑了,都是大神,如果有错别字和不正确,欢迎一起交流
版权声明:本文为caojianshuang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。