当前位置: 首页 > 知识库问答 >
问题:

Kafka流:当偏移量不再存在时,Kafka流阻塞

洪河
2023-03-14

我在Kafka·吉拉也描述了这个问题:https://issues.apache.org/jira/browse/KAFKA-13014

我们有多个实例和线程的Kafka流。

这个Kafka流消耗了很多话题。

其中一个主题分区一天内无法访问,主题保留时间为4小时。

解决问题后,Kafka流正试图从不再存在的偏移量中消费:

Kafka消费群体描述:

我们可以看到KS正在等待的当前偏移量是

Kafka流没有抛出任何异常的问题

这是我看到的唯一的记录

08:44:53.924混合prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2

PI:版本代理Kafka:

共有1个答案

宰父才
2023-03-14

问题比我想的要复杂。

问题是把exactly_once和使用国营商店混在一起

当应用程序在不等待流关闭的情况下兑现时,changelog主题的最新消息的事务被中止,因此当我们重新启动kafka流时,拓扑将等待在启动consumig消息之前重新加载本地rocksdb存储。

错误就在那里,因为他们使用消费者元日期“topic.lastoffset”==current\u consumer\u offset检查

但是应该是这样的:

“消费者元日期”主题。上次提交的消息和交易偏移量==当前消费者偏移量

我通过切换到至少一个来解决这个问题,但我认为它是固定在2.7.1上的

 类似资料:
  • 我实现了一个简单的Kafka死信记录处理器。 当使用从控制台生成器生成的记录时,它可以完美地工作。 然而,我发现我们的Kafka Streams应用程序并不能保证向接收器生成记录,对于每个生成的记录,偏移量将增加1。 我有一个场景,其中记录可能在处理它所需的所有数据发布之前被接收。当streams应用程序处理的记录不匹配时,它们将移动到一个死信主题,而不是继续向下流。当发布新数据时,我们将最新的消

  • 我过去曾与Kafka合作过一点,最近需要将部分数据管道移植到AWS Kinesis Stream上。现在我读到Kinesis实际上是Kafka的一个分支,并且有许多相似之处。 然而,我没有看到如何让多个消费者从同一个流中读取数据,每个消费者都有相应的偏移量。每个数据记录都有一个序列号,但我找不到特定于消费者的任何内容(Kafka组Id?)。 在同一个AWS Kinesis Stream上,真的有可

  • 问题内容: 我对使用Kafka和Zookeeper时在哪里存储偏移量感到困惑。在某些情况下,偏移似乎存储在Zookeeper中,而在其他情况下,偏移存储在Kafka中。 是什么决定偏移量存储在Kafka还是Zookeeper中?优点和缺点是什么? 注意:当然,我也可以将偏移量单独存储在其他数据存储区中,但这并不是本文的内容。 有关我的设置的更多详细信息: 我运行以下版本:KAFKA_VERSION

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 我用Kafka和spring-布特: Kafka制作人班: Kafka-配置: 问题: 我有一个主题的5个分区,比方说。 发生的情况是,我获得成功(即消息成功发送到Kafka)日志,但是topic的无分区的偏移量增加。 正如您在上面看到的,我添加了日志和。我所期望的是,当Kafka不能发送消息给Kafka时,我应该得到一个错误,但在这种情况下,我没有收到任何错误消息。 Kafka的上述行为以的比例

  • 我知道在你的流中的任何时间点都可能发生再平衡。当它发生时,由于没有提交给定偏移量的最新偏移量,可能会发生事件的重新处理。 Kafka流是否允许在重新平衡发生之前完成任何飞行中处理?我的意思是,你的应用程序正在消耗一个记录(在你的过程方法内部),发生一个再平衡事件。该处理是否立即中止或允许处理方法完成? 一个具体的例子是 最后一次计算是否会在状态存储中结束并转发到接收器主题?因此,这意味着当重新平衡