当前位置: 首页 > 知识库问答 >
问题:

处理文件前的Apache beam/PubSub时间延迟

闻人飞白
2023-03-14

我需要延迟处理或发布文件名(文件)。我在寻找最好的选择。

目前,我有两个Apache Beam数据流和PubSub介于两者之间。首先,dataflow从源代码读取文件名,并将其推送到PubSub主题。另一个数据流读取它们并处理它们。然而,我的用例是在源代码中创建实际文件至少1小时后开始处理/读取这些文件。

所以我有两个选择:

1)延迟发布消息,以便在好的/预期的时刻立即处理它

2)延迟处理检索到的文件

共有1个答案

姚才捷
2023-03-14

您可能通过发布作业中的触发器/窗口配置来实现您想要的。

然后,您可以定义一个窗口配置,在该配置中,触发器直到延迟1小时后才触发。类似于:

Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))
      .triggering(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardHours(1)))

请记住,你最终会得到一份简单的工作,除了保持一个小时的状态之外,什么也不做。另外,上面所述的只是基于处理时间,所以即使文件的实际创建时间足够长,它也会在作业开始后等待一个小时,以便立即发出结果。

您可以将其细化为事件时间触发器,但可能需要编写自己的代码来为记录(文件名)分配时间戳。据我所知,Beam目前不具备读取文件创建时间的内置支持。例如,当通过textio读取文件时,我观察到所有记录都被分配了一个默认的静态时间戳。您应该检查用于读取文件名的转换的细节,看看它是否对您的目的更有用。您还可以使用WithTimestamps转换来自行分配时间戳。

 类似资料:
  • 到目前为止,我了解到有3种方法可以处理Flink中的后期数据: > 删除延迟事件(这是事件时间窗口运算符的默认行为。(因此,延迟到达的元素不会创建新窗口。)( 重定向延迟事件(也可以使用side输出功能将延迟事件重定向到另一个数据流) 通过包含延迟事件更新结果(重新计算不完整的结果并发出更新) 我不太清楚非窗口操作符的延迟事件会发生什么,特别是当时间戳被分配到源时。这里我有一个FlinkKafka

  • 问题内容: 我正在努力为正在研究的项目中的某些实用程序类设置单元测试,其中一个类(包含许可信息)具有一种根据当前时间进行确定的方法。 也就是说,许可证包含到期日期,并且许可证字符串会验证该日期,但是查看许可证是否过期的实际逻辑是基于当前时间。 因此,我不确定该怎么做,因为“ new Date()”不是静态标准。 我是否应该不费心地测试’isValid’,而只是分别测试’isLicenseStrin

  • 之前章节已经提过,filters/date 插件可以用来转换你的日志记录中的时间字符串,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。 注意:因为在稍后的 outputs/elasticsearch 中常用的 %{+YYYY.MM.dd} 这种写法必须读取 @timestamp 数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用 filte

  • 如何在Vertx中处理延迟作业列表(实际上是数百个HTTP GET请求,到禁止快速请求主机的有限API)?现在,我正在使用此代码,它被阻止,因为Vertx一次启动所有请求。希望在每个请求之间有5秒的延迟来处理每个请求。

  • 我需要延迟处理一些事件。 我有三件事(发表在Kafka上): A(id: 1, retry At: now) B(id: 2, retry At: 10分钟后) C(id: 3, retry At: now) 我需要立即处理记录A和C,而记录B需要在十分钟后处理。这在Apache Flink中实现可行吗? 到目前为止,无论我研究了什么,“触发器”似乎都有助于在Flink中实现它,但还没有能够正确实