MQTT断线重连

  • Post author:
  • Post category:其他

MQTT客户端:org.eclipse.paho.client.mqttv3
MQTT服务器:EMQ
MQTT服务器官网:http://emqtt.com/
如果第一次看MQTT,可以参考:http://blog.csdn.net/whb3299065/article/details/79088928
在之前的文章中我们简单介绍了MQTT的收发消息,并没有实现重连机制,我在实现重连时,发现有不少坑。经常提示
线程异常中断之类的.
首先,让我们先进行一下准备工作,我们需要先定义一个连接对象

private static MqttClient client;
//生成配置对象,用户名,密码等
public MqttConnectOptions getOptions() {
	MqttConnectOptions options = new MqttConnectOptions();
	options.setCleanSession(false);
	options.setUserName(account);
	options.setPassword(password.toCharArray());
	options.setConnectionTimeout(10);
	options.setKeepAliveInterval(20);
	return options;
}
public void connect() throws MqttException {
	//防止重复创建MQTTClient实例
	if (client==null) {
		client = new MqttClient(host + ":" + port, clientId, new MemoryPersistence());
		client.setCallback(new PushCallback(MiddlewareMqttClient.this));
	}
	MqttConnectOptions options = getOptions();
	//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
	if (!client.isConnected()) {
		client.connect(options);
		System.out.println("连接成功");
	}else {//这里的逻辑是如果连接成功就重新连接
		client.disconnect();
		client.connect(getOptions(options));
		System.out.println("连接成功");
	}
}
//监听设备发来的消息
//PostConstruct确保该函数在类被初始化时调用
@PostConstruct
public void init() {
	connect();
	//getMessage是我自己封装的一个订阅主题的函数,对于聪明的你们,应该很简单吧
	getMessage(topic, 2);
	getMessage(taskTopic, 2);
	getMessage("$SYS/brokers/emq@127.0.0.1/clients/#", 2);
}

由于这里需要将连接对象独立出来,所以专们将连接函数提取出来了,其他函数简单贴一下代码,不懂的可以去查一下之前的代码

大家看一下上面连接对象中:client.setCallback(new PushCallback(MiddlewareMqttClient.this));
这里MiddlewareMqttClient类为MQTT客户端类,这个类用于处理接受道德消息(自己定义,只要用来处理消息即可)
我们在MqttCallback 的实现类中,保存了这个对象。

public class PushCallback implements MqttCallback {
	public PushCallback() {}
	MiddlewareMqttClient service;
	public PushCallback(MiddlewareMqttClient service) {
		this.service = service;
	}

然后,我们在掉线回调中,调用service的连接信息:

//连接丢失:一般用与重连如果发生丢失,就会调用这里
public void connectionLost(Throwable throwable) {
	while (true){
		try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
			Thread.sleep(1000);
			service.init();
			break;
		}catch (Exception e){
			continue;
		}
	}
}

这样,我们就实现了断线重连的机制了。
下面我再来填一个坑,大家想一想,如果第一次就没有连接成功会怎么样呢,,,客户端并不算掉线,所以并不会触发掉线重连的机制,所以我们在连接时就应该不断连接,于是我重载了一下我的连接对象。这里如果给定999以下参数,会只进行有限次数的重连,如果让系统不停重连需要number>999

public void connect(int number) {
	for (int i = 0; i < number||number>999; i++) {
		try {
			connect();
		} catch (Exception e) {
			e.printStackTrace();
			//Thread.sleep(5000);
			System.err.println("连接失败,正在第"+i+"次尝试");
			continue;
		}
		return;
	}
	throw new RuntimeException("无法连接服务器");
}

目前我能想到的就只有这些,测试证明,效果不错


版权声明:本文为whb3299065原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。