目录
1.安装emqx
2.导入pom依赖
<!--mqtt依赖 start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--mqtt依赖 end-->
3.yml配置
mqtt:
host: tcp://127.0.0.1:1883
username: admin
password: admin123
emqx默认端口是1883,账号:admin 密码:public
4.emqx配置类+服务端
@Configuration
// MQTT配置类
public class MqttConfig {
// exmq服务器地址
@Value("${mqtt.host}")
private String host;
// 定义客户端ID,使用"DC"加上一个随机生成的数字
private final String clientId = "DC" + new Random().nextInt(100000000);
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
// 定义连接超时时间,默认为10秒,如果未在属性中指定,则使用默认值
@Value("${mqtt.connection.timeout:10}")
private int connectionTimeout;
private static MqttClient mqttClient;
/*
* MQTT连接参数设置
*/
private MqttConnectOptions mqttConnectOptions(String userName, String passWord, String host) throws MqttException {
mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(connectionTimeout); // 设置连接超时时间
options.setAutomaticReconnect(true); // 开启自动重连
options.setCleanSession(false); // 设置为false,表示不清除会话session
// 可以根据需要设置其他参数,例如 options.setKeepAliveInterval(20); 设置心跳间隔时间,默认为60秒
return options;
}
// 创建一个MqttClient的Bean实例,用于连接MQTT代理
@Bean
public MqttClient mqttClient() throws MqttException {
MqttConnectOptions options = mqttConnectOptions(username, password, host);
try {
mqttClient.connect(options);
} catch (MqttException e) {
System.out.println("连接失败:" + e.getMessage());
}
return mqttClient;
}
// 发布消息
public void publish(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos); // 设置消息质量
mqttMessage.setRetained(true); // 设置保留消息
mqttMessage.setPayload(msg.getBytes()); // 设置消息内容
try {
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 发布消息
token.waitForCompletion(); // 等待发布完成
System.out.println("发送消息:Topic=" + topic + ", Message=" + msg.getBytes());
} catch (MqttException e) {
MqttConnectOptions options = mqttConnectOptions(username, password, host);
reconnect(mqttClient, options, topic, mqttMessage); // 递归重新连接并重试发送消息
}
}
// 重新连接并重试发送消息
private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic, MqttMessage mqttMessage) throws MqttException {
try {
// 等待一段时间,可以根据需要调整等待时间
Thread.sleep(5000);
// 重新连接 MqttClient
mqttClient.connect(mqttConnectOptions);
// 判断是否连接成功
if (mqttClient.isConnected()) {
System.out.println("发送方MQTT 客户端已成功连接到 MQTT 代理。");
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 重新发布消息
token.waitForCompletion();
}
} catch (MqttException | InterruptedException e) {
System.out.println("发送方重新连接失败:" + e.getMessage());
reconnect(mqttClient, mqttConnectOptions, topic, mqttMessage); // 重连失败,继续重试
}
}
}
1.导入相关的包,包括MqttClient和MqttException以及其他必要的类。
2.使用@Configuration注解将这个类声明为一个配置类,使得Spring Boot能够识别它。
3.使用@Value注解注入配置文件中的MQTT相关配置信息,包括服务器地址host、用户名username和密码等。password
4.定义一个静态的MqttClient对象mqttClient,用于管理MQTT客户端连接。
5.定义了一个私有方法mqttConnectOptions,用于设置MQTT连接参数,包括用户名、密码、连接超时时间、自动重连等。
6.在@Bean注解的方法mqttClient()中创建一个MqttClient的实例,并连接到MQTT代理。如果连接失败,会打印连接失败的信息。
7.定义了一个publish方法,用于发布消息。该方法接收三个参数:topic表示要发布的主题,msg表示要发布的消息内容,qos表示消息的质量。如果发布消息失败,会调用reconnect方法进行重新连接并重试发送消息。
8.reconnect方法用于处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新发布之前的消息。
需要注意的是,这里的代码实现了一个消息的发布功能,没有涉及到订阅功能。如果需要实现消息的订阅功能,可以通过在mqttClient()方法中设置消息回调来处理接收到的消息,类似之前的示例代码。同时,也需要添加订阅主题的逻辑
5.客户端
public static void main(String[] args) {
String host = "tcp://localhost:1883";
String clientId = "Client_B";
String topic = "li";
try {
MqttClient mqttClient = new MqttClient(host, clientId);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("admin123".toCharArray());
// 连接到 EMQ X Broker
mqttClient.connect(mqttConnectOptions);
// 设置消息回调
mqttClient.setCallback(new MqttCallback() {
@SneakyThrows
@Override
public void connectionLost(Throwable cause) {
// 处理连接丢失的情况
System.out.println("连接丢失,尝试重新连接...");
reconnect(mqttClient, mqttConnectOptions, topic);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 处理接收到的消息
String payload = new String(message.getPayload());
System.out.println("收到消息:Topic=" + topic + ", Message=" + payload);
}
@SneakyThrows
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 处理消息发送完成的情况
System.out.println("消息发送完成: " + token.getMessage().getPayload());
}
});
// 订阅主题
mqttClient.subscribe(topic);
// 保持连接,防止程序退出
// 这里可以根据需要,设置一个条件或者等待一段时间,确保程序能够保持连接
while (true) {
Thread.sleep(1000);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}
private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic) throws MqttException {
try {
// 等待一段时间,可以根据需要调整等待时间
Thread.sleep(5000);
// 重新连接 MqttClient
mqttClient.connect(mqttConnectOptions);
// 判断是否连接成功
if (mqttClient.isConnected()) {
System.out.println("MQTT 客户端已成功连接到 MQTT 代理。");
// 重新订阅主题
mqttClient.subscribe(topic);
}
} catch (MqttException | InterruptedException e) {
System.out.println("重新连接失败:" + e.getMessage());
reconnect(mqttClient, mqttConnectOptions, topic);
}
}
1.导入MqttClient相关的包,以及SneakyThrows注解。
2.定义MQTT Broker服务器地址 host、客户端ID clientId、订阅主题 topic。
3..在main方法中,创建MqttClient实例,并设置连接参数,包括用户名和密码。然后使用mqttClient.connect(mqttConnectOptions)连接到MQTT Broker。
4.设置消息回调,通过实现MqttCallback接口,处理连接丢失、接收到消息和消息发送完成的情况。
5.使用mqttClient.subscribe(topic)订阅指定主题。
6.使用一个无限循环保持连接,防止程序退出。可以根据需要设置一个条件或者等待一段时间,确保程序保持连接。
7.当连接丢失时,MqttCallback中的connectionLost方法会被触发,执行reconnect方法进行重新连接,并重新订阅主题。
8.reconnect方法是用来处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新订阅之前的主题。
需要注意的是,MqttClient是一个阻塞的客户端,它会一直运行在主线程中,除非程序被显式终止,否则不会自动退出。因此,这段代码将保持连接状态并持续接收处理来自MQTT Broker的消息。
最后写个发送的接口调用发送消息:
@RestController
public class MessageControlle {
@Autowired
private MqttConfig mqttConfig;
@PostMapping("/publish")
public String publishMessage(@RequestBody String message) {
try {
mqttConfig.publish("li", message,1);
return "发送成功.";
} catch (MqttException e) {
return "发送失败: " + e.getMessage();
}
}
}
让我们来看看效果: