Question:

Saving a Spark DataFrame from an Azure Databricks notebook job to Azure Blob Storage results in java.lang.NoSuchMethodError

姚雅珺
2023-03-14

I created a simple job using a notebook in Azure Databricks. I am trying to save a Spark DataFrame from the notebook to Azure Blob Storage. Sample code attached:

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
# com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

# Tried with com.microsoft.azure:azure-storage:2.2.0

SECRET_ACCESS_KEY = "xxxxx"
STORAGE_NAME = "my_storage"
CONTAINER = "my_container"
SUB_PATH = "/azure_dbs_check/"
FILE_NAME = "result"

spark = SparkSession \
    .builder \
    .appName("azure_dbs_to_azure_blob") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    spark_context = spark.sparkContext
    fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"

    spark.conf.set("fs.wasbs.impl",
                   "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)

    file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME

    df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(traceback.format_exc())

The above code works when I run spark-submit on my local machine. The spark-submit command used is:

spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

The probable root cause is:

Caused by: java.lang.NoSuchMethodError:
com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob

So I downgraded the package to com.microsoft.azure:azure-storage:2.2.0, which contains the startCopyFromBlob method
(in the com.microsoft.azure:azure-storage 3.x.x releases, the deprecated startCopyFromBlob() was removed from CloudBlob).

Even after the downgrade, the error remains the same.

The error stack trace is attached:

    Traceback (most recent call last):
      File "<command-4281470986294005>", line 28, in <module>
        df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
      File "/databricks/spark/python/pyspark/sql/readwriter.py", line 738, in save
        self._jwrite.save(path)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/databricks/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o255.save.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:192)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:110)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:108)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:128)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 52, 10.2.3.12, executor 0): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2243)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
    Caused by: org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more

Spark-submit packages included:

  • org.apache.hadoop:hadoop-azure:2.7.2,
  • com.microsoft.azure:azure-storage:3.1.0

Local machine:
Python 3.6
Spark version 2.4.4, using Scala version 2.11.12

Databricks details:
Cluster info:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)

The Runtime 5.5 release notes state that the com.microsoft.azure azure-storage 5.2.0 package is already installed in the environment.

Is the problem caused by Spark picking up the library from the environment (version 5.2.0), even though a different version (2.2.0) is specified in the job? In versions such as 5.2.0, the startCopyFromBlob() method has been removed.
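
One rough way to check whether the pre-installed jar is indeed the one being loaded is to ask the driver JVM where the CloudBlob class comes from. This is only a diagnostic sketch, not part of the original post, and it assumes a running PySpark session named spark:

# Diagnostic sketch: print which jar the CloudBlob class was loaded from,
# using the Py4J gateway to the driver JVM.
jvm = spark.sparkContext._jvm
cloud_blob_cls = jvm.java.lang.Class.forName("com.microsoft.azure.storage.blob.CloudBlob")
print(cloud_blob_cls.getProtectionDomain().getCodeSource().getLocation().toString())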

I have documented the various cases / jar combinations I tried in a Google Doc.

Observations:


  • The Databricks job uses the pre-installed library azure-storage:5.2.0. This package does not have the com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob() method (it was replaced by startCopy() in the 4.x.x releases). azure-storage is pinned at 5.2.0.

    So I tried the latest hadoop-azure:3.2.1, to pick up jars that do not call the deprecated method. But this led to a new error:
    java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities.

    The StreamCapabilities class lives in the hadoop-common package, so I included the latest hadoop-common (3.2.1).
    That resulted in java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders().
    Reason:
    org.apache.hadoop:hadoop-common:2.7.3 is pre-installed in the Azure runtime, and hadoop-common:2.7.3 does not have the ProviderUtils.excludeIncompatibleCredentialProviders() method (a quick way to verify the bundled Hadoop version is sketched after this list).

    Since both packages (hadoop-common:2.7.3 and azure-storage:5.2.0) are pre-installed and pinned, I tried varying only the hadoop-azure version:

    From hadoop-azure:3.2.1 (the latest as of November 2019) down to hadoop-azure:2.8.0, excludeIncompatibleCredentialProviders() is called internally.
    Below 2.8.0, I start getting the old error again:
    NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob
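
To confirm which Hadoop build the runtime actually ships (the observations above assume hadoop-common:2.7.3), one option is to query Hadoop's VersionInfo through the driver JVM. A minimal sketch, not from the original post, assuming a running PySpark session named spark:

# Diagnostic sketch: print the Hadoop version bundled with the runtime,
# via org.apache.hadoop.util.VersionInfo through the Py4J gateway.
jvm = spark.sparkContext._jvm
print(jvm.org.apache.hadoop.util.VersionInfo.getVersion())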

1 Answer

    盛柏
    2023-03-14

    An alternative approach is to create a mount:

    https://docs.databricks.com/data/data-sources/azure/azure-storage.html

    Then adjust the save path as needed.
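
    A minimal sketch of what such a mount could look like, following the linked Databricks docs; the container, storage account, key, and mount-point names below are placeholders:

    # Hypothetical mount sketch (all names and keys are placeholders, per the linked docs).
    dbutils.fs.mount(
        source="wasbs://<container>@<storage-account>.blob.core.windows.net",
        mount_point="/mnt/<mount-name>",
        extra_configs={
            "fs.azure.account.key.<storage-account>.blob.core.windows.net": "<storage-account-access-key>"
        })

    # The DataFrame can then be written through the mount point instead of the wasbs:// URL.
    df.write.save("/mnt/<mount-name>/azure_dbs_check/result_csv",
                  format="csv", header=True, mode="overwrite")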

    Also, I suggest using this:

    spark.conf.set(
      "fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
      "<storage-account-access-key>")
    

    instead of

    spark_context._jsc.hadoopConfiguration().set(fs_acc_key, SECRET_ACCESS_KEY)
    

    since you are using the DataFrame API rather than the RDD API.

    EDIT

    Ran the code below in a Databricks community cluster, with the spark.conf.set statements modified as shown.

    import traceback
    
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StringType
    
    # Attached the spark submit command used
    # spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
    # com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py
    
    # Tried with com.microsoft.azure:azure-storage:2.2.0
    
    SECRET_ACCESS_KEY = "ACCESSKEY"
    STORAGE_NAME = "ACCOUNTNAME"
    CONTAINER = "CONTAINER"
    SUB_PATH = "/azure_dbs_check/"
    FILE_NAME = "result"
    
    spark = SparkSession \
        .builder \
        .appName("azure_dbs_to_azure_blob") \
        .getOrCreate()
    
    df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
    df.show()
    
    try:
        fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"
    
        spark.conf.set("spark.hadoop.fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        spark.conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)
    
        file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME
    
        print(file_path)
    
        df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
    
        print("Written successful")
    except Exception as exp:
        print("Exception occurred")
        print(traceback.format_exc())
    