我有一个数据集
数据集输入
+------+---+-----------+---------------------------------------------+---------------+
| Id| value | time |aggregateType |
+------+---------------+---------------------------------------------+---------------+
|0001 | [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum |
+------+---------------+---------------------------------------------+---------------+
预期数据集输出DS
+------+---------------+---------------------------------------------+---------------+-----------+
| Id| value | time |aggregateType | value_new |
+------+---------------+---------------------------------------------+---------------+-----------+
|0001 | [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum | 9.4 |
+------+---------------+---------------------------------------------+---------------+-----------+
我试过的代码。
inputDS.withColumn("value_new",functions.lit(inputDS.map(new MapFunction<Row,Double>(){
public double call(Row row){
String aggregateType = row.getAS("aggregateType");
List<long> timeList = row.getList("time");
List<long> valueList= row.getList("value");
return getAggregate(aggregateType ,timeList,valueList);
}}),Encoders.DOUBLE())));
错误
Unsupported literal type class org.apache.spark.sql.Dataset [value:double]
请注意,如果我错误地使用了< code>map
功能,请告诉我是否有任何解决方法。
谢谢你。!
您会收到此错误,因为您正在尝试使用 Dataset.map()
的结果创建函数文本 (lit()),
您可以在文档中看到它是一个数据集。您可以在 Dataset.with 哥伦布()
的 API 中看到,您需要一个作为列的参数。
看起来您需要创建一个用户定义的函数。看看如何使用JAVA调用Spark数据帧上的UDF?
在PySpark中或者至少在Scala中,Apache Spark中是否有与Pandas Melt函数等价的函数? 到目前为止,我一直在用Python运行一个示例数据集,现在我想对整个数据集使用Spark。
类似的问题,但没有足够的观点来评论。 根据最新的Spark文档,< code>udf有两种不同的用法,一种用于SQL,另一种用于DataFrame。我找到了许多关于如何在sql中使用< code>udf的例子,但是还没有找到任何关于如何在数据帧中直接使用< code>udf的例子。 o.p.针对上述问题提供的解决方案使用,这是,将根据Spark Java API文档在Spark 2.0中删除。在那
我有一个熊猫数据帧,它被定义为空,然后我想在做一些计算后添加一些行。 我试图做到以下几点: 如果我试着打印并附加到测试显示中 因此,很明显,该行没有被添加到数据帧中。 我希望输出是
我想过滤掉具有“c2”列前3个字符的记录,无论是“MSL”还是“HCP”。 所以输出应该如下所示。 有谁能帮忙吗? 我知道df。过滤器($c2.rlike(“MSL”))--用于选择记录,但如何排除记录? 版本:Spark 1.6.2 Scala:2.10
并将其应用于数据表的一列--这是我希望这样做的: 我还没有找到任何简单的方法,正在努力找出如何做到这一点。一定有一个更简单的方法,比将数据rame转换为和RDD,然后从RDD中选择行来获得正确的字段,并将函数映射到所有的值,是吗?创建一个SQL表,然后用一个sparkSQL UDF来完成这个任务,这更简洁吗?
我想在spark中读取一个CSV,将其转换为DataFrame,并使用将其存储在HDFS中 在Apache Spark中将CSV文件加载为DataFrame的正确命令是什么?