当前位置: 首页 > 知识库问答 >
问题:

DataFlow/Apache beam-当传入模式时如何访问当前文件名?

弓嘉纳
2023-03-14
with beam.Pipeline(options=pipeline_options) as p:                              
    lines = p | ReadFromText('gs://url to file')                                        


    data = (                                                                    
        lines                                                                   
        | 'Jsonify' >> beam.Map(jsonify)                                        
        | 'Unnest' >> beam.FlatMap(unnest)                                      
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(                  
            'project_id:dataset_id.table_name', schema=schema,                     
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,    
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)       
        )                                                   

最后,我想要做的是在转换json的每一行时将文件名传递到转换函数中(请参见此内容,然后使用文件名在不同的BQ表中进行查找以获得值)。我想一旦我设法知道如何获得文件名,我将能够找出侧输入部分,以便在bq表中进行查找,并获得唯一的值。

共有1个答案

弘承业
2023-03-14

我试图用前面引用的案例实现一个解决方案。在那里,以及在其他方法中,比如这个方法,他们也会得到一个文件名列表,但将所有文件加载到一个元素中,这可能不能很好地伸缩大文件。因此,我研究了将文件名添加到每个记录中。

作为输入,我使用了两个csv文件:

$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france

使用gcsfilesystem.match,我们可以访问metadata_list,以字节为单位检索包含文件路径和大小的FileMetadata。在我的示例中:

[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
 FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]
result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

然后,我们继续使用ReadFromText将每个不同的文件读入相应的PCollection,然后调用AddFileNamesFnParDo将每个记录与文件名关联起来。

for i in range(len(result)):   
  globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

其中AddFileNamesfn为:

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename':file_name, 'row':element}

我的第一种方法是直接使用Map函数,这会产生更简单的代码。但是,result[i].path在循环结束时被解析,每个记录都被错误地映射到列表的最后一个文件:

globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()

我们通过记录元素来检查结果:

INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}

我用DirectrunnerDataflowRunnerfor Python SDK 2.8.0测试了这一点。

我希望这能解决这里的主要问题,您现在可以继续将BigQuery集成到完整的用例中。为此,您可能需要使用Python客户机库,我编写了一个类似的Java示例。

import argparse, logging
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename':file_name, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

if __name__ == '__main__':
  run()
 类似资料:
  • 问题内容: 在Keras(具有Tensorflow后端)中,当前输入模式可用于我的自定义损失函数吗? 当前输入模式定义为用于产生预测的输入向量。例如,考虑以下内容:。然后,当前输入模式是与y_train(在损失函数中称为y_true)关联的当前X_train向量。 在设计自定义损失函数时,我打算优化/最小化一个需要访问当前输入模式而不只是当前预测的值。 我已经看过https://github.co

  • 在Keras(Tensorflow后端)中,当前输入模式是否可用于我的自定义损失函数? 当前输入模式定义为用于产生预测的输入向量。例如,考虑以下代码:<代码> xyStand、xyTest、yyStand、yyTest= TrimeTestSythRead(x,y,TestLySt= 0.33,RealthObjySt==42,Suffle=false)< /Cord>。然后,当前输入模式是与y_

  • 我试图使用bash创建一个文件,文件名为当前时间。这就是我试图做到的:

  • 我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实

  • 问题内容: 我希望能够从导入的模块中动态检索当前执行模块或类名称。这是一些代码: foo.py: bar.py: 这显然不起作用,因为包含该功能的模块的名称是无效的。我希望在模块内部访问的是正在使用的当前执行模块的名称。因此,在上述情况下,将是这样,但是如果导入了其他模块,我想动态地访问该模块的名称。 编辑: 该模块看起来很有希望,但它并不是我一直在寻找的东西。我希望找到的是我可以访问的某种全局或