ActiveMQ_点对点队列(二)

  • Post author:
  • Post category:其他



一、本文章包含的内容
1、列举了ActiveMQ中通过Queue方式发送、消费队列的代码(普通文本、json/xml字符串、对象数据)
2、spring+activemq方式

二、配置信息
1、activemq的pom.xml信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

<!--activemq  Begin-->



<


dependency


>



<


groupId


>org.springframework</


groupId


>



<


artifactId


>spring-jms</


artifactId


>



<


version


>${spring.version}</


version


>



</


dependency


>



<!-- <dependency>



<groupId>org.springframework</groupId>



<artifactId>spring-messaging</artifactId>



<version>${spring.version}</version>



</dependency>-->



<


dependency


>



<


groupId


>org.apache.activemq</


groupId


>



<


artifactId


>activemq-all</


artifactId


>



<


version


>5.14.0</


version


>



</


dependency


>



<!--activemq  End-->

2、activemq的配置文件:spring-jms.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

<!-- 启用spring mvc 注解 -->



<


context:component-scan


base-package


=


"org.soa.test.activemq"


/>



<!-- 配置JMS连接工厂 -->



<


bean


id


=


"connectionFactory"


class


=


"org.apache.activemq.ActiveMQConnectionFactory"


>



<


property


name


=


"brokerURL"


value


=


"failover:(tcp://192.168.146.129:61616)"


/>



<!--解决接收消息抛出异常:javax.jms.JMSException: Failed to build body from content. Serializable class not available to broke-->



<


property


name


=


"trustAllPackages"


value


=


"true"


/>



<!-- 是否异步发送 -->



<


property


name


=


"useAsyncSend"


value


=


"true"


/>



</


bean


>



<!--   Queue模式 Begin -->



<!-- 定义消息队列(Queue) -->



<


bean


id


=


"queueDestination"


class


=


"org.apache.activemq.command.ActiveMQQueue"


>



<!-- 设置消息队列的名字 -->



<


constructor-arg


>



<


value


>defaultQueueName</


value


>



</


constructor-arg


>



</


bean


>



<!-- 配置JMS模板,Spring提供的JMS工具类,它发送、接收消息。(Queue) -->



<


bean


id


=


"jmsTemplate"


class


=


"org.springframework.jms.core.JmsTemplate"


>



<


property


name


=


"connectionFactory"


ref


=


"connectionFactory"


/>



<


property


name


=


"defaultDestination"


ref


=


"queueDestination"


/>



<


property


name


=


"pubSubDomain"


value


=


"false"


/>



<!--接收超时时间-->



<!--<property name="receiveTimeout" value="10000" />-->



</


bean


>



<!--   Queue模式 End -->


三、队列发送端及测试程序

1、发送代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

package


org.soa.test.activemq.queues;

import


org.soa.test.activemq.StudentInfo;

import


org.springframework.beans.factory.annotation.Autowired;

import


org.springframework.beans.factory.annotation.Qualifier;

import


org.springframework.jms.core.JmsTemplate;

import


org.springframework.jms.core.MessageCreator;

import


org.springframework.stereotype.Component;

import


javax.jms.Destination;

import


javax.jms.JMSException;

import


javax.jms.Message;

import


javax.jms.Session;

import


java.util.List;

/**



* Created by JamesC on 16-9-22.



*/

@Component

public


class


ProduceMsg {




@Autowired



private


JmsTemplate jmsTemplate;



/**



* 向指定队列发送消息



*/



public


void


sendMessage(Destination destination,


final


String msg) {




System.out.println(


"向队列"


+ destination.toString() +


"发送了消息------------"


+ msg);



jmsTemplate.send(destination,


new


MessageCreator() {




public


Message createMessage(Session session)


throws


JMSException {




return


session.createTextMessage(msg);



}



});



}



/**



* 向默认队列发送消息(默认队列名称在bean:queueDestination配置)



*/



public


void


sendMessage(


final


String msg) {




//queue://queue1



String destination = jmsTemplate.getDefaultDestination().toString();



System.out.println(


"向队列"


+ destination +


"发送了消息------------"


+ msg);



jmsTemplate.send(


new


MessageCreator() {




public


Message createMessage(Session session)


throws


JMSException {




return


session.createTextMessage(msg);



}



});



}



/**



* 向默认队列发送消息



*/



public


void


sendMessageConvertAndSend(


final


Object msg) {




String destination = jmsTemplate.getDefaultDestination().toString();



System.out.println(


"向队列"


+ destination +


"发送了消息------------"


+ msg);



//使用内嵌的MessageConverter进行数据类型转换,包括xml(JAXB)、json(Jackson)、普通文本、字节数组



jmsTemplate.convertAndSend(destination, msg);



}



/**



* 向指定队列发送消息



*/



public


void


sendStudentInfo(Destination destination,


final


StudentInfo msg) {




System.out.println(


"向队列"


+ destination.toString() +


"发送了消息------------"


+ msg);



jmsTemplate.send(destination,


new


MessageCreator() {




public


Message createMessage(Session session)


throws


JMSException {




return


session.createObjectMessage(msg);



}



});



}

}

2、测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

package


org.soa.test.activemq.queues;

import


com.alibaba.fastjson.JSON;

import


org.apache.activemq.command.ActiveMQQueue;

import


org.junit.Test;

import


org.junit.runner.RunWith;

import


org.soa.test.activemq.StudentInfo;

import


org.springframework.beans.factory.annotation.Autowired;

import


org.springframework.context.ApplicationContext;

import


org.springframework.test.context.ContextConfiguration;

import


org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

import


org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import


javax.jms.Destination;

import


java.util.Date;

/**



* Created by JamesC on 16-9-22.



*/

@RunWith


(SpringJUnit4ClassRunner.


class


)

@ContextConfiguration


(


"/spring-jms.xml"


)

public


class


ProduceMsgTest


extends


AbstractJUnit4SpringContextTests {




@Autowired



protected


ApplicationContext ctx;



/**



* 队列名queue1  这里使用jms配置文件中的数据



*/



@Autowired



private


Destination queueDestination;



/**



* 队列消息生产者



*/



@Autowired



private


ProduceMsg produceMessage;



//向默认队列发消息(文本)



@Test



public


void


produceMsg_DefaultQueue() {




String msg =


"这里是向默认队列发送的消息"


+


new


Date().toString();



produceMessage.sendMessage(msg);



}



//向默认队列发消息(Json字符串)



@Test



public


void


produceMsg_Json() {




StudentInfo info =


new


StudentInfo();



info.setId(


1


);



info.setStdName(


"李磊"


);



info.setStdNo(


"001"


);



info.setEnterDate(


new


Date());


//队列存放的是时间戳



String alibabaJson = JSON.toJSONString(info);



produceMessage.sendMessage(alibabaJson);



}



//向默认队列发消息(使用convertAndSend发送对象)



@Test



public


void


produceMsg_ConvertAndSend() {




StudentInfo info =


new


StudentInfo();



info.setId(


1


);



info.setStdName(


"李磊"


);



info.setStdNo(


"001"


);



info.setEnterDate(


new


Date());



produceMessage.sendMessageConvertAndSend(info);



}



//向指定队列发消息(文本)



@Test



public


void


produceMsg_CustomQueue() {




for


(


int


i =


0


; i <


20


; i++) {




ActiveMQQueue myDestination =


new


ActiveMQQueue(


"queueCustom"


);



produceMessage.sendMessage(myDestination,


"----发送消息给queueCustom"


);



}



}



//向指定队列发消息(队列名称从XML读取)



@Test



public


void


produceMsg_XmlQueue() {




for


(


int


i =


0


; i <


20


; i++) {




ActiveMQQueue destinationQueue = (ActiveMQQueue) applicationContext.getBean(


"queueDestination"


);



produceMessage.sendMessage(destinationQueue,


"----send my msg to queueXml"


);



}



}



//向指定队列发消息(发送对象)



@Test



public


void


produceMsg_StudentInfo() {




StudentInfo info =


new


StudentInfo();



info.setId(


1


);



info.setStdName(


"李磊"


);



info.setStdNo(


"001"


);



info.setEnterDate(


new


Date());



ActiveMQQueue destination =


new


ActiveMQQueue(


"StudentInfo"


);



produceMessage.sendStudentInfo(destination, info);



}

}


四、队列消费端及测试程序

1、消费代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

package


org.soa.test.activemq.queues;

import


org.soa.test.activemq.StudentInfo;

import


org.springframework.beans.factory.annotation.Autowired;

import


org.springframework.jms.core.JmsTemplate;

import


org.springframework.jms.support.JmsUtils;

import


org.springframework.stereotype.Component;

import


javax.jms.Destination;

import


javax.jms.JMSException;

import


javax.jms.ObjectMessage;

import


javax.jms.TextMessage;

/**



* Created by JamesC on 16-9-22.



*/

@Component

public


class


ConsumeMsg {




@Autowired



private


JmsTemplate jmsTemplate;



/**



* 接受消息



*/



public


String receive(Destination destination) {




TextMessage tm = (TextMessage) jmsTemplate.receive(destination);



String msg =


""


;



try


{




msg = tm.getText();



System.out.println(


"从队列"


+ destination.toString() +


"收到了消息:\t"


+ msg);



}


catch


(JMSException e) {




e.printStackTrace();



return


""


;



}



return


msg;



}



/**



* 接受消息



*/



public


StudentInfo receiveStudentInfo() {




try


{




String destination = jmsTemplate.getDefaultDestination().toString();



ObjectMessage msg=(ObjectMessage)jmsTemplate.receive(destination);



return


(StudentInfo)msg.getObject();



}


catch


(JMSException e) {




//检查性异常转换为非检查性异常



throw


JmsUtils.convertJmsAccessException(e);



}



}



/**



* 接受消息



*/



public


Object receiveConvertAndReceive() {




String destination = jmsTemplate.getDefaultDestination().toString();



Object msg = jmsTemplate.receiveAndConvert(destination);



return


msg;



}

}

2、测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

package


org.soa.test.activemq.queues;

import


org.apache.activemq.command.ActiveMQQueue;

import


org.junit.Test;

import


org.junit.runner.RunWith;

import


org.soa.test.activemq.StudentInfo;

import


org.springframework.beans.factory.annotation.Autowired;

import


org.springframework.test.context.ContextConfiguration;

import


org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**



* Created by JamesC on 16-9-22.



*/

@RunWith


(SpringJUnit4ClassRunner.


class


)

@ContextConfiguration


(


"/spring-jms.xml"


)

public


class


ConsumeMsgTest {




@Autowired



private


ConsumeMsg consumeMsg;



//从指定队列接收消息(文本)



@Test



public


void


receiveMsg() {




//没有消息阻塞一段时间后会抛异常



//java.lang.NullPointerException



ActiveMQQueue destination =


new


ActiveMQQueue(


"defaultQueueName"


);



consumeMsg.receive(destination);



}



//从指定队列接收消息(StudentInfo对象消息)



@Test



public


void


receiveStudentInfo() {




StudentInfo msg = consumeMsg.receiveStudentInfo();



System.out.println(msg.getStdName());



}



//从指定队列接收消息(Json对象)



@Test



public


void


receiveConvertAndReceive() {




StudentInfo msg =(StudentInfo) consumeMsg.receiveConvertAndReceive();



System.out.println(msg.getStdName());



}

}