rocketmq 常用Api(二)

  • Post author:
  • Post category:其他



目录


事务消息


流程图


简单应用


重试机制


顺序消费


事务消息

分布式系统中的事务可以用2pc(两阶段提交、tcc(补偿事务)来解决分布式系统中的消息原子性RocketMq4.3+ 提供分布式事务功能,通过Rocketmq事务消息能达到分布式事务的最终一致性

第一阶段尝试提交

第二阶段确认ok


数据库就是2pc 提交的数据不会立即生效。再次确认的时候才会持久化,如果给的rollback就会把数据丢弃


tcc  try——confirm——cancel


RocketMq实现方式


Half Message

:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中


检查事务状态

:Broker会开启一个定时任务,在消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。


超时

:如果超过回查次数,默认回滚消息


TransactionListener

executeLocalTransaction半消息发送成功触发来执行本地事务


checkLocalTransaction

broker将发送检查消息来检查事务状态,并将调用方法来获取本地事务状态


本地事务执行状态

public enum LocalTransactionState {
    COMMIT_MESSAGE,//执行事务成功,确认提交
    ROLLBACK_MESSAGE,//回滚消息,broker端回删除半消息
    UNKNOW;//暂时为位置状态,等待broker回查
}

流程图

简单应用

TransactionMQProducer producer = new TransactionMQProducer("xxoogp");
//回调
producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //执行本地事务
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                //Broker端回调检查,检查事务
                return LocalTransactionState.COMMIT_MESSAGE;
//                return LocalTransactionState.UNKNOW;
//                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

重试机制


producer

默认超时时间

//异步发送时,重试次数,默认为2
producer.setRetryTimesWhenSendAsyncFailed(1);
//同步发送时,重试次数,默认为2
producer.setRetryTimesWhenSendFailed(1);
//是否向其他broker发送请求 默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);


consumer

//消费超时,单位分钟
consumer.setConsumeTimeout(1);
//发送ack,消费失败
RECONSUME_LATER


broker投递

只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试,重投使用messageDelayLevel

默认值

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 19m 20m 30m 1h 2h

顺序消费

FIFO是topic里的queue来维护的。是一个几乎无限大的数组。每生成一个topic就会默认在里面创建4个queue。

为了保证FIFO,所以就要同步添加到同一topic的同一个queue里去。所以就有queue选择器

MessageQueueSelector

producer.send(message,
        //queue选择器,向topic中的哪个queue去写消息
        new MessageQueueSelector() {
            @Override
            //手动 选择一个queue  list 所有的queue列表,msg具体要发的消息,o,外面send传的后一个参数
            public MessageQueue select(List<MessageQueue> list, Message msg, Object o) {
                return null;
            }
        },args,2000);

自定义实现这个方法选择queue。也有封装的三种


基于附带参数的hash

public class SelectMessageQueueByHash implements MessageQueueSelector {
    public SelectMessageQueueByHash() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value %= mqs.size();
        return (MessageQueue)mqs.get(value);
    }
}


基于随机

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    public SelectMessageQueueByRandom() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = this.random.nextInt(mqs.size());
        return (MessageQueue)mqs.get(value);
    }
}

开源版本没有实现的

机房策略

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    public SelectMessageQueueByMachineRoom() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return this.consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

以上三种封装选择去topic下哪个queue里去传输数据

在消费者的时候

consumer.registerMessageListener()

MessageListenerConcurrently 并发消息/多线程

MessageListenerOrderly 顺序消费,对一个queue开启一个线程,多个queue多个线程

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        return null;
    }
});

因为是多线程,所以就有一些

线程池的参数

可以配置

//最大开启消费线程数
consumer.setConsumeThreadMax(4);
//最小线程数
consumer.setConsumeThreadMin(1);



版权声明:本文为randomswap原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。