一、背景
先了解一下什么是MQTT
MQTT协议快速了解
是一种协议,可以抓包看到的东西
二、订阅Subscribe
我们假设其他客户端发布信息的格式是json格式,需要订阅它,并获取
{
"myString":"HelloMqtt!",
"myInt":123456
}
以下代码来自ChatGpt-3.5,测试可用,包含断线重连
当线程main开始时,定义一个lambda的订阅函数,再在下面调用。
订阅成功后本线程是可以继续执行的,而调用了client.start_consuming();的话,就会启动一个监听线程,一直监听代理服务器Broker发来的其他消息,并通过回调来收取(注意,回调处于另一个线程)。
回调需要继承mqtt的callback再自己实现,回调函数可以用lambda写在局部函数里。
// JsonCpp 头文件
#include <json/json.h>
// mqtt头文件
#include <mqtt/async_client.h> // Eclipse Paho C++ 客户端库头文件
#include <iostream>
#include <sstream>
// 定义subscribe回调类
class SubscribeCallback : public virtual mqtt::callback {
public:
std::function<void()> connection_lost_callback;
std::function<void(const std::string&)> message_arrived_callback;
// 断线,发出重连信号
void connection_lost(const std::string &cause) override {
std::cout << "MQTT Connection lost: " << cause << std::endl;
if (connection_lost_callback) {
connection_lost_callback();
}
}
// 接收订阅的消息
void message_arrived(mqtt::const_message_ptr msg) override {
try {
std::string payload = msg->to_string();
if (message_arrived_callback) {
message_arrived_callback(payload);
}
} catch (const std::exception &e) {
std::cout << "Exception while handling MQTT message: " << e.what() << std::endl;
}
}
};
void main()
{
const std::string SERVER_ADDRESS("tcp://192.168.1.1:8554");
const std::string CLIENT_ID("rtmp_mqtt_client");
const std::string TOPIC("admin/abcde/12345");
const std::string USR("admin");
const std::string PWD("123456");
std::mutex mtx;
bool need_reconnect = 1; // 断线重连标记,初始化为1,保证第一次运行时能够执行一次
mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);
mqtt::connect_options conn_opts;
conn_opts.set_keep_alive_interval(30);
conn_opts.set_clean_session(true);
conn_opts.set_user_name(USR); // 设置用户名
conn_opts.set_password(PWD); // 设置密码
// 定义接收与断线回调,执行时与本函数不在同一线程
SubscribeCallback callback;
// 设置接收回调
callback.message_arrived_callback = [this, &mtx, &rtmp_param_changed, &heart_beat_cnt](const std::string &payload) {
std::cout << "Get published json:\n\t" << payload << std::endl;
Json::CharReaderBuilder reader;
Json::Value root;
std::istringstream ss(payload);
std::string errs;
Json::parseFromStream(reader, ss, &root, &errs);
// 获取订阅到的消息中的json字段
std::string myString = root["myString"].asString();
int myInt = root["myInt"].asInt();
};
// 设置重连回调
callback.connection_lost_callback = [&mtx, &need_reconnect]() {
std::unique_lock<std::mutex> lock(mtx);
need_reconnect = 1; // 不能用断线回调线程直接重连,会造成死锁
lock.unlock();
};
// 设进client中
client.set_callback(callback);
// 定义订阅功能
auto subscribe_topic = [&client, &conn_opts, &TOPIC]() -> bool {
// 尝试连接mqtt,如果未连上,则阻塞,直到连上为止。每次重连间隔时长递增5秒
for (int try_send = 1; true; try_send++){
try {
mqtt::token_ptr conntok = client.connect(conn_opts);
conntok->wait();
// 订阅主题
client.subscribe(TOPIC, 1); // 实际上不需要等待 ->wait_for(TIMEOUT);
// 保持订阅状态,否则只接收一次订阅的消息,启动新线程来监听服务器消息。回调在新消息中执行
client.start_consuming();
// TODO,没有断开连接,可能导致资源占用
//client.disconnect()->wait();
std::cout << "MQTT Subscribe topic " << TOPIC << " success!!!" << std::endl;
return true;
} catch (const mqtt::exception &exc) {
std::cout << "MQTT Exception: " << exc.what() << std::endl;
}
std::cout << "=====try "<< try_send <<" times. reconnect after " << try_send * 5 << " second.=====" << std::endl;
sleep(try_send * 5);
}
return false; // 实际不会执行到这
};
// 功能主循环
while (1){
if (need_reconnect) { // 断线重连
// 执行订阅
subscribe_topic();
std::unique_lock<std::mutex> lock(mtx);
need_reconnect = 0;
lock.unlock();
}
// ...其他代码
}
}
执行时打印订阅成功的话,就可以尝试用另一个客户端去发布了。
三、踩过的坑
可能遇到几个问题
1、没设用户名和密码
mqtt::token_ptr conntok = client.connect(conn_opts);
执行connect失败,报错
MQTT Exception: MQTT error [5]: CONNACK return code
基本是由于没有设用户名和密码了
2、用断线回调去重连,造成死锁
如果把订阅lambda直接丢进断线回调去执行,会卡死。因为订阅client.start_consuming();会将任务交由断线回调的线程执行,两者互相等锁。所以必须断线回调给出信号,在主循环中重连。
版权声明:本文为aaddggddaa原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。