当前位置: 首页 > 知识库问答 >
问题:

输入记录时间戳和输出记录时间戳在源和接收器主题上是相同的?

沈冠宇
2023-03-14

我使用处理器API创建kafka流媒体应用程序

下面是我如何创建一个主题,将时间戳附加到所有传入消息

Kafka主题。sh--创建--zookeeper localhost:2181--复制因子1--分区1--主题topicName--配置消息。时间戳。类型=创建时间

工作流处理来自源主题的传入消息并将其发布到接收器主题。出于某种奇怪的原因,我在源主题和接收器主题消息中看到了相同的时间戳。例如,在源主题中,消息的创建时间为T0,在接收器主题中也保持不变。

我需要做什么才能在接收主题消息中看到更新的时间戳?

共有1个答案

岳玉堂
2023-03-14

如果使用CreateTime配置主题,则时间戳存储区将是生产者提供的时间戳。

对于普通的KafkaProducer来说,您没有明确指定时间戳,KafkaProducer使用系统。currentTimeMillis()并将消息发送给代理。

对于Kafka流,如果您读取具有特定时间戳的输入记录,我们有专门的时间戳推理逻辑来计算结果记录的时间戳。因此,Kafka流在将时间戳交给内部使用的KafkaProducer时显式设置时间戳,因此制作人只使用该时间戳,不使用当前的挂钟时间。对于流处理,这通常是需要的行为。

如果您有一个简单的管道,只将数据从一个主题复制到另一个主题,那么时间戳推断将使用输入记录时间戳作为输出记录时间戳。

要获得不同的语义,可以做两件事:

  1. 为Kafka Streams应用程序配置WallClockTimestampExtractor。在这种情况下,Kafka流将不使用嵌入的记录时间戳,而是使用当前的挂钟时间来推导输出记录的时间戳
  2. AppendTime而不是CreateTime配置输出主题。对于这种情况,代理总是用当前代理挂钟时间覆盖生产者提供的记录时间戳
 类似资料:
  • 我尝试使用mongodb插件作为logstash的输入。以下是我的简单配置: 但是我面临一个“循环问题”,可能是由于一个字段“时间戳”,但我不知道该怎么办。 [2018-04-25T12:01:35998][WARN][logstash.inputs.mongodb]mongodb Input引发异常,重新启动{:exception= 还有一个调试日志: [2018-04-25T12:01:34.

  • 我正在写一个脚本,连接到N主机通过SSH...查询第三方系统和提取数据,然后显示所有收集的数据在一定的格式。 我希望将脚本执行的所有操作以及在控制台和日志文件中遇到的任何异常记录下来,这样用户就可以看到脚本运行时发生了什么(如果有人使用了Ansible,那么就像我们在运行playbooks时在控制台和日志中得到的输出一样) 预期产出 null 请给出建议,如果可能的话,用一个使用该技术的示例脚本。

  • 我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但是FluentD正在使用一个旧的KafkaProducer,所以记录时间戳总是设置为-1。因此,我必须使用WallclockTimestampExtrator将记录的时间戳设置为消息到达kafka时的时间点。 是否有特定于Kafka Streams的解决方案? 我真的感兴趣的时间戳,是由Fluentd在消息中发送的: “时

  • 问题内容: 我有一列称为“ s_timestamp”。 如何返回时间戳中具有当天的所有记录? 例如, 我想要以下输出: 让我知道是否不清楚。 问题答案: 只是使用。例如 日期() CURDATE()

  • 希望有人知道这一点或者能给我指出正确的方向... 我有一个通过应用编程接口REST请求创建的数据主题。REST请求中收到的一个字段是记录事件时间的时间戳。这些记录被生成给Kafka,事件时间被设置为记录的元数据时间戳。 我还有另一个规则主题,它提供了通过向接收的值添加新字段来扩充数据主题记录的信息。 这两个主题都有用于加入的匹配键。 我的目标是使用processor API在所有处理阶段保留数据主

  • 问题内容: 我最近了解到oracle具有对我来说非常有用的功能-因为设计者/实现者不太在意数据历史记录- 我可以查询记录的历史状态(如果它在oracle缓存中仍然可用),如下所示: 但是现在我需要检查范围内的历史数据。使用缓存是否有可能? 问题答案: 是的,像这样: 请注意,您可以走多远受UNDO_RETENTION参数的限制,通常为几小时而不是几天。