librdkafka 是 Apache Kafka 的 C/C++ 开发包,提供 生产者、消费者 和 管理客户端。
设计理念是可靠以及高性能的消息传输,当前可支持每秒超过100万的消息生产和300万每秒的消息消费。
官方README 文档对librdkafka的介绍:
“librdkafka — the Apache Kafka c/C++ client library”
librdkafka/INTRODUCTION.md
https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
librdkafka/examples/
https://github.com/edenhill/librdkafka/tree/master/examples
Usage:
使用时,需要在源程序中包含包含 "rdkafka.h"
头文件
创建Conf配置实例,用于填充用户指定的各配置项:
//namespace RdKafka;
//brief Create configuration object:
//RdKafka::Conf ---> 配置接口类,用来设置对生产者、消费者、broker的各配置项的值
static Conf *create(ConfType type);
enum ConfType {
CONF_GLOBAL, //Global configuration
CONF_TOPIC //Topic specific configuration
};
使用举例:
RdKafka::Conf *m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == nullptr) {
std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
}
RdKafka::Conf *m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_config == nullptr) {
std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
}
Conf类中的多个set成员函数,用于对不同的配置项进行赋值:
class Conf {
public:
virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
//...
//...
};
enum ConfResult {
CONF_UNKNOWN = -2, //Unknown configuration property
CONF_INVALID = -1, //Invalid configuration value
CONF_OK = 0 //Configuration property was succesfully set
};
使用举例:
RdKafka::Conf::ConfResult result;
std::string error_str;
RdKafka::Conf *m_config;
//设置 "booststrap::servers" 配置项:
result = m_config->set("bootstrap.servers", "127.0.0.`:9092", error_str);
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Global Conf set 'booststrap.servers' failed: " << error_str << std::endl;
}
//设置 "event_cb" 配置项:
RdKafka::EventCb* m_event_cb = new ProducerEventCb;
result = m_config->set("event_cb", m_event_cb, error_str);
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Global Conf set 'event_cb' failed: " << error_str << std::endl;
}
创建Producer生产者客户端:
class Producer : public virtual Handle {
public:
static Producer *create(Conf *conf, std::string &errstr);
};
使用举例:
RdKfka::Producer *m_producer;
m_producer = RdKafka::Producer::create(m_config, error_str);
if(m_producer == nullptr) {
std::cout << "Create Topic failed: " << error_str << std::endl;
}
创建Topic主题对象:
class Topic {
public:
static Topic *create(Handle *base, const std::string &tipic_str, const Conf *conf, std::string &errstr);
};
使用举例:
RdKafka::Topic *m_topic;
m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str);
if(m_topic == nullptr) {
std::cout << "Create Topic failed: " << error_str << std::endl;
}
class Producer : public virtual Handle {
public:
virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags,
void *payload, size_t len, const std::string *key, void *msg_opaque);
virtual ErrorCode produce();
};
//Use RdKafka::err2str() to translate an error code a human readable string
enum ErrorCode {
//Internal errors to rdkafka:
ERR_BEGIN = -200, //Begin internal error codes
ERR_BAD_MSG = -199, //Received message is incorrect
//...
ERR_END = -100, //End interval error codes
//Kafka broker errors:
ERROR_UNKNOWN = -1, //Unknown broker error
ERROR_NO_ERROR = 0, //Success
//...
};
使用举例:
RdKafka::ErrorCode error_code = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
payload, len, key, NULL);
m_producer->pool(0); //poll()参数为0意味着不阻塞,poll(0)主要是为了触发应用程序提供的回调函数
if(error_code != ERROR_NO_ERROR) {
std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
if(error_code == ERR_QUEUE_FULL) {
m_producer->poll(1000); //如果发送失败的原始是队列正满,则阻塞等待一段时间
}
else if(error_code == ERR_MSG_SIZE_TOO_LARGE) {
//如果消息过大超过了max_size,则需要对消息做裁剪后重新发送
}
else {
std::cerr << "ERR_UNKNOWN_PARTITION or ERR_UNKNOWN_TOPIC" << std::endl;
}
}
创建Consumer消费者客户端:
class KafkaConsumer : public virtual Handle {
public:
static KafkaConsumer *create(const Conf *conf, std::string &errstr);
};
使用举例:
RdKafka::KafkaConsumer *m_consumer;
m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
if(m_consumer == nullptr) {
std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
}
Consumer消费者订阅Topic主题:
class KafkaConsumer : public virtual Handle {
public:
virtual ErrorCode subscribe(const std::vector<std::string> &topics);
};
使用举例:
std::vector<std::string> topics;
topics.push_back(topic_str);
RdKafka::ErrorCode error_code = m_consumer->subscribe(topics);
if(error_code != ERROR_NO_ERROR) {
std::cerr << "Consumer Subscribe Topics Failed: " << RdKafka::err2str(error_code) << std::endl;
}
Consumer消费者拉取消息进行消费:
class KafkaConsumer : public virtual Handle {
public
virtual Message *consume(int timeout_ms);
};
使用举例:
RdKafka::Message *m_message = m_consumer->consume(5000); //若超过 5000ms 未订阅到消息,则触发 RdKafka::ERR_TIMED_OUT
#include "producer_kafka.h"
using namespace std;
int main() {
KafkaProducer producer("127.0.0.1:9092", "topic-demo", 0);
sleep(5);
for(int i = 0; i < 10; i++) {
char msg[64] = {0};
sprintf(msg, "%s%4d", "Hello Kafka ", i); //msg = "Hello Kafka 0001";
char key[8] = {0};
sprintf(key, "%d", i); //key = "1";
producer.pushMessage(msg, key);
}
KafkaProducer::wait_destroyed(50000);
return 0;
}
#ifndef __KAFKAPRODUCER_H_
#define __KAFKAPRODUCER_H_
#include <string>
#include <iostream>
#include "rdkafkacpp.h"
class KafkaProducer {
public:
explicit KafkaProducer(const std::string& brokers, const std::string& topic, int partition); //epplicit:禁止隐式转换,例如不能通过string的构造函数转换出一个broker
~KafkaProducer();
void pushMessage(const std::string& msg, const std::string& key);
protected:
std::string m_brokers;
std::string m_topicStr;
int m_partition;
RdKafka::Conf* m_config; //RdKafka::Conf --- 配置接口类,用来设置对 生产者、消费者、broker的各项配置值
RdKafka::Conf* m_topicConfig;
RdKafka::Producer* m_producer;
RdKafka::Topic* m_topic;
RdKafka::DeliveryReportCb* m_dr_cb; //RdKafka::DeliveryReportCb 用于在调用 RdKafka::Producer::produce() 后返回发送结果,RdKafka::DeliveryReportCb是一个类,需要自行填充其中的回调函数及处理返回结果的方式
RdKafka::EventCb* m_event_cb; //RdKafka::EventCb 用于从librdkafka向应用程序传递errors,statistics,logs 等信息的通用接口
RdKafka::PartitionCb* m_partitioner_cb; //Rdkafka::PartitionCb 用于设定自定义分区器
};
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) { //重载基类RdKafka::DeliveryReportCb中的虚函数dr_cb()
if(message.err() != 0) { //发送出错
std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
} else { //发送成功
std::cerr << "Message delivered to topic: " << message.topic_name()
<< " [" << message.partition()
<< "] at offset " << message.offset() << std::endl;
}
}
};
/*
class DeliveryReportCb { //Kafka将会在produce()之后返回发送结果时调用 DeliveryReportCb::dr_cb(), 并将结果填充到message中
public:
virtual void dr_cb(Message &message) = 0; //Message::err() 中保存返回结果
virtual ~DeliveryReportCb() {}
};
返回的发送结果 Message 中包含的信息:
class Message {
public:
virtual std::string errstr() const = 0; //返回错误原因
virtual ErrorCode err() const = 0; //返回错误码,ErrorCode是enum枚举类型
virtual Topic *topic() const = 0; //返回生产的消息所发送到的主题,返回值类型为 RdKafka::Topic 类对象
virtual std::string topic_name() const = 0; //返回生产的消息所发送到的主题名
virtual int32_t partition() const = 0; //返回生产的消息所发送到的分区号
virtual void *payload() const = 0; //消息内容
virtual size_t len() const = 0; //消息长度
virtual const std::string *key() const = 0; //消息的key
virtual int64_t offset() const = 0; //消息偏移量
//...
};
enum ErrorCode {
ERR_BEGIN = -200,
//...
ERR_UNKNOWN = -1,
ERR_NO_ERROR = 0, //SUCCESS
ERR_OFFSET_OUT_OF_RANGE = 1,
//...
};
*/
class ProducerEventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event &event) {
switch(event.type()) {
case RdKafka::EVENT::EVENT_ERROR:
std::cout << "RdKafka::EVENT::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
break;
case RdKafka::EVENT::EVENT_STATS:
std::cout << "RdKafka::EVENT::EVENT_STATS: " << events.str() << std::endl;
break;
case RdKafka::EVENT::EVENT_LOG:
std::cout << "RdKafka::EVENT::EVENT_LOG: " << events.fac() << std::endl;
break;
case RdKafka::EVENT::EVENT_THROTTLE:
std::cout << "RdKafka::EVENT::EVENT_THROTTLE: " << event.broker_name() << std::endl;
break;
}
}
}
/*
EventCb 是用于从librdkafka向应用程序返回errors,statistics, logs等信息的通用接口:
class EventCb {
public:
virtual void event_cb(Event &event) = 0; //提供给应用程序去自定义重载的event_cb函数
virtual ~EventCb(){ }
};
class Event {
public:
enum Type {
EVENT_ERROR,
EVNET_STATS,
EVENT_LOG,
EVENT_THROTTLE //Event is a throttle level signaling from the broker, 紧急事件?
};
enum Severity { //表示Event事件的严重级别
EVENT_SEVERITY_EMERGE = 0,
EVENT_SEVERITY_ALERT = 1,
EVENT_SEVERITY_CRITICAL = 2,
EVENT_SEVERITY_ERROR = 3,
EVENT_SEVERITY_WARNING = 4,
EVENT_SEVERITY_NOTICE = 5,
EVENT_SEVERITY_INFO = 6,
EVENT_SEVERITY_DEBUG = 7,
};
virtual Type type() const = 0; //返回Event事件类型
virtual ErrorCode err() const = 0; //错误码
virtual Severity severity() const = 0; //return log serverity level, 返回log的严重级别
virtual std::string fac() const = 0;
virtual std::string broker_name() const = 0;
virtual std::string broker_id() const = 0;
virtual vool fatal() const = 0; //bool值,返回这是否是一个fatal级错误
//...
};
*/
class HashPartitionerCb : public RdKafka::PartitionerCb { //自定义生产者分区器,作用就是返回一个分区id。 对key计算Hash值,得到待发送的分区号(其实这跟默认的分区器计算方式是一样的)
public:
int32_t partitioner_cb( const Topic *topic, const std::string *key,
int32_t partition_cnt, void *msg_opaque)
{
char msg[128] = {0};
sprintf(smg, "HashPartitionCb:[%s][%s][%d]", topic->name().c_str(), key->c_str(), partition_cnt);
std::cout << msg << std::endl;
//前面的操作只是为了在分区器回调中打印出一行打印,分区器真正的操作是在下面generate_hash,生成一个待发送的分区ID
return generate_hash(key->c_str(), key->size()) % partition_cnt;
}
private:
static inline unsigned int generate_hash(const char *str, size_t len) {
unsigned int hash = 5381;
for (size_t i = 0; i < len; i++) {
hash = ( (hash << 5) + hash ) + str[i];
}
return hash; //返回值必须在 0 到 partition_cnt 之间。如果出错则发回 PARTITION_UA(-1)
}
};
/*
class PartitionerCb {
public:
virtual int32_t partitioner_cb( const Topic *topic,
const std::string *key,
int32_t partition_cnt,
void *msg_opaque) = 0; //自定义分区器的接口,需要指定发送的主题、key、分区数、opaque
virtual ~PartitionerCb() { }
};
注意:
用户实现的派生类重载partitioner_cb() 这个函数后,也是要提供给Kafka去调用的,其中参数 partition_cnt 并非由Producer指定,而是Kafka根据Topic创建时的信息去查询,
且Kafka上的Topic创建也不是由Producer生产者客户端创建的,目前已知的方法只有使用 kafka-topics.sh 脚本这一种方法。
关于“创建主题”的描述:
-- 如果broker端配置参数 auto.create.topics.enable 设置为true(默认值为true),
那 么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为1)、副本因为为 default.replication.factor(默认值为1)的主题。
-- 除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数 num.partitions 和 default.replication.factor的值创建一个相应主题。
*/
#endif
/*
总结:
1. KafkaProducer构造函数:初始化一个“生产者客户端”,要指定三个参数:(1)生产者所要连往的Kafka地址,是zookeeper的ip和port;(2)生产者后续生产的消息所要发送的Topic主题;(3)消息所要发往的分区?
2. RdKafka::Conf 是配置接口类,用来设置生产者、消费者、broker的各项配置值:
namespace RdKafka {
RdKafka::Conf {
enum ConfType {
CONF_GLOBAL, //Global Configuration , 全局级别的配置 (broker级,所有主题都一样)
CONF_TOPIC //Topic specfic configuration , Topic主题级别的专用配置(每个主题都不一样)
};
enum ConfResult { //用于保存 RdKafka::Conf::set() 函数的返回值
CONF_UNKNOWN = -2, //Unknown configuration property, set() 传入的配置项不合法(Kafka中没有定义对应的配置项)
CONF_INVALID = -1, //Invalid configuration value, set() 传入的配置项的值不正确
CONF_OK = 0 //Configuration property was successfully set, set() 设置成功
};
static Conf *create(ConfType type); //创建一个Conf实例,传入ConfType参数,返回Conf*实例的指针,是static静态函数
virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr); //配置name指定的配置项的值
//eg: RdKafka::Conf::ConfResult errCode = m_config->set("dr_cb", m_dr_cb, errorstr);
};
};
Q: 为什么要创建 RdKafka::Conf(GLOBAL/TOPIC)、RdKafka::Producer、RdKafka::Topic ?
A: 这就跟socket() 函数一样,创建一个sockfd句柄,是为了通知内核创建相关的实例,sockfd是与内核中的实例相关联的句柄,内核需要维护不同的实例来区分不同的连接。
同理,Kafka服务器也需要区分不同的生产者,需要区分不同的生产者的差异化配置。因此需要应用程序创建一个Producer客户端实例(m_producer句柄),将自己想要的配置填充好(m_conf),
再将m_producer句柄和m_conf配置属性都传递给Kafka服务端,Kafka在内部创建并维护这个生产者实例,当生产者往Kafka的broker上推送消息时,Kafka就可以根据预先配置好的属性对其进行相应的处理。
*/
#include "producer_kafka.h"
//("192.168.0.105:9092", "topic_demo", 0)
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition) {
m_brokers = brokers;
m_topicStr = topic;
m_partition = partition;
//先填充构造生产者客户端的参数配置:
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == nullptr) {
std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
}
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_topicConfig == nullptr) {
std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
}
//下面开始配置各种需要的配置项:
RdKafka::Conf::ConfResult result;
std::string error_str;
result = m_config->set("booststrap.servers", m_brokers, error_str); //设置生产者待发送服务器的地址: "ip:port" 格式
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Global Conf set 'booststrap.servers' failed: " << error_str << std::endl;
}
result = m_config->set("statistics.interval.ms", "10000", error_str);
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Global Conf set ‘statistics.interval.ms’ failed: " << error_str << std::endl;
}
result = m_config->set("message.max.bytes", "10240000", error_str); //设置发送端发送的最大字节数,如果发送的消息过大则返回失败
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Global Conf set 'message.max.bytes' failed: " << error_str << std::endl;
}
m_dr_cb = new ProducerDeliveryReportCb;
result = m_config->set("dr_cb", m_dr_cb, error_str); //设置每个消息发送后的发送结果回调
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Global Conf set ‘dr_cb’ failed: " << error_str << std::endl;
}
m_event_cb = new ProducerEventCb;
result = m_config->set("event_cb", m_event_cb, error_str);
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Global Conf set ‘event_cb’ failed: " << error_str << std::endl;
}
m_partitioner_cb = new HashPartitionerCb;
result = m_topicConfig->set("partitioner_cb", m_partitioner_cb, error_str); //设置自定义分区器
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Topic Conf set ‘partitioner_cb’ failed: " << error_str << std::endl;
}
//创建Producer生产者客户端:
m_producer = RdKafka::Producer::create(m_config, error_str); //RdKafka::Producer::create(const RdKafka::Conf *conf, std::string &errstr);
if(m_producer == nullptr) {
std::cout << "Create Producer failed: " << error_str << std::endl;
}
//创建Topic对象,后续produce发送消息时需要使用
m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str); //RdKafka::Topic::create(Hanle *base, const std::string &topic_str, const Conf *conf, std::string &errstr);
if(m_topic == nullptr) {
std::cout << "Create Topic failed: " << error_str << std::endl;
}
}
/*
另外几个关键的参数:
partition.assignment.strategy : range,roundrobin 消费者客户端partition分配策略,当被选举为leader时,分配partition分区给组员消费者的策略
*/
void KafkaProducer::pushMessage(const std::string& msg, const std::string& key) {
int32_t len = str.length();
void *payload = const_cast<void*>(static_cast<const void*>(str.data()));
RdKafka::ErrorCode error_code = m_producer->prodce( m_topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
payload, len, key, NULL);
m_producer->poll(0); //poll()参数为0意味着不阻塞;poll(0)主要是为了触发应用程序提供的回调函数
if(error_code != ERR_NO_ERROR) {
std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
if(error_code == ERR_QUEUE_FULL) {
m_producer->poll(1000); //如果发送失败的原因是队列正满,则阻塞等待一段时间
} else if(error_code == ERR_MSG_SIZE_TOO_LARGE) {
//如果发送消息过大,超过了max.size,则需要裁减后重新发送
} else {
std::cerr << "ERR_UNKNOWN_PARTITION or ERR_UNKNOWN_TOPIC" << std::endl;
}
}
}
/*
RdKafka::ErrorCode RdKafka::Produce::produce( RdKafka::Topic *topic, int32_t partition, //生产消息发往的主题、分区
int msgflags, //RK_MSG_COPY(拷贝payload)、 RK_MSG_FREE(发送后释放payload内容)、 RK_MSG_BLOCK(在消息队列满时阻塞produce函数)
void *payload, size_t len, //消息净荷、长度
const string &key, void *msg_opaque); //key; opaque是可选的应用程序提供给每条消息的opaque指针,opaque指针会在dr_cb回调函数内提供,在Kafka内部维护
返回值:
ERR_NO_ERROR : 消息成功入队
ERR_QUEUE_FULL : 队列满
ERR_MSG_SIZE_TOO_LARGE : 消息长度过大
ERR_UNKNOWN_PARTITION : 所指定的分区不存在
ERR_UNKNOWN_TOPIC : 所指定的主题不存在
int RdKafka::Producer::poll (int timeout_ms);
阻塞等待生产消息发送完成,
poll另外的重要作用是:
(1)轮询处理指定的Kafka句柄的Event(m_producer上的Event事件,在本例中处理事件的方式是进行打印);
(2)触发应用程序提供的回调函数调用,例如 ProducerDeliveryReportCb 等回调函数都需要poll()进行触发。
*/
KafkaProducer::~KafkaProducer() {
while(m_producer->outq_len() > 0) { //当 Handle->outq_len() 客户端的“出队列” 的长度大于0
std::cerr << "Waiting for: " << m_producer->outq_len() << std::endl;
m_producer->flush(5000);
}
delete m_config;
delete m_topicConfig;
delete m_topic;
delete m_producer;
delete m_dr_cb;
delete m_event_cb;
delete m_partitioner_cb;
}
/*
-- flush():
ErrorCode Kafka::Producer::flush(int timeout_ms);
flush会优先调用poll(),去触发生产者提前注册的各种回调函数,然后等待生产者上的所有消息全部发送完毕。
-- outq_len: “出队列”长度,是 Handle 类的一个成员,
表示 生产者队列中中待发送到broker上的数据,或 消费者队列中待发送到broker上的ACK。
Handle是Producer和Consumer类的基类,表示“客户端的句柄”:
class Producer : public virtual Handle { }
class Consumer : public virtual Handle { }
class KafkaConsumer : public virtual Handle { }
*/
#include "kafka_consumer.h"
int main()
{
std::string brokers = "127.0.0.1:9092";
std::vector<std::string> topics; //待消费主题的集合
topics.push_back("topic-demo");
std::string group = "consumer-group-demo"; //消费组
KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
consumer.pullMessage();
RdKafka::wait_destroyed(5000);
return 0;
}
/*
在生产者/消费者 客户端 连接 broker 时,填充“bootstrap.server” 参数:
Q: 为什么只设置了一个broker的地址(port = 9092),如果Kafka集群中有多个broker,且生产者/消费者订阅的Topic横跨多个broker时,生产者是如何知道其他broker的?
A: bootstrap.server 参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为:
host1:port1, host2:port2
可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为 " "。
【注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker信息。】
不过建议至少要设置两个以上的broker地址信息,当其中任意一个宕机时,生产者仍然可以连接到Kafka集群上。
*/
#ifndef __KAFKACONSUMER_H_
#define __KAFKACONSUMER_H_
#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafkacpp.h"
class KafkaConsumer {
public:
explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
const std::vector<std::string>& topics, int partition);
~KafkaConsumer();
void pullMessage();
protected:
std::string m_brokers;
std::string m_groupId;
std::vector<std::string> m_topicVector; //一个消费者可以同时订阅多个主题,所有用vector
int m_partition;
RdKafka::Conf* m_config; //GLOBAL 级别的配置(Consumer客户端级别)
RdKafka::Conf* m_topicConfig; //TOPIC 级别的配置
RdKafka::KafkaConsumer* m_consumer; //消费者客户端实例
RdKafka::EventCb* m_event_cb; //Event事件回调
RdKafka::RebalanceCb* m_rebalance_cb; //再均衡 回调
};
class ConsumerEventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
if (event.fatal()) //判断是否为FATAL错误
std::cerr << "FATAL ";
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str());
break;
case RdKafka::Event::EVENT_THROTTLE:
std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << (int)event.broker_id() << std::endl;
break;
default:
std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
break;
}
}
};
class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb( RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) //Kafka服务端通过 err参数传入再均衡的具体事件(发生前、发生后),通过partitions参数传入再均衡 前/后,旧的/新的 分区信息
{
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": " << printTopicPartition(partitions);
if(err == RdKafka::ERR__ASSIGN_PARTITIONS) { //ERR__ASSIGN_PARTITIONS: 表示“再均衡发生之后,消费者开始消费之前”,此时消费者客户端可以从broker上重新加载offset
consumer->assign(partitions); //再均衡后,重新 assign() 订阅这些分区
partition_count = (int)partitions.size();
} else if(err == RdKafka::ERR__REVOKE_PARTITIONS) { //ERR__REVOKE_PARTITIONS: 表示“消费者停止消费之后,再均衡发生之前”,此时应用程序可以在这里提交 offset
consumer->unassign(); //再均衡前,unassign() 退订这些分区
partition_count = 0; //退订所有分区后,清0
} else {
std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
}
}
private:
static void printTopicPartition(const std::vector<RdKafka::TopicPartition*> &partitions) { //打印出所有的主题、分区信息
for(unsigned int i = 0 ; i < partitions.size() ; i++) {
std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], ";
}
std::cerr << "\n";
}
private:
int partition_count; //保存consumer消费者客户端 当前订阅的分区数
};
/*
class RdKafka::RebalanceCb {
public:
virtual void rebalance_cb( RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
std::vector<TopicPartition*> &partitions);
virtual ~RebalanceCb() { }
};
注意参数 vector<TopicPartition*> &partitions; 中 元素的类型是 TopicPartiton:
class TopicPartitionImpl {
string topic;
int partition_;
};
同时包括 主题 和 分区信息,所以 consumer.assign(); 订阅分区的方式是包括不同主题的不同分区的集合。
*/
#endif
#include "kafka_consumer.h"
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupId,
const std::vector<std::string>& topics, int partition)
{
m_brokers = borker;
m_groupId = groupId;
m_topicVector = topics;
m_partition = partition;
//创建Conf实例:
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == nullptr) {
std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
}
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_topicConfig == nullptr) {
std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
}
//设置Conf的各个配置参数:
RdKafka::Conf::ConfResult result;
std::string error_str;
result = m_config->set("bootstrap.servers", m_brokers, error_str);
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
}
result = m_config->set("group.id", m_groupId, error_str); //设置消费组名:group.id(string类型)
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set 'group.id' failed: " << error_str << std::endl;
}
result = m_config->set("max.partition.fetch.bytes", "1024000", error_str); //消费消息的最大大小
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set 'max.partition.fetch.bytes' failed: " << error_str << std::endl;
}
result = m_config->set("enable.partition.eof", "false", error_str); //enable.partition.eof: 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件,默认值 true
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set 'enable.partition.eof' failed: " << error_str << std::endl;
}
m_event_cb = new ConsumerEventCb;
result = m_config->set("event_cb", m_event_cb, error_str);
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set 'event_cb' failed: " << error_str << std::endl;
}
m_reblance_cb = new ConsumerRebalanceCb;
result = m_config->set("rebalance_cb", m_reblance_cb, error_str);
if(result != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set 'rebalance_cb' failed: " << error_str << std::endl;
}
//设置 topic_conf的配置项:
result = m_topicConfig->set("auto.offset.reset", "latest", error_str);
if(result != RdKafka::Conf::CONF_OK)
{
std::cout << "Topic Conf set 'auto.offset.reset' failed: " << error_str << std::endl;
}
result = m_config->set("default_topic_conf", m_topicConfig, error_str);
if(result != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set 'default_topic_conf' failed: " << error_str << std::endl;
}
//创建消费者客户端:
m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
if(m_consumer == nullptr) {
std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
}
std::cout << "Create KafkaConsumer succeed, consumer name : " << m_consumer->name() << std::endl;
}
void RdKafkaConsumer::pullMessage() {
}