当前位置: 首页 > 知识库问答 >
问题:

PySpark/Aws胶水的性能问题

林龙野
2023-03-14

我有一个数据帧。我需要将每个记录转换为JSON,然后使用JSON负载调用API将数据插入postgress。我在数据框中有14000条记录,要调用api并获得响应,需要5个小时。有没有办法提高性能。下面是我的代码片段。

df_insert = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "source_table_name") \
.load()

json_insert = df_insert.toJSON().collect()

for row in json_insert:
  line = json.loads(row)
    headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache",
    }
  response = requests.request("POST", url_insert, data=payload, headers=headers)
  print(response.text)
  res = response.text
  response_result = json.loads(res)
  #print(response_result["httpStatus"])
  if response_result["message"] == 'success':
      print ("INFO : Record inserted successfully")
  else:
      print ("ERROR : Error in the record")
      status_code = response_result["status"]
      error_message =  response_result["error"]
      my_list = [(status_code,error_message,row)]
      df = sc.createDataFrame(my_list, ['status', 'error', 'json data'])
      df.write.format(SNOWFLAKE_SOURCE_NAME) \
      .options(**sfOptions) \
      .option("dbtable", "error_table") \
      .option("header", "true") \
      .option("truncate_table", "on") \
      .mode("append") \
      .save()

注意:我知道通过做"json_insert=df_insert.toJSON()。收集()"我正在失去数据帧的优势。有没有更好的方法来完成。

共有1个答案

蔚和风
2023-03-14

df_insert.toJSON()返回一个RDD,您可以平面图1

source_rdd = df_insert.toJSON()

在此RDD上执行平面图,并返回仅包含错误的RDD。

headers = {
    'Authorization': authorization,
    'content-type': "application/json",
    'cache-control': "no-cache"
}

def post_service_error(row):
    # requests package may not be available in the node
    # see about adding files to the spark context
    response = requests.request("POST", url_insert, data=row, headers=headers)
    response_result = response.json()
    if response_result['message'] == 'success':
        print ("INFO : Record inserted successfully")
        return []
    print ("ERROR : Error in the record")
    status_code = response_result["status"]
    error_message =  response_result["error"]
    return [(status_code, error_message, row)]

errors_rdd = source_rdd.flatMap(post_service_error)

将错误RDD转换为火花DataFrame并将其持久化到表中。

errors_df = sc.createDataFrame(errors_rdd, ['status', 'error', 'json data'])
(errors_df.write.format(SNOWFLAKE_SOURCE_NAME)
  .options(**sfOptions)
  .option("dbtable", "error_table")
  .option("header", "true")
  .option("truncate_table", "on")
  .mode("append")
  .save())

如果您拥有正在执行请求的API,我建议探索一个接受批处理这些对象/数组的实现。这样,您可以在将每个分区映射到批处理请求之前对RDD进行分区,然后处理错误。

 类似资料:
  • 当我试图提取1个表时,我使用AWS Glue从EC2(Postgre)提取要转换的数据,并将其放在S3上。我得到一个错误,如下所示: 有什么我能做的吗?我试图删除null字段或fillna,但这些都不起作用。

  • 我每天都有csv文件被传递到S3,这些文件在当月是增量的。所以file1包含第1天的数据,file2包含第1天和第2天的数据,等等。每天我都想对该数据运行一个ETL并将其写入不同的S3位置,这样我就可以使用Athena查询它,而不会出现重复的行。本质上,我只想查询聚合数据的最新状态(这只是最近交付给S3的文件的内容)。 我认为书签不会起作用,因为增量交付包含以前文件中的数据,因此会产生重复。我知道

  • 我正在运行一个AWS Glue作业,使用从Glue自动生成的PySpark脚本,将S3上的管道分隔文件加载到RDS Postgres实例中。 最初,它抱怨某些列中的空值: http://spark.apache.org/docs/latest/api/python/pyspark.sql.sql.html#pyspark.sql.dataframe.fillna 现在,当我运行作业时,它会抛出以下

  • 我已经创建了一个成功执行的AWS胶水作业。但是,我无法在作业中放置任何自定义日志记录。 如何在AWS S3存储桶中创建日志文件,以便跟踪日常作业执行情况? 目前,当我的工作执行时,它会创建默认日志(即火花日志),我可以在AWS云手表中看到它。在AWS胶水中记录事件的最佳实践是什么?

  • 我有一个S3存储桶,每天的文件都会被丢弃。AWS爬虫从该位置爬网数据。在我的glue作业运行的第一天,它将获取AWS crawler创建的表中的所有数据。例如,在第一天就有三个文件。(即file1.txt、file2.txt、file3.txt)和glue job在执行glue job的第一天处理这些文件。第二天,另两个文件到达S3位置。现在,在S3位置,这些是存在的文件。(即file1.txt、

  • 我需要对S3 bucket执行附加加载。 每天都有新的. gz文件被转储到S3位置,胶水爬虫读取数据并在数据曲库中更新它。 Scala AWS Glue作业运行并仅过滤当前日期的数据。 上面过滤的数据按照一些规则进行转换,并创建一个分区的动态数据帧(即年、月、日)级别。 现在,我需要将这个动态数据帧写入到S3 bucket中,其中包含所有前一天的分区。事实上,我只需要将一个分区写入S3存储桶。目前