当前位置: 首页 > 知识库问答 >
问题:

使用Kafka Simple Consumer可以读取多个分区吗?

吴修洁
2023-03-14

使用Kafka Simple Consumer可以读取多个分区吗?简单使用者在以下情况下使用分区:

PartitionMetadata metadata = findLeader(brokers, port, topic, partition);
SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
leadBroker = findNewLeader(leadBroker, topic, partition, port);

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0SimpleConsumer示例

共有2个答案

易雅畅
2023-03-14

一个线程将只从一个分区读取。要从多个分区读取,需要生成多个线程,每个线程将从单个分区读取。您必须在不同的线程中运行它,否则您将失去分区的好处,您的性能将受到影响。

对于starter,您可以在一台机器上运行所有消费者。但最终你将不得不开始使用不同的机器进行消费。此时,您需要确保一个分区只处理一次。具体来说,您需要解决的问题是,两个线程(来自不同的)正试图从同一个分区读取数据。在任何时候,您必须只允许一个人处理它。

此外,您需要管理偏移。你需要定期在zookeeper中冲洗它们。

我建议你使用高级消费者。它比简单消费者更容易使用。它提供访问同一个分区的不同线程之间的协调,并管理自己的偏移量。

何兴邦
2023-03-14

Simple消费者的一个实例从单个分区读取。尽管您可以轻松创建多个Simple消费者实例并按顺序或并行读取不同的分区(从不同的线程)。

棘手的部分是不同机器上的读卡器之间的协调,以便它们不从同一分区读取(假设所有消息只需处理一次)。您需要使用高级使用者或编写类似的自定义代码来实现这一点。

 类似资料:
  • 我们计划编写一个Kafka消费者(java),它读取Kafka队列以执行消息中的操作。

  • 我有一个Kafka系统,看起来像这样(所有消费者都在一个消费者群体中): 在每个消费者中,我轮询消息,然后进行昂贵的计算(从1到60秒)。如果操作成功,我将提交消费者。 在我提交之前,另一个使用者是否会开始处理相同的消息?我需要保证,一旦消息被拾取,它就会被只执行一次 - 除非处理中途失败。

  • /tmp/data/myfile1.csv,/tmp/data/myfile2.csv,/tmp/data.myfile3.csv,/tmp/datamyfile4.csv 我希望将这些文件读入Spark DataFrame或RDD,并且希望每个文件都是DataFrame的一个解析。我怎么能这么做?

  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我想使用一个BufferedReader对象从两个或多个文件中读取文本。

  • 问题内容: 我正在使用Angular-Filter的groupBy过滤器。 来自GitHub的示例 : 因此,该示例按 team 对玩家集合进行 分组 。想象一下,每个玩家对象都有一个 年龄 属性: 我想GROUP BY 团队 和 年龄 。我怎样才能做到这一点? 问题答案: 遵循多个字段的示例