我得到了谷歌云存储桶的URL。我必须:
>
对于每个blob,我进行一些gcsapi调用,以获取关于blob的信息(blob.size、blob.name等)
对于每个Blob,我还必须读取它,在它里面找到一些东西,并将其添加到从GCS API调用中获得的值中
对于每个blob,我必须将步骤2和步骤3中找到的关于blob的值写入BigQuery
我有数千个blob,因此这需要使用ApacheBeam来完成(我被推荐)
我对管道的想法是这样的:
GetUrlOfBucket并进行PCollection
使用该PCollection获取Blob列表作为新的PCollection
使用这些Blob的元数据创建PCollection
执行一个转换,该转换将接受作为元数据值字典的PCollection,进入Blob,扫描一个值,并返回一个新的PCollection,即元数据值字典和这个新值
将此写入BigQuery。
我特别难以考虑如何将词典作为PCollection返回
[]我读到的内容:
https://beam.apache.org/documentation/programming-guide/#composite-转变
https://medium.com/@Rajeshegde/data-pipeline-use-apache-beam-python-sdk-on-dataflow-6bb8550bf366
非常感谢您的任何建议,特别是关于如何接受该bucket名称并返回一个blob的PCollection的建议。
我通过阅读更多关于ApacheBeam的内容来解决这个问题,我发现我必须使用ParDo函数在我的资源之间分割作业,在ParDo中我调用我的DoFn函数,它接收一个元素并执行它所需的所有处理,并生成一个dic。请参阅本文ApacheBeam:如何同时创建多个经过相同PTransform的pCollection?
class ExtractMetadata(beam.DoFn):
def process(self, element):
"""
Takes in a blobName, fetches the blob and its values and returns a dictionary of values
"""
metadata = element.metadata
if metadata is not None:
event_count = int(metadata['count'])
else:
event_count = None
event_type = self.determine_event_type(element.id)
cluster = self.determine_cluster(element.id)
customer = self.determine_customer(element)
# date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')
#date = date.isoformat()
dic = {
'blob_name' : element.name,
'event_path' : element.path,
'size' : int(element.size),
'time_of_creation': element.time_created.isoformat(),
'event_count' : event_count,
'event_type' : event_type,
'cluster' : cluster,
'customer' : customer
}
yield dic
问题内容: 在Python 2.7中,我可以将字典键,值或项作为列表获取: 现在,在Python> = 3.3中,我得到如下信息: 因此,我必须这样做以获得列表: 我想知道,是否有更好的方法在Python 3中返回列表? 问题答案: 尝试。 这会将对象转换为列表。 另一方面,你应该问自己是否重要。的编码方式是假设鸭子输入(如果看起来像鸭子,而像鸭子一样嘎嘎叫,那就是鸭子)。在大多数情况下,该对象的
我正在使用Apache Beam GoSDK,很难以正确的格式按键分组/组合PCollection。 我在PCollection的字符串中每个键有多条记录,如下所示: 我想使用GroupByKey和CombinePerKey,这样我可以像这样聚合每个人的宠物: 如何转换PCollection 他们在这里提到了类似的内容,但没有包含聚合字符串值的代码。 我可以使用ParDo来获取字符串键和字符串值,
问题内容: 如何将Python字典序列化为字符串,然后再返回字典?该词典中将包含列表和其他词典。 问题答案: 这取决于您要使用它的目的。如果只是尝试保存它,则应使用(或者,如果使用CPython 2.x,则速度更快)。 如果您希望它可读,可以使用: 但是,它支持的功能非常有限,虽然可以用于任意对象(如果它不能自动运行,则该类可以定义以精确指定应如何对其进行腌制)。
问题内容: 我在玩苹果的新 Swift 编程语言,遇到了一些问题… 当前,我正在尝试读取plist文件,在Objective-C中,我将执行以下操作以将内容作为NSDictionary获取: 如何在Swift中将plist作为字典? 我假设我可以通过以下方式获取plist的路径: 当这可行时(如果正确的话):如何将内容作为字典? 还有一个更笼统的问题: 是否可以使用默认的 NS ** 类?我想是…
问题内容: 我有以下字符串,这是一个字符串化的Python字典: 如何从上述字符串中获取Python字典? 问题答案: 好吧,你可以做 但是,如果字符串包含用户输入,则是个坏主意,因为表达式中可能包含一些随机的恶意函数。 因此,更安全的选择可能是: 来自http://docs.python.org/library/ast.html#ast.literal_eval: 提供的字符串或节点只能由以下P
我有一个用例,在这个用例中,我需要将一个file_path(作为一个新元素)追加到另一个pcollection中,它是一个元素字典。 但奇怪的是,它没有执行这个语句,因此没有输出,甚至没有任何错误。但是当我使用以下命令创建pcollection时: file_path=beam.create(['some_path_value'])并按照上面的方式传递beam.pvalue.assingleton