当前位置: 首页 > 知识库问答 >
问题:

Kafka手工抵销管理问题

司马奇希
2023-03-14

在执行手工抵销管理时,我遇到了以下问题:(使用0.9)

为了手动管理偏移量,对于每个消耗的记录,我检索记录的当前偏移量并提交新的偏移量(currentOffset+1,因为偏移量重置策略是“最新的”)。

当创建新的使用者组时,它没有显式的偏移量(偏移量是“未知”的),因此,如果它在停止之前没有使用来自所有现有分区的消息,那么它将只为部分分区(使用者从中获取消息的分区)提交偏移量,而其余分区的偏移量仍然是“未知”的。

当使用者再次启动时,它只获得在其关闭时产生的一些消息(只获得来自具有提交偏移量的分区的消息),来自具有“未知”偏移量的分区的消息将丢失,并且由于偏移量重置策略而永远不会被使用。

由于在我的例子中,一旦创建了使用者组就会错过任何消息,这是不可接受的,所以我希望在开始使用之前为每个分区显式提交一个偏移量。

为此,我找到了两个选择:

  1. 使用低级使用者发送偏移请求。
  2. 使用高级使用者,调用consumer.poll(0)(以触发分配),然后调用consumer.assignment(),并对每个TopicPartition调用consumer.committed(TopicPartition);Consumer.SeekToEnd(topicPartition);position(topicPartition)并最终提交所有偏移量。

两者都比我想象的要复杂和嘈杂(我希望有一个更简单的API,我可以用来获取分配给使用者的所有分区的日志结束位置)。

希望有更好的执行办法的任何想法或想法。

10倍。

共有1个答案

蓝昊天
2023-03-14

使用使用者API完全取决于您在哪里提交偏移。

  • 如果您的偏移量存储在Kafka broker中,那么您肯定应该使用高级使用者API,它将为您提供对偏移量的更多控制。
  • 如果您在zookeeper中保留偏移量,则可以使用任何旧的使用者API,如

list >streams=consumer.CreateMessageStreamsByFilter(新建白名单(topicRegex),1) stream

 类似资料:
  • 我有一个Kafka的题目有1个分区。如果它有100条消息,偏移量将从0.99开始。 根据Kafka保留策略,在指定的时间之后,所有的消息都将被清除。 并且我正在发送100个新的消息到主题,一旦所有已经被清除(在保留期之后)。现在,消息的新偏移量从哪里开始呢?是从100还是从0?? 我想知道新的偏移是100-199还是0-99?

  • 根据我的理解,消费者阅读特定主题的消息,并且消费者客户机将定期提交偏移量。 因此,如果由于某种原因,使用者失败了一个特定的消息,该偏移量将不会被提交,然后您可以返回并重新处理该消息。 是否有任何东西跟踪您刚刚消耗的偏移和您随后提交的偏移?

  • 但是,consumer只从主题中第一个未提交的消息开始轮询。我希望总是从偏移量0开始,不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?

  • 本文向大家介绍python实现手机销售管理系统,包括了python实现手机销售管理系统的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了python实现手机销售管理系统的具体代码,供大家参考,具体内容如下 要求如下: 手机销售系统     手机品牌                手机价格    库存数量      vivoX9                       2798  

  • 我们为您提供了以下可能遇到的问题,如果有更多的疑问,欢迎您在下方留言告知我们。 如何设置员工离职状态 一个员工能加入多个企业么 常驻城市列表

  • 我们正在使用JavaInputDStream 我们按照Spark Streaming-Kafka集成指南中的描述实施了偏移管理,但现在我们刚刚意识到偏移管理不适合我们,并且如果当前小批量中出现故障,Stream不会再次读取消息。即使我们跳过这一行,它也不会再次读取消息: 我们将代码分解为以下内容,并期望流最终在循环中一次又一次地读取相同的消息,但事实并非如此: 消费者配置参数enable.auto