当前位置: 首页 > 知识库问答 >
问题:

合并两个PCollection(Apache beam)

刁俊人
2023-03-14

我在云存储中有两个文件。包含Avro格式的File1,其中包含来自温度传感器的数据。

time_stamp     |  Temperature
1000           |  T1
2000           |  T2
3000           |  T3
4000           |  T3
5000           |  T4
6000           |  T5

包含Avro格式的File2,其中包含来自风传感器的数据。

time_stamp     |  wind_speed
500            |  w1
1200           |  w2
1500           |  w3
2200           |  w4
2500           |  w5
3000           |  w6

我想像下面这样组合输出

time_stamp |Temperature|wind_speed
1000       |T1         |w1 (last earliest reading from wind sensor at 500)
2000       |T2         |w3 (last earliest reading from wind sensor at 1500)
3000       |T3         |w6 (wind sensor reading at 3000)
4000       |T3         |w6 (last earliest reading from wind sensor at 3000)
5000       |T4         |w6 (last earliest reading from wind sensor at 3000)
6000       |T5         |w6(last earliest reading from wind sensor at 3000)

我正在寻找apache光束中的解决方案来组合上述文件。现在它正在从文件中读取,但将来可能会通过pubsub来读取。我想找出组合两个PCollection的自定义方法,并创建另一个PCollection temDataSusWindSpeed。

     PCollection<Temperature> tempData = p.apply(AvroIO
         .read(AvroAutoGenClass.class)
         .from("gs://my_bucket/path/to/temp-sensor-data.avro")

     PCollection<WindSpeed> windData = p.apply(AvroIO
         .read(AvroAutoGenClass.class)
         .from("gs://my_bucket/path/to/wind-sensor-data.avro")

     PCollection<WindSpeed> tempDataWithWindSpeed = ?

共有1个答案

西门凯康
2023-03-14

@jszule的评论通常是Dataflow/Beam的一个很好的答案:最好的支持连接是当两个PCollection有一个共同的键时。对于大多数数据,Beam可以找出一个模式,您可以使用CoGroup.join转换。您必须做出的设计决定是如何选择键,例如四舍五入到最近的1000。

您的用例有一个复杂性:您需要在没有数据的键的时间序列中结转值。解决方案是使用状态和计时器生成“缺失”值。您仍然需要仔细选择键,因为状态和计时器是针对每个键和窗口的。状态和计时器也在批处理模式下工作,因此这是一个批处理/流式统一解决方案。

你可能想阅读Reza Rokni和我关于这个主题的博客文章,或者Reza在2019年柏林光束峰会上的演讲

 类似资料:
  • 我想将两个ObjectNode合并在一起,但要具有最佳的复杂性。我知道一种方法,我可以使用setAll方法,但它返回JsonNode,因此我必须转换它。我发现的转换过程的最佳方法是如何将JsonNode转换为ObjectNode。我认为这种转换一旦迭代Json,也许我们可以找到一个更好的解决方案。 我认为第二种方法是迭代第二个ObjectNode,然后用put方法一个接一个地添加到第一个Objec

  • 问题内容: 我有两个像这样的数组: 我想结合这两个数组,使其不包含重复项,并保留其原始键。例如,输出应为: 我已经尝试过了,但是它正在更改其原始键: 有什么办法吗? 问题答案: 只需使用: 那应该解决。因为如果一个键出现多次(例如在您的示例中),则使用字符串键,因此一个键将覆盖具有相同名称的处理键。因为在您的情况下,它们两者都具有相同的值,但这无关紧要,并且还会删除重复项。 更新:我刚刚意识到,P

  • 问题内容: 我有两个键为s且值为的映射。给定两个s,合并它们的最简单方法是什么,如果两个键相同,则值是两个集合的并集。您可以假设值永远不会为null,并且如果有用的话,我们可以将它们设为s。 问题答案: 我们在谈论实例。在这种情况下,查找值为O(1),因此您只需获取一个映射,然后对该映射的条目进行迭代,看看另一个映射是否包含该键。如果没有,只需添加设置。如果包含密钥,则将两个集合并集(通过将一个集

  • 我有两个代号为“一”的独立项目,它们都运行良好,现在我想将这两个项目合并为一个。我知道如何组合代码和类文件,但我想知道如何组合两个项目的文件,因为在一个文件,很难再次创建所有文件。请建议是否有任何方法我可以结合两个文件,或者我可以使用两个一个项目中的文件。e、 g。 项目1:名称:test1有主题。res文件和12个表格 项目2:名称:test2有主题。res文件和18表格 新项目:名称:Merg

  • rank ▲ ✰ vote url 65 357 50 683 url 合并两个列表 怎样合并两个列表? 例如: listone = [1,2,3] listtwo = [4,5,6] 我期待: mergedlist == [1, 2, 3, 4, 5, 6] 在Python中非常容易. mergedlist = listone + listtwo

  • 我是Apache Beam的新手。 基本上,我有两个PCollection,每个都包含多个DataRecords,定义为: 每条记录都有一个id和多个数据字段。 我有两个收藏: 我需要找出: p1中存在但p2中不存在的数据记录 DataRecord只能通过其id字段进行区分。 到目前为止,我所做的是将两个PCollection实例转换为PCollection 然而,由于PCollection不允许