使用C/C++语言操作Kafka时,librdkafka是首选的开源库
#include "librdkafka/rdkafka.h"
// 声明消费者实例
rd_kafka_t *rk;
// 临时配置对象
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();
//消费者会话组保持活动心跳间隔
if (rd_kafka_conf_set(conf, "heartbeat.interval.ms", "3000", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
//自动提交偏移
if (rd_kafka_conf_set(conf, "auto.commit.enable", "true", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
//自动重置偏移
if (rd_kafka_conf_set(conf, "auto.offset.reset", "latest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
// 创建消费者实例
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
#include "librdkafka/rdkafka.h"
// 声明生产者实例
rd_kafka_t *rk;
// 临时配置对象
rd_kafka_conf_t *conf;
conf = rd_kafka_conf_new();
// 设置长连接
if(rd_kafka_conf_set(conf, "socket.keepalive.enable", "true", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
rd_kafka_conf_destroy(conf);
return false;
}
// 创建生产者实例
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));