ActiveMQ简单demo

  • Post author:
  • Post category:其他


1.下载ActiveMQ

去官方网站下载:

http://activemq.apache.org/


我下载的时候是 ActiveMQ 5.8.0 Release版

2.运行ActiveMQ

解压缩apache-activemq-5.8.0-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

3.创建Eclipse项目并运行

创建java project:ActiveMQ-5.8,新建lib文件夹

打开apache-activemq-5.8.0\lib目录

拷贝

activemq-broker-5.8.0.jar

activemq-client-5.8.0.jar

geronimo-j2ee-management_1.1_spec-1.0.1.jar

geronimo-jms_1.1_spec-1.1.1.jar

slf4j-api-1.6.6.jar

这5个jar文件到lib文件夹中,并Build Path->Add to Build Path

结构如图:

Sender.java

Java代码

收藏代码


  1. package

    com.lm.activemq;

  2. /**

  3. * @Header: Sender.java

  4. * 类描述:

  5. * @author: lm

  6. * @date 2013-7-17 上午10:52:42

  7. * @Email

  8. * @company 欢

  9. * @addr 北京市朝阳区劲松

  10. */

  11. import

    javax.jms.Connection;

  12. import

    javax.jms.ConnectionFactory;

  13. import

    javax.jms.DeliveryMode;

  14. import

    javax.jms.Destination;

  15. import

    javax.jms.MessageProducer;

  16. import

    javax.jms.Session;

  17. import

    javax.jms.TextMessage;

  18. import

    org.apache.activemq.ActiveMQConnection;

  19. import

    org.apache.activemq.ActiveMQConnectionFactory;

  20. public


    class

    Sender {

  21. private


    static


    final


    int

    SEND_NUMBER =

    5

    ;

  22. public


    static


    void

    main(String[] args) {

  23. // ConnectionFactory :连接工厂,JMS 用它创建连接
  24. ConnectionFactory connectionFactory;

    // Connection :JMS 客户端到JMS

  25. // Provider 的连接
  26. Connection connection =

    null

    ;

    // Session: 一个发送或接收消息的线程
  27. Session session;

    // Destination :消息的目的地;消息发送给谁.
  28. Destination destination;

    // MessageProducer:消息发送者
  29. MessageProducer producer;

    // TextMessage message;

  30. // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
  31. connectionFactory =

    new

    ActiveMQConnectionFactory(
  32. ActiveMQConnection.DEFAULT_USER,
  33. ActiveMQConnection.DEFAULT_PASSWORD,

    “tcp://localhost:61616”

    );

  34. try

    {

    // 构造从工厂得到连接对象
  35. connection = connectionFactory.createConnection();

  36. // 启动
  37. connection.start();

  38. // 获取操作连接
  39. session = connection.createSession(Boolean.TRUE,
  40. Session.AUTO_ACKNOWLEDGE);

  41. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  42. destination = session.createQueue(

    “FirstQueue”

    );

  43. // 得到消息生成者【发送者】
  44. producer = session.createProducer(destination);

  45. // 设置不持久化,此处学习,实际根据项目决定
  46. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

  47. // 构造消息,此处写死,项目就是参数,或者方法获取
  48. sendMessage(session, producer);
  49. session.commit();
  50. }

    catch

    (Exception e) {
  51. e.printStackTrace();
  52. }

    finally

    {

  53. try

    {

  54. if

    (

    null

    != connection)
  55. connection.close();
  56. }

    catch

    (Throwable ignore) {
  57. }
  58. }
  59. }

  60. public


    static


    void

    sendMessage(Session session, MessageProducer producer)

  61. throws

    Exception {

  62. for

    (

    int

    i =

    1

    ; i <= SEND_NUMBER; i++) {
  63. TextMessage message = session.createTextMessage(

    “ActiveMq 发送的消息”
  64. + i);

  65. // 发送消息到目的地方
  66. System.out.println(

    “发送消息:”

    +

    “ActiveMq 发送的消息”

    + i);
  67. producer.send(message);
  68. }
  69. }
  70. }

Receiver.java

Java代码

收藏代码


  1. package

    com.lm.activemq;

  2. /**

  3. * @Header: Receiver.java

  4. * 类描述:

  5. * @author: lm

  6. * @date 2013-7-17 上午10:52:58

  7. * @Email

  8. * @company 欢

  9. * @addr 北京市朝阳区劲松

  10. */

  11. import

    javax.jms.Connection;

  12. import

    javax.jms.ConnectionFactory;

  13. import

    javax.jms.Destination;

  14. import

    javax.jms.MessageConsumer;

  15. import

    javax.jms.Session;

  16. import

    javax.jms.TextMessage;

  17. import

    org.apache.activemq.ActiveMQConnection;

  18. import

    org.apache.activemq.ActiveMQConnectionFactory;

  19. public


    class

    Receiver {

  20. public


    static


    void

    main(String[] args) {

  21. // ConnectionFactory :连接工厂,JMS 用它创建连接
  22. ConnectionFactory connectionFactory;

  23. // Connection :JMS 客户端到JMS Provider 的连接
  24. Connection connection =

    null

    ;

  25. // Session: 一个发送或接收消息的线程
  26. Session session;

  27. // Destination :消息的目的地;消息发送给谁.
  28. Destination destination;

  29. // 消费者,消息接收者
  30. MessageConsumer consumer;
  31. connectionFactory =

    new

    ActiveMQConnectionFactory(
  32. ActiveMQConnection.DEFAULT_USER,
  33. ActiveMQConnection.DEFAULT_PASSWORD,

    “tcp://localhost:61616”

    );

  34. try

    {

  35. // 构造从工厂得到连接对象
  36. connection = connectionFactory.createConnection();

  37. // 启动
  38. connection.start();

  39. // 获取操作连接
  40. session = connection.createSession(Boolean.FALSE,
  41. Session.AUTO_ACKNOWLEDGE);

  42. // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  43. destination = session.createQueue(

    “FirstQueue”

    );
  44. consumer = session.createConsumer(destination);

  45. while

    (

    true

    ) {

  46. // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s
  47. TextMessage message = (TextMessage) consumer.receive(

    100000

    );

  48. if

    (

    null

    != message) {
  49. System.out.println(

    “收到消息”

    + message.getText());
  50. }

    else

    {

  51. break

    ;
  52. }
  53. }
  54. }

    catch

    (Exception e) {
  55. e.printStackTrace();
  56. }

    finally

    {

  57. try

    {

  58. if

    (

    null

    != connection)
  59. connection.close();
  60. }

    catch

    (Throwable ignore) {
  61. }
  62. }
  63. }
  64. }

5.测试过程

先运行:Receiver.java

再运行:Sender.java

可以看到结果

Sender运行后:

发送消息:ActiveMq 发送的消息1

发送消息:ActiveMq 发送的消息2

发送消息:ActiveMq 发送的消息3

发送消息:ActiveMq 发送的消息4

发送消息:ActiveMq 发送的消息5

Receiver运行后:

收到消息ActiveMq 发送的消息1

收到消息ActiveMq 发送的消息2

收到消息ActiveMq 发送的消息3

收到消息ActiveMq 发送的消息4

收到消息ActiveMq 发送的消息5

要想看到不同的输出内容,通过点击如下图的按钮切换console

在Receiver.java中,可以设置一个时间,比如receive(500000),如下代码所示:

Java代码

收藏代码

  1. TextMessage message = (TextMessage) consumer.receive(

    500000

    );

这个时候运行Receiver.java的话,会使得这个Receiver.java一直运行500秒,在eclipse中可以发现:

点击那个红色方块可以手动停止运行程序