ActiveMQ工作中遇到的问题总结

  • Post author:
  • Post category:其他



目录


ActiveMQ清除不活跃的订阅者


ActiveMQ启动时候报错的问题


ActiveMQ设置持久订阅者不生效问题


ActiveMQ生产者停止的问题


1. ActiveMQ清除不活跃的订阅者

背景:

通过MQ推送数据给第三方,第三方订阅者为

持久订阅者

,测试时候设置了不同的订阅名称(每次名称不同即使ClientId也会创建新的持久订阅者),导致原来无用的持久订阅者没有删除,然后导致发送的消息不活跃的订阅者没及时消费的话不会清理,导致消息堆积,数据量大时候导致消息堆积,影响mq性能

解决方案:

mq的设置清除不活跃的订阅者 在activeMQ的

activemq.xml

文件中 <broker name=”localhost” offlineDurableSubscriberTimeout=”86400000″ offlineDurableSubscriberTaskSchedule=”3600000″> broker标签里面加入 offlineDurableSubscriberTimeout=”86400000″ offlineDurableSubscriberTaskSchedule=”3600000″ 单位为毫秒 然后重启mq,会定时检测不活跃的订阅者,然后清除。

参考mq官网:

http://activemq.apache.org/manage-durable-subscribers.html

2.ActiveMQ启动时候报错的问题

ActiveMQ启动报错


Failed to start Apache ActiveMQ ([localhost, null], java.io.IOException: Detected missing/corrupt journal files referenced by:[0:ExceptionDLQ.Activit yResultPostProcess] 10 messages affected.)

一个跑了满久的activemq停止后再启动就自动退出了,查看日志有以下报错:

2018-06-11 17:54:21,483 | WARN  | Some journal files are missing: [17496, 17495, 17494, 11811, 11807, 11793] | org.apache.activemq.store.kahadb.MessageDatabase | main
2018-06-11 17:54:21,704 | ERROR | [0:ExceptionDLQ.ActivityResultPostProcess] references corrupt locations. 10 messages affected. | org.apache.activemq.store.kahadb.MessageDatabase | m
ain
2018-06-11 17:54:21,706 | ERROR | Failed to start Apache ActiveMQ ([localhost, null], java.io.IOException: Detected missing/corrupt journal files referenced by:[0:ExceptionDLQ.Activit
yResultPostProcess] 10 messages affected.) | org.apache.activemq.broker.BrokerService | main
2018-06-11 17:54:21,711 | INFO  | Apache ActiveMQ 5.13.3 (localhost, null) is shutting down | org.apache.activemq.broker.BrokerService | main
2018-06-11 17:54:21,715 | INFO  | Connector openwire stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,718 | INFO  | Connector amqp stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,721 | INFO  | Connector stomp stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,724 | INFO  | Connector mqtt stopped | org.apache.activemq.broker.TransportConnector | main
2018-06-11 17:54:21,727 | INFO  | Connector ws stopped | org.apache.activemq.broker.TransportConnector | main

解决方法:

把conf/activemq.xml文件内以下配置从:

        <persistenceadapter>
            <kahadb directory="${activemq.data}/kahadb"></kahadb>
        </persistenceadapter>

改成

        <persistenceadapter>
            <kahadb directory="${activemq.data}/kahadb"
                    ignoreMissingJournalfiles="true"
                    checkForCorruptJournalFiles="true"
                    checksumJournalFiles="true"></kahadb>
        </persistenceadapter>

保存后退出。

相关配置的解释:

ignoreMissingJournalfiles 默认:false 忽略丢失的消息文件,false,当丢失了消息文件,启动异常

checkForCorruptJournalFiles 默认:false 检查消息文件是否损坏,true,检查发现损坏会尝试修复

checksumJournalFiles 默认:false 产生一个checksum,以便能够检测journal文件是否损坏。

参考博客:

http://www.huilog.com/?p=991

3. ActiveMQ设置持久订阅者不生效问题

在设置ActiveMQ的持久订阅者时发现,设置不成功,控制台报错:


Could not refresh JMS Connection for destination ‘portal.admin.topic’ – retrying using FixedBackOff{interval=5000, currentAttempts=9, maxAttempts=unlimited}. Cause: setClientID call not supported on proxy for shared Connection. Set the ‘clientId’ property on the SingleConnectionFactory instead.

后来发现按照下面设置就没有问题。后来看了下报错信息 : “

setClientID call not supported on proxy for shared Connection

” ,百度翻译:共享连接的代理不支持setClientID调用,起初刚开始写时候没有在代码中自己创建connectionFactory,而是在yml中使用的自动配置

spring:
  activemq:
    user: user
    password: user
    broker-url: tcp://127.0.0.1:61616

然后springboot会根据在yml中的配置自己创建connectionFactory,后来看了一下我使用activemq的版本是5.15.11,感觉这个版本有问题。所以各位在用的时候可以下载一下早一点的版本5.15.9,5.15.10也不要下载。如果各位有什么高见,欢迎指教,感谢!

package com.example.demo.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpoint;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.listener.MessageListenerContainer;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * yangsan
 */
@Configuration
@EnableJms
public class ActiveMQConfig {

    @Value("${activemq.url}")
    private String url;
    @Value("${activemq.user}")
    private String user;
    @Value("${activemq.password}")
    private String password;

    /**
     * 配置连接信息
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(url);
        connectionFactory.setUserName(user);
        connectionFactory.setPassword(password);
        //设置重发属性  根据需要配置  可以不用
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        return connectionFactory;

    }

    /**
     *  创建队列  和  Topic
     * @return
     */
    @Bean
    public Topic topic() {
        return new ActiveMQTopic("springboot.topic");
    }


    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy redeliveryPolicy=   new RedeliveryPolicy();
        //是否在每次尝试重新发送失败后,增长这个等待时间
        redeliveryPolicy.setUseExponentialBackOff(true);
        //重发次数,默认为6次,这里设置为10次,-1表示不限次数
        redeliveryPolicy.setMaximumRedeliveries(3);
        //重发时间间隔,默认为1毫秒,设置为10000毫秒
        redeliveryPolicy.setInitialRedeliveryDelay(10000);
        //表示没有拖延只有UseExponentialBackOff(true)为true时生效
        //第一次失败后重新发送之前等待10000毫秒,第二次失败再等待10000 * 2毫秒
        //第三次翻倍10000 * 2 * 2,以此类推
        redeliveryPolicy.setBackOffMultiplier(2);
        //是否避免消息碰撞
        redeliveryPolicy.setUseCollisionAvoidance(true);
        //设置重发最大拖延时间360000毫秒 表示没有拖延只有UseExponentialBackOff(true)为true时生效
        redeliveryPolicy.setMaximumRedeliveryDelay(360000);
        return redeliveryPolicy;
    }




    /**
     *  持久订阅者配置
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory jmsTopicPersistentListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 必须设置clientId才能生效  必须设置factory.setSubscriptionDurable(true) 必须设置为true
        factory.setSubscriptionDurable(true);
        factory.setPubSubDomain(true);
        factory.setClientId("doorAccessRecord");
        return factory;
    }
}

其中最主要的设置是: factory.setSubscriptionDurable(true) 和 factory.setClientId(“doorAccessRecord”) 这两句必须设置。

4. ActiveMQ生产者停止的问题

很老很老的一个项目中activemq的默认存储为khadb,某一天发现mq无法运行,查看mq日志发现写的意思是storeUsage存储满了,无法在生产发送消息。

du -sh 查看khadb的大小有102GB,mq的activemq.xml中配置的为100GB限制。随后先将限制改为200GB,先保证系统正常

这个主要是要设置消息的过期时间设置等,不能一直堆积在那里,占用磁盘空间等等。



版权声明:本文为qq_40969452原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。