当前位置: 首页 > 知识库问答 >
问题:

PubsubIO在使用DataflowRunner和Dataflow服务运行时,不将自定义时间戳属性输出为context.timestamp

年烈
2023-03-14

我正在研究一个Apache Beam项目,该项目遇到了与自定义时间戳属性相关的Dataflow服务和PubsubIO问题。Beam SDK的当前版本是2.7.0。

在该项目中,我们有两个通过PubSub主题和订阅进行通信的数据流作业:

第一条管道(将数据下沉到PubSub)

这个管道工作在每个基础的消息上,因此除了globalwindows(Beam默认)之外,它没有应用自定义窗口策略。在这个管道的末尾,我们使用pubsubio.writeMessages()将已经分配了包括事件时间戳(例如“published_at”)的属性映射的所有消息下沉(写入)到一个PubSubTopic。

注意:如果我们使用pubsubio.writeMessages().WithTimestampatAttribute(),此方法将告诉pubsubio.shardfnpubsubio.writefnpubsubclient将下沉管道的处理时间写入/覆盖到映射中的该属性。

第二个管道(从PubSub读取数据)

在第二个管道(读取管道)中,我们尝试了pubsubio.readmessageswithattributes().withtimestampattribute(“published_at”)pubsubio.readstrings().withtimestampattribute(“published_at”)

  • 使用DirectRunner运行时,一切都如预期的那样工作良好。这些消息从PubSub订阅中读取并输出到下游阶段,其processContext.timestamp()等于其事件时间戳“published_at”.
  • 但是当使用DataflowRunner运行时,processContext.timestamp()总是被设置为接近实时,接近下沉管道的处理时间。我们检查并可以确认这些时间戳不是来自PubSub的发布时间。然后,所有数据都被分配到错误的窗口,与它们的事件域时间戳相比较。我们希望删除较晚的数据,而不是将其分配到无效的窗口中。

注意:在打开第二个管道以获得某种历史/后期数据之前,我们已经让Pubsub主题填充了大量数据。

具有无效上下文时间戳的Pubsub消息

假定根本原因

深入研究DataflowRunner的源代码,我们可以看到Dataflow服务使用一个完全不同的Pubsub代码(在构建管道时重写pubsubio.Read)从Pubsub读取和接收到Pubsub。

因此,如果我们想要使用Beam SDK的PubsubIO,我们必须使用实验选项“enable_custom_pubsub_source”。但是到目前为止还没有运气,因为我们遇到了这个问题https://jira.apache.org/jira/browse/beam-5674,并且还没有能够测试Beam SDK的Pubsub代码。

变通解决方案

我们当前的解决方案是,在为消息分配窗口的步骤之后,我们实现了dofn来根据它们的intervalwindow检查它们的事件时间戳。如果窗口无效,那么我们只需删除消息,然后每周或半周运行一次作业,从历史来源更正它们。最好有一些缺失的数据,而不是计算不当的数据。

由于无效的窗口而丢弃的消息

请与我们分享这个案例的经验。我们知道,从数据流水印管理的角度来看,如果摄入的数据稀疏(不够密集),水印就会自动调整到当前的实时状态。

我们还认为,我们对Dataflow服务维护PubSubUnboundedSource的输出时间戳的方式有些误解,因为我们对Apache Beam和Google的Dataflow仍然是新手,所以有些事情我们还不知道。

多谢!

共有1个答案

权弘新
2023-03-14

我找到了解决这个问题的方法。在我的下沉管道中,timestamp属性设置的日期格式与RFC3339标准相比是错误的。格式化日期缺少“Z”字符。我们要么修复了'z'字符,要么改为使用自纪元以来的毫秒。两者都很有效。

但有一点是,当Dataflow服务无法解析错误的日期格式时,它确实警告或抛出错误,但却占用了所有元素的处理时间,因此,它们被分配到错误的event_time窗口。

 类似资料:
  • 我正在尝试使用AWS CDK而不是无服务器在AWS Lambda中使用Bref自定义运行时。 CDK代码如下所示。 这里是完整的源代码https://github.com/petrabarus/cdk-bref-function 当我尝试使用AWS CLI手动调用时,它显示错误。 Cloudwatch日志显示了如下内容。 如果我将部署与无服务器框架进行比较,那么配置(层、代码等)几乎是相同的。我错

  • 问题内容: 我想将我的python软件包设为“ pip installable”。问题在于,程序包具有必须来自用户的初始Shell脚本(例如)的Shell脚本。 但是在安装之后,用户并不完全知道脚本的去向(大概是,但是我们不能保证)。当然,用户可以运行并手动编辑其初始化脚本。 但我想使这一步骤自动化。我可以创建一个新的distutils命令,但不调用它。而且我可以扩展,但是安装会通过pip中断(尽

  • 问题内容: 我想输出带有PST偏移量的时间戳(例如2008-11-13T13:23:30-08:00)。似乎没有以 hour:minute 格式输出时区偏移量,但排除了冒号。有没有一种简单的方法来获取Java中的时间戳? 另外,无法正确解析上述示例。它抛出一个。 问题答案: 从Java 7开始,提供了ISO8601时区的模式字符串。对于您描述的格式的字符串,请使用。请参阅文档。 样品: 结果:

  • 我在一个Kafka主题“原始数据”中获取CSV,目标是通过在另一个主题“数据”中发送具有正确时间戳(每行不同)的每行来转换它们。 null 我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到一个方法(时间戳提取器只在消耗时间使用)。 我在文档中偶然发现了这一行: 请注意,通过调用#forward()时显式地为输出记录分配时间戳,可以在处理器API中更改description默认行为。

  • 我有一个Java应用程序,它使用Prometheus库,以便在执行期间收集度量。稍后,我将Prometheus服务器链接到Grafana,以便可视化这些度量。我想知道是否可以让格拉法纳为这些度量显示一个自定义的X轴?通常的X轴是在当地时间。我能让它显示带有GPS/UTC时间戳的数据吗?有可能吗?如果是,需要什么?保存时间戳的附加度量参数? 我这样声明度量变量: 并添加如下所示的数据: 如有任何帮助

  • 问题内容: 我已经创建了自己的文件,并通过将其设置为系统类加载器。它已经初始化,并且一切正常,但是找不到我要加载的类。这是: 我已确认该罐子存在,并且路径正确。这是我在程序中如何称呼它: 这是我得到的例外(第166行是指我尝试创建新行的行: 我什至尝试像这样显式加载类: 是什么原因造成的?它不应该“正常工作”吗? 更新: 这是来自的重要代码 更新2: 这是一个SSCCE:http : //nucl