maven引用
<!-- Eclipse Paho MQTT v3 Java client. The code in this article uses
     MqttCallbackExtended and MqttConnectOptions.setAutomaticReconnect,
     so the chosen version must provide both (NOTE(review): confirm before changing it). -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
mqtt订阅
完整代码如下:
import com.cn.common.utils.IDGenerate;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * MQTT subscriber.
 *
 * <p>Owns a single Paho {@link MqttClient}. The connection parameters are set through
 * {@link #ReStart(String, int, String, String, String)}; the actual topic subscription is
 * performed by {@link #doSub()}, which the injected {@code SubCallback} invokes every time a
 * connection (or automatic reconnection) completes.
 */
@Service
public class MqttSubscribe {
    /** Number of topic slots kept in {@link #TOPICS} and in the previous-topic array. */
    public static final int MAX_TOPIC_NUM = 1;

    @Resource(name = "subCallback")
    private SubCallback subCallback;
    /** Topics of the currently running session; unsubscribed when the client is torn down. */
    private String[] oldTopics;
    private String HOST = "tcp://127.0.0.1:1883";
    private String clientid = "iot_server_name";
    private String userName = "admin"; // optional
    private String passWord = "public"; // optional
    String[] TOPICS;
    private MqttClient client;
    private MqttConnectOptions options;

    /**
     * Resets the client reference.
     *
     * <p>NOTE: because of the {@code void} return type this is an ordinary method, not a
     * constructor. It is kept only so that existing callers keep compiling.
     */
    void MqttSubscribe() {
        client = null;
    }

    /** Allocates the topic arrays once the bean has been constructed. */
    @PostConstruct
    public void initStart() {
        ensureTopicArrays();
    }

    /**
     * Tells whether the subscriber is currently connected to the broker.
     *
     * @return {@code true} only if a client exists and reports being connected
     */
    public boolean isConnected() {
        return client != null && client.isConnected();
    }

    /**
     * Reconnects the subscriber with new connection parameters.
     *
     * <p>The running client (if any) is unsubscribed from its current topics, disconnected and
     * closed <em>before</em> the new parameters are stored, so the unsubscribe really targets
     * the previous topics.
     *
     * @param strhost   broker address, with or without a {@code scheme://} prefix
     * @param port      broker port
     * @param mainTopic topic to subscribe to
     * @param user      user name
     * @param passwd    password, may be {@code null}
     */
    public void ReStart(String strhost, int port, String mainTopic, String user, String passwd) {
        // Fail before touching the running client if the address is unusable.
        java.util.Objects.requireNonNull(strhost, "strhost");
        shutdownClient();
        GetMqttSubInfo(strhost, port, mainTopic, user, passwd);
        start();
    }

    /**
     * Stores the connection parameters used by the next {@link #start()}.
     *
     * @param strhost   broker address, with or without a {@code scheme://} prefix
     * @param port      broker port
     * @param mainTopic topic to subscribe to
     * @param user      user name
     * @param passwd    password, may be {@code null}
     */
    private void GetMqttSubInfo(String strhost, int port, String mainTopic, String user, String passwd) {
        ensureTopicArrays();
        // Only add the default scheme when the caller did not supply one (tcp://, ssl://, ...).
        String base = strhost.contains("://") ? strhost : "tcp://" + strhost;
        HOST = base + ":" + port;
        userName = user;
        passWord = passwd;
        TOPICS[0] = mainTopic;
        // Remember what this session subscribes to so the next restart can unsubscribe it.
        System.arraycopy(TOPICS, 0, oldTopics, 0, Math.min(TOPICS.length, oldTopics.length));
        clientid = IDGenerate.getID();
    }

    /**
     * Creates a new client from the stored parameters and connects it.
     *
     * <p>Any client left over from a previous call is shut down first. Failures are printed and
     * otherwise ignored, as before.
     */
    public void start() {
        shutdownClient();
        try {
            // Session state is kept in memory only.
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            options = new MqttConnectOptions();
            // false: ask the broker to keep the session of this client id between connections.
            options.setCleanSession(false);
            options.setUserName(userName);
            if (passWord != null) {
                options.setPassword(passWord.toCharArray());
            }
            // Connection timeout, in seconds.
            options.setConnectionTimeout(10);
            // Keep-alive interval, in seconds.
            options.setKeepAliveInterval(20);
            options.setAutomaticReconnect(true);
            if (subCallback != null) {
                subCallback.setServ(this);
                client.setCallback(subCallback);
            }
            client.connect(options);
            System.out.println("Subscribe is connected\n");
            if (subCallback == null) {
                // No callback will trigger the subscription (e.g. when run outside Spring).
                doSub();
            }
            System.out.println("Subscribe is subscribed\n");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Subscribes to every configured topic with QoS 1. Intended to be called after each
     * (re)connection; does nothing when not connected or when no topic is configured.
     */
    public void doSub() {
        if (client == null || !client.isConnected()) {
            return;
        }
        String[] topics = nonNullTopics(TOPICS);
        if (topics.length == 0) {
            return;
        }
        // One QoS entry per topic, as required by subscribe(String[], int[]).
        int[] qos = new int[topics.length];
        java.util.Arrays.fill(qos, 1);
        try {
            client.subscribe(topics, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * Manual entry point: connects with the default parameters.
     *
     * @param args unused
     * @throws MqttException declared for compatibility; failures are handled in {@link #start()}
     */
    public static void main(String[] args) throws MqttException {
        MqttSubscribe client = new MqttSubscribe();
        client.start();
    }

    /** Lazily creates the topic arrays so the class also works when not managed by Spring. */
    private void ensureTopicArrays() {
        if (TOPICS == null) {
            TOPICS = new String[MAX_TOPIC_NUM];
        }
        if (oldTopics == null) {
            oldTopics = new String[MAX_TOPIC_NUM];
        }
    }

    /** Unsubscribes, disconnects and closes the current client; each step is best-effort. */
    private void shutdownClient() {
        if (client == null) {
            return;
        }
        MqttClient previous = client;
        client = null;

        if (previous.isConnected()) {
            String[] subscribed = nonNullTopics(oldTopics);
            if (subscribed.length > 0) {
                try {
                    previous.unsubscribe(subscribed);
                } catch (MqttException e) {
                    // A failed unsubscribe must not prevent the disconnect below.
                    e.printStackTrace();
                }
            }
            try {
                previous.disconnect();
                Thread.sleep(100);
            } catch (MqttException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        try {
            previous.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /** Returns the non-null entries of {@code source}; never {@code null}. */
    private static String[] nonNullTopics(String[] source) {
        if (source == null) {
            return new String[0];
        }
        int count = 0;
        for (String topic : source) {
            if (topic != null) {
                count++;
            }
        }
        String[] result = new String[count];
        int next = 0;
        for (String topic : source) {
            if (topic != null) {
                result[next++] = topic;
            }
        }
        return result;
    }
}
1、【SubCallback】是订阅回调函数
2、定义连接参数,如服务器地址【host】等
3、@PostConstruct 是 Java 提供的注解(javax.annotation),被它标注的【initStart()】方法会在 Spring 完成依赖注入之后自动执行:判断主题数组【TOPICS】以及旧的主题数组【oldTopics】是否为 null,为 null 则创建长度为 MAX_TOPIC_NUM 的空数组。
订阅回调函数
如果需要消费订阅到的 mqtt 消息,则需要在回调函数里获取订阅到的【message】,然后进行业务处理。我这里交给【DeviceService】处理,也可以将消息放到 redis 缓存等。
package com.cn.mqtt;
import com.cn.sys.service.DeviceService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
/**
 * Callback installed on the subscribing MQTT client.
 *
 * <p>Incoming messages are wrapped in a {@link RcvHandleTask} and handed to the configured
 * {@link Executor}; after every completed (re)connection the owning {@link MqttSubscribe} is
 * asked to subscribe again.
 */
public class SubCallback implements MqttCallbackExtended {
    private static final Logger log = Logger.getLogger("SubCallback");
    private Executor executor;
    private MqttSubscribe serv;
    @Autowired
    private DeviceService deviceService;

    public SubCallback() {
    }

    /** Logs the loss of the connection; reconnection itself is not started here. */
    @Override
    public void connectionLost(Throwable cause) {
        log.info("<Mqtt connect> disconnect. reconnecting ...");
        // serv.start();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("<Mqtt Sub> deliveryComplete - " + token.isComplete());
    }

    /**
     * Dispatches a received message to the executor.
     *
     * <p>Nothing is allowed to escape from here: an exception thrown by this callback would be
     * treated by the client library as a failure of the connection.
     *
     * @param topic   topic the message arrived on
     * @param message received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("<Mqtt rcv>("+topic+","+message.getQos()+"):"+new String(message.getPayload()));
        RcvHandleTask task = new RcvHandleTask(topic, message);
        if (executor == null) {
            // No pool configured: process on the calling thread instead of failing.
            task.run();
            return;
        }
        try {
            executor.execute(task);
        } catch (java.util.concurrent.RejectedExecutionException e) {
            log.log(java.util.logging.Level.WARNING, "<Mqtt rcv> task rejected, message dropped", e);
        }
    }

    /**
     * Re-subscribes once a connection or reconnection has completed.
     *
     * @param reconnect whether this was an automatic reconnect
     * @param serverURI URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        if (serv == null) {
            log.warning("<Mqtt Sub> connected, but no MqttSubscribe is attached; nothing subscribed");
            return;
        }
        serv.doSub();
        log.info("<Mqtt Sub> ReSubscribed !");
    }

    /** Unit of work that forwards one received message to the {@code DeviceService}. */
    public class RcvHandleTask implements Runnable {
        private MqttMessage msg;
        private String topic;

        public RcvHandleTask(String topic, MqttMessage msg) {
            this.msg = msg;
            this.topic = topic;
        }

        @Override
        public void run() {
            String payload = new String(msg.getPayload());
            log.info("Rcv Msg : " + payload+" Qos:"+msg.getQos()+" Topic:"+topic);
            if (deviceService == null) {
                log.warning("<Mqtt rcv> no DeviceService available, message ignored");
                return;
            }
            try {
                deviceService.doRcvFromSubDev(topic, payload);
            } catch (RuntimeException e) {
                // The task may run on the MQTT callback thread (caller-runs); keep failures local.
                log.log(java.util.logging.Level.WARNING, "<Mqtt rcv> handling failed, topic=" + topic, e);
            }
        }
    }

    public MqttSubscribe getServ() {
        return serv;
    }

    public void setServ(MqttSubscribe serv) {
        this.serv = serv;
    }

    public Executor getExecutor() {
        return executor;
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }
}
处理订阅到的消息
/**
 * Relays every message received by the subscriber to a second broker/topic.
 */
@Service
public class DeviceServiceImpl implements DeviceService {
    private static final String PUB_HOST = "tcp://192.168.1.115";
    private static final int PUB_PORT = 8874;
    private static final String PUB_TOPIC = "topic_server2/data";

    @Autowired
    MqttService mqttService;

    /** Guards the one-time start of the publish client; messages are handled on several threads. */
    private final Object pubLock = new Object();
    private boolean pubStarted;

    /**
     * Prints the received message and republishes it.
     *
     * @param topic topic the message was received on
     * @param msg   message payload as text
     */
    @Override
    public void doRcvFromSubDev(String topic, String msg) {
        System.out.println("订阅:" + msg);
        ensurePublisherStarted();
        int status = mqttService.UpSendMsg(msg);
        System.out.println("发布:" + status);
    }

    /**
     * Starts the publish client on first use only. Restarting it for every message would
     * disconnect and reconnect the publisher each time.
     */
    private void ensurePublisherStarted() {
        synchronized (pubLock) {
            if (!pubStarted) {
                mqttService.ReStartPubClients(PUB_HOST, PUB_PORT, PUB_TOPIC);
                // Only reached when the start did not throw, so a failed start is retried.
                pubStarted = true;
            }
        }
    }
}
mqtt发布
启动入口测试方法是我自己的,请按照实际情况修改。
import com.alibaba.fastjson.JSONObject;
import com.cn.common.utils.IDGenerate;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
/**
 * MQTT publisher.
 *
 * <p>Wraps a Paho {@link MqttClient} together with a default topic and a template message that
 * carries the QoS and retained flag used for every publication.
 */
public class MqttPublish {
    private static final Logger log = Logger.getLogger("MqttPublish");
    private String HOST;
    private String TOPIC = "topic_server/data";
    private String userName = "admin"; // optional
    private String passWord = "public"; // optional
    private String clientid = "iot_server_name";
    private MqttClient client;
    private MqttTopic mqtttopic;
    private MqttConnectOptions options;
    /** Template only: its QoS/retained settings are copied into each outgoing message. */
    private MqttMessage message;
    private boolean inited=false;

    /**
     * Creates an unconfigured publisher; call
     * {@link #StartClient(String, int, String)} before publishing.
     */
    public MqttPublish() {
        clientid = null;
        client = null;
        message = null;
        options=null;
        inited=false;
    }

    /**
     * Creates a publisher bound to a broker; call {@link #connect()} to connect it.
     *
     * @param host  full server URI of the broker
     * @param topic default topic
     * @param qos   quality of service used for published messages
     * @throws MqttException if the client cannot be created
     */
    public MqttPublish(String host,String topic,int qos) throws MqttException {
        clientid = "CN-"+IDGenerate.getID();
        // Client state is persisted in memory only.
        client = new MqttClient(host, clientid, new MemoryPersistence());
        message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(true);
        HOST = host;
        TOPIC=topic;
        options=null;
        // Client and template exist, so publishMes may be used (it connects on demand).
        inited = true;
    }

    /**
     * (Re)starts the publisher against the given broker and default topic.
     *
     * <p>An existing client is disconnected and closed, and a new one is created so that the
     * new address and client id actually take effect.
     *
     * @param url   broker address, with or without a {@code scheme://} prefix
     * @param port  broker port
     * @param topic default topic
     */
    public void StartClient(String url,int port, String topic) {
        inited = false;
        TOPIC = topic;
        // Only add the default scheme when the caller did not supply one.
        String base = url.contains("://") ? url : "tcp://" + url;
        HOST = base + ":" + port;
        clientid = "hm-" + IDGenerate.getID();

        releaseClient();
        try {
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
        } catch (MqttException e) {
            e.printStackTrace();
            log.info("<Mqtt> Create Client fail!\r\n");
            // Without a client there is nothing to connect; stay un-initialised.
            return;
        }

        message = new MqttMessage();
        message.setQos(1); // at-least-once delivery
        message.setRetained(true);
        connect();
        inited = true;
    }

    /**
     * Builds the connect options, installs the callback and connects to the broker.
     * Does nothing (besides logging) when no client has been created yet.
     */
    public void connect() {
        if (client == null) {
            log.info("<Mqtt error> client is not created, NOT connect!");
            return;
        }
        options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userName);
        if (passWord != null) {
            options.setPassword(passWord.toCharArray());
        }
        options.setAutomaticReconnect(true);
        // Connection timeout, in seconds.
        options.setConnectionTimeout(10);
        // Keep-alive interval, in seconds.
        options.setKeepAliveInterval(20);
        client.setCallback(new PushCallback(this));
        reconnect();
        mqtttopic = client.getTopic(TOPIC);
    }

    /**
     * Connects the existing client with the current options; a no-op when there is no client
     * or when it is already connected. Failures are logged, not thrown.
     */
    public void reconnect(){
        if (client == null || client.isConnected()) {
            return;
        }
        try {
            if (options != null) {
                client.connect(options);
            } else {
                client.connect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
            log.info("<Mqtt error> NOT connect!");
        }
    }

    /** Disconnects from the broker if currently connected. */
    public void disconnect()
    {
        if(client!=null && client.isConnected() ){
            try {
                client.disconnect();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Tells whether the publisher is currently connected.
     *
     * @return {@code true} only if a client exists and reports being connected
     */
    public boolean isConnected() {
        return client != null && client.isConnected();
    }

    /**
     * Publishes a message and blocks until delivery has completed.
     *
     * @param topic   destination topic
     * @param message message to publish
     * @throws MqttPersistenceException if the message cannot be persisted
     * @throws MqttException            if publishing fails
     */
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete()+" id="+token.getMessageId());
    }

    /**
     * Publishes text to the default topic.
     *
     * @param str payload, typically a JSON string
     */
    public void publishMes(String str){
        send(str, null);
    }

    /**
     * Publishes text to the given topic.
     *
     * @param str   payload, typically a JSON string
     * @param topic destination topic; the default topic is used when {@code null}
     */
    public void publishMes(String str,String topic){
        send(str, topic);
    }

    public String getStrTopic(){
        return TOPIC;
    }

    /**
     * Manual test entry point: publishes one sample JSON document to the default topic.
     * The broker address below is a local default; adjust it to your environment.
     *
     * @param args unused
     * @throws MqttException declared for compatibility
     */
    public static void main(String[] args) throws MqttException {
        JSONObject jObject=new JSONObject();
        jObject.put("msgTime",System.currentTimeMillis());
        jObject.put("msgId","cn1234");
        jObject.put("msgType","rsp.updata");
        jObject.put("deviceUID","cnsgw001");
        JSONObject jOAl=new JSONObject();
        jOAl.put("address","AL16");
        jOAl.put("ts",System.currentTimeMillis());
        jOAl.put("v",0.98);
        List<JSONObject> jValues = new ArrayList<JSONObject>();
        jValues.add(jOAl);
        JSONObject jOValues=new JSONObject();
        jOValues.put("values",jValues);
        jObject.put("data",jOValues);
        String strPut=jObject.toJSONString();

        MqttPublish server = new MqttPublish();
        // The client and default topic must exist before anything can be published.
        server.StartClient("tcp://127.0.0.1", 1883, server.getStrTopic());
        server.publishMes(strPut);
        if (server.message != null) {
            System.out.println("ratained状态:"+server.message.isRetained());
        }
        server.releaseClient();
        System.out.println("\r\nDisconnected\r\n");
    }

    /** Shared implementation of both {@code publishMes} overloads. */
    private void send(String str, String topicName) {
        if (!inited) {
            return;
        }
        if (!ensureConnected()) {
            return;
        }
        // A fresh message per call: the template must not be mutated by concurrent publishers.
        MqttMessage outgoing = new MqttMessage(str.getBytes());
        outgoing.setQos(message.getQos());
        outgoing.setRetained(message.isRetained());

        MqttTopic target;
        if (topicName != null) {
            target = client.getTopic(topicName);
        } else if (mqtttopic != null) {
            target = mqtttopic;
        } else {
            target = client.getTopic(TOPIC);
        }
        try {
            publish(target, outgoing);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /** Reconnects when needed and reports whether the client is usable afterwards. */
    private boolean ensureConnected() {
        if (client != null && client.isConnected()) {
            return true;
        }
        log.info("Error:Mqtt is NOT connected!Reconnect...");
        disconnect();
        connect();
        return client != null && client.isConnected();
    }

    /** Disconnects and closes the current client, if any, and forgets it. */
    private void releaseClient() {
        if (client == null) {
            return;
        }
        disconnect();
        try {
            client.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
        client = null;
        mqtttopic = null;
    }
}
发布回调函数
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.logging.Logger;
/**
 * Callback installed on the publishing MQTT client.
 */
public class PushCallback implements MqttCallback {
    private static final Logger log = Logger.getLogger("PushCallback");
    private MqttPublish serv;

    /**
     * @param serv publisher this callback belongs to; may be {@code null}
     */
    public PushCallback(MqttPublish serv){
        this.serv = serv;
    }

    /** Logs the loss (including its cause) and asks the publisher to reconnect. */
    @Override
    public void connectionLost(Throwable cause) {
        log.info("<Mqtt connect>disconnected. Reconnecting...");
        if (cause != null) {
            log.log(java.util.logging.Level.WARNING, "<Mqtt connect> connection lost", cause);
        }
        if( serv!=null){
            serv.reconnect();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // serv is optional everywhere else in this class, so do not dereference it blindly.
        String topicName = (serv != null) ? serv.getStrTopic() : null;
        log.info("<Mqtt>("+topicName+") deliveryComplete - " + token.isComplete()+" ID="+token.getMessageId());
    }

    /** Called for messages received after a subscribe; this client only prints the topic. */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("<Push rcv> : " + topic);
    }
}
主服务层
将一系列操作封装到service层,然后调用
import com.cn.mqtt.MqttPublish;
import com.cn.mqtt.MqttSubscribe;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.logging.Logger;
/**
 * Facade over the MQTT subscriber and publisher.
 */
@Service("mqttService")
public class MqttService {
    private static final Logger log = Logger.getLogger("MqttService");
    @Value("${mqtt.broker.host}")
    private String HOST;
    @Value("${mqtt.pub.maintopic}")
    private String TOPIC = "topic_server/data";
    @Autowired
    private MqttSubscribe mqttSubscribe;
    /** Never {@code null}: replaced by an unconfigured instance when a connect attempt fails. */
    private MqttPublish mqttPublish;
    /** {@code true} once the publisher has been configured by one of the start methods. */
    private boolean bInited=false;

    public MqttService() {
        bInited=false;
        mqttPublish = new MqttPublish();
    }

    /**
     * Restarts the MQTT subscriber.
     *
     * @param strhost   broker address
     * @param port      broker port
     * @param mainTopic topic to subscribe to
     * @param user      user name
     * @param passwd    password
     */
    public void ReStartSubClients(String strhost,int port,String mainTopic, String user, String passwd){
        mqttSubscribe.ReStart(strhost,port,mainTopic, user, passwd);
    }

    /**
     * Restarts the MQTT publisher.
     *
     * @param url   broker address
     * @param port  broker port
     * @param topic default topic to publish to
     */
    public void ReStartPubClients(String url,int port, String topic){
        if (mqttPublish == null) {
            mqttPublish = new MqttPublish();
        }
        mqttPublish.StartClient(url,port,topic );
        bInited = true;
    }

    /**
     * Creates a publisher from the configured host/topic and connects it.
     *
     * @return {@code 0} when the publisher is connected afterwards, {@code -1} otherwise
     */
    public int doConect(){
        MqttPublish previous = mqttPublish;
        try {
            MqttPublish fresh = new MqttPublish(HOST,TOPIC,1);
            fresh.connect();
            // Do not leave the replaced publisher connected in the background.
            if (previous != null) {
                previous.disconnect();
            }
            mqttPublish = fresh;
            bInited = true;
            // connect() swallows connection failures, so check the outcome explicitly.
            return fresh.isConnected() ? 0 : -1;
        } catch (MqttException e) {
            e.printStackTrace();
            bInited=false;
            if (previous != null) {
                previous.disconnect();
            }
            // Keep a non-null placeholder so later calls degrade to "not sent" instead of NPE.
            mqttPublish = new MqttPublish();
            return -1;
        }
    }

    /**
     * Publishes a message to the default topic.
     *
     * @param strMsg message, a JSON string
     * @return {@code 0} when the message was handed to the publisher, {@code -1} when no
     *         publisher has been started
     */
    public int UpSendMsg(String strMsg){
        log.info("MQtt:"+strMsg);
        if (!bInited || mqttPublish == null) {
            return -1;
        }
        mqttPublish.publishMes(strMsg);
        return 0;
    }

    /**
     * Publishes a message to the given topic.
     *
     * @param strMsg message, a JSON string
     * @param topic  destination topic
     * @return {@code 0} when the message was handed to the publisher, {@code -1} when no
     *         publisher has been started
     */
    public int UpSendMsg(String strMsg,String topic){
        log.info("MQtt <"+topic+">:"+strMsg);
        if (!bInited || mqttPublish == null) {
            return -1;
        }
        mqttPublish.publishMes(strMsg,topic);
        return 0;
    }

    /** @return whether a started publisher is currently connected */
    public boolean IsPubConnected(){
        return bInited && mqttPublish != null && mqttPublish.isConnected();
    }

    /** @return whether the subscriber is currently connected */
    public boolean IsSubConnected(){
        return mqttSubscribe != null && mqttSubscribe.isConnected();
    }
}
注册bean
ExcutorConfig
/**
 * Pulls the XML bean definitions in {@code message-context.xml} (the subscriber's task executor
 * and the {@code subCallback} bean) into the annotation-based application context.
 */
@Configuration
@ImportResource("classpath:/message-context.xml")
public class ExcutorConfig {
    // Intentionally empty: this class only carries the annotations above.
}
message-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Bean definitions for the MQTT subscriber: a thread pool for message handling and the
     callback that uses it. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- Thread pool handed to the subCallback bean below as its executor. -->
<bean id="subscribeTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="15" />
<property name="queueCapacity" value="200" />
<!-- CallerRunsPolicy: a task that cannot be accepted is run by the submitting thread. -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
<!-- Subscriber callback; the bean name must match @Resource(name = "subCallback") in MqttSubscribe. -->
<bean name="subCallback" class="com.cn.mqtt.SubCallback">
<property name="executor" ref="subscribeTaskExecutor" />
</bean>
</beans>
测试
首先从topic1订阅,得到消息以后发布到topic2。
// Broker to subscribe from.
final String brokerHost = "tcp://127.0.0.1";
final int brokerPort = 1883;
// Topic whose messages are received.
final String sourceTopic = "topic_server/data";
// Broker credentials.
final String brokerUser = "admin";
final String brokerPassword = "public";
// (Re)start the subscriber with these settings.
mqttService.ReStartSubClients(brokerHost, brokerPort, sourceTopic, brokerUser, brokerPassword);
控制台打印出结果:
测试成功。
版权声明:本文为qq_37634156原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。