librdkafka 是一个C实现的高性能 Apache Kafka 客户端,为生产环境提供了一个可靠和高性能的客户端。
librdkafka 同样也提供了传统的 C++ 接口。
以下目录适用于本文
librdkafka 是一个基于现代硬件设计的多线程库, 并且试图保持最少的内存拷贝。
如果应用程序愿意,生产和消费消息的载体可以不通过任何拷贝实现让消息大小不受限制。
librdkafka 同样适用于高吞吐还是低延时的场景,都可以通过属性配置接口来满足。
下面是两个对于性能调节非常重要的属性:
接下来的性能测试数据受限于如下配置:
log.flush.interval.messages=10000000
log.flush.interval.ms=100000
examples
子目录下 rdkafka_performance
程序。测试结果
Test1: 2 brokers, 2 partitions, required.acks=2, 100 byte messages:
850000 messages/second, 85 MB/second
Test2: 1 broker, 1 partition, required.acks=0, 100 byte messages:
710000 messages/second, 71 MB/second
Test3: 2 broker2, 2 partitions, required.acks=2, 100 byte messages,
snappy compression:
300000 messages/second, 30 MB/second
Test4: 2 broker2, 2 partitions, required.acks=2, 100 byte messages,
gzip compression:
230000 messages/second, 23 MB/second
提示: 要了解命令的执行等,请参考本文最后的 测试详情 章节。
提示: 消费者的性能测试将会尽快公布。
高吞吐的关键是消息的批量处理,等待本地队列累计一定数量的消息,然后用一个大的消息集或批量发送到对端。
通过这种方式补偿通讯开销和消除往返时延(RTT)的不利影响。
默认配置 batch.num.messages=1000 和 queue.buffering.max.ms=1000 适用于高吞吐量场景。
这样配置允许 librdkafka 等待 1000 ms 让本地队列累计 1000 条消息,然后发送累计的消息到 broker。
这些配置是全局的 (rd_kafka_conf_t
),但是基于每一个 topic+partition 上应用。
当要求发送低延时,“queue.buffering.max.ms”应该调整到尽可能适用于生产侧低延时。
设置“queue.buffering.max.ms” 为 1 来确保消息尽可能快的发送。
你可以查看如何降低消息延时来获取更多细节。
生产者消息压缩通过配置“compression.codec”属性生效。
压缩通过本地队列批量处理消息实现,批量越大越可能获得更高的压缩率。
本地批量队列大小通过“batch.num.messages”和“queue.buffering.max.ms”配置属性控制,已经在前面的高吞吐章节描述过了。
消息可靠性是 librdkafka 的一个重要因素。
通过指定的配置(“request.required.acks”和“message.send.max.retries”,等)应用程序完全可以信赖 librdkafka 来分发消息。
如果 topic 配置属性“request.required.acks”设置为等待来自 broker 的消息提交确认(非0,详见CONFIGURATION.md
),librdkafka 将会等待消直到所有期望的 ack 都收到,并优雅的处理下列事件:
这些事件都是 librdkafka 自动处理的。对与上面的事件,应用程序无须做任何处理。
失败消息将会重新发送“message.send.max.retries”次,然后返回失败报告给应用程序。
发送失败报告回调函数用于 librdkafka 给应用程序发送消息返回的状态信号,每个消息发送后的消息状态报告都会调用一次回调函数:
error_code
是非零,那么消息发送失败,error_code 表示失败类型(rd_kafka_resp_err_t
枚举)。error_code
是零,那么消息发送成功。发送报告回调函数的更多使用详情查看生产者 API。
发送报告回调函数是可配置的。
librdkafka API 记录在rdkafka.h
头文件中,配置属性记录在CONFIGURATION.md
中。
应用程序需要初始化一个顶层对象(rd_kafka_t
)的基础容器,用于全局配置和共享状态。
通过调用rd_kafka_new()
创建。
还需要实例化一个或多个 topic(rd_kafka_topic_t
)用于生产或消费。
topic 对象保存 topic 级别的属性,并且维护一个映射,
该映射保存所有可用 partition 和他们的领导 broker 。
通过调用rd_kafka_topic_new()
创建。
rd_kafka_t
和 rd_kafka_topic_t
都源于可选的配置 API。
不使用该 API 将导致 librdkafka 使用列在文档CONFIGURATION.md
中的默认配置。
提示:一个应用程序可以创建多个rd_kafka_t
对象,它们不共享状态。
提示:一个rd_kafka_topic_t
对象只能用于一个rd_kafka_t
对象的创建。
为了与官方的 Apache Kafka 软件一致和降低学习门槛,
librdkafka 使用了和 Apache Kafka 官方客户端完全一致的配置属性。
在创建对象之前,通过rd_kafka_conf_set()
和 rd_kafka_topic_conf_set()
API
来应用配置。
提示:一旦通过rd_kafka.._new()
使用过,rd_kafka.._conf_t
对象不能再重复使用。
调用rd_kafka.._new()
后,应用程序不需要释放任何配置资源。
示例
rd_kafka_conf_t *conf;
char errstr[512];
conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "compression.codec", "snappy", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "batch.num.messages", "100", errstr, sizeof(errstr));
rd_kafka_new(RD_KAFKA_PRODUCER, conf);
为了完全利用的现代硬件,librdkafka 本身支持多线程。
API 是完全线程安全。在任何时间和其任何线程中,应用程序都可以调用任何 API 函数。
一个基于调查的 API 被用于向应用程序返回信号。
应用程序需要定期调用rd_kafka_poll()。
这个调查 API 将调用以下配置的回调函数(可选):
可选的回调函数不是通过调查出发,它们可能被任何线程调用:
librdkafka 只需要初始化一个 broker 列表(至少一个)来调用 broker 引导。
librdkafka 会连接所有列在“metadata.broker.list”属性中或调用rd_kafka_brokers_add()
添加的 broker 引导,并且查询列表中每一的元数据信息,包括 broker、topic、partition 和 kafka 集群的领导者。
Broker 名字类似于“host[:port]”,其中端口是可选的(默认 9092),host是可用的主机名或IPv4、IPv6地址。
如果是一个复杂地址,librdkafka会循环地址尝试连接。
一个 DNS 记录用于包含所有可用于引导的 broker 地址。
Apache Kafka broker 版本 0.10.0 新增了一个 ApiVersionRequest API,允许客户端查询 broker 支持的 API 版本。
librdkafka 支持这个特性,会查询每一个 broker 获取该信息(如果api.version.request=true
),根据该信息生效或失效各种特性,如MessageVersion 1 (timestamps), KafkaConsumer等。
如果 broker 没有对 librdkafka 的 ApiVersionRequest 请求作出正确响应,会认为 broker 版本太老不支持该 API,并回退到老版本 broker 的 API。回退的版本可配置到 librdkafka 中,通过broker.version.fallback
属性控制。
通过RD_KAFKA_PRODUCER
类型设置好rd_kafka_t
对象后,一个或多个rd_kafka_topic_t
对象就准备好接收消息,并组装和发送到 broker。
rd_kafka_produce()
函数接受如下参数:
rkt
- 生产的 topic,之前通过rd_kafka_topic_new()
生成partition
- 生产的 partition。如果设置为RD_KAFKA_PARTITION_UA
(未赋值的),需要通过配置分区函数去选择一个确定 partition。msgflags
- 0 或下面的值:
RD_KAFKA_MSG_F_COPY
- librdkafka 将立即从 payload 做一份拷贝。如果 payload 是不稳定存储,如栈,需要使用这个参数。RD_KAFKA_MSG_F_FREE
- 当 payload 使用完后,让 librdkafka 使用free(3)
释放。这两个标志互斥,如果都不设置,payload 既不会被拷贝也不会被 librdkafka 释放。
如果RD_KAFKA_MSG_F_COPY
标志不设置,就会有数据拷贝,librdkafka 将占用 payload 指针直到消息被发送或失败。
librdkafka 处理完消息后,会调用发送报告回调函数,让应用程序重新获取 payload 的所有权。
如果设置了RD_KAFKA_MSG_F_FREE
,应用程序就不要在发送报告回调函数中释放 payload。
payload
,len
- 消息 payloadkey
,keylen
- 可选的消息键,用于分区。将会用于 topic 分区回调函数,如果有,会附加到消息中发送给 broker。msg_opaque
- 可选的,应用程序为每个消息提供的无类型指针,提供给消息发送回调函数,用于应用程序引用。rd_kafka_produce()
是一个非阻塞 API,该函数会将消息塞入一个内部队列并立即返回。
如果队列中的消息数超过queue.buffering.max.messages
属性配置的值,rd_kafka_produce()
通过返回 -1,并在ENOBUFS
中设置错误码来反馈错误。
提示: 见 examples/rdkafka_performance.c
获取生产者的使用。
提示:要获取 high-level KafkaConsumer 接口,查看 rd_kafka_subscribe (rdkafka.h) 或 KafkaConsumer (rdkafkacpp.h)
消费者 API 比生产者 API 更复杂。
通过RD_KAFKA_CONSUMER
类型创建rd_kafka_t
对象并实例化rd_kafka_topic_t
后,应用程序还需要调用rd_kafka_consume_start()
指定partition。
rd_kafka_consume_start()
参数:
rkt
- 要消费的 topic ,之前通过rd_kafka_topic_new()
创建。partition
- 要消费的 partition。offset
- 消费开始的消息偏移量。可以是绝对的值或两中特殊的偏移量:
RD_KAFKA_OFFSET_BEGINNING
从该 partition 的队列的最开始消费(最老的消息)。RD_KAFKA_OFFSET_END
从该 partition 产生的下一个消息开始消费。RD_KAFKA_OFFSET_STORED
使用偏移量存储。当一个 topic+partition 消费者被启动,librdkafka 不断尝试从 broker 批量获取消息来保持本地队列有queued.min.messages
数量的消息。
本地消息队列通过 3 个不同的消费 API 向应用程序提供服务:
rd_kafka_consume()
- 消费一个消息rd_kafka_consume_batch()
- 消费一个或多个消息rd_kafka_consume_callback()
- 消费本地队列中的所有消息,且每一个都调用回调函数这三个 API 的性能按照升序排列,rd_kafka_consume()
最慢,rd_kafka_consume_callback()
最快。不同的类型满足不同的应用需要。
被上述函数消费的消息通过rd_kafka_message_t
类型返回。
rd_kafka_message_t
的成员:
* err
- 返回给应用程序的错误信号。如果该值不是零,payload
字段应该是一个错误的消息,err
是一个错误码(rd_kafka_resp_err_t
)。
* rkt
,partition
- 消息的 topic 和 partition 或错误。
* payload
,len
- 消息的数据或错误的消息 (err!=0)。
* key
,key_len
- 可选的消息键,生产者指定。
* offset
- 消息偏移量。
不管是payload
和key
的内存,还是整个消息,都由 librdkafka 所拥有,且在rd_kafka_message_destroy()
被调用后不要使用。
librdkafka 为了避免消息集的多余拷贝,会为所有从内存缓存中接收的消息共享同一个消息集,这意味着如果应用程序保留单个rd_kafka_message_t
,将会阻止内存释放并用于同一个消息集的其他消息。
当应用程序从一个 topic+partition 中消费完消息,应该调用rd_kafka_consume_stop()
来结束消费。该函数同时会清空当前本地队列中的所有消息。
提示: 见 examples/rdkafka_performance.c
获取消费者的使用。
偏移量管理
当 broker 版本大于等于 0.9.0 时,基于 broker 的偏移量管理可通过使用 high-level KafkaConsumer 接口(见rdkafka.h 或 rdkafkacpp.h)实现。
偏移量管理同样也可以通过本地偏移量文件存储,按每一个topic+partition,偏移量定时写入本地文件。通过下列 topic 属性配置:
auto.commit.enable
auto.commit.interval.ms
offset.store.path
offset.store.sync.interval.ms
当前还不支持 zookeeper 的偏移量管理。
消费者组
基于消费者组的 broker 是支持的(要求 Apache Kafka broker >=0.9)。见 rdkafka.h 或 rdkafkacpp.h 中的 KafkaConsumer。
Topic 自动创建
librdkafka 支持自动创建 topic。broker 需要配置auto.create.topics.enable=true
。