1.什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
我们可以拿HTTP协议和MQTT协议做对比来更好地理解什么是MQTT. 按照OSI网络分层模型,IP是
网络层协议
,TCP是传输层协议,而HTTP和MQTT是应用层的协议.在这三者之间, IP/TCP是HTTP和MQTT底层的协议.即MQTT实际上是一个传输层协议.
2. MQTT的使用场景
2.1 http协议和websocket协议
要理解为什么要用mqtt,首先要说一下http协议的不足:
- HTTP客户端和服务器之间的交互是采用请求/应答模式(后来的HTTP1.1支持持久连接,半双工;HTTP2.0是全双工).且消息头内容较多
- HTTP通信方式问题,HTTP的请求/应答方式的会话都是客户端发起的,缺乏服务器通知客户端的机制,在需要通知的场景,如聊天室,游戏,客户端应用需要不断地轮询服务器
为了解决http的上述问题,websocket应运而生(websocket也是使用IP/TCP的传输层协议),websocket的特点:
- websocket建立连接时,数据是通过http传输的,建立连接后就不需要http协议了
-
websocket建立连接后就是
全双工
模式,也是基于
tcp
协议 - 建立连接之后,不必在浏览器(客户端)发送request之后服务器才能发送信息到浏览器,这时候服务器有主动权,可以随时发消息给浏览器(客户端)
- 发送的信息中不必带有head部分信息了,相对于http来说,降低了服务器的压力,极大的减少了不必要的网络流量与延迟
http和websocket的区别与联系:
一. 联系
- 都是基于TCP协议
- websocket是基于http的他们的兼容性都很好
- 在连接的建立过程中对错误的处理方式相同
- 都使用 Request/Response模型进行连接的建立
- 都可以在网络中传输数据
二. 区别
- websocket是持久连接,http 是短连接(http可以通过Ajax一直发送请求和长轮询保持一段时间内的连接,但本质上还是短连接);
- websocket的协议是以 ws/wss 开头,http 对应的是 http/https;
- websocket是有状态的双向连接,http 是无状态的单向连接;
- websocket连接建立之后,数据的传输使用帧来传递,不再需要Request消息;
- websocket是可以跨域的。
websocket其实以及弥补了http很多的不足,那么为什么还会诞生MQTT呢?个人看来,websocket的诞生更多是为了解决http不能解决的问题,是http的互补,而MQTT更像是专门为了工业互联网的应用场景诞生的,下面我们学习下MQTT的特点,大家可以更好地体会一下.
2.2 MQTT协议
2.2.1 设计规范
根据物联网特殊的使用环境,MQTT遵循一下设计规范
- (1)精简,不添加可有可无的功能;
- (2)发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递;
- (3)允许用户动态创建主题,零运维成本;
- (4)把传输量降到最低以提高传输效率;
- (5)把低带宽、高延迟、不稳定的网络等因素考虑在内;
- (6)支持连续的会话控制;
- (7)理解客户端计算能力可能很低;
- (8)提供服务质量管理;
- (9)假设数据不可知,不强求传输数据的类型与格式,保持灵活性。
2.2.2 主要特性
-
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。
-
对负载内容屏蔽的消息传输
-
使用TCP/IP提供网络连接。
主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了
-
HTTP相比,MQTT协议确保了高传输保证。有3个级别的服务质量(QoS):
–
最多一次:
保证尽力交付。 当 QoS 为 0 时,消息的分发依赖于底层网络的能力。发布者只会发布一次消息,接收者不会应答消息,发布者也不会储存和重发消息。消息在这个等级下具有最高的传输效率,但可能送达一次也可能根本没送达。–
至少一次:
保证消息至少传送一次。但是消息也可以不止一次传递。当 QoS 为 1 时,可以保证消息至少送达一次。MQTT 通过简单的 ACK 机制来保证 QoS 1。- 发送者:发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内没有收到 PUBACK 的应答,发布者会将消息的 DUP 置为1 并重发消息。
- 接受者:接收到 QoS 为 1 的消息时应该回应 PUBACK 报文,可能因为网络延迟等原因没有及时发出,这时接收者可能会多次接受同一个消息,无论 DUP标志如何,接收者都会将收到的消息当作一个新的消息并发送 PUBACK 报文应答。
核心
:就是发送消息的时候,接受者需要确认一次,规定时间内没有确认就会重新发。如果使用这种方式,写业务的时候需要保证
幂等性
。–
恰好一次:
保证每个消息只被对方接收一次.当 QoS 为 2 时,发布者和订阅者通过两次会话来保证消息只被传递一次,这是最高等级的服务质量,消息丢失和重复都是不可接受的。使用这个服务质量等级会有额外的开销。- 发送者:发布 QoS 为 2 的消息之后,消息储存起来并等待接收者回复 PUBREC 的消息。
- 接受者:收到一条 QoS 为 2 的消息时,他会处理此消息并返回一条 PUBREC 进行应答。
- 发送者:收到 PUBREC 消息后,丢弃掉之前的发布消息。保存 PUBREC 消息,并应答一个 PUBREL。等待接收者回复 PUBCOMP 消息
- 接受者:当接收者收到 PUBREL 消息之后,它会丢弃掉所有已保存的状态,并回复 PUBCOMP。
- 发送者:当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
核心
:发送消息的时候,接受者需要确认两次,来保证消息确实已经送到。无论在传输过程中何时出现丢包,发送端都负责重发上一条消息。不管发送端是 Publisher(发送端) 还是 Broker(服务器),都是如此。因此,接收端也需要对每一条命令消息都进行应答。
-
MQTT还为用户提供Last will&Testament和Retained消息的选项。第一个意味着在客户端意外断开连接的情况下,所有订阅的客户端都将从代理获得消息。保留消息意味着新订阅的客户端将立即获得状态更新
-
低协议开销,MQTT 的独特之处在于,它的每消息标题可以短至 2 个字节。MQ 和 HTTP 都拥有高得多的每消息开销。对于 HTTP,为每个新请求消息重新建立 HTTP 连接会导致重大的开销。MQ 和 MQTT 所使用的永久连接显著减少了这一开销。
-
对不稳定网络的容忍,MQTT 和 MQ 能够从断开等故障中恢复,而且没有进一步的代码需求。但是,HTTP 无法原生地实现此目的,需要客户端重试编码,这可能增加幂等性问题
-
保活心跳(Keep Alive), MQTT 客户端向服务器发起 CONNECT 请求时,通过 Keep Alive 参数设置保活周期。 客户端在无报文发送时,按 Keep Alive 周期定时发送 2 字节的 PINGREQ 心跳报文,服务端收到 PINGREQ 报文后,回复 2 字节的 PINGRESP 报文。 服务端在 1.5 个心跳周期内,既没有收到客户端发布订阅报文,也没有收到 PINGREQ 心跳报文时,将断开客户端连接。
-
保留消息(Retained Message), MQTT 客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息会驻留在消息服务器,后来的订阅者订阅主题时可以接收到最新一条
(注意,是只有最近的一条)
保留消息。 -
遗嘱消息(Will Message)
-
低功耗,MQTT 是专门针对低功耗目标而设计的。HTTP 的设计没有考虑此因素,因此增加了功耗
-
MQTT 客户端都已在大量平台上实现。(http同样适用大部分平台)
这些特性很多都是http所不具备的,MQTT 可以很好地解决物联网环境 1.网络代价昂贵,带宽低、不可靠;2. 在嵌入设备中运行,处理器和内存资源有限等问题, 根据3G网络的测量结果,MQTT的吞吐量比HTTP快93倍。 因此,在物联网环境中mqtt有着无可比拟的优势
2.2.3 总结
3.使用说明
3.1docker部署EMQX
EMQX则是实现mqtt消息代理分发的一个消息中间件。 部署很简单,分两步走:
-
获取 Docker 镜像
docker pull emqx/emqx:5.0.8
-
启动 Docker 容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.8
各个服务端口说明:各个服务端口说明:
- 1883:MQTT 协议端口
-
8883:MQTT/
SSL
端口 - 8083:MQTT/WebSocket 端口
-
8080:HTTP
API
端口 - 8084 WSS端口
- 18083:Dashboard 管理控制台端口
EMQX 提供了 Dashboard 以方便用户管理设备与监控相关指标。控制台地址为:http://localhost:18083,默认用户名密码为:admin/public,可以在 etc/plugins/emqx_dashboard.conf 配置文件中修改默认密码。(国人开发的软件用户体验是真好)
3.2 SpringBoot整合mqtt&emqx
3.2.1 创建SpringBoot项目,添加pom引入jar包
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
3.2.2 配置application.yml文件的Emqx参数
mqtt:
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
host: tcp://127.0.0.1:11883
#MQTT-连接服务器默认客户端ID
clientid: mqtt_id
#MQTT-用户名
username: admin
#MQTT-密码
password: admin
#MQTT-默认的消息推送主题,实际可在调用接口时指定
topic: test
#连接超时
timeout: 1000
#设置会话心跳时间
keepalive: 100
3.2.3 读取配置参数
package com.jscoe.mqtt;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Component
@Configuration
@Data
@ConfigurationProperties("mqtt")
public class MqttConfiguration {
@Autowired
private MqttCustomerClient mqttCustomerClient;
private String host;
private String clientid;
private String username;
private String password;
private String topic;
private int timeout;
private int keepalive;
@Bean
public MqttCustomerClient getMqttCustomerClient() {
mqttCustomerClient.connect(host, clientid, username, password, timeout,keepalive);
// 以/#结尾表示订阅所有以test开头的主题
mqttCustomerClient.subscribe("test/#");
return mqttCustomerClient;
}
}
3.2.4 为Mqtt客户端实例提供回调函数
package com.jscoe.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
/**
* @author rongyu
* @description 消费监听
* @date 2022/9/29
*/
@Component
public class PushCallback implements MqttCallback {
private static MqttClient mqttClient;
/**
* 在断开连接时调用
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
if (mqttClient == null || !mqttClient.isConnected()) {
System.out.println("连接断开,正在重连....");
}
}
/**
* 消息到达后,调用
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
/**
* 消息发送成功后,调用
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
3.2.5 封装工具类
package com.jscoe.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author rongyu
* @description 消费监听
* @date 2022/9/29
*/
@Slf4j
@Component
public class MqttCustomerClient {
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public static MqttClient getClient(){
return client;
}
public static void setClient(MqttClient client){
MqttCustomerClient.client=client;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param clientID 客户端Id
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keeplive 保留数
*/
public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
MqttClient client;
try {
client=new MqttClient(host,clientID,new MemoryPersistence());
MqttConnectOptions options=new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keeplive);
MqttCustomerClient.setClient(client);
try {
client.setCallback(pushCallback);
client.connect(options);
}catch (Exception e){
e.printStackTrace();
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 发布,默认qos为0,非持久化
* @param topic
* @param pushMessage
*/
public void pushlish(String topic,String pushMessage){
pushlish(0,false,topic,pushMessage);
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void pushlish(int qos,boolean retained,String topic,String pushMessage){
MqttMessage message=new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic= MqttCustomerClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token=mqttTopic.publish(message);
token.waitForCompletion();
}catch (MqttPersistenceException e){
e.printStackTrace();
}catch (MqttException e){
e.printStackTrace();
}
}
/**
* 订阅某个主题,qos默认为0
* @param topic
*/
public void subscribe(String topic){
log.error("开始订阅主题" + topic);
subscribe(topic,0);
}
public void subscribe(String topic,int qos){
try {
MqttCustomerClient.getClient().subscribe(topic,qos);
}catch (MqttException e){
e.printStackTrace();
}
}
}
3.2.6 测试
package com.jscoe.core.modules.mqtt;
import com.jscoe.mqtt.MqttCustomerClient;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class MqttTest {
@Autowired
private MqttCustomerClient mqttCustomerClient;
@Test
public void pushlish() {
mqttCustomerClient.pushlish("test/device1", "hello mqtt............");
// for (int i = 0; i < 10; i++) {
// mqttCustomerClient.pushlish("test/device1", "hello mqtt............" + i);
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
}
}
结果
[2022-09-29 17:14:32.572] [ithere-api] [local] [trade-id] ERROR 38980 --- [ main] com.jscoe.mqtt.MqttCustomerClient : 开始订阅主题test/#
deliveryComplete---------true
接收消息主题 : test/device1
接收消息Qos : 0
接收消息内容 : hello mqtt............
同时emqx的图形化界面也会同步显示内容,图片展示不方便,就不放了
参考文献
菜鸟教程-MQTT 入门介绍:https://www.runoob.com/w3cnote/mqtt-intro.html
CSDN博主「向上的小强」的原创文章 原文链接:https://blog.csdn.net/weixin_43647723/article/details/116605960