一、yml配置
spring:
rabbitmq:
username: admin
password: admin
host: localhost
port: 5672
virtual-host: /
publisher-confirm: true #发布确认 开启confirms回调 Producer -> Exchange
publisher-returns: true #发布返回 开启returnedMessage回调 Exchange -> Queue
listener:
type: simple
simple:
acknowledge-mode: manual #消费端收到消息后的确认方式 manual手动确认 none自动确认
prefetch: 1 #消费者预取1条数据到内存
default-requeue-rejected: false #决定被拒绝的消息是否重新入队。默认值为true,需要手动basicNack时这些参数谅失效了
retry:
enabled: true #开启消费者 程序异常情况下会进行重试
max-attempts: 3 #重试次数
initial-interval: 2000 #消费者重试间隔次数 2s
2、RabbitConfig
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* rabbitTemplate
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
//消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功发送至Exchange");
} else {
log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});
//触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
//失败回调 消息是否从Exchange路由到Queue (只有消息从Exchange路由到Queue失败才会回调这个方法)
rabbitTemplate.setReturnsCallback(returned ->
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText(), returned.getMessage())
);
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
/**
* 普通队列交换机
*/
@Bean
public FanoutExchange testFanoutExchange() {
return new FanoutExchange(RabbitConstants.TEST_FANOUT_EXCHANGE, true, false);
}
/**
* 普通队列
* 基于消息事务的处理方式,当失败消息进行重试,有时间间隔,当达到超时时,就发送到死信队列,等待人工处理
* @return
*/
@Bean
public Queue testFanoutQueue() {
//超时2s就放入死信队列
return QueueBuilder.durable(RabbitConstants.TEST_FANOUT_QUEUE_A).deadLetterExchange(RabbitConstants.TEST_DDL_EXCHANGE).ttl(2000).build();
}
/**
* 普通队列绑定到交换机
* @return
*/
@Bean
public Binding testFanoutBinding() {
return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange());
}
/**
* 死信交换机
* @return
*/
@Bean
public FanoutExchange testFanoutDdlExchange() {
return new FanoutExchange(RabbitConstants.TEST_DDL_EXCHANGE, true, false);
}
/**
* 死信队列
* @return
*/
@Bean
public Queue testFanoutDdlQueue() {
return new Queue(RabbitConstants.TEST_DDL_QUEUE_A, true, false, false);
}
/**
* 死信队列绑定至交换机
* @return
*/
@Bean
public Binding testFanoutDdlBinding() {
return BindingBuilder.bind(testFanoutDdlQueue()).to(testFanoutDdlExchange());
}
3、消息实体
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value="MsgLog对象", description="消息投递日志")
public class MsgLog implements Serializable {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "消息唯一标识")
private String msgId;
@ApiModelProperty(value = "消息体, json格式化")
private String msg;
@ApiModelProperty(value = "交换机")
private String exchange;
@ApiModelProperty(value = "路由键")
private String routingKey;
@ApiModelProperty(value = "状态: 0投递中 1投递成功 2投递失败 3已消费")
private Integer status;
@ApiModelProperty(value = "重试次数")
private Integer tryCount;
@ApiModelProperty(value = "下一次重试时间")
private Date nextTryTime;
@ApiModelProperty(value = "创建时间")
private Date createTime;
@ApiModelProperty(value = "更新时间")
private Date updateTime;
}
4、producer
/**
* @author : lixuan
* @date : 2021/04/07/10:55
* @description: 生产者
*/
public interface ProducerService {
/**
* 发送消息
*
* @param msg 某些你需要的参数
* @param exchangeName 队列名称
* @param routingKey 路由key
* @return
*/
void sendMsg(Msg msg, String exchangeName, String routingKey);
}
/**
* @author : lixuan
* @date : 2021/04/07/10:57
* @description: 消息生产者
*/
@Service
public class ProducerServiceImpl implements ProducerService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMsg(Msg msg, String exchangeName, String routingKey) {
if (StringUtils.isBlank(msg.getMsgId())) {
String msgId = UUID.randomUUID().toString().replaceAll("-", "");
msg.setMsgId(msgId);
}
CorrelationData correlationData = new CorrelationData(msg.getMsgId());
//发送消息到rabbitMQ
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationData);
}
}
5、consumer
/**
* @author : lixuan
* @date : 2021/04/07/11:23
* @description: 消息接收者
*/
@Component
@Slf4j
public class MessageReceiver {
@Autowired
private MsgService msgService;
@Autowired
private MsgLogService msgLogService;
@RabbitListener(queues = RabbitConstants.TEST_FANOUT_QUEUE_A)
public void testMsgReceiver(Message message, Channel channel) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
Msg msg = JSON.parseObject(message.getBody(), Msg.class);
try {
int a = 1 / 0;
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
//重试次数
Integer retryCount;
String mapKey = "retry-count";
if (!headers.containsKey(mapKey)) {
retryCount = 0;
} else {
retryCount = (Integer) headers.get(mapKey);
}
if (retryCount++ < RETRY) {
log.info("已经重试 " + retryCount + " 次");
headers.put("retry-count", retryCount);
//当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部。
//这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行
//而比较理想的方式是,出现异常时,消息到达消息队列尾部,这样既保证消息不回丢失,又保证了正常业务的进行。
//因此我们采取的解决方案是,将消息进行应答。
//这时消息队列会删除该消息,同时我们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案
//1.应答
channel.basicAck(deliveryTag, false);
//2.重新发送到MQ中
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("application/json").headers(headers).build();
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), basicProperties,
message.getBody());
} else {
log.info("现在重试次数为:" + retryCount);
/**
* 重要的操作 存盘
* 手动ack
* channel.basicAck(deliveryTag,false);
* 通知人工处理
* log.error("重试三次异常,快来人工处理");
*/
//消息存盘
MsgLog msgLog = new MsgLog();
msgLog.setMsgId(msg.getMsgId());
msgLog.setMsg(new String(message.getBody(),"utf-8"));
msgLog.setExchange(message.getMessageProperties().getReceivedExchange());
msgLog.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
msgLog.setTryCount(retryCount);
msgLog.setStatus(MsgLogStatusEnum.FAIL.getStatus());
msgLogService.save(msgLog);
/**
* 不重要的操作放入 死信队列
* 消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到ttl超时时间,再转到死信,证明这个信息有问题需要人工干预
*/
//休眠2s 延迟写入队列,触发转入死信队列
//Thread.sleep(2000);
//channel.basicNack(deliveryTag, false, true);
}
}
}
@RabbitListener(queues = RabbitConstants.TEST_DDL_QUEUE_A)
public void deadTestReceiver(Message message, Channel channel) throws IOException {
log.info("消息将放入死信队列", new String(message.getBody(), "UTF-8"));
String str = new String(message.getBody());
//转换为消息实体
Msg msg = JSON.parseObject(str, Msg.class);
log.info("收到的消息为{}", msg);
}
}
6、写个controller 自己测试一下。
7、postman测试一下
8、查看控制台
9、查看数据库