MQTT C++订阅开发流程

  • Post author:
  • Post category:其他




一、背景

先了解一下什么是MQTT


MQTT协议快速了解


是一种协议,可以抓包看到的东西

请添加图片描述



二、订阅Subscribe

我们假设其他客户端发布信息的格式是json格式,需要订阅它,并获取

{
	"myString":"HelloMqtt!",
	"myInt":123456
}

以下代码来自ChatGpt-3.5,测试可用,包含断线重连

当线程main开始时,定义一个lambda的订阅函数,再在下面调用。

订阅成功后本线程是可以继续执行的,而调用了client.start_consuming();的话,就会启动一个监听线程,一直监听代理服务器Broker发来的其他消息,并通过回调来收取(注意,回调处于另一个线程)。

回调需要继承mqtt的callback再自己实现,回调函数可以用lambda写在局部函数里。

// JsonCpp 头文件
#include <json/json.h>  

// mqtt头文件
#include <mqtt/async_client.h>  // Eclipse Paho C++ 客户端库头文件

#include <iostream>
#include <sstream>
// 定义subscribe回调类
class SubscribeCallback : public virtual mqtt::callback {
public:
	std::function<void()> connection_lost_callback;
	std::function<void(const std::string&)> message_arrived_callback;

	// 断线,发出重连信号
	void connection_lost(const std::string &cause) override {
		std::cout << "MQTT Connection lost: " << cause << std::endl;
		if (connection_lost_callback) {
			connection_lost_callback();
		}
	}

	// 接收订阅的消息
	void message_arrived(mqtt::const_message_ptr msg) override {
		try {
			std::string payload = msg->to_string();
			if (message_arrived_callback) {
				message_arrived_callback(payload);
			}
		} catch (const std::exception &e) {
			std::cout << "Exception while handling MQTT message: " << e.what() << std::endl;
		}
	}
};

void main()
{
	const std::string SERVER_ADDRESS("tcp://192.168.1.1:8554");
	const std::string CLIENT_ID("rtmp_mqtt_client");
	const std::string TOPIC("admin/abcde/12345"); 
	const std::string USR("admin");
	const std::string PWD("123456");

	std::mutex mtx;
	bool need_reconnect = 1; // 断线重连标记,初始化为1,保证第一次运行时能够执行一次

	mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);
	mqtt::connect_options conn_opts;
	conn_opts.set_keep_alive_interval(30);
	conn_opts.set_clean_session(true);
	conn_opts.set_user_name(USR); // 设置用户名
	conn_opts.set_password(PWD); // 设置密码

	// 定义接收与断线回调,执行时与本函数不在同一线程
	SubscribeCallback callback;
	// 设置接收回调
	callback.message_arrived_callback = [this, &mtx, &rtmp_param_changed, &heart_beat_cnt](const std::string &payload) {
		std::cout << "Get published json:\n\t" << payload << std::endl;
		Json::CharReaderBuilder reader;
		Json::Value root;
		std::istringstream ss(payload);
		std::string errs;
		Json::parseFromStream(reader, ss, &root, &errs);

		// 获取订阅到的消息中的json字段
		std::string myString = root["myString"].asString();
		int myInt = root["myInt"].asInt();
	};
	// 设置重连回调
	callback.connection_lost_callback = [&mtx, &need_reconnect]() {
		std::unique_lock<std::mutex> lock(mtx);
		need_reconnect = 1; // 不能用断线回调线程直接重连,会造成死锁
		lock.unlock();
	};
	// 设进client中
	client.set_callback(callback);

	// 定义订阅功能
	auto subscribe_topic = [&client, &conn_opts, &TOPIC]() -> bool {
		// 尝试连接mqtt,如果未连上,则阻塞,直到连上为止。每次重连间隔时长递增5秒
		for (int try_send = 1; true; try_send++){
			try {
				mqtt::token_ptr conntok = client.connect(conn_opts);
				conntok->wait();

				// 订阅主题
				client.subscribe(TOPIC, 1); // 实际上不需要等待 ->wait_for(TIMEOUT);

				// 保持订阅状态,否则只接收一次订阅的消息,启动新线程来监听服务器消息。回调在新消息中执行
				client.start_consuming();

				// TODO,没有断开连接,可能导致资源占用
				//client.disconnect()->wait();

				std::cout << "MQTT Subscribe topic " << TOPIC << " success!!!" << std::endl;
				return true;
			} catch (const mqtt::exception &exc) {
				std::cout << "MQTT Exception: " << exc.what() << std::endl;
			}

			std::cout << "=====try "<< try_send <<" times. reconnect after " << try_send * 5 << " second.=====" << std::endl;
			sleep(try_send * 5);
		}

		return false; // 实际不会执行到这
	};


	// 功能主循环
	while (1){
		if (need_reconnect) { // 断线重连
			// 执行订阅
			subscribe_topic();
			std::unique_lock<std::mutex> lock(mtx);
			need_reconnect = 0;
			lock.unlock();
		}
		
		// ...其他代码
	}
}

执行时打印订阅成功的话,就可以尝试用另一个客户端去发布了。



三、踩过的坑

可能遇到几个问题



1、没设用户名和密码

mqtt::token_ptr conntok = client.connect(conn_opts);

执行connect失败,报错

MQTT Exception: MQTT error [5]: CONNACK return code

请添加图片描述

基本是由于没有设用户名和密码了



2、用断线回调去重连,造成死锁

如果把订阅lambda直接丢进断线回调去执行,会卡死。因为订阅client.start_consuming();会将任务交由断线回调的线程执行,两者互相等锁。所以必须断线回调给出信号,在主循环中重连。



版权声明:本文为aaddggddaa原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。