当前位置: 首页 > 知识库问答 >
问题:

Spark Kafka流媒体-发送原始时间戳而不是当前时间戳

逑兴安
2023-03-14

我正在使用spark结构流发送记录到一个Kafka主题。kafka主题是用config-消息创建的。timestamp.type=createtime

这样做使得目标Kafka主题记录具有与原始记录相同的时间戳。

我的Kafka流代码:

kafkaRecords.selectExpr("CAST(key AS STRING)", "CAST(value AS BINARY)","CAST(timestamp AS TIMESTAMP)")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers","IP Of kafka")
    .option("topic",targetTopic)
    .option("kafka.max.in.flight.requests.per.connection", "1")
    .option("checkpointLocation",checkPointLocation)
    .save()

共有1个答案

唐运诚
2023-03-14

主题的createtime配置意味着何时创建记录,即您获得的时间。

不清楚您在哪里读取数据和看到时间戳,如果您运行的是生产者代码“今天”,这是他们得到的时间,而不是以前。

如果您想要过去的时间戳,则需要使用包含时间戳参数的构造函数,使ProducerRecord包含该时间戳,但Spark不公开该时间戳。

 类似资料:
  • 问题内容: 我想要这样的当前时间戳: 1320917972 问题答案: 解决方案是:

  • 当我在twitch上开始直播时,我正试图让我的机器人向指定频道发送消息。到目前为止,我还在忙于从我的状态中获取正确的“活动”,这表示我正在流媒体。到目前为止,我得到的是: 提前感谢任何帮助!^^

  • 我们正在使用使用STREAM_TIME标点符号的自定义转换器。当我记录通过转换函数发送的消息时,来自context.timestamp()的流时间显示如预期的那样——基于使用时间戳提取器派生的数据的合理日期。 现在——在过去的某个时候,我们收到了一些恶意消息,将流时间提前到2036年。我们现在已经阻止了这些上游,重新启动了Kafka河。 当流启动时,标点符号会在受影响任务的启动时运行,但会显示20

  • 问题内容: 我正在开发一个支持Google两步验证的应用程序。此应用程序还支持“可信任此设备30天”的功能。 我使用数据库保存所有这些信息,例如IP地址和到期时间。现在,当我填写时间戳以将当前时间增加30天时,它将比当前时间早的时间戳插入数据库中。 例如:当前时间= 。现在,当我加上30天(毫秒)时,得出的日期不是30天,而是大约19天。 问题答案: 此问题与32位整数溢出有关。由于整数的最大值为

  • 我正在使用Spring Cloud Sleuth和Zipkin(通过HTTP),将spring-cloud-starter-zipkin版本2.0.0.M6添加到我的依赖项(基于Spring Boot 2.0.0.rc1和Spring Cloud Finchley M6)中。 我正在使用@Newspan注释来标记某个(昂贵的)操作的子跨度。当跨度信息发送到Zipkin时,我注意到缺少子跨度的时间戳

  • 本文向大家介绍php 将当前时间戳unixtime增加时间间隔的方法,包括了php 将当前时间戳unixtime增加时间间隔的方法的使用技巧和注意事项,需要的朋友参考一下     可以+year   还可以是天, 月日都可以的,如下代码: