【mqtt】Java实现mqtt的订阅与发布

  • Post author:
  • Post category:java

maven引用

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.0</version>
</dependency>

mqtt订阅

完整代码如下:

import com.cn.common.utils.IDGenerate;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * mqtt 订阅
 */
@Service
public class MqttSubscribe {
    public static final int MAX_TOPIC_NUM = 1;

    @Resource(name = "subCallback")
    private SubCallback subCallback;

    private String[] oldTopics;

    private String HOST = "tcp://127.0.0.1:1883";
    private String clientid = "iot_server_name";
    private String userName = "admin";    //非必须
    private String passWord = "public";  //非必须

    String[] TOPICS;

    private MqttClient client;
    private MqttConnectOptions options;

    void MqttSubscribe(){
        client=null;
    }

    @PostConstruct
    public void initStart() {
        if( TOPICS == null ) {
            TOPICS = new String[MAX_TOPIC_NUM];
        }

        if( oldTopics == null ) {
            oldTopics = new String[MAX_TOPIC_NUM];
        }
    }

    /**
     * 判断服务器是否连接
     * @return
     */
    public boolean isConnected(){
        return client.isConnected();
    }

    /**
     * 重启服务器
     * @param strhost   服务器地址
     * @param port      端口
     * @param mainTopic 主题
     * @param user      用户名
     * @param passwd    密码
     */
    public void ReStart(String strhost,int port,String mainTopic, String user, String passwd){
        int i=0;
        GetMqttSubInfo(strhost,port,mainTopic, user, passwd);
        if(client != null){
            try {
                if(oldTopics[0] != null) {
                    client.unsubscribe(oldTopics);
                }
                for(i=0;i<MAX_TOPIC_NUM;i++){
                    oldTopics[i] = TOPICS[i];
                }

                client.disconnect();
                Thread.sleep(100);

                client=null;
            } catch (MqttException e) {
                e.printStackTrace();
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        start();
    }

    /**
     * 配置mqtt连接参数
     * @param strhost   服务器地址
     * @param port      端口
     * @param mainTopic topic
     * @param user      用户名
     * @param passwd    密码
     */
    private void GetMqttSubInfo(String strhost,int port,String mainTopic, String user, String passwd){
        int num=0;

        if( TOPICS == null ) {
            TOPICS = new String[MAX_TOPIC_NUM];
        }

        int index=strhost.indexOf("tcp://");
        if( index == -1 ) {
            HOST = "tcp://"+strhost+":" + port;
        }
        else{
            HOST = strhost+":" + port;
        }
        userName = user;
        passWord = passwd;

        TOPICS[0] = mainTopic;

        for(int i=0;i<MAX_TOPIC_NUM;i++){
            oldTopics[i] = TOPICS[i];
        }

        clientid = IDGenerate.getID();
    }

    public void start() {
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接设置
            options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(false);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            if(passWord!=null) {
                options.setPassword(passWord.toCharArray());
            }
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            //自动重连
            options.setAutomaticReconnect(true);
            // 设置回调
            subCallback.setServ(this);
            client.setCallback(subCallback);
//          MqttTopic topic = client.getTopic(TOPIC1);
            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
//遗嘱        options.setWill(topic, "close".getBytes(), 2, true);
            client.connect(options);

            System.out.println("Subscribe is connected\n");
            //订阅消息
/*
            int[] Qos  = {1};
            String[] topic1 = {TOPIC1};
            client.subscribe(topic1, Qos);*/

            System.out.println("Subscribe is subscribed\n");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*
        重新订阅,用于重连后重新订阅主题
     */
    public void doSub(){
        //订阅消息
        int[] Qos  = {1};

        try {
            if( client != null && client.isConnected() ) {
                client.subscribe(TOPICS, Qos);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void main(String[] args) throws MqttException {
        MqttSubscribe client = new MqttSubscribe();
        client.start();
    }
}

1、【SubCallback】是订阅回调函数

2、定义连接参数,如服务器地址【host】等

3、@PostConstruct注解作为java自己的注解,其作用就是在init()方法之前判断主题数组【TOPICS】以及旧的主题数组【oldTopics】是否为null,为null则创建为空的数组。

订阅回调函数

如果需要消费订阅的mqtt,则需要在回调函数里获取订阅到的【message】,然后进行业务处理,我这里用的是【DeviceService 】,或者将信息放到redis缓存等。

package com.cn.mqtt;

import com.cn.sys.service.DeviceService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.Executor;
import java.util.logging.Logger;

public class SubCallback implements MqttCallbackExtended {
    private static final Logger log = Logger.getLogger("SubCallback");


    private Executor executor;

    private MqttSubscribe serv;

    @Autowired
    private DeviceService deviceService;

    public SubCallback(){

    }

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        log.info("<Mqtt connect> disconnect. reconnecting ...");
  //      serv.start();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("<Mqtt Sub> deliveryComplete - " + token.isComplete());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("<Mqtt rcv>("+topic+","+message.getQos()+"):"+new String(message.getPayload()));
        executor.execute(new RcvHandleTask(topic,message));
    }

    @Override
    public void connectComplete(boolean b, String s) {
        serv.doSub();
        log.info("<Mqtt Sub> ReSubscribed !");
    }

    public class RcvHandleTask implements Runnable {
        private MqttMessage msg;
        private String topic;

        public RcvHandleTask(String topic,MqttMessage msg) {
            this.msg = msg;this.topic=topic;
        }

        @Override
        public void run() {

            String payload=new String(msg.getPayload());
            log.info("Rcv Msg : " + payload+"   Qos:"+msg.getQos()+" Topic:"+topic);
            deviceService.doRcvFromSubDev(topic,payload);

        }

    }

    public MqttSubscribe getServ() {
        return serv;
    }

    public void setServ(MqttSubscribe serv) {
        this.serv = serv;
    }

    public Executor getExecutor() {
        return executor;
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }
}

处理订阅到的消息

@Service
public class DeviceServiceImpl implements DeviceService {

    @Autowired
    MqttService mqttService;

    @Override
    public void doRcvFromSubDev(String topic, String msg) {
        System.out.println("订阅:" + msg);
        String pubHost = "tcp://192.168.1.115";
        Integer pubPort = 8874;
        String pubTopic= "topic_server2/data";
        mqttService.ReStartPubClients(pubHost, pubPort, pubTopic);
        int status = mqttService.UpSendMsg( msg );
        System.out.println("发布:" + status);
    }
}

mqtt发布

启动入口测试方法是我自己的,请按照实际情况修改。

import com.alibaba.fastjson.JSONObject;
import com.cn.common.utils.IDGenerate;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/**
 * mqtt 发布
 */
public class MqttPublish {

    private static final Logger log = Logger.getLogger("MqttPublish");

    private String HOST;
    private String TOPIC = "topic_server/data";
    private String userName = "admin";  //非必须
    private String passWord = "public";  //非必须

    private  String clientid = "iot_server_name";

    private MqttClient client;
    private MqttTopic mqtttopic;
    private MqttConnectOptions options;
    private MqttMessage message;

    private boolean inited=false;

    /**
     * 构造函数
     * @throws MqttException
     */
    public MqttPublish() {

        clientid = null;//"CN-"+IDGenerate.getID();
        client = null;//new MqttClient(HOST, clientid, new MemoryPersistence());
        message = null;
        options=null;
        inited=false;

    }

    /**
     * 构造函数
     * @param host  服务器地址
     * @param topic 主题
     * @param qos   消息质量
     * @throws MqttException
     */
    public MqttPublish(String host,String topic,int qos) throws MqttException {
        // MemoryPersistence设置clientid的保存形式,默认为以内存保存
        clientid = "CN-"+IDGenerate.getID();
        client = new MqttClient(host, clientid, new MemoryPersistence());

        message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(true);

        TOPIC=topic;

        options=null;

    }

    public void StartClient(String url,int port, String topic) {
        inited = false;
        TOPIC = topic;

        String strhost = url;
        int index = strhost.indexOf("tcp://");
        if (index == -1) {
            HOST = "tcp://" + strhost + ":" + port;
        } else {
            HOST = strhost + ":" + port;
        }

        clientid = "hm-" + IDGenerate.getID();

        if (client == null) {
            try {
                client = new MqttClient(HOST, clientid, new MemoryPersistence());
            } catch (MqttException e) {
                e.printStackTrace();
                log.info("<Mqtt> Create Client fail!\r\n");
            }
        } else {
            disconnect();
        }

        message = new MqttMessage();
        message.setQos(1);  //保证消息能到达一次
        message.setRetained(true);
        connect();
        inited = true;

    }

    /**
     *  用来连接服务器
     */
    public void connect() {
        options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setAutomaticReconnect(true);
        // 设置超时时间
        options.setConnectionTimeout(10);
        // 设置会话心跳时间
        options.setKeepAliveInterval(20);
        client.setCallback(new PushCallback(this));
        reconnect();

        mqtttopic = client.getTopic(TOPIC);

    }

    /**
     * 重新连接服务器
     */
    public void  reconnect(){
        if( client != null ){
            try {
                client.connect(options);
            } catch (MqttException e) {
                e.printStackTrace();
                log.info("<Mqtt error> NOT connect!");
            }
        }

    }

    /**
     * 断开服务器
     */
    public void disconnect()
    {
        if(client!=null && client.isConnected() ){
            try {
                client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 判断服务器是否连接
     * @return
     */
    public boolean isConnected() {
        return client.isConnected();
    }

    /**
     *  @todo 发布-公共方法
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete()+" id="+token.getMessageId());
    }

    /**
     * @todo 发布消息
     * @param str   json字符串
     */
    public void publishMes(String str){
        if( !inited){
            return;
        }
        if( client == null || !client.isConnected() ){
            log.info("Error:Mqtt is NOT connected!Reconnect...");
            disconnect();
            connect();
            return;
        }

        message.clearPayload();
        message.setPayload(str.getBytes());
        try {
            publish(mqtttopic ,message);
        } catch (MqttException e) {
            e.printStackTrace();
        }

    }

    /**
     * @todo 发布消息
     * @param str   json字符串
     * @param topic topic
     */
    public void publishMes(String str,String topic){
        if( !inited){
            return;
        }
        if( client == null || !client.isConnected() ){
            log.info("Error:Mqtt is NOT connected!Reconnect...");
            disconnect();
            connect();
            return;
        }

        message.clearPayload();
        message.setPayload(str.getBytes());
        MqttTopic mt = client.getTopic(topic);
        try {
            publish(mt ,message);
        } catch (MqttException e) {
            e.printStackTrace();
        }

    }

    public String getStrTopic(){
        return TOPIC;
    }

    /**
     *  启动入口测试
     * @param args
     * @throws MqttException
     */
    public static void main(String[] args) throws MqttException {

        JSONObject jObject=new JSONObject();

        jObject.put("msgTime",System.currentTimeMillis());
        jObject.put("msgId","cn1234");
        jObject.put("msgType","rsp.updata");
        jObject.put("deviceUID","cnsgw001");

        JSONObject jOAl=new JSONObject();
        jOAl.put("address","AL16");
        jOAl.put("ts",System.currentTimeMillis());
        jOAl.put("v",0.98);

        //从此处可以看出其实list和json也是互相转换的
        List<JSONObject> jValues = new ArrayList<JSONObject>();
        jValues.add(jOAl);

//        JSONObject jData=new JSONObject();
        JSONObject jOValues=new JSONObject();
        jOValues.put("values",jValues);
        jObject.put("data",jOValues);

        String strPut=jObject.toJSONString();

        MqttPublish server = new MqttPublish();

        server.message = new MqttMessage();
        server.message.setQos(1);  //保证消息能到达一次
        server.message.setRetained(true);
        server.message.setPayload(strPut.getBytes());
        server.publish(server.mqtttopic , server.message);
        System.out.println("ratained状态:"+server.message.isRetained());

        server.disconnect();

        System.out.println("\r\nDisconnected\r\n");
    }

}

发布回调函数

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.logging.Logger;

public class PushCallback implements MqttCallback {
    private static final Logger log = Logger.getLogger("PushCallback");

    private MqttPublish serv;

    public PushCallback(MqttPublish serv){
        this.serv = serv;
    }

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        log.info("<Mqtt connect>disconnected. Reconnecting...");
        if( serv!=null){
            serv.reconnect();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("<Mqtt>("+serv.getStrTopic()+") deliveryComplete - " + token.isComplete()+" ID="+token.getMessageId());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("<Push rcv> : " + topic);
    }

}

主服务层

将一系列操作封装到service层,然后调用

import com.cn.mqtt.MqttPublish;
import com.cn.mqtt.MqttSubscribe;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service("mqttService")
public class MqttService {
    private static final Logger log = Logger.getLogger("MqttService");

    @Value("${mqtt.broker.host}")
    private String HOST;
    @Value("${mqtt.pub.maintopic}")
    private String TOPIC = "topic_server/data";

    @Autowired
    private MqttSubscribe mqttSubscribe;

    private MqttPublish mqttPublish;

    private boolean bInited=false;

    public MqttService() {
        bInited=false;
        mqttPublish = new MqttPublish();
    }

    /**
     * 重启mqtt订阅服务
     * @param strhost   服务器地址
     * @param port      端口
     * @param mainTopic 主题
     * @param user      用户名
     * @param passwd    密码
     */
    public void ReStartSubClients(String strhost,int port,String mainTopic, String user, String passwd){
        mqttSubscribe.ReStart(strhost,port,mainTopic, user, passwd);
    }

    /**
     * 重启mqtt发布服务
     * @param url   服务器地址
     * @param port  端口
     */
    public void ReStartPubClients(String url,int port, String topic){
        mqttPublish.StartClient(url,port,topic );
    }

    public int doConect(){
        try {
            mqttPublish = new MqttPublish(HOST,TOPIC,1);
            mqttPublish.connect();
            bInited = true;
            return 0;

        } catch (MqttException e) {
            e.printStackTrace();
            bInited=false;
            mqttPublish = null;
            return -1;
        }
    }

    /**
     * 发布消息
     * @param strMsg    消息,json字符串
     * @return
     */
    public int UpSendMsg(String strMsg){
        log.info("MQtt:"+strMsg);

        mqttPublish.publishMes(strMsg);
        return 0;
    }

    /**
     * 发布消息
     * @param strMsg    消息,json字符串
     * @param topic     主题
     * @return
     */
    public int UpSendMsg(String strMsg,String topic){
        log.info("MQtt <"+topic+">:"+strMsg);

        mqttPublish.publishMes(strMsg,topic);
        return 0;
    }

    public boolean IsPubConnected(){
        return mqttPublish.isConnected();
    }

    public boolean IsSubConnected(){
        return mqttSubscribe.isConnected();
    }
}

注册bean

ExcutorConfig

@Configuration
@ImportResource(locations= {"classpath:/message-context.xml"})
public class ExcutorConfig {

}

 message-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
				http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
                http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <bean id="subscribeTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="15" />
        <property name="queueCapacity" value="200" />
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
    </bean>

    <bean name="subCallback" class="com.cn.mqtt.SubCallback">
        <property name="executor" ref="subscribeTaskExecutor" />
    </bean>
</beans>

测试

首先从topic1订阅,得到消息以后发布到topic2。

String strhost = "tcp://127.0.0.1";
Integer port = 1883;
String topic = "topic_server/data";
String user = "admin";
String password = "public";
mqttService.ReStartSubClients(strhost, port, topic, user, password);

控制台打印出结果:

测试成功。 


版权声明:本文为qq_37634156原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。