springboot整合mqtt(emqx)

  • Post author:
  • Post category:其他



目录


1.安装emqx


2.导入pom依赖


3.yml配置


4.emqx配置类+服务端


5.客户端



1.安装emqx

可以参考:



(278条消息) WINDOWS下搭建MQTT服务EMQX_emqx windows_李夕的博客-CSDN博客


icon-default.png?t=N6B9
https://blog.csdn.net/qq_19294353/article/details/123290346?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522169033795716800213030622%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=169033795716800213030622&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_click~default-1-123290346-null-null.142^v91^insert_down28v1,239^v3^control&utm_term=windows%E5%AE%89%E8%A3%85emqx&spm=1018.2226.3001.4187



为什么使用emqx:




(279条消息) EMQX以及用MQTT的测试和性能测试_emqx和rabbitmq性能_yiyang1208的博客-CSDN博客


icon-default.png?t=N6B9
https://blog.csdn.net/yiyang1208/article/details/128283367?ops_request_misc=&request_id=&biz_id=102&utm_term=emqx%E7%9A%84%E5%A5%BD%E5%A4%84&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-1-128283367.142^v91^insert_down28v1,239^v3^control&spm=1018.2226.3001.4187



2.导入pom依赖

      <!--mqtt依赖 start-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
      <!--mqtt依赖 end-->


3.yml配置

mqtt:
   host: tcp://127.0.0.1:1883
   username: admin
   password: admin123

emqx默认端口是1883,账号:admin  密码:public


4.emqx配置类+服务端

@Configuration
// MQTT配置类
public class MqttConfig {

    // exmq服务器地址
    @Value("${mqtt.host}")
    private String host;

    // 定义客户端ID,使用"DC"加上一个随机生成的数字
    private final String clientId = "DC" + new Random().nextInt(100000000);

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    // 定义连接超时时间,默认为10秒,如果未在属性中指定,则使用默认值
    @Value("${mqtt.connection.timeout:10}")
    private int connectionTimeout;

    private static MqttClient mqttClient;

    /*
     * MQTT连接参数设置
     */
    private MqttConnectOptions mqttConnectOptions(String userName, String passWord, String host) throws MqttException {
        mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(connectionTimeout); // 设置连接超时时间
        options.setAutomaticReconnect(true); // 开启自动重连
        options.setCleanSession(false); // 设置为false,表示不清除会话session
        // 可以根据需要设置其他参数,例如 options.setKeepAliveInterval(20); 设置心跳间隔时间,默认为60秒
        return options;
    }

    // 创建一个MqttClient的Bean实例,用于连接MQTT代理
    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttConnectOptions options = mqttConnectOptions(username, password, host);
        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            System.out.println("连接失败:" + e.getMessage());
        }
        return mqttClient;
    }

    // 发布消息
    public void publish(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos); // 设置消息质量
        mqttMessage.setRetained(true); // 设置保留消息
        mqttMessage.setPayload(msg.getBytes()); // 设置消息内容
        try {
            MqttTopic mqttTopic = mqttClient.getTopic(topic);
            MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 发布消息
            token.waitForCompletion(); // 等待发布完成
            System.out.println("发送消息:Topic=" + topic + ", Message=" + msg.getBytes());
        } catch (MqttException e) {
            MqttConnectOptions options = mqttConnectOptions(username, password, host);
            reconnect(mqttClient, options, topic, mqttMessage); // 递归重新连接并重试发送消息
        }
    }


    // 重新连接并重试发送消息
    private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic, MqttMessage mqttMessage) throws MqttException {
        try {
            // 等待一段时间,可以根据需要调整等待时间
            Thread.sleep(5000);
            // 重新连接 MqttClient
            mqttClient.connect(mqttConnectOptions);
            // 判断是否连接成功
            if (mqttClient.isConnected()) {
                System.out.println("发送方MQTT 客户端已成功连接到 MQTT 代理。");
                MqttTopic mqttTopic = mqttClient.getTopic(topic);
                MqttDeliveryToken token = mqttTopic.publish(mqttMessage); // 重新发布消息
                token.waitForCompletion();
            }
        } catch (MqttException | InterruptedException e) {
            System.out.println("发送方重新连接失败:" + e.getMessage());
            reconnect(mqttClient, mqttConnectOptions, topic, mqttMessage); // 重连失败,继续重试
        }
    }

}

1.导入相关的包,包括MqttClient和MqttException以及其他必要的类。

2.使用@Configuration注解将这个类声明为一个配置类,使得Spring Boot能够识别它。

3.使用@Value注解注入配置文件中的MQTT相关配置信息,包括服务器地址host、用户名username和密码等。password

4.定义一个静态的MqttClient对象mqttClient,用于管理MQTT客户端连接。

5.定义了一个私有方法mqttConnectOptions,用于设置MQTT连接参数,包括用户名、密码、连接超时时间、自动重连等。

6.在@Bean注解的方法mqttClient()中创建一个MqttClient的实例,并连接到MQTT代理。如果连接失败,会打印连接失败的信息。

7.定义了一个publish方法,用于发布消息。该方法接收三个参数:topic表示要发布的主题,msg表示要发布的消息内容,qos表示消息的质量。如果发布消息失败,会调用reconnect方法进行重新连接并重试发送消息。

8.reconnect方法用于处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新发布之前的消息。


需要注意的是,这里的代码实现了一个消息的发布功能,没有涉及到订阅功能。如果需要实现消息的订阅功能,可以通过在mqttClient()方法中设置消息回调来处理接收到的消息,类似之前的示例代码。同时,也需要添加订阅主题的逻辑


5.客户端

    public static void main(String[] args) {

        String host = "tcp://localhost:1883";
        String clientId = "Client_B";
        String topic = "li";
        try {
            MqttClient mqttClient = new MqttClient(host, clientId);
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName("admin");
            mqttConnectOptions.setPassword("admin123".toCharArray());

            // 连接到 EMQ X Broker
            mqttClient.connect(mqttConnectOptions);

            // 设置消息回调
            mqttClient.setCallback(new MqttCallback() {
                @SneakyThrows
                @Override
                public void connectionLost(Throwable cause) {
                    // 处理连接丢失的情况
                    System.out.println("连接丢失,尝试重新连接...");
                    reconnect(mqttClient, mqttConnectOptions, topic);
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    // 处理接收到的消息
                    String payload = new String(message.getPayload());
                    System.out.println("收到消息:Topic=" + topic + ", Message=" + payload);
                }

                @SneakyThrows
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // 处理消息发送完成的情况
                    System.out.println("消息发送完成: " + token.getMessage().getPayload());
                }
            });

            // 订阅主题
            mqttClient.subscribe(topic);

            // 保持连接,防止程序退出
            // 这里可以根据需要,设置一个条件或者等待一段时间,确保程序能够保持连接
            while (true) {
                Thread.sleep(1000);
            }
        } catch (MqttException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void reconnect(MqttClient mqttClient, MqttConnectOptions mqttConnectOptions, String topic) throws MqttException {
        try {
            // 等待一段时间,可以根据需要调整等待时间
            Thread.sleep(5000);
            // 重新连接 MqttClient
            mqttClient.connect(mqttConnectOptions);
            // 判断是否连接成功
            if (mqttClient.isConnected()) {
                System.out.println("MQTT 客户端已成功连接到 MQTT 代理。");
                // 重新订阅主题
                mqttClient.subscribe(topic);
            }
        } catch (MqttException | InterruptedException e) {
            System.out.println("重新连接失败:" + e.getMessage());
            reconnect(mqttClient, mqttConnectOptions, topic);
        }
    }

1.导入MqttClient相关的包,以及SneakyThrows注解。

2.定义MQTT Broker服务器地址 host、客户端ID clientId、订阅主题 topic。

3..在main方法中,创建MqttClient实例,并设置连接参数,包括用户名和密码。然后使用mqttClient.connect(mqttConnectOptions)连接到MQTT Broker。

4.设置消息回调,通过实现MqttCallback接口,处理连接丢失、接收到消息和消息发送完成的情况。

5.使用mqttClient.subscribe(topic)订阅指定主题。

6.使用一个无限循环保持连接,防止程序退出。可以根据需要设置一个条件或者等待一段时间,确保程序保持连接。

7.当连接丢失时,MqttCallback中的connectionLost方法会被触发,执行reconnect方法进行重新连接,并重新订阅主题。

8.reconnect方法是用来处理连接丢失后的重新连接逻辑。它会等待一段时间(这里是5000毫秒),然后尝试重新连接MqttClient。如果连接成功,就会重新订阅之前的主题。

需要注意的是,MqttClient是一个阻塞的客户端,它会一直运行在主线程中,除非程序被显式终止,否则不会自动退出。因此,这段代码将保持连接状态并持续接收处理来自MQTT Broker的消息。


最后写个发送的接口调用发送消息:

@RestController
public class MessageControlle {

    @Autowired
    private MqttConfig mqttConfig;


    @PostMapping("/publish")
    public String publishMessage(@RequestBody String message) {
        try {
            mqttConfig.publish("li", message,1);
            return "发送成功.";
        } catch (MqttException e) {
            return "发送失败: " + e.getMessage();
        }
    }
}


让我们来看看效果:



版权声明:本文为weixin_54063400原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。