1.添加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
2.添加配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
# 指定topic
destination: financial
group: financial
input:
# 指定topic,要与生产者的topic匹配
destination: financial
# 根据业务指定
# 一定要设置,否则会启动报错
# 如果使用的是其他的MQ,可以留空
group: financial
3.启动类处理
@EnableBinding({Sink.class, Source.class})
public class FinancialApplication {
public static void main(String[] args) {
SpringApplication.run(FinancialApplication.class, args);
}
}
4.编写生产者
@Autowired
private Source source;
public void testStream() {
this.source.output().send(
MessageBuilder
.withPayload("你好,请问在吗?")
//1.Header 可不设置
//2.Header 可用来单独传递参数,对象请转换为jsonString
//3.Header 可用于对消息进行选择性接收:3种方式参考:https://www.imooc.com/article/290424
.setHeader("type","order")
.setHeader("identification",1001)
.build()
);
}
5.编写消费者
@StreamListener(Sink.INPUT)
public void receive(String messageBody){
log.info("rocketMQ -----------> " + messageBody);
}
6.传递对象
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class FAccountingEntity{
@ApiModelProperty(value = "修改人")
private String updateBy;
@ApiModelProperty(value = "修改时间")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime updateTime;
}
FAccountingEntity obj = FAccountingEntity
.builder()
.updateBy("qxx")
.updateTime(LocalDateTime.now())
.build();
this.source.output().send(
MessageBuilder
.withPayload(JSON.toJSONStringWithDateFormat(obj, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteDateUseDateFormat))
.build()
);
@StreamListener(Sink.INPUT)
public void receive(@Payload FAccountingEntity obj, @Headers Map<String,String> headers, @Header(name = "type") String type) {
log.info(type);
log.info(headers.toString());
}
7.消费者出现异常
1. spring cloud stream补偿机制,每组默认收到3次消息通知,自定义配置:spring.cloud.stream.bindings.input.consumer.max-attempts=1
2.每组收到3次串行消息通知,每次收到消息通知,业务处理得到结果后再接收下一次。3次消息通知的间隔时间是依业务处理时间确定的。如果当前业务处理逻辑抛出异常,则接收下一次消息通知。如果当前业务处理逻辑成功(没有异常),则拒收消息通知。
3.会重试很多组,组和组之间时间间隔:10s、30s、1m、2m、3m、4m、5m……
4.消息接收服务重启后,通知时间规则不变,不会重新计时
例:通知时间为:10:26:43、10:26:54:、10:27:25、10:28:26……,消息接收服务在10:27:00重启,则接收到下一次通知的时间是10:27:25.
5.消息通知是串行的,在确保消息不重复的情况下,一条消息只能被成功消费一次,不会出现重复消费的情况
版权声明:本文为qq_37128815原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。