目录
Mqtt C中分为同步访问和异步访问模式
- 同步访问的时候,请求会阻塞到访问处,知道有结果返回;
- 异步访问的时候,会将请求委托给Mqtt c client然后直接返回(零等待),最后结果返回之后,会回调对应的回调函数。
前面的博客中有
同步的使用方法
,现在来使用异步。
1:结构体
1.1:MQTTAsync
表示MQTT客户机的句柄。在成功调用MQTTAsync_create()之后有效的客户端句柄可用
/**
* A handle representing an MQTT client. A valid client handle is available
* following a successful call to MQTTAsync_create().
* */
typedef void* MQTTAsyn
在这里插入代码片
1.2:MQTTAsync_connectOptions
MQTTAsync_connectOptions定义了几个控制方法的设置客户机连接到MQTT服务器。默认值在MQTTAsync_connectOptions_initializer中设置。
/**
* MQTTAsync_connectOptions defines several settings that control the way the
* client connects to an MQTT server. Default values are set in
* MQTTAsync_connectOptions_initializer.
*/
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3 4 5 or 6.
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
* 2 signifies no MQTTVersion
* 3 signifies no automatic reconnect options
* 4 signifies no binary password option (just string)
* 5 signifies no MQTTV5 properties
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
* that should pass without communication between the client and the server
* The client will ensure that at least one message travels across the
* network within each keep alive period. In the absence of a data-related
* message during the time period, the client sends a very small MQTT
* "ping" message, which the server will acknowledge. The keep alive
* interval enables the client to detect when the server is no longer
* available without having to wait for the long TCP/IP timeout.
* Set to 0 if you do not want any keep alive processing.
*/
int keepAliveInterval;
/**
* This is a boolean value. The cleansession setting controls the behaviour
* of both the client and the server at connection and disconnection time.
* The client and server both maintain session state information. This
* information is used to ensure "at least once" and "exactly once"
* delivery, and "exactly once" receipt of messages. Session state also
* includes subscriptions created by an MQTT client. You can choose to
* maintain or discard state information between sessions.
*
* When cleansession is true, the state information is discarded at
* connect and disconnect. Setting cleansession to false keeps the state
* information. When you connect an MQTT client application with
* MQTTAsync_connect(), the client identifies the connection using the
* client identifier and the address of the server. The server checks
* whether session information for this client
* has been saved from a previous connection to the server. If a previous
* session still exists, and cleansession=true, then the previous session
* information at the client and server is cleared. If cleansession=false,
* the previous session is resumed. If no previous session exists, a new
* session is started.
*/
int cleansession;
/**
* This controls how many messages can be in-flight simultaneously.
*/
int maxInflight;
/**
* This is a pointer to an MQTTAsync_willOptions structure. If your
* application does not make use of the Last Will and Testament feature,
* set this pointer to NULL.
*/
MQTTAsync_willOptions* will;
/**
* MQTT servers that support the MQTT v3.1 protocol provide authentication
* and authorisation by user name and password. This is the user name
* parameter.
*/
const char* username;
/**
* MQTT servers that support the MQTT v3.1 protocol provide authentication
* and authorisation by user name and password. This is the password
* parameter.
*/
const char* password;
/**
* The time interval in seconds to allow a connect to complete.
*/
int connectTimeout;
/**
* The time interval in seconds after which unacknowledged publish requests are
* retried during a TCP session. With MQTT 3.1.1 and later, retries are
* not required except on reconnect. 0 turns off in-session retries, and is the
* recommended setting. Adding retries to an already overloaded network only
* exacerbates the problem.
*/
int retryInterval;
/**
* This is a pointer to an MQTTAsync_SSLOptions structure. If your
* application does not make use of SSL, set this pointer to NULL.
*/
MQTTAsync_SSLOptions* ssl;
/**
* A pointer to a callback function to be called if the connect successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess* onSuccess;
/**
* A pointer to a callback function to be called if the connect fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure* onFailure;
/**
* A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to success or failure callback functions to
* provide access to the context information in the callback.
*/
void* context;
/**
* The number of entries in the serverURIs array.
*/
int serverURIcount;
/**
* An array of null-terminated strings specifying the servers to
* which the client will connect. Each string takes the form <i>protocol://host:port</i>.
* <i>protocol</i> must be <i>tcp</i> or <i>ssl</i>. For <i>host</i>, you can
* specify either an IP address or a domain name. For instance, to connect to
* a server running on the local machines with the default MQTT port, specify
* <i>tcp://localhost:1883</i>.
*/
char* const* serverURIs;
/**
* Sets the version of MQTT to be used on the connect.
* MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1
* MQTTVERSION_3_1 (3) = only try version 3.1
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1
*/
int MQTTVersion;
/**
* Reconnect automatically in the case of a connection being lost?
*/
int automaticReconnect;
/**
* Minimum retry interval in seconds. Doubled on each failed retry.
*/
int minRetryInterval;
/**
* Maximum retry interval in seconds. The doubling stops here on failed retries.
int maxRetryInterval;
/**
* Optional binary password. Only checked and used if the password option is NULL
*/
struct {
int len; /**< binary password length */
const void* data; /**< binary password data */
} binarypwd;
/*
* MQTT V5 clean start flag. Only clears state at the beginning of the session.
*/
int cleanstart;
/**
* MQTT V5 properties for connect
*/
MQTTProperties *connectProperties;
/**
* MQTT V5 properties for the will message in the connect
*/
MQTTProperties *willProperties;
/**
* A pointer to a callback function to be called if the connect successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess5* onSuccess5;
/**
* A pointer to a callback function to be called if the connect fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure5* onFailure5;
} MQTTAsync_connectOptions;
MQTTAsync_connectOptions_initializer 会进行一些基本的赋值
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL}
通过调用MQTTAsync_connectOptions_initializer 赋初值后,还可以根据需要对结构体中的需要被修改的资源进行修改
例如:
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 20; //与服务器保持活动的时间
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;//连接成功时调用的函数
conn_opts.onFailure = onConnectFailure;//连接失败时调用的函数
conn_opts.context = client;//指向任何应用程序特定上下文的指针。在这里指向创建好的句柄
onSuccess指向连接成功时要调用的回调函数,这个函数内部会有订阅主题的功能(调用MQTTAsync_subscribe订阅主题)
如果没有在MQTTAsync_connectOptions 中添加连接成功的回调函数
即:
conn_opts.onSuccess = NULL;//不设置回调函数
可以通过接口函数:MQTTAsync_setConnected来设置连接成功的回调函数。也可以达到相同的效果
1.3:MQTTAsync_disconnectOptions
设置断开连接时的一些参数
typedef struct
{
/** The eyecatcher for this structure. Must be MQTD. */
char struct_id[4];
/** The version number of this structure. Must be 0 or 1. 0 signifies no V5 properties */
int struct_version;
/**
* The client delays disconnection for up to this time (in
* milliseconds) in order to allow in-flight message transfers to complete.
*/
int timeout;
/**
* A pointer to a callback function to be called if the disconnect successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess* onSuccess;
/**
* A pointer to a callback function to be called if the disconnect fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure* onFailure;
/**
* A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to success or failure callback functions to
* provide access to the context information in the callback.
*/
void* context;
/**
* MQTT V5 input properties
*/
MQTTProperties properties;
/**
* Reason code for MQTTV5 disconnect
*/
enum MQTTReasonCodes reasonCode;
/**
* A pointer to a callback function to be called if the disconnect successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess5* onSuccess5;
/**
* A pointer to a callback function to be called if the disconnect fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure5* onFailure5;
} MQTTAsync_disconnectOptions;
onSuccess;指向成功断开连接时的调用的函数
1.4:MQTTAsync_responseOptions
表示每次操作的响应配置,包括 操作成功,操作失败 等操作;
同类的还有:
- MQTTAsync_disconnectOptions : 断开连接选项
- MQTTAsync_connectOptions: 连接选项
- MQTTAsync_init_options:初始化选项 等等
typedef struct MQTTAsync_responseOptions
{
/** The eyecatcher for this structure. Must be MQTR */
char struct_id[4];
/** The version number of this structure. Must be 0 or 1
* if 0, no MQTTV5 options */
int struct_version;
/**
* A pointer to a callback function to be called if the API call successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess* onSuccess;
/**
* A pointer to a callback function to be called if the API call fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure* onFailure;
/**
* A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to success or failure callback functions to
* provide access to the context information in the callback.
*/
void* context;
/**
* A token is returned from the call. It can be used to track
* the state of this request, both in the callbacks and in future calls
* such as ::MQTTAsync_waitForCompletion.
*/
MQTTAsync_token token;
/**
* A pointer to a callback function to be called if the API call successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess5* onSuccess5;
/**
* A pointer to a callback function to be called if the API call successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onFailure5* onFailure5;
/**
* MQTT V5 input properties
*/
MQTTProperties properties;
/*
* MQTT V5 subscribe options, when used with subscribe only.
*/
MQTTSubscribe_options subscribeOptions;
/*
* MQTT V5 subscribe option count, when used with subscribeMany only.
* The number of entries in the subscribe_options_list array.
*/
int subscribeOptionsCount;
/*
* MQTT V5 subscribe option array, when used with subscribeMany only.
*/
MQTTSubscribe_optio
1.5:MQTTAsync_message
表示MQTT消息的有效负载和属性的结构
/**
* A structure representing the payload and attributes of an MQTT message. The
* message topic is not part of this structure (see MQTTAsync_publishMessage(),
* MQTTAsync_publish(), MQTTAsync_receive(), MQTTAsync_freeMessage()
* and MQTTAsync_messageArrived()).
*/
typedef struct
{
/** The eyecatcher for this structure. must be MQTM. */
char struct_id[4];
/** The version number of this structure. Must be 0 or 1.
* 0 indicates no message properties */
int struct_version;
/** The length of the MQTT message payload in bytes. */
int payloadlen;
/** A pointer to the payload of the MQTT message. */
void* payload;
/**
* The quality of service (QoS) assigned to the message.
* There are three levels of QoS:
* <DL>
* <DT><B>QoS0</B></DT>
* <DD>Fire and forget - the message may not be delivered</DD>
* <DT><B>QoS1</B></DT>
* <DD>At least once - the message will be delivered, but may be
* delivered more than once in some circumstances.</DD>
* <DT><B>QoS2</B></DT>
* <DD>Once and one only - the message will be delivered exactly once.</DD>
* </DL>
*/
int qos;
/**
* The retained flag serves two purposes depending on whether the message
* it is associated with is being published or received.
*
* <b>retained = true</b><br>
* For messages being published, a true setting indicates that the MQTT
* server should retain a copy of the message. The message will then be
* transmitted to new subscribers to a topic that matches the message topic.
* For subscribers registering a new subscription, the flag being true
* indicates that the received message is not a new one, but one that has
* been retained by the MQTT server.
*
* <b>retained = false</b> <br>
* For publishers, this indicates that this message should not be retained
* by the MQTT server. For subscribers, a false setting indicates this is
* a normal message, received as a result of it being published to the
* server.
*/
int retained;
/**
* The dup flag indicates whether or not this message is a duplicate.
* It is only meaningful when receiving QoS1 messages. When true, the
* client application should take appropriate action to deal with the
* duplicate message.
*/
int dup;
/** The message identifier is normally reserved for internal use by the
* MQTT client and server.
*/
int msgid;
/**
* The MQTT V5 properties associated with the message.
*/
MQTTProperties properties;
} MQTTAsync_message;
2:接口函数
2.1: MQTTAsync_create
创建句柄的语句
DLLExport int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
int persistence_type, void* persistence_context);
MQTTAsync* handle:需要创建的句柄指针
const char* serverURI:要连接的服务器路径
const char* clientId: 客户端的ID
2.2: MQTTAsync_setCallbacks
此函数为特定客户端设置全局回调函数。如果您的客户端应用程序不使用特定的回调,设置
相关参数为空。任何必要的消息确认和状态通信在后台处理,不需要任何干预从客户端应用程序。如果你没有设置一个messageArrived回调功能,您将不会被通知收到任何消息作为订阅的结果。
DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
2.3: MQTTAsync_connect
此函数尝试连接之前创建的客户端(参见MQTTAsync_create())到使用指定选项的MQTT服务器。如果你想要启用异步消息和状态通知,你必须调用MQTTAsync_setCallbacks()先于MQTTAsync_connect()
DLLExport int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options);
2.4: MQTTAsync_disconnect
此函数尝试断开客户端与MQTT的连接服务器。以便让客户端有时间完成消息处理当调用此函数时,超时时间为指定。当超时时间超过时,客户端甚至断开连接如果仍有未完成的邮件确认
DLLExport int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options);
2.5:MQTTAsync_setConnected
为客户端设置MQTTAsync_connected()回调函数
DLLExport int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* co);
其中回调函数的原型
typedef void MQTTAsync_connected(void* context, char* cause);
2.6:MQTTAsync_subscribe
此函数尝试将客户端订阅到单个主题
DLLExport int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);
3:示例
订阅端:MQTTAsync_subscribe.c
/*************************************************************************
> File Name: MQTTAsync_subscribe.c
> Author: kayshi
> Mail: kayshi2019@qq.com
> Created Time: Wed 28 Oct 2020 10:21:13 AM CST
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int disc_finished = 0;
int subsribed = 0;
int finished = 0;
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnect lost\n");
if(cause)
printf("cause: %s\n", cause);
printf("Reconneting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if((rc = MQTTAsync_connect(client, &conn_opts))!= MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
int msgarrvd(void *context, char *topicName, int topiclen, MQTTAsync_message *message)
{
int i;
char *payloadptr;
printf("Message arrived\n");
printf("topic: %s\n", topicName);
printf("message:");
payloadptr = message->payload;
for(i = 0; i < message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void onDisconnet(void *context, MQTTAsync_successData *response)
{
printf("Successful disconnection\n");
disc_finished = 1;
}
void onSubscribe(void *context, MQTTAsync_successData *response)
{
printf("Subsrive successed\n");
subsribed = 1;
}
void onSubsribeFailure(void *context, MQTTAsync_failureData *response)
{
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnectFailure(void *context, MQTTAsync_failureData *response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnect(void *context, MQTTAsync_successData *response)
{
MQTTAsync client= (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
printf("Successful connection\n");
printf("Subscruibing to topic %s\n for client %s\n using Qos %d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
opts.onSuccess = onSubscribe;
opts.onFailure = onSubsribeFailure;
opts.context = client;
deliveredtoken = 0;
if((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subsribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
int main(int argc, char* argv[])
{
int rc;
int ch;
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while(!subsribed)
sleep(3);
if(finished)
goto exit;
do
{
ch = getchar();
} while(ch != 'Q' && ch != 'q');
disc_opts.onSuccess = onDisconnet;
if((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while(!disc_finished)
sleep(3);
exit:
MQTTAsync_destroy(&client);
return rc;
}
发布端:MQTTAsync_publish.c
/*************************************************************************
> File Name: MQTTAsync_publish/.c
> Author: kayshi
> Mail: kayshi2019@qq.com
> Created Time: Wed 28 Oct 2020 10:21:13 AM CST
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int finished = 0;
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnect lost\n");
printf("cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
void onDisconnect(void *context, MQTTAsync_successData *response)
printf("Successful disconnection\n");
finished = 1;
}
void onSend(void *context, MQTTAsync_successData *response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
printf("Message with token value %d delivery confirmed\n", response->token);
opts.onSuccess = onDisconnect;
opts.context = client;
if((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
void onConnectFailure(void *context, MQTTAsync_failureData *response)
{
printf("Connect failed, rc %d\n", response? response->code : 0);
finished = 1;
}
void onConnect(void *context, MQTTAsync_successData *response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
printf("Successful connection\n");
opts.onSuccess = onSend;
opts.context = client;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = (int)strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
if((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
int main(int argc, char* argv[])
{
int rc;
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Waiting for publication of %s \n"
"on topic %s for client with ClientID %s\n",
PAYLOAD, TOPIC, CLIENTID);
while(!finished)
sleep(4);
MQTTAsync_destroy(&client);
return rc;
}
源码路径:
https://download.csdn.net/download/weixin_36209467/13070777