1、简单测试rocketMQ
1、引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
2、消息生产者类
package com.lx.business.mq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class RocketMqSendTest {
//发送消息
public static void main(String[] args) throws Exception {
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 启动生产者
producer.start();
//4. 创建消息对象,指定主题、标签和消息体
Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());
//5. 发送消息
SendResult sendResult = producer.send(msg, 10000);
System.out.println(sendResult);
//6. 关闭生产者
producer.shutdown();
}
}
3、消费者类
package com.lx.business.mq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMqReceiveTest {
public static void main(String[] args) throws MQClientException {
//1. 创建消息消费者, 指定消费者所属的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
//2. 指定Nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//3. 指定消费者订阅的主题和标签
consumer.subscribe("myTopic", "*");
//4. 设置回调函数,编写处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,
ConsumeConcurrentlyContext
context) {
System.out.println("Receive New Messages: " + msgs);
//返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息消费者
consumer.start();
System.out.println("Consumer Started.");
}
}
4、先启动消费者,一直监听
5、再启动消费者
2、服务间调用
1、pom文件
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2、yml 配置
rocketmq:
name-server: 127.0.0.1:9876 #rocketMQ服务的地址
producer:
group: sale-order # 生产者组
3、java测试类、生产者类
package com.lx.business.mq;
import com.lx.business.feign.StockFeignService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ContractController {
@Autowired
private StockFeignService stockFeignService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping(value = "/feignTest")
public String feignTest(@RequestParam String id){
rocketMQTemplate.convertAndSend("order-topic", byStockId);
return byStockId;
}
}
4、消费者服务 pom
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
5、yml配置
rocketmq:
name-server: 127.0.0.1:9876
6、消费者测试类
package com.lx.business.mq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class TestRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String order) {
log.info("收到一个消息", JSON.toJSONString(order));
}
}
同步消息发送 生产者端
package com.lx.business.mq;
import com.lx.business.feign.StockFeignService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ContractController {
@Autowired
private StockFeignService stockFeignService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping(value = "/feignTest")
public String feignTest(@RequestParam String id){
String byStockId = stockFeignService.getByStockId(id);
rocketMQTemplate.convertAndSend("order-topic", byStockId);
return byStockId;
}
@GetMapping(value = "/mqTest1")
public String mqTest1(@RequestParam String id){
String byStockId = stockFeignService.getByStockId(id);
// 同步消息
SendResult sendResult = rocketMQTemplate.syncSend("sale-topic", byStockId);
SendStatus sendStatus = sendResult.getSendStatus();
return sendStatus.toString();
}
}
消费者端
package com.lx.business.mq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user2", topic = "sale-topic")
public class TestRocketMQListener2 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("收到一个消息", JSON.toJSONString(message));
}
}
异步消息和单向消息,消费者公用,生产者不同
消息生产者使用Test 单元测试,
//异步消息
@Test
public void testAsyncSend() throws InterruptedException {
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("sale-topic", "这是一条异步消息", new
SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
//让线程不要终止
Thread.sleep(30000000);
}
//单向消息
@Test
public void testOneWay() {
rocketMQTemplate.sendOneWay("sale-topic", "这是一条单向消息");
}
版权声明:本文为lu1171901273原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。