当前位置: 首页 > 知识库问答 >
问题:

Flink SQL查询不返回结果

方风华
2023-03-14

我有下面的SQL查询,我在flink工作中使用。< code>mysql_table是使用JDBC连接器创建的,而< code>kafa_source表是从传入的kafka流创建的。

SELECT T.event_id, T.event_name, TUMBLE_END(T.event_time, INTERVAL '5' MINUTE) AS event_time,
MAX(T.event_value) AS max_event_value FROM (
SELECT d.event_id, d.event_name, d.event_source_id, d.event_key, s.event_value, s.event_time
FROM kafka_source s JOIN mysql_table FOR SYSTEM_TIME AS OF s.proc_time AS d
ON d.event_id = s.event_id and d.event_name = s.event_name) T
GROUP BY T.event_id, T.event_key, TUMBLE(T.event_time, INTERVAL '5' MINUTE)

我在两者之间执行时态连接,当我在Flink的sql-client CLI中检查时,运行良好(用< code>flink-faker测试)。内部查询工作得非常好,并且正在打印结果。有人能帮助我找出这个问题吗?

编辑:我正在寻找在5分钟内创建的TUMBLE事件的输出,如下所示

+I ("11", "SPILL_OVER", 2022-04-28T00:30:00", 28.0)
+I ("11", "SPILL_OVER", 2022-04-28T00:35:00", 32.4)
+I ("11", "SPILL_OVER", 2022-04-28T00:40:00", 19.6)
+I ("11", "SPILL_OVER", 2022-04-28T00:45:00", 22.3)

mysql表的模式是

+-----------------+--------------+
| Field           | Type         |
+-----------------+--------------+
| event_id        | varchar(64)  |
| event_source_id | varchar(255) |
| event_name      | varchar(255) |

而Kafka表的诡计是

event_id STRING
event_name STRING
event_time TIMESTAMP(9)
event_value DOUBLE

编辑:我观察到<code>TUMBLE_END</code>在<code>PROCTIME()</code>列中工作正常,但在<code>event_time</code>中不工作。下面是分别选择<code>proc_time的查询的输出。

+I[2022-05-09T14:36:21.078Z, 2022-05-09T14:36:14.163Z]
+I[2022-05-09T14:36:21.079Z, 2022-05-09T14:36:19.170Z]

下面的查询在kafka_source表之上工作。

SELECT event_id, event_name, MAX(event_value), TUMBLE_END(proc_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE)

并给出如下类似的输出

+I[[11, SPILL_OVER, 2022-05-09T20:10, 0.93]
+I[[12, SPILL_OVER, 2022-05-09T20:10, 0.9]
+I[[11, PRAXY, 2022-05-09T20:12, 0.91]

当我使用event_time代替proc_time相同的查询不会返回任何结果。我正在表中创建这些列,如下所示:

Schema.newBuilder()
    .columnByExpression("proc_time", "PROCTIME()")
    .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTime, 3)")
    .watermark("event_time", "event_time - INTERVAL '20' SECOND")
    .build()

其中 eventTime 只是来自 kafka 主题的传入时间戳值。两个字段的类型相同,TIMESTAMP_LTZ(3)

proc_time TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
event_time TIMESTAMP_LTZ(3) *ROWTIME*

我在这里犯了什么错误?

共有1个答案

百里智勇
2023-03-14

我通过向表环境添加一个设置来解决这个问题。

tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")

我可以在设置超时后立即创建窗口

 类似资料:
  • 我正在玩mongob,并将一些测试数据{name:"david"}输入到“用户”集合中。我通过键入mongoshell验证了数据在MongoDB中 结果: 在 node.js 脚本中,使用以下代码: 不返回任何结果 我没有发现任何错误,也没有错误。请告知

  • Firebase查询返回此查询的值 该项由model类捕获 但是,返回null

  • 以下是我的疑问.... 我没有结果。 另外,我正在使用这个插件来生成请求正文。 我的查询如下所示.. null 感谢您到目前为止的阅读,如果有人能帮助我找出如何使这一工作,我将非常感谢。

  • 问题内容: 我有一个数据库,正在运行以下查询: 上面的查询一次返回两个结果集,我不能分别触发两个查询。如何在Java类中一次处理两个结果集? 问题答案: 正确的代码来处理JDBC语句返回的多个: 重要位: 并返回以表明语句的结果只是一个数字,而不是一个。 您需要检查以了解是否还有更多结果。 确保关闭结果集或使用

  • 问题内容: 我有以下从数据库获取十六进制代码的函数 我的问题是我在回调函数中返回了结果,但getColour函数未返回任何内容。我希望getColour函数返回的值。 在我调用getColour的那一刻,它不返回任何内容 我尝试做类似的事情 但当然SELECT查询在返回值时已经完成 问题答案: 您只需要对回调中的db查询结果进行处理。就像。

  • 问题内容: 我有一个运行中的elasticsearch的内存实例,并做了一些探索性的编码来学习搜索Java API。我能够将文档提交到索引并使用GET检索它们,但是当我尝试简单的搜索查询时,没有得到任何结果。 经过一些测试后,我认为问题出在我如何设置节点和关联的客户端(在内存中): 问题答案: Googleelasticsearch小组中的某个人很友好,可以在这里帮助我。将文档提交到内存节点后,我