将rdkafka数据写入过程分为两部分:
1. 用户程序调用相关接口(rd_kafka_produce)produce数据;
2. rdkafka内部线程异步向kafka服务端发送数据;
1. producer发送流程
1.1 流程图
1.2 流程说明
配置项“queue.buffering.max.messages”决定缓冲上限,默认100000(10W)条数据,每topic一个;
2. rdkafka主流程
2.1 流程图
2.2 说明
- 调用rd_kafka_new时,针对每个broker创建一个子线程,启动该处理流程;
- 配置项“batch.num.messages”决定批量大小,默认1000条数据,每partition单独计数;
- 配置项“queue.buffering.max.ms”决定消息缓存时间,默认1000ms,每partition单独计量;
问题
- 发送数据不能太快,如果写入速度远超过网络发送速度,会导致produce失败,错误信息形如:
[2016-07-26 12:24:30] Failed to produce message: Local: Queue full