当前位置: 首页 > 知识库问答 >
问题:

如何将spark DataFrame作为CSV存储到Azure Blob存储器中

东郭展
2023-03-14

我正试图从本地Spark集群将Spark DataFrame存储为Azure Blob存储中的CSV

首先,我用Azure Account/Account键设置配置(我不确定什么是正确的配置,所以我已经设置了所有这些)

sparkContext.getConf.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)

sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.dfs.core.windows.net", accountKey)
    sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)
filePath = s"wasbs://${container}@${account}.blob.core.windows.net/${prefix}/${filename}"
dataFrame.coalesce(1)
  .write.format("csv")
  .options(Map(
    "header" -> (if (hasHeader) "true" else "false"),
    "sep" -> delimiter,
    "quote" -> quote
  ))
  .save(filePath)
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:184)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

似乎这个问题已经在数据库论坛上报告了!!

在Azure Blob上存储DataFrame的正确方法是什么?

共有1个答案

元彦君
2023-03-14

事实证明,在作业失败之前,有一个内部错误

Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
    at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
    at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
    at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
    ... 18 more

在用实际数据创建临时文件之后,它试图将文件移动到用户使用cloudblob.startcopyfromblob指定的位置。像往常一样,microsft人员通过将此方法重命名为cloudblob.startcopy来解决这个问题。

我使用的是“org.apache.hadoop”%“hadoop-azure”%“3.2.1”,这是“hadoop-azure”的最新版本,它似乎与旧的startcopyfromblob保持一致,所以我需要使用具有此方法的旧的azure-storage版本,可能是2.x.x。

参见https://github.com/azure/azure-storage-java/issues/113

 类似资料:
  • 我正在尝试在谷歌云上迁移我的rails应用程序。我已将活动存储与地面军事系统上创建的存储桶连接起来。我上传了bucket中的文件夹“storage”,但应用程序中的所有图像都有404错误。 如何正确迁移GCS中的本地存储文件夹? 谢谢你的建议

  • 我正在运行一个部署到Google App Engine的Node.js应用程序。我还使用了winston库和'StackDriver Logging winston Plugin'[@google-cloud/logging-Winston]。 “链接到温斯顿日志设置教程” “链接到将日志导出到谷歌存储桶的教程” 我通过创建一个接收器将stackdriver日志导出到Google存储桶。目前,所有

  • 问题内容: 有人可以告诉我,在以下情况下如何进行? 接收文件(MS文件,ODS,PDF) 通过Apache Tika提取公元核心元数据+通过jackrabbit-content-extractors提取内容 使用Jackrabbit将文档(内容)及其元数据存储到存储库中 ? 检索文档+元数据 我对第3点和第4点感兴趣… 详细信息:该应用程序正在以交互方式处理文档(一些分析-语言检测,单词计数等。+

  • 问题内容: 有没有一种方法可以将数组存储到mysql字段中?我正在创建一个评论评分系统,因此我想存储用户ID数组以防止进行多次投票。我将创建一个新表,其中包含评论ID和对此评论进行投票的用户ID数组。然后,我将加入评论表和该表,并检查当前用户ID是否存在于选民数组或注释中。如果是这样,将禁用投票图标。我想我会避免以这种方式在循环中使用mysql查询。 您碰巧知道更好的方法吗? 问题答案: 您始终可

  • 问题内容: 我已经看过所有类似的线程,阅读了文档,并尝试了许多组合来将空值存储在db中,并且每次都失败。 我正在使用MySQL。 我定义了一个字段。我从csv文件填充db,并且某些单元格没有值。Django文档说: 由于我正在与我一起工作,因此我希望将一个空字符串(csv中的一个空单元格)存储在db中。因此,我(认为)必须添加到该字段中。实际上,我已经尝试了更多: 每次我向数据库插入一个空字符串时