当前位置: 首页 > 知识库问答 >
问题:

如何在Python中将动态pcollection作为sideinput传递?

夔宏深
2023-03-14

我有一个用例,在这个用例中,我需要将一个file_path(作为一个新元素)追加到另一个pcollection中,它是一个元素字典。

但奇怪的是,它没有执行这个语句,因此没有输出,甚至没有任何错误。但是当我使用以下命令创建pcollection时:

file_path=beam.create(['some_path_value'])并按照上面的方式传递beam.pvalue.assingleton(file_path),它工作得很好。

但是我不想显式地创建pcollection。我需要使用上面的转换输出作为SideInput。

def collect(pcoll, path):
   print(pcoll, "-----------")
   print(path,"-------------")       
   return path

class Extract(beam.DoFn):    
   def process(self, element, *args, **kwargs):       
       pub_sub_json= json.loads(element)
       gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
       yield gcs_url

file_path = (p |"Read" >> beam.io.ReadFromPubSub(
                           subscription=options.session_subscription,
                           with_attributes=False)
             | "Extract" >> beam.ParDo(Extract()))                     

result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path)))


不会出现错误和警告,但它甚至不会进入collect方法内部。它主要发生在数据不会来自上述转换的情况下。但是我检查了很多次file_name有数据。

但是没有输出,只有当我使用beam.create([“pass_some_file_path”])并将其结果用作sideinput时才显示输出

共有1个答案

金瑞
2023-03-14

转换的输出应该是2-tuple((key,value))格式,以便接受它作为pvalue.asdict()的侧输入。它也能很好地与value.aslist()一起工作。

def collect(pcoll, path):
   print(pcoll, "-----------")
   print(path,"-------------")       
   return path

class Extract(beam.DoFn):    
   def process(self, element, *args, **kwargs):       
       pub_sub_json= json.loads(element)
       gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
       yield pvalue.TaggedOutput("url",("url" :gcs_url))

file_path = (p |"Read" >> beam.io.ReadFromPubSub(
                           subscription=options.session_subscription,
                           with_attributes=False)
             | "Extract" >> beam.ParDo(Extract())) 
file_path_new= (file_path["url"] | "Group by key" >> beam.GroupByKey())                    

result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path_new)))

你能用更好的方法解决它吗?

 类似资料:
  • 我得到了谷歌云存储桶的URL。我必须: > 对于每个blob,我进行一些gcsapi调用,以获取关于blob的信息(blob.size、blob.name等) 对于每个Blob,我还必须读取它,在它里面找到一些东西,并将其添加到从GCS API调用中获得的值中 对于每个blob,我必须将步骤2和步骤3中找到的关于blob的值写入BigQuery 我有数千个blob,因此这需要使用ApacheBea

  • 我正在使用Apache Beam GoSDK,很难以正确的格式按键分组/组合PCollection。 我在PCollection的字符串中每个键有多条记录,如下所示: 我想使用GroupByKey和CombinePerKey,这样我可以像这样聚合每个人的宠物: 如何转换PCollection 他们在这里提到了类似的内容,但没有包含聚合字符串值的代码。 我可以使用ParDo来获取字符串键和字符串值,

  • 我有一个网站,在那里我打开合同,得到唯一的合同。然后,我需要转到另一个页面,在带有分页的表中搜索此id。我编写了一段代码,如果找不到这个请求ID(它是一个链接),则转到下一页,如果它存在,则只打开这个请求ID。但webelement的初始化有一个问题,我正在尝试添加动态值。Selenium给出以下错误,我不知道如何解决 org.openqa.selenium.NoSuchElementExcept

  • 本文向大家介绍如何将动态值动态传递到SAP ABAP中的CDS,包括了如何将动态值动态传递到SAP ABAP中的CDS的使用技巧和注意事项,需要的朋友参考一下 我认为不存在将动态值传递给CDS的方法。 为了使DCL能够完成其分配的活动,您需要声明和定义权限对象。假设您无法执行此操作。然后,您可以获得所有结果,然后使用ABAP在网关层过滤结果。

  • 我正在开发一个N-皇后模拟使用pyplay。 另外,我在另一个文件中有一个pygame程序,用于绘制棋盘,该棋盘将列表作为输入,并相应地放置它们。如果我想显示皇后的移动,如何从递归过程动态传递列表,以便可以看到确切的回溯过程。非常感谢。

  • 问题内容: 我正在使用python flask框架。我编写了一个需要一个参数的装饰器,该参数是动态的。 我的装饰器如下所示,将获得一个密钥,并使用该密钥从redis获取数据。 而且我有一个使用这种装饰器的类,像这样的代码 如您所见,我的装饰器需要一个名为的参数,然后像这样传递密钥 将获得城市的ID,如果一切正常,密钥将如下所示 但我得到了错误: 我很困惑,在烧瓶中,如何将动态参数传递给装饰器? 谢