持久化简述
为了避免意外宕机或者其它情况而导致丢失信息 ,需要做到重启后可以恢复消息队列,消息系统一般采用持久化机制。ActiveMQ的消息持久化机制有JDBC、AMQ、KahaDB和LevelDB,无论采用哪种持久化机制,消息的存储逻辑都是一致的。
在发送者将消息发送出去以后,消息中心首先将消息存储到本机数据文件、内存数据库或者远程数据库等在将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
持久化方式
KahaDB消息存储(默认)
基于日志文件,从ActiveMQ5.4开始的持久化默认插件。
KahaDB在消息保存目录中只有4类文件和一个lock,跟ActiveMQ的其它几种文件存储引擎相比比较简洁。
db1.log:KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db< Number >.log,当数据文件已满时会创建一个新的文件,number的数值也会随之递增。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
db.data:该文件包含了持久化的BTree索引,索引记录了消息数据记录中的消息,它是消息的索引文件,本质上是B树,使用B树作为索引指向db< number >.log里面存储的消息。
db.free:当前db.data文件里面哪些页面是空闲的,文件具体内容是所有空闲页的ID。
db.redo:用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复B树索引。
lock:文件锁,表示当前获得KahaDB读写权限的broker。
JDBC消息存储
消息基于JDBC存储。
添加驱动
将驱动程序jarlib目录下,
驱动程序jar是一定需要的,但是数据源jar需要根据我们在配置数据源时具体使用的数据源来配置
,由于数据源使用的配置是默认的,activemq自带了这个数据源的jar,所以不用添加jar,如果是其它的数据源,一定要添加jar到lib目录下。
配置mysql数据源
在activemq.xml文件中配置数据源,代码如下:
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
这里使用的是官网上提供的配置,dbcp2这个jar在如果要修改数据库连接池,需要添加对应的jar到lib文件夹下,上一步已经强调过,这里再次强调一次。
改变默认的持久化方式
先注释掉原有的persistenceAdapter,然后添加新的持久化方式,代码如下:
<!--<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter> -->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
重新启动ActiveMQ
打开Sqlyog,可以看到在activemq数据库下面自动生成了三张表
表activemq_acks的作用
表activemq_lock的作用
表activemq_msgs的作用
点对点(p2p)模型测试
在点对点模型中,当DeliveryMode设置为,消息保存在内存中;当DeliveryMode设置为PERSISTENT时,消息保存在broker的相应的文件或者数据库中;在p2p模型中,消息一旦被消费者消费就从broker或数据库中删除。
生产者代码
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws JMSException {
//1.创建连接工场,按照给定Url地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2通过连接工场,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
//3.创建回话session,第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(是队列还是主题(topic))
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//设置持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//设置完持久化以后再启动
connection.start();
//6.通过使用messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i <= 3; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg---" + i);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送到MQ");
}
}
运行上面的代码,在数据库activemq中的activemq_msgs表可以看到类似下图的信息
消费者代码
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工场,按照给定的Url地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL );
//2.通过连接工场,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地
Queue queue = session.createQueue(QUEUE_NAME);
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
//通过监听的方式来消费消息
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//留出一定时间,让消费者消费消息,不加的话消息没有被消费程序就停止运行了
System.in.read();
//关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
运行上面代码后,可以发现activemq数据的activemq_msgs表的数据被删除了
发布订阅模型测试
需要先启动消费者再启动生产者,在消费者要设置连接的ClientID,要创建TopicSubscriber 去订阅消息,启动需要放在后面。
消费者代码
public class JmsConsumerTopic {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String TOPIC_NAME = "topic-PERSISTENT-01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工场,按照给定的Url地址,采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工场,获得连接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
//必须要设置ClientID
connection.setClientID("consumer01");
//创建会话session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地
Topic topic = session.createTopic(TOPIC_NAME);
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//要创建TopicSubscriber 去订阅
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"mq-jdbc");
connection.start();
//通过监听的方式来消费消息
messageConsumer.setMessageListener(message -> {
System.out.println("执行了");
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//留出一定时间,让消费者消费消息,不加的话消息没有被消费程序就停止运行了
System.in.read();
//关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
运行上面代码以后,可以发现数据库activemq中的表activemq_acks多了一条记录
生产者代码
public class JmsProduceTopic {
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static final String TOPIC_NAME = "topic-PERSISTENT-01";
public static void main(String[] args) throws JMSException {
//1.创建连接工场,按照给定Url地址,采用默认的用户名和密码
ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2通过连接工场,获得连接connection并启动访问
Connection connection = mqConnectionFactory.createConnection();
//3.创建回话session,第一个参数是事务,第二个参数是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(是队列还是主题(topic))
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
//6.通过使用messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1; i <= 6; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg---" + i);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发送到MQ");
}
}
运行上面代码以后,可以发现数据库activemq中的表activemq_msgs中新增了记录
LevelDB消息存储
也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久性;但它不使用B树来实现索引预写日志,而是使用基于LevelDB的索引,了解即可。
AMQ Message Store
基于文件的存储方式,以前使用现在不使用了,了解即可。
参考