物联网IoT与Java结合之MQTT协议:EMQX

  • Post author:
  • Post category:java

准备:下载EMQX的Windows版本

https://download.csdn.net/download/qq_39246466/87201696

1.pom文件:引入mqtt

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the dc3-driver-mqtt module: the MQTT driver, packaged as a jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits from the dc3-driver parent POM. None of the dependencies below declares a
         version, so versions must come from inherited dependency management
         (presumably provided through this parent chain; TODO confirm). -->
    <parent>
        <groupId>io.github.pnoker</groupId>
        <artifactId>dc3-driver</artifactId>
        <version>2022.1.0</version>
    </parent>

    <artifactId>dc3-driver-mqtt</artifactId>
    <packaging>jar</packaging>

    <description>IOT DC3 平台 Mqtt 驱动。</description>

    <dependencies>

        <!-- Spring Integration stream support -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>

        <!-- Spring Integration MQTT channel adapters (inbound adapter / outbound handler),
             together with the Eclipse Paho MQTT v3 client they are built on -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        </dependency>

    </dependencies>

</project>

2.配置文件:主要节点driver.mqtt

# Driver settings. The `driver.mqtt` subtree is bound onto MqttProperties
# (@ConfigurationProperties(prefix = "driver.mqtt")); the consumers of the other
# subtrees are not part of this article.
driver:
  name: EdgeGateway
  type: gateway
  # NOTE(review): the @...@ tokens look like build-time placeholders (e.g. Maven resource
  # filtering) - confirm against the build configuration.
  project: @project.artifactId@
  description: @project.description@
  # Three schedules, each with an on/off switch and what looks like a six-field cron expression.
  # NOTE(review): the key is spelled `corn`, not `cron`; it must match whatever property the
  # consuming class declares, so do not rename it without checking that class.
  schedule:
    status:
      enable: true
      corn: '0/10 * * * * ?'
    read:
      enable: false
      corn: '0/30 * * * * ?'
    custom:
      enable: true
      corn:  '0/5 * * * * ?'
  # Attributes declared for each point: the downlink command topic and its QoS.
  # The `option` entries appear to be placeholder/template values - TODO confirm.
  point-attribute:
    - displayName: 指令Topic
      name: commandTopic
      type: string
      value: commandTopic
      description: 测点/设备接收下行指令的Mqtt主题
      option:
        type: input\select\checkox\switch\time...
        required: true
        data-type: static/url
        data: jsonString
    - displayName: 指令Qos
      name: commandQos
      type: int
      value: 2
      description: 测点/设备接收下行指令的Mqtt主题的Qos
      option:
        type: input\select\checkox\switch\time...
        required: true
        data-type: static/url
        data: jsonString
  # MQTT connection settings (MqttProperties).
  # NOTE(review): MqttReceiveHandler injects ${driver.mqtt.batch.speed} without a default value,
  # but this sample defines no `batch.speed` key under `mqtt` - it has to be supplied elsewhere.
  mqtt:
    # Broker URI handed to the Paho connect options as the single server URI.
    url: ssl://dc3-emqx:8883
    # One of NONE, CLIENT_ID, USERNAME, X509. With X509, MqttConfig builds an SSL socket factory
    # from ca-crt / client-crt / client-key / client-key-pass and additionally sends
    # username/password when both are non-blank.
    auth-type: X509
    username: dc3
    password: dc3
    ca-crt: classpath:/certs/ca.crt
    client-key-pass: dc3-client
    client-key: classpath:/certs/client.key
    client-crt: classpath:/certs/client.crt
    # Base of the MQTT client ids; MqttConfig appends suffixes ("_in" / "_out") to it and also
    # subscribes to the extra topic dc3/mc/<client> with QoS 2 if it is not listed below.
    client: ${spring.application.name}
    # Topics the inbound adapter subscribes to, each with its own QoS.
    receive-topics:
      - qos: 0
        name: mqtt/group/device/#
    # Topic and QoS used by the outbound handler when a message carries no topic/QoS header.
    default-send-topic:
      qos: 1
      name: dc3-mqtt-topic
    # Passed to MqttConnectOptions.setKeepAliveInterval (seconds, per the Paho documentation).
    keep-alive: 15
    # Passed to the inbound adapter's setCompletionTimeout (presumably milliseconds - confirm).
    completion-timeout: 3000

3.config引入配置:

        MqttProperties配置类


package io.github.pnoker.common.sdk.bean.mqtt;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.*;
import java.util.List;

/**
 * Typed, validated view of the {@code driver.mqtt} configuration section.
 *
 * <p>Field order is significant: Lombok's {@code @AllArgsConstructor} derives the
 * constructor parameter order from it.
 *
 * @author pnoker
 */
@ConfigurationProperties(prefix = "driver.mqtt")
@Validated
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MqttProperties {

    /** Broker URI, e.g. {@code ssl://host:port}. */
    @NotBlank(message = "url can't be empty,ssl://host:port")
    private String url;

    /** How the client authenticates against the broker; no authentication by default. */
    @NotNull(message = "auth type can't be empty")
    private AuthTypeEnum authType = AuthTypeEnum.NONE;

    /** Optional broker credentials. */
    private String username;
    private String password;

    /** Certificate / key locations and key password; defaults point at the bundled classpath certs. */
    private String caCrt = "classpath:/certs/ca.crt";
    private String clientKeyPass = "dc3-client";
    private String clientKey = "classpath:/certs/client.key";
    private String clientCrt = "classpath:/certs/client.crt";

    /** Client name; required. */
    @NotBlank(message = "client name can't be empty")
    private String client;

    /** Topic and QoS used for sending when none is given explicitly. */
    @NotNull(message = "default topic can't be empty")
    private Topic defaultSendTopic = new Topic("dc3/d/v/dc3-driver-mqtt_default", 2);

    /** Topics to subscribe to; when present, the list must hold at least one entry. */
    @Size(min = 1, message = "receive topic at least one topic")
    private List<Topic> receiveTopics;

    /** Keep-alive interval; defaults to 15. */
    @NotNull(message = "keep alive interval can't be empty")
    private Integer keepAlive = 15;

    /** Completion timeout; defaults to 3000. */
    @NotNull(message = "completion timeout can't be empty")
    private Integer completionTimeout = 3000;

    /**
     * MQTT authentication type.
     */
    @NoArgsConstructor
    public enum AuthTypeEnum {
        NONE,
        CLIENT_ID,
        USERNAME,
        X509
    }

    /**
     * A topic name paired with the QoS level (0..2) to use for it.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Topic {

        /** Topic name or filter; must not be blank. */
        @NotBlank(message = "topic name can't be empty")
        private String name;

        /** QoS level, constrained to the range 0 to 2. */
        @Min(0)
        @Max(2)
        private Integer qos;
    }

}


package io.github.pnoker.driver.config;

import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import io.github.pnoker.common.constant.CommonConstant;
import io.github.pnoker.common.sdk.bean.mqtt.MqttProperties;
import io.github.pnoker.common.sdk.utils.X509Util;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.util.ArrayList;


/**
 * Spring Integration wiring for the MQTT driver: the inbound/outbound channels, the
 * inbound (subscribing) adapter, the outbound (publishing) handler and the shared
 * Paho client factory, all configured from {@link MqttProperties}.
 *
 * @author pnoker
 */
@Slf4j
@Configuration
@IntegrationComponentScan
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfig {

    /** Random suffix generated once per JVM and appended to the inbound client id. */
    private static final String RANDOM_ID = CommonConstant.Symbol.UNDERSCORE + RandomUtil.randomString(8);

    @Resource
    private MqttProperties mqttProperties;

    /**
     * Channel that carries messages received from the broker.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Channel that carries messages to be published to the broker.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    /**
     * Builds the inbound adapter that subscribes to every configured receive topic and
     * forwards incoming messages to {@link #mqttInputChannel()}.
     *
     * <p>The topic {@code dc3/mc/<client>} is always subscribed: when it is not configured
     * it is appended to the properties' receive-topic list with QoS 2.
     *
     * @return the message-driven MQTT adapter
     */
    @Bean
    public MessageProducer mqttInbound() {
        // set default receive topic
        String defaultTopicName = "dc3/mc/" + mqttProperties.getClient();
        if (null == mqttProperties.getReceiveTopics()) {
            mqttProperties.setReceiveTopics(new ArrayList<>());
        }
        boolean alreadyConfigured = mqttProperties.getReceiveTopics().stream()
                .anyMatch(topic -> topic.getName().equals(defaultTopicName));
        if (!alreadyConfigured) {
            mqttProperties.getReceiveTopics().add(new MqttProperties.Topic(defaultTopicName, 2));
        }

        // Names and QoS levels are derived from the same list, so they line up index by index.
        String[] topicNames = mqttProperties.getReceiveTopics().stream()
                .map(MqttProperties.Topic::getName)
                .toArray(String[]::new);
        int[] topicQos = mqttProperties.getReceiveTopics().stream()
                .mapToInt(MqttProperties.Topic::getQos)
                .toArray();

        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClient() + RANDOM_ID + "_in",
                mqttClientFactory(),
                topicNames);
        adapter.setQos(topicQos);
        adapter.setOutputChannel(mqttInputChannel());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        return adapter;
    }

    /**
     * Builds the handler that publishes everything sent to {@code mqttOutputChannel},
     * falling back to the configured default topic and QoS.
     *
     * @return the asynchronous outbound MQTT handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler outbound() {
        // NOTE(review): unlike the inbound adapter, this client id carries no RANDOM_ID suffix,
        // so two instances configured with the same client name would share it - confirm intent.
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                mqttProperties.getClient() + "_out",
                mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(mqttProperties.getDefaultSendTopic().getQos());
        messageHandler.setDefaultTopic(mqttProperties.getDefaultSendTopic().getName());
        return messageHandler;
    }

    /**
     * Client factory used by both the inbound adapter and the outbound handler.
     *
     * @return a Paho client factory carrying the configured connect options
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Translates {@link MqttProperties} into Paho connect options.
     *
     * <p>{@code USERNAME} sets the user name and, when configured, the password.
     * {@code X509} installs an SSL socket factory built from the configured certificates
     * and also sends user name / password when both are non-blank. The other auth types
     * set no credentials here.
     *
     * @return the connect options
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

        // username & password
        if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.USERNAME)) {
            mqttConnectOptions.setUserName(mqttProperties.getUsername());
            // The password is optional in the properties; dereferencing it unconditionally
            // used to abort startup with a NullPointerException.
            if (null != mqttProperties.getPassword()) {
                mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
            } else {
                log.warn("Mqtt auth type is USERNAME but no password is configured, connecting without password");
            }
        }

        // tls x509
        if (mqttProperties.getAuthType().equals(MqttProperties.AuthTypeEnum.X509)) {
            mqttConnectOptions.setSocketFactory(X509Util.getSSLSocketFactory(
                    mqttProperties.getCaCrt(),
                    mqttProperties.getClientCrt(),
                    mqttProperties.getClientKey(),
                    StrUtil.isBlank(mqttProperties.getClientKeyPass()) ? "" : mqttProperties.getClientKeyPass()
            ));
            if (!StrUtil.isBlank(mqttProperties.getUsername()) && !StrUtil.isBlank(mqttProperties.getPassword())) {
                mqttConnectOptions.setUserName(mqttProperties.getUsername());
                mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
            }
        }

        // disable https hostname verification
        // NOTE(review): this weakens TLS peer verification; confirm it is acceptable outside
        // of development setups.
        mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);
        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()});
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
        return mqttConnectOptions;

    }

}

4.通过mqtt发送数据:在接口上添加 @MessagingGateway(defaultRequestChannel = "mqttOutputChannel"),使其与config中定义的 mqttOutputChannel 通道连通即可



package io.github.pnoker.driver.mqtt.handler;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Messaging gateway for publishing to MQTT. Every call is sent to the
 * {@code mqttOutputChannel} request channel; the optional topic and QoS arguments
 * are passed along as message headers.
 *
 * @author pnoker
 */
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttSendHandler {

    /**
     * Sends data using the default topic and the default QoS.
     *
     * @param data payload string
     */
    void sendToMqtt(String data);

    /**
     * Sends data using the default topic and a custom QoS.
     *
     * @param qos  custom QoS
     * @param data payload string
     */
    void sendToMqtt(@Header(MqttHeaders.QOS) Integer qos,
                    String data);

    /**
     * Sends data using a custom topic and the default QoS.
     *
     * @param topic custom topic
     * @param data  payload string
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String data);

    /**
     * Sends data using a custom topic and a custom QoS.
     *
     * @param topic custom topic
     * @param qos   custom QoS
     * @param data  payload string
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) Integer qos,
                    String data);
}

5.通过mqtt接收数据:通过 @Resource
    private MqttReceiveService mqttReceiveService; 注入service;在方法上添加 @ServiceActivator(inputChannel = "mqttInputChannel"),表示和config中定义的 mqttInputChannel 通道连通,统一处理接收的数据



package io.github.pnoker.driver.mqtt.handler;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import io.github.pnoker.common.sdk.bean.mqtt.MessageHeader;
import io.github.pnoker.common.sdk.bean.mqtt.MessagePayload;
import io.github.pnoker.common.sdk.bean.mqtt.MessageType;
import io.github.pnoker.common.sdk.bean.mqtt.MqttMessage;
import io.github.pnoker.common.utils.JsonUtil;
import io.github.pnoker.driver.mqtt.job.MqttScheduleJob;
import io.github.pnoker.driver.mqtt.service.MqttReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Consumes the messages arriving on {@code mqttInputChannel}, normalises them into
 * {@link MqttMessage} instances and hands them either to the receive service (one by one)
 * or to the shared batch buffer of {@link MqttScheduleJob}.
 *
 * @author pnoker
 */
@Slf4j
@Configuration
public class MqttReceiveHandler {

    /**
     * Message-speed threshold: below it messages are processed individually, otherwise they
     * are buffered. The placeholder has no default, so the property must be configured.
     */
    @Value("${driver.mqtt.batch.speed}")
    private Integer batchSpeed;

    @Resource
    private MqttReceiveService mqttReceiveService;
    @Resource
    private ThreadPoolExecutor threadPoolExecutor;

    /**
     * Receives the data published via MQTT. The subscribed topics are the ones configured under
     * {@code mqtt.receive-topics} in application.yml.
     * <ul>
     * <li>{@code +} (plus): matches exactly one word</li>
     * <li>{@code #} (hash): matches several words (or none)</li>
     * </ul>
     *
     * @return MessageHandler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handlerValue() {
        return message -> {
            MessagePayload messagePayload = JsonUtil.parseObject(message.getPayload().toString(), MessagePayload.class);

            // Fill in missing fields:
            // - no parsed payload object: wrap the raw payload with the default type
            // - empty payload field: use the raw payload's String content
            // - missing type: use the default type
            if (ObjectUtil.isNull(messagePayload)) {
                messagePayload = new MessagePayload(message.getPayload(), MessageType.DEFAULT);
            } else {
                if (StrUtil.isEmpty(messagePayload.getPayload())) {
                    messagePayload.setPayload(message.getPayload().toString());
                }
                if (ObjectUtil.isNull(messagePayload.getMessageType())) {
                    messagePayload.setMessageType(MessageType.DEFAULT);
                }
            }

            MessageHeader messageHeader = new MessageHeader(message.getHeaders());
            dispatch(new MqttMessage(messageHeader, messagePayload));
        };
    }

    /**
     * Routes one message: processed on its own via the thread pool while the current message
     * speed is below {@code batchSpeed}, appended to the batch buffer otherwise.
     *
     * @param mqttMessage the normalised message
     */
    private void dispatch(MqttMessage mqttMessage) {
        // Judge whether to process data in batch according to the data transmission speed
        if (MqttScheduleJob.messageSpeed.get() < batchSpeed) {
            // Receive single mqtt message
            threadPoolExecutor.execute(() -> mqttReceiveService.receiveValue(mqttMessage));
            return;
        }

        // Save point value to schedule. The unlock sits in finally so the write lock is
        // released even if add() throws; otherwise every later writer would block forever.
        MqttScheduleJob.messageLock.writeLock().lock();
        try {
            MqttScheduleJob.mqttMessages.add(mqttMessage);
        } finally {
            MqttScheduleJob.messageLock.writeLock().unlock();
        }
    }
}

6.使用发送数据在service的实现层用法:注入 MqttSendHandler 后直接调用 sendToMqtt(...) 即可

7.使用接收数据在service的实现层用法:实现 MqttReceiveService 接口的 receiveValue / receiveValues 方法,引入后直接使用



package io.github.pnoker.driver.mqtt.service.impl;

import io.github.pnoker.common.sdk.bean.mqtt.MqttMessage;
import io.github.pnoker.driver.mqtt.service.MqttReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * Skeleton implementation of {@link MqttReceiveService}; both callbacks are left empty
 * so that concrete message handling can be filled in.
 *
 * @author pnoker
 */
@Service
@Slf4j
public class MqttReceiveServiceImpl implements MqttReceiveService {

    /**
     * Callback for a single received message. Currently does nothing.
     *
     * @param mqttMessage the received message
     */
    @Override
    public void receiveValue(MqttMessage mqttMessage) {
        // intentionally empty
    }

    /**
     * Callback for a batch of received messages. Currently does nothing.
     *
     * @param mqttMessageList the received messages
     */
    @Override
    public void receiveValues(List<MqttMessage> mqttMessageList) {
        // intentionally empty
    }
}

8.使用的消息体:



package io.github.pnoker.common.sdk.bean.mqtt;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.io.Serializable;

/**
 * A received MQTT message: the extracted header values plus the normalised payload.
 * Accessors, constructors and chained setters are generated by Lombok.
 *
 * @author pnoker
 */
@Accessors(chain = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqttMessage implements Serializable {

    /** Header values of the message. */
    private MessageHeader messageHeader;

    /** Payload and type of the message. */
    private MessagePayload messagePayload;
}

9.其他辅助类:



package io.github.pnoker.common.sdk.bean.mqtt;

import lombok.NoArgsConstructor;

/**
 * Type tag carried by a message payload. Declaration order is kept stable because it
 * determines each constant's ordinal.
 *
 * @author pnoker
 */
@NoArgsConstructor
public enum MessageType {
    OPC_UA,
    OPC_DA,
    MODBUS,
    PLC,
    SERIAL,
    SOCKET,
    HEARTBEAT,
    /** Catch-all type. */
    DEFAULT;
}


package io.github.pnoker.common.sdk.bean.mqtt;

import io.github.pnoker.common.utils.JsonUtil;
import lombok.Data;
import lombok.experimental.Accessors;

/**
 * Payload of an MQTT message together with its {@link MessageType}.
 *
 * <p>Implements {@link java.io.Serializable} because {@code MqttMessage}, which is
 * serializable, holds an instance of this class as a field; without it, Java
 * serialization of a populated message fails with {@code NotSerializableException}.
 *
 * @author pnoker
 */
@Data
@Accessors(chain = true)
public class MessagePayload implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** Payload content as a string. */
    private String payload;

    /** Type of the message; {@link MessageType#DEFAULT} unless set otherwise. */
    private MessageType messageType;

    /**
     * Creates an empty payload of type {@link MessageType#DEFAULT}.
     */
    public MessagePayload() {
        this.messageType = MessageType.DEFAULT;
    }

    /**
     * Creates a payload whose content is the JSON string produced by
     * {@code JsonUtil.toJsonString(payload)}.
     *
     * @param payload     object to convert into the payload string
     * @param messageType type of the message
     */
    public MessagePayload(Object payload, MessageType messageType) {
        this.payload = JsonUtil.toJsonString(payload);
        this.messageType = messageType;
    }
}


package io.github.pnoker.common.sdk.bean.mqtt;

import cn.hutool.core.util.ObjectUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.springframework.messaging.MessageHeaders;

import java.io.Serializable;
import java.util.UUID;

/**
 * Flat, serializable copy of the header values of a received MQTT message.
 *
 * @author pnoker
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class MessageHeader implements Serializable {

    private String id;
    private Integer mqttId;
    private Integer mqttReceivedQos;
    private String mqttReceivedTopic;
    private Boolean mqttDuplicate;
    private Boolean mqttReceivedRetained;
    private Long timestamp;

    /**
     * Copies the known header entries out of the given message headers.
     *
     * <p>Extraction is best effort: an entry that is absent, or that cannot be read as the
     * expected type, leaves the corresponding field {@code null} instead of failing.
     * A {@code null} argument leaves every field {@code null}.
     *
     * @param messageHeaders headers of the received message, may be {@code null}
     */
    public MessageHeader(MessageHeaders messageHeaders) {
        if (ObjectUtil.isNull(messageHeaders)) {
            return;
        }

        UUID uuid = headerOrNull(messageHeaders, MessageHeaders.ID, UUID.class);
        if (ObjectUtil.isNotNull(uuid)) {
            this.id = uuid.toString();
        }
        this.mqttId = headerOrNull(messageHeaders, "mqtt_id", Integer.class);
        this.mqttReceivedQos = headerOrNull(messageHeaders, "mqtt_receivedQos", Integer.class);
        this.mqttReceivedTopic = headerOrNull(messageHeaders, "mqtt_receivedTopic", String.class);
        this.mqttDuplicate = headerOrNull(messageHeaders, "mqtt_duplicate", Boolean.class);
        this.mqttReceivedRetained = headerOrNull(messageHeaders, "mqtt_receivedRetained", Boolean.class);
        this.timestamp = headerOrNull(messageHeaders, MessageHeaders.TIMESTAMP, Long.class);
    }

    /**
     * Reads a single header value, returning {@code null} when the lookup fails.
     *
     * @param headers headers to read from
     * @param key     header name
     * @param type    expected value type
     * @param <T>     expected value type
     * @return the header value, or {@code null} if it is absent or cannot be read as {@code type}
     */
    private static <T> T headerOrNull(MessageHeaders headers, String key, Class<T> type) {
        try {
            return headers.get(key, type);
        } catch (Exception ignored) {
            // Deliberately ignored: one unreadable header must not prevent the rest of the
            // message from being processed.
            return null;
        }
    }
}


版权声明:本文为qq_39246466原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。