当前位置: 首页 > 知识库问答 >
问题:

Firehose JSON->S3 Parquet->ETL Spark,错误:无法推断Parquet的架构

孟凯泽
2023-03-14

看起来这应该很容易,就像这是这组特性的核心用例一样,但它一直是一个又一个问题。

最近的一次尝试是通过Glue-Devendpoint(PySpark和Scalaendpoint)运行命令。

遵循此处的说明:https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="mytable")

生成此错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/dynamicframe.py", line 557, in from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/context.py", line 136, in create_dynamic_frame_from_catalog
  File "/mnt/tmp/spark-0ba544c9-0b5a-42bb-8a54-9b3138205179/userFiles-95708c09-59f6-4b6d-9016-d0375d511b7a/PyGlue.zip/awsglue/data_source.py", line 36, in getFrame
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'

它还会在其中一个设置行中生成此警告:

18/06/26 19:09:35 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

整体设置非常简单:我们有一个传入的Kinesis数据流,该流的处理器生成JSON Kinesis数据流,一个配置为将JSON流写入S3中拼花文件的Kinesis firehose流,然后是实现这一点所需的胶水目录配置。

Athena可以很好地看到数据,但是Scala/PySpark脚本出错了。

任何想法/建议?

共有1个答案

陆啸
2023-03-14

好吧,仍然不清楚为什么会这样,但是,找到了一个解决方案!

基本上,不使用生成的代码:

val datasource0 = glueContext.getCatalogSource(
        database = "db",
        tableName = "myTable",
        redshiftTmpDir = "",
        transformationContext = "datasource0"
    ).getDynamicFrame()

使用此代码

val crawledData = glueContext.getSourceWithFormat(
        connectionType = "s3",
        options = JsonOptions(s"""{"path": "s3://mybucket/mytable/*/*/*/*/"}"""),
        format = "parquet",
        transformationContext = "source"
    ).getDynamicFrame()

这里的关键点似乎是*/*/*/*/*/-如果我只指定根文件夹,我会得到拼花错误,并且(显然)正常的/***/通配符不起作用。

 类似资料:
  • 但随后: UPDATE:当使用master=“local”连接时,此操作工作,当连接到master=“MySparkCluster”时,此操作失败。

  • 下面的代码是在pyspark shell中运行时的工作文件,但在spark submit master Thread中执行时失败。 我在这里怎么了? 错误:

  • 有人可以帮助我解决这个问题,我与火花数据帧? 当我执行myFloatRDD时。toDF()我收到一个错误: 类型错误:无法推断类型的架构:类型“浮动” 我不明白为什么... 例子: 谢谢

  • 作为一个学习项目,我正在编写一个通过TCP的聊天服务器。我今天一直在修补ws crate,但我遇到了一个问题。这是我编写的代码,修改了他们的服务器示例。 当我尝试编译它时,我得到一个错误: 为什么会这样?我怎样才能解决这个问题?

  • 我正在尝试编写一个android静态编程语言应用程序,但我得到了以下错误。我哪里出错了? 这是我如何声明我的HashMap: 错误: 类型推断失败。没有足够的信息来推断构造函数HashMap中的参数K。请明确说明

  • 我正在运行EMR笔记本中的所有代码。 火花版本 temp_df.print模式 温度df。显示(2) 温度df。写拼花地板(path='s3://project7878/clean\u data/temperatures.parquet',mode='overwrite',partitionBy='year') 火花阅读拼花地板(path='s3://project7878/clean\u dat