ActiveMQ:消息中心基本介绍

  • Post author:
  • Post category:其他


Redis其实也可以做消息队列,但是更多的企业选择了ActiveMQ,为什么,因为Redis的消息队列比较简单,无法做到像ActiveMQ,那样做做到点对点的消息订阅与发送

首先是哪些情况需要用到消息中心?

1.需要解耦出来的业务

比如淘宝中业务的处理就是使用发布/监听的方式,此处不展开,后面会有详细说明

2.耗时比较久的业务:MQ

在这里插入图片描述

比如订单服务,整套订单流水很长,而RPC调用(比如Dubbo)是

同步请求

的,在发出请求的时候,客户端就要TM一直等订单系统回应结果!

在这个过程中,在等的过程中就要

占用服务器的CPU和内存资源

,这还不算最糟糕的情况,如果遇到

高并发

的情况下,有些订单延迟过高,用户会以为出问题了,会反复提交订单,造成状况进一步恶化,和资源的进一步被占用

在这里插入图片描述

此时如果采用消息中心进行通讯,那么客户端可以

不用去管后台的情况

,直接通过消息中心返回的消息,用户拿到消息后,认为OK,但是实际上

请求

仍然在后台排队等待处理

优点:消息中心避免了

同步请求

带来的问题,既前台应用根本不用等到后台Service非要将业务处理完毕才返回请求,也同时解决了高并发产生的问题;

PS:但是如果订单建立失败,则会涉及到分布式事务,这个后面解释

简单总结:


对于链条比较长且还是高并发的事务可以采用消息中心来处理,这样比RPC同步调用效率会高出不少

3.存在高并发的业务:MQ + Redis

比如常见的秒杀抢购

在这里插入图片描述

假如没有消息中心,采用RPC进行通讯,那么常规的方法可以加

乐观锁

解决高并发 + 万能的验证码

削个峰

另一种方案就是用队列,就是将所有的请求全部放在队列中,这样就可以不用处理高并发产生的问题

在这里插入图片描述

但是这样做,虽然解决了高并发的问题,仍然有其他的问题,问题就是在于客户太多,请求会很多,比如10W个请求,如果商品有有限(比如1),如何解决超卖的问题?

如果要解决超卖的问题,那么我这边的客户端就一定要读取到商品的总数量


这里会出现的问题就是,A客户端从数据库读取到数据怎么能保证不是脏数据?

要做秒杀系统的另一个关键就是:

读操作必须是原子性的,不然没有办法解决超卖的问题!

解决的思路就是利用

Redis的读写操作原子性

的特点来进行改造

在这里插入图片描述

具体步骤是:

1.到达了商品抢购的时间,把抢购的商品的数量通过数据库读取到Redis中

2.用户点击商品抢购的按钮,在controller中,使用Redis的decr,让商品的库存数做减法操作,并且接收到减减之后的结果,判断该结果是否大于等于0 (0~99,合计100)

3.如果不是大于等于0,提示用户,商品抢购完毕,并且让抢购的活动结束

4.如果是大于等于0,说明该商品还有库存可以抢,发送关于商品抢购的消息到消息中心

5.商品系统以订阅的方式获取消息中心的消息,最终修改数据库

使用消息中心的大致思路已经明白了,那么我们接下就来看看具体消息中心指的是什么?

什么是消息中心?

消息中心

1.消息异步接收:消息发送者不需要等待接受者的响应,提高整个应用的效率

2.消息可靠接收:消息发送出去以后保存在一个中间容器中,只有消息的接受者收到消息后才能删除消息

3.消息队列接收:消息以队列的形式接收,一个一个排队处理

比较流行的消息中心:

收费:IBM MQService,BEA WebLogic,Oracle MQ

开源:

ActiveMQ(老牌),

RabbitMQ(老牌,Apeach出品),

Kafka(性能很高,单台就能够处理百万级别的并发,一般用在大数据日志的收集),

RocketMQ(阿里开源中间件)

收费:阿里云GTS分布式事务

Kafka例外,并没有实现JMS,原因它并不是Java语言开发的

JMS

JMS就是Java消息服务(

Java Message Servcie

)应用程序接口,是JavaEE平台关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送信息,进行异步他通信,

可以类比为JDBC

JMS规范

JMS定义了Java中间件中访问消息中心的接口,并没有给予实现,实现JMS接口的消息中心成为JMS Provider,例如ActiveMQ

假如想更换消息中心,只需要更换底层的JAR包即可,无需修改代码

关于JMS的几个概念

JMS Provider:实现了JMS接口和规范的消息中心,比如ActiveMQ

JSM Producer: 消息生产者,创建和发送JMS消息的客户端应用

JMS Consumer:消息消费者,接收和处理JSM消息的客户端应用

JMS Message :JMS 消息,分3个部分

1)消息头:每个消息头字段都有相应的getter和setter方法

2)消息属性:如果需要消除消息字段以外的值,那么可以使用消息属性

3)消息体:封装具体的消息数据

5.JMS Destination:消息的目的地,包含queue和topic

目的地通常是一串字符串,比如XX要去北京,XX的目的地就是“beijing”,在消息中心就是”message-order”等形式表型

6.JMS Domain :消息传递域,JMS规范定义了两种消息传递域,分别是点对点和发布/订阅传递域

p2p与queue

1)点对点:

point to point 简称ptp 或者 p2p 消息传递域

,该消息传递域发送的消息地称之为队列(queue)

队列queue特点:

A.每个消息只能有一个消费者或者只能有一种消费者

(为什么称之为一种,后面会有解释)

在这里插入图片描述

B.消息的生产者和消费者

没有时间上的相关性

,消费者在提取消息的时候,消息的生产者是否处于运行状态,消费者还是可以去提取消息

或者说消费者所在服务器挂了,但重启后消费者仍然能获取到消息

这个就是

queue下生产者和消费者没有时间上的相关性

的含义

在这里插入图片描述

pub/sub与Topic

publish/subscribe 消息传递域,该消息传递域发送的目的地成为主题(topic)

A.每个消息可以有多个消费者

B.生产者和消费者之间有时间上的相关性,订阅一个主体的消费者只能消费自它订阅之后的消息

简单总结:

简单的来说,

pub/sub可以有N多个消费者,pub/sub发布消息相当于广播,有N个就发送N条消息,

不像p2p只能针对一个或者一种消费者一次性只能发一条消息

pub/sub的消费者和订阅者有时间上的相关性,即消费者挂了,生产者在消费者重启的时候发的消息消费者就拿不到了

而p2p在生产者或者消费者挂了后重启,消费者仍然能拿到消息

JMS Session

与JMS Provider 所建立的会话,可设置事务,消息消费签收方式

这个签名方式就是:

设置事务签名:消费者去拿消息,除非等到该消息被提交之后,消息中心才会删除消息

自定义手动签名:可以设置手动签收,手动签收后才消息中心才会删除消息

ActiveMQ

是Apache推出的,一款开源的,完全支持JMS1.1和JavaEE1.4规范的JMS Provider实现的消息中心(MOM)

ActiveMQ能干什么: 最主要的是用来帮助实现高可用,高性能,可伸缩,易用和安全的消息服务系统

Active MQ 特点:

1.完全支持JMS1.1和JavaEE1.4规范(持久化)

2.支持多种传送协议:TCP,UDP,SSL,NIO

3.可拔插的体系结构,可灵活定制,如:集群,负载均衡,消息存储方式,安全管理等

4.很容易和系统集成使用(和spring整合)

5.多语言使用:Java,C,C++,Ruby,PHP,Python

6.在设计上保证了高性能的集群

7.支持通过JDBC把消息持久化到数据库

安装与启动

到ActiveMQ下载即可,但是注意目前不要下载5.15.x版以后的,原因后面解释


http://activemq.apache.org/download-archives.html

下载后

tar -zxvf xxx-acivemq

mv xxx-activemq /usr/local/activemq 即可

ActiveMQ的运行也是十分方便,直接进入active_home/bin

运行./activemq start 即可

在这里插入图片描述

查看是否成功运行 ps -ef | grep activemq

也许你没有运行成功,但是后面会有相关的解决方案

查看配置

这里介绍 ActiveMQ有哪些配置起作用

进入activemq_home/conf下

vi activemq.xml

在这里插入图片描述

进入activemq.xml后可以进行具体的设置

设置ActiveMQ的监听端口,默认为61616

访问地址是:localhost:8161/admin 这些都可以配置的,包括访问的协议等,如下图

在这里插入图片描述

查看jetty.xml

在这里插入图片描述

进入jetty中可以查看管理端的配置

设置ActiveMQ的管理端口,默认为: 8161

或者设置管理端的用户名和密码,默认为admin,admin

想要修改的可以去查询其他攻略这里不展开

解决无法启动和无法访问的问题

  • 防火墙的问题

启动管理端,前提一定要将防火墙开放对应的端口

如果是阿里云ECS一定要重新配置安全组规则,增加对应的端口开放

  • JDK版本问题

注意启动前提之二:必须JDK1.8,假如是1.7将无法启动

在这里插入图片描述

将JDK更换为1.8后

ps -ef | grep activemq

在这里插入图片描述

Java中ActiveMQ的使用

  • 下载Java组件

下载的时候注意,一定是Broker不是Core

并且

Java组件版本一定要与自己用的activemq版本一一对应

,这就是前面为什么要求一定要下载5.15.X的版本的原因,因为之后的Java组件根本没有对应的版本!

这里我选用的是:5.15.3版本的

在这里插入图片描述

ActiveMQ启动

启动后进入Queue,相关界面如下

在这里插入图片描述

现在开始做Produce和Consumer来对MQ进行操作

DEMO演示-producer

首先是producer

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;

public class MessageProducer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory(
                        "tcp://192.168.230.130:61616"
                );
        //从连接工厂创建连接
        Connection connection =factory.createConnection();

        //开启连接
        connection.start();
        
        //这里说明一下,true代表开启事务,一但开启事务,后面的session就必须要提交
        //不然activeMQ会认为producer这边出了问题,不认生产的消息
        //至于10是乱填的,因为一但确定事务开启之后,后面填什么都是一样的
        Session session = connection.createSession(true, 10);
        
        //创建消息发送的地点,地点有两种:p2p和topic
        Destination destination = session.createQueue("message-test");
        
        //创建消息发送者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        
        //创建消息对象
        TextMessage message = session.createTextMessage("测试消息中心");
        
        //发送消息
        producer.send(message);
        
        //提交事务
        session.commit();
        
        //关闭会话
        session.close();
        
        //关闭连接
        connection.close();
    }
}

执行一次生产消息,结果

在这里插入图片描述

DEMO演示-消费者consumer

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;

public class MessageConsumer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory(
                        "tcp://192.168.230.130:61616"
                );
        //从连接工厂创建连接
        Connection connection =factory.createConnection();

        //开启连接
        connection.start();
       
        //不然无法提交事务
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        //创建消息发送的地点,地点有两种:p2p和topic
        //此处要去消费消息,地点一定要相同
        Destination destination = session.createQueue("message-test");
        //创建消息的消费者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //接收消息
        TextMessage receive = (TextMessage) consumer.receive();
        //获取消息内容
        String text = receive.getText();
        System.out.println(text);
        session.commit();
        //关闭会话
        session.close();
        //关闭连接
        connection.close();
    }
}

最后的结果:

在这里插入图片描述

假如消费者将事务提交的语句删除了

 session.commit();

虽然消费者能够拿到消息,但是由于

消费者没有提交事务

,ActiveMQ会认为Counsumer的消息处理出现了异常,所以并不会删除消息

在这里插入图片描述

当Consumer这边的事务提交以后,这边的消息中心才最终删除了消息

在这里插入图片描述

ActiveMQ消息的持久化

消息中心消息的两种种模式:

一、事务模式

	static final int SESSION_TRANSACTED = 0;//当事务提交为true时,就为此值,但此值在代码是被忽略的,所以true后,填何种int值都无所谓

二、非事务模式

   static final int AUTO_ACKNOWLEDGE = 1;  消息自动签收

   static final int CLIENT_ACKNOWLEDGE = 2;  客户端必须调用acknowledge签收

   static final int DUPS_OK_ACKNOWLEDGE = 3; 不签收

持久化消息类型

此处我们使用Windows版本的ActiveMQ来模仿

持久化消息:

使用:DeliveryMode.PERSISTENT

默认就是持久化

特点:发送的消息会持久化到硬盘

非持久化消息

使用:DeliveryMode.NON_PERSISTENT

方法:producer.setPriority(DeliveryMode.NON_PERSISTENT);

特点:发送的消息会保存到内存

具体的配置:

在这里插入图片描述

找到data目录

在这里插入图片描述

在这里插入图片描述

简单的来说就是将内存中的数据通过IO写到硬盘中去

假如将该目录删除,那么重启activeMQ的时候,所有的数据都会丢失

另一种是LevelDB,谷歌开发的数据库,写入效率高

另一种方式写入到关系型数据库中:

在这里插入图片描述

消息消费者的问题

原先的模式,receive()方法,一次性只能拿一条

要反复调用receive()方法,就很烦,虽然我们要换种方法来,就是采用监听器的方法来做消息的处理

先看效果:

在这里插入图片描述

启动带有监听器的消费者:

在这里插入图片描述

而这种方式就是企业中最常见的业务处理方法,即调用监听器来处理业务

具体实现方式是:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;

public class MessageConsumer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory(
                        "tcp://192.168.230.130:61616"
                );
        //从连接工厂创建连接
        Connection connection =factory.createConnection();

        //开启连接
        connection.start();
        //不然无法提交事务
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        //创建消息发送的地点,地点有两种:p2p和topic
        //此处要去消费消息,地点一定要相同
        Destination destination = session.createQueue("message-test");
        //创建消息的消费者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //接收消息
        //TextMessage receive = (TextMessage) consumer.receive();
        //获取消息内容
        //String text = receive.getText();
        //System.out.println(text);
        //session.commit();
        
        consumer.setMessageListener(new MessageListener() {
            //使用匿名内部类的方法来实现该接口
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                //获取消息
                String text = null;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //关闭会话
        //session.close();
        //关闭连接
        //connection.close();


    }
}

消息的传递域: Queue

特点是:

一次性只能由一个消费者或一种消费者接受到消息

为什么?因为在有的场景中,有些业务必须而且只能由一个Service来接收,比如订单服务

但是如果有多个相同的消费者在监听的时候,那么必然是轮着来的

所以这个就是一种消费者的含义

为什么?因为采用的时候负载均衡用的

并且没有时间上的关联性,就算消费者挂掉了,后面消费者重启后,也能拿到消息

消息的传递域: Topic

时间的关联性的解释:

消费者,只能在订阅的时候才能拿到消息,如果消费者意外挂了,那么它将不能在挂了之后重启拿到消息



版权声明:本文为fenghuoliuxing990124原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。