由于ST_GeomFromText不是org.apache.spark.sql.functions的一部分,因此它不会在内部识别它。我需要首先为此函数定义UDF。意味着我需要定义该函数的定义,然后将该函数向spark注册为UDF,然后只有我才能使用此函数。
我陷入了开始定义此功能的困境,需要使用什么参数。
编辑
我使用的代码如下:
sparkSession.udf().register("ST_GeomFromText", new UDF1<String, String>() {
@Override
public String call(String txt ) {
return (new ST_GeomFromText(txt));
}
}, DataTypes.StringType);
我真的需要你的帮助。
谢谢
类似的问题-
我认为,您没有完全按照GeoSparkSQL-Overview /#quick-
start
进行操作-
按照快速入门,您需要将GeoSpark-core和GeoSparkSQL添加到项目POM.xml或build.sbt中
<!– Geo spark lib doc - https://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Overview/#quick-start-->
声明您的Spark会话
SparkSession sparkSession = SparkSession.builder()
.config(“spark.serializer”, KryoSerializer.class.getName())
.config(“spark.kryo.registrator”, GeoSparkKryoRegistrator.class.getName())
.master(“local[*]”)
.appName(“myGeoSparkSQLdemo”)
.getOrCreate();
从注册所有功能geospark-sql_2.3
的sparkSession
,这样它可以用来直接火花SQL
// register all functions from geospark-sql_2.3 to sparkSession
GeoSparkSQLRegistrator.registerAll(sparkSession);
SparkSession sparkSession = SparkSession.builder()
.config("spark.serializer", KryoSerializer.class.getName())
.config("spark.kryo.registrator", GeoSparkKryoRegistrator.class.getName())
.master("local[*]")
.appName("myGeoSparkSQLdemo")
.getOrCreate();
// register all functions from geospark-sql_2.3 to sparkSession
GeoSparkSQLRegistrator.registerAll(sparkSession);
try {
System.out.println(sparkSession.catalog().getFunction("ST_Geomfromtext"));
// Function[name='ST_GeomFromText', className='org.apache.spark.sql.geosparksql.expressions.ST_GeomFromText$', isTemporary='true']
} catch (Exception e) {
e.printStackTrace();
}
// https://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Function/
Dataset<Row> dataframe = sparkSession.sql("select ST_GeomFromText('POINT(-7.07378166 33.826661)')");
dataframe.show(false);
dataframe.printSchema();
/**
* +---------------------------------------------+
* |st_geomfromtext(POINT(-7.07378166 33.826661))|
* +---------------------------------------------+
* |POINT (-7.07378166 33.826661) |
* +---------------------------------------------+
*/
// using longitude and latitude column from existing dataframe
Datahtml" target="_blank">set<Row> df = sparkSession.sql("select -7.07378166 as longitude, 33.826661 as latitude");
df.withColumn("ST_Geomfromtext ",
expr("ST_GeomFromText(CONCAT('POINT(',longitude,' ',latitude,')'))"))
.show(false);
/**
* +-----------+---------+-----------------------------+
* |longitude |latitude |ST_Geomfromtext |
* +-----------+---------+-----------------------------+
* |-7.07378166|33.826661|POINT (-7.07378166 33.826661)|
* +-----------+---------+-----------------------------+
*/
我正在尝试使用一个继承的Scala函数(stuctType.diff())并获得一个NoSuchMethodError。 有人有什么想法吗?我使用的是Spark 1.6.2和Scala 2.10
我正在尝试从Spark官方网站运行Spark Streaming示例 这些是我在pom文件中使用的依赖项: 这是我的Java代码: 当我尝试从Eclipse运行它时,我遇到以下异常: 我从我的IDE(eclipse)运行它。我是否必须创建并将JAR部署到火花中以使其运行。如果有人知道这个异常,请分享您的经验。提前谢谢
我也在遵循Spark1.3文档。https://spark.apache.org/docs/latest/sql-programming-guide.html#推断-the-schema-using-reflection有一个解决方案吗? 下面是我的测试代码。
主要内容:Take函数示例在Spark中,函数的行为类似于数组。它接收一个整数值(比方说,n)作为参数,并返回数据集的前个元素的数组。 Take函数示例 在此示例中,返回现有数据集的前n个元素。要在Scala模式下打开Spark,请按照以下命令操作。 在此示例中,检索数据集的第一个元素。要在Scala模式下打开Spark,请按照以下命令操作。 使用并行化集合创建RDD。 现在,可以使用以下命令读取生成的结果。 应用函数来检
主要内容:First函数示例在Spark中,函数始终返回数据集的第一个元素。它类似于。 First函数示例 在此示例中,检索数据集的第一个元素。要在Scala模式下打开Spark,请按照以下命令操作。 使用并行化集合创建RDD。 现在,可以使用以下命令读取生成的结果。 应用函数来检索数据集的第一个元素。
主要内容:cogroup函数示例在Spark中,函数对不同的数据集执行,比方说,(K,V)和(K,W)并返回元组的数据集。此操作也称为groupWith。 cogroup函数示例 在这个例子中,将执行操作。要在Scala模式下打开Spark,请按照以下命令操作。 使用并行化集合创建RDD。 现在,可以使用以下命令读取生成的结果。 使用并行化集合创建另一个RDD。 现在,可以使用以下命令读取生成的结果。 应用函数对值进行分组。 现