当前位置: 首页 > 知识库问答 >
问题:

从一个文件夹中读取多组csv文件,并使用spark或databricks并行插入到各自的目标表中

程墨竹
2023-03-14

输入:abc.tar.gz->untar->文件夹:abc

ABC的文件夹结构:

根文件夹:abc包含每天每5分钟从100个城市生成的csv文件。

csv文件数量:100个城市*每小时12个文件*24小时=28800个csv文件

abc/    
city1_0005.csv
city1_0010.csv
..
city1_2355.csv
..
..
city2_0005.csv
city2_0010.csv
..
city2_2355.csv
..
..
city100_0005.csv
city100_0010.csv

功能需求:

  • 使用Spark/databricks为每个城市创建表,并将相应城市的csv文件(288)加载到表中。总共100个表将在目标位置。1个城市的1个表。
  • 每个城市有不同的模式。每个城市的所有列都不同。因此,我无法将所有城市数据写入一个带有分区的表中。

我已经开发了下面的代码处理数据顺序。我在寻找优化它的方法。

staging_path="abfss://xyz/abc"

#using databricks utils  to get the list of files in folder
filesProp = dbutils.fs.ls(staging_adls_path)

#extracting the city names from list of filenames
filesSet  =set()
for file in filesProp:
    filesSet.add(file.name.split('-')[0])

#empty list to store dataframes
dictionary_df = {} 

#reading 1 city data and inserting to table
for fileName in filesSet:
    filePath = staging_path+fileName+"*"
    print(filePath)
    dictionary_df[fileName] = spark.read.options(header='True', delimiter=',').csv(filePath)
    dictionary_df[fileName].write.saveAsTable(fileName) 

共有1个答案

况明贤
2023-03-14
 This will ensure the files with same schema are under same root folder
 /abc/
     city1/
          20211021/city1_0005 
          20211021/city1_0010
           ...
     city2/
          20211021/city2_0005
          20211021/city2_0010 
from pyspark.sql import functions as F

tmp_db = "test_multiple_csv_schema"
spark.sql(f"create database if not exists {tmp_db}")
base_path = <your_base_mount_path_root_folder_for_csvs>
checkpoint_location = f"{base_path}/checkpoint/multiplecsvs"
input_path = f"{base_path}/multiplecsvs/"
schema_location = f"{base_path}/schema/multiplecsvs"
staging_checkpoint_path = f"{base_path}/staging/checkpoint/multiplecsvs"
staging_data_path = f"{base_path}/staging/data/multiplecsvs"
input_format = "csv"

def process_multiple_csvs_different_schema(batch_df):

      df = (
             batch_df
                .withColumn("table",F.split(F.col("input_file_name"),"\.csv")[0])
                .withColumn("table_path",F.split(F.col("table"),"/"))
                .withColumn("table_name",F.split(F.col("table"),"/")[F.size(F.col("table_path"))-1])
                .drop("table","table_path")
          )

     list_of_cities = df.select("table_name").distinct().collect()

     list_of_cities = [city[0] for city in list_of_cities]

     for city in list_of_cities:
        print(f"processing data for {city}")
        city_df = df.where(f"table_name='{city}'")
        input_file_name = city_df.limit(1).select("input_file_name").collect()[0][0]

        df_schema = spark.read.option("format",input_format).option("header",True).load(input_file_name,format=input_format)
        select_columns = df_schema.columns

        city_df.select(select_columns).withColumn("processed_time",F.current_timestamp()).write.option("mergeSchema",True).option("mode","append").format("delta").saveAsTable(f"{tmp_db}.{city}")

raw_df = (spark
          .readStream
          .format("cloudFiles")
          .option("cloudFiles.format",input_format)
          .option("cloudFiles.schemaLocation",schema_location)
          .load(input_path)
         )

(
  raw_df.withColumn("input_file_name",F.input_file_name())
        .writeStream
        .option("checkpointLocation",staging_checkpoint_path)
        .option("mergeSchema",True)
        .option("format","delta")
        .outputMode("append")
        .trigger(once=True)
        .start(staging_data_path)
        .awaitTermination()
)

staging_df = spark.readStream.format("delta").load(staging_data_path)
(
  staging_df.writeStream
     .option("checkpointLocation",checkpoint_location)
     .option("format","delta")
     .trigger(once=True)
     .foreachBatch(lambda batch_df,batch_id:process_multiple_csvs_different_schema(batch_df))
     .start()
     .awaitTermination()
)
 类似资料:
  • 问题内容: 我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法: 在不使用实际的阅读客户端: 我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给,但是我不确定哪种是最佳做法。 问题答案: 根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配

  • 我的数据位于azure cosmos数据库中,我已经将数据集挂载到azure Databricks上。 我可以使用pandas读取csv文件,并将其加载到spark DataFrame中。

  • 如何在JMeter中将一个csv文件循环到另一个csv文件,其中第一个csv文件包含所有登录数据,另一个csv文件包含交易数据。我应该运行1个出纳员应该处理30笔交易的地方。

  • 我有一个巨大的文件(2GB),其中只包含员工编号。我必须阅读此文件,获取员工号码并调用数据库以获取员工的工资,然后将其写入另一个文件中,并将员工姓名和工资作为其行。 现在的问题是,通过直接读取这个巨大的文件通过简单的nio在java我的STS内存溢出或它需要4-5小时来完成整个读-取-写过程。 所以我想用Java并发来拯救我。 为此,我有一个实现Runnable的EmployeeDetails类,

  • /tmp/data/myfile1.csv,/tmp/data/myfile2.csv,/tmp/data.myfile3.csv,/tmp/datamyfile4.csv 我希望将这些文件读入Spark DataFrame或RDD,并且希望每个文件都是DataFrame的一个解析。我怎么能这么做?

  • 我有从多个文件读取并写入多个文件的Spring批处理配置。是否可以只写入从多个读取的一个文件。假设我收到巨大的XML文件,我将XML拆分为小文件并使用分区器并行读取小文件。但我需要将从不同的小xml文件读取的所有数据写入一个输出文件。Spring批处理是否可以做到这一点?我知道通过使写入器同步是可能的,但我正在寻找任何其他可能的方式作业配置 我得到错误组织。springframework。一批项目