当前位置: 首页 > 知识库问答 >
问题:

KStreams确定在连接上持久化哪个输入记录时间戳元数据

锺离正祥
2023-03-14

希望有人知道这一点或者能给我指出正确的方向...

我有一个通过应用编程接口REST请求创建的数据主题。REST请求中收到的一个字段是记录事件时间的时间戳。这些记录被生成给Kafka,事件时间被设置为记录的元数据时间戳。

我还有另一个规则主题,它提供了通过向接收的值添加新字段来扩充数据主题记录的信息。

这两个主题都有用于加入的匹配键。

我的目标是使用processor API在所有处理阶段保留数据主题的EventTime。注意:将有多个不同的KStreams应用程序以多种方式/步骤处理/扩充这些数据。

好消息是,我已经看到许多东西表明,当使用Kafka Streams时,输入记录时间戳是保留的。

例如:

  • https://kafka.apache.org/documentation/streams/core-concepts#streams_time
  • 输入记录时间戳和输出记录时间戳在源和接收器主题上是相同的?

并一直在阅读时间戳提取器:

  • https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-如何编写AcustomTimeStampExtractor

更多关于加入:

  • https://cwiki.apache.org/confluence/display/KAFKA/Kafka流连接语义
  • https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#kstream-全局可连接

在许多Streams文档中,我看到它提到“输入记录的时间戳将保留到输出记录”,但我不清楚这在连接时究竟是如何工作的。

我的困惑似乎是,当我们加入时,我们有两个不同的输入记录,并且正在生成一个输出记录。

如何确定在连接中使用的多个输入记录之间保留哪个时间戳?

我一直在和同事们讨论这个问题,有以下几种观点

  • 连接的输入记录的最早非负时间戳被保留

其中一些论点比其他论点更好,但我需要知道到底发生了什么。。。

如果您能提供任何帮助或建议,我们将不胜感激。

共有1个答案

哈沛
2023-03-14

目前(即Kafka2.0版本)没有使用时间戳的公共合同,并且允许实现使用任何策略。当前实现使用触发连接计算的记录的时间戳。

作为一种解决方法,您可以通过添加来操纵时间戳。valueTransformer()连接后。比较https://cwiki.apache.org/confluence/display/KAFKA/KIP-251:允许在处理器API中操作时间戳

即,您需要在加入之前将原始时间戳嵌入到值有效载荷中,并在加入之后将其提取并设置为元数据时间戳。

 类似资料:
  • 我使用处理器API创建kafka流媒体应用程序。 下面是我如何创建一个主题,将时间戳附加到所有传入消息 Kafka主题。sh--创建--zookeeper localhost:2181--复制因子1--分区1--主题topicName--配置消息。时间戳。类型=创建时间 工作流处理来自源主题的传入消息并将其发布到接收器主题。出于某种奇怪的原因,我在源主题和接收器主题消息中看到了相同的时间戳。例如,

  • 英文原文:http://emberjs.com/guides/models/persisting-records/ Ember Data中的记录都基于实例来进行持久化。调用DS.Model实例的save()会触发一个网络请求,来进行记录的持久化。 下面是几个示例: 1 2 3 4 5 6 var post = store.createRecord('post', { title: 'Rail

  • 我尝试使用mongodb插件作为logstash的输入。以下是我的简单配置: 但是我面临一个“循环问题”,可能是由于一个字段“时间戳”,但我不知道该怎么办。 [2018-04-25T12:01:35998][WARN][logstash.inputs.mongodb]mongodb Input引发异常,重新启动{:exception= 还有一个调试日志: [2018-04-25T12:01:34.

  • 我需要将两个数据集与CLOSE时间戳连接起来。第一个数据集是来自移动应用程序的日记数据集: 在这里: 第二个数据集是来自加速度计日志的数据集,显示移动(=INVH)或空闲(=NIVH): 在这里: 我需要根据时间戳字段之间的时间差连接两个数据帧。例如,在df1上留下join,以查看应用程序日志数据如何与实际加速度计日志一致。简单的左连接在这里不起作用,因为在大多数情况下有一个滞后时间。所以我的问题

  • 问题内容: 我使用的是Python 3,我想编写一个程序,在一定时间内要求多个用户输入。这是我的尝试: 问题是,即使时间到了,代码仍然等待输入。我希望循环在时间用完时停止。我该怎么做呢?谢谢! 问题答案: 此解决方案与 平台无关, 并且会 立即 中断键入以告知有关现有超时的信息。不必等到用户按下ENTER键就可以发现发生了超时。除了及时通知用户之外,这还可以确保在进一步处理超时后不再输入任何内容。