当前位置: 首页 > 知识库问答 >
问题:

重播上次成功消息中的Kafka主题

郎仰岳
2023-03-14

使用Spring云流上通道的标准配置,消息重试3次,然后跳过。如果以下消息处理成功完成,则提交偏移量。这意味着在瞬态异常情况下,消息可能会丢失。

是否可以更改此行为,从而使通道卡在失败消息上,直到修复瞬态条件?

我已经尝试配置重试模板,但您必须指定多次重试,这看起来像是一个无用的参数,因为所需的行为将永远重试。

有人卷入这些麻烦吗?非常感谢。

我还怀疑这会如何干扰max.poll。间隔ms属性。

共有1个答案

郭盛
2023-03-14

在活页夹中禁用重试,并使用ListenerContainerCustomizer添加具有无限重试次数的SeekTocurErrorHandler。。。

@SpringBootApplication
public class So63193500Application {

    public static void main(String[] args) {
        SpringApplication.run(So63193500Application.class, args);
    }

    @Bean
    Consumer<String> input() {
        return str -> {
            System.out.println(str);
            throw new RuntimeException("test");
        };
    }

    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
        return (container, dest, group) -> {
            if (group.equals("so63193500")) {
                container.setErrorHandler(new SeekToCurrentErrorHandler(
                        new FixedBackOff(5_000, FixedBackOff.UNLIMITED_ATTEMPTS)));
            }
        };
    }

}
spring.cloud.stream.bindings.input-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.input-in-0.group=so63193500

这会导致寻道,只要后退间隔不太长,就不会影响轮询间隔。

也可以使用指数衰减。

 类似资料:
  • 我在kafka中面临一个奇怪的问题,即在消费者应用程序重新启动后,所有来自主题的kafka消息都在重播。有人能帮我我在这里做错了什么吗? 这是我的配置: spring.kafka.consumer.auto-偏移-重置=最早 spring.kafka.enable.auto。提交=false 我的生产者配置: 消费者配置: 消费者代码: 集装箱代码 消费者配置 应用程序.属性

  • 我需要一个Kafka主题存储的消息数量。这与任何消费者是否消费了消息无关。 以上是否等于Kafka主题中当前存储的消息数?

  • 我正在使用kafkapython来消费来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用的是KafkaConsumer类型。如果消费者停止,并且在一段时间后重新启动,我希望从最新生成的消息重新启动,即删除消费者停止时生成的所有消息。我怎样才能做到这一点? 谢谢

  • 从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。 如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。 我查看了示例,但没有找到符合我要求的任何内容。有没有一种

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。