当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka流:无序消息

邹修真
2023-03-14

我有一个Apache Kafka2.6制作人,它写的主题-A(TA)。我还有一个Kafka streams应用程序,它使用TA并写入topic-B(TB)。在streams应用程序中,我有一个自定义的时间戳提取器,它从消息负载中提取时间戳。

对于我的一个失败处理测试用例,我在应用程序运行时关闭了Kafka集群。

当生产者应用程序试图向TA写入消息时,它无法写入,因为集群已关闭,因此(我假设)缓冲了消息。假设它以递增的时间顺序接收4个消息m1、m2、m3、m4。(即m1是第一个,m4是最后一个)。

我从这里得到了一个解决方案,将所有事件从tA流到另一个中间主题(比如tA'),它将使用时间戳提取器到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。

我的生产者代码如下所示(我使用Spring Cloud创建生产者):Producer.java

@Service
public class Producer {

    private String topicName = "input-topic";
        
    private ApplicationProperties appProps;
    
    @Autowired
    private KafkaTemplate<String, MyEvent> kafkaTemplate;
    
    public Producer() {
        super();        
    }
    
    @Autowired
    public void setAppProps(ApplicationProperties appProps) {
        this.appProps = appProps;
        this.topicName = appProps.getInput().getTopicName();
    }

    public void sendMessage(String key, MyEvent ce) {
        ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName, key, ce); 
        
    }
}

共有1个答案

东方旺
2023-03-14

这是为什么?这是因为生产者中的缓冲是多线程的,每一个生产者同时对主题进行生产吗?

默认情况下,生产者允许最多5个并行的正在运行的请求到一个代理,因此,如果一些请求失败并被重试,请求顺序可能会改变。

若要避免此重新排序问题,您可以设置max.in.flight.requests.per.connection=1(可能会导致性能下降)或设置enable.idempotence=true

顺便问一句:你没有说你的主题是有一个分区还是多个分区,你的消息是不是有一个密钥?如果您的主题有多个分区,并且您的消息被发送到不同的分区,那么读取时没有排序保证,因为偏移量排序只在一个分区内得到保证。

我假设自定义时间戳提取器将有助于在使用消息时对消息进行排序。但他们没有。或者也许我对时间戳提取器的理解是错误的。

时间戳提取器只提取一个时间戳。Kafka Streams不对任何消息重新排序,但总是以偏移量顺序处理消息。

我从这里得到了一个解决方案,将所有事件从tA流到另一个中间主题(比如tA'),它将使用时间戳提取器到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。

不,它不会做任何重新排序。另一个SO问题是要更改时间戳,但如果您按照a、b、c顺序读取消息,那么结果将按照a、b、c顺序写入(只是使用不同的时间戳,但应保留偏移顺序)。

本讲座解释了一些更多细节:https://www.confluent.io/kafka-summit-San-francisco-2019/Whats-the-time-and-why/

 类似资料:
  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所

  • 我最近看到了这篇关于Apache Kafka文档的文章,内容涉及如何处理Kafka流中的无序消息 https://kafka.apache.org/21/documentation/streams/core-concepts#streams_out_of_ordering 有人能给我解释一下下面这句话背后的原因吗: 在主题分区中,记录的时间戳可能不会随着它们的偏移量单调地增加。由于Kafka流总是

  • 我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制

  • 我想使用Avro来序列化我的Kafka消息的数据,并想将其与Avro模式存储库一起使用,这样我就不必将模式包含在每条消息中。 将Avro与Kafka结合使用似乎是一件很流行的事情,许多博客/堆栈溢出问题/用户组等都提到了将模式Id与消息一起发送,但我找不到一个实际的示例来说明它应该去哪里。 我想它应该放在Kafka消息头的某个地方,但我找不到一个明显的地方。如果它在Avro消息中,则必须根据模式对

  • 我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但

  • 假设我有一个包含3个应用程序的流——一个源、处理器和接收器。 我需要保留从源收到的消息的顺序。当我收到消息A,B,C,D,我必须将它们作为A,B,C,D.发送到接收器(我不能将它们作为B,A,C,D)发送。 如果每个应用程序只有一个实例,那么一切都将按顺序运行,并且顺序将被保留。 如果我每个应用程序有 10 个实例,则消息 A、B、C、D 可能会在不同的实例中同时处理。我不知道这些消息的顺序是什么