当前位置: 首页 > 知识库问答 >
问题:

Apache-Beam向PCollection添加序列号

慎旭尧
2023-03-14
p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

我读过没有办法从PCollection中获得列表:如何在Google Dataflow中从PCollection中获得元素列表,并在管道中使用它来循环写入转换?

我怎样才能实现呢?有人帮忙吗?

共有1个答案

沈乐邦
2023-03-14

如果您希望获得包含pcollection中每个元素的列表,则可以使用侧输入。请记住,这将从您的结果中删除所有并行性,并且您的管道可能会变得缓慢。

如果您还想这样做,那么:

side_input_coll = beam.pvalue.AsIterable(my_collection)

(p 
 | beam.Create([0]) 
 | beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)],
               my_seq=side_input_coll))

但是不要忘记,为了保持并行性,最好只生成一个随机ID。请记住pCollections本质上是无序的。

 类似资料:
  • 我的输入是一个json列表,我希望有一个多元素pCollection。这是我的代码: 问题是,当列表由2个元素组成时,我只得到列表的第一个元素。 我如何做到这一点?

  • 假设我们有一些嵌套列表: 我们可以像这样轻松地在Stream API中进行翻盖映射: 但是用“FlatMapElements”做这件事,真是一团糟: 我们能用平面贴图功能做得更好吗<一个简单的平面图工作不应该那么复杂,所以我想我遗漏了一些东西 我甚至无法替换。via(列表-

  • 我在云存储中有两个文件。包含Avro格式的File1,其中包含来自温度传感器的数据。 包含Avro格式的File2,其中包含来自风传感器的数据。 我想像下面这样组合输出 我正在寻找apache光束中的解决方案来组合上述文件。现在它正在从文件中读取,但将来可能会通过pubsub来读取。我想找出组合两个PCollection的自定义方法,并创建另一个PCollection temDataSusWind

  • 我正在使用Apache Beam GoSDK,很难以正确的格式按键分组/组合PCollection。 我在PCollection的字符串中每个键有多条记录,如下所示: 我想使用GroupByKey和CombinePerKey,这样我可以像这样聚合每个人的宠物: 如何转换PCollection 他们在这里提到了类似的内容,但没有包含聚合字符串值的代码。 我可以使用ParDo来获取字符串键和字符串值,

  • 有什么方法可以检查PCollection是否为空? 我没有在数据流和Apache Beam的留档中找到任何相关内容。

  • 我是Apache Beam的新手。 基本上,我有两个PCollection,每个都包含多个DataRecords,定义为: 每条记录都有一个id和多个数据字段。 我有两个收藏: 我需要找出: p1中存在但p2中不存在的数据记录 DataRecord只能通过其id字段进行区分。 到目前为止,我所做的是将两个PCollection实例转换为PCollection 然而,由于PCollection不允许