当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者调查行为

杨良平
2023-03-14

关于KafkaConsumer(>=0.9),我在尝试实现满足自己需求的解决方案时遇到了一些严重的问题。

假设我有一个函数,它只能从一个Kafka主题中读取n条消息。

例如:getmsgs(5)-->获取主题中的下5条kafka消息。

所以,我有一个类似这样的循环。用实际正确的参数编辑。在本例中,使用者的max.poll.Records参数设置为1,因此实际循环只循环一次。不同的消费者(他们中的一些人迭代了许多消息)共享一个抽象的父亲(这一个),这就是为什么它是这样编码的。nummss部分是这个使用者专用的。

for (boolean exit= false;!exit;)
{
   Records = consumer.poll(config.pollTime);
   for (Record r:records) 
   {
       processRecord(r); //do my things
       numMss++;
       if (numMss==maximum) //maximum=5
       {   
          exit=true;
          break;
       }
   }
}
    consumer.commitSync(Collections.singletonMap(partition,
    new OffsetAndMetadata(record.offset() + 1)));

提前感谢!!

共有1个答案

韩朝斑
2023-03-14

您可以将max.poll.records设置为您喜欢的任何数字,这样在每次轮询中最多可以获得如此多的记录。

对于您在这个问题中陈述的用例,您不必自己显式地提交偏移量。您只需将enable.auto.commit设置为true,并将auto.offset.reset设置为aresty,这样,当没有使用者group.id时(换句话说,当您第一次开始从分区读取时),它就会启动。一旦在Kafka中存储了group.id和一些使用者偏移量,并且如果Kafka使用者进程死亡,它将从上次提交的偏移量继续,因为这是默认行为,因为当使用者启动时,它将首先查找是否有任何提交的偏移量,如果有,将从上次提交的偏移量继续,并且auto.offset.reset不会起作用。

 类似资料:
  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我目前正在探索Kafka,作为一个简单问题的初学者。 将有一个生产者向一个主题推送消息,但将有n个spark应用程序的消费者从kafka发送消息并插入到数据库中(每个消费者插入到不同的表中)。 是否有可能消费者会不同步(例如消费者的某些部分会停机很长一段时间),然后一个或多个消费者不会处理消息并插入到表中? 假设代码总是正确的,在按摩数据时不会出现异常。重要的是每条消息只处理一次。 我的问题是,K

  • 我想在远程位置检查Kafka消费者的连接。 可以确定是否将使用者分配给分区。 在远程位置,我可以从Kafka代理获得有关该主题的详细信息。 但是消费者能否保证消费者能够收到消费者与主题分区匹配的消息?

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认