Replicated LevelDB集群的高可用

  • Post author:
  • Post category:其他




一.集群故障迁移和验证



1.思路

ActiveMQ的客户端只能访问Master的Broker,其他处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议(失败转移)。

当一个ActiveMQ节点挂掉或者一个Zookeeper节点挂掉,ActiveMQ服务依然正常运转,如果仅剩一个ActiveMQ节点,由于不能选举Master,所以ActiveMQ不能正常运行。

如果Zookeeper仅剩一个节点活动,不管ActiveMQ各节点存活,ActiveMQ也不能正常提供服务。(ActiveMQ集群的高可用,依赖于Zookeeper集群的高可用)



2.故障迁移的解决方案



(1)注意事项

当前集群除去宕机的activemq服务器,还有备用activemq服务器。并且activemq最少俩台,否则迁移失败。



(2)解决方案

//访问activemq的地址【设置访问地址的使用failover协议】
public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.83.141:61616,tcp://192.168.83.141:61617,tcp://192.168.83.141:61618)?randomize=false"; 



3.演示



(1)代码块【队列消息生产者】

package activemqDB;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 测试activemq集群高可用
 * 消息生产者类
 */
public class JmsProduce {
    //定制activemq的地址,注意tcp协议
    public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.83.141:61616,tcp://192.168.83.141:61617,tcp://192.168.83.141:61618)?randomize=false"; //访问activemq的地址
    public static final String QUEUE_NAME = "queue-cluster";  //队列名称
    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建session,第一个参数事务,第二个参数签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);//参数为队列名称
        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //6.通过使用messageProducer生产3条消息发送到MQ的队列里面
        for(int i = 0;i < 3;i++){
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("msg----" + i);//理解一个字符串
            textMessage.setStringProperty("c01","这是对textMessage设置自定义属性");   //可选,设置自定义属性
            //8.通过messageProducer发送到activemq
            messageProducer.send(textMessage);
            //7.创建消息
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("k1","这是map形式的消息+k"+i);  //存储键值对消息
            //8.发送消息
            messageProducer.send(mapMessage);
        }
        //8.关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息发送完成");
    }
}



(2)代码块【队列消息消费者】

package activemqDB;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
 * 消息消费者(监听方式)
 */
public class JmsConsumer2 {
    //定制activemq的地址,注意tcp协议
    public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.83.141:61616,tcp://192.168.83.141:61617,tcp://192.168.83.141:61618)?randomize=false"; //访问activemq的地址
    public static final String QUEUE_NAME = "queue-cluster";  //队列名称
    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建session,第一个参数事务,第二个参数签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);//参数为队列名称
        //5.创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //6.通监听的方式来消费消息
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //判断message是否为空,并且是否属于textmessage的子类,TextMessage是可变
                if(null != message && message instanceof TextMessage)
                {
                    //注意:消息提供者的类型则强转什么类型:TextMessage
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        //消费消息,则需要解决异常
                        System.out.println("接收的消息是" + textMessage.getText());
                        //接受自定义的属性
                        System.out.println("接收的消息是" + textMessage.getStringProperty("c01"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                //判断message是否为空,并且是否属于MapMessage的子类,MapMessage是可变
                if(null != message && message instanceof MapMessage)
                {
                    //注意:消息提供者的类型则强转什么类型:MapMessage
                    MapMessage mapMessage =(MapMessage) message;
                    try {
                        //消费消息,则需要解决异常
                        System.out.println("接收的map消息是" + mapMessage.getString("k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //7.使控制台保存工作
        System.in.read();   //细节:如果不加一直保持读取状态,则监听器无法消费消息
        //8.关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}



(3)范例



(3-1)生产者

图片

图片

图片



(3-2)消费者

图片

图片

图片



(4)故障演示

图片

图片



(5)消息传递演示结果

图片

图片



4.源码


demo01.rar



二.异步投递



1.说明

ActiveMQ支持同步、异步俩种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。

ActiveMQ默认使用异步发送的模式:除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这俩种情况都是同步发送的。

如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻塞producer知道broker返回一个确认,表示消息被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即时发送的是持久化的消息。



2.什么是异步发送

它可以最大化producer端的发送效率。我们通常在发送消息量比较秘籍的情况下使用异步发送,它可以很大的提升Producer性能;

不过这也带来额外的问题,就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加。此外它不能有效的确保消息的发送成功。在useAsyncSend=true的情况下客户端需要容忍消息丢失的可能。



3.解决方式【三种】

图片



4.如何确保异步投递的成功

异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。

由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。

如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。

所以,正确的异步发送方法是需要接收回调的。

同步发送和异步发送的区别就在此,

同步发送等send不阻塞了就表示一定发送成功了,

异步发送需要接收回执并由客户端再判断一次是否发送成功。



5.演示【异步投递的搭建】



(1)代码块

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
/**
 * 测试异步投递
 * 消息生产者类
 */
public class JmsProduce {
    //开启异步投递:第一种方式
//    public static final String ACTIVEMQ_URL = "tcp://192.168.83.141:61616?jms.useAsyncSend=true";
    public static final String ACTIVEMQ_URL = "tcp://192.168.83.141:61616";
    public static final String QUEUE_NAME = "queue-cluster";  //队列名称
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //开启异步投递:第二种方式
//        activeMQConnectionFactory.setUseAsyncSend(true);
        ActiveMQConnection activeMQConnection = (ActiveMQConnection)activeMQConnectionFactory.createConnection();
        //开启异步投递:第三种方式
        activeMQConnection.setUseAsyncSend(true);
        activeMQConnection.start();
        Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer activeMQMessageProducer= (ActiveMQMessageProducer)session.createProducer(queue);
        for(int i = 0;i < 3;i++){
            TextMessage textMessage = session.createTextMessage("msg----" + i);
            //设置自定义属性
            textMessage.setJMSMessageID(UUID.randomUUID().toString() + "---orderID");
            //获取id
            String msgID = textMessage.getJMSMessageID();
            //使用异步投递发送消息需要回调
            activeMQMessageProducer.send(textMessage, new AsyncCallback() {
                @Override
                //消息发送成功走该方法
                public void onSuccess() {
                    System.out.println(msgID + " has been ok send");
                }
                @Override
                //消息发送失败走该方法
                public void onException(JMSException exception) {
                    System.out.println(msgID + " fail to send");
                }
            });
        }
        activeMQMessageProducer.close();
        session.close();
        activeMQConnection.close();
        System.out.println("消息发送完成");
    }
}



(2)范例

图片

图片



6.源码


demo03.rar



三.延迟投递和定时投递



1.设置方式

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON string Cron表达式



2.思路

1.在activemq.xml中配置schedulerSupport属性为true。

2.Java代码里面封装的辅助消息类型:ScheduledMessage。



3.演示



(1)第一步每台activemq服务器设置schedulerSupport属性



(1-1)代码块

schedulerSupport="true"



(1-2)范例

图片

图片



(2)消息生产者类



(2-1)代码块

package yanchiAndDingshi;
import org.apache.activemq.*;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.UUID;
/**
 * 测试定时和延时
 * 消息生产者类
 */
public class JmsProduce {
    public static final String ACTIVEMQ_URL = "tcp://192.168.83.141:61616";
    public static final String QUEUE_NAME = "queue-cluster";  //队列名称
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        ActiveMQConnection activeMQConnection = (ActiveMQConnection)activeMQConnectionFactory.createConnection();
        activeMQConnection.start();
        Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer activeMQMessageProducer= (ActiveMQMessageProducer)session.createProducer(queue);
        //延时三秒
        long delay = 3 * 1000;
        //定时4秒
        long period = 4 * 1000;
        //重复5次
        int repeat = 5;
        for(int i = 0;i < 3;i++){
            TextMessage textMessage = session.createTextMessage("delayMessage----" + i);
            //设置自定义属性
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);//延时三秒
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);//定时4秒
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);//重复5次
            //发送消息
            activeMQMessageProducer.send(textMessage);
        }
        activeMQMessageProducer.close();
        session.close();
        activeMQConnection.close();
        System.out.println("消息发送完成");
    }
}



(2-2)范例

图片

图片



4.源码


demo03.rar



四.消息重试机制



1.哪些情况会引发消息重发

1.client用了transactions且在session中调用了rollback()。

2.client用了transactions且在调用commit()之前关闭或者没有commit。

3.client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()。



2.消息重发时间间隔和重发次数【默认】

消息时间间隔:1s。

重试次数:6。



3.有毒消息Poiso ACK的理解



(1)解答

一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费端会给MQ发送一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。



(2)属性说明



(2-1)英文版

Property Default value Description
backOffMultiplier 5 The back-off multiplier.
collisionAvoidanceFactor 0.15 The percentage of range of collision avidance if enabled.
initialRedeliveryDelay 1000L The initial redelivery delay in milliseconds.
maximunRedeliveries 6 Sets the maximum number of times a message will be redelivered before it is considered a poisoned pill and returned to the broker so it can go to a Dead Letter Queue.Set to-1for unlimited redeliveries.
maximunRedeliveryDelay -1 Sets the maximum delivery delay that will be applied if theuseExponentialBackOffoption is set.(use value-1to define that no maximum be applied)(5.5).
useCollisionAvoidance false Should the redelivery policy use collision avoidance.
useExponentialBackOff false Should exponential back-off be used,i.e., toexponentially increase the timeout.



(2-2)中文版

1.collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为0.15。

2.maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。默认值为6.

3.maximumRedeliveryDelay:最大传送延迟,只有在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大过最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1.

4.initialRedeliveryDelay:初始重发延迟时间,默认为1000L。

5.redeliveryDelay:重发延迟时间。当initalRedeliveryDelay=0时生效,默认为1000L。

6.useCollisionAvoidance:启用防止冲突功能,默认false。

7.useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认为false。

8.backOffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。默认为5.



4.演示



(1)消息生产者



(1-1)代码块

package xiaoxichongfu;
import org.apache.activemq.*;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * 测试消息重复发送
 * 消息生产者类
 */
public class JmsProduce {
    public static final String ACTIVEMQ_URL = "tcp://192.168.83.141:61616";
    public static final String QUEUE_NAME = "queue-cluster";  //队列名称
    public static void main(String[] args) throws JMSException {
        
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        ActiveMQConnection activeMQConnection = (ActiveMQConnection)activeMQConnectionFactory.createConnection();
        activeMQConnection.start();
        Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer activeMQMessageProducer= (ActiveMQMessageProducer)session.createProducer(queue);
        for(int i = 0;i < 3;i++){
            TextMessage textMessage = session.createTextMessage("message----" + i);
            //发送消息
            activeMQMessageProducer.send(textMessage);
        }
        activeMQMessageProducer.close();
        session.close();
        activeMQConnection.close();
        System.out.println("消息发送完成");
    }
}



(1-2)范例

图片

图片



(2)消息消费者



(2-1)代码块

package xiaoxichongfu;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import javax.jms.*;
/**
 * 测试消息重复发送
 * 消息消费者(receive)
 */
public class JmsConsumer {
    //定制activemq的地址,注意tcp协议
    public static final String ACTIVEMQ_URL = "tcp://192.168.83.141:61616"; //访问activemq的地址
    public static final String QUEUE_NAME = "queue-cluster";  //队列名称
    public static void main(String[] args) throws JMSException
    {
        //1.创建连接工厂,按照给定的url地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //设置重复消费的次数----开始
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        //设置重复消费的次数----结束
        //2.通过连接工厂,获得连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建session,第一个参数事务,第二个参数签收
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);//参数为队列名称
        //5.创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        TextMessage textMessage = null;
        //6.接受消息
        while (true)
        {
            textMessage = (TextMessage)messageConsumer.receive(1000L);
            if(null != textMessage)
            {
                System.out.println("消息消费者接收到的消息是" + textMessage.getText());
            }else{
                break;
            }
        }
        //7.事务提交
//        session.commit();
        //8.关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}



(2-2)范例

图片

图片

图片



(3)死信队列

图片



(4)源码


demo03.rar



五.死信队列



1.概述

ActiveMQ中引入了”死信队列”(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发6次redeliveryCounter==6),将会被ActiveMQ移入”死信队列”。开发人员可以在这个Queue中查看处理出错的消息,进行人工干预。

图片



2.作用

处理失败的消息。



3.图解

图片



4.业务逻辑

  1. 一般生产环境中在使用MQ的时候设计俩个队列:一个是核心业务队列,一个是死信队列。
  2. 核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况。
  3. 假如第三方物流系统故障了此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。然后你会看到的就是,在第三方物流系统的故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,能否请求,不停的监视。一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。



5.策略方案一:共享死信队列【默认】



(1)说明

将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。

共享队列默认为”ActiveMQ.DLQ”,可以通过”deadLetterQueue”属性来设定。



(2)设置方式

<deadLetterStrategy>
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>



6.策略方案二:独立死信队列



(1)说明

对于Queue而言,死信通道的前缀默认为”ActiveMQ.DLQ.Queue.”。

对于Topic而言,死信通道的前缀默认为”ActiveMQ.DLQ.Topic.”。

比如队列Order,那么它对应的死信通道为”ActiveMQ.DLQ.Queue.Order”。

我们使用”queuePrefix” “topicPrefix”来指定上述前缀。



(2)设置方式

默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。

<policyEntry queue="order">
  <deadLetterStrategy>
    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessage="false"/>
   </deadLetterStrategy>
 </policyEntry>

将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。

属性”useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。



7.策略方案三:自动删除过期消息

有时需要直接删除过期的消息而不需要发送到死信队列中,”processExpired”表示是否将过期消息放入死信队列,默认为true。

<policyEntry queue=">">
  <deadLetterStrategy>
    <sharedDeadLetterStrategy processExpired="false"/>
  </deadLetterStrategy>
</policyEntry>



8.策略方案四:存放非持久消息到死信队列中

默认情况下,Activemq不会把非持久的死消息发送到死信队列中。

“processNonPersistent”表示是否将”非持久化”消息放入死信队列,默认为false。

非持久化如果你想把非持久的消息发送到死信队列中,需要设置属性processNonPersistent=“true”

<policyEntry queue=">">
  <deadLetterStrategy>
    <sharedDeadLetterStrategy processNonPersistent="true"/>
  </deadLetterStrategy>
</policyEntry>



六.防止重复消费



1.说明

网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。



2.解决方案一

如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。



3.解决方案二【推荐】

如果上述俩种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没有消费记录即可。



版权声明:本文为qq_45421186原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。