当前位置: 首页 > 知识库问答 >
问题:

如何在Spring云kafka活页夹中将偏移量重置为开始

丌官坚秉
2023-03-14

我正在使用spring-cloud-kafka绑定器将数据读取到KStream。在阅读其中一个主题的数据时,我需要从头开始阅读。

我尝试设置kafka偏移重置和启动偏移属性。但是,找不到任何参考。

你能帮我提供任何示例application.yaml来重置偏移量吗?这样我就可以从一开始就使用主题中的消息。

添加我使用过的应用程序:

spring.cloud.stream.bindings.input:
  destination: input-topic1
  consumer:
    useNativeDecoding: true
    headerMode: raw
spring.cloud.stream.bindings.output:
  destination: output-topic
  producer:
    useNativeDecoding: true
    headerMode: raw
spring.cloud.stream.bindings.beginningInput:
  destination: beginning-topic
  consumer:
    useNativeDecoding: true
    headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output:
  producer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.beginningInput:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    resetOffsets: true
    startOffset: earliest
spring.cloud.stream.kafka.streams.binder:
  brokers: 127.0.0.1
  zkNodes: 127.0.0.1
  configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    commit.interval.ms: 1000

共有1个答案

宗穆冉
2023-03-14

resetOffset已损坏。它在2.0.0.RELEASE.恢复。

PR在这里。

 类似资料:
  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • 侦听器类 在读取日志时,它甚至不会进入我为它放置记录器的侦听器类。对此有什么想法吗?

  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml:

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • 将kafka consumer offset重置为“最早”时,它会保留一些带有偏移量的分区 显示: 为什么分区1也没有0?