在执行手工抵销管理时,我遇到了以下问题:(使用0.9)
为了手动管理偏移量,对于每个消耗的记录,我检索记录的当前偏移量并提交新的偏移量(currentOffset+1,因为偏移量重置策略是“最新的”)。
当创建新的使用者组时,它没有显式的偏移量(偏移量是“未知”的),因此,如果它在停止之前没有使用来自所有现有分区的消息,那么它将只为部分分区(使用者从中获取消息的分区)提交偏移量,而其余分区的偏移量仍然是“未知”的。
当使用者再次启动时,它只获得在其关闭时产生的一些消息(只获得来自具有提交偏移量的分区的消息),来自具有“未知”偏移量的分区的消息将丢失,并且由于偏移量重置策略而永远不会被使用。
由于在我的例子中,一旦创建了使用者组就会错过任何消息,这是不可接受的,所以我希望在开始使用之前为每个分区显式提交一个偏移量。
为此,我找到了两个选择:
两者都比我想象的要复杂和嘈杂(我希望有一个更简单的API,我可以用来获取分配给使用者的所有分区的日志结束位置)。
希望有更好的执行办法的任何想法或想法。
10倍。
使用使用者API完全取决于您在哪里提交偏移。
list
我有一个Kafka的题目有1个分区。如果它有100条消息,偏移量将从0.99开始。 根据Kafka保留策略,在指定的时间之后,所有的消息都将被清除。 并且我正在发送100个新的消息到主题,一旦所有已经被清除(在保留期之后)。现在,消息的新偏移量从哪里开始呢?是从100还是从0?? 我想知道新的偏移是100-199还是0-99?
根据我的理解,消费者阅读特定主题的消息,并且消费者客户机将定期提交偏移量。 因此,如果由于某种原因,使用者失败了一个特定的消息,该偏移量将不会被提交,然后您可以返回并重新处理该消息。 是否有任何东西跟踪您刚刚消耗的偏移和您随后提交的偏移?
但是,consumer只从主题中第一个未提交的消息开始轮询。我希望总是从偏移量0开始,不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?
本文向大家介绍python实现手机销售管理系统,包括了python实现手机销售管理系统的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了python实现手机销售管理系统的具体代码,供大家参考,具体内容如下 要求如下: 手机销售系统 手机品牌 手机价格 库存数量 vivoX9 2798
我们正在使用JavaInputDStream 我们按照Spark Streaming-Kafka集成指南中的描述实施了偏移管理,但现在我们刚刚意识到偏移管理不适合我们,并且如果当前小批量中出现故障,Stream不会再次读取消息。即使我们跳过这一行,它也不会再次读取消息: 我们将代码分解为以下内容,并期望流最终在循环中一次又一次地读取相同的消息,但事实并非如此: 消费者配置参数enable.auto
我们为您提供了以下可能遇到的问题,如果有更多的疑问,欢迎您在下方留言告知我们。 如何设置员工离职状态 一个员工能加入多个企业么 常驻城市列表