KAFKA

  • Post author:
  • Post category:其他


rdkafka接口说明

原创 mfslx 发布于2017-12-26 17:14:45 阅读数 5163 收藏

展开

一、主要数据结构

1、typedef struct rd_kafka_conf_s rd_kafka_conf_t;

rd_kafka_conf_t是kafka的全局配置结构,通过rd_kafka_conf_new()创建,创建时即进行了默认配置,通过rd_kafka_conf_set()设置参数值,是rd_kafka_new()创建kafka处理句柄的第二个参数,是必须创建的结构。

2、typedef struct rd_kafka_topic_conf_s rd_kafka_topic_conf_t;

rd_kafka_topic_conf_t主题配置结构,通过rd_kafka_topic_conf_new()创建,创建时即进行了默认配置,通过rd_kafka_topic_conf_set()设置参数值,是必须创建的结构。

3、typedef struct rd_kafka_s rd_kafka_t;

rd_kafka_t 是kafka 处理句柄结构,分为producer类型和consumer类型,由rd_kafka_new()第一个参数类型决定。rd_kafka_t是一个总体结构,conf和topic_conf都是为此结构服务,其中包含rk_brokers链表,rk_topics链表,是必须创建的结构。

4、typedef struct rd_kafka_topic_partition_list_s rd_kafka_topic_partition_list_t;

rd_kafka_topic_partition_list_t 可扩展长度的 主题-分区 链表,通过rd_kafka_topic_partition_list_new()创建,创建时指定长度,通过rd_kafka_topic_partition_list_add()添加 主题-分区对,用于订阅消息。


二、主要接口

1、rd_kafka_conf_t *rd_kafka_conf_new (void)

参数:无

返回值:rd_kafka_conf_t *

创建一个kafka全局配置结构,并进行默认初始化设置,返回其引用指针。

2、rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

参数:

conf:配置结构

vame:配置项名称

value:配置项值

errstr:错误提示

errstr_size:错误提示长度

返回值:rd_kafka_conf_res_t 枚举,错误写入errstr中

name具体的名称及作用见rd_kafka_properties 中_RK_GLOBAL类型的数据定义。调用这个函数后再调用rd_kafka_conf_set_default_topic_conf()会将之前设置的值全部用默认值覆盖掉。因为错误提示会写入errstr中,所以提前给errstr分配512字节空间。

bootstrap.servers

3、rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)

参数:无

返回值:rd_kafka_topic_conf_t *

创建一个主题配置结构,并进行默认初始化设置,返回其引用指针。

4、rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

参数:

Conf:主题配置结构

Name:主题配置项名称

Value:主题配置项值

Errstr:错误提示

errstr_size:错误提示长度

返回值:rd_kafka_conf_res_t 枚举,错误写入errstr中

name具体的名称及作用见rd_kafka_properties 中_RK_TOPIC类型的数据定义。因为错误提示会写入errstr中,所以提前给errstr分配512字节空间。

5、rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,

char *errstr, size_t errstr_size)

参数:

Type:RD_KAFKA_PRODUCER是创建生产者类型,RD_KAFKA_CONSUMER是创建消费者类型

Conf:配置结构

Errstr:错误提示

errstr_size:错误提示长度

返回值:

成功:返回rd_kafka_t *kafka操作句柄

失败:返回NULL,并记录错误信息到errstr

程序中先配置conf和topic_conf,然后调用此接口生成操作句柄。对消费者来讲,订阅主题,轮询接收消息。对生产者来讲,根据主题生成主题操作句柄,并通过主题操作句柄发送消息。

6、void rd_kafka_destroy (rd_kafka_t *rk)

参数:

Rk:kafka操作句柄

返回值:无

释放创建的kafka操作句柄。

6、rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic,

rd_kafka_topic_conf_t *conf)

参数:

Rk:kafka操作句柄

Topic:主题内容

Conf:主题配置

返回值:

成功:返回rd_kafka_topic_t * 主题操作句柄

失败:返回NULL,记录错误信息到errstr

此接口一般是生产者使用,使用此接口生成的主题操作句柄进行发送消息。

7、void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)

参数:

app_rkt:主题操作句柄

返回值:无

释放创建的主题操作句柄

8、int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,

int msgflags,

void *payload, size_t len,

const void *key, size_t keylen,

void *msg_opaque)

参数:

Rkt:主题操作句柄

Partition:分区号

Msgflags:消息标志,使用RD_KAFKA_MSG_F_COPY标志

Payload:消息体指针

Len:消息体长度

Key:消息选项key值,用作平衡分区,计算分区号的,填NULL

Keylen:key长度,填0

msg_opaque:是作为回调函数的参数,填NULL

10、int rd_kafka_poll(rd_kafka_t *rk, int timeout_ms)

参数:

Rk:kafka操作句柄

timeout_ms:毫秒级时间

返回值:处理的事件数

发送完消息后调用此接口,timeout_ms是毫秒级的时间,函数会阻塞timeout_ms 毫秒等待事件处理,调用设置的回调函数。timeout_ms为0是非阻塞状态。

11、rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)

参数:

Rk:kafka操作句柄

timeout_ms:毫秒级时间

返回值:

成功:RD_KAFKA_RESP_ERR_NO_ERROR

失败:RD_KAFKA_RESP_ERR__TIMED_OUT

在摧毁生产者之前调用此接口,确保正在排队和正在进行的消息被处理完成。此函数会调用rd_kafka_poll()并触发回调。

12、int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

参数:

Rk:kafka操作句柄

Brokerlist:broker字符串 如:”172.20.51.38:9092” 不写端口,则采用默认端口9092

多个broker  brokerlist = “broker1:10000,broker2”

返回值:成功添加的broker个数

添加一个broker也可以通过 设置rd_kafka_conf_t结构中的 “bootstrap.servers” 配置项

rd_kafka_conf_set(conf, “bootstrap.servers”, brokers, errstr, sizeof(errstr))

13、rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk)

参数:

Rk:kafka操作句柄

返回值:rd_kafka_resp_err_t 枚举

将消息重定向到了消费者队列,可以使用rd_kafka_consumer_poll()进行取消息。

14、rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size)

参数:size是topic结构的个数

返回值:无

————————————————

版权声明:本文为CSDN博主「mfslx」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/icando777/article/details/78904487

==============================================================================================================


c语言使用librdkafka库实现kafka的生产和消费实例(转)

关于librdkafka库的介绍,可以参考

kafka的c/c++高性能客户端librdkafka简介

,本文使用librdkafka库来进行kafka的简单的生产、消费



一、producer

librdkafka进行kafka生产操作的大致步骤如下:



1、创建kafka配置

// rd_kafka_conf_t是kafka的全局配置结构,通过rd_kafka_conf_new()创建

  1. rd_kafka_conf_t *rd_kafka_conf_new (void)



2、配置kafka各项参数


[cpp]


view plain


copy

//通过rd_kafka_conf_set()设置参数值

  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
  2. const char *name,
  3. const char *value,
  4. char *errstr, size_t errstr_size)



3、设置发送回调函数


[cpp]


view plain


copy

  1. void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,
  2. void (*dr_msg_cb) (rd_kafka_t *rk,
  3. const rd_kafka_message_t *
  4. rkmessage,
  5. void *opaque))



4、创建producer实例


[cpp]


view plain


copy

//是rd_kafka_new()创建kafka处理句柄的第二个参数,是必须创建的结构。

  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)



5、实例化topic


[cpp]


view plain


copy

  1. rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)



6、异步调用将消息发送到指定的topic


[cpp]


view plain


copy

  1. int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
  2. int msgflags,
  3. void *payload, size_t len,
  4. const void *key, size_t keylen,
  5. void *msg_opaque)



7、阻塞等待消息发送完成


[cpp]


view plain


copy

  1. int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)



8、等待完成producer请求完成


[cpp]


view plain


copy

  1. rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)



9、销毁topic


[cpp]


view plain


copy

  1. void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)



10、销毁producer实例


[cpp]


view plain


copy

  1. void rd_kafka_destroy (rd_kafka_t *rk)


完整代码如下my_producer.c:


[cpp]


view plain


copy

  1. #include <stdio.h>
  2. #include <signal.h>
  3. #include <string.h>
  4. #include “../src/rdkafka.h”
  5. static int run = 1;
  6. static void stop(int sig){
  7. run = 0;
  8. fclose(stdin);
  9. }
  10. /*
  11. 每条消息调用一次该回调函数,说明消息是传递成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)
  12. 还是传递失败(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
  13. 该回调函数由rd_kafka_poll()触发,在应用程序的线程上执行
  14. */
  15. static void dr_msg_cb(rd_kafka_t *rk,
  16. const rd_kafka_message_t *rkmessage, void *opaque){
  17. if(rkmessage->err)
  18. fprintf(stderr, “%% Message delivery failed: %s\n”,
  19. rd_kafka_err2str(rkmessage->err));
  20. else
  21. fprintf(stderr,
  22. “%% Message delivered (%zd bytes, ”
  23. “partition %”PRId32″)\n”,
  24. rkmessage->len, rkmessage->partition);
  25. /* rkmessage被librdkafka自动销毁*/
  26. }
  27. int main(int argc, char **argv){
  28. rd_kafka_t *rk;            /*Producer instance handle*/
  29. rd_kafka_topic_t *rkt;     /*topic对象*/
  30. rd_kafka_conf_t *conf;     /*临时配置对象*/
  31. char errstr[512];
  32. char buf[512];
  33. const char *brokers;
  34. const char *topic;
  35. if(argc != 3){
  36. fprintf(stderr, “%% Usage: %s <broker> <topic>\n”, argv[0]);
  37. return 1;
  38. }
  39. brokers = argv[1];
  40. topic = argv[2];
  41. /* 创建一个kafka配置占位 */
  42. conf = rd_kafka_conf_new();
  43. /*创建broker集群*/
  44. if (rd_kafka_conf_set(conf, “bootstrap.servers”, brokers, errstr,
  45. sizeof(errstr)) != RD_KAFKA_CONF_OK){
  46. fprintf(stderr, “%s\n”, errstr);
  47. return 1;
  48. }
  49. /*设置发送报告回调函数,rd_kafka_produce()接收的每条消息都会调用一次该回调函数
  50. *应用程序需要定期调用rd_kafka_poll()来服务排队的发送报告回调函数*/
  51. rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
  52. /*创建producer实例
  53. rd_kafka_new()获取conf对象的所有权,应用程序在此调用之后不得再次引用它*/
  54. rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  55. if(!rk){
  56. fprintf(stderr, “%% Failed to create new producer:%s\n”, errstr);
  57. return 1;
  58. }
  59. /*实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic
  60. 对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,*/
  61. rkt = rd_kafka_topic_new(rk, topic, NULL);
  62. if (!rkt){
  63. fprintf(stderr, “%% Failed to create topic object: %s\n”,
  64. rd_kafka_err2str(rd_kafka_last_error()));
  65. rd_kafka_destroy(rk);
  66. return 1;
  67. }
  68. /*用于中断的信号*/
  69. signal(SIGINT, stop);
  70. fprintf(stderr,
  71. “%% Type some text and hit enter to produce message\n”
  72. “%% Or just hit enter to only serve delivery reports\n”
  73. “%% Press Ctrl-C or Ctrl-D to exit\n”);
  74. while(run && fgets(buf, sizeof(buf), stdin)){
  75. size_t len = strlen(buf);
  76. if(buf[len-1] == ‘\n’)
  77. buf[–len] = ‘\0’;
  78. if(len == 0){
  79. /*轮询用于事件的kafka handle,
  80. 事件将导致应用程序提供的回调函数被调用
  81. 第二个参数是最大阻塞时间,如果设为0,将会是非阻塞的调用*/
  82. rd_kafka_poll(rk, 0);
  83. continue;
  84. }
  85. retry:
  86. /*Send/Produce message.
  87. 这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列,
  88. 对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb)
  89. 用于在消息传递成功或失败时向应用程序发回信号*/
  90. if (rd_kafka_produce(
  91. /* Topic object */
  92. rkt,
  93. /*使用内置的分区来选择分区*/
  94. RD_KAFKA_PARTITION_UA,
  95. /*生成payload的副本*/
  96. RD_KAFKA_MSG_F_COPY,
  97. /*消息体和长度*/
  98. buf, len,
  99. /*可选键及其长度*/
  100. NULL, 0,
  101. NULL) == -1){
  102. fprintf(stderr,
  103. “%% Failed to produce to topic %s: %s\n”,
  104. rd_kafka_topic_name(rkt),
  105. rd_kafka_err2str(rd_kafka_last_error()));
  106. if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){
  107. /*如果内部队列满,等待消息传输完成并retry,
  108. 内部队列表示要发送的消息和已发送或失败的消息,
  109. 内部队列受限于queue.buffering.max.messages配置项*/
  110. rd_kafka_poll(rk, 1000);
  111. goto retry;
  112. }
  113. }else{
  114. fprintf(stderr, “%% Enqueued message (%zd bytes) for topic %s\n”,
  115. len, rd_kafka_topic_name(rkt));
  116. }
  117. /*producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为
  118. 传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其
  119. 发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll()
  120. 仍然被调用*/
  121. rd_kafka_poll(rk, 0);
  122. }
  123. fprintf(stderr, “%% Flushing final message.. \n”);
  124. /*rd_kafka_flush是rd_kafka_poll()的抽象化,
  125. 等待所有未完成的produce请求完成,通常在销毁producer实例前完成
  126. 以确保所有排列中和正在传输的produce请求在销毁前完成*/
  127. rd_kafka_flush(rk, 10*1000);
  128. /* Destroy topic object */
  129. rd_kafka_topic_destroy(rkt);
  130. /* Destroy the producer instance */
  131. rd_kafka_destroy(rk);
  132. return 0;
  133. }



二、consumer

librdkafka进行kafka消费操作的大致步骤如下:



1、创建kafka配置


[cpp]


view plain


copy

  1. rd_kafka_conf_t *rd_kafka_conf_new (void)



2、创建kafka topic的配置


[cpp]


view plain


copy

  1. rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)



3、配置kafka各项参数


[cpp]


view plain


copy

  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
  2. const char *name,
  3. const char *value,
  4. char *errstr, size_t errstr_size)



4、配置kafka topic各项参数


[cpp]


view plain


copy

  1. rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
  2. const char *name,
  3. const char *value,
  4. char *errstr, size_t errstr_size)



5、创建consumer实例


[cpp]


view plain


copy

  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)



6、为consumer实例添加brokerlist


[cpp]


view plain


copy

  1. int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)



7、开启consumer订阅


[cpp]


view plain


copy

  1. rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)



8、轮询消息或事件,并调用回调函数


[cpp]


view plain


copy

  1. rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)



9、关闭consumer实例


[cpp]


view plain


copy

  1. rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)



10、释放topic list资源


[cpp]


view plain


copy

  1. rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)



11、销毁consumer实例


[cpp]


view plain


copy

  1. void rd_kafka_destroy (rd_kafka_t *rk)



12、等待consumer对象的销毁


[cpp]


view plain


copy

  1. int rd_kafka_wait_destroyed (int timeout_ms)

完整代码如下my_consumer.c


[cpp]


view plain


copy

  1. #include <string.h>
  2. #include <stdlib.h>
  3. #include <syslog.h>
  4. #include <signal.h>
  5. #include <error.h>
  6. #include <getopt.h>
  7. #include “../src/rdkafka.h”
  8. static int run = 1;
  9. //`rd_kafka_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。
  10. static rd_kafka_t *rk;
  11. static rd_kafka_topic_partition_list_t *topics;
  12. static void stop (int sig) {
  13. if (!run)
  14. exit(1);
  15. run = 0;
  16. fclose(stdin); /* abort fgets() */
  17. }
  18. static void sig_usr1 (int sig) {
  19. rd_kafka_dump(stdout, rk);
  20. }
  21. /**
  22. * 处理并打印已消费的消息
  23. */
  24. static void msg_consume (rd_kafka_message_t *rkmessage,
  25. void *opaque) {
  26. if (rkmessage->err) {
  27. if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
  28. fprintf(stderr,
  29. “%% Consumer reached end of %s [%”PRId32″] ”
  30. “message queue at offset %”PRId64″\n”,
  31. rd_kafka_topic_name(rkmessage->rkt),
  32. rkmessage->partition, rkmessage->offset);
  33. return;
  34. }
  35. if (rkmessage->rkt)
  36. fprintf(stderr, “%% Consume error for ”
  37. “topic \”%s\” [%”PRId32″] ”
  38. “offset %”PRId64″: %s\n”,
  39. rd_kafka_topic_name(rkmessage->rkt),
  40. rkmessage->partition,
  41. rkmessage->offset,
  42. rd_kafka_message_errstr(rkmessage));
  43. else
  44. fprintf(stderr, “%% Consumer error: %s: %s\n”,
  45. rd_kafka_err2str(rkmessage->err),
  46. rd_kafka_message_errstr(rkmessage));
  47. if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
  48. rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
  49. run = 0;
  50. return;
  51. }
  52. fprintf(stdout, “%% Message (topic %s [%”PRId32″], ”
  53. “offset %”PRId64″, %zd bytes):\n”,
  54. rd_kafka_topic_name(rkmessage->rkt),
  55. rkmessage->partition,
  56. rkmessage->offset, rkmessage->len);
  57. if (rkmessage->key_len) {
  58. printf(“Key: %.*s\n”,
  59. (int)rkmessage->key_len, (char *)rkmessage->key);
  60. }
  61. printf(“%.*s\n”,
  62. (int)rkmessage->len, (char *)rkmessage->payload);
  63. }
  64. /*
  65. init all configuration of kafka
  66. */
  67. int initKafka(char *brokers, char *group,char *topic){
  68. rd_kafka_conf_t *conf;
  69. rd_kafka_topic_conf_t *topic_conf;
  70. rd_kafka_resp_err_t err;
  71. char tmp[16];
  72. char errstr[512];
  73. /* Kafka configuration */
  74. conf = rd_kafka_conf_new();
  75. //quick termination
  76. snprintf(tmp, sizeof(tmp), “%i”, SIGIO);
  77. rd_kafka_conf_set(conf, “internal.termination.signal”, tmp, NULL, 0);
  78. //topic configuration
  79. topic_conf = rd_kafka_topic_conf_new();
  80. /* Consumer groups require a group id */
  81. if (!group)
  82. group = “rdkafka_consumer_example”;
  83. if (rd_kafka_conf_set(conf, “group.id”, group,
  84. errstr, sizeof(errstr)) !=
  85. RD_KAFKA_CONF_OK) {
  86. fprintf(stderr, “%% %s\n”, errstr);
  87. return -1;
  88. }
  89. /* Consumer groups always use broker based offset storage */
  90. if (rd_kafka_topic_conf_set(topic_conf, “offset.store.method”,
  91. “broker”,
  92. errstr, sizeof(errstr)) !=
  93. RD_KAFKA_CONF_OK) {
  94. fprintf(stderr, “%% %s\n”, errstr);
  95. return -1;
  96. }
  97. /* Set default topic config for pattern-matched topics. */
  98. rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
  99. //实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态
  100. rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
  101. if(!rk){
  102. fprintf(stderr, “%% Failed to create new consumer:%s\n”, errstr);
  103. return -1;
  104. }
  105. //Librdkafka需要至少一个brokers的初始化list
  106. if (rd_kafka_brokers_add(rk, brokers) == 0){
  107. fprintf(stderr, “%% No valid brokers specified\n”);
  108. return -1;
  109. }
  110. //重定向 rd_kafka_poll()队列到consumer_poll()队列
  111. rd_kafka_poll_set_consumer(rk);
  112. //创建一个Topic+Partition的存储空间(list/vector)
  113. topics = rd_kafka_topic_partition_list_new(1);
  114. //把Topic+Partition加入list
  115. rd_kafka_topic_partition_list_add(topics, topic, -1);
  116. //开启consumer订阅,匹配的topic将被添加到订阅列表中
  117. if((err = rd_kafka_subscribe(rk, topics))){
  118. fprintf(stderr, “%% Failed to start consuming topics: %s\n”, rd_kafka_err2str(err));
  119. return -1;
  120. }
  121. return 1;
  122. }
  123. int main(int argc, char **argv){
  124. char *brokers = “localhost:9092”;
  125. char *group = NULL;
  126. char *topic = NULL;
  127. int opt;
  128. rd_kafka_resp_err_t err;
  129. while ((opt = getopt(argc, argv, “g:b:t:qd:eX:As:DO”)) != -1){
  130. switch (opt) {
  131. case ‘b’:
  132. brokers = optarg;
  133. break;
  134. case ‘g’:
  135. group = optarg;
  136. break;
  137. case ‘t’:
  138. topic = optarg;
  139. break;
  140. default:
  141. break;
  142. }
  143. }
  144. signal(SIGINT, stop);
  145. signal(SIGUSR1, sig_usr1);
  146. if(!initKafka(brokers, group, topic)){
  147. fprintf(stderr, “kafka server initialize error\n”);
  148. }else{
  149. while(run){
  150. rd_kafka_message_t *rkmessage;
  151. /*-轮询消费者的消息或事件,最多阻塞timeout_ms
  152. -应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务
  153. 所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要,
  154. 因为它需要被正确地调用和处理以同步内部消费者状态 */
  155. rkmessage = rd_kafka_consumer_poll(rk, 1000);
  156. if(rkmessage){
  157. msg_consume(rkmessage, NULL);
  158. /*释放rkmessage的资源,并把所有权还给rdkafka*/
  159. rd_kafka_message_destroy(rkmessage);
  160. }
  161. }
  162. }
  163. done:
  164. /*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置),
  165. commit offset到broker,并离开consumer group
  166. 最大阻塞时间被设置为session.timeout.ms
  167. */
  168. err = rd_kafka_consumer_close(rk);
  169. if(err){
  170. fprintf(stderr, “%% Failed to close consumer: %s\n”, rd_kafka_err2str(err));
  171. }else{
  172. fprintf(stderr, “%% Consumer closed\n”);
  173. }
  174. //释放topics list使用的所有资源和它自己
  175. rd_kafka_topic_partition_list_destroy(topics);
  176. //destroy kafka handle
  177. rd_kafka_destroy(rk);
  178. run = 5;
  179. //等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1
  180. while(run– > 0 && rd_kafka_wait_destroyed(1000) == -1){
  181. printf(“Waiting for librdkafka to decommission\n”);
  182. }
  183. if(run <= 0){
  184. //dump rdkafka内部状态到stdout流
  185. rd_kafka_dump(stdout, rk);
  186. }
  187. return 0;
  188. }


在linux下编译producer和consumer的代码:


[cpp]


view plain


copy

  1. gcc my_producer.c -o my_producer  -lrdkafka -lz -lpthread -lrt
  2. gcc my_consumer.c -o my_consumer  -lrdkafka -lz -lpthread -lrt

在运行my_producer或my_consumer时可能会报错”error while loading shared libraries xxx.so”, 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录


在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test_topic”的topic

启动方式可参考

kafka0.8.2集群的环境搭建并实现基本的生产消费

启动consumer:

启动producer,并发送一条数据“hello world”:


consumer处成功收到producer发送的“hello world”:

http://orchome.com/5

https://github.com/edenhill/librdkafka

https://github.com/mfontanini/cppkafka

https://github.com/zengyuxing007/kafka_test_cpp

aliyun活动 https://www.aliyun.com/acts/limit-buy?userCode=re2o7acl



版权声明:本文为lusic01原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。