当前位置: 首页 > 知识库问答 >
问题:

使用GCP Dataflow从GCS读取数据的速度非常慢&Apache Beam Python SDK

周辰沛
2023-03-14

首先让我先说明一下,我刚刚开始了解如何使用Beam的Python SDK和GCP Dataflow!

问题:我的管道对于几乎所有的用例都非常有效。没有我可以抱怨的错误。我只是对一些可能的瓶颈或我可以做的优化有一些问题。我注意到,当处理大小为50MB的gzipped文件时,我的管道执行时间几乎超过3个小时。不完全确定是否有任何方法来加快这一部分。下面是在作业最终成功完成之前我看到的一堆日志警告的截图。

记录数据流的输出详细信息

以下是管道的相关片段:

if __name__ == '__main__':

    parser = argparse.ArgumentParser(
        description='Beam Pipeline: example'
    )
    parser.add_argument('--input-glob',
                        help='Cloud Path To Raw Data',
                        required=True)
    parser.add_argument('--output-bq-table',
                        help='BQ Native {dataset}.{table}')
    known_args, pipeline_args = parser.parse_known_args()

    with beam.Pipeline(argv=pipeline_args) as p:

        json_raw = (p | 'READING RAW DATA' >> beam.io.ReadFromText(known_args.input_glob)
            | 'JSON-ing' >> beam.Map(lambda e: json.loads(e))
        )

额外信息:

>

  • 我正在使用Airflow的Dataflowhook启动此操作。
  • 我尝试过不同的机器类型,希望在这个问题上投入更多的计算能力可以使它消失,但到目前为止还没有成功。

    以下管道执行参数:

    --runner=DataflowRunner \
    --project=example-project \
    --region=us-east4 \
    --save_main_session \
    --temp_location=gs://example-bucket/temp/ \
    --input-glob=gs://example-bucket/raw/  \
    --machine_type=n1-standard-16 \
    --job_name=this-is-a-test-insert
    
  • 共有1个答案

    汪深
    2023-03-14

    Beam有许多优化,允许它将文件处理拆分给多个工作人员。此外,它能够拆分单个文件,供多个工作人员并行使用。

    不幸的是,这在gzip文件中是不可能的。这是因为gzip文件被压缩到单个块中,必须连续解压缩。当光束工作者读取这个文件时,它必须串行读取整个东西。

    有一些压缩格式允许您并行读取它们(这些通常是“多块”格式)。不幸的是,我认为Beam Python SDK目前只支持串行格式。

    如果需要以这种方式工作管道,可以尝试在readfromtext之后添加beam.reshuffle操作。通过这样做,您的管道仍将串行读取文件,但并行应用所有下游操作(请参见PTransform guide中的性能部分,以了解为什么会出现这种情况)。

    其他一些备选方案:

    • 将数据分离到多个gzip文件中。
    • 在管道中消耗它之前解压缩它。
    • (半开玩笑/半认真)为梁贡献多块压缩支撑?:)):)
     类似资料:
    • 我有26个CSV文件,我想每晚从互联网上抓取并上传到Postgresql表中。我使用Java、PreparedStatement和Batch实现了这一点。尽管如此,性能仍然非常缓慢。要获取大约6000个条目并将其放入Postgresql,需要30分钟。这是我第一次做这样的事情,所以我不知道这是快还是慢。 为了获取文件,我使用了以下代码。 然后,我使用PreparedStatement从输入流中提取

    • 我有一个大问题,我自己创建了链表和数据结构,但数据读取功能工作非常慢。如果我尝试读取10k结构,函数需要大约530ms: 但是当我尝试读取10倍大的数据量(100k)时,大约需要44500毫秒: 这是我的代码: IQ_struct.h IQ_struct.cpp IQ_data.h IQ_data.cpp Main.cpp 我做错了什么?主要问题是我的文件包含超过5000K的结构。提前感谢您的帮助

    • 我在Google Compute Engine上创建了两个集群,这些集群读取100 GB的数据。 集群I:1个主机-15 GB内存-250 GB磁盘10个节点-7.5 GB内存-200 GB磁盘 集群II:1主-15 GB内存-250 GB磁盘150节点-1.7 GB内存-200 GB磁盘 我用它来读取文件: 另外,hadoop的复制或移动命令也很慢。数据只有100 GB。大公司如何处理太字节的数

    • 问题内容: 我正在查询有关的信息。 我正在迭代一个数组,并查询列表中的每个值。 不幸的是 ,在调试器下, 单个查询大约需要3-4秒,而 在禁用调试器的情况下, 查询时间要 短一些。 任何想法为什么这么慢?我使用进行测试。 这是我的代码: 更新资料 当我离开时,评估很快就完成了,但是我没有得到。它返回一个空字符串… 问题答案: 感谢@nvrmnd我尝试了一下,发现了一种更好的解析器: VTD-XML

    • 问题内容: 我面临一个非常奇怪的问题:使用Redis时,我的写入速度非常糟糕(在理想情况下,写入速度应该接近RAM上的写入速度)。 这是我的基准: 是生成随机字符串的类(arg是字符串长度) 以下是几个结果: [写入] nb:100000 |时间:4.408319378 |速度:0.713905907055318 MB / s [写入] nb:100000 |时间:4.4139469070553

    • 问题内容: 我有一个MySQL查询(Ubu 10.04,Innodb,Core i7、16Gb RAM,SSD驱动器,优化的MySQL参数): 表em_link_data有大约700万行,em_link有数千行。此查询大约需要 18秒 才能完成。但是,如果我替换子查询的结果并执行以下操作: 那么查询将在不到1毫秒的时间内运行。仅子查询在不到1毫秒的时间内运行,因此索引了列linkid。 如果我将查