当前位置: 首页 > 知识库问答 >
问题:

如何将任何新的库(如spack-sftp)添加到我的Pyspark代码中?

孟佑运
2023-03-14

当我试图在spark配置中设置包依赖项“spark sftp”时,我得到ClassNotFoundException。但当我使用以下命令执行脚本时,它会起作用:

spark submit——包com。springml:spark-sftp_2.11:1.1.1测试。py

下面是我的代码。有人能告诉我,如何在不将包作为参数传递给spark submit的情况下执行pyspark脚本吗?

import sys
import datetime
import pyspark
from pyspark.sql import *
from pyspark.sql import SparkSession, SQLContext, Row, HiveContext
from pyspark import SparkContext

#Create new config
conf = (pyspark.conf.SparkConf()
.set("spark.driver.maxResultSize", "16g")
.set("spark.driver.memory", "20g")
.set("spark.executor.memory", "20g")
.set("spark.executor.cores", "5")
.set("spark.shuffle.service.enabled", "true")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.initialExecutors", "24")
.set("spark.dynamicAllocation.minExecutors", "6")
.set("spark.submit.deployMode", "client")
.set("spark.jars.packages", "com.springml:spark-sftp_2.11:1.1.1")
.set("spark.python.worker.memory", "4g")
.set("spark.default.parallelism", "960")
.set("spark.executor.memoryOverhead", "4g")
.setMaster("yarn-client"))


# Create new context
spark = SparkSession.builder.appName("AppName").config(conf=conf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")


df = spark.read.format("com.springml.spark.sftp").option("host", "HOST").option("username", "HOSTNAME").option("password", "pass").option("fileType", "csv").option("inferSchema", "true").load("/test/sample.csv")

输出java.lang.reflect.ethod.invokeClassNotFoundException:未能找到数据源:ethod.java:498请在py4j.reflection.找到包nvoker.invokeDataSource$. lookupDataSource(DataSnvoker.java:244)在py4j.reflection.DataFrameRngine.invoke(DataFrameRngine.java:357)在ateway.invokeDataFrameRateway.java:282(DataFrameRpy4j.commands.)在ommand.invokeNativeMethodAccessorImpl.invoke0(Native Method)在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)在sun.reflect.在:: Mjava.lang.(Mcom.springml.spark.sftp.)在http://spark.apache.org/third-party-projects.htmlMethodIorg.apache.spark.sql.execution.datasources.(MethodIource.scala:635)在org.apache.spark.sql.反射Eeader.load(反射Eeader.scala:190)在py4j. Gorg.apache.spark.sql.(Geader.load)在eader.scala:174AbstractCsun.reflect.方法(AbstractCommand. java: 132)在py4j. CallCommand.执行(CallCommand. java: 79)在py4j. GatewayConnec. run(GatewayConnec. java: 238)在jjj.

共有1个答案

易昌翰
2023-03-14

提交spark作业时,可以指定要安装的软件包。对于这个,您可以将这个maven依赖项指定为:

> $SPARK_HOME/bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.3
 类似资料:
  • 问题内容: 中国货币的ISO 4217代码为CNY。由于限制了使用该货币进行的全球自由交易,因此存在第二种“离岸”等价货币,称为CNH。维基百科对此有一些总结。 CNH不在ISO 4217中,但我希望能够在我的应用程序中使用它,而不必编写自己的Currency类。大概在JVM安装目录中有某种列表。如何添加其他货币代码? 问题答案: 似乎Java 7已添加对此功能的支持。 对于较早的版本,您可以使用

  • 我下载了一些jar文件(例如)并想在本地将它们添加到visual studio代码中,而不需要任何额外的下载,这样我就可以从类和方法的自动完成功能中获益,我应该如何做呢? 我已经在谷歌搜索了一段时间,没有找到任何有用的东西,所以即使链接到其他答案也是赞赏的。

  • 问题内容: 我想在我的Gradle版本(版本1.0)中添加集成测试。它们应与我的常规测试分开运行,因为它们需要将webapp部署到本地主机(它们测试该webapp)。这些测试应该能够使用在我的主要源代码集中定义的类。我如何做到这一点? 问题答案: 这花了我一段时间才能弄清楚,在线资源也不是很好。所以我想记录我的解决方案。 这是一个简单的gradle构建脚本,除了主要和测试源集之外,还具有intTe

  • 问题内容: 我目前正在尝试从MongoDB中提取数据库,并使用Spark来将其提取到ElasticSearch中。 Mongo数据库具有纬度和经度值,但是ElasticSearch要求将它们强制转换为类型。 Spark中是否可以将and 列复制到or 的新列? 任何帮助表示赞赏! 问题答案: 我假设您从某种平面模式开始,如下所示: 首先让我们创建示例数据: 一种简单的方法是使用udf和case类:

  • 问题内容: 如何将servlets API添加到项目的pom.xml中 mvnrepository.com有很多servlet api和名称类似的项目,我不知道哪个是正确的。还是所有人都还好吗? 问题答案: 我相信大多数Web /应用程序服务器都捆绑了servlet api版本,因此您不希望将api捆绑到.war文件中。您需要找出服务器随附的版本,然后才能使用 用您的版本替换servlet-api

  • 我想潜入CN1的Soures。我已经使用Git遵循了这个和这个教程。然而,在NetBeans中,当我用Ctrl键单击CN1对象时,它显示“显示生成的源文件。没有源附加到类'JAR文件‘。”如果我附上我刚刚下载的ie“CodenameOne/CodenameOne/src”的源代码,源代码仍然找不到! 任何帮助都很感激, 问候