当前位置: 首页 > 知识库问答 >
问题:

在AWS EMR上使用自定义jar的Spark流作业在写入时失败

郦良才
2023-03-14

我正在尝试使用流数据帧将一个文件(csv.gz格式)转换为拼花地板。我必须使用流式数据帧,因为压缩的文件大小约为700 MB。作业是使用AWS EMR上的自定义jar运行的。源、目标和检查点位置都在AWS S3上。但一旦我尝试写入检查点,作业就会失败,并出现以下错误:

java.lang.IllegalArgumentException: 
Wrong FS: s3://my-bucket-name/transformData/checkpoints/sourceName/fileType/metadata,
expected: hdfs://ip-<ip_address>.us-west-2.compute.internal:8020

在EMR集群上运行的其他spark作业从S3读写并成功运行到S3(但它们不使用spark流)。所以我不认为这是S3文件系统访问的问题,正如本文所建议的那样。我也看了这个问题,但答案对我的情况没有帮助。我使用的是Scala:2.11.8和Spark:2.1.0。以下是我到目前为止的代码

...

    val spark = conf match {
      case null =>
        SparkSession
          .builder()
          .appName(this.getClass.toString)
          .getOrCreate()
      case _ =>
        SparkSession
          .builder()
          .config(conf)
          .getOrCreate()
    }

    // Read CSV file into structured streaming dataframe
    val streamingDF = spark.readStream
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("timestampFormat", "dd-MMM-yyyy HH:mm:ss")
      .option("treatEmptyValuesAsNulls", "true")
      .option("nullValue","")
      .schema(schema)
      .load(s"s3://my-bucket-name/rawData/sourceName/fileType/*/*/fileNamePrefix*")
      .withColumn("event_date", "event_datetime".cast("date"))
      .withColumn("event_year", year($"event_date"))
      .withColumn("event_month", month($"event_date"))

    // Write the results to Parquet
    streamingDF.writeStream
      .format("parquet")
      .option("path", "s3://my-bucket-name/transformedData/sourceName/fileType/")
      .option("compression", "gzip")
      .option("checkpointLocation", "s3://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
      .partitionBy("event_year", "event_month")
      .trigger(ProcessingTime("900 seconds"))
      .start()

我还尝试在URI中使用s3n://而不是s3://但这似乎没有任何效果。

共有1个答案

戚飞虎
2023-03-14

Tl;dr升级spark或避免使用s3作为检查点位置

ApacheSpark(结构化流):S3检查点支持

此外,您可能应该使用s3a://指定写入路径

S3a:系统是S3本机s3n://文件系统的继承者,它使用Amazon的库与S3进行交互。这允许S3a支持更大的文件(不超过5GB限制)、更高性能的操作等。该文件系统旨在取代S3 Native的/succession:通过替换URL模式,可以从s3n://URL访问的所有对象也可以从s3a访问。

https://wiki.apache.org/hadoop/AmazonS3

 类似资料:
  • 所以我一个月前开始学习spark和cassandra。我遇到了这样一个问题,我必须使用spark预先聚合来自传感器的数据,然后将其放入cassandra表。 这是我的应用程序流程 问题是,我需要将数据按秒、分、时、日、月聚合到每年。这导致我在cassandra中创建了90多个聚合表。 就我的进展而言,我发现我必须使用每个聚合的一个写流查询将每个聚合下沉到每个cassandra表,这导致我创建了这个

  • 我有一个spark流媒体工作,它从Kafka读取数据并对其执行一些操作。我正在一个纱线集群Spark 1.4.1上运行该作业,该集群有两个节点,每个节点有16 GB RAM,每个节点有16芯。 我已将这些conf传递给spark提交作业: --主纱线簇--num executors 3--驱动器内存4g--executor内存2g--executor cores 3 作业返回此错误并在运行一段时间

  • 我试图运行火花作业,基本上加载数据在卡桑德拉表。但它也产生了以下错误。

  • 我有一个火花1.2.0的火花流环境,我从本地文件夹中检索数据,每次我发现一个新文件添加到文件夹中时,我都会执行一些转换。 为了对DStream数据执行分析,我必须将其转换为数组 然后,我使用获得的数据提取我想要的信息,并将其保存在HDFS上。 由于我真的需要使用Array操作数据,因此不可能使用(这将正常工作)在HDFS上保存数据,我必须保存RDD,但使用此先决条件,我终于有了名为part-000

  • 我正在尝试使用pysparkn和spack-csv使用以下代码将火花数据帧写入s3 但是,我得到的错误是“输出目录已经存在”,我确信输出目录在作业开始之前不存在,我尝试使用不同的输出目录名称运行,但写入仍然失败。 如果我在作业失败后查看s3桶,我发现很少有零件文件是由火花写入的,但当它尝试写入更多时它失败了,脚本在本地运行良好,我在aws集群上使用10个火花执行器。有人知道这段代码有什么问题吗?

  • 我有下面的GitHub操作配置文件(为了简单起见删除了部分)。 我遇到的主要问题是节点测试。js版本8失败。但其他人都成功了。在这种情况下,如果一个作业失败,GitHub操作往往会取消所有作业。 有没有办法改变这种行为,以便即使一个作业失败,所有作业也能继续运行?这有助于查明特定版本的问题。