在Flink中,我发现了2种设置水印的方法,
第一个是
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(5000)
第二个是
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)
我想知道哪个最终会生效。
这两者之间根本没有冲突——他们正在处理不同的问题。指定的一切都将生效。
第一个,
env.getConfig.setAutoWatermarkInterval(5000)
指定生成水印的频率(每5000毫秒一个水印)。如果未指定,则将使用默认值200毫秒。
第二个,
env.addSource(
new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[...](Duration.ofSeconds(10)).withTimestampAssigner(...)
)
指定如何计算这些水印的详细信息。一、 例如,它们应由FlinkKafkaConsumer使用一种有界的Autoforderness策略生成,有界延迟为10秒。水印策略也需要时间戳赋值器。
没有默认的Watermark Strategy
,因此如果您想使用事件时间,则需要类似于第二个代码片段的东西。
我们正在构建一个流处理管道,以使用Flink v1.11和事件时间特性来处理Kinesis消息。在定义源水印策略时,在官方留档中,我遇到了两个开箱即用的水印策略;forBoundedOutOfOrthy和forMonotonousTimestamps。但根据我对上述内容的理解,我认为这些不适合我的用法。以下是我的用法细节: 来自输入流的数据:(包含每分钟带有时间戳的数据) 现在,我想处理11:00
我们正在构建一个流处理管道来处理/摄取Kafka消息。我们正在使用Flink v1.12.2。在定义源水印策略时,在官方留档中,我遇到了两种开箱即用的水印策略;forBoundedOutOfOrness和forMonotonousTimestamps。我确实浏览了javadoc,但并不完全理解何时以及为什么你应该使用一种策略而不是另一种策略。时间戳基于事件时间。谢谢。
我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html) Kafka消费者使用以下选项初始化:
我正在尝试编写一个Flink应用程序,它从Kafka读取事件,从MySQL丰富这些事件并将这些数据写入HBase。我正在中进行MySQL丰富,我现在正在尝试弄清楚如何最好地写入HBase。我想批量写入HBase,所以我目前正在考虑使用,后跟标识(仅返回),然后编写,它获取记录列表并批处理放入。 这是正确的做事方式吗?仅仅为了进行基于时间的缓冲而使用所有窗口和应用窗口感觉很奇怪。
我试图了解Apache FLink中Windows和Watermark生成之间的依赖关系,我在下面的示例中出现错误: 这里的时间戳是一个长的,我们可以从Kafka源中检索到,应该是:a,4 C,8,其中C是类别,5是时间戳。 每当我发送事件时,数据流都会打印,但不会使用窗口打印这些事件(打印(“Windows”)。此外,如果我收到一个事件A,12,然后生成了一个水印(在10秒内),那么我有C,2,
我正在使用Flink 1.3.2和scala构建一个流媒体应用程序,我的Flink应用程序将监视一个文件夹,并将新文件流到管道中。文件中的每条记录都有一个相关的时间戳。我想使用此时间戳作为事件时间,并使用AssignerWithPeriodicWatermarks构建水印,我的水印生成器如下所示: 但是,由于我的文件夹中有一些旧数据,我不想处理它们。旧文件中记录的时间戳是