当前位置: 首页 > 知识库问答 >
问题:

在Spark DataFrame[Java]中合并多个列

栾钟展
2023-03-14

如何将一个数据帧中的多列(比如3列)组合成一个列(在一个新的数据帧中),其中每一行都成为一个Spark DenseVector?类似于这个线程,但在Java中,有一些下面提到的调整。

我试着用这样的UDF:

private UDF3<Double, Double, Double, Row> toColumn = new UDF3<Double, Double, Double, Row>() {

    private static final long serialVersionUID = 1L;

    public Row call(Double first, Double second, Double third) throws Exception {           
        Row row = RowFactory.create(Vectors.dense(first, second, third));

        return row; 
    }
};

然后注册UDF:

sqlContext.udf().register("toColumn", toColumn, dataType);

其中<code>数据类型<code>为:

StructType dataType = DataTypes.createStructType(new StructField[]{
    new StructField("bla", new VectorUDT(), false, Metadata.empty()),
    });

当我在一个有3列的数据帧上调用这个UDF并打印出新数据帧的模式时,我得到如下结果:

根| - 特征:结构(可为空 = 真) | | - bla: 向量(可空 = 假)

这里的问题是,我需要一个向量在外部,而不是在结构中。像这样:

root
 |-- features: vector (nullable = true)

我不知道如何得到它,因为寄存器函数要求UDF的返回类型为DataType(这反过来又不提供VectorType)

共有1个答案

华萧迟
2023-03-14

实际上,您使用以下数据类型手动将向量类型嵌套到结构中:

new StructField("bla", new VectorUDT(), false, Metadata.empty()),

如果您移除外部StructField,您将得到您想要的。当然,在这种情况下,您需要稍微修改一下函数定义的签名。也就是说,您需要返回类型向量。

请参见下面我的具体例子,它以一个简单的JUnit测试的形式说明了我的意思。

package sample.spark.test;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.junit.Test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ToVectorTest implements Serializable {
  private static final long serialVersionUID = 2L;

  private UDF3<Double, Double, Double, Vector> toColumn = new UDF3<Double, Double, Double, Vector>() {

    private static final long serialVersionUID = 1L;

    public Vector call(Double first, Double second, Double third) throws Exception {
      return Vectors.dense(first, second, third);
    }
  };

  @Test
  public void testUDF() {
    // context
    final JavaSparkContext sc = new JavaSparkContext("local", "ToVectorTest");
    final SQLContext sqlContext = new SQLContext(sc);

    // test input
    final DataFrame input = sqlContext.createDataFrame(
        sc.parallelize(
            Arrays.asList(
                RowFactory.create(1.0, 2.0, 3.0),
                RowFactory.create(4.0, 5.0, 6.0),
                RowFactory.create(7.0, 8.0, 9.0),
                RowFactory.create(10.0, 11.0, 12.0)
            )),
        DataTypes.createStructType(
            Arrays.asList(
                new StructField("feature1", DataTypes.DoubleType, false, Metadata.empty()),
                new StructField("feature2", DataTypes.DoubleType, false, Metadata.empty()),
                new StructField("feature3", DataTypes.DoubleType, false, Metadata.empty())
            )
        )
    );
    input.registerTempTable("input");

    // expected output
    final Set<Vector> expectedOutput = new HashSet<>(Arrays.asList(
        Vectors.dense(1.0, 2.0, 3.0),
        Vectors.dense(4.0, 5.0, 6.0),
        Vectors.dense(7.0, 8.0, 9.0),
        Vectors.dense(10.0, 11.0, 12.0)
    ));

    // processing
    sqlContext.udf().register("toColumn", toColumn, new VectorUDT());
    final DataFrame outputDF = sqlContext.sql("SELECT toColumn(feature1, feature2, feature3) AS x FROM input");
    final Set<Vector> output = new HashSet<>(outputDF.toJavaRDD().map(r -> r.<Vector>getAs("x")).collect());

    // evaluation
    assertEquals(expectedOutput.size(), output.size());
    for (Vector x : output) {
      assertTrue(expectedOutput.contains(x));
    }

    // show the schema and the content
    System.out.println(outputDF.schema());
    outputDF.show();

    sc.stop();
  }
}
 类似资料:
  • 问题内容: 如果要在Java中将两个列表合并为一个,可以使用。但是,如果我想合并多个列表怎么办? 这有效: 但这似乎并不是最好的解决方案,阅读起来也不是特别好。可悲的是不起作用。对于我来说,多次使用并为所有条目重复创建自己的列表似乎也不理想。那我该怎么办呢? 问题答案: 借助下面的代码中所示的Stream API, Java 8可以轻松实现这一目标。我们基本上已经创建了一个包含所有列表的流,然后,

  • 问题内容: 我有一个要合并的文件数组。这是我尝试过的,但是没有用。 问题答案: 使用IOUtils可以做到这一点。看我的例子: 如果您不能使用IOUtils lib,请编写自己的实现。例:

  • 问题内容: 我从两个不同的来源使用了一些JSON,最后得到两个s,我想将它们组合为一个。 数据: 使用http://json.org/java/库的代码: 因此,在这种情况下,我想将和组合在一起,以制作一个全新的产品或彼此结合。除了将它们拉开并分别加s 之外,还有其他想法吗? 问题答案: 如果要使用两个键Object1和Object2创建新对象,则可以执行以下操作: 如果要合并它们,例如顶级对象有

  • Java8在收集器中提供了一个groupingBy函数,但它给出了元素的映射。我需要把地图转换成单个列表。 它还返回结果,但我想要的是对列表本身执行分组操作,这样我就不必重新分配它,因为在lambda表达式中不可能重新分配。 但是nodelistgrouped.values()返回Collection(List)

  • (注意这不是按字母顺序,蓝莓排在狒狒之前) 当然,只要有一个列表不是空的,我就可以用一个计数器在“superlist”中循环,一个接一个地向resultslist中添加项目: 但最好使用一些优化的LINQ函数来实现这一点。我一直在研究Zip、GroupBy和Aggregate,但无法使它们工作。 那么:有没有一个漂亮的LINQ函数,或者多个函数的组合,可以把它变成漂亮的代码,或者我应该坚持(也许优

  • 问题内容: 我正在使用’Lucene.Net’库,并且有以下问题。 不是创建两个单独的QueryParser对象并使用它们来获得两个Hits对象,而是可以使用一个QueryParser对象在两个字段上进行搜索,这样我只有一个Hits对象,它可以为我提供每个文档的总体得分? 问题答案: 有3种方法可以做到这一点。 第一种方法是手动构造查询,这是QueryParser内部的工作。这是执行此操作最强大的