当前位置: 首页 > 知识库问答 >
问题:

Flink SQL间隔联接未触发

柳翼
2023-03-14

我在两个无界流之间有一个简单的间隔连接。这适用于较小的工作负载,但对于较大的(正式生产环境),它不再有效。通过观察输出,我可以看到FlinkSQL作业仅在扫描整个主题(并因此读入内存?)后才触发/发出记录,但我希望作业在找到ingle匹配后立即触发记录。因为在我的正式生产环境中,作业无法承受将整个表读入内存。

我正在做的间隔连接与这里提供的示例非常相似:https://github.com/ververica/flink-sql-cookbook/blob/main/joins/02_interval_joins/02_interval_joins.md

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(DAY,o.order_time,s.shipment_time) AS day_diff
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE 
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' DAY AND s.shipment_time;

除了我的时间间隔尽可能小(几秒钟)。我在FlinkSQL源表上也有5秒的水印。

我如何指示 Flink 在与联接进行单个“匹配”后立即发出/触发记录?由于目前作业尝试在发出任何记录之前扫描整个表,这对于我的数据量是不可行的。根据我的理解,它应该只需要扫描直到间隔(时间窗口)并检查它,一旦间隔通过,就会发出/触发记录。

另外,通过观察集群,我可以看到水印正在移动,但没有发出任何记录。

共有1个答案

韩智敏
2023-03-14

可能是有些数据被放弃了,可以查看你的活动时间是否合理。在此场景中,您可以尝试使用常规联接并设置3天的ttl(table.state.ttl = 3天),这可以确保每个联接数据的输出。

 类似资料:
  • 在本地MacOS应用程序的触摸条中的两个项目之间创建一个间隔 进程: 主进程​ new TouchBarSlider(options) 实验功能 用途:创建新间隔 options Object size String (可选) - 间隔大小,可选值有: small - 小间隔 large - 大间隔 flexible - 占用所有可用空间

  • 我很难找到sinon间谍没有被触发的原因。在下面的测试中,两个控制台语句都报告为false,因此两个方法都没有被调用(以防出现错误)。 这是我的一个摩卡测试通常的样子: }); PostOnToller中的方法: 最后是PostModel中的方法: 如果我以正常的方式调用方法,它们会执行查找,返回预期的Posts数组。但是,不会执行间谍。另外,如果我将控制器类中的方法更改为 间谍函数(res)确实

  • 问题内容: 我有两个Redis客户端,在一个文件中,我有一个简单的脚本设置并删除了Redis密钥: 在第二个文件中,我有一个Redis客户端充当订户: 关键的“占位符”已设置,那么是否有充分的理由使我在“消息”处理程序中未获得任何输出? 问题答案: 您忘记了订阅用户客户端订阅特定的频道。此外,如果要监视所有事件,则需要使用基于模式的订阅。 您可能想要执行以下操作(未测试): 请参阅Redis文档和

  • 我们有一个Flink任务,它将两个流连接起来,两个流都使用来自Kafka的事件。下面是示例代码 但是,我们没有看到任何连接输出。我们检查了每个流是否连续发射带有时间戳和适当水印的元素。有人知道可能的原因吗?

  • 让事件反复发生 用法 Your browser does not support the video tag. 案例:坏掉的小台灯 功能:不停闪烁 工作原理 在配置项中可以选择每隔多少秒反转一次 例:制作一个闪烁的灯 例:温度过高时会发出“哔哔”声报警

  • # interval(date) Alias for interval.floor(date). For example, d3.time.day(new Date()) returns midnight (12:00 AM) on the current day, in local time. # interval.floor(date) Rounds down the specified da