Redis其实也可以做消息队列,但是更多的企业选择了ActiveMQ,为什么,因为Redis的消息队列比较简单,无法做到像ActiveMQ,那样做做到点对点的消息订阅与发送
首先是哪些情况需要用到消息中心?
1.需要解耦出来的业务
比如淘宝中业务的处理就是使用发布/监听的方式,此处不展开,后面会有详细说明
2.耗时比较久的业务:MQ
比如订单服务,整套订单流水很长,而RPC调用(比如Dubbo)是
同步请求
的,在发出请求的时候,客户端就要TM一直等订单系统回应结果!
在这个过程中,在等的过程中就要
占用服务器的CPU和内存资源
,这还不算最糟糕的情况,如果遇到
高并发
的情况下,有些订单延迟过高,用户会以为出问题了,会反复提交订单,造成状况进一步恶化,和资源的进一步被占用
此时如果采用消息中心进行通讯,那么客户端可以
不用去管后台的情况
,直接通过消息中心返回的消息,用户拿到消息后,认为OK,但是实际上
请求
仍然在后台排队等待处理
优点:消息中心避免了
同步请求
带来的问题,既前台应用根本不用等到后台Service非要将业务处理完毕才返回请求,也同时解决了高并发产生的问题;
PS:但是如果订单建立失败,则会涉及到分布式事务,这个后面解释
简单总结:
对于链条比较长且还是高并发的事务可以采用消息中心来处理,这样比RPC同步调用效率会高出不少
3.存在高并发的业务:MQ + Redis
比如常见的秒杀抢购
假如没有消息中心,采用RPC进行通讯,那么常规的方法可以加
乐观锁
解决高并发 + 万能的验证码
削个峰
;
另一种方案就是用队列,就是将所有的请求全部放在队列中,这样就可以不用处理高并发产生的问题
但是这样做,虽然解决了高并发的问题,仍然有其他的问题,问题就是在于客户太多,请求会很多,比如10W个请求,如果商品有有限(比如1),如何解决超卖的问题?
如果要解决超卖的问题,那么我这边的客户端就一定要读取到商品的总数量
这里会出现的问题就是,A客户端从数据库读取到数据怎么能保证不是脏数据?
要做秒杀系统的另一个关键就是:
读操作必须是原子性的,不然没有办法解决超卖的问题!
解决的思路就是利用
Redis的读写操作原子性
的特点来进行改造
具体步骤是:
1.到达了商品抢购的时间,把抢购的商品的数量通过数据库读取到Redis中
2.用户点击商品抢购的按钮,在controller中,使用Redis的decr,让商品的库存数做减法操作,并且接收到减减之后的结果,判断该结果是否大于等于0 (0~99,合计100)
3.如果不是大于等于0,提示用户,商品抢购完毕,并且让抢购的活动结束
4.如果是大于等于0,说明该商品还有库存可以抢,发送关于商品抢购的消息到消息中心
5.商品系统以订阅的方式获取消息中心的消息,最终修改数据库
使用消息中心的大致思路已经明白了,那么我们接下就来看看具体消息中心指的是什么?
什么是消息中心?
消息中心
1.消息异步接收:消息发送者不需要等待接受者的响应,提高整个应用的效率
2.消息可靠接收:消息发送出去以后保存在一个中间容器中,只有消息的接受者收到消息后才能删除消息
3.消息队列接收:消息以队列的形式接收,一个一个排队处理
比较流行的消息中心:
收费:IBM MQService,BEA WebLogic,Oracle MQ
开源:
ActiveMQ(老牌),
RabbitMQ(老牌,Apeach出品),
Kafka(性能很高,单台就能够处理百万级别的并发,一般用在大数据日志的收集),
RocketMQ(阿里开源中间件)
收费:阿里云GTS分布式事务
Kafka例外,并没有实现JMS,原因它并不是Java语言开发的
JMS
JMS就是Java消息服务(
Java Message Servcie
)应用程序接口,是JavaEE平台关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送信息,进行异步他通信,
可以类比为JDBC
JMS规范
JMS定义了Java中间件中访问消息中心的接口,并没有给予实现,实现JMS接口的消息中心成为JMS Provider,例如ActiveMQ
假如想更换消息中心,只需要更换底层的JAR包即可,无需修改代码
关于JMS的几个概念
JMS Provider:实现了JMS接口和规范的消息中心,比如ActiveMQ
JSM Producer: 消息生产者,创建和发送JMS消息的客户端应用
JMS Consumer:消息消费者,接收和处理JSM消息的客户端应用
JMS Message :JMS 消息,分3个部分
1)消息头:每个消息头字段都有相应的getter和setter方法
2)消息属性:如果需要消除消息字段以外的值,那么可以使用消息属性
3)消息体:封装具体的消息数据
5.JMS Destination:消息的目的地,包含queue和topic
目的地通常是一串字符串,比如XX要去北京,XX的目的地就是“beijing”,在消息中心就是”message-order”等形式表型
6.JMS Domain :消息传递域,JMS规范定义了两种消息传递域,分别是点对点和发布/订阅传递域
p2p与queue
1)点对点:
point to point 简称ptp 或者 p2p 消息传递域
,该消息传递域发送的消息地称之为队列(queue)
队列queue特点:
A.每个消息只能有一个消费者或者只能有一种消费者
(为什么称之为一种,后面会有解释)
B.消息的生产者和消费者
没有时间上的相关性
,消费者在提取消息的时候,消息的生产者是否处于运行状态,消费者还是可以去提取消息
或者说消费者所在服务器挂了,但重启后消费者仍然能获取到消息
这个就是
queue下生产者和消费者没有时间上的相关性
的含义
pub/sub与Topic
publish/subscribe 消息传递域,该消息传递域发送的目的地成为主题(topic)
A.每个消息可以有多个消费者
B.生产者和消费者之间有时间上的相关性,订阅一个主体的消费者只能消费自它订阅之后的消息
简单总结:
简单的来说,
pub/sub可以有N多个消费者,pub/sub发布消息相当于广播,有N个就发送N条消息,
不像p2p只能针对一个或者一种消费者一次性只能发一条消息
pub/sub的消费者和订阅者有时间上的相关性,即消费者挂了,生产者在消费者重启的时候发的消息消费者就拿不到了
而p2p在生产者或者消费者挂了后重启,消费者仍然能拿到消息
JMS Session
与JMS Provider 所建立的会话,可设置事务,消息消费签收方式
这个签名方式就是:
设置事务签名:消费者去拿消息,除非等到该消息被提交之后,消息中心才会删除消息
自定义手动签名:可以设置手动签收,手动签收后才消息中心才会删除消息
ActiveMQ
是Apache推出的,一款开源的,完全支持JMS1.1和JavaEE1.4规范的JMS Provider实现的消息中心(MOM)
ActiveMQ能干什么: 最主要的是用来帮助实现高可用,高性能,可伸缩,易用和安全的消息服务系统
Active MQ 特点:
1.完全支持JMS1.1和JavaEE1.4规范(持久化)
2.支持多种传送协议:TCP,UDP,SSL,NIO
3.可拔插的体系结构,可灵活定制,如:集群,负载均衡,消息存储方式,安全管理等
4.很容易和系统集成使用(和spring整合)
5.多语言使用:Java,C,C++,Ruby,PHP,Python
6.在设计上保证了高性能的集群
7.支持通过JDBC把消息持久化到数据库
安装与启动
到ActiveMQ下载即可,但是注意目前不要下载5.15.x版以后的,原因后面解释
下载后
tar -zxvf xxx-acivemq
mv xxx-activemq /usr/local/activemq 即可
ActiveMQ的运行也是十分方便,直接进入active_home/bin
运行./activemq start 即可
查看是否成功运行 ps -ef | grep activemq
也许你没有运行成功,但是后面会有相关的解决方案
查看配置
这里介绍 ActiveMQ有哪些配置起作用
进入activemq_home/conf下
vi activemq.xml
进入activemq.xml后可以进行具体的设置
设置ActiveMQ的监听端口,默认为61616
访问地址是:localhost:8161/admin 这些都可以配置的,包括访问的协议等,如下图
查看jetty.xml
进入jetty中可以查看管理端的配置
设置ActiveMQ的管理端口,默认为: 8161
或者设置管理端的用户名和密码,默认为admin,admin
想要修改的可以去查询其他攻略这里不展开
解决无法启动和无法访问的问题
- 防火墙的问题
启动管理端,前提一定要将防火墙开放对应的端口
如果是阿里云ECS一定要重新配置安全组规则,增加对应的端口开放
- JDK版本问题
注意启动前提之二:必须JDK1.8,假如是1.7将无法启动
将JDK更换为1.8后
ps -ef | grep activemq
Java中ActiveMQ的使用
- 下载Java组件
下载的时候注意,一定是Broker不是Core
并且
Java组件版本一定要与自己用的activemq版本一一对应
,这就是前面为什么要求一定要下载5.15.X的版本的原因,因为之后的Java组件根本没有对应的版本!
这里我选用的是:5.15.3版本的
ActiveMQ启动
启动后进入Queue,相关界面如下
现在开始做Produce和Consumer来对MQ进行操作
DEMO演示-producer
首先是producer
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;
public class MessageProducer {
public static void main(String[] args) throws Exception {
//创建连接工厂
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(
"tcp://192.168.230.130:61616"
);
//从连接工厂创建连接
Connection connection =factory.createConnection();
//开启连接
connection.start();
//这里说明一下,true代表开启事务,一但开启事务,后面的session就必须要提交
//不然activeMQ会认为producer这边出了问题,不认生产的消息
//至于10是乱填的,因为一但确定事务开启之后,后面填什么都是一样的
Session session = connection.createSession(true, 10);
//创建消息发送的地点,地点有两种:p2p和topic
Destination destination = session.createQueue("message-test");
//创建消息发送者
javax.jms.MessageProducer producer = session.createProducer(destination);
//创建消息对象
TextMessage message = session.createTextMessage("测试消息中心");
//发送消息
producer.send(message);
//提交事务
session.commit();
//关闭会话
session.close();
//关闭连接
connection.close();
}
}
执行一次生产消息,结果
DEMO演示-消费者consumer
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;
public class MessageConsumer {
public static void main(String[] args) throws Exception {
//创建连接工厂
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(
"tcp://192.168.230.130:61616"
);
//从连接工厂创建连接
Connection connection =factory.createConnection();
//开启连接
connection.start();
//不然无法提交事务
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
//创建消息发送的地点,地点有两种:p2p和topic
//此处要去消费消息,地点一定要相同
Destination destination = session.createQueue("message-test");
//创建消息的消费者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//接收消息
TextMessage receive = (TextMessage) consumer.receive();
//获取消息内容
String text = receive.getText();
System.out.println(text);
session.commit();
//关闭会话
session.close();
//关闭连接
connection.close();
}
}
最后的结果:
假如消费者将事务提交的语句删除了
session.commit();
虽然消费者能够拿到消息,但是由于
消费者没有提交事务
,ActiveMQ会认为Counsumer的消息处理出现了异常,所以并不会删除消息
当Consumer这边的事务提交以后,这边的消息中心才最终删除了消息
ActiveMQ消息的持久化
消息中心消息的两种种模式:
一、事务模式
static final int SESSION_TRANSACTED = 0;//当事务提交为true时,就为此值,但此值在代码是被忽略的,所以true后,填何种int值都无所谓
二、非事务模式
static final int AUTO_ACKNOWLEDGE = 1; 消息自动签收
static final int CLIENT_ACKNOWLEDGE = 2; 客户端必须调用acknowledge签收
static final int DUPS_OK_ACKNOWLEDGE = 3; 不签收
持久化消息类型
此处我们使用Windows版本的ActiveMQ来模仿
持久化消息:
使用:DeliveryMode.PERSISTENT
默认就是持久化
特点:发送的消息会持久化到硬盘
非持久化消息
使用:DeliveryMode.NON_PERSISTENT
方法:producer.setPriority(DeliveryMode.NON_PERSISTENT);
特点:发送的消息会保存到内存
具体的配置:
找到data目录
简单的来说就是将内存中的数据通过IO写到硬盘中去
假如将该目录删除,那么重启activeMQ的时候,所有的数据都会丢失
另一种是LevelDB,谷歌开发的数据库,写入效率高
另一种方式写入到关系型数据库中:
消息消费者的问题
原先的模式,receive()方法,一次性只能拿一条
要反复调用receive()方法,就很烦,虽然我们要换种方法来,就是采用监听器的方法来做消息的处理
先看效果:
启动带有监听器的消费者:
而这种方式就是企业中最常见的业务处理方法,即调用监听器来处理业务
具体实现方式是:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;
public class MessageConsumer {
public static void main(String[] args) throws Exception {
//创建连接工厂
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(
"tcp://192.168.230.130:61616"
);
//从连接工厂创建连接
Connection connection =factory.createConnection();
//开启连接
connection.start();
//不然无法提交事务
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
//创建消息发送的地点,地点有两种:p2p和topic
//此处要去消费消息,地点一定要相同
Destination destination = session.createQueue("message-test");
//创建消息的消费者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//接收消息
//TextMessage receive = (TextMessage) consumer.receive();
//获取消息内容
//String text = receive.getText();
//System.out.println(text);
//session.commit();
consumer.setMessageListener(new MessageListener() {
//使用匿名内部类的方法来实现该接口
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
//获取消息
String text = null;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//关闭会话
//session.close();
//关闭连接
//connection.close();
}
}
消息的传递域: Queue
特点是:
一次性只能由一个消费者或一种消费者接受到消息
为什么?因为在有的场景中,有些业务必须而且只能由一个Service来接收,比如订单服务
但是如果有多个相同的消费者在监听的时候,那么必然是轮着来的
所以这个就是一种消费者的含义
为什么?因为采用的时候负载均衡用的
并且没有时间上的关联性,就算消费者挂掉了,后面消费者重启后,也能拿到消息
消息的传递域: Topic
时间的关联性的解释:
消费者,只能在订阅的时候才能拿到消息,如果消费者意外挂了,那么它将不能在挂了之后重启拿到消息