来自Flink的官方文件:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-join
示例代码是:
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
.keyBy(elem => /* select key */)
.intervalJoin(greenStream.keyBy(elem => /* select key */))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction[Integer, Integer, String] {
override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
out.collect(left + "," + right);
}
});
});
从上面的代码中,我想知道如何指定执行此间隔连接的开始时间(例如,从今天开始)(开始时间之前的数据将不考虑在内)。
例如,我已经运行了3天的程序,我不想对所有数据执行3天的连接,我只想对今天生成的数据执行连接。
我不认为它像你想象的那样工作。
在本例中,实际间隔是根据<code>orangeStream</code>的实际时间戳计算的,因此您并没有真正提供要考虑的数据间隔,而是类似于窗口,指定哪些元素将与OrangeStriam的给定元素连接。
因此,对于上述窗口,如果您有时间戳为5
的橙色元素,则它将与时间戳为3
到6的元素
连接。
我真的不认为您可以使用它仅对部分数据执行连接,我唯一能想到的是使用时间戳过滤数据,并过滤掉之前生成的所有元素。
你对 JavaScript 探索得越多,它就变得越清晰。闭包,对象,和方法这样的词现在可能看起来与你还有些距离,但是这本书将会帮你搞清楚这些术语。
我们有一个Flink任务,它将两个流连接起来,两个流都使用来自Kafka的事件。下面是示例代码 但是,我们没有看到任何连接输出。我们检查了每个流是否连续发射带有时间戳和适当水印的元素。有人知道可能的原因吗?
让事件反复发生 用法 Your browser does not support the video tag. 案例:坏掉的小台灯 功能:不停闪烁 工作原理 在配置项中可以选择每隔多少秒反转一次 例:制作一个闪烁的灯 例:温度过高时会发出“哔哔”声报警
我有一个名为Interval的setInterval,它运行一个倒计时计时器。我有一个开始按钮,第一次点击时播放,第二次暂停就好了。当我双击时,它会将计时器显示回零,但似乎并没有清除实际的计时器。将只播放在显示被0替换之前停止的地方。
# interval(date) Alias for interval.floor(date). For example, d3.time.day(new Date()) returns midnight (12:00 AM) on the current day, in local time. # interval.floor(date) Rounds down the specified da
我有一个PostgreSQL表,其列名为test_interval,类型为interval。 我想把数据插入到表中,数据的形式是: 我得到的错误不能插入类型。如何使用JOOQ插入postgres间隔数据类型的数据?