当前位置: 首页 > 知识库问答 >
问题:

云数据流新鲜度和延迟的确切定义是什么?

斜向文
2023-03-14

问题:

使用云数据流时,我们得到了2个指标(请参见本页):

  1. 系统延迟

Stackdriver中也提供了以下名称(摘自此处):

system\u lag:数据项等待处理的当前最长持续时间,以秒为单位。

data\u watermark\u age:管道已完全处理的最新数据项的期限(自事件时间戳起的时间)。

但是,这些描述仍然非常模糊:

  1. “等待处理”是什么意思?消息在pubsub中等待多长时间?或者它在管道内等待的总时间
  2. “最长持续时间”:处理该最长项目后,是否会调整指标
  3. “自事件时间戳起的时间”是否意味着如果我的事件在时间戳t1被放入pubsub,并且它在时间戳t2流出管道的一端,那么管道在t1?我想我可以假设,如果度量值是t1,那么可以假设处理了t1之前的所有内容

问题:

由于这些指标与Apache Beam的语义学一致,我很想看到一些示例,或者至少更清楚地定义这些指标以使它们可用。

共有1个答案

赫连冠玉
2023-03-14

这些指标是出了名的棘手。Beam/数据流团队的一位成员在本次演讲中可以看到对它们工作方式的深入探讨。

管道被拆分为内存中发生的一系列计算,以及需要将数据序列化到某种数据存储的计算。例如,考虑以下管道:

with Pipeline() as p:
  p | beam.ReadFromPubSub(...)  \
    | beam.Map(parse_data)
    | beam.Map(into_key_value_pairs) \
    | beam.WindowInto(....) \
    | beam.GroupByKey() \
    | beam.Map(format_data) \
    | beam.WriteToBigquery(...)

这个管道将被分成两个阶段。一个阶段是一系列可以应用于内存的计算。

第一阶段从ReadFromPubSubGroupByKey操作。这两个PTransform之间的所有内容都可以在内存中完成。要执行GroupByKey,需要将数据写入持久状态(因此写入新源)。

第二个阶段是从GroupByKey到WriteToBigQuery。在这种情况下,数据是从“源”读取的。

每个源都有自己的一组水印。您在数据流UI中看到的水印是来自管道中任何源的最大水印。

--

回答您的问题:

  1. 等待处理的是什么?

答复

它是一个元素在PubSub中等待的时间。具体来说,一个元素在管道中的任何源中等待的时间。

考虑一个更简单的管道:

<代码>ReadFromPubSub-

此管道对每个项目执行以下操作:从PubSub读取项目-

现在,假设BigQuery服务宕机5分钟。这意味着PubSub在5分钟内不会收到任何元素的确认。因此,这些元素将在PubSub中停留一段时间。

这意味着当BQ写入被阻止时,系统延迟(以及数据新鲜度指标)将膨胀到5分钟。

答复

没错。例如,再次考虑之前的管道:BQ死了5分钟。当BQ回来时,可能会向其写入大量项目,并确认为从PubSub读取。这将大大减少系统延迟(和数据新鲜度)回到几秒钟。

答复

事件时间戳可以作为消息的属性提供给PubSub。这是一个有点棘手的概念,但本质上:

每个阶段都有一个输出数据水印。输出数据水印为T表示计算已处理事件时间早于T的所有元素。输出数据水印可以是其所有上游计算的最早输入水印。但是,如果有一些输入数据尚未处理,则输出水印可能会被保留。

当然,这个指标是启发性的。如果某个数据点很晚才到达,那么数据的新鲜度将被延迟。

--

我建议你看看斯拉瓦的演讲。它涵盖了所有这些概念。

 类似资料:
  • 我试图理解延迟和延迟订阅操作符之间的区别。 本文件描述了延迟操作员: 延迟操作符通过在发出每个源可观察项之前暂停特定的时间增量(您指定)来修改其源可观察项。这会将可观测项发出的整个项目序列在时间上向前移动指定的增量 delaySubscription是这样描述的: 还有一个操作符,您可以使用它延迟对源可观察对象的订阅:delaySubscription。 然而,当我测试这两个操作员的行为时,我觉得

  • 根据我所看到的,在Spring Cloud Dataflow(SCDF)中创建流将部署底层应用程序,绑定通信服务(如RabbitMQ),设置Spring Cloud stream环境变量,并启动应用程序。这一切都可以使用cf push命令轻松手动完成。 同时,我在Spring Cloud Dataflow中遇到了一些缺点: SCDF服务器是PCF上的内存占用者(我有一个只有6个应用程序的流,但我需

  • 这两天我一直在和Git Bash合作。我现在知道了诸如、、、和等基本操作。但我还是不知道Git Bash本身到底是什么! 我已经搜索了很多关于Git Bash的内容,但是我看到的所有站点都集中在它的命令的功能上。我仍然没有为我的问题找到一个好的答案。现在,我想,我在正确的地方得到这个答案!

  • 我读过几篇关于Flink的文章,在读Flink的博客时,我遇到了这样一句话:“最多延迟60秒(事件最多延迟1分钟)” 是否在Flink中定义乱序事件持续时间用于技术“水印”,如果不是,那么内部目的是什么?

  • 1、mybatis 是否支持延迟加载? 延迟加载其实就是讲数据加载时机推迟,比如推迟嵌套查询的时机。 延迟加载可以实现先查询主表,按需实时做关联查询,返回关联表结果集,一定程度上提高了效率。 mybatis仅支持关联对象association和关联集合对象collection的延迟加载,association是一对一,collection是一对多查询,在mybatis配置文件中可以配置lazylo

  • 我在计算一个简单蒸汽的最大值,结果是: (S11000,S1,值:999) (S12000,S1,值:41) 最后一行数据明显迟到了: 为什么按第一个窗口(0-1000)计算? 我认为第一个窗口应该在到达时触发。 对于这个结果,我很疑惑。 MyReductingMax(),MyWindowFunction()