一.消息中心简介
(一).应用场景
并发的业务:如抢购
耗时长的业务:如商城中的订单生成
耦合度高的业务:如邮件,短信等提醒功能
(二).常见的消息中间件
1.ActiveMQ
ActiveMQ是Apache出品,比较老的一个开源的消息中间件, 是一个完全支持JMS规范的消息中间件.
API丰富,以前在中小企业应用广泛
MQ衡量的指标:服务性能,数据存储,集群架构
2.KafKa
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
3.RocketMQ
RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011 年初,Linkin开源了 Kafka 这个优秀的消息中间件,淘宝中间件团队在对 Kafka 做过充分 Review 之后, Kafka 无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用 Java 语言编写了 RocketMQ ,定位于非日志的可靠消息传输(日志场景也OK),目前 RocketMQ 在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binlog 分发等场景。
4.RabbitMQ
abbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
对数据的一致性,稳定性和可靠性要求比较高的场景
(三).AMQP协议
AMQP高级消息队列协议
定义:Advanced Message Queueing Protocol
是面向消息的中间件的开放标准应用层协议,AMQP的特征是消息导向,排队,路由(包括点对点和发布和订阅),可靠性和安全性。
AMQP要求消息传递提供商和客户端的行为在不同供应商实现可互操作的情况下,以与SMTP,HTTP,FTP等相同的方式创建了可互操作的系统。
AMQP协议是具有现代特征的二进制协议。一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开发标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
AMQP是一种二进制应用层协议,旨在有效地支持各种消息应用和通信模式。
协议模型如下:
AMQP核心概念:
Server
又称作Broker,用于接受客户端的连接,实现AMQP实体服务;
Connection
连接,应用程序与Broker的网络连接;
Channel
网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务;
Message
消息,服务器和应用程序之间传送的数据,有Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容,即我们要传输的数据;
仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
Virtual Host
虚拟地址,是一个逻辑概念,用于进行逻辑隔离,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue;
Virtual Host是权限控制的最小粒度;
Exchange
交换机,用于接收消息,可根据路由键将消息转发到绑定的队列;
Binding:
Exchange和Queue之间的虚拟连接,Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定
Routing Key:
一个路由规则,虚拟机可用它来确定如何路由一个特定的消息;
Queue:
也称作Message Queue,即消息队列,用于保存消息并将他们转发给消费者;
RabbitMQ整体架构图:
消息处理流程:
rabbitMQ的下载安装,请才考下文(Windows版本)
rabbitMQ的安装
二.RabbitMQ使用
(一).入门案例
添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
消息生产者
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
运行main方法,查看控制台
已经消息添加到消息队列中
消息消费者
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
运行main方法,查看控制台
(二).相关参数讲解
1.durable:
RabbitMQ默认将消息存储在内存中,若RabbitMQ宕机,那么整个队列会丢失.
可以把这个属性设置成true,表示这个队列需要做持久化.
这个属性只是声明队列是持久化的,RabbitMQ宕机或者重启之后,队列依然存在,但是里面的消息没有持久化,也会丢失.所以需要针对消息也做持久化.
channel.basicPublish(“”,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
2.exclusive
如果你想创建一个只有自己可见的队列,即不允许其它用户访问,设置为true,RabbitMQ允许你将一个Queue声明成为排他性的(Exclusive Queue)。
该队列的特点是:
1.只对首次声明它的连接(Connection)可见
2.会在其连接断开的时候自动删除。
3.autoDelete
当所有消费客户端连接断开后,是否自动删除队列 true:删除false:不删除
(三).Worker模式
消息生成者
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = "tasequeue";
for(int i=0;i<20;i++){
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
(message+i).getBytes("UTF-8"));
}
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消息消费者
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
默认情况下,是平均分配的,因为每个消费者,有一个预取的初始数量.250.相当于队列的消息,已经分配好了给哪个消费者,如果其中有一个消费者性能较差,这样就会影响效率,所有可以把预取的初始数量设置为1,这样,性能较高的消费者就会去消费更过的消息,大大提高效率
channel.basicQos(1);
(四).Pub/Sub模式
X表示交换机
消息生产者
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "info: Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消息消费者
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//定义一个匿名额队列
String queueName = channel.queueDeclare().getQueue();
//将队列和交换机绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
(五).Routing(路由)模式
消息生产者
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//指定的可以
String severity = "info";
//消息内容
String message = "directMsg";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
}
消息消费者
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
(六).Topic模式
只有topic才支持key的通配的
三.SpringBoot整合RabbitMQ
(一).添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(二).简单Queue模式
消息生产者
@Controller
public class QueueController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/queue")
@ResponseBody
public String sendMsg(String msg){
rabbitTemplate.convertAndSend("","boot_queue",msg);
return "发送成功";
}
}
消息消费者:
@Component
public class QueueListener {
@RabbitListener(queuesToDeclare = @Queue("boot_queue"))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel){
System.out.println("收到消息:"+msg);
}
}
(三).Worker模式
签收模式配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=1
消息生产者
@Controller
public class WorkerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/worker")
@ResponseBody
public String sendMsg(){
for(int i= 0;i<20;i++){
rabbitTemplate.convertAndSend("","boot_worker","msg:"+i);
}
return "发送成功";
}
}
消息消费者:
@Component
public class Worker {
@RabbitListener(queuesToDeclare = @Queue("boot_worker"))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("工作者1:"+msg);
channel.basicAck(deliveryTag,false);
}
}
(四).Pub/Sub模式
消息生产者
@Controller
public class PubSubController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/pubsub")
@ResponseBody
public String sendMsg(String msg){
rabbitTemplate.convertAndSend("boot_pubsub","","广播消息");
return "发送成功";
}
}
消息消费者:
@Component
public class PubSubReceiver {
@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "boot_pubsub",type = "fanout")))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
System.out.println("收到消息1:"+msg);
channel.basicAck(deliveryTag,false);
}
}
(五).Routing模式
消息生产者
@Controller
public class RountingController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/rounting")
@ResponseBody
public String sendMsg(String key){
rabbitTemplate.convertAndSend("boot_rounting_exchange",key,"rounting消息");
return "发送成功";
}
}
消息消费者
@Component
public class RountingReceiver {
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "boot_rounting_queue01"),
exchange = @Exchange(name = "boot_rounting_exchange",type = "direct"),
key = {"error","info"}
))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
System.out.println("error&info 收到消息:"+msg);
channel.basicAck(deliveryTag,false);
}
}
(六).Topic模式
消息生产者
@Controller
public class TopicController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/topic")
@ResponseBody
public String sendMsg(String key){
rabbitTemplate.convertAndSend("boot_topic_exchange",key,"topic消息");
return "发送成功";
}
}
消息消费者
@Component
public class TopicReceiver01 {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("boot_topic_queue01"),
exchange = @Exchange(name = "boot_topic_exchange",type = "topic"),
key = "order.*"
))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
System.out.println("topic收取消息:"+msg);
channel.basicAck(deliveryTag,false);
}
}