当前位置: 首页 > 知识库问答 >
问题:

如何将字典作为PCollection返回?

微生博简
2023-03-14

我得到了谷歌云存储桶的URL。我必须:

>

对于每个blob,我进行一些gcsapi调用,以获取关于blob的信息(blob.size、blob.name等)

对于每个Blob,我还必须读取它,在它里面找到一些东西,并将其添加到从GCS API调用中获得的值中

对于每个blob,我必须将步骤2和步骤3中找到的关于blob的值写入BigQuery

我有数千个blob,因此这需要使用ApacheBeam来完成(我被推荐)

我对管道的想法是这样的:

GetUrlOfBucket并进行PCollection

使用该PCollection获取Blob列表作为新的PCollection

使用这些Blob的元数据创建PCollection

执行一个转换,该转换将接受作为元数据值字典的PCollection,进入Blob,扫描一个值,并返回一个新的PCollection,即元数据值字典和这个新值

将此写入BigQuery。

我特别难以考虑如何将词典作为PCollection返回

[]我读到的内容:

https://beam.apache.org/documentation/programming-guide/#composite-转变

https://medium.com/@Rajeshegde/data-pipeline-use-apache-beam-python-sdk-on-dataflow-6bb8550bf366

非常感谢您的任何建议,特别是关于如何接受该bucket名称并返回一个blob的PCollection的建议。

共有1个答案

司英飙
2023-03-14

我通过阅读更多关于ApacheBeam的内容来解决这个问题,我发现我必须使用ParDo函数在我的资源之间分割作业,在ParDo中我调用我的DoFn函数,它接收一个元素并执行它所需的所有处理,并生成一个dic。请参阅本文ApacheBeam:如何同时创建多个经过相同PTransform的pCollection?

    class ExtractMetadata(beam.DoFn):                                                                                                                                                                                                                                                  
def process(self, element):                                                                                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    Takes in a blobName, fetches the blob and its values and returns a dictionary of values                                                                                                                                                                                    
    """                                                                                                                                                                                                                                                                        
    metadata = element.metadata                                                                                                                                                                                                                                                
    if metadata is not None:                                                                                                                                                                                                                                                   
        event_count = int(metadata['count'])                                                                                                                                                                                                                                   
    else:                                                                                                                                                                                                                                                                      
        event_count = None                                                                                                                                                                                                                                                     

    event_type = self.determine_event_type(element.id)                                                                                                                                                                                                                         
    cluster    = self.determine_cluster(element.id)                                                                                                                                                                                                                            
    customer   = self.determine_customer(element)                                                                                                                                                                                                                              
   # date = datetime.strptime(element.time_created, '%a, %d %b %Y %H:%M:%S')                                                                                                                                                                                                   
    #date = date.isoformat()                                                                                                                                                                                                                                                   
    dic = {                                                                                                                                                                                                                                                                    
        'blob_name'       : element.name,                                                                                                                                                                                                                                      
        'event_path'      : element.path,                                                                                                                                                                                                                                      
        'size'            : int(element.size),                                                                                                                                                                                                                                 
        'time_of_creation': element.time_created.isoformat(),                                                                                                                                                                                                                  
        'event_count'     : event_count,                                                                                                                                                                                                                                       
        'event_type'      : event_type,                                                                                                                                                                                                                                        
        'cluster'         : cluster,                                                                                                                                                                                                                                           
        'customer'        : customer                                                                                                                                                                                                                                           
    }                                                                                                                                                                                                                                                                          
    yield dic
 类似资料:
  • 问题内容: 在Python 2.7中,我可以将字典键,值或项作为列表获取: 现在,在Python> = 3.3中,我得到如下信息: 因此,我必须这样做以获得列表: 我想知道,是否有更好的方法在Python 3中返回列表? 问题答案: 尝试。 这会将对象转换为列表。 另一方面,你应该问自己是否重要。的编码方式是假设鸭子输入(如果看起来像鸭子,而像鸭子一样嘎嘎叫,那就是鸭子)。在大多数情况下,该对象的

  • 我正在使用Apache Beam GoSDK,很难以正确的格式按键分组/组合PCollection。 我在PCollection的字符串中每个键有多条记录,如下所示: 我想使用GroupByKey和CombinePerKey,这样我可以像这样聚合每个人的宠物: 如何转换PCollection 他们在这里提到了类似的内容,但没有包含聚合字符串值的代码。 我可以使用ParDo来获取字符串键和字符串值,

  • 问题内容: 如何将Python字典序列化为字符串,然后再返回字典?该词典中将包含列表和其他词典。 问题答案: 这取决于您要使用它的目的。如果只是尝试保存它,则应使用(或者,如果使用CPython 2.x,则速度更快)。 如果您希望它可读,可以使用: 但是,它支持的功能非常有限,虽然可以用于任意对象(如果它不能自动运行,则该类可以定义以精确指定应如何对其进行腌制)。

  • 问题内容: 我在玩苹果的新 Swift 编程语言,遇到了一些问题… 当前,我正在尝试读取plist文件,在Objective-C中,我将执行以下操作以将内容作为NSDictionary获取: 如何在Swift中将plist作为字典? 我假设我可以通过以下方式获取plist的路径: 当这可行时(如果正确的话):如何将内容作为字典? 还有一个更笼统的问题: 是否可以使用默认的 NS ** 类?我想是…

  • 问题内容: 我有以下字符串,这是一个字符串化的Python字典: 如何从上述字符串中获取Python字典? 问题答案: 好吧,你可以做 但是,如果字符串包含用户输入,则是个坏主意,因为表达式中可能包含一些随机的恶意函数。 因此,更安全的选择可能是: 来自http://docs.python.org/library/ast.html#ast.literal_eval: 提供的字符串或节点只能由以下P

  • 我有一个用例,在这个用例中,我需要将一个file_path(作为一个新元素)追加到另一个pcollection中,它是一个元素字典。 但奇怪的是,它没有执行这个语句,因此没有输出,甚至没有任何错误。但是当我使用以下命令创建pcollection时: file_path=beam.create(['some_path_value'])并按照上面的方式传递beam.pvalue.assingleton