当前位置: 首页 > 知识库问答 >
问题:

为什么我的Kafka Streams应用程序的消费者组(app-id)的偏移量在应用程序重启后会被重置?

徐鸿文
2023-03-14

我有一个Kafka Streams应用程序,每当我重新启动它时,它所消耗的主题的偏移量就会被重置。因此,对于所有分区,延迟增加,应用程序需要重新处理所有数据

更新:输出主题接收到一系列事件,这些事件在应用程序重新启动后已经被处理,而不是像我在上一段中所说的那样,输入主题的偏移量被重置。但是,内部主题(KTABLE-SUPPRESS-STATE-STORE)偏移量正在重置,请参见下面的注释。

在重新启动之前,我已经确保每个分区的延迟为1(这是针对输出主题的)。属于该消费者组ID(app-id)的所有消费者都是活动的。重启是立即的,大约需要30秒。

我已经读过这个答案,对于一个Apache Kafka消费者群体,偏移是如何过期的?.

我尝试了auto.offset.reset=最新和auto.offset.reset=最早。

似乎这些主题的补偿没有有效地提交,(但我不确定这一点)。

Kafka流API是否确保在关闭之前提交所有消耗的偏移量?(在调用streams.close()之后)

我会非常感激任何关于这件事的线索。

更新:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(...);

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))              
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, new) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

再一次,我会非常感激任何关于这一点的线索。

更新:这个问题已经在2.2.1版中解决了(https://issues.apache.org/jira/browse/kafka-7895)

共有1个答案

封德华
2023-03-14

偏移量重置只是而且总是(在重新启动之后)使用Kafka Stream API创建的KTABLE-SUPPRESS-STATE-STORE内部主题进行。

这是当前(版本2.1)的预期行为,因为suppress()操作符仅在内存中工作。因此,在重新启动时,必须从changelog主题重新创建禁止缓冲区,然后才能开始处理。

注意,计划在未来的版本中允许suppress()写入磁盘(参见https://issues.apache.org/jira/browse/kafka-7224)。这将避免从changelog主题重新创建缓冲区的开销。

 类似资料:
  • 我有一个Kafka主题和一个消费者,在Spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从一开始就开始读取所有接收到的消息。这应该是通过属性实现的,但从这个问题可以清楚地看出,它目前不起作用。 我在kafka消费者api中发现了这个变通方法,它建议在每次重启时为消费者组分配一个新的随机名称,作为从最早开始阅读的一种方式。在Spring Cloud St

  • 我已经用Apache ActiveMQ和一个简单的应用程序创建了一个JMS代理,该应用程序将消息纳入队列OK。 我想创建另一个简单的应用程序,使用MDP异步出列这些消息。以下是我到目前为止所拥有的一个例子: 现在我大概需要一个main方法,但是如果消息到达队列时监听器会异步调用onMessage方法,我不确定如何编写代码: 谢谢你的帮助。

  • 我将python kafka consumer的< code>auto_commit设置为< code>False,我正在手动提交消息。然而,重启后,消费者再次消费来自每个分区的最后一条消息。只有最后一个,不能再多。 这就是所展示的: 不知道为什么会显示滞后,whu当前偏移设置为最后一条消息而不是下一条?当我提交偏移量3时,当前偏移量不应该移动到4吗? 我提交我使用的每条消息,但是在重启时,它总是

  • 我使用的是0.10.1.1 API的高级使用者。 奇怪的是,当我关闭应用程序并重新启动它时,偏移量比上次提交的偏移量大一点,我找不到原因。 我在代码中只有一个提交点。 一个分区的示例: 关机前偏移量:3107169023 分区分配时的偏移量:3107180350

  • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

  • 我有一个Spring Cloud Stream Kafka Stream应用程序,它读取主题(事件)并执行一个简单的处理: 该应用程序使用来自Confluent Cloud的Kafka环境,带有6个分区的事件主题。完整的配置是: 首先,它显示还原使用者客户端的创建。自动偏移复位无: > 配置了两个消费者的原因是什么? 为什么第二个函数具有,而我没有显式配置它,而且Kafka的默认值是最新的? 我已