当前位置: 首页 > 知识库问答 >
问题:

Kafka流:是否可以使用与流处理不同的时间戳进行删除?

段干麒
2023-03-14

在Kafka流2.0。

我的用例:能够从重新处理应用程序的历史开始(部分)用事件创建的时间(用户从原始数据定义并通过TimestampExtractor设置)重新处理数据,与长期运行的不间断应用程序一起运行,将数据发送到输出主题(两个应用程序将读取并发送到相同的输出主题,用于构建状态)。

存储是根据这些主题构建的,包括按会话设置窗口。想象一下,我想为这些主题保留一个月的时间(对于乱序事件和消费)——当重新处理时,如果使用事件时间,我将处理(并生成)超过一个月的事件。

根据KIP-32使用message.timestamp.type=LogAppendTime以避免删除,将在状态存储中生成错误的数据(因为时间戳不正确,它们将用于例如会话控制)。

使用事件时间,保持完全保留,并在重新处理完成和消耗后应用清除数据,这很乏味,但有助于减少主题的大小——但是,从它们构建的存储呢?例如。为了在重新处理发生时保存数据,我必须设置一个直到设置伪无限,但DSL创建的存储是(或应该是)只读的,而不是操纵的。

所以,回到标题:

  • 是否可能(或设想)使用与流处理不同的删除时间戳

共有1个答案

袁永贞
2023-03-14

对于流,将LogAppendTime用于重新分区主题是未命中配置。还要注意的是,在重新分区主题中不会丢失任何数据,因为这些主题是以Integer的保留时间创建的。最大值(参见。https://cwiki.apache.org/confluence/display/KAFKA/KIP-284:将流重新分区主题的默认保留ms设置为Long。最大值)。Streams使用purgeDataAPI在数据被使用后从重新分区主题中删除数据(参见。https://issues.apache.org/jira/browse/KAFKA-6150)避免无限生长。

因此,我建议通过log重新配置所有重新分区主题。消息时间戳。键入(即主题级配置)。

 类似资料:
  • H全部, 如果有人有任何经验的kafka-spark流对处理各种数据,请给我一个简短的细节,如果这是一个可行的解决方案,并比有两个不同的管道更好。 提前道谢!

  • 我正在使用spark结构流发送记录到一个Kafka主题。kafka主题是用config- 这样做使得目标Kafka主题记录具有与原始记录相同的时间戳。 我的Kafka流代码:

  • 这个服务我已经测试了它,使用不同版本的Kafka(更高或等于0.10),它工作良好。 以下是我的配置: Spring:cloud:stream:kafka:streams:binder:brokers:${KAFKA_BROKERS}applicationid:email-MESSAGES-stream configuration:default.key.serde:org.apache.kafk

  • 我刚开始接触Kafka。我已经经历了这一切。它只表示kafka流DSL的数据/主题管理。任何人都可以共享Kafka流处理器API的相同数据管理的任何链接吗?我对处理器API的用户和内部主题管理特别感兴趣。 在流处理器开始使用输入数据之前,从哪里用输入数据填充此源主题? 简而言之,我们可以像制片人写主题一样,使用流来写Kafka的“源”主题吗?或者流仅用于主题的并行消费?我相信我们应该像“Kafka

  • 我在一个Kafka主题“原始数据”中获取CSV,目标是通过在另一个主题“数据”中发送具有正确时间戳(每行不同)的每行来转换它们。 null 我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到一个方法(时间戳提取器只在消耗时间使用)。 我在文档中偶然发现了这一行: 请注意,通过调用#forward()时显式地为输出记录分配时间戳,可以在处理器API中更改description默认行为。

  • 结果:1 2 3 有人能解释为什么会发生这种情况,以及我如何让非并行版本给出与并行版本相同的结果吗?