当前位置: 首页 > 知识库问答 >
问题:

Kafka流-重新处理非常旧的消息

丁恩
2023-03-14

我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。

此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种情况下,它正在重新处理大量很久以前(超过10天)成功处理的消息

我们无法理解这个问题的根本原因。它没有正确地提交偏移量,并在不同的拓扑中拾取随机消息。这会导致在任何拓扑中重新处理一条消息的行为不一致。

令人惊讶的是,我们在消费者身上也没有看到任何例外。请提供帮助。

以下是我们正在使用的配置:

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"UniqueKey");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,key);
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000));
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6));

以下是处理器的代码片段:

    final KStreamBuilder builder = new KStreamBuilder();
    builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
    builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
    builder.addSink(key + "_sink", key + "_sink", key + "_processor");
    final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfgurations(key, kafkaHost));
    streams.start();
    streams.setUncaughtExceptionHandler((t, th) -> {
    _logger.error("UncaughtException in Kafka StreamThread  " + t.getName() + " exception = ", th.getMessage());
    });
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

我查看了一些Kafka重新处理博客,并考虑尝试下面列出的更多配置:

    streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); //default is 10000
    streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); //default is 30000
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    streamsConfiguration.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000); //default is 5000
    streamsConfiguration.put(ProducerConfig.ACKS_CONFIG,1);
    streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG,10);

谢谢,阿尔帕

共有1个答案

谢叶五
2023-03-14

在发生缩放导致的重新平衡时,您是否有代理端请求日志?我怀疑偏移量提取请求/响应和后续提取中存在一些错误(例如,如果提取主题碰巧被截断,因此从提交的偏移量开始的提取返回“超出范围”异常,导致其重置)。但所有这些嫌疑犯都需要从服务器端请求日志中进行验证。

 类似资料:
  • 我有一个Kafka Streams应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题。 每小时消耗/产生几百万条记录。每当我关闭一个代理时,应用程序就进入重新平衡状态,在重新平衡多次之后,它开始使用非常旧的消息。 注意:当Kafka Streams应用程序运行良好时,它的消费者滞后几乎为0。但再平衡之后,它的滞后从0到1000万。 这会不会是因为偏移.保留.分钟。 在这方面的任何帮助都将

  • 具有Kafka Streams应用,其通过例如1天的流连接来执行开窗(使用原始事件时间,而不是挂钟时间)。 如果启动此拓扑,并从头开始重新处理数据(如在 lambda 样式的体系结构中),此窗口是否会将旧数据保留在那里?da 例如:如果今天是2022-01-09,而我收到来自2021-03-01的数据,那么这个旧数据会进入表格,还是会从一开始就被拒绝? 在这种情况下,可以采取什么策略来重新处理这些

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 我最近看到了这篇关于Apache Kafka文档的文章,内容涉及如何处理Kafka流中的无序消息 https://kafka.apache.org/21/documentation/streams/core-concepts#streams_out_of_ordering 有人能给我解释一下下面这句话背后的原因吗: 在主题分区中,记录的时间戳可能不会随着它们的偏移量单调地增加。由于Kafka流总是

  • 我将一些事件转发给Kafka并启动了我的Kafka流程序。我的程序开始处理事件并完成。一段时间后,我停止了我的Kafka流应用程序并重新开始。观察到我的Kafka流程序正在处理已经处理过的先前事件。 根据我的理解,Kafka流在内部维护每个应用程序id的输入主题本身的偏移量。但在这里重新处理已经处理的事件。 如何验证Kafka流处理的偏移量?Kafka流是如何保存这些书签的?根据什么 如果Kafk

  • 问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异