我正在尝试使用流数据帧将一个文件(csv.gz格式)转换为拼花地板。我必须使用流式数据帧,因为压缩的文件大小约为700 MB。作业是使用AWS EMR上的自定义jar运行的。源、目标和检查点位置都在AWS S3上。但一旦我尝试写入检查点,作业就会失败,并出现以下错误:
java.lang.IllegalArgumentException:
Wrong FS: s3://my-bucket-name/transformData/checkpoints/sourceName/fileType/metadata,
expected: hdfs://ip-<ip_address>.us-west-2.compute.internal:8020
在EMR集群上运行的其他spark作业从S3读写并成功运行到S3(但它们不使用spark流)。所以我不认为这是S3文件系统访问的问题,正如本文所建议的那样。我也看了这个问题,但答案对我的情况没有帮助。我使用的是Scala:2.11.8和Spark:2.1.0。以下是我到目前为止的代码
...
val spark = conf match {
case null =>
SparkSession
.builder()
.appName(this.getClass.toString)
.getOrCreate()
case _ =>
SparkSession
.builder()
.config(conf)
.getOrCreate()
}
// Read CSV file into structured streaming dataframe
val streamingDF = spark.readStream
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter","|")
.option("timestampFormat", "dd-MMM-yyyy HH:mm:ss")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue","")
.schema(schema)
.load(s"s3://my-bucket-name/rawData/sourceName/fileType/*/*/fileNamePrefix*")
.withColumn("event_date", "event_datetime".cast("date"))
.withColumn("event_year", year($"event_date"))
.withColumn("event_month", month($"event_date"))
// Write the results to Parquet
streamingDF.writeStream
.format("parquet")
.option("path", "s3://my-bucket-name/transformedData/sourceName/fileType/")
.option("compression", "gzip")
.option("checkpointLocation", "s3://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
.partitionBy("event_year", "event_month")
.trigger(ProcessingTime("900 seconds"))
.start()
我还尝试在URI中使用s3n://而不是s3://但这似乎没有任何效果。
Tl;dr升级spark或避免使用s3作为检查点位置
ApacheSpark(结构化流):S3检查点支持
此外,您可能应该使用s3a://指定写入路径
S3a:系统是S3本机s3n://文件系统的继承者,它使用Amazon的库与S3进行交互。这允许S3a支持更大的文件(不超过5GB限制)、更高性能的操作等。该文件系统旨在取代S3 Native的/succession:通过替换URL模式,可以从s3n://URL访问的所有对象也可以从s3a访问。
https://wiki.apache.org/hadoop/AmazonS3
所以我一个月前开始学习spark和cassandra。我遇到了这样一个问题,我必须使用spark预先聚合来自传感器的数据,然后将其放入cassandra表。 这是我的应用程序流程 问题是,我需要将数据按秒、分、时、日、月聚合到每年。这导致我在cassandra中创建了90多个聚合表。 就我的进展而言,我发现我必须使用每个聚合的一个写流查询将每个聚合下沉到每个cassandra表,这导致我创建了这个
我有一个spark流媒体工作,它从Kafka读取数据并对其执行一些操作。我正在一个纱线集群Spark 1.4.1上运行该作业,该集群有两个节点,每个节点有16 GB RAM,每个节点有16芯。 我已将这些conf传递给spark提交作业: --主纱线簇--num executors 3--驱动器内存4g--executor内存2g--executor cores 3 作业返回此错误并在运行一段时间
我试图运行火花作业,基本上加载数据在卡桑德拉表。但它也产生了以下错误。
我有一个火花1.2.0的火花流环境,我从本地文件夹中检索数据,每次我发现一个新文件添加到文件夹中时,我都会执行一些转换。 为了对DStream数据执行分析,我必须将其转换为数组 然后,我使用获得的数据提取我想要的信息,并将其保存在HDFS上。 由于我真的需要使用Array操作数据,因此不可能使用(这将正常工作)在HDFS上保存数据,我必须保存RDD,但使用此先决条件,我终于有了名为part-000
我正在尝试使用pysparkn和spack-csv使用以下代码将火花数据帧写入s3 但是,我得到的错误是“输出目录已经存在”,我确信输出目录在作业开始之前不存在,我尝试使用不同的输出目录名称运行,但写入仍然失败。 如果我在作业失败后查看s3桶,我发现很少有零件文件是由火花写入的,但当它尝试写入更多时它失败了,脚本在本地运行良好,我在aws集群上使用10个火花执行器。有人知道这段代码有什么问题吗?
我有下面的GitHub操作配置文件(为了简单起见删除了部分)。 我遇到的主要问题是节点测试。js版本8失败。但其他人都成功了。在这种情况下,如果一个作业失败,GitHub操作往往会取消所有作业。 有没有办法改变这种行为,以便即使一个作业失败,所有作业也能继续运行?这有助于查明特定版本的问题。