看到网上的部分代码,对于订阅主题后,使用mqttCallBack接口来接收消息,虽然这种方法也可以接收到消息,但是mqtt Paho提供了正规的方法去接收消息,这里分享一下自己的demo。
使用callback接口订阅类:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import edu.jia.pub.Publish;
public class Subscribe {
public static final String HOST = "tcp://116.196.99.111:1883";
public static final String TOPIC = "topic";
private static final String clientid = "Client Subscribe";
private MqttClient client;
private MqttConnectOptions options;
private String msg = null;
// private String userName = "admin";
// private String passWord = "password";
// private ScheduledExecutorService scheduler;
public MqttClient connect() throws MqttException {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
this.client = new MqttClient(HOST, clientid, new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
// options.setUserName(userName);
// // 设置连接的密码
// options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息 判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置回调
this.client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息的主题 : " + topic);
System.out.println("接收消息的质量Qos : " + message.getQos());
//
msg = new String(message.getPayload());
System.out.println(">>>>>>>>>>>>>>>>>>>" + msg);
}
});
// MqttTopic topic = client.getTopic(TOPIC);
// setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
// options.setWill(topic, "close".getBytes(), 2, true);
this.client.connect(options);
return this.client;
}
public IMqttToken subscribe(MqttClient client) throws MqttException {
// 订阅消息
int Qos = 2;
String topic1 = TOPIC;
IMqttToken token = client.subscribeWithResponse(topic1, Qos);
String str = new String(token.getResponse().getPayload());
System.out.println("============================" + str);
return token;
}
public static void main(String[] args) throws Throwable {
System.out.println("下发配置");
Thread.sleep(3000);
System.out.println("配置已经下发");
Thread.sleep(3000);
System.out.println("监听回传消息");
Subscribe sub = new Subscribe();
MqttClient client = sub.connect();
Thread.sleep(3000);
System.out.println("建立连接");
IMqttToken token = sub.subscribe(client);
if (token.isComplete()) {
System.out.println("完成订阅");
Publish Publish = new Publish();
Publish.setMessage(new MqttMessage());
Publish.getMessage().setQos(2);
Publish.getMessage().setRetained(true);
Publish.getMessage().setPayload("2018/06-------mqtt服务端测试 msg2".getBytes());
// 重写publish方法
Publish.publish(Publish.getTopic(), Publish.getMessage());
Publish.getClient().disconnect();
}
if (sub.msg == null) {
Thread.sleep(3000);
if (sub.msg!=null) {
System.out.println("====================" + sub.msg);
}
else {
System.out.println("time out ");
}
}
}
}
使用subscribe方法接收消息:
package edu.jia.sub;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class TestSub {
private static int qos = 2;
private static String broker = "tcp://116.196.99.111:1883";
private static MqttClient connect(String clientId) throws MqttException{
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setConnectionTimeout(10);
connOpts.setKeepAliveInterval(20);
MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
mqttClient.connect(connOpts);
return mqttClient;
}
public static void sub(MqttClient mqttClient,String topic) throws MqttException{
int[] Qos = {qos};
String[] topics = {topic};
mqttClient.subscribe(topics, Qos);
}
private static void runsub(String clientId, String topic) throws MqttException{
MqttClient mqttClient = connect(clientId);
if(mqttClient != null){
sub(mqttClient,topic);
}
mqttClient.subscribe(topic,2, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO Auto-generated method stub
System.out.println(new String(message.getPayload()));
}
});
}
public static void main(String[] args) throws MqttException{
runsub("testSub", "test");
}
}
测试:在远程主机,开启mqtt服务:mosquitto -c /etc/mosquitto/mosquitto.conf
mostuitto_pub -p 1883 -q 2 -t “test ” -m “test —- ok ”
多说几句:subscribe提供了三个重载方法,大家可以根据自己的需要选择需要的重载函数,如果对于listener要求比较高的话,可以将内部类单独写出来继承接口即可。
版权声明:本文为sullivan_jia原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。