Using the MQTT Asynchronous API



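The MQTT C client (Eclipse Paho) supports two access modes, synchronous and asynchronous:

  1. In synchronous mode, a request blocks at the call site until the result is returned.
  2. In asynchronous mode, the request is handed off to the MQTT C client and the call returns immediately without waiting; when the result arrives, the corresponding callback function is invoked.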
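
The difference between the two modes can be sketched in plain C without the MQTT library. Everything here is hypothetical and only for illustration: square_sync, square_async, run_pending and store_result are invented names, and run_pending stands in for the background thread that a real asynchronous client would run.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback type: receives the result once it is available. */
typedef void (*result_callback)(void *context, int result);

/* One pending request; stands in for the client's internal work queue. */
static struct {
    int pending;
    int input;
    result_callback on_done;
    void *context;
} slot;

/* Synchronous style: the caller is blocked here until the result exists. */
int square_sync(int x)
{
    return x * x;
}

/* Asynchronous style: record the request and return at once.
 * Returns 0 if the request was accepted, -1 if one is already pending. */
int square_async(int x, result_callback on_done, void *context)
{
    assert(on_done != NULL);
    if (slot.pending)
        return -1;
    slot.pending = 1;
    slot.input = x;
    slot.on_done = on_done;
    slot.context = context;
    return 0;
}

/* Stands in for the client's background thread finishing the work. */
void run_pending(void)
{
    if (!slot.pending)
        return;
    slot.pending = 0;
    slot.on_done(slot.context, slot.input * slot.input);
}

/* Example callback: writes the result through the context pointer. */
void store_result(void *context, int result)
{
    *(int *)context = result;
}
```

A call such as square_async(5, store_result, &out) returns before out is written; out only changes once run_pending() executes. The context argument plays the same role as the context pointer that the MQTT callbacks receive.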

An earlier post covered how to use the synchronous API; this post covers the asynchronous one.



1: Structures



1.1:MQTTAsync

A handle representing an MQTT client. A valid client handle is available after a successful call to MQTTAsync_create().

/**
 * A handle representing an MQTT client. A valid client handle is available
 * following a successful call to MQTTAsync_create().
 * */
typedef void* MQTTAsync;



1.2:MQTTAsync_connectOptions

MQTTAsync_connectOptions defines several settings that control how the client connects to an MQTT server. Default values are set in MQTTAsync_connectOptions_initializer.

/**
 * MQTTAsync_connectOptions defines several settings that control the way the
 * client connects to an MQTT server.  Default values are set in
 * MQTTAsync_connectOptions_initializer.
 */
typedef struct
{
    /** The eyecatcher for this structure.  must be MQTC. */
    char struct_id[4];
    /** The version number of this structure.  Must be 0, 1, 2, 3, 4, 5 or 6.
      * 0 signifies no SSL options and no serverURIs
      * 1 signifies no serverURIs
      * 2 signifies no MQTTVersion
      * 3 signifies no automatic reconnect options
      * 4 signifies no binary password option (just string)
      * 5 signifies no MQTTV5 properties
      */
    int struct_version;
    /** The "keep alive" interval, measured in seconds, defines the maximum time
      * that should pass without communication between the client and the server
      * The client will ensure that at least one message travels across the
      * network within each keep alive period.  In the absence of a data-related
      * message during the time period, the client sends a very small MQTT
      * "ping" message, which the server will acknowledge. The keep alive
      * interval enables the client to detect when the server is no longer
      * available without having to wait for the long TCP/IP timeout.
      * Set to 0 if you do not want any keep alive processing.
      */
    int keepAliveInterval;
    /**
      * This is a boolean value. The cleansession setting controls the behaviour
      * of both the client and the server at connection and disconnection time.
      * The client and server both maintain session state information. This
      * information is used to ensure "at least once" and "exactly once"
      * delivery, and "exactly once" receipt of messages. Session state also
      * includes subscriptions created by an MQTT client. You can choose to
      * maintain or discard state information between sessions.
      *
      * When cleansession is true, the state information is discarded at
      * connect and disconnect. Setting cleansession to false keeps the state
      * information. When you connect an MQTT client application with
      * MQTTAsync_connect(), the client identifies the connection using the
      * client identifier and the address of the server. The server checks
      * whether session information for this client
      * has been saved from a previous connection to the server. If a previous
      * session still exists, and cleansession=true, then the previous session
      * information at the client and server is cleared. If cleansession=false,
      * the previous session is resumed. If no previous session exists, a new
      * session is started.
      */
    int cleansession;
    /**
      * This controls how many messages can be in-flight simultaneously.
      */
    int maxInflight;
    /**
      * This is a pointer to an MQTTAsync_willOptions structure. If your
      * application does not make use of the Last Will and Testament feature,
      * set this pointer to NULL.
      */
    MQTTAsync_willOptions* will;
    /**
      * MQTT servers that support the MQTT v3.1 protocol provide authentication
      * and authorisation by user name and password. This is the user name
      * parameter.
      */
    const char* username;
    /**
      * MQTT servers that support the MQTT v3.1 protocol provide authentication
      * and authorisation by user name and password. This is the password
      * parameter.
      */
    const char* password;
    /**
      * The time interval in seconds to allow a connect to complete.
      */
    int connectTimeout;
    /**
     * The time interval in seconds after which unacknowledged publish requests are
     * retried during a TCP session.  With MQTT 3.1.1 and later, retries are
     * not required except on reconnect.  0 turns off in-session retries, and is the
     * recommended setting.  Adding retries to an already overloaded network only
     * exacerbates the problem.
     */
    int retryInterval;
    /**
      * This is a pointer to an MQTTAsync_SSLOptions structure. If your
      * application does not make use of SSL, set this pointer to NULL.
      */
    MQTTAsync_SSLOptions* ssl;
    /**
      * A pointer to a callback function to be called if the connect successfully
      * completes.  Can be set to NULL, in which case no indication of successful
      * completion will be received.
      */
    MQTTAsync_onSuccess* onSuccess;
    /**
      * A pointer to a callback function to be called if the connect fails.
      * Can be set to NULL, in which case no indication of unsuccessful
      * completion will be received.
      */
    MQTTAsync_onFailure* onFailure;
    /**
      * A pointer to any application-specific context. The
      * <i>context</i> pointer is passed to success or failure callback functions to
      * provide access to the context information in the callback.
      */
    void* context;
    /**
      * The number of entries in the serverURIs array.
      */
    int serverURIcount;
    /**
      * An array of null-terminated strings specifying the servers to
      * which the client will connect. Each string takes the form <i>protocol://host:port</i>.
      * <i>protocol</i> must be <i>tcp</i> or <i>ssl</i>. For <i>host</i>, you can
      * specify either an IP address or a domain name. For instance, to connect to
      * a server running on the local machines with the default MQTT port, specify
      * <i>tcp://localhost:1883</i>.
      */
    char* const* serverURIs;
    /**
      * Sets the version of MQTT to be used on the connect.
      * MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1
      * MQTTVERSION_3_1 (3) = only try version 3.1
      * MQTTVERSION_3_1_1 (4) = only try version 3.1.1
      */
    int MQTTVersion;
    /**
      * Reconnect automatically in the case of a connection being lost?
      */
    int automaticReconnect;
    /**
      * Minimum retry interval in seconds.  Doubled on each failed retry.
      */
    int minRetryInterval;
    /**
      * Maximum retry interval in seconds.  The doubling stops here on failed retries.
      */
    int maxRetryInterval;
    /**
     * Optional binary password.  Only checked and used if the password option is NULL
     */
    struct {
        int len;            /**< binary password length */
        const void* data;  /**< binary password data */
    } binarypwd;
    /*
     * MQTT V5 clean start flag.  Only clears state at the beginning of the session.
     */
    int cleanstart;
    /**
     * MQTT V5 properties for connect
     */
    MQTTProperties *connectProperties;
    /**
     * MQTT V5 properties for the will message in the connect
     */
    MQTTProperties *willProperties;
    /**
      * A pointer to a callback function to be called if the connect successfully
      * completes.  Can be set to NULL, in which case no indication of successful
      * completion will be received.
      */
    MQTTAsync_onSuccess5* onSuccess5;
    /**
      * A pointer to a callback function to be called if the connect fails.
      * Can be set to NULL, in which case no indication of unsuccessful
      * completion will be received.
      */
    MQTTAsync_onFailure5* onFailure5;
} MQTTAsync_connectOptions;
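
The comments on minRetryInterval and maxRetryInterval describe a doubling schedule for automatic reconnects. A minimal sketch of that schedule follows, assuming plain doubling with a cap; next_retry_interval is a hypothetical helper, not a library function, and the library's real implementation may differ in detail.

```c
#include <assert.h>

/* Returns the interval to use after a failed retry: double the current
 * interval, but never exceed max_interval. Written to avoid overflow. */
int next_retry_interval(int current, int max_interval)
{
    assert(current > 0 && max_interval >= current);
    if (current > max_interval / 2)
        return max_interval;
    return current * 2;
}
```

Starting from a minimum of 1 second with a maximum of 60 seconds, repeated failures would step through 1, 2, 4, 8, 16, 32 and then stay at 60.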

MQTTAsync_connectOptions_initializer assigns the basic default values:

#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 6, 60, 1, 65535, NULL, NULL, NULL, 30, 0,\
NULL, NULL, NULL, NULL, 0, NULL, MQTTVERSION_DEFAULT, 0, 1, 60, {0, NULL}, 0, NULL, NULL, NULL, NULL}

After initializing with MQTTAsync_connectOptions_initializer, you can override individual fields of the struct as needed.

For example:

MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;

conn_opts.keepAliveInterval = 20; // keep-alive interval, in seconds
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect; // called when the connect succeeds
conn_opts.onFailure = onConnectFailure; // called when the connect fails
conn_opts.context = client; // application-specific context pointer; here, the created client handle

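onSuccess points to the callback invoked when the connect succeeds. In the example in section 3, this callback subscribes to a topic by calling MQTTAsync_subscribe.

If no connect-success callback is set in MQTTAsync_connectOptions, that is:

conn_opts.onSuccess = NULL; // no callback set

you can instead register a connected callback with MQTTAsync_setConnected, which achieves the same effect.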
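
The initializer-then-override pattern used for MQTTAsync_connectOptions can be sketched with a small stand-in struct. The names demo_options, demo_options_initializer and demo_options_valid are hypothetical and exist only for this illustration; the sketch assumes the eyecatcher and version fields are there so that code receiving the struct can detect one that was never initialized.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical options struct mirroring the eyecatcher + version layout. */
typedef struct {
    char struct_id[4];      /* eyecatcher: 'D','E','M','O', no terminator */
    int struct_version;     /* 0 or 1 in this sketch */
    int keepAliveInterval;  /* seconds */
    int cleansession;       /* boolean */
} demo_options;

/* Fills every field with a sensible default, like the real initializer. */
#define demo_options_initializer { {'D', 'E', 'M', 'O'}, 1, 60, 1 }

/* Returns 1 if the struct carries the expected eyecatcher and a known
 * version, 0 otherwise (for example when it was only zero-filled). */
int demo_options_valid(const demo_options *opts)
{
    assert(opts != NULL);
    if (memcmp(opts->struct_id, "DEMO", 4) != 0)
        return 0;
    return opts->struct_version == 0 || opts->struct_version == 1;
}
```

Starting from the initializer and then changing only the fields you care about keeps the remaining fields at their defaults, which is why the connect options are always declared with the initializer first.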



1.3:MQTTAsync_disconnectOptions

Settings that control how the client disconnects.

typedef struct
{
    /** The eyecatcher for this structure. Must be MQTD. */
    char struct_id[4];
    /** The version number of this structure.  Must be 0 or 1.  0 signifies no V5 properties */
    int struct_version;
    /**
      * The client delays disconnection for up to this time (in
      * milliseconds) in order to allow in-flight message transfers to complete.
      */
    int timeout;
    /**
    * A pointer to a callback function to be called if the disconnect successfully
    * completes.  Can be set to NULL, in which case no indication of successful
    * completion will be received.
    */
    MQTTAsync_onSuccess* onSuccess;
    /**
    * A pointer to a callback function to be called if the disconnect fails.
    * Can be set to NULL, in which case no indication of unsuccessful
    * completion will be received.
    */
    MQTTAsync_onFailure* onFailure;
    /**
    * A pointer to any application-specific context. The
    * <i>context</i> pointer is passed to success or failure callback functions to
    * provide access to the context information in the callback.
    */
    void* context;
    /**
     * MQTT V5 input properties
     */
    MQTTProperties properties;
    /**
     * Reason code for MQTTV5 disconnect
     */
    enum MQTTReasonCodes reasonCode;
    /**
    * A pointer to a callback function to be called if the disconnect successfully
    * completes.  Can be set to NULL, in which case no indication of successful
    * completion will be received.
    */
    MQTTAsync_onSuccess5* onSuccess5;
    /**
    * A pointer to a callback function to be called if the disconnect fails.
    * Can be set to NULL, in which case no indication of unsuccessful
    * completion will be received.
    */
    MQTTAsync_onFailure5* onFailure5;
} MQTTAsync_disconnectOptions;

onSuccess points to the function called when the disconnect completes successfully.



1.4:MQTTAsync_responseOptions

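Holds the response options for an individual operation, including the callbacks for success and failure.

Structures of the same kind include:

  • MQTTAsync_disconnectOptions: disconnect options
  • MQTTAsync_connectOptions: connect options
  • MQTTAsync_init_options: initialization options, and so on

typedef struct MQTTAsync_responseOptions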
{
    /** The eyecatcher for this structure.  Must be MQTR */
    char struct_id[4];
    /** The version number of this structure.  Must be 0 or 1
     *   if 0, no MQTTV5 options */
    int struct_version;
    /**
    * A pointer to a callback function to be called if the API call successfully
    * completes.  Can be set to NULL, in which case no indication of successful
    * completion will be received.
    */
    MQTTAsync_onSuccess* onSuccess;
    /**
    * A pointer to a callback function to be called if the API call fails.
    * Can be set to NULL, in which case no indication of unsuccessful
    * completion will be received.
    */
    MQTTAsync_onFailure* onFailure;
    /**
    * A pointer to any application-specific context. The
    * <i>context</i> pointer is passed to success or failure callback functions to
    * provide access to the context information in the callback.
    */
    void* context;
    /**
    * A token is returned from the call.  It can be used to track
    * the state of this request, both in the callbacks and in future calls
    * such as ::MQTTAsync_waitForCompletion.
    */
    MQTTAsync_token token;
    /**
    * A pointer to a callback function to be called if the API call successfully
    * completes.  Can be set to NULL, in which case no indication of successful
    * completion will be received.
    */
    MQTTAsync_onSuccess5* onSuccess5;
    /**
    * A pointer to a callback function to be called if the API call fails.
    * Can be set to NULL, in which case no indication of unsuccessful
    * completion will be received.
    */
    MQTTAsync_onFailure5* onFailure5;
    /**
     * MQTT V5 input properties
     */
    MQTTProperties properties;
    /*
     * MQTT V5 subscribe options, when used with subscribe only.
     */
    MQTTSubscribe_options subscribeOptions;
    /*
     * MQTT V5 subscribe option count, when used with subscribeMany only.
     * The number of entries in the subscribe_options_list array.
     */
    int subscribeOptionsCount;
    /*
     * MQTT V5 subscribe option array, when used with subscribeMany only.
     */
    MQTTSubscribe_options* subscribeOptionsList;
} MQTTAsync_responseOptions;



1.5:MQTTAsync_message

A structure representing the payload and attributes of an MQTT message.

/**
 * A structure representing the payload and attributes of an MQTT message. The
 * message topic is not part of this structure (see MQTTAsync_publishMessage(),
 * MQTTAsync_publish(), MQTTAsync_receive(), MQTTAsync_freeMessage()
 * and MQTTAsync_messageArrived()).
 */
typedef struct
{
    /** The eyecatcher for this structure.  must be MQTM. */
    char struct_id[4];
    /** The version number of this structure.  Must be 0 or 1.
     *  0 indicates no message properties */
    int struct_version;
    /** The length of the MQTT message payload in bytes. */
    int payloadlen;
    /** A pointer to the payload of the MQTT message. */
    void* payload;
    /**
     * The quality of service (QoS) assigned to the message.
     * There are three levels of QoS:
     * <DL>
     * <DT><B>QoS0</B></DT>
     * <DD>Fire and forget - the message may not be delivered</DD>
     * <DT><B>QoS1</B></DT>
     * <DD>At least once - the message will be delivered, but may be
     * delivered more than once in some circumstances.</DD>
     * <DT><B>QoS2</B></DT>
     * <DD>Once and one only - the message will be delivered exactly once.</DD>
     * </DL>
     */
    int qos;
    /**
     * The retained flag serves two purposes depending on whether the message
     * it is associated with is being published or received.
     *
     * <b>retained = true</b><br>
     * For messages being published, a true setting indicates that the MQTT
     * server should retain a copy of the message. The message will then be
     * transmitted to new subscribers to a topic that matches the message topic.
     * For subscribers registering a new subscription, the flag being true
     * indicates that the received message is not a new one, but one that has
     * been retained by the MQTT server.
     *
     * <b>retained = false</b> <br>
     * For publishers, this indicates that this message should not be retained
     * by the MQTT server. For subscribers, a false setting indicates this is
     * a normal message, received as a result of it being published to the
     * server.
     */
    int retained;
    /**
      * The dup flag indicates whether or not this message is a duplicate.
      * It is only meaningful when receiving QoS1 messages. When true, the
      * client application should take appropriate action to deal with the
      * duplicate message.
      */
    int dup;
    /** The message identifier is normally reserved for internal use by the
      * MQTT client and server.
      */
    int msgid;
    /**
     * The MQTT V5 properties associated with the message.
     */
    MQTTProperties properties;
} MQTTAsync_message;
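
Because the message carries its payload as a pointer plus payloadlen, it is safer not to assume the payload ends with a NUL byte. A minimal sketch of copying such a payload into an ordinary C string follows; payload_to_cstring is a hypothetical helper, not part of the library.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Copies a length-delimited payload into a newly allocated, NUL-terminated
 * string. Returns NULL if allocation fails; the caller frees the result. */
char *payload_to_cstring(const void *payload, int payloadlen)
{
    char *text;

    assert(payloadlen >= 0);
    assert(payload != NULL || payloadlen == 0);

    text = malloc((size_t)payloadlen + 1);
    if (text == NULL)
        return NULL;
    if (payloadlen > 0)
        memcpy(text, payload, (size_t)payloadlen);
    text[payloadlen] = '\0';
    return text;
}
```

Inside a message-arrived callback this could be called as payload_to_cstring(message->payload, message->payloadlen), with the returned string freed after use.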



2: API functions



2.1: MQTTAsync_create

Creates the client handle.

DLLExport int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,
        int persistence_type, void* persistence_context);

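MQTTAsync* handle: pointer to the handle to be created

const char* serverURI: URI of the server to connect to, for example tcp://localhost:1883

const char* clientId: the client identifier

int persistence_type: the type of persistence to use; the examples below pass MQTTCLIENT_PERSISTENCE_NONE

void* persistence_context: persistence-specific context; the examples below pass NULL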



2.2: MQTTAsync_setCallbacks

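This function sets the global callback functions for a specific client. If your client application does not use a particular callback, set the corresponding parameter to NULL. Any necessary message acknowledgements and status communications are handled in the background without any intervention from the client application. If you do not set a messageArrived callback, you will not be notified when messages arrive as a result of a subscription.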

DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
                                     MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);



2.3: MQTTAsync_connect

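This function attempts to connect a previously created client (see MQTTAsync_create()) to an MQTT server using the specified options. If you want asynchronous message and status notifications, you must call MQTTAsync_setCallbacks() before MQTTAsync_connect().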

DLLExport int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions* options);



2.4: MQTTAsync_disconnect

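This function attempts to disconnect the client from the MQTT server. To give the client time to finish handling messages that are in flight when the function is called, a timeout is specified in the options. Once the timeout has expired, the client disconnects even if there are still outstanding message acknowledgements.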

DLLExport int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options);



2.5:MQTTAsync_setConnected

Sets the MQTTAsync_connected() callback function for a client.

DLLExport int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* co);

The callback function has the following prototype:

typedef void MQTTAsync_connected(void* context, char* cause);



2.6:MQTTAsync_subscribe

This function attempts to subscribe a client to a single topic.

DLLExport int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);



3: Examples

Subscriber: MQTTAsync_subscribe.c

/*************************************************************************
    > File Name: MQTTAsync_subscribe.c
    > Author: kayshi
    > Mail: kayshi2019@qq.com
    > Created Time: Wed 28 Oct 2020 10:21:13 AM CST
 ************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "MQTTAsync.h"

#define ADDRESS  "tcp://localhost:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC    "MQTT Examples"
#define PAYLOAD  "Hello World!"
#define QOS      1
#define TIMEOUT  10000L

volatile MQTTAsync_token deliveredtoken;

int disc_finished = 0;
int subsribed = 0;
int finished = 0;


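/* Defined further down; declared here so connlost() can reuse them on reconnect. */
void onConnect(void *context, MQTTAsync_successData *response);
void onConnectFailure(void *context, MQTTAsync_failureData *response);

void connlost(void *context, char *cause)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;
    printf("\nConnection lost\n");
    if(cause)
        printf("cause: %s\n", cause);
    printf("Reconnecting\n");
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    /* With cleansession set, the subscription is gone after a reconnect,
       so reuse onConnect to subscribe again. */
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;
    if((rc = MQTTAsync_connect(client, &conn_opts))!= MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        finished = 1;
    }
}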

int msgarrvd(void *context, char *topicName, int topiclen, MQTTAsync_message *message)
{
    int i;
    char *payloadptr;
    printf("Message arrived\n");
    printf("topic: %s\n", topicName);
    printf("message:");
    payloadptr = message->payload;
    for(i = 0; i < message->payloadlen; i++)
    {
        putchar(*payloadptr++);
    }
    putchar('\n');
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}

void onDisconnet(void *context, MQTTAsync_successData *response)
{
    printf("Successful disconnection\n");
    disc_finished  = 1;
}

void onSubscribe(void *context, MQTTAsync_successData *response)
{
    printf("Subscribe succeeded\n");
    subsribed  = 1;
}

void onSubsribeFailure(void *context, MQTTAsync_failureData *response)
{
    printf("Subscribe failed, rc %d\n", response ? response->code : 0);
    finished  = 1;
}

void onConnectFailure(void *context, MQTTAsync_failureData *response)
{
    printf("Connect failed, rc %d\n", response ? response->code : 0);
    finished  = 1;
}


void onConnect(void *context, MQTTAsync_successData *response)
{
    MQTTAsync client= (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    int rc;
    printf("Successful connection\n");
    printf("Subscribing to topic %s\n for client %s\n using QoS %d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
    opts.onSuccess = onSubscribe;
    opts.onFailure = onSubsribeFailure;
    opts.context = client;
    deliveredtoken = 0;
    if((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start subscribe, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
}

int main(int argc, char* argv[])
{
    int rc;
    int ch;
    MQTTAsync client;

    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;

    MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL);

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;
    if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
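    /* Also stop waiting when connecting or subscribing has failed;
       otherwise a failure would leave this loop running forever. */
    while(!subsribed && !finished)
        sleep(3);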
    if(finished)
        goto exit;
    do
    {
        ch = getchar();
    } while(ch != 'Q' && ch != 'q');

    disc_opts.onSuccess = onDisconnet;
    if((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start disconnect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
    while(!disc_finished)
        sleep(3);

exit:
    MQTTAsync_destroy(&client);
    return rc;

}
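
The subscriber waits for its callbacks by polling flags in a sleep loop. A minimal sketch of the same idea with an upper bound on the wait follows, so that a callback that never fires cannot block the program indefinitely. wait_for_flag is a hypothetical helper, not part of the MQTT library, and it assumes a POSIX system that provides nanosleep.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Polls *flag every step_ms milliseconds until it becomes nonzero or
 * roughly timeout_ms milliseconds have been slept.
 * Returns 1 if the flag was set, 0 if the wait timed out. */
int wait_for_flag(const volatile int *flag, int timeout_ms, int step_ms)
{
    struct timespec step;
    int waited_ms = 0;

    assert(flag != NULL && timeout_ms >= 0 && step_ms > 0);
    step.tv_sec = step_ms / 1000;
    step.tv_nsec = (long)(step_ms % 1000) * 1000000L;

    while (!*flag && waited_ms < timeout_ms) {
        nanosleep(&step, NULL);
        waited_ms += step_ms;
    }
    return *flag != 0;
}
```

For this to be usable with the example above, the flag would need to be declared volatile int, and a shorter polling step than three seconds would make the program react faster.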

Publisher: MQTTAsync_publish.c

/*************************************************************************
    > File Name: MQTTAsync_publish.c
    > Author: kayshi
    > Mail: kayshi2019@qq.com
    > Created Time: Wed 28 Oct 2020 10:21:13 AM CST
 ************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "MQTTAsync.h"


#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World"
#define QOS 1
#define TIMEOUT 10000L

volatile MQTTAsync_token deliveredtoken;

int finished = 0;

void connlost(void *context, char *cause)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;
    printf("\nConnection lost\n");
    if(cause)
        printf("cause: %s\n", cause);
    printf("Reconnecting\n");
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        finished = 1;
    }
}
void onDisconnect(void *context, MQTTAsync_successData *response)
{
    printf("Successful disconnection\n");
    finished = 1;
}
void onSend(void *context, MQTTAsync_successData *response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    int rc;
    printf("Message with token value %d delivery confirmed\n", response->token);
    opts.onSuccess = onDisconnect;
    opts.context = client;
    if((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start disconnect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

}
void onConnectFailure(void *context, MQTTAsync_failureData *response)
{
    printf("Connect failed, rc %d\n", response? response->code : 0);
    finished = 1;
}
void onConnect(void *context, MQTTAsync_successData *response)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    int rc;
    printf("Successful connection\n");
    opts.onSuccess = onSend;
    opts.context = client;

    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = (int)strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    deliveredtoken = 0;

    if((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start sendMessage, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
}

int main(int argc, char* argv[])
{
    int rc;
    MQTTAsync client;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
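    /* Pass the client handle as the context, because connlost() casts its context back to MQTTAsync. */
    MQTTAsync_setCallbacks(client, client, connlost, NULL, NULL);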

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;

    if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Waiting for publication of %s \n"
            "on topic %s for client with ClientID %s\n",
            PAYLOAD, TOPIC, CLIENTID);

    while(!finished)
        sleep(4);
    MQTTAsync_destroy(&client);

    return rc;
}

Source code:

https://download.csdn.net/download/weixin_36209467/13070777



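Copyright notice: this is an original article by weixin_36209467, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.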