【转】ActiveMQ架构设计与最佳实践

  • Post author:
  • Post category:其他


http://shift-alt-ctrl.iteye.com/blog/2378868 (laoda.toutiao.im)

ActiveMQ是最常用、特性最丰富的消息中间件,通常用于消息异步通信、调用解耦等多种场景,是JMS规范的实现者之一。


一、架构设计概要

ActiveMQ提供两种可供实施的架构模型:“M-S”和“network bridge”;其中“M-S”是HA方案,“网络转发桥”用于实现“分布式队列”。



1、M-S

Master-Slave模型下,通常需要2+个ActiveMQ实例,任何时候只有一个实例为Master,向Client提供”生产”、“消费”服务,Slaves用于做backup或者等待Failover时角色接管。

M-S模型是最通用的架构模型,它提供了“高可用”特性,当Master失效后,Slaves之一提升为master继续提供服务,且Failover之后消息仍然可以恢复。(根据底层存储不同,有可能会有消息的丢失)

1)M-S架构中,设计到选举问题,选举的首要条件就是需要有“排它锁”的支持。排它锁,可以有共享文件锁、JDBC数据库排它锁、JDBC锁租约、zookeeper分布式锁等方式实现。这取决你的底层存储的机制。

2)M-S架构中,消息存储的机制有多种,“共享文件存储”、“JDBC”存储、“非共享存储”等。不同存储机制,各有优缺点。



2、网络转发桥(network bridge)

无论如何,一组M-S所能承载的消息量、Client并发级别总是有限的,当我们的消息规模达到单机的上限时,我们应该基于集群的方式,将消息、Client进行分布式和负载均衡。ActiveMQ提供了“网络转发桥”模式,核心思想是:集群中多个broker之间,通过“连接”互相通信,并将消息在多个Broker之间转发和存储,提供存储层面的“负载均衡”,以及根据Client的并发情况,对Client进行动态平衡,最终实现支持大规模生产者、消费者。


二、M-S架构设计详解



1、非共享存储模式

集群中有2+个ActiveMQ实例,每个实例单独存储数据,Master将消息保存在本地后,并将消息异步的方式转发给Slaves。


Master和slaves独立部署,各自负责存储,Master与slaves之间通过“network connector”连接,通常是Master单项与slaves建立连接。master上接收到的消息将会全量转发给slaves。

1)任何时候只有Master向Clients提供服务,slaves仅作backup。

2)slaves上存储的消息,有短暂的延迟。

3)master永远是master,当master失效时,我们不能随意进行角色切换,最佳实施方式就是重启master,只有当master物理失效时才会考虑将slave提升为master。

4)当slaves需要提升为master时,应该确保此slaves的消息是最新的。

5)如果slaves离线,那么在重启slaves之前,还应该将master的数据手动同步给slaves。否则slave离线期间的数据,将不会在slaves上复现。

6)Client端不支持failover协议;即Client只会与master建立连接。

这种架构,是最原始的架构,易于实时,但是问题比较严重,缺乏Failover机制,消息的可靠性我们无法完全保障,因为master与slaves角色切换没有仲裁者、或者说缺少分布式排它锁机制。

在Production环境中,不建议采用,如果你能容忍failover期间SLA水平降级的话,也可以作为备选。



2、共享文件存储

即采用SAN(GFS)技术,基于网络的全局共享文件系统模式,这种架构简单易用,

但是可架构、可设计的能力较弱,在Production环境下也可以采用



SAN存储,可以参考GFS。其中master和slaves配置保持一致,每个broker都需要有唯一的brokerName;broker实例在启动时首先通过SAN获取文件系统的排它锁,获取lock的实例将成为master,其他brokers将等待lock、并间歇性的尝试获取锁,slaves不提供Clients服务;因为brokers将数据写入GFS,所以在failover之后,新的master获取的数据视图仍然与原master保持一致,毕竟GFS是全局的共享文件系统。

我们通常使用kahaDB作为存储引擎,即使用日志文件方式;kahaDB的存储效率非常的高,TPS可以高达2W左右,是一种高效的、数据恢复能力强的存储机制。

这种架构模式下,支持failover,当master失效后,Clients能够通过failover协议与新的master重连,服务中断时间很短。因为基于GFS存储,所以数据总是保存在远端共享存储区域,所以不存在数据丢失的问题。

唯一的问题,就是GFS(SAN)的稳定性问题。这一点需要确定,SAN区域中的节点之间网络通信必须稳定且高效。



3、基于JDBC共享存储

我们可以将支持JDBC的数据库作为共享存储层,即master将数据写入数据库,本地不保存任何数据,在failover期间,slave提升为master之后,新master即可从数据库中读取数据,这也意味着在整个周期中,master与slaves的数据视图是一致的(同SAN架构),所以数据的恢复能力和一致性是可以保障的,也不存在数据丢失的情况(在存储层)。

但是我们需要知道JDBC存储机制,性能较低,与kahaDB这种基于日志存储层相比,性能相差近10倍左右。


如果你的业务需求,表明数据丢失是难以容忍的、且SLA水平很高,那么JDBC或许是你最好的选择。



既然JDBC数据库为最终存储层,那么我们很多时候需要关注数据库的可用性问题,比如数据库基于M-S模式等;如果数据库失效,将导致ActiveMQ集群不可用。


三、network bridges模式架构

这种架构模式,主要是应对大规模Clients、高密度的消息增量的场景;它将以集群的模式,承载较大数据量的应用。

1)有大量Producers、Consumers客户端接入。只所以如此,或许是因为消息通道(Topic,Queue)在水平扩张的方向上,已经没有太大的拆分可能性。

2)或许消息的增量,是很庞大的,特别是一些“非持久化消息”。

我们寄希望于构建“分布式队列”架构

3)因为集群规模较大,我们可能允许集群某些节点短暂的离线,但数据恢复机制仍然需要提供,总体而言,集群仍然提供较高的可用性。

4)集群支持Clients的负载均衡,比如有多个producers时,这些producers会被动态的在多个brokers之间平衡。

5)支持failover,即当某个broker失效时,Clients可以与其他brokers重连;当集群中有的新的brokers加入时,集群的拓扑也可以动态的通知给Clients。



集群有多个子Groups构成,每个Group为M-S模式、共享存储;多个Groups之间基于“network Connector”建立连接(masterslave协议),通常为双向连接,所有的Groups之间彼此相连,Groups之间形成“订阅”关系,比如G2在逻辑上为G1的订阅者(订阅的策略是根据各个Broker上消费者的Destination列表进行分类),消息的转发原理也是基于此。对于Client而言,仍然支持failover,failover协议中可以包含集群中“多数派”的节点地址。

对于Topic订阅者的消息,将会在所有Group中复制存储,对弈Queue的消息,将会在brokers之间转发,并最终到达Consumer所在的节点。

Producers和Consumers可以与任何Group中的Master建立连接并进行消息通信,当Brokers集群拓扑变化时、Producers或Consumers的个数变化时,将会动态平衡Clients的连接位置。Brokers之间通过“advisory”机制来同步Clients的连接信息,比如新的Consumers加入,Broker将会发送advisory消息(内部的通道)通知其他brokers。

集群模式提供了较好的可用性担保能力,在某些特性上或许需要权衡,比如Queue消息的有序性将会打破,因为同一个Queue的多个Consumer可能位于不同的Group上,如果某个Group实现,那么保存在其上的消息只有当其恢复后才能对Clients可见。


“网络转发桥”集群模式,构建复杂,维护成本高,可以在Production环境中使用。


四、性能评估

综上所述,在Production环境中,我们能够真正意义上采用的架构,只有三种:

1)基于JDBC的共享数据库模式:HA架构,单一Group,Group中包含一个master和任意多个slaves;所有Brokers之间通过远端共享数据库存取数据。对客户端而言支持Failover协议。

2)基于Network Bridge构建分布式消息集群:Cluster架构,集群中有多个Group,每个Group均为M-S架构、基于共享存储;对于Clients而言,支持负载均衡和Failover;消息从Producer出发,到达Broker节点,Broker根据“集群中Consumers分布”,将消息转发给Consumers所在的Broker上,实现消息的按需流动。

3)基于Network Bridge的简化改造:与2)类似,但是每个“Group”只有一个Broker节点,此Broker基于kahaDB本地文件存储,即相对于2)Group缺少了HA特性;当Broker节点失效时,其上的消息将不可见、直到Broker恢复正常。这种简化版的架构模式,通过增加机器的数量、细分消息的分布,来降低数据影响故障影响的规模,因为其基于kahaDB本地日志存储,所以性能很高。



1、共享JDBC

Java代码
收藏代码


  1. Producer端(压力输出机器):

  2. 数量:

    4



  3. 硬件配置:16Core、32G,云主机

  4. 软件配置:JDK

    1.8

    ,JVM 24G

  5. 并发与线程:

    32

    并发线程,连接池为

    128

    ,发送文本消息,每个消息

    128

    个字符实体。

  6. 消息:持久化,Queue,非事务


  7. Broker端(压力承载)

  8. 数量:

    2



  9. 硬件配置:16Core、32G,云主机

  10. 软件配置:JDK

    1.8

    ,JVM 24G

  11. 架构模式:M-S模式,开启异步转发、关闭FlowControl,数据库连接池为

    1024



  12. 数据库(存储层)

  13. 数量:

    2



  14. 硬件配置:16Core、32G,SSD(IOPS

    3000

    ),云主机

  15. 架构模式:M-S

  16. 数据库:MySQL



  17. 测试结果


  18. 1

    、消息生产效率:

    1500

    TPS


  19. 2

    、Broker负载情况:CPU

    30

    %,内存使用率

    11

    %


  20. 3

    、MySQL负载情况:CPU

    46

    %,IO_WAIT

    25

    %


  21. 结论:


  22. 1

    、基于共享JDBC存储架构,性能确实较低。


  23. 2

    、影响性能的关键点,就是数据库的并发IO能力,当TPS在

    1800

    左右时,数据库的磁盘(包括slave同步IO)已经出现较高的IO_WAIT。


  24. 3

    、通过升级磁盘、增加IOPS,可以有效提升TPS指标,建议同时提高CPU的个数。



2、基于非共享文件存储

测试单个ActiveMQ,基于kahaDB存储,kahaDB分为两种数据刷盘模式:

1)逐条消息刷盘

2)每隔一秒刷盘

Java代码
收藏代码


  1. <persistenceAdapter>

  2. <kahaDB directory=

    “${activemq.data}/kahadb”

    journalDiskSyncStrategy=

    “periodic”

    journalDiskSyncInterval=

    “2000”

    />

  3. </persistenceAdapter>

压力测试环境与1)保持一致,只是ActiveMQ的机器的磁盘更换为:SSD (600 IOPS)。

Java代码
收藏代码



  1. 1

    )逐条刷新磁盘

  2. TPS:

    660


  3. Broker IO_WAIT:

    19

    %


  4. 2

    )每隔一秒刷新磁盘

  5. TPS:

    9800


  6. Broker IO_WAIT:

    1.6

    %

由此可见,基于日志文件的存储性能比JDBC高了接近5倍,其中逐条刷盘策略,消息的可靠性是最高的,但是性能却低于JDBC。如果基于“每隔一秒刷盘”策略,在极端情况下,可能导致最近一秒的数据丢失。



3、基于转发桥

基于转发桥的架构,实施成本较高,维护成本较高,架构复杂度也相对较大。本人根据实践经验,不推荐使用此模式。如果你希望尝试,也无妨,毕竟它是ActiveMQ官方推荐的“分布式队列实现机制”,从原理上它可以支持较大规模的消息存储。



4、最佳实践


终归,我们需要面对“海量消息”的存储,我们在按照业务进行队列拆分之后,仍然需要面临某个单纯业务的消息量仍然是“单个M-S架构”无法满足,我们又不愿意承担Cluster模式复杂度所带来的潜在问题,此时,我们可以采用比较通用的“逻辑分布式”机制。

1)我们构建多个M-S组,但是每个Group之间在物理上没有关联,即它们之间互不通信,且不共享存储。

2)我们再Producer的客户端,增加“router”层面, 即开发一个Client Wrapper,此wrapper提供了Producer常用的接口,且持有多个M-S组的ConnectionFactory,在通过底层通道发送消息之前,根据message中的某个property、或者指定的KEY,进行hash计算,进而选择相应的连接(或者Spring的包装类),然后发送消息。这有点类似于基于客户端的数据库读写分离的策略。

3)对于Consumers,则只需要配置多个ConnectionFactory即可。

4)经过上述实践,我们将消息sharding到多个M-S组,解决了消息发送效率的问题,且逻辑集群可以进行较大规模的扩展。而且对Client是透明的。

5)

如果你不想开发shard-router层面,我们仍然可以基于failover协议来实现“逻辑分布式”的消息散列存储,此时需要在failover协议中指明所有Groups的brokers节点列表,且randomize=true。这种用法,可以实现消息在多个Group上存储,唯一遗憾的地方时,因为缺乏“自动负载均衡策略”,可能导致消息分布不均。

Java代码
收藏代码


  1. failover:(tcp:

    //G1.master,tcp://G1.slave,tcp://G2.master,tcp://G2.slave)?randomize=true



  2. //randomize必须为true


五、ActiveMQ配置样例(基于共享JDBC)

根据本人深思,最终还是决定采用共享JDBC数据库的方式,因为我无法承担业务团队叫嚣“消息丢失”所带来的“非技术性”压力与纠葛。虽然单个Group性能稍差,但是我们可以进行多Groups扩容。

Java代码
收藏代码


  1. <?xml version=

    “1.0”

    encoding=

    “utf-8”

    ?>

  2. <beans xmlns=

    “http://www.springframework.org/schema/beans”


  3. xmlns:xsi=

    “http://www.w3.org/2001/XMLSchema-instance”


  4. xsi:schemaLocation=”http:

    //www.springframework.org/schema/beans


  5. http:

    //www.springframework.org/schema/beans/spring-beans.xsd


  6. http:

    //activemq.apache.org/schema/core


  7. http:

    //activemq.apache.org/schema/core/activemq-core.xsd”>


  8. <!–

  9. 配置与授权

  10. –>

  11. <bean

    class

    =

    “org.springframework.beans.factory.config.PropertyPlaceholderConfigurer”

    >

  12. <property name=

    “locations”

    >

  13. <value>file:${activemq.conf}/credentials.properties</value>

  14. </property>

  15. </bean>

  16. <!– 审计日志 –>

  17. <bean id=

    “logQuery”


    class

    =

    “io.fabric8.insight.log.log4j.Log4jLogQuery”


  18. lazy-init=

    “false”


  19. scope=

    “singleton”


  20. init-method=

    “start”


  21. destroy-method=

    “stop”

    >

  22. </bean>

  23. <!–

  24. <bean id=

    “mysql-ds”


    class

    =

    “org.apache.commons.dbcp.BasicDataSource”

    destroy-method=

    “close”

    >

  25. <property name=

    “driverClassName”

    value=

    “com.mysql.jdbc.Driver”

    />

  26. <property name=

    “url”

    value=

    “jdbc:mysql://localhost/activemq?relaxAutoCommit=true”

    />

  27. <property name=

    “username”

    value=

    “activemq”

    />

  28. <property name=

    “password”

    value=

    “activemq”

    />

  29. <property name=

    “maxActive”

    value=

    “128”

    ></property>

  30. <property name=

    “maxIdle”

    value=

    “2”

    ></property>

  31. <property name=

    “minIdle”

    value=

    “1”

    ></property>

  32. <property name=

    “maxWait”

    value=

    “3000”

    ></property>

  33. <property name=

    “defaultAutoCommit”

    value=

    “true”

    ></property>

  34. <property name=

    “poolPreparedStatements”

    value=

    “true”

    />

  35. </bean>

  36. –>

  37. <!–


  38. 1

    、brokerName

  39. 每个broker必须持有唯一不同的名称,我们通常,以broker + {IP}方式


  40. 2

    、useJmx

  41. 我们开启jmx,适用于组件监控,配合下文中的<managementContext/>


  42. 3

    、dataDirectory

  43. 数据目录,包括日志,cursor文件,数据文件等。数据文件可以在persistence配置中“重写”。


  44. 4

    、enableStatistics

  45. 开启统计,此后可以通过active ${status}等相关指令查看,开启有一定的性能损耗。


  46. 5

    、persistent

  47. 开启持久化功能,即数据将会写入Store.

  48. 如果为

    false

    ,那么所有的消息都将以内存方式存储,请使用<memoryPersistenceAdapter>


  49. 6

    、schedulerSupport

  50. 开启调度,如果需要Broker执行,比如定期清理过期消息、检测磁盘和内容容量、清理离线订阅者等,此时必须开启。


  51. 7

    、useVirtualTopics

  52. 开启虚拟Topics功能


  53. 8

    、offlineDurableSubscriberTimeout

  54. 对于“持久订阅者”,如果长期离线,将导致Topic消息积压,验证影响Topic的转发效率。

  55. 我们应该将那些“长期离线”的订阅者删除。此值为

    7

    天,单位:毫秒


  56. 9

    、offlineDurableSubscriberTaskSchedule

  57. 用于“检测”离线订阅者的定时器调度间隔,此值为

    1

    个小时


  58. 10

    、schedulePeriodForDestinationPurge

  59. 如果一个空的Destination(没有消息积压)在一定时间内,没有Consumer消费时,将会被删除。

  60. 需要配合才能生效

  61. <policyEntry  gcInactiveDestinations=

    “true”

    inactiveTimeoutBeforeGC=

    “30000”

    />

  62. 本实例中为

    7

    天有效期,每隔

    1

    小时检测一次


  63. 11

    、advisorySupport

  64. 开启通知,主要用于监控,当出现慢消费者、DLQ、容量不足等问题时,将会在“advisory”相关的Queue、Topic中发送内置的消息,

  65. 对于监控程序,可以通过消费advisory,实现组件监控机制。

  66. 有一定的性能开支


  67. 12

    、schedulePeriodForDiskUsageCheck

  68. 每个

    5

    分钟检测一次磁盘存储使用率。参见<systemUsage>


  69. –>

  70. <broker xmlns=

    “http://activemq.apache.org/schema/core”


  71. brokerName=

    “broker-01”


  72. useJmx=

    “true”


  73. dataDirectory=

    “${activemq.data}”


  74. enableStatistics=

    “true”


  75. persistent=

    “true”


  76. useVirtualTopics=

    “true”


  77. schedulerSupport=

    “true”


  78. offlineDurableSubscriberTimeout=

    “604800000”


  79. offlineDurableSubscriberTaskSchedule=

    “3600000”


  80. schedulePeriodForDestinationPurge=

    “3600000”


  81. advisorySupport=

    “true”


  82. schedulePeriodForDiskUsageCheck=

    “300000”

    >

  83. <destinationPolicy>

  84. <policyMap>

  85. <policyEntries>

  86. <!–

  87. 关于持久化订阅者的相关配置

  88. http:

    //activemq.apache.org/manage-durable-subscribers.html


  89. 通道策略

  90. http:

    //activemq.apache.org/per-destination-policies.html


  91. 删除不活跃通道

  92. http:

    //activemq.apache.org/delete-inactive-destinations.html


  93. –>

  94. <!–


  95. 1

    、topic

  96. 通用正则表达式,表示“全部topic”


  97. 2

    、expireMessagesPeriod

  98. 每个

    5

    分钟检测一次消息,对于TLL过期的消息将会被移除。(根据DLQ策略)


  99. 3

    、advisoryForSlowConsumers

  100. 如果“advisorySupport”开启时,当Broker判定某个消费者为慢速消费者(待确认消息 >=

    2

    * prefetch)

  101. 将会发送通知。


  102. 4

    、advisoryWhenFull

  103. 如果cursor、store溢满时,发送通知


  104. 5

    、maxPageSize

  105. 从store中pageIn消息列表的批量大小


  106. 6

    、producerFlowControl

  107. 是否开启“生产者流量控制”,如果开启,当内存溢满、“待发送消息达到阈值”将会阻塞producer。

  108. 因为我们采用store存储,所以不需要流量控制


  109. 7

    、durableTopicPrefetch


  110. 8

    、gcInactiveDestinations

  111. 不活跃的通道,是否允许被删除。


  112. 9

    、inactiveTimeoutBeforeGC

  113. 当一个通道中没有消息,且没有消费者时,此通道将会被认定为“不活跃”

  114. –>


  115. <policyEntry topic=

    “>”

    expireMessagesPeriod=

    “0”


  116. advisoryForSlowConsumers=

    “true”


  117. advisoryWhenFull=

    “true”


  118. maxPageSize=

    “512”


  119. producerFlowControl=

    “false”


  120. durableTopicPrefetch=

    “200”


  121. gcInactiveDestinations=

    “true”


  122. inactiveTimeoutBeforeGC=

    “604800000”


  123. >

  124. <!– 转发策略 –>

  125. <dispatchPolicy>

  126. <!– 对于Topic,我们通常采用轮训机制 –>

  127. <roundRobinDispatchPolicy/>

  128. </dispatchPolicy>

  129. <!–

  130. 对于non-durable Topic,积压的消息数量 ,如果超过限制,则剔除

  131. http:

    //activemq.apache.org/slow-consumer-handling.html


  132. 仅对非持久化Topic有效,目的是提高Topic的转发效率。

  133. –>

  134. <pendingMessageLimitStrategy>

  135. <constantPendingMessageLimitStrategy limit=

    “256”

    />

  136. </pendingMessageLimitStrategy>

  137. <messageEvictionStrategy>

  138. <oldestMessageEvictionStrategy/>

  139. </messageEvictionStrategy>

  140. <!–

  141. 不支持

    “可回溯”

    订阅者,即新加入的订阅者只能获取订阅操作发生之后的消息

  142. http:

    //activemq.apache.org/subscription-recovery-policy.html


  143. –>

  144. <subscriptionRecoveryPolicy>

  145. <noSubscriptionRecoveryPolicy/>

  146. </subscriptionRecoveryPolicy>

  147. <!–

  148. 对于TTL过期的、或者临时存储溢满被剔除的、重发次数超过限制的等等,都有可能进入DLQ


  149. 1

    、processExpired

  150. TTL过期的消息,将直接移除,不会进入DLQ


  151. 2

    、processNonPersistent

  152. 对于非持久化消息,无论如何都进入DLQ


  153. 3

    、expiration

  154. DLQ中消息的TTL,从进入DLQ开始。此值为“

    3

    天”

  155. –>

  156. <deadLetterStrategy>

  157. <sharedDeadLetterStrategy processExpired=

    “false”


  158. processNonPersistent=

    “false”


  159. expiration=

    “259200000”

    />

  160. </deadLetterStrategy>

  161. <!–

  162. 积压消息的转发策略,cursor机制

  163. 当Producer发送小于大于Consumer消费效率时,这意味着Broker在转发层面需要对

  164. “积压”的消息进行buffer或者临时存储。


  165. 1

    、对于非持久化订阅者,消息直接保存在内存中,存储量受限于systemUsage。


  166. 2

    、对于持久化订阅者,消息将使用store(内部基于VM + File)

  167. http:

    //activemq.apache.org/message-cursors.html


  168. –>

  169. <pendingSubscriberPolicy>

  170. <vmCursor/>

  171. </pendingSubscriberPolicy>

  172. <pendingDurableSubscriberPolicy>

  173. <storeDurableSubscriberCursor/>

  174. </pendingDurableSubscriberPolicy>

  175. </policyEntry>

  176. <!–

  177. 因为Queue总是基于prefetch批量推送机制,所有当consumer有多个,且消息的密度不大时,如果使用

  178. strictOrderDispatch将会导致总是转发给一个consumer的问题。

  179. strictOrderDispatch + prefetch需要注意

  180. –>

  181. <policyEntry queue=

    “>”

    expireMessagesPeriod=

    “0”


  182. maxPageSize=

    “512”


  183. producerFlowControl=

    “false”


  184. queuePrefetch=

    “1000”


  185. strictOrderDispatch=

    “false”


  186. useConsumerPriority=

    “true”


  187. sendAdvisoryIfNoConsumers=

    “true”


  188. advisoryForSlowConsumers=

    “true”


  189. advisoryWhenFull=

    “true”


  190. gcInactiveDestinations=

    “true”


  191. inactiveTimoutBeforeGC=

    “604800000”

    >

  192. <deadLetterStrategy>

  193. <!–

  194. 私信队列,统一使用一个,避免不必要的维护成本,易于监控

  195. –>

  196. <sharedDeadLetterStrategy processExpired=

    “false”


  197. processNonPersistent=

    “false”


  198. expiration=

    “259200000”

    />

  199. </deadLetterStrategy>

  200. <!–

  201. 积压待发的消息,采用store

  202. –>

  203. <pendingQueuePolicy>

  204. <storeCursor/>

  205. </pendingQueuePolicy>

  206. </policyEntry>

  207. </policyEntries>

  208. </policyMap>

  209. </destinationPolicy>

  210. <!–

  211. 虚拟Topic,我们让所有的Topic都支持虚拟化

  212. http:

    //activemq.apache.org/virtual-destinations.html


  213. –>

  214. <destinationInterceptors>

  215. <virtualDestinationInterceptor>

  216. <virtualDestinations>

  217. <virtualTopic name=

    “>”

    prefix=

    “VConsumers.*.”

    selectorAware=

    “false”

    />

  218. </virtualDestinations>

  219. </virtualDestinationInterceptor>

  220. </destinationInterceptors>

  221. <!–

  222. JMX监控

  223. http:

    //activemq.apache.org/jmx.html


  224. –>

  225. <managementContext>

  226. <managementContext createConnector=

    “false”

    />

  227. </managementContext>

  228. <!–

  229. PersistenceAdapter

  230. http:

    //activemq.apache.org/persistence.html


  231. –>

  232. <persistenceAdapter>

  233. <jdbcPersistenceAdapter dataDirectory=

    “${activemq.data}”

    dataSource=

    “#mysql-ds”

    lockKeepAlivePeriod=

    “5000”

    >

  234. <locker>

  235. <lease-database-locker lockAcquireSleepInterval=

    “10000”

    />

  236. </locker>

  237. </jdbcPersistenceAdapter>

  238. </persistenceAdapter>

  239. <!–

  240. Memory Setting and Flow-Control

  241. http:

    //activemq.apache.org/producer-flow-control.html


  242. –>

  243. <systemUsage>

  244. <systemUsage>

  245. <memoryUsage>

  246. <memoryUsage percentOfJvmHeap=

    “70”

    />

  247. </memoryUsage>

  248. <storeUsage>

  249. <storeUsage limit=

    “50 gb”

    />

  250. </storeUsage>

  251. <tempUsage>

  252. <tempUsage limit=

    “20 gb”

    />

  253. </tempUsage>

  254. </systemUsage>

  255. </systemUsage>

  256. <!–

  257. TransportConnector and Protocol Setting

  258. http:

    //activemq.apache.org/configuring-transports.html


  259. –>

  260. <transportConnectors>

  261. <!– DOS protection, limit concurrent connections to

    1000

    and frame size to 100MB –>

  262. <transportConnector name=

    “openwire”


  263. uri=

    “tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600”

    />

  264. <transportConnector name=

    “amqp”


  265. uri=

    “amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600”

    />

  266. <transportConnector name=

    “stomp”


  267. uri=

    “stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600”

    />

  268. <transportConnector name=

    “mqtt”


  269. uri=

    “mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600”

    />

  270. <!–

  271. <transportConnector name=

    “ws”

    uri=

    “ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600”

    />

  272. –>

  273. </transportConnectors>

  274. <!– destroy the spring context on shutdown to stop jetty –>

  275. <shutdownHooks>

  276. <bean xmlns=

    “http://www.springframework.org/schema/beans”



  277. class

    =

    “org.apache.activemq.hooks.SpringContextHook”

    >

  278. </bean>

  279. </shutdownHooks>

  280. <!–

  281. 私信队列处理

  282. http:

    //activemq.apache.org/message-redelivery-and-dlq-handling.html


  283. –>

  284. <plugins>

  285. <simpleAuthenticationPlugin>

  286. <users>

  287. <authenticationUser username=

    “amq_manager”

    password=

    “012345”


  288. groups=

    “users,admins”

    />

  289. <authenticationUser username=

    “amq_common”

    password=

    “123456”


  290. groups=

    “users”

    />

  291. </users>

  292. </simpleAuthenticationPlugin>

  293. <redeliveryPlugin fallbackToDeadLetter=

    “true”


  294. sendToDlqIfMaxRetriesExceeded=

    “true”

    >

  295. <redeliveryPolicyMap>

  296. <!– 重发策略,对于超过重发次数的消息将会被添加到DLQ –>

  297. <redeliveryPolicyMap>

  298. <redeliveryPolicyEntries>

  299. <!–

  300. 重发机制,默认重发

    6

    ,重发延迟基于backOff模式

  301. –>

  302. <redeliveryPolicy maximumRedeliveries=

    “6”


  303. initialRedeliveryDelay=

    “1000”


  304. useExponentialBackOff=

    “true”


  305. backOffMultiplier=

    “5”


  306. queue=

    “>”

    />

  307. </redeliveryPolicyEntries>

  308. <defaultEntry>

  309. <!– 其他策略 –>

  310. <redeliveryPolicy maximumRedeliveries=

    “6”


  311. initialRedeliveryDelay=

    “1000”


  312. useExponentialBackOff=

    “true”


  313. backOffMultiplier=

    “5”

    />

  314. </defaultEntry>

  315. </redeliveryPolicyMap>

  316. </redeliveryPolicyMap>

  317. </redeliveryPlugin>

  318. </plugins>

  319. </broker>

  320. <!–

  321. Web Manager

  322. –>

  323. <

    import

    resource=

    “jetty.xml”

    />

  324. </beans>


需要特别注意“


expireMessagesPeriod


”这个参数,我们发现这个参数一旦开启,Broker将会间歇性全量获取数据,特别是在JDBC存储模式下,会导致Broker与数据库之间的数据流量巨大,导致内存OOM的问题。


六、Producer配置(基于Spring)

Java代码
收藏代码


  1. <?xml version=

    “1.0”

    encoding=

    “UTF-8”

    ?>

  2. <beans xmlns=

    “http://www.springframework.org/schema/beans”


  3. xmlns:xsi=

    “http://www.w3.org/2001/XMLSchema-instance”


  4. xmlns:amq=

    “http://activemq.apache.org/schema/core”


  5. xmlns:jms=

    “http://www.springframework.org/schema/jms”


  6. xsi:schemaLocation=”http:

    //www.springframework.org/schema/beans


  7. http:

    //www.springframework.org/schema/beans/spring-beans-4.0.xsd


  8. http:

    //www.springframework.org/schema/jms


  9. http:

    //www.springframework.org/schema/jms/spring-jms-4.0.xsd


  10. http:

    //activemq.apache.org/schema/core


  11. http:

    //activemq.apache.org/schema/core/activemq-core-5.8.0.xsd”>



  12. <!– Spring提供的JMS工具类,它可以进行消息发送、接收等 –>

  13. <bean id=

    “jmsTemplate”


    class

    =

    “org.springframework.jms.core.JmsTemplate”

    >

  14. <property name=

    “connectionFactory”

    ref=

    “amqPooledConnectionFactory”

    />

  15. <property name=

    “defaultDestination”

    ref=

    “testQueue”

    />

  16. <!– 非持久化:

    1

    ,持久化:

    2

    –>

  17. <property name=

    “deliveryMode”

    value=

    “2”

    />

  18. <property name=

    “explicitQosEnabled”

    value=

    “true”

    />

  19. <property name=

    “messageIdEnabled”

    value=

    “true”

    />

  20. <property name=

    “messageTimestampEnabled”

    value=

    “true”

    />

  21. <!–

    0

    :基于事务的确认机制

    1

    :基于session的自动确认,

    2

    :客户端确认,此值对consumer生效 –>

  22. <property name=

    “sessionAcknowledgeMode”

    value=

    “2”

    />

  23. <property name=

    “sessionTransacted”

    value=

    “false”

    />

  24. <!–

    72

    小时 –>

  25. <!– 所有的消息,都应该表明其TTL –>

  26. <property name=

    “timeToLive”

    value=

    “259200000”

    />

  27. </bean>

  28. <bean id=

    “amqPooledConnectionFactory”


    class

    =

    “org.apache.activemq.pool.PooledConnectionFactory”

    destroy-method=

    “stop”

    >

  29. <!– 当session池(连接池)已满时,getSession()操作阻塞的最大时间,超时后抛出异常 –>

  30. <property name=

    “blockIfSessionPoolIsFullTimeout”

    value=

    “6000”

    />

  31. <property name=

    “connectionFactory”

    ref=

    “amqConnectionFactory”

    />

  32. <!– 单个连接的生命周期,TTL,从创建开始,当其服务时长达到timeout时,且没有Consumer、Producer使用,则会关闭 –>

  33. <property name=

    “expiryTimeout”

    value=

    “0”

    />

  34. <!– 一个正常的连接,当没有producer或者消费者进行数据交互时、空闲timeout之后,应该被关闭并移出pool –>

  35. <property name=

    “idleTimeout”

    value=

    “30000”

    />

  36. <!– 最大连接数,应该合理,建议与应用的并发级别保持一致 –>

  37. <property name=

    “maxConnections”

    value=

    “128”

    />

  38. <!– 每个连接上,允许创建的、并发的session数量(createSession,PooledSession)–>

  39. <property name=

    “maximumActiveSessionPerConnection”

    value=

    “500”

    />

  40. </bean>


  41. <bean id=

    “amqConnectionFactory”


    class

    =

    “org.apache.activemq.ActiveMQConnectionFactory”

    >

  42. <property name=

    “brokerURL”

    value=

    “failover:(tcp://10.0.1.100:61616,tcp://10.0.1.101:61616)?randomize=false”

    />

  43. <property name=

    “userName”

    value=

    “amq_common”

    />

  44. <property name=

    “password”

    value=

    “123456”

    />

  45. <!– 连接ID的前缀,建议与项目名保持一致 –>

  46. <property name=

    “connectionIDPrefix”

    value=

    “sample-”

    />

  47. <!– 消息转发和消费时,校验TTL是否过期 –>

  48. <property name=

    “consumerExpiryCheckEnabled”

    value=

    “true”

    />

  49. <!– 要求Broker端进行异步转发,提高消息的发送效率 –>

  50. <property name=

    “dispatchAsync”

    value=

    “true”

    />

  51. <property name=

    “prefetchPolicy”

    >

  52. <bean

    class

    =

    “org.apache.activemq.ActiveMQPrefetchPolicy”

    >

  53. <property name=

    “queuePrefetch”

    value=

    “100”

    />

  54. </bean>

  55. </property>

  56. <property name=

    “sendTimeout”

    value=

    “10000”

    />

  57. <!– 异步发送,可能导致消息丢失,通常对于非持久化消息可以采用异步发送 + producerWindowSize –>

  58. <property name=

    “useAsyncSend”

    value=

    “false”

    />

  59. </bean>


  60. <bean id=

    “testQueue”


    class

    =

    “org.apache.activemq.command.ActiveMQQueue”

    >

  61. <constructor-arg value=

    “test-queue”

    />

  62. </bean>

  63. </beans>

我们使用ActiveMQ提供的PooledConnectionFactory,底层基于连接池(session对象池)机制,在一定程度上可以提高底层消息的通信效率,特别是在高并发环境中。我们并没有采用Spring-JMS中提供的CachingConnectionFactory,因为它是单连接机制,而且在Consumer层面,稍有不慎可能导致消息的重发问题。

上述配置中,有些细微的参数需要特别注意,否则可能导致问题。(配置中并不是所有的参数都是为Producer服务的,有些是针对Consumers)


七、Consumer端(基于Spring)

Java代码
收藏代码


  1. <bean

    class

    =

    “org.springframework.jms.listener.DefaultMessageListenerContainer”

    >

  2. <property name=

    “connectionFactory”

    ref=

    “amqPooledConnectionFactory”

    />

  3. <property name=

    “destination”

    ref=

    “testQueue”

    />

  4. <property name=

    “messageListener”

    >

  5. <bean

    class

    =

    “com.demo.jms.sample.TestListener”

    >

  6. </bean>

  7. </property>

  8. <property name=

    “concurrentConsumers”

    value=

    “2”

    />

  9. <property name=

    “maxConcurrentConsumers”

    value=

    “5”

    />

  10. <!– 会话确认机制,默认为:

    1

    ,自动确认,我们建议使用:

    2

    ,手动确认 –>

  11. <property name=

    “sessionAcknowledgeMode”

    value=

    “2”

    />

  12. <!– Topic订阅者有效 –>

  13. <!–

  14. <property name=

    “clientId”

    value=

    “${clientId}”

    />

  15. –>

  16. </bean>



TestListener.java样例

Java代码
收藏代码



  1. public


    class

    TestListener

    implements

    MessageListener {


  2. private


    static


    final

    Logger LOGGER = LoggerFactory.getLogger(TestListener.

    class

    );




  3. @Override




  4. public


    void

    onMessage(Message message) {


  5. try

    {


  6. if

    (message

    instanceof

    TextMessage) {

  7. String text = ((TextMessage) message).getText();

  8. LOGGER.error(

    “<Consumer>:”

    + text);

  9. }

  10. message.acknowledge();

  11. }

    catch

    (Exception e) {

  12. LOGGER.error(

    “”

    ,e);

    //遇到异常,如果你希望回滚或者重发,你应该重新抛出


  13. }

  14. }

  15. }


转载于:https://my.oschina.net/javahongxi/blog/1531591