当前位置: 首页 > 知识库问答 >
问题:

Apache flink 1.52行时时间戳为空

魏誉
2023-03-14

我正在使用以下代码进行一些查询:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Row> ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2())
            .rowTypeInfo(MyRowType.builder().build().typeInfo())
            .build().source4();
    //,proctime.proctime,rowtime.rowtime
    String sql1 = "select a,b,max(rowtime)as rowtime from user_device group by a,b";
    DataStream<Row> ds2 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device").fields("a,b,rowtime.rowtime")
            .rowTypeInfo(MyRowType.builder().build().typeInfo13())
            .sql(sql1).in(ds).build().result();

    ds2.print();
    // String sql2 = "select a,count(b) as b from user_device2 group by a";
    String sql2 = "select a,count(b) as b,HOP_END(rowtime,INTERVAL '5' SECOND,INTERVAL '30' SECOND) as c from user_device2 group by HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '30' SECOND),a";
    DataStream<Row> ds3 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device2").fields("a,b,rowtime.rowtime")
            .rowTypeInfo(MyRowType.builder().build().typeInfo14())
            .sql(sql2).in(ds2).build().result();

    ds3.print();
    env.execute("test");

注意:对于sql1,我使用带有rowtime的max函数,它不起作用,并引发以下异常:

线程"main"org.apache.flink.runtime.client.JobExecutionException:java.lang.RuntimeException中的异常:Rowtime时间戳为空。请确保定义了正确的TimestampAssigner并且流环境使用EventTime时间特性。org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)com.aicaigroup.water.WaterTest.testRowtimeWithMoreSqls5(WaterTest.java:158)com.aicaigroup.water.WaterTest.main(WaterTest.java:20)引起的:java.lang.RuntimeException: Rowtime时间戳为空。请确保定义了正确的TimestampAssigner,并且流环境使用EventTime时间特性。在DataStreamSourceConversion$org.apache.flink.streaming.api.operators.AbstractStreamOperator(未知来源)在CountingOutput.collect(CRowOutputProcessRunner.scala:67)在org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)在org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)在org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)在org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)在org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:628)在org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:581)在24.processElement$org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(AbstractStreamoperator. java: 679)在org. apache. flink. stream. api. operator. AbstractStreamoperator$Counting Output. Collection(AbstractStreamoperator. java: 657)在org. apache. flink. stream. api. operator. TimestatamCollector. Collection(TimestampeColltor. java: 51)在com. aicaigroup. TableHelp1 Dollar. Process Element(TableHelp1 Dollar. java: 42)在com. aicaigroup. TableHelp1 Element(TableHelp. java)533)在org. apache. flink. stream. runtime. tasks.0014 atorChain$CopyingChainingOutput.集合(操作链. java: 513)在org. apache. flink. stream. api. operator. AbstractStream运算符$Counting Output.集合(AbstractStream运算符. java: 679)在org. apache. flink. stream. api.操作员. AbstractStream运算符$Counting Output.集合(AbstractStream运算符. java: 657)在org. apache. flink. stream. api. operator. TimestamedCollector.收集(Timestampe Collector. java: 51)在org. apache. flink. table. runtime.聚合. GroupAggProcessFunc. Element(GroupAggessFunc. scala: 151)

然后我试着这样更新sql1“从user_device中选择a,b,rowtime”,成功了。那么如何修复错误呢?第一个sql应该使用group by,第二个sql应该使用rowtime by timeWindow。3q

共有1个答案

东郭翰音
2023-03-14

我从1.6开始眨眼,遇到像你这样的类似问题。通过这些步骤解决:

  • 使用赋值时间戳和水印,只需使用默认和正常的实现BoundedOutOfOrdernessTimestampExtractor。您需要编写extTimestamp函数来提取时间戳值并在构造函数中声明窗口间隔。
  • 在字段末尾追加proctime.proctimerowtime.rowtime(我正在使用fromDataStream(Flink 1.6)将stream转换为表)
  • 如果您想使用存在字段作为行时间。例如,数据源字段是“a, clicktime, c”,您可以声明“a,clicktime.rowtime, c”

希望能帮到你。

 类似资料:
  • 本文向大家介绍sqlite时间戳转时间语句(时间转时间戳),包括了sqlite时间戳转时间语句(时间转时间戳)的使用技巧和注意事项,需要的朋友参考一下 下面是具体的实现代码:

  • 我知道这是一个非常常见的问题,但我觉得我找到的答案并没有真正解决问题。我将概述我的具体用例,并对来自其他SO答案和网络的信息进行总结。 对于我正在编写的服务,数据库条目被创建并存储在移动设备和我们的网站上,需要以两种方式同步。我们目前的目标是Android和iOS,它们都使用sqlite作为关系数据库。服务器端是使用Django和MySQL在Python中实现的,但将来可能会有其他解决方案取代它。

  • 假设我有一个时间戳值。 编辑 现在我正在使用获取上述时间的毫秒值; 根据Java文档,getTime()方法的定义是

  • 问题内容: 如何从MongoDB集合中的时间(HH:MM:SS.Milisecond)值大于零的日期字段中选择记录,并通过保持日期的时间(HH:MM:SS)值为零来更新记录值是否与Python脚本中的现有值相同? 当前数据如下所示- 如何在Python脚本中仅选择第4、5、6和7行,并使用时间戳将其更新为零? 更新后,数据如下所示- 问题答案: 最好的方式来更新您的文件,并在时间使用日期时间模块,

  • 我有一个Flink程序,它接受两个流,即数据/传感器读数流和警报规则流。我正在广播规则流,并将其连接到数据流以生成动态警报。ProcessingTime的一切都很好,但EventTime却不行。我已经分配了时间戳 > 当两个流(即带有时间戳的流)同时出现时,如何使用“EventTime”生成警报 我是否也必须为我的规则流分配时间戳和水印? 因为我的规则流只有在有任何添加/修改时才会有记录。是否有任

  • 我正在尝试使用Joda在一个简单的Java程序中获取UTC时间戳: 程序输出如下: 毫秒值是正确的UTC时间(即用时区确认)第二个值是时区。 我需要的是UTC值不变为(即独立于TZ),用于数据库写入。这可能吗? 我知道是本地日期(GMT-4),是UTC(GMT-0)。日期的输出值如下: 我尝试了所有组合,试图将的UTC值作为java.sql.TimeStamp: 用于测试的打印输出: 第一行是正确