ActiveMQ的消息存储(八)

  • Post author:
  • Post category:其他


1.队列存储

   Queues采取先进先出模式,同一时间,消息只会发送给某一个消费者,只有当该消息被消费并告知已收到时,它才能在代理的存储中被删除。

这里写图片描述

   而对于持久性Topic来说,每一个消费者都会获取消息的拷贝。为了节约空间,代理的存储介质中只存储了一份消息,存储介质的持久订阅对象为其以后的被存储的消息维护了一个指针,消费者消费时,从存储介质中复制一个消息,指针指向这个消息。消息被所有订阅者获取后才能删除

这里写图片描述

2.文件存储(kaha存储)

从ActiveMQ5.3后,推荐使用KahaDB存储普通用途的消息。KahaDB是一种基于文件的消息存储机制,为了提高消息存储的可靠性和可恢复性,它整合了一个事务日志。KahaDB拥有高性能和可扩展性等特点。由于KahaDB使用的是基于文件的存储,所以不需要使用第三方数据库。

KahaDB消息存储机制为所有目的地使用一个索引,其索引使用一个事务日志。

KahaDB被使用在10000个活动连接的产品环境,每个连接都拥有一个分割的队列。

KahaDB可以通过配置或硬代码实现,配置文件在/config/activemq.xml中,以下是配置方式:

   <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true">
 <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
 </persistenceAdapter>

这个directory文件在apache-activitymq-xxx/data/kahadb中

这个文件的目录结构如下图所示:

这里写图片描述

  • archive directory 这个文件只有当archiveDataLogs属性为true时才会存在,用来存储KahaDb不再需要但是需要进行恢复时才会用到这个archived中的数据。

  • db.data 里面记录着B树的索引,指向log文件的位置。

  • db.redo 当硬件发生意外时,用来恢复B树的索引。

可选择的配置

这里写图片描述

3.文件存储(AMQ存储)

AMQ消息存储与KahaDB消息存储类似,由一个提供可靠持续性的事务日志以及高效索引组成,当一个应用中,消息吞吐量是主要需求时,AMQ是最好的选择。但由于它为每个索引使用了两个分隔文件,而每个目的地都有一个索引,所以它不能被使用于每个代理拥有成百上千个队列的情况。ActiveMQ代理没有被完全关闭时,索引的覆盖也会很慢。这是因为所有的索引都需要被重建。

AMQ的使用方式

<?xml version="1.0" encoding="UTF-8"?>
<beans>
  <broker xmlns="http://activemq.apache.org/schema/core">
   <persistenceAdapter>
<amqPersistenceAdapter directory="target/Broker2-data/activemq-data" syncOnWrite="true"
indexPageSize="16kb"
indexMaxBinSize="100"
maxFileLength="10mb" />
    </persistenceAdapter>
</broker>
</beans>

AMQ存储的配置选项:

这里写图片描述

AMQ的文件存储结构

这里写图片描述

lock file保证只有一个broker可以访问
tmp-storage存储临时文件,一般是非持久化消息
The data directory负责储存消息在log中的引用的索引
The state directory负责记录Topic消费者的状态
The archive directory和kaha的目录一样
The journal directory储存数据,data-control里面有元数据信息,还会记录每个数据的引用次数,消息传递完成后,会被删除

4.JDBC存储

jdbc存储数据需要用到三个表

ACTIVITY-MSGS负责储存Topic和Queues的数据

这里写图片描述


ACTIVITY-ACKS负责管理Message的状态

这里写图片描述


ACTIVEMQ_LOCK负责控制只能有一个broker访问数据库的三个表

这里写图片描述

<?xml version="1.0" encoding="UTF-8"?>
<beans>

<broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
          <persistenceAdapter>
     <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
  </persistenceAdapter>
</broker>

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/> 
</bean>
</beans>

5.内存存储方式

方式一:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
   <broker brokerName="test-broker"
           persistent="false"
xmlns="http://activemq.apache.org/schema/core"> <transportConnectors>
<transportConnector uri="tcp://localhost:61635"/> </transportConnectors>
    </broker>
</beans>

方式二:

import org.apache.activemq.broker.BrokerService;
public void createEmbeddedBroker() throws Exception {
BrokerService broker = new BrokerService(); //configure the broker to use the Memory Store broker.setPersistent(false);
    //Add a transport connector
    broker.addConnector("tcp://localhost:61616");
    //now start the broker
    broker.start();
}



版权声明:本文为qq_33394088原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。