当前位置: 首页 > 知识库问答 >
问题:

使用Java将Spark数据帧中的数组转换为DensEventor

华君浩
2023-03-14

我正在运行Spark 2.3。我想将以下数据帧中的列<code>features

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[4.191401, -1.793...|
| 10|[-0.5674514, -1.3...|
| 20|[0.735613, -0.026...|
| 30|[-0.030161237, 0....|
| 40|[-0.038345724, -0...|
+---+--------------------+

root
 |-- id: integer (nullable = false)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = false)

我已经写了下面的< code>UDF,但它似乎不工作:

private static UDF1 toVector = new UDF1<Float[], Vector>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(Float[] t1) throws Exception {

        double[] DoubleArray = new double[t1.length];
        for (int i = 0 ; i < t1.length; i++)
        {
            DoubleArray[i] = (double) t1[i];
        }   
    Vector vector = (org.apache.spark.mllib.linalg.Vector) Vectors.dense(DoubleArray);
    return vector;
    }
}

我希望提取以下特征作为向量,以便对其执行聚类。

我也注册UDF,然后继续调用它如下:

spark.udf().register("toVector", (UserDefinedAggregateFunction) toVector);
df3 = df3.withColumn("featuresnew", callUDF("toVector", df3.col("feautres")));
df3.show();  

在运行此代码段时,我面临以下错误:

ReadProcessData$1不能强制转换为org.apache.spark.sql.expressions。用户定义聚合函数

共有1个答案

贺福
2023-03-14

问题在于如何在Spark中注册< code>udf。不应使用< code > UserDefinedAggregateFunction ,它不是用于聚合的< code>udf,而是< code>udaf。相反,你应该做的是:

spark.udf().register("toVector", toVector, new VectorUDT());

然后,要使用注册的函数,请使用:

df3.withColumn("featuresnew", callUDF("toVector",df3.col("feautres")));

udf 本身应按如下方式稍作调整:

UDF1 toVector = new UDF1<Seq<Float>, Vector>(){

  public Vector call(Seq<Float> t1) throws Exception {

    List<Float> L = scala.collection.JavaConversions.seqAsJavaList(t1);
    double[] DoubleArray = new double[t1.length()]; 
    for (int i = 0 ; i < L.size(); i++) { 
      DoubleArray[i]=L.get(i); 
    } 
    return Vectors.dense(DoubleArray); 
  } 
};

请注意,在Spark 2.3中,您可以创建一个可以直接调用的scala样式的udf。从这个答案中:

UserDefinedFunction toVector = udf(
  (Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);

df3.withColumn("featuresnew", toVector.apply(col("feautres")));
 类似资料:
  • 我正在尝试将熊猫DF转换为Spark one。测向头: 代码: 我得到了一个错误:

  • RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:

  • 我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。

  • 我有以下两个场景共享的前奏代码: 现在,我想将df转换为pyspark数据帧(

  • 我有这个数据框 我想转换这种形式的Numpy数组: 我正在使用转换为_矩阵函数,并在它重塑(1,4)后使用,但它不起作用!!它给我的格式是:有什么建议吗?我需要把它转换成那种格式,这样我就可以应用“精确回忆曲线”功能。

  • 我在Spark中有一个数据框,看起来像这样: 它有30列:只显示其中的一些! 因此,我必须在Scala中将这个数据帧转换成一个键值对,使用键作为数据帧中的一些列,并为这些键分配从索引0到计数(不同的键数)的唯一值。 例如:使用上面的案例,我希望在Scala中的map(key-value)集合中有一个输出,如下所示: 我对斯卡拉和斯帕克是新手,我试着做这样的事情。 但是,这不起作用。:/此操作完成后