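I am trying to read a JSON file from Amazon S3 so that I can create a Spark context and use it to process the data.
Spark runs inside a Docker container, so copying files into the container's filesystem is a PITA. That is why I pushed the file to S3.
The code below explains the rest.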
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("first")
sc = SparkContext(conf=conf)
config_dict = {"fs.s3n.awsAccessKeyId":"**",
               "fs.s3n.awsSecretAccessKey":"**"}
bucket = "nonamecpp"
prefix = "dataset.json"
filename = "s3n://{}/{}".format(bucket, prefix)
rdd = sc.hadoopFile(filename,
                    'org.apache.hadoop.mapred.TextInputFormat',
                    'org.apache.hadoop.io.Text',
                    'org.apache.hadoop.io.LongWritable',
                    conf=config_dict)
I get the following error:
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-b94543fb0e8e> in <module>()
9 'org.apache.hadoop.io.Text',
10 'org.apache.hadoop.io.LongWritable',
---> 11 conf=config_dict)
12
/usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
558 jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
559 valueClass, keyConverter, valueConverter,
--> 560 jconf, batchSize)
561 return RDD(jrdd, self)
562
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543)
at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:744)
I have clearly provided awsSecretAccessKey and awsAccessKeyId. What went wrong?
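I suggest going through this link.
In my case, I access the S3 data using instance profile credentials.
Instance profile credentials: used on EC2 instances and delivered through the Amazon EC2 metadata service. The AWS SDK for Java uses the InstanceProfileCredentialsProvider to load these credentials.
Note
Instance profile credentials are used only if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See EC2ContainerCredentialsProviderWrapper for more information.
For pyspark, I use the following settings to access S3 content.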
import pyspark

def get_spark_context(app_name):
    # configure (app_name was previously accepted but never used)
    conf = pyspark.SparkConf().setAppName(app_name)
    # init & return
    sc = pyspark.SparkContext.getOrCreate(conf=conf)
    # s3a config: endpoint of the region that hosts the bucket
    sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint',
                                      's3.eu-central-1.amazonaws.com')
    # credential providers, listed as one comma-separated string
    sc._jsc.hadoopConfiguration().set(
        'fs.s3a.aws.credentials.provider',
        'com.amazonaws.auth.InstanceProfileCredentialsProvider,'
        'com.amazonaws.auth.profile.ProfileCredentialsProvider'
    )
    return pyspark.SQLContext(sparkContext=sc)
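As a rough usage sketch for the function above: once the s3a settings are in place, the JSON file from the question could presumably be loaded through the returned SQLContext. The helper names s3a_uri and read_json_from_s3 are hypothetical, the bucket and key are the ones from the question, and read.json assumes a Spark version that ships DataFrameReader (1.4 or later) plus the s3a filesystem classes on the classpath.

```python
def s3a_uri(bucket, key):
    """Build an s3a:// URI from a bucket name and an object key."""
    return "s3a://{}/{}".format(bucket, key.lstrip("/"))


def read_json_from_s3(sql_context, bucket, key):
    """Load a JSON file (one JSON object per line) from S3 as a DataFrame."""
    return sql_context.read.json(s3a_uri(bucket, key))


# Usage, assuming get_spark_context() from the answer is defined:
#     sql_context = get_spark_context("first")
#     df = read_json_from_s3(sql_context, "nonamecpp", "dataset.json")
#     df.printSchema()
```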
More information about the Spark context.
For the S3 access classes, see this.
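I solved it by adding --packages org.apache.hadoop:hadoop-aws:2.7.1 to the spark-submit command.
This downloads the Hadoop packages that are missing, which lets you run Spark jobs against S3.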
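The question runs inside IPython rather than through spark-submit, so a minimal sketch of the same idea for a notebook is to set PYSPARK_SUBMIT_ARGS before the SparkContext is created. The helper name submit_args_for is hypothetical; the package coordinate is the one given above, and the trailing pyspark-shell token is what PySpark expects at the end of this variable.

```python
import os

# Maven coordinate from the answer; adjust the version to match your Hadoop build.
HADOOP_AWS = "org.apache.hadoop:hadoop-aws:2.7.1"


def submit_args_for(packages):
    """Build a PYSPARK_SUBMIT_ARGS value that pulls in the given Maven packages."""
    return "--packages {} pyspark-shell".format(",".join(packages))


# Must be set before the first SparkContext is created in this process.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args_for([HADOOP_AWS])
```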
Then, in your job, you need to set the AWS credentials, like this:
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)
Another option for setting the credentials is to define them in spark/conf/spark-env:
#!/usr/bin/env bash
AWS_ACCESS_KEY_ID='xxxx'
AWS_SECRET_ACCESS_KEY='xxxx'
SPARK_WORKER_CORES=1 # to set the number of cores to use on this machine
SPARK_WORKER_MEMORY=1g # to set how much total memory workers have to give executors (e.g. 1000m, 2g)
SPARK_EXECUTOR_INSTANCES=10 #, to set the number of worker processes per node
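If the keys are defined as environment variables like this, the job can pick them up rather than hard-coding them. A minimal sketch, assuming the variables are actually exported into the driver's environment; s3n_settings_from_env and apply_hadoop_settings are hypothetical helper names, not part of Spark.

```python
import os


def s3n_settings_from_env(environ=os.environ):
    """Map the AWS_* environment variables to the fs.s3n.* Hadoop properties."""
    try:
        return {
            "fs.s3n.awsAccessKeyId": environ["AWS_ACCESS_KEY_ID"],
            "fs.s3n.awsSecretAccessKey": environ["AWS_SECRET_ACCESS_KEY"],
        }
    except KeyError as missing:
        raise RuntimeError("missing environment variable: {}".format(missing))


def apply_hadoop_settings(sc, settings):
    """Copy each key/value pair into the SparkContext's Hadoop configuration."""
    hadoop_conf = sc._jsc.hadoopConfiguration()
    for key, value in settings.items():
        hadoop_conf.set(key, value)


# Usage, given an existing SparkContext `sc`:
#     apply_hadoop_settings(sc, s3n_settings_from_env())
```

Failing early with a clear message when a variable is missing avoids running into the IllegalArgumentException from the question only once the first S3 read is attempted.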
More info: