输入:abc.tar.gz->untar->文件夹:abc
ABC的文件夹结构:
根文件夹:abc包含每天每5分钟从100个城市生成的csv文件。
csv文件数量:100个城市*每小时12个文件*24小时=28800个csv文件
abc/
city1_0005.csv
city1_0010.csv
..
city1_2355.csv
..
..
city2_0005.csv
city2_0010.csv
..
city2_2355.csv
..
..
city100_0005.csv
city100_0010.csv
功能需求:
我已经开发了下面的代码处理数据顺序。我在寻找优化它的方法。
staging_path="abfss://xyz/abc"
#using databricks utils to get the list of files in folder
filesProp = dbutils.fs.ls(staging_adls_path)
#extracting the city names from list of filenames
filesSet =set()
for file in filesProp:
filesSet.add(file.name.split('-')[0])
#empty list to store dataframes
dictionary_df = {}
#reading 1 city data and inserting to table
for fileName in filesSet:
filePath = staging_path+fileName+"*"
print(filePath)
dictionary_df[fileName] = spark.read.options(header='True', delimiter=',').csv(filePath)
dictionary_df[fileName].write.saveAsTable(fileName)
This will ensure the files with same schema are under same root folder
/abc/
city1/
20211021/city1_0005
20211021/city1_0010
...
city2/
20211021/city2_0005
20211021/city2_0010
from pyspark.sql import functions as F
tmp_db = "test_multiple_csv_schema"
spark.sql(f"create database if not exists {tmp_db}")
base_path = <your_base_mount_path_root_folder_for_csvs>
checkpoint_location = f"{base_path}/checkpoint/multiplecsvs"
input_path = f"{base_path}/multiplecsvs/"
schema_location = f"{base_path}/schema/multiplecsvs"
staging_checkpoint_path = f"{base_path}/staging/checkpoint/multiplecsvs"
staging_data_path = f"{base_path}/staging/data/multiplecsvs"
input_format = "csv"
def process_multiple_csvs_different_schema(batch_df):
df = (
batch_df
.withColumn("table",F.split(F.col("input_file_name"),"\.csv")[0])
.withColumn("table_path",F.split(F.col("table"),"/"))
.withColumn("table_name",F.split(F.col("table"),"/")[F.size(F.col("table_path"))-1])
.drop("table","table_path")
)
list_of_cities = df.select("table_name").distinct().collect()
list_of_cities = [city[0] for city in list_of_cities]
for city in list_of_cities:
print(f"processing data for {city}")
city_df = df.where(f"table_name='{city}'")
input_file_name = city_df.limit(1).select("input_file_name").collect()[0][0]
df_schema = spark.read.option("format",input_format).option("header",True).load(input_file_name,format=input_format)
select_columns = df_schema.columns
city_df.select(select_columns).withColumn("processed_time",F.current_timestamp()).write.option("mergeSchema",True).option("mode","append").format("delta").saveAsTable(f"{tmp_db}.{city}")
raw_df = (spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format",input_format)
.option("cloudFiles.schemaLocation",schema_location)
.load(input_path)
)
(
raw_df.withColumn("input_file_name",F.input_file_name())
.writeStream
.option("checkpointLocation",staging_checkpoint_path)
.option("mergeSchema",True)
.option("format","delta")
.outputMode("append")
.trigger(once=True)
.start(staging_data_path)
.awaitTermination()
)
staging_df = spark.readStream.format("delta").load(staging_data_path)
(
staging_df.writeStream
.option("checkpointLocation",checkpoint_location)
.option("format","delta")
.trigger(once=True)
.foreachBatch(lambda batch_df,batch_id:process_multiple_csvs_different_schema(batch_df))
.start()
.awaitTermination()
)
问题内容: 我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法: 在不使用实际的阅读客户端: 我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给,但是我不确定哪种是最佳做法。 问题答案: 根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配
我的数据位于azure cosmos数据库中,我已经将数据集挂载到azure Databricks上。 我可以使用pandas读取csv文件,并将其加载到spark DataFrame中。
如何在JMeter中将一个csv文件循环到另一个csv文件,其中第一个csv文件包含所有登录数据,另一个csv文件包含交易数据。我应该运行1个出纳员应该处理30笔交易的地方。
我有一个巨大的文件(2GB),其中只包含员工编号。我必须阅读此文件,获取员工号码并调用数据库以获取员工的工资,然后将其写入另一个文件中,并将员工姓名和工资作为其行。 现在的问题是,通过直接读取这个巨大的文件通过简单的nio在java我的STS内存溢出或它需要4-5小时来完成整个读-取-写过程。 所以我想用Java并发来拯救我。 为此,我有一个实现Runnable的EmployeeDetails类,
/tmp/data/myfile1.csv,/tmp/data/myfile2.csv,/tmp/data.myfile3.csv,/tmp/datamyfile4.csv 我希望将这些文件读入Spark DataFrame或RDD,并且希望每个文件都是DataFrame的一个解析。我怎么能这么做?
我有从多个文件读取并写入多个文件的Spring批处理配置。是否可以只写入从多个读取的一个文件。假设我收到巨大的XML文件,我将XML拆分为小文件并使用分区器并行读取小文件。但我需要将从不同的小xml文件读取的所有数据写入一个输出文件。Spring批处理是否可以做到这一点?我知道通过使写入器同步是可能的,但我正在寻找任何其他可能的方式作业配置 我得到错误组织。springframework。一批项目