ActiveMQ的持久化机制

  • Post author:
  • Post category:其他




持久化简述

为了避免意外宕机或者其它情况而导致丢失信息 ,需要做到重启后可以恢复消息队列,消息系统一般采用持久化机制。ActiveMQ的消息持久化机制有JDBC、AMQ、KahaDB和LevelDB,无论采用哪种持久化机制,消息的存储逻辑都是一致的。


在发送者将消息发送出去以后,消息中心首先将消息存储到本机数据文件、内存数据库或者远程数据库等在将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。



持久化方式



KahaDB消息存储(默认)

基于日志文件,从ActiveMQ5.4开始的持久化默认插件。

KahaDB在消息保存目录中只有4类文件和一个lock,跟ActiveMQ的其它几种文件存储引擎相比比较简洁。

在这里插入图片描述

db1.log:KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db< Number >.log,当数据文件已满时会创建一个新的文件,number的数值也会随之递增。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。

db.data:该文件包含了持久化的BTree索引,索引记录了消息数据记录中的消息,它是消息的索引文件,本质上是B树,使用B树作为索引指向db< number >.log里面存储的消息。

db.free:当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID。

db.redo:用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复B树索引。

lock:文件锁,表示当前获得KahaDB读写权限的broker。



JDBC消息存储

消息基于JDBC存储。



添加驱动

将驱动程序jarlib目录下,

驱动程序jar是一定需要的,但是数据源jar需要根据我们在配置数据源时具体使用的数据源来配置

,由于数据源使用的配置是默认的,activemq自带了这个数据源的jar,所以不用添加jar,如果是其它的数据源,一定要添加jar到lib目录下。

在这里插入图片描述



配置mysql数据源

在activemq.xml文件中配置数据源,代码如下:

	<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="root"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>

这里使用的是官网上提供的配置,dbcp2这个jar在如果要修改数据库连接池,需要添加对应的jar到lib文件夹下,上一步已经强调过,这里再次强调一次。



改变默认的持久化方式

先注释掉原有的persistenceAdapter,然后添加新的持久化方式,代码如下:

        <!--<persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter> -->
		<persistenceAdapter>
			<jdbcPersistenceAdapter  dataSource="#mysql-ds"/>
        </persistenceAdapter>



重新启动ActiveMQ

打开Sqlyog,可以看到在activemq数据库下面自动生成了三张表

在这里插入图片描述


表activemq_acks的作用


在这里插入图片描述


表activemq_lock的作用


在这里插入图片描述


表activemq_msgs的作用


在这里插入图片描述



点对点(p2p)模型测试

在点对点模型中,当DeliveryMode设置为,消息保存在内存中;当DeliveryMode设置为PERSISTENT时,消息保存在broker的相应的文件或者数据库中;在p2p模型中,消息一旦被消费者消费就从broker或数据库中删除。


生产者代码

public class JmsProduce {

    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws JMSException {

        //1.创建连接工场,按照给定Url地址,采用默认的用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2通过连接工场,获得连接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        

        //3.创建回话session,第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(是队列还是主题(topic))
        Queue queue = session.createQueue(QUEUE_NAME);

        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //设置持久化
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
		//设置完持久化以后再启动
		connection.start();
		
        //6.通过使用messageProducer生产3条消息发送到MQ的队列里面
        for (int i = 1; i <= 3; i++) {
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("msg---" + i);
            //8.通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }

        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();

        System.out.println("消息发送到MQ");

    }
}

运行上面的代码,在数据库activemq中的activemq_msgs表可以看到类似下图的信息

在这里插入图片描述


消费者代码

public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工场,按照给定的Url地址,采用默认的用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL );
        //2.通过连接工场,获得连接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //创建会话session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);

        //创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        //通过监听的方式来消费消息
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到消息" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        //留出一定时间,让消费者消费消息,不加的话消息没有被消费程序就停止运行了
        System.in.read();
        //关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

运行上面代码后,可以发现activemq数据的activemq_msgs表的数据被删除了

在这里插入图片描述



发布订阅模型测试

需要先启动消费者再启动生产者,在消费者要设置连接的ClientID,要创建TopicSubscriber 去订阅消息,启动需要放在后面。


消费者代码

public class JmsConsumerTopic {


    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic-PERSISTENT-01";


    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工场,按照给定的Url地址,采用默认的用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过连接工场,获得连接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        //必须要设置ClientID
        connection.setClientID("consumer01");

        //创建会话session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);
        
		//要创建TopicSubscriber 去订阅
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"mq-jdbc");
			
        connection.start();

        //通过监听的方式来消费消息
        messageConsumer.setMessageListener(message -> {
            System.out.println("执行了");
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //留出一定时间,让消费者消费消息,不加的话消息没有被消费程序就停止运行了
        System.in.read();
        //关闭资源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

运行上面代码以后,可以发现数据库activemq中的表activemq_acks多了一条记录

在这里插入图片描述


生产者代码

public class JmsProduceTopic {

    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic-PERSISTENT-01";

    public static void main(String[] args) throws JMSException {
        //1.创建连接工场,按照给定Url地址,采用默认的用户名和密码
        ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2通过连接工场,获得连接connection并启动访问
        Connection connection = mqConnectionFactory.createConnection();


        //3.创建回话session,第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(是队列还是主题(topic))
        Topic topic = session.createTopic(TOPIC_NAME);

        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        connection.start();
        //6.通过使用messageProducer生产3条消息发送到MQ的队列里面
        for (int i = 1; i <= 6; i++) {
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("msg---" + i);
            //8.通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }

        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();

        System.out.println("消息发送到MQ");
    }
}

运行上面代码以后,可以发现数据库activemq中的表activemq_msgs中新增了记录

在这里插入图片描述



LevelDB消息存储

也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性;但它不使用B树来实现索引预写日志,而是使用基于LevelDB的索引,了解即可。



AMQ Message Store

基于文件的存储方式,以前使用现在不使用了,了解即可。



参考



版权声明:本文为wsb_2526原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。