我有一个用例,在这个用例中,我需要将一个file_path(作为一个新元素)追加到另一个pcollection中,它是一个元素字典。
但奇怪的是,它没有执行这个语句,因此没有输出,甚至没有任何错误。但是当我使用以下命令创建pcollection时:
file_path=beam.create(['some_path_value'])并按照上面的方式传递beam.pvalue.assingleton(file_path),它工作得很好。
但是我不想显式地创建pcollection。我需要使用上面的转换输出作为SideInput。
def collect(pcoll, path):
print(pcoll, "-----------")
print(path,"-------------")
return path
class Extract(beam.DoFn):
def process(self, element, *args, **kwargs):
pub_sub_json= json.loads(element)
gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
yield gcs_url
file_path = (p |"Read" >> beam.io.ReadFromPubSub(
subscription=options.session_subscription,
with_attributes=False)
| "Extract" >> beam.ParDo(Extract()))
result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path)))
不会出现错误和警告,但它甚至不会进入collect方法内部。它主要发生在数据不会来自上述转换的情况下。但是我检查了很多次file_name有数据。
但是没有输出,只有当我使用beam.create([“pass_some_file_path”])并将其结果用作sideinput时才显示输出
转换的输出应该是2-tuple((key,value))格式,以便接受它作为pvalue.asdict()的侧输入。它也能很好地与value.aslist()一起工作。
def collect(pcoll, path):
print(pcoll, "-----------")
print(path,"-------------")
return path
class Extract(beam.DoFn):
def process(self, element, *args, **kwargs):
pub_sub_json= json.loads(element)
gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
yield pvalue.TaggedOutput("url",("url" :gcs_url))
file_path = (p |"Read" >> beam.io.ReadFromPubSub(
subscription=options.session_subscription,
with_attributes=False)
| "Extract" >> beam.ParDo(Extract()))
file_path_new= (file_path["url"] | "Group by key" >> beam.GroupByKey())
result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path_new)))
你能用更好的方法解决它吗?
我得到了谷歌云存储桶的URL。我必须: > 对于每个blob,我进行一些gcsapi调用,以获取关于blob的信息(blob.size、blob.name等) 对于每个Blob,我还必须读取它,在它里面找到一些东西,并将其添加到从GCS API调用中获得的值中 对于每个blob,我必须将步骤2和步骤3中找到的关于blob的值写入BigQuery 我有数千个blob,因此这需要使用ApacheBea
我正在使用Apache Beam GoSDK,很难以正确的格式按键分组/组合PCollection。 我在PCollection的字符串中每个键有多条记录,如下所示: 我想使用GroupByKey和CombinePerKey,这样我可以像这样聚合每个人的宠物: 如何转换PCollection 他们在这里提到了类似的内容,但没有包含聚合字符串值的代码。 我可以使用ParDo来获取字符串键和字符串值,
我有一个网站,在那里我打开合同,得到唯一的合同。然后,我需要转到另一个页面,在带有分页的表中搜索此id。我编写了一段代码,如果找不到这个请求ID(它是一个链接),则转到下一页,如果它存在,则只打开这个请求ID。但webelement的初始化有一个问题,我正在尝试添加动态值。Selenium给出以下错误,我不知道如何解决 org.openqa.selenium.NoSuchElementExcept
本文向大家介绍如何将动态值动态传递到SAP ABAP中的CDS,包括了如何将动态值动态传递到SAP ABAP中的CDS的使用技巧和注意事项,需要的朋友参考一下 我认为不存在将动态值传递给CDS的方法。 为了使DCL能够完成其分配的活动,您需要声明和定义权限对象。假设您无法执行此操作。然后,您可以获得所有结果,然后使用ABAP在网关层过滤结果。
我正在开发一个N-皇后模拟使用pyplay。 另外,我在另一个文件中有一个pygame程序,用于绘制棋盘,该棋盘将列表作为输入,并相应地放置它们。如果我想显示皇后的移动,如何从递归过程动态传递列表,以便可以看到确切的回溯过程。非常感谢。
问题内容: 我正在使用python flask框架。我编写了一个需要一个参数的装饰器,该参数是动态的。 我的装饰器如下所示,将获得一个密钥,并使用该密钥从redis获取数据。 而且我有一个使用这种装饰器的类,像这样的代码 如您所见,我的装饰器需要一个名为的参数,然后像这样传递密钥 将获得城市的ID,如果一切正常,密钥将如下所示 但我得到了错误: 我很困惑,在烧瓶中,如何将动态参数传递给装饰器? 谢