我正在使用Kafka Producer和RoundRobin分区器来处理一个有12个分区的主题。
代码可在此处找到https://github.com/apache/kafka/blob/2.8/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
我面临的问题是,这个分区程序让分区正确地(以循环方式)发送特定消息,但在Kafka生产者代码中,分区方法在第931行和第956行(在新批的if块内)被调用了两次,因此某些分区没有发送给它们的记录,我无法实现我想要的12的并行性。我试过以下方法。我使用与roundrobin分区器相同的逻辑有效地编写了一个自定义分区器,唯一的区别是,如果在分区器上调用newBatch方法后调用了分区方法,那么将返回之前返回的分区no。我有点担心在生产中使用它,而不理解为什么Kafka生产商代码是上面指定的方式,如果有人能解释一下,我会非常感激。此外,如果有人提出任何建议,我可以通过这些建议确保记录在每个分区上均匀分布,我也愿意接受他们。
Kafka制作人代码-https://github.com/apache/kafka/blob/2.8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
提前谢谢你。
您应该使用Default分区器而不是RoundRobin分区器。
对DefaultPartitioner的评论。java说:
默认分区策略:
有关粘性分区的详细信息,请参阅KIP-480。
没有为生产者记录提供指定的分区号或分区键,那么粘性分区将起作用,这近似地使Kafka分区上的记录均匀。请参阅技巧2:了解producer API中的新粘性分区器
再次,我想解释为什么RoundRobinPartitioner总是不以通常认为的循环方式工作。RoundRobinPartitioner中的“partition()”只能确保调用partition()方法的数量分布是以循环方式进行的,这不足以确保分区上的记录是均匀的。
请注意,在KafkaProducer中调用partition()。doSend()很奇怪(可能连续两次调用partition())。
当分区数为偶数时,此处的细微代码可能会导致分区分布不均匀。假设我们有4个分区(0、1、2、3)和8条记录。
record 1 -> 2 **partition()** call(return 0, return 1), finally assigned to partition 1
record 2 -> 2 **partition()** call(return 2, return 3), finally assigned to partition 3
record 3 -> 2 **partition()** call(return 0, return 1), finally assigned to partition 1
record 4 -> 2 **partition()** call(return 2, return 3), finally assigned to partition 3
record 5 -> 2 **partition()** call(return 0, return 1), finally assigned to partition 1
....
看见记录将只分发到分区1和3!
RoundRobinPartitioner有一个令人困惑的名称,它提供了调用partition()的循环,而不是KafkaProducer的循环。发送()。要确保记录在所有分区中均匀分布,请使用DefaultPartitioner!
我们在AWS上运行16个节点kafka集群,每个节点是m4. xLargeEC2实例,具有2TB EBS(ST1)磁盘。Kafka版本0.10.1.0,目前我们有大约100个主题。一些繁忙的话题每天会有大约20亿个事件,一些低量的话题每天只有数千个。 我们的大多数主题在生成消息时使用UUID作为分区键,因此分区分布相当均匀。 我们有相当多的消费者使用消费群体从这个集群消费。每个使用者都有一个唯一的
我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制
0.1-0.2:********** 0.2-0.3:******** 0.3-0.4:********* 0.5-0.6:********* 0.6-0.7:********* 0.7-0.8:********* 0.4-0.5:********* 0.5-0.6:********* 0.6-0.7:********* 0.1-0.2:********* 0.2-0.3:********* 0.
我有一个有几个消费者的消费群体。每个使用者被分配到一组分区。消费者何时轮询选择了已使用分区的消息?它是在消费者端完成的,还是Kafka服务器决定使用哪个分区? 我的一些分区有很多消息,但有些分区没有或几乎没有。但我仍然需要我的消费者平等地使用分配给它的每个分区。因此,我需要我的消费者快速遍历分区,最好从每个分配的分区轮询x条消息。 我在用https://github.com/appsignal/r
我可以使用这个问题中的技巧来强制初始分区和最终分区之间的关系,但是Spark不知道每个原始分区的所有内容都将转移到一个特定的新分区。因此,它不能优化掉洗牌,而且它的运行速度比慢得多。