目录
事务消息
分布式系统中的事务可以用2pc(两阶段提交、tcc(补偿事务)来解决分布式系统中的消息原子性RocketMq4.3+ 提供分布式事务功能,通过Rocketmq事务消息能达到分布式事务的最终一致性
第一阶段尝试提交
第二阶段确认ok
数据库就是2pc 提交的数据不会立即生效。再次确认的时候才会持久化,如果给的rollback就会把数据丢弃
tcc try——confirm——cancel
RocketMq实现方式
Half Message
:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态
:Broker会开启一个定时任务,在消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
超时
:如果超过回查次数,默认回滚消息
TransactionListener
executeLocalTransaction半消息发送成功触发来执行本地事务
checkLocalTransaction
broker将发送检查消息来检查事务状态,并将调用方法来获取本地事务状态
本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE,//执行事务成功,确认提交
ROLLBACK_MESSAGE,//回滚消息,broker端回删除半消息
UNKNOW;//暂时为位置状态,等待broker回查
}
流程图
简单应用
TransactionMQProducer producer = new TransactionMQProducer("xxoogp");
//回调
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//Broker端回调检查,检查事务
return LocalTransactionState.COMMIT_MESSAGE;
// return LocalTransactionState.UNKNOW;
// return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
重试机制
producer
默认超时时间
//异步发送时,重试次数,默认为2
producer.setRetryTimesWhenSendAsyncFailed(1);
//同步发送时,重试次数,默认为2
producer.setRetryTimesWhenSendFailed(1);
//是否向其他broker发送请求 默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
consumer
//消费超时,单位分钟
consumer.setConsumeTimeout(1);
//发送ack,消费失败
RECONSUME_LATER
broker投递
只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试,重投使用messageDelayLevel
默认值
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 19m 20m 30m 1h 2h
顺序消费
FIFO是topic里的queue来维护的。是一个几乎无限大的数组。每生成一个topic就会默认在里面创建4个queue。
为了保证FIFO,所以就要同步添加到同一topic的同一个queue里去。所以就有queue选择器
MessageQueueSelector
producer.send(message,
//queue选择器,向topic中的哪个queue去写消息
new MessageQueueSelector() {
@Override
//手动 选择一个queue list 所有的queue列表,msg具体要发的消息,o,外面send传的后一个参数
public MessageQueue select(List<MessageQueue> list, Message msg, Object o) {
return null;
}
},args,2000);
自定义实现这个方法选择queue。也有封装的三种
基于附带参数的hash
public class SelectMessageQueueByHash implements MessageQueueSelector {
public SelectMessageQueueByHash() {
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value %= mqs.size();
return (MessageQueue)mqs.get(value);
}
}
基于随机
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
public SelectMessageQueueByRandom() {
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = this.random.nextInt(mqs.size());
return (MessageQueue)mqs.get(value);
}
}
开源版本没有实现的
机房策略
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
public SelectMessageQueueByMachineRoom() {
}
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return this.consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
以上三种封装选择去topic下哪个queue里去传输数据
在消费者的时候
consumer.registerMessageListener()
MessageListenerConcurrently 并发消息/多线程
MessageListenerOrderly 顺序消费,对一个queue开启一个线程,多个queue多个线程
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
return null;
}
});
因为是多线程,所以就有一些
线程池的参数
可以配置
//最大开启消费线程数
consumer.setConsumeThreadMax(4);
//最小线程数
consumer.setConsumeThreadMin(1);