当前位置: 首页 > 知识库问答 >
问题:

Flink表异常:只能在时间属性列上定义窗口聚集,但遇到时间戳(6)

易成天
2023-03-14

我正在使用flink 1.12.0。尝试将数据流转换为表A并在tableA上运行sql查询以在如下窗口上聚合。我使用f2列作为其时间戳数据类型字段。

    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

    Properties props = new Properties();

    props.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
    props.setProperty("schema.registry.url", xxx);
    props.setProperty("group.id", "test");
    props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());

    props.put("client.id", "flink-kafka-example");

    FlinkKafkaConsumer<PlaybackListening> kafkaConsumer = new FlinkKafkaConsumer<>(
            "test-topic",
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                    Avrotest.class, prodSchemaRegistryURL),
            props);

    DataStreamSource<Avrotest> stream =
            env.addSource(kafkaConsumer);
    Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2"));
    Table result =
            tEnv.sqlQuery("SELECT f0, sum(f1),f2 FROM "
                    + tableA + " GROUP BY TUMBLE(f2, INTERVAL '1' HOUR) ,f1" );

    
    tEnv.toAppendStream(result,user.class).print();

    env.execute("Flink kafka test");
}

当我执行上述代码时,我得到

线程“main”org.apache.flink.table.api中出现异常。TableException:只能在时间属性列上定义窗口聚合,但遇到时间戳(6)。在org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateGroup表达式(StreamLogicalWindowsAggregaterule.scala:50)和org.apach.flink.table.plan.rules.LogicalWindowaggregateBase.onMatch(LogicalWindowAGgregaterBase.scala:81)中,在org.apache.calcite.plan.AbstractRelopPlanner.fireRule(AbstractReplopLanner.java:333)中org.apache.calcite.plan.hep.HepPlanner.applyRules(hepplanter.java:542)位于org.apach.calciite.plan.hep.heppanner.applyroles(HepPlanner.java:407),org.apaache.calcites.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243),org.apache.calcite.plan.HepInstruction$RuleInstance.execute(hepInstitution.java:127)

共有2个答案

王子明
2023-03-14

3个步骤:

  1. 首次分配分配时间戳和水印

你有几种策略。

例如:

 WatermarkStrategy<Row> customTime = WatermarkStrategy
                .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> (long) event.getField("f2"));
env.addSource().assignTimestampsAndWatermarks(customTime)
Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2").rowtime());
汪晨
2023-03-14

若要使用表 API 在数据流上执行事件时间窗口化,需要先分配时间戳和水印。您应该在从DataStream调用之前执行此操作。

使用Kafka,通常最好直接在FlinkKafkaConsumer上调用depodTimestampsAndWatermarks。有关详细信息,请参阅水印文档、kafka 连接器文档和 Flink SQL 文档。

 类似资料:
  • 我正在尝试flink的一些网络监控工作。我的目标是计算每个的不同。 我下面的代码工作,但性能真的很糟糕。似乎每个滑动窗口都重新计算所有事件,但这不应该是必要的。 例如,我们有时间秒1-600的事件。Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口过期时,flink只合并1-300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒前预合并1-299。当第二个滑动窗

  • 我有一个带有数据和时间戳的记录日志,我的Flink应用程序按时间戳升序接收记录。在某个键的第一个项到达窗口后,我想在X事件时间后关闭窗口,检查是否有足够的项到达某个条件,并为该键发出通过或失败消息。 对于Flink中的基本窗口功能,这是不可能的吗?例如,如果我希望我的窗口有30秒长,但是键的第一个项在15秒到达,最后一个项在40秒到达,似乎窗口将在30秒关闭,并且该键的记录轨迹将是分成两个窗口。在

  • null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有

  • 我有一个Java应用程序,它使用Prometheus库,以便在执行期间收集度量。稍后,我将Prometheus服务器链接到Grafana,以便可视化这些度量。我想知道是否可以让格拉法纳为这些度量显示一个自定义的X轴?通常的X轴是在当地时间。我能让它显示带有GPS/UTC时间戳的数据吗?有可能吗?如果是,需要什么?保存时间戳的附加度量参数? 我这样声明度量变量: 并添加如下所示的数据: 如有任何帮助

  • 我有数据流就像 事件名,事件id,Start_time(时间戳)... 在这里,我想对最后一个带有时间戳的字段<;code>;Start_。 因此,我在flink window中看到的是,所以我猜它需要过去30分钟的事件,但不考虑 我想把数据放在start_ time在最后30分钟内的位置,然后我如何编写转换?我是否需要使用该列使用? 我是Flink的新手。 谢啦