当前位置: 首页 > 知识库问答 >
问题:

使用pyspark从S3服务器读取时出错:[java.lang.IllegalArgumentException]

潘弘壮
2023-03-14

我正在尝试使用S3中的pyspark读取文件,并出现以下错误--

Traceback (most recent call last):
  File "C:/Users/Desktop/POC_Pyspark/create_csv_ecs.py", line 41, in <module>
df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 166, in load
return self._df(self._jreader.load(path))
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in __call__
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.lang.IllegalArgumentException
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1307)
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1230)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:280)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:705)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

我的代码很简单,但我可以使用BOTO3连接,但我需要使用pyspark,因为我正在处理的文件很大,还需要在CSV上进行一些聚合-

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Window, SparkSession
import pyspark as spark
import pyspark.sql.functions as fn

conf = spark.SparkConf().setAppName("Full PSGL Aggregation - PySpark")

sc = spark.SQLContext(spark.SparkContext(conf=conf))
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secret key")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://end point:8020")

df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
print(df)

java版本-

java version "1.8.0_66"

python-

Python 3.6.1 |Anaconda 4.4.0 (64-bit)

Pypark/火花-

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 3.6.1 (default, May 11 2017 13:25:24)

如果需要更多信息,请告诉我。

共有2个答案

龚昊然
2023-03-14

作为hadoop-aws-2.7.7.jarS3AFileSystem中的源代码,至少将此配置添加到您的core-site.xml

<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3a.multipart.size</name>
  <value>104857600</value>
</property>
<property>
  <name>fs.s3a.multipart.threshold</name>
  <value>134217728</value>
</property>
<property>
  <name>fs.s3a.threads.max</name>
  <value>256</value>
</property>
<property>
  <name>fs.s3a.threads.core</name>
  <value>4</value>
</property>
<property>
  <name>fs.s3a.block.size</name>
  <value>33554432</value>
</property>

漆雕皓轩
2023-03-14

从S3AFileSystem的源代码来看,似乎要么<代码>fs。s3a。Thread。最大值或fs。s3a。Thread。核心参数缺失或不相关。

就我而言,加上

sc._jsc.hadoopConfiguration().set("fs.s3a.threads.max", "4")
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.core", "4")

解决了问题。

 类似资料:
  • 问题内容: 我正在使用predis并订阅了频道并进行监听,它抛出错误并死了,如下图所示,过了60秒后,肯定不是我的Web服务器错误或超时。 目前正在讨论的一个类似的问题在这里。无法得到太多。 我尝试将predis conf文件中的connection_timeout设置为0,但没有太大帮助。 另外,如果我继续使用(向其发送数据并进行处理)该工作程序,则不会出现任何错误。因此,它可能在某处超时,并且

  • 我在SSL\u error\u日志文件中得到了这种Apache代理错误。 你知道这种错误吗?客户端必须等待3分钟以等待响应,然后才能强制断开连接。更改ssl证书后发生此错误。我们已将SHA-1证书更新为与所有浏览器兼容的SHA-2证书。观察并记录了各种超时情况。

  • 我正在尝试用PySpark从HBase写/读。 环境: null 我的火花提交是: 当我写到HBase时,一切都很好,数据从mydf保存到HBase表中。 当我试图阅读时,它很好,只有在激发行动之前。df.show()-导致错误。

  • 我刚刚安装了Ruby193和Ruby200,在创建了一个新的应用程序之后,我正在尝试加载服务器,但是我得到了下面的错误。 我用gem'sqlite3'替换了sqlite3 gem,'~ 有人知道怎么修吗? Gemfile是 Bundle Show提供捆绑包中包含的宝石: actionmailer(3.2.13) actionpack(3.2.13) activemodel(3.2.13) acti

  • 我在生产环境中使用redis server,在HA中使用redis Sentinel。但是阅读文档,当客户机连接到sentinel时,sentinel将主redis服务器提供给客户机。 是所有的读写操作都在主服务器上进行,而从服务器只用于故障转移,还是我们可以使用从服务器读取数据?

  • 问题内容: 我正在尝试使用来自Python的SSH从服务器读取文件。我正在使用Paramiko进行连接。我可以连接到服务器并运行类似的命令,然后从服务器获取数据,但是我尝试读取的某些文件的大小约为1 GB或更大。 如何使用Python在服务器上逐行读取文件? 附加信息:通常要做的是运行命令并将结果存储在变量中并加以解决。但是由于这里的文件很大,我正在寻找一种从服务器逐行读取文件的方法。 编辑:我可