当前位置: 首页 > 知识库问答 >
问题:

不懂间隔加入Flink

訾朗
2023-03-14

来自Flink的官方文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-join

示例代码是:

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
         out.collect(left + "," + right); 
        }
      });
    });

从上面的代码中,我想知道如何指定执行此间隔连接的开始时间(例如,从今天开始)(开始时间之前的数据将不考虑在内)。

例如,我已经运行了3天的程序,我不想对所有数据执行3天的连接,我只想对今天生成的数据执行连接。

共有1个答案

管玉堂
2023-03-14

我不认为它像你想象的那样工作。

在本例中,实际间隔是根据<code>orangeStream</code>的实际时间戳计算的,因此您并没有真正提供要考虑的数据间隔,而是类似于窗口,指定哪些元素将与OrangeStriam的给定元素连接。

因此,对于上述窗口,如果您有时间戳为5的橙色元素,则它将与时间戳为36的元素连接。

我真的不认为您可以使用它仅对部分数据执行连接,我唯一能想到的是使用时间戳过滤数据,并过滤掉之前生成的所有元素。

 类似资料:
  • 你对 JavaScript 探索得越多,它就变得越清晰。闭包,对象,和方法这样的词现在可能看起来与你还有些距离,但是这本书将会帮你搞清楚这些术语。

  • 我们有一个Flink任务,它将两个流连接起来,两个流都使用来自Kafka的事件。下面是示例代码 但是,我们没有看到任何连接输出。我们检查了每个流是否连续发射带有时间戳和适当水印的元素。有人知道可能的原因吗?

  • 让事件反复发生 用法 Your browser does not support the video tag. 案例:坏掉的小台灯 功能:不停闪烁 工作原理 在配置项中可以选择每隔多少秒反转一次 例:制作一个闪烁的灯 例:温度过高时会发出“哔哔”声报警

  • 我有一个名为Interval的setInterval,它运行一个倒计时计时器。我有一个开始按钮,第一次点击时播放,第二次暂停就好了。当我双击时,它会将计时器显示回零,但似乎并没有清除实际的计时器。将只播放在显示被0替换之前停止的地方。

  • # interval(date) Alias for interval.floor(date). For example, d3.time.day(new Date()) returns midnight (12:00 AM) on the current day, in local time. # interval.floor(date) Rounds down the specified da

  • 我有一个PostgreSQL表,其列名为test_interval,类型为interval。 我想把数据插入到表中,数据的形式是: 我得到的错误不能插入类型。如何使用JOOQ插入postgres间隔数据类型的数据?