当前位置: 首页 > 知识库问答 >
问题:

Flink如何设置初始水印

夏侯朝斑
2023-03-14

我正在使用Flink 1.3.2和scala构建一个流媒体应用程序,我的Flink应用程序将监视一个文件夹,并将新文件流到管道中。文件中的每条记录都有一个相关的时间戳。我想使用此时间戳作为事件时间,并使用AssignerWithPeriodicWatermarks构建水印,我的水印生成器如下所示:

class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[Activity] {
    val maxTimeLag = 6 * 3600000L // 6 hours
    override def extractTimestamp(element: Activity, previousElementTimestamp: Long): Long = {
    val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
        val timestampString = element.getTimestamp
    }
    override def getCurrentWatermark(): Watermark = {
      new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
  }

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(10000L)
val stream = env.readFile(inputformart, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)

val activity = stream
      .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
      .map { line =>
        new tuple.Tuple2(line.id, line.count)
      }.keyBy(0).addSink(...)

但是,由于我的文件夹中有一些旧数据,我不想处理它们。旧文件中记录的时间戳是

共有1个答案

涂选
2023-03-14

您所展示的管道中没有关心时间的操作符——没有窗口,没有ProcessFunction计时器——因此每个流元素都将畅通无阻地通过并被处理。如果您的目标是跳过延迟的元素,则需要引入(以某种方式)将事件时间戳与当前水印进行比较的内容。

您可以通过在keyBy和接收器之间引入一个步骤来做到这一点,如下所示:

...
.keyBy(0)
.process(new DropLateEvents())
.addSink(...)

public static class DropLateEvents extends ProcessFunction<...> {
    @Override
    public void processElement(... event, Context context, Collector<...> out) throws Exception {
        TimerService timerService = context.timerService();
        if (context.timestamp() > timerService.currentWatermark()) {
           out.collect(event);
        }
    }
}

完成此操作后,您关于初始水印的问题就变得相关了。对于周期性水印,初始水印是Long.MIN_VALUE,因此在发出第一个水印之前不会考虑延迟,这将在操作10秒后发生(给定您如何设置自动水印间隔)。

如果您想更详细地了解如何生成周期性水印,请参阅此处的相关代码。

如果要避免在前10秒内处理延迟元素,可以完全忘记使用事件时间和水印,只需修改上面显示的processElement方法,将事件时间戳与系统进行比较。currentTimeMillis()-maxTimeLag,而不是当前水印。另一种解决方案是使用带标点的水印,并在第一个事件中发出水印。

或者更简单地说,您可以在平面图或过滤器中检测并删除延迟事件,因为您定义的是相对于系统的延迟。currentTimeMillis()而不是水印。

 类似资料:
  • 当安装完成并首次启动 Navicat Monitor 时,浏览器会弹出并打开你的 Navicat Monitor 的网址“http://<your_ip_address>:<port_number>”。你需要在欢迎页面完成 Navicat Monitor 的基本配置。 【注意】<your_host_address> 是安装了 Navicat Monitor 的系统的主机名,以及 <port_num

  • 我们试图构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。从阅读文档来看,在我看来,Flink broadcast state自然适合这种情况。 作为一个实验,我构建了一个简化的版本:假设我有一个整数流,第二个流包含这些整数的乘法因子(我可以随意发送值)。第二个流的频率很低,很容易在事件之间的几天或几周内出现。目前,这两个都实现为简单的套接字服务器,最终产品将使用

  • 问题内容: 我试图弄清楚如何为redux中的商店设置初始状态。我以https://github.com/reactjs/redux/blob/master/examples/todos- with- undo/reducers/index.js 为例。我试图修改代码,以便待办事项已初始化一个值。 按照文档操作:http : //redux.js.org/docs/api/createStore.h

  • 设置初始 被选中国家 。当Gio地球完成初始化之后,地球会有一个转动动画,将初始国家转至屏幕正前方。 默认设置下初始国家是"CN"(中国)。 默认初始国家: // 将初始国家改成"美国" controller.setInitCountry("US");

  • 我们正在部署一个新的Flink流处理作业,它的状态(存储)需要使用历史数据进行初始化,并且在开始处理任何新的应用程序事件之前,该数据应该在状态存储中可用。我们不想显着修改Flink作业以同时加载历史数据。我们考虑编写另一个单独的Flink作业来处理历史数据,更新其状态存储并创建一个Savepoint并使用此Savepoint在主Flink作业中初始化状态。看起来状态处理器API仅适用于DataSe

  • 本文向大家介绍google-cloud-storage 初始设置,包括了google-cloud-storage 初始设置的使用技巧和注意事项,需要的朋友参考一下 示例 Google维护着有关入门的文档:https://cloud.google.com/storage/docs/quickstart-console 准备使用GCS: 如果您还没有一个项目,请创建一个Google Cloud项目。