当前位置: 首页 > 知识库问答 >
问题:

AWS胶PySpark替换空值

司马高昂
2023-03-14

我正在运行一个AWS Glue作业,使用从Glue自动生成的PySpark脚本,将S3上的管道分隔文件加载到RDS Postgres实例中。

最初,它抱怨某些列中的空值:

pyspark.sql.utils.IllegalArgumentException: u"Can't get JDBC type for null"
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = 
"xyz_catalog", table_name = "xyz_staging_files", transformation_ctx = 
"datasource0")
custom_df = datasource0.toDF()
custom_df2 = custom_df.fillna(-1)
custom_df3 = custom_df2.fromDF()

applymapping1 = ApplyMapping.apply(frame = custom_df3, mappings = [("id", 
"string", "id", "int"),........more code

http://spark.apache.org/docs/latest/api/python/pyspark.sql.sql.html#pyspark.sql.dataframe.fillna

现在,当我运行作业时,它会抛出以下错误:

Log Contents:
Traceback (most recent call last):
File "script_2017-12-20-22-02-13.py", line 23, in <module>
custom_df3 = custom_df2.fromDF()
AttributeError: 'DataFrame' object has no attribute 'fromDF'
End of LogType:stdout

我对Python和Spark是新手,尝试了很多,但不明白这一点。感谢一些专家在这方面的帮助。

custom_df3 = glueContext.create_dynamic_frame.fromDF(frame = custom_df2)
AttributeError: 'DynamicFrameReader' object has no attribute 'fromDF'
pyspark.sql.utils.IllegalArgumentException: u"Can't get JDBC type for null"

更新:确保DynamicFrame已经导入(从awsglue.context import DynamicFrame),因为fromDF/toDF是DynamicFrame的一部分。

请参阅https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

共有1个答案

贺英悟
2023-03-14

你在打电话。来自错误的班级。应该是这样的:

from awsglue.dynamicframe import DynamicFrame
DyamicFrame.fromDF(custom_df2, glueContext, 'label')
 类似资料:
  • 我有一个数据帧。我需要将每个记录转换为JSON,然后使用JSON负载调用API将数据插入postgress。我在数据框中有14000条记录,要调用api并获得响应,需要5个小时。有没有办法提高性能。下面是我的代码片段。 注意:我知道通过做"json_insert=df_insert.toJSON()。收集()"我正在失去数据帧的优势。有没有更好的方法来完成。

  • 当我试图提取1个表时,我使用AWS Glue从EC2(Postgre)提取要转换的数据,并将其放在S3上。我得到一个错误,如下所示: 有什么我能做的吗?我试图删除null字段或fillna,但这些都不起作用。

  • 我每天都有csv文件被传递到S3,这些文件在当月是增量的。所以file1包含第1天的数据,file2包含第1天和第2天的数据,等等。每天我都想对该数据运行一个ETL并将其写入不同的S3位置,这样我就可以使用Athena查询它,而不会出现重复的行。本质上,我只想查询聚合数据的最新状态(这只是最近交付给S3的文件的内容)。 我认为书签不会起作用,因为增量交付包含以前文件中的数据,因此会产生重复。我知道

  • 编写此自定义项是为了用变量替换列的值。Python 2.7;Spark 2.2.0 变量L_1到L_3更新了每行的列。我这样称呼它: 错误是:

  • 目前有一个AWS胶水作业的问题,读取一个S3集合并将其写入AWS Redshift,其中我们有一列值。

  • 我想将数据帧列中的一个值替换为另一个值,我必须对许多列执行此操作(假设30/100列) 我已经经历过这个和这个了。 我可以在y列和z列中分别用Null替换“baz”。但我想对所有列都这样做——类似于下面的列表理解方式