我正在使用spring-cloud-kafka绑定器将数据读取到KStream。在阅读其中一个主题的数据时,我需要从头开始阅读。
我尝试设置kafka偏移重置和启动偏移属性。但是,找不到任何参考。
你能帮我提供任何示例application.yaml来重置偏移量吗?这样我就可以从一开始就使用主题中的消息。
添加我使用过的应用程序:
spring.cloud.stream.bindings.input:
destination: input-topic1
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.output:
destination: output-topic
producer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.beginningInput:
destination: beginning-topic
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.beginningInput:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
resetOffsets: true
startOffset: earliest
spring.cloud.stream.kafka.streams.binder:
brokers: 127.0.0.1
zkNodes: 127.0.0.1
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
commit.interval.ms: 1000
resetOffset
已损坏。它在2.0.0.RELEASE.恢复。
PR在这里。
我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重
侦听器类 在读取日志时,它甚至不会进入我为它放置记录器的侦听器类。对此有什么想法吗?
如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?
我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml:
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff
将kafka consumer offset重置为“最早”时,它会保留一些带有偏移量的分区 显示: 为什么分区1也没有0?