最近在做MQTT对接,然后发送消息,然后参考网上的实战文章进行了一下整理。
文章主要参考自(https://www.codetd.com/article/13550340),然后自己做了些许更改。
1、整合准备
SpringBoot:2.2.2.RELEASE
MQTT平台:EMQX4.4.1(Docker运行)
虚拟机服务器:Centos7(192.168.56.102 )
发送端:cloud-mqtt-send8001
接收端:cloud-mqtt-accept8002
2、发送端:cloud-mqtt-send8002
导入POM依赖:
<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--配置文件报错问题-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
设置YML
server:
port: 8001
spring:
application:
name: mqtt-send
#mqtt属性配置
mqtt:
hostUrl: tcp://192.168.56.102:1883
username: admin
password: public
clientid: mqtt_send_client
cleanSession: true
reconnect: true
#连接超时
timeout: 1000
#设置会话心跳时间
keepalive: 100
defaultTopic: client:report:1
isOpen: true
qos: 1
新建配置类:MqttProperties.java
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户端Id,同一台服务器下,不允许出现重复的客户端id
*/
private String clientId;
/**
* 默认连接主题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
*/
private int keepAlive;
/**
* 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
*/
private Boolean cleanSession;
/**
* 是否断线重连
*/
private Boolean reconnect;
/**
* 启动的时候是否关闭mqtt
*/
private Boolean isOpen;
/**
* 连接方式
*/
private Integer qos;
}
添加MQTT发送客户端:MqttSendClient:
@Slf4j
@Component
public class MqttSendClient {
@Autowired
private MqttSendCallBack mqttSendCallBack;
@Autowired
private MqttProperties mqttProperties;
private static MqttClient mqttClient;
private static MqttClient getClient() {
return mqttClient;
}
private static void setClient(MqttClient client) {
MqttSendClient.mqttClient = client;
}
/**
* 客户端连接
* @return
*/
public void connect(){
MqttClient client = null;
try {
//String uuid = UUID.randomUUID().toString().replaceAll("-",""); //设置每一个客户端的id
client = new MqttClient(mqttProperties.getHostUrl(),mqttProperties.getClientId() , new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setCleanSession(true);
options.setAutomaticReconnect(false);
MqttSendClient.setClient(client);
try {
// 设置回调
client.setCallback(mqttSendCallBack);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发布,默认qos为0,非持久化
*
* @param topic 主题名
* @param pushMessage 消息
*/
public void publish(String topic, String pushMessage) {
publish(0, false, topic, pushMessage);
}
/**
* 发布
*
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttSendClient.getClient().getTopic(topic);
if (null == mTopic) {
log.error("主题不存在:{}",mTopic);
}
try {
mTopic.publish(message);
log.info("消息发送成功");
} catch (Exception e) {
log.error("mqtt发送消息异常:",e);
}
}
}
添加MQTT发送客户端回调类:MqttSendCallBack
@Slf4j
@Component
public class MqttSendCallBack implements MqttCallbackExtended {
/**
* 链接EMQ服务器后触发
* @param reconnect
* @param serverURI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("————————————————-ClientID:{}——————————————"+"链接成功");
}
/**
* 客户端连接断开后触发
* 这里可以做重新链接操作
*/
@Override
public void connectionLost(Throwable cause) {
log.error("【MQTT-发送端】链接断开!");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("【MQTT-发送端】接收消息主题 : " + topic);
log.info("【MQTT-发送端】接收消息Qos : " + message.getQos());
log.info("【MQTT-发送端】接收消息内容 : " + new String(message.getPayload()));
}
/**
* 发送消息回调
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
if (topics!=null && topics.length>0){
for (String topic : topics) {
log.info("【MQTT-发送端】向主题:" + topic + "发送消息成功!");
}
}
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, "UTF-8");
log.info("【MQTT-发送端】消息的内容是:" + s);
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
添加:MqttCondition
public class MqttCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
System.out.println("MqttCondition。。。。");
//1、能获取到ioc使用的beanfactory
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
//2、获取类加载器
ClassLoader classLoader = context.getClassLoader();
//3、获取当前环境信息
Environment environment = context.getEnvironment();
String isOpen = environment.getProperty("mqtt.isOpen");
return Boolean.valueOf(isOpen);
}
}
添加MQTT配置类:MqttConfig
@Configuration
public class MqttConfig {
@Autowired
private MqttSendClient mqttSendClient;
@Conditional(MqttCondition.class)
@Bean
public MqttSendClient getMqttSendClient(){
mqttSendClient.connect();
return mqttSendClient;
}
}
主启动类
@SpringBootApplication
public class MqttSendApplication {
public static void main(String[] args) {
SpringApplication.run(MqttSendApplication.class, args);
}
}
启动项目,链接MQTT服务器成功。
项目整体代码结构如下:
3、接收端:cloud-mqtt-accept8002
导入POM
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--MQTT客户端工具-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
设置YML
server:
port: 8002
spring:
application:
name: mqtt-accept
mqtt:
hostUrl: tcp://192.168.56.102:1883
username: admin
password: public
clientid: mqtt_accept_client
cleanSession: true
reconnect: true
#连接超时
timeout: 1000
#设置会话心跳时间
keepalive: 100
defaultTopic: client:report:1
isOpen: true
qos: 1
属性配置文件:MqttProperties.java
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户端Id,同一台服务器下,不允许出现重复的客户端id
*/
private String clientId;
/**
* 默认连接主题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
*/
private int keepAlive;
/**
* 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
*/
private Boolean cleanSession;
/**
* 是否断线重连
*/
private Boolean reconnect;
/**
* 启动的时候是否关闭mqtt
*/
private Boolean isOpen;
/**
* 连接方式
*/
private Integer qos;
}
添加MQTT接收客户端:MqttAcceptClient
@Slf4j
@Component
public class MqttAcceptClient {
@Autowired
private MqttAcceptCallback mqttAcceptCallback;
@Autowired
private MqttProperties mqttProperties;
private static MqttClient mqttClient;
public static MqttClient getMqttClient() {
return mqttClient;
}
public static void setMqttClient(MqttClient mqttClient) {
MqttAcceptClient.mqttClient = mqttClient;
}
/**
* 客户端连接
*/
public void connect() {
MqttClient client;
try {
client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setAutomaticReconnect(mqttProperties.getReconnect());
options.setCleanSession(mqttProperties.getCleanSession());
MqttAcceptClient.setMqttClient(client);
try {
// 设置回调
client.setCallback(mqttAcceptCallback);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 重新连接
*/
public void reconnection() {
try {
mqttClient.connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
log.info("==============开始订阅主题==============" + topic);
try {
mqttClient.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 取消订阅某个主题
*
* @param topic
*/
public void unsubscribe(String topic) {
log.info("==============开始取消订阅主题==============" + topic);
try {
mqttClient.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
添加mqtt接受服务的回调类:MqttAcceptCallback
@Slf4j
@Component
public class MqttAcceptCallback implements MqttCallbackExtended {
@Autowired
private MqttAcceptClient mqttAcceptClient;
/**
* 客户端断开后触发
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("【MQTT-消费端】连接断开,可以做重连");
if (MqttAcceptClient.getMqttClient() == null || !MqttAcceptClient.getMqttClient().isConnected()) {
log.info("【MQTT-消费端】emqx重新连接....................................................");
mqttAcceptClient.reconnection();
}
}
/**
* 客户端收到消息触发
*
* @param topic 主题
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.info("【MQTT-消费端】接收消息主题 : " + topic);
log.info("【MQTT-消费端】接收消息Qos : " + mqttMessage.getQos());
log.info("【MQTT-消费端】接收消息内容 : " + new String(mqttMessage.getPayload()));
// int i = 1/0;
}
/**
* 发布消息成功
*
* @param token token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
for (String topic : topics) {
log.info("【MQTT-消费端】向主题:" + topic + "发送消息成功!");
}
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, "UTF-8");
log.info("【MQTT-消费端】消息的内容是:" + s);
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* 连接emq服务器后触发
*
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
System.out.println("s: " + s);
log.info("--------------------【MQTT-消费端】连接成功!--------------------");
// 以/#结尾表示订阅所有以test开头的主题
// 订阅所有机构主题
mqttAcceptClient.subscribe("test_queue", 0);
}
}
MqttCondition
public class MqttCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
System.out.println("MqttCondition。。。。");
//1、能获取到ioc使用的beanfactory
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
//2、获取类加载器
ClassLoader classLoader = context.getClassLoader();
//3、获取当前环境信息
Environment environment = context.getEnvironment();
String isOpen = environment.getProperty("mqtt.isOpen");
return Boolean.valueOf(isOpen);
}
}
MQTT启动配置类:MqttConfig.java
@Configuration
public class MqttConfig {
@Autowired
private MqttAcceptClient mqttAcceptClient;
@Conditional(MqttCondition.class)
@Bean
public MqttAcceptClient getMqttAcceptClient(){
mqttAcceptClient.connect();
//mqttAcceptClient.subscribe("test_queue",0);
return mqttAcceptClient;
}
}
主启动类
package com.xlh.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MqttAcceptApplication {
public static void main(String[] args) {
SpringApplication.run(MqttAcceptApplication.class,args);
}
}
启动项目,链接MQTT服务器,然后订阅主题:
项目整体结构如下:
4、项目测试
分别启动发送端和接收端以后,EMQX监控平台如下:
然后调用发送端测试接口:
http://localhost:8001/send
发送端cloud-mqtt-send8001运行结果如下:
接收端cloud-mqtt-accept8002运行结果如下:
可以看到接收端成功的接收到消息