当前位置: 首页 > 知识库问答 >
问题:

Kafka流:如何更改记录的时间戳(0.11.0)?

仲浩歌
2023-03-14

我正在使用FluentD(V.12最后一个稳定版本)向Kafka发送消息。但是FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。

我真正感兴趣的时间戳是由fluentd在消息中发送的:

“时间戳”:“1507885936”,“主机”:“V.X.Y.Z.”

Kafka的唱片表现:

偏移量=0,时间戳=-1,键=空,值={“时间戳”:“1507885936”,“主机”:“V.X.Y.Z.”}

我的解决方法类似于:-编写一个使用者来提取时间戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/timestampextractor.html)

  • 编写一个生成器,以生成一个设置了时间戳的新记录(ProducerRecord(字符串主题、整数分区、长时间戳、K键、V值)

我更喜欢KafkaStreams的解决方案,如果有的话。

共有1个答案

步联
2023-03-14

您可以编写一个非常简单的Kafka Streams应用程序,如:

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");

并使用自定义的timestampextractor配置应用程序,该应用程序从记录中提取时间戳并返回它。

Kafka流将在将记录写回Kafka时使用返回的时间戳。

注意:如果您有无序数据--即时间戳没有严格排序--结果也会包含无序的时间戳。Kafka Streams使用返回的时间戳写回Kafka(即,提取器返回的任何时间戳都用作记录元数据时间戳)。请注意,在写入时,来自当前处理的输入记录的时间戳将用于所有生成的输出记录--这对1.0版是有效的,但在以后的版本中可能会改变。)。

 类似资料:
  • 我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但是FluentD正在使用一个旧的KafkaProducer,所以记录时间戳总是设置为-1。因此,我必须使用WallclockTimestampExtrator将记录的时间戳设置为消息到达kafka时的时间点。 是否有特定于Kafka Streams的解决方案? 我真的感兴趣的时间戳,是由Fluentd在消息中发送的: “时

  • 问题内容: 如果我在类型表中具有一列并且具有默认值:CURRENT_TIMESTAMP如果我更新同一行中 任何 其他列的值,此列是否会更新为当前时间戳? 似乎没有,但是我不确定这是否应该发生。 我不明白这是什么意思(来自MySQL文档): 如果该列是自动更新的,则当该行中任何其他列的值从其当前值更改时,它将自动更新为当前时间戳。如果所有其他列均设置为其当前值,则该列保持不变。为防止当其他列更改时该

  • 我正在使用django模型。django设置中的TIMEZONE是UTC。并通过做一些算术来构建时间戳。 当我使用::timestamp时,时间戳返回为2021 07月26日00:00:00如果我使用::timestamp,它将变为2021 07月26日00:00:00,即使请求的\u时区是“美国/纽约” 我希望输出为2021 07月26日00:00:00-04:00,即显示与“附加美国/纽约”偏

  • 我们正在使用使用STREAM_TIME标点符号的自定义转换器。当我记录通过转换函数发送的消息时,来自context.timestamp()的流时间显示如预期的那样——基于使用时间戳提取器派生的数据的合理日期。 现在——在过去的某个时候,我们收到了一些恶意消息,将流时间提前到2036年。我们现在已经阻止了这些上游,重新启动了Kafka河。 当流启动时,标点符号会在受影响任务的启动时运行,但会显示20

  • 问题内容: 我有一列是时间戳。 它记录如下内容: 然后我在php中使用它: 它可以输出如下内容: 我的问题是,如何更改日期文本,使其以另一种语言(特别是瑞典语)输出月份名称? 例如:1月为瑞典文Januari 谢谢 问题答案: 本地的PHP函数是 strftime() 。 %B 完整月份的名称,基于从1月到12月的语言环境 如果服务器不在瑞典语言环境中,请使用 setlocale() 。 也就是说

  • 我们使用核心JMS客户机库,通过Boomi中的JMS连接器向ActiveMQ Artemis 2.11.0发送消息。但是,当通过ActiveMQ管理控制台查看队列中的消息时,队列中消息的时间戳与当前时间和日期不同。队列中的消息是从今天开始的,但您可以看到它的日期是去年9月: 你知道如何修复这个配置吗?