我是使用AWS Glue的新手,我不明白ETL作业是如何收集数据的。我使用爬虫从S3存储桶中的一些文件生成我的表模式,并检查了ETL作业中的自动生成脚本,如下所示(略有修改):
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mytablename", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("data", "string", "data", "string")], transformation_ctx = "applymapping1")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://myoutputbucket"}, format = "json", transformation_ctx = "datasink2")
当我运行此作业时,它成功地从我的爬虫用于生成表模式的存储桶中获取我的数据,并按预期将数据放入我的目标s3存储桶中。
我的问题是:可以说,我在这个脚本中看不到任何地方“加载”了数据。我知道我把它指向了爬虫程序生成的表,但从这个文档:
AWS Glue中的表和数据库是AWS Glue数据目录中的对象。它们包含元数据;它们不包含来自数据存储的数据。
如果该表只包含元数据,那么ETL作业如何检索数据存储区(在我的示例中是S3 bucket)中的文件?我之所以这样问,主要是因为我想以某种方式修改ETL作业,以便在不同的存储桶中转换相同结构的文件,而无需编写新的爬虫程序,但也因为我想加强对Glue服务的一般理解。
如果深入了解AWS Glue数据目录。它具有驻留在数据库下的表。通过单击这些表,您可以看到元数据,该元数据显示当前表作为爬网程序运行的结果指向哪个s3文件夹。
您仍然可以通过数据目录选项添加表,手动在s3结构化文件上创建表:
并将其指向您的s3位置。
另一种方法是使用AWS-athena控制台创建指向s3位置的表。您将使用常规创建表脚本,其中位置字段保存您的s3位置。
需要了解的主要内容是:Glue datasource目录(DateBases和tables)始终与Athena同步,Athena是一种无服务器查询服务,可以使用标准SQL轻松分析Amazon S3中的数据。您可以从Glue控制台/Athena查询控制台创建表/数据库。
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydatabase", table_name = "mytablename", transformation_ctx = "datasource0")
上面这行Glue Spark代码为您使用Glue data catalog源表创建初始数据帧发挥了神奇的作用,除了元数据、模式和表属性之外,它还具有指向数据所在的数据存储(s3位置)的位置。
完成应用映射
后,这部分(数据链路)代码正在将数据实际加载到目标集群/数据库中。
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://myoutputbucket"}, format = "json", transformation_ctx = "datasink2")
问题内容: 如果我知道作业ID,有什么方法可以检索作业配置(配置中的某些属性)? 基本上,我正在做的是检查当前是否有任何正在运行的作业,然后我要检查当前正在运行的任何作业中是否存在某些属性值? 用于检索当前正在运行的作业的部分代码: 问题答案: 您可以在作业跟踪器中查看正在运行的作业的配置,该配置通常在端口50030上运行。
直接从HDFS读取文件,而不将其复制到本地文件系统。不过,我将结果复制到本地文件系统。 hduser@ubuntu:/usr/local/hadoop$mkdir/tmp/gutenberg-output bin/hadoop dfs-getmerge/user/hduser/gutenberg-output/tmp/gutenberg-output deprecated:不推荐使用此脚本执行hd
问题内容: 我目前正在使用Python处理数据流 模板 ,我想访问作业ID并将其保存到特定的Firestore文档。 是否可以访问作业ID? 我在文档中找不到与此有关的任何内容。 问题答案: 您可以通过在管道中进行调用来实现(请参见下面的完整代码)。一种可能性是始终使用相同的作业名称来调用模板,这很有意义,否则可以将作业前缀作为运行时参数传递。使用正则表达式解析作业列表,以查看该作业是否包含名称前
问题内容: 我有以下表格及其关系。我将JSON数据存储在client_services表中。它们是使用MySQL查询来检索JSON值的任何方式,如下所示: 还是可以进一步规范化client_services表? 表: 表: 表: 表: 问题答案: 由于很多人都亲自问过我这个问题,所以我想我会再作一次修改。这是一个具有SELECT,Migration和View Creation的完整SQL的要点,
问题内容: 我已经通过DefaultTableModel用(data,headers)构造函数填充了一个JTable 。用户可以编辑表,并且我希望能够将新数据加载回数组()中。请注意,我宁愿不只是一点一点地更新数组,而是能够从表中完全加载新数组。如何才能做到这一点? 问题答案: 我回想起来,出于第二种想法,您不需要任何类型转换-TableModel是一个接口,其中包含您需要的所有3个方法调用。:)
{“type”:“record”、“name”:“twitter_schema”、“namespace”:“com.miguno.avro”、“fields”:[{“name”:“username”、“type”:“string”、“doc”:“Twitter.com上的用户帐户名称”}、{“name”:“tweet”、“type”:“string”、“doc”:“用户的Twitter消息内容”}