当前位置: 首页 > 知识库问答 >
问题:

Kafka有批量消费者吗?

田曜瑞
2023-03-14

高级使用者 API 似乎一次读取一条消息。

如果消费者想要处理这些消息并提交给其他下游消费者(如Solr或Elastic-Search ),这可能会给他们带来很大的问题,因为他们更喜欢批量接收消息,而不是一次接收一条。

在内存中批处理这些消息也并非易事,因为只有当批处理已经提交时,Kafka中的偏移量也需要同步,否则具有未提交下游消息的崩溃的 kafka 使用者(如在Solr或ES中)将已经更新其偏移量,因此消息松散。

如果使用者在下游提交消息后但在更新消息偏移量之前崩溃,则该使用者可能会多次使用消息。

如果 Kafka 批量使用消息,那么一些指向代码/文档的指针将不胜感激。

谢谢!

共有1个答案

轩辕成天
2023-03-14

我不知道批量消费者。但即使有一个,你的主要问题仍然存在。您希望在成功转发数据后提交偏移量。实现此目的的一种方法是通过设置属性 auto.commit.enable = false 来关闭使用者的自动提交。当然,权衡是您必须注意何时提交偏移量。

在此处查找使用者属性的完整文档:https://kafka.apache.org/documentation.html#consumerconfigs

关于如何手动提交从java文档中窃取的偏移量的一个很好的示例(https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html):

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 props.put("enable.auto.commit", "false");
 props.put("auto.commit.interval.ms", "1000");
 props.put("session.timeout.ms", "30000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("foo", "bar"));
 final int minBatchSize = 200;
 List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }
 类似资料:
  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我在kafka文档中读到:kafka还有一个命令行使用者,它将把消息转储到标准输出。 bin/kafka-console-consumer.sh--zookeeper localhost:2181--topic test--从头开始 我想知道如果我想要使用消息并将它们推送到另一个输出,应该向上面的命令添加哪些选项。kafka-console-consumer没有--help选项,我找不到任何参数,

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我有一个多分区主题,由多个使用者(同一组)使用。我的目标是最大化消费处理,即任何消费者都可以消费来自任何分区的消息。 我知道这看起来是不可能的,因为只有一个消费者可以从一个分区中消费。 有没有可能使用REST代理来实现这一点?例如,轮询所有代理消费者实例。 谢了。