当前位置: 首页 > 知识库问答 >
问题:

Kafka流中消息的无序处理

施兴言
2023-03-14

我最近看到了这篇关于Apache Kafka文档的文章,内容涉及如何处理Kafka流中的无序消息

https://kafka.apache.org/21/documentation/streams/core-concepts#streams_out_of_ordering

有人能给我解释一下下面这句话背后的原因吗:

在主题分区中,记录的时间戳可能不会随着它们的偏移量单调地增加。由于Kafka流总是尝试处理主题分区内的记录以遵循偏移量顺序,因此它会导致具有较大时间戳(但偏移量较小)的记录比同一主题分区中具有较小时间戳(但偏移量较大)的记录更早地被处理。

共有1个答案

魏彦
2023-03-14

一个Kafaka生产者可以在它产生一个消息时指定一个时间戳。

标准Java生产者API的示例:https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/Producer/producerrecord.html#producerrecord-Java.lang.string-java.lang.integer-java.lang.long-k-v-

 类似资料:
  • 我有一个Apache Kafka2.6制作人,它写的主题-A(TA)。我还有一个Kafka streams应用程序,它使用TA并写入topic-B(TB)。在streams应用程序中,我有一个自定义的时间戳提取器,它从消息负载中提取时间戳。 对于我的一个失败处理测试用例,我在应用程序运行时关闭了Kafka集群。 当生产者应用程序试图向TA写入消息时,它无法写入,因为集群已关闭,因此(我假设)缓冲了

  • 我试图找出最好的方式将我的数据扇出到单独的占位符中,以供其他处理的数据使用 用例我正在接收Kafka主题中几个脚本(约2000只股票)的股票数据。我希望能够单独在所有脚本上运行KPI(KPI就像应用于输入数据以获取KPI值的公式)。 我能想到的选项 > 将所有刻度数据保存在一个主题中,并使用Custom分区器按脚本名称对其进行分区。这有助于保持低主题计数和系统易于管理。但是所有消费者都需要丢弃大量

  • 我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。 此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种

  • 我注意到Kafka记录有一个CRC字段。如果日志文件中的记录损坏(例如,消息中间的一个比特被翻转),那么在流应用程序中,我希望看到的是: 该主题被复制 由于我们使用的是Avro,我可以想象以下情况之一: 底层基础设施检测到CRC错误,并从另一个代理获取该错误

  • 如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何

  • 我有一个Kafka Streams应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题。 每小时消耗/产生几百万条记录。每当我关闭一个代理时,应用程序就进入重新平衡状态,在重新平衡多次之后,它开始使用非常旧的消息。 注意:当Kafka Streams应用程序运行良好时,它的消费者滞后几乎为0。但再平衡之后,它的滞后从0到1000万。 这会不会是因为偏移.保留.分钟。 在这方面的任何帮助都将