当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:如何在摄取时间模式下获取事件的时间戳?

宗政元青
2023-03-14

我想知道是否可以通过使用Flink的摄取时间模式获得记录的时间戳。考虑以下flink代码示例(https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/windowjoinsampledata.scala),

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val grades = WindowJoinSampleData.getGradeSource(env, rate)
val salaries = WindowJoinSampleData.getSalarySource(env, rate)

val joined = joinStreams(grades, salaries, windowSize)

...
case class Grade(name: String, level: Int) 
case class Salary(name: String, salary: Int)

默认情况下,薪等和薪资都不包含时间戳字段。但是,既然Flink允许使用“IngestionTime”为数据流中的记录分配挂钟时间戳,那么是否有可能在运行时获得这样的时间戳呢?例如,下面是我正在尝试做的:

val oldDatastream = env.addSource...  // Using ingestion time
val newDatastream = oldDatastream.map{record =>   
    val ts = getRecordTimestamp(record)
    // do some thing with ts
    }

谢谢你的帮助。

共有1个答案

养焱
2023-03-14

使用processfunction会给出一个context,您可以使用它来获取元素的时间戳(无论是它的摄取时间、处理时间还是事件时间)。

 类似资料:
  • 问题内容: 我想得到 1到24 ,太平洋时间凌晨1点。 如何在Node.JS中获得该号码? 我想知道现在太平洋时间现在几点。 问题答案: 您应该检出Date对象。 特别是,您可以查看Date对象的getHours()方法。 getHours()返回0到23之间的时间,因此请确保相应地进行处理。我认为0-23有点直观,因为军事时间是从0-23开始,但这取决于您。 考虑到这一点,代码将类似于以下内容:

  • 我已经看过stackoverflow了,甚至看过一些建议的问题,但似乎没有一个答案,如何在C#中获得unix时间戳?

  • 我选择“无时区”是因为我知道我的应用程序使用的所有时间戳总是UTC。就我得到的文档而言,“with timestamp”的唯一区别是,我可以提供其他时区的值,然后将其转换为UTC。然而,我想避免这样的自动转换,因为如果我知道我的值是UTC,它们几乎没有任何好处。 当我在测试表中添加新记录并使用pgAdmin查看表的内容时,我可以看到插入日期已正确地保存为UTC格式。 但是,当我尝试使用JDBC选择

  • 问题内容: 创建cookie时,如何获取cookie的过期时间? 问题答案: 这很难实现,但是可以在另一个Cookie中设置Cookie的到期日期。然后可以稍后读取此cookie以获取到期日期。也许有更好的方法,但这是解决问题的方法之一。

  • 问题内容: 如何使用Java获取声音文件的总时间中提供的答案?适用于WAV文件,但不适用于mp3文件。 它们是(给定文件): 和: 它们为wav文件提供相同的正确结果,但为mp3文件提供错误和不同的结果。 知道要获取mp3文件的持续时间该怎么办吗? 问题答案: 使用MP3SPI: