目录
1. ActiveMQ清除不活跃的订阅者
背景:
通过MQ推送数据给第三方,第三方订阅者为
持久订阅者
,测试时候设置了不同的订阅名称(每次名称不同即使ClientId也会创建新的持久订阅者),导致原来无用的持久订阅者没有删除,然后导致发送的消息不活跃的订阅者没及时消费的话不会清理,导致消息堆积,数据量大时候导致消息堆积,影响mq性能
解决方案:
mq的设置清除不活跃的订阅者 在activeMQ的
activemq.xml
文件中 <broker name=”localhost” offlineDurableSubscriberTimeout=”86400000″ offlineDurableSubscriberTaskSchedule=”3600000″> broker标签里面加入 offlineDurableSubscriberTimeout=”86400000″ offlineDurableSubscriberTaskSchedule=”3600000″ 单位为毫秒 然后重启mq,会定时检测不活跃的订阅者,然后清除。
参考mq官网:
http://activemq.apache.org/manage-durable-subscribers.html
2.ActiveMQ启动时候报错的问题
ActiveMQ启动报错
Failed to start Apache ActiveMQ ([localhost, null], java.io.IOException: Detected missing/corrupt journal files referenced by:[0:ExceptionDLQ.Activit yResultPostProcess] 10 messages affected.)
一个跑了满久的activemq停止后再启动就自动退出了,查看日志有以下报错:
2018-06-11 17:54:21,483 | WARN | Some journal files are missing: [17496, 17495, 17494, 11811, 11807, 11793] | org.apache.activemq.store.kahadb.MessageDatabase | main
2018-06-11 17:54:21,704 | ERROR | [0:ExceptionDLQ.ActivityResultPostProcess] references corrupt locations. 10 messages affected. | org.apache.activemq.store.kahadb.MessageDatabase | m
ain
2018-06-11 17:54:21,706 | ERROR | Failed to start Apache ActiveMQ ([localhost, null], java.io.IOException: Detected missing/corrupt journal files referenced by:[0:ExceptionDLQ.Activit
yResultPostProcess] 10 messages affected.) | org.apache.activemq.broker.BrokerService | main
2018-06-11 17:54:21,711 | INFO | Apache ActiveMQ 5.13.3 (localhost, null) is shutting down | org.apache.activemq.broker.BrokerService | main
2018-06-11 17:54:21,715 | INFO | Connector openwire stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,718 | INFO | Connector amqp stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,721 | INFO | Connector stomp stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,724 | INFO | Connector mqtt stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,727 | INFO | Connector ws stopped | org.apache.activemq.broker.TransportConnector | main
解决方法:
把conf/activemq.xml文件内以下配置从:
<persistenceadapter>
<kahadb directory="${activemq.data}/kahadb"></kahadb>
</persistenceadapter>
改成
<persistenceadapter>
<kahadb directory="${activemq.data}/kahadb"
ignoreMissingJournalfiles="true"
checkForCorruptJournalFiles="true"
checksumJournalFiles="true"></kahadb>
</persistenceadapter>
保存后退出。
相关配置的解释:
ignoreMissingJournalfiles 默认:false 忽略丢失的消息文件,false,当丢失了消息文件,启动异常
checkForCorruptJournalFiles 默认:false 检查消息文件是否损坏,true,检查发现损坏会尝试修复
checksumJournalFiles 默认:false 产生一个checksum,以便能够检测journal文件是否损坏。
参考博客:
http://www.huilog.com/?p=991
3. ActiveMQ设置持久订阅者不生效问题
在设置ActiveMQ的持久订阅者时发现,设置不成功,控制台报错:
Could not refresh JMS Connection for destination ‘portal.admin.topic’ – retrying using FixedBackOff{interval=5000, currentAttempts=9, maxAttempts=unlimited}. Cause: setClientID call not supported on proxy for shared Connection. Set the ‘clientId’ property on the SingleConnectionFactory instead.
后来发现按照下面设置就没有问题。后来看了下报错信息 : “
setClientID call not supported on proxy for shared Connection
” ,百度翻译:共享连接的代理不支持setClientID调用,起初刚开始写时候没有在代码中自己创建connectionFactory,而是在yml中使用的自动配置
spring:
activemq:
user: user
password: user
broker-url: tcp://127.0.0.1:61616
然后springboot会根据在yml中的配置自己创建connectionFactory,后来看了一下我使用activemq的版本是5.15.11,感觉这个版本有问题。所以各位在用的时候可以下载一下早一点的版本5.15.9,5.15.10也不要下载。如果各位有什么高见,欢迎指教,感谢!
package com.example.demo.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpoint;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.listener.MessageListenerContainer;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
/**
* yangsan
*/
@Configuration
@EnableJms
public class ActiveMQConfig {
@Value("${activemq.url}")
private String url;
@Value("${activemq.user}")
private String user;
@Value("${activemq.password}")
private String password;
/**
* 配置连接信息
* @return
*/
@Bean
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(url);
connectionFactory.setUserName(user);
connectionFactory.setPassword(password);
//设置重发属性 根据需要配置 可以不用
connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
return connectionFactory;
}
/**
* 创建队列 和 Topic
* @return
*/
@Bean
public Topic topic() {
return new ActiveMQTopic("springboot.topic");
}
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
redeliveryPolicy.setUseExponentialBackOff(true);
//重发次数,默认为6次,这里设置为10次,-1表示不限次数
redeliveryPolicy.setMaximumRedeliveries(3);
//重发时间间隔,默认为1毫秒,设置为10000毫秒
redeliveryPolicy.setInitialRedeliveryDelay(10000);
//表示没有拖延只有UseExponentialBackOff(true)为true时生效
//第一次失败后重新发送之前等待10000毫秒,第二次失败再等待10000 * 2毫秒
//第三次翻倍10000 * 2 * 2,以此类推
redeliveryPolicy.setBackOffMultiplier(2);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(true);
//设置重发最大拖延时间360000毫秒 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(360000);
return redeliveryPolicy;
}
/**
* 持久订阅者配置
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory jmsTopicPersistentListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 必须设置clientId才能生效 必须设置factory.setSubscriptionDurable(true) 必须设置为true
factory.setSubscriptionDurable(true);
factory.setPubSubDomain(true);
factory.setClientId("doorAccessRecord");
return factory;
}
}
其中最主要的设置是: factory.setSubscriptionDurable(true) 和 factory.setClientId(“doorAccessRecord”) 这两句必须设置。
4. ActiveMQ生产者停止的问题
很老很老的一个项目中activemq的默认存储为khadb,某一天发现mq无法运行,查看mq日志发现写的意思是storeUsage存储满了,无法在生产发送消息。
du -sh 查看khadb的大小有102GB,mq的activemq.xml中配置的为100GB限制。随后先将限制改为200GB,先保证系统正常
这个主要是要设置消息的过期时间设置等,不能一直堆积在那里,占用磁盘空间等等。