MQTT客户端:org.eclipse.paho.client.mqttv3
MQTT服务器:EMQ
MQTT服务器官网:http://emqtt.com/
如果第一次看MQTT,可以参考:http://blog.csdn.net/whb3299065/article/details/79088928
在之前的文章中我们简单介绍了MQTT的收发消息,并没有实现重连机制,我在实现重连时,发现有不少坑。经常提示
线程异常中断之类的.
首先,让我们先进行一下准备工作,我们需要先定义一个连接对象
private static MqttClient client;
//生成配置对象,用户名,密码等
public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(account);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(10);
options.setKeepAliveInterval(20);
return options;
}
public void connect() throws MqttException {
//防止重复创建MQTTClient实例
if (client==null) {
client = new MqttClient(host + ":" + port, clientId, new MemoryPersistence());
client.setCallback(new PushCallback(MiddlewareMqttClient.this));
}
MqttConnectOptions options = getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!client.isConnected()) {
client.connect(options);
System.out.println("连接成功");
}else {//这里的逻辑是如果连接成功就重新连接
client.disconnect();
client.connect(getOptions(options));
System.out.println("连接成功");
}
}
//监听设备发来的消息
//PostConstruct确保该函数在类被初始化时调用
@PostConstruct
public void init() {
connect();
//getMessage是我自己封装的一个订阅主题的函数,对于聪明的你们,应该很简单吧
getMessage(topic, 2);
getMessage(taskTopic, 2);
getMessage("$SYS/brokers/emq@127.0.0.1/clients/#", 2);
}
由于这里需要将连接对象独立出来,所以专们将连接函数提取出来了,其他函数简单贴一下代码,不懂的可以去查一下之前的代码
大家看一下上面连接对象中:client.setCallback(new PushCallback(MiddlewareMqttClient.this));
这里MiddlewareMqttClient类为MQTT客户端类,这个类用于处理接受道德消息(自己定义,只要用来处理消息即可)
我们在MqttCallback 的实现类中,保存了这个对象。
public class PushCallback implements MqttCallback {
public PushCallback() {}
MiddlewareMqttClient service;
public PushCallback(MiddlewareMqttClient service) {
this.service = service;
}
然后,我们在掉线回调中,调用service的连接信息:
//连接丢失:一般用与重连如果发生丢失,就会调用这里
public void connectionLost(Throwable throwable) {
while (true){
try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
Thread.sleep(1000);
service.init();
break;
}catch (Exception e){
continue;
}
}
}
这样,我们就实现了断线重连的机制了。
下面我再来填一个坑,大家想一想,如果第一次就没有连接成功会怎么样呢,,,客户端并不算掉线,所以并不会触发掉线重连的机制,所以我们在连接时就应该不断连接,于是我重载了一下我的连接对象。这里如果给定999以下参数,会只进行有限次数的重连,如果让系统不停重连需要number>999
public void connect(int number) {
for (int i = 0; i < number||number>999; i++) {
try {
connect();
} catch (Exception e) {
e.printStackTrace();
//Thread.sleep(5000);
System.err.println("连接失败,正在第"+i+"次尝试");
continue;
}
return;
}
throw new RuntimeException("无法连接服务器");
}
目前我能想到的就只有这些,测试证明,效果不错
版权声明:本文为whb3299065原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。