当前位置: 首页 > 知识库问答 >
问题:

如何使用 Java UDF 向火花数据帧添加新列

孙自怡
2023-03-14

我有一个数据集

数据集输入

 +------+---+-----------+---------------------------------------------+---------------+
 |    Id| value         |     time                                   |aggregateType  |
 +------+---------------+---------------------------------------------+---------------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
 +------+---------------+---------------------------------------------+---------------+

预期数据集输出DS

 +------+---------------+---------------------------------------------+---------------+-----------+
 |    Id| value         |     time                                    |aggregateType  | value_new |
 +------+---------------+---------------------------------------------+---------------+-----------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |   9.4     |
 +------+---------------+---------------------------------------------+---------------+-----------+

我试过的代码。

 inputDS.withColumn("value_new",functions.lit(inputDS.map(new MapFunction<Row,Double>(){

 public double call(Row row){
 String aggregateType = row.getAS("aggregateType");
 List<long> timeList = row.getList("time");
 List<long> valueList= row.getList("value");  

 return  getAggregate(aggregateType ,timeList,valueList);    

 }}),Encoders.DOUBLE())));

错误

 Unsupported literal type class org.apache.spark.sql.Dataset [value:double]

请注意,如果我错误地使用了< code>map功能,请告诉我是否有任何解决方法。

谢谢你。!

共有1个答案

壤驷经国
2023-03-14

您会收到此错误,因为您正在尝试使用 Dataset.map() 的结果创建函数文本 (lit()),您可以在文档中看到它是一个数据集。您可以在 Dataset.with 哥伦布() 的 API 中看到,您需要一个作为列的参数。

看起来您需要创建一个用户定义的函数。看看如何使用JAVA调用Spark数据帧上的UDF?

 类似资料:
  • 在PySpark中或者至少在Scala中,Apache Spark中是否有与Pandas Melt函数等价的函数? 到目前为止,我一直在用Python运行一个示例数据集,现在我想对整个数据集使用Spark。

  • 类似的问题,但没有足够的观点来评论。 根据最新的Spark文档,< code>udf有两种不同的用法,一种用于SQL,另一种用于DataFrame。我找到了许多关于如何在sql中使用< code>udf的例子,但是还没有找到任何关于如何在数据帧中直接使用< code>udf的例子。 o.p.针对上述问题提供的解决方案使用,这是,将根据Spark Java API文档在Spark 2.0中删除。在那

  • 我有一个熊猫数据帧,它被定义为空,然后我想在做一些计算后添加一些行。 我试图做到以下几点: 如果我试着打印并附加到测试显示中 因此,很明显,该行没有被添加到数据帧中。 我希望输出是

  • 我想过滤掉具有“c2”列前3个字符的记录,无论是“MSL”还是“HCP”。 所以输出应该如下所示。 有谁能帮忙吗? 我知道df。过滤器($c2.rlike(“MSL”))--用于选择记录,但如何排除记录? 版本:Spark 1.6.2 Scala:2.10

  • 并将其应用于数据表的一列--这是我希望这样做的: 我还没有找到任何简单的方法,正在努力找出如何做到这一点。一定有一个更简单的方法,比将数据rame转换为和RDD,然后从RDD中选择行来获得正确的字段,并将函数映射到所有的值,是吗?创建一个SQL表,然后用一个sparkSQL UDF来完成这个任务,这更简洁吗?

  • 您好,我正在尝试在构建中下载,,,以及。sbt文件如下: 我只是把这个联机,所以我不确定使用哪个版本等。 有人能向我解释一下我应该如何修复这个. sbt文件吗?我花了几个小时试图弄清楚,但没有一个建议奏效。我安装了通过家酿和我的版本 我所有的错误都是关于: