Question:

Simple ETL job in AWS Glue fails with "File already exists"

酆晔
2023-03-14

We are evaluating AWS Glue for a big data project that involves some ETL. We added a crawler, which correctly picks up a CSV file from S3. Initially, we just want to transform that CSV to JSON and drop the file in another S3 location (same bucket, different path).

We used the script provided by AWS (no custom script here) and simply mapped all the columns.

The destination folder is empty (the job was just created), yet the job fails with "File already exists" (screenshot attached). The S3 output location we write to was empty before the job started. After the error, however, we do see two files there, but they appear to be partial files (screenshot attached).
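
For reference, the write call from the AWS-generated script is roughly the following (reconstructed from the traceback below; applymapping1 and datasink2 are the names Glue generates, and the earlier read/mapping steps are omitted):

    # Tail of the generated Glue script, reconstructed from the traceback below.
    datasink2 = glueContext.write_dynamic_frame.from_options(
        frame=applymapping1,                       # DynamicFrame with all columns mapped
        connection_type="s3",
        connection_options={"path": "s3://primero-viz/output/tcw_entries"},
        format="json",
        transformation_ctx="datasink2",
    )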

Any ideas?

Here is the full stack trace:

    Container: container_1513099821372_0007_01_000001 on ip-172-31-49-38.ec2.internal_8041
    LogType:stdout
    Log Upload Time:Tue Dec 12 19:12:04 +0000 2017
    LogLength:8462
    Log Contents:
    Traceback (most recent call last):
    File "script_2017-12-12-19-11-08.py", line 30, in 
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options =
    {
        "path": "s3://primero-viz/output/tcw_entries"
    }
    , format = "json", transformation_ctx = "datasink2")
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/dynamicframe.py", line 523, in from_options
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 175, in write_dynamic_frame_from_options
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py", line 198, in write_from_options
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 32, in write
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py", line 28, in writeFrame
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    File "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o86.pyWriteDynamicFrame.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-63-141.ec2.internal, executor 1): java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:604)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:176)
    at com.amazonaws.services.glue.hadoop.TapeOutputFormat.getRecordWriter(TapeOutputFormat.scala:65)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1158)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:996)
    at com.amazonaws.services.glue.HadoopDataSink$$anonfun$2.apply$mcV$sp(DataSink.scala:192)
    at com.amazonaws.services.glue.HadoopDataSink.writeDynamicFrame(DataSink.scala:202)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: File already exists:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000

2 answers

西门鹏程
2023-03-14

Set the write mode to "append" if your load is incremental, or to "overwrite" if it is a full load.

For example:

events.toDF().write.json(events_dir, mode="append", partitionBy=["partition_0", "partition_1"])
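
Adapted to the job in the question, a minimal sketch could look like the following (assuming applymapping1 is the mapped DynamicFrame from the generated script and the output path is the one from the traceback; this replaces the write_dynamic_frame call, since it is the DataFrame writer that accepts a save mode):

    # Sketch: write via the Spark DataFrame API so a save mode can be set.
    output_path = "s3://primero-viz/output/tcw_entries"   # path from the question

    df = applymapping1.toDF()                    # DynamicFrame -> Spark DataFrame
    df.write.mode("overwrite").json(output_path) # "overwrite" replaces existing files;
                                                 # use "append" for incremental loads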
沈茂
2023-03-14

"The destination folder is empty"

Empty is not the same as non-existent. It does not look like write_dynamic_frame supports a write mode, so you may have to delete the directory first.
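
If you stay with write_dynamic_frame, one option is to clear the target prefix yourself before the job writes. A rough sketch with boto3 (bucket and prefix taken from the path in the question; run it near the top of the job script, before the write):

    import boto3

    # Delete everything under the output prefix so the subsequent
    # write_dynamic_frame call starts from a truly empty location.
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("primero-viz")                        # bucket from the question
    bucket.objects.filter(Prefix="output/tcw_entries/").delete()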
