当前位置: 首页 > 知识库问答 >
问题:

Flink SQL窗口不报告最终结果

宋华灿
2023-03-14

我使用Flink SQL计算基于事件时间的窗口分析。在我的数据源每天晚上空闲之前,一切都正常工作,之后直到第二天数据再次开始流动时才产生最后一分钟的结果。

CREATE TABLE input
    id STRING,
    data BIGINT,
    rowtime TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
WITH (
    'connector' = 'kafka',
    'topic' = 'input',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)

SELECT ...
FROM
  (SELECT * FROM
     TABLE(TUMBLE(TABLE input, DESCRIPTOR(rowtime), INTERVAL '1' MINUTES)))
GROUP BY ..., window_start, window_end

我已尝试将<code>设置为table.exec.source。空闲超时,但没有帮助。我能做什么?

共有1个答案

奚昌胤
2023-03-14

table.exec.source.idle-timeout(以及与DataStream API for Watermark Strategy一起使用的相应with Idlness构造)检测空闲的输入分区并防止它们阻碍整体水印的进度。但是,为了使整体水印前进,仍然必须在某个地方有一些输入。

一些选项:

(1)解决问题,这意味着等待水印可以正常前进,基于观察输入流中更大的时间戳。正如您所指出的,在您的用例中,这可能需要等待几个小时。

(2) 安排输入流包含保持活动状态的消息。这样,水印生成器将有证据(基于保持活动状态消息中的时间戳)它可以推进水印。您必须修改查询以忽略这些其他无关的事件。

(3)在到达作业已经完全摄取了所有的日常输入,但是还没有产生最终结果集的点时,停止作业并指定< code> - drain。这将通过管道发送一个值为MAX_WATERMARK的水印,这将关闭所有挂起的窗口。然后,您可以重新启动该作业。

(4)实现自定义水印策略,使用处理时间定时器检测空闲,并根据挂钟时间的流逝人为地推进水印。这将需要将表输入转换为 DataStream,在其中添加水印,然后转换回表以进行窗口化。有关这些转换的示例,请参阅 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/。

 类似资料:
  • 背景:我正在尝试在计划任务失败时收到电子邮件通知。我的任务可以通过退出代码(错误级别)指示失败,我想使用它并按照本答案中描述的过滤器方法来触发电子邮件。 问题:我在不同的计算机和版本的 Windows 上从任务计划程序收到不一致的行为。为了进行测试,我使用以下非常简单的任务。 仅在用户登录时运行。(其中“用户”是当前用户) 要执行的操作: 启动一个程序 程序/脚本: 参数: 开始时间:空白 我已经

  • 您可以导出Burp Scanner生成的部分或全部问题的报告。你可以依次打开站点地图(Site map)-->问题视图(Issues view)或在问题活动日志中选择报告所选问题(Report selected issues),报告向导将带着您您为报告做各种选项,具体如下。 查看示例报告 报告格式 您可以为报告选择以下格式之一: HTML - 生成HTML格式的报告,以便在浏览器中打印或查看。 X

  • 问题内容: 在Objective-C for Cocoa Apps中,可以使用这种方式使窗口始终位于顶部吗? 如何用Swift实现相同的目标? 导致构建错误 问题答案: 要更改窗口级别,您不能在viewDidload中执行此操作,因为视图的window属性始终在那里为零,但是可以通过覆盖viewDidAppear方法或IBAction方法来实现: 斯威夫特1 迅捷2 迅捷3 斯威夫特4 最后,他们

  • 问题内容: 我正在 引发该异常的代码。我正在从无限运行的线程调用刷新,直到有要刷新的数据为止。 编辑:我正在调用独立线程中的刷新,因为数据存储列表包含数据。从我看来,它是对刷新的同步操作调用,因此理想情况下,刷新应在事务完成之前不返回。我希望这种方式是我最不希望看到的。由于它是一个独立的线程来完成其工作,因此我所关心的只是刷新操作是同步操作。现在我的问题是,txD.commit是异步操作吗?它在该

  • 我正在尝试测量具有窗口操作的 Flink 应用程序的延迟,如下所示: 我正在考虑事件时间并提取时间戳,我使用这个水印策略: 聚合函数将特定对象保存为累加器,其中还包含提取的时间戳;这些时间戳写在 kafka 主题中。问题是返回的时间戳如下: 返回的时间戳并不像我预期的那样间隔相等,第四个和第五个是相等的,但它们的返回间隔为15秒,这是不可能的,因为应用程序记录的输入每秒(每秒10个)连续生成。在其

  • 我想做的是: < li >消费数字主题中的记录(Long的) < li >聚合(计数)每个5秒窗口的值 < li >将最终聚合结果发送到另一个主题 我的代码如下: 看起来一切都像预期的那样工作,但聚合被发送到每个传入记录的目标主题。我的问题是如何仅发送每个窗口的最终聚合结果?