当前位置: 首页 > 知识库问答 >
问题:

AWS Glue job insert datetime字段为空(红移)

朱明知
2023-03-14

我是AWS Glue的新手。我正在尝试通过Glue作业插入redshift表,它有S3爬虫来读取csv文件和redshift映射爬虫用于表方案。

下面的作业正在尝试运行,其中create\u日期从S3插入到时间戳的红移列中。值始终为空。

粘合作业:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "salesdatabase", table_name = "sales_timestamp_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "salesdatabase", table_name = "sales_timestamp_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("city", "string", "city", "string"), ("country", "string", "country", "string"), ("amount", "string", "amount", "string"), ("create_date", "string", "create_date", "timestamp")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("city", "string", "city", "string"), ("country", "string", "country", "string"), ("amount", "string", "amount", "string"), ("create_date", "string", "create_date", "timestamp")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["country", "amount", "city", "create_date"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["country", "amount", "city", "create_date"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "salesdatabase", table_name = "metricsdb_public_sales_csv", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "salesdatabase", table_name = "metricsdb_public_sales_csv", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "salesdatabase", table_name = "metricsdb_public_sales_csv", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()

爬网程序架构详细信息:

来自爬虫的S3架构

列名数据类型

  1. 城市字符串
  2. 国家字符串
  3. 金额字符串
  4. create_date字符串

爬网程序列名称数据类型中的表架构

  1. 城市字符串
  2. 国家/地区字符串
  3. 金额字符串
  4. 创建\u日期时间戳

任何指针

共有1个答案

党佐
2023-03-14
I could resolve this.

Just convert Dynamic Frame to Spark Data Frame and apply transformation.

from pyspark.sql.functions import to_timestamp, col
from awsglue.dynamicframe import DynamicFrame

#Convert dynamic frame to data frame to use standard pyspark functions
data_frame = datasource0.toDF()
data_frame.show()
data_frame = data_frame.withColumn("created_date",
          to_timestamp(col("created_date"),"dd/MM/yy HH:mm"))
data_frame.show()


#Convert back to the dynamic frame
dynamic_frame_write = DynamicFrame.fromDF(data_frame, glueContext, 'dynamic_frame_write')

Just add this frame to datasink5
It worked for me
 类似资料:
  • 问题内容: 我尝试@Inject这样的字段(它是一个jar模块,在META-INF下存在空bean.xml): IDataProvider接口 数据提供者实现import javax.enterprise.context.ApplicationScoped; 我尝试注入DataProvider的类 如果我在Wildfly上运行此命令,则注入的dataProvider始终为null(DataCont

  • 问题内容: 我有一条更新语句,用于更新ID = xx的字段x,y和z。 在表中,我有几个不同的x_created_datetime字段(用于由不同人员维护/输入的记录的不同部分)。我想编写一个查询,如果为null则将更新此字段,但如果不为null则将其保留。 所以我有: 我需要的是一种添加以下内容的方法,但始终会更新上面的内容: 我希望我可以在不进行第二次事务处理的情况下就可以做到这一点。关于如何

  • 我有一个片段,在这里我调用自定义视图类(Dragger) 并且我有另一个片段与和适配器,在其中我调用方法通过主机的活动的。 我发现在中旋转后会获得新的引用,但当我从适配器调用时-为空。 我真的不明白为什么字段得到引用,然后又丢失了它) 有人知道这有什么不对吗? 提前谢谢! P. S.:主机活动

  • 我觉得这家初创公司有问题。cs,因为我的

  • 问题内容: 我正在努力从以下代码中获取正确的输出: 游乐场片段 打印时,结构字段为空。我敢肯定某个地方有一个愚蠢的错误,但是我仍然对Go还是陌生的,而且我已经在这里呆了几个小时。请帮忙。 问题答案: 这已经出现了很多次了。问题在于只能对导出的字段进行封送处理。 通过以大写(大写)字母开头来导出结构域。 在Go Playground上尝试一下。 请注意,JSON文本包含带有小写字母文本的字段名称,但

  • 问题内容: 我有一个具有字段()的Spring 类(),但是该字段是我尝试使用它时所用的。日志显示该bean和该bean都在创建,但是每当我尝试在服务bean上调用该方法时,都会得到一个a 。Spring为什么不自动接线该领域? 控制器类: 服务等级: 应该自动连接的服务bean,但不是: 当我尝试时,出现以下异常: 问题答案: 本文向大家介绍为什么我的Spring @Autowired字段为空?