spring cloud stream 整合 rocketMQ

  • Post author:
  • Post category:其他


1.添加依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

2.添加配置

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        output:
          # 指定topic
          destination: financial
          group: financial
        input:
          # 指定topic,要与生产者的topic匹配
          destination: financial
          # 根据业务指定
          # 一定要设置,否则会启动报错
          # 如果使用的是其他的MQ,可以留空
          group: financial

3.启动类处理

@EnableBinding({Sink.class, Source.class})
public class FinancialApplication {
    public static void main(String[] args) {
        SpringApplication.run(FinancialApplication.class, args);
    }
}

4.编写生产者

@Autowired
private Source source;

public void testStream() {
    this.source.output().send(
                MessageBuilder
                        .withPayload("你好,请问在吗?")
                        //1.Header 可不设置
                        //2.Header 可用来单独传递参数,对象请转换为jsonString
                        //3.Header 可用于对消息进行选择性接收:3种方式参考:https://www.imooc.com/article/290424
                        .setHeader("type","order")
                        .setHeader("identification",1001)
                        .build()
        );
}

5.编写消费者

@StreamListener(Sink.INPUT)
public void receive(String messageBody){
    log.info("rocketMQ -----------> " + messageBody);
}

6.传递对象

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class FAccountingEntity{

    @ApiModelProperty(value = "修改人")
    private String updateBy;

    @ApiModelProperty(value = "修改时间")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}

FAccountingEntity obj = FAccountingEntity
    .builder()
    .updateBy("qxx")
    .updateTime(LocalDateTime.now())
    .build();
this.source.output().send(
    MessageBuilder
    .withPayload(JSON.toJSONStringWithDateFormat(obj, "yyyy-MM-dd HH:mm:ss", SerializerFeature.WriteDateUseDateFormat))
    .build()
    );
@StreamListener(Sink.INPUT)
public void receive(@Payload FAccountingEntity obj, @Headers Map<String,String> headers, @Header(name = "type") String type) {
    log.info(type);
    log.info(headers.toString());
}

7.消费者出现异常

1. spring cloud stream补偿机制,每组默认收到3次消息通知,自定义配置:spring.cloud.stream.bindings.input.consumer.max-attempts=1

2.每组收到3次串行消息通知,每次收到消息通知,业务处理得到结果后再接收下一次。3次消息通知的间隔时间是依业务处理时间确定的。如果当前业务处理逻辑抛出异常,则接收下一次消息通知。如果当前业务处理逻辑成功(没有异常),则拒收消息通知。

3.会重试很多组,组和组之间时间间隔:10s、30s、1m、2m、3m、4m、5m……

4.消息接收服务重启后,通知时间规则不变,不会重新计时

例:通知时间为:10:26:43、10:26:54:、10:27:25、10:28:26……,消息接收服务在10:27:00重启,则接收到下一次通知的时间是10:27:25.

5.消息通知是串行的,在确保消息不重复的情况下,一条消息只能被成功消费一次,不会出现重复消费的情况



版权声明:本文为qq_37128815原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。