如何将一个数据帧中的多列(比如3列)组合成一个列(在一个新的数据帧中),其中每一行都成为一个Spark DenseVector?类似于这个线程,但在Java中,有一些下面提到的调整。
我试着用这样的UDF:
private UDF3<Double, Double, Double, Row> toColumn = new UDF3<Double, Double, Double, Row>() {
private static final long serialVersionUID = 1L;
public Row call(Double first, Double second, Double third) throws Exception {
Row row = RowFactory.create(Vectors.dense(first, second, third));
return row;
}
};
然后注册UDF:
sqlContext.udf().register("toColumn", toColumn, dataType);
其中<code>数据类型<code>为:
StructType dataType = DataTypes.createStructType(new StructField[]{
new StructField("bla", new VectorUDT(), false, Metadata.empty()),
});
当我在一个有3列的数据帧上调用这个UDF并打印出新数据帧的模式时,我得到如下结果:
根| - 特征:结构(可为空 = 真) | | - bla: 向量(可空 = 假)
这里的问题是,我需要一个向量在外部,而不是在结构中。像这样:
root
|-- features: vector (nullable = true)
我不知道如何得到它,因为寄存器
函数要求UDF的返回类型为DataType
(这反过来又不提供VectorType)
实际上,您使用以下数据类型手动将向量类型嵌套到结构中:
new StructField("bla", new VectorUDT(), false, Metadata.empty()),
如果您移除外部StructField,您将得到您想要的。当然,在这种情况下,您需要稍微修改一下函数定义的签名。也就是说,您需要返回类型向量。
请参见下面我的具体例子,它以一个简单的JUnit测试的形式说明了我的意思。
package sample.spark.test;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.junit.Test;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ToVectorTest implements Serializable {
private static final long serialVersionUID = 2L;
private UDF3<Double, Double, Double, Vector> toColumn = new UDF3<Double, Double, Double, Vector>() {
private static final long serialVersionUID = 1L;
public Vector call(Double first, Double second, Double third) throws Exception {
return Vectors.dense(first, second, third);
}
};
@Test
public void testUDF() {
// context
final JavaSparkContext sc = new JavaSparkContext("local", "ToVectorTest");
final SQLContext sqlContext = new SQLContext(sc);
// test input
final DataFrame input = sqlContext.createDataFrame(
sc.parallelize(
Arrays.asList(
RowFactory.create(1.0, 2.0, 3.0),
RowFactory.create(4.0, 5.0, 6.0),
RowFactory.create(7.0, 8.0, 9.0),
RowFactory.create(10.0, 11.0, 12.0)
)),
DataTypes.createStructType(
Arrays.asList(
new StructField("feature1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("feature2", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("feature3", DataTypes.DoubleType, false, Metadata.empty())
)
)
);
input.registerTempTable("input");
// expected output
final Set<Vector> expectedOutput = new HashSet<>(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0),
Vectors.dense(4.0, 5.0, 6.0),
Vectors.dense(7.0, 8.0, 9.0),
Vectors.dense(10.0, 11.0, 12.0)
));
// processing
sqlContext.udf().register("toColumn", toColumn, new VectorUDT());
final DataFrame outputDF = sqlContext.sql("SELECT toColumn(feature1, feature2, feature3) AS x FROM input");
final Set<Vector> output = new HashSet<>(outputDF.toJavaRDD().map(r -> r.<Vector>getAs("x")).collect());
// evaluation
assertEquals(expectedOutput.size(), output.size());
for (Vector x : output) {
assertTrue(expectedOutput.contains(x));
}
// show the schema and the content
System.out.println(outputDF.schema());
outputDF.show();
sc.stop();
}
}
问题内容: 如果要在Java中将两个列表合并为一个,可以使用。但是,如果我想合并多个列表怎么办? 这有效: 但这似乎并不是最好的解决方案,阅读起来也不是特别好。可悲的是不起作用。对于我来说,多次使用并为所有条目重复创建自己的列表似乎也不理想。那我该怎么办呢? 问题答案: 借助下面的代码中所示的Stream API, Java 8可以轻松实现这一目标。我们基本上已经创建了一个包含所有列表的流,然后,
问题内容: 我有一个要合并的文件数组。这是我尝试过的,但是没有用。 问题答案: 使用IOUtils可以做到这一点。看我的例子: 如果您不能使用IOUtils lib,请编写自己的实现。例:
问题内容: 我从两个不同的来源使用了一些JSON,最后得到两个s,我想将它们组合为一个。 数据: 使用http://json.org/java/库的代码: 因此,在这种情况下,我想将和组合在一起,以制作一个全新的产品或彼此结合。除了将它们拉开并分别加s 之外,还有其他想法吗? 问题答案: 如果要使用两个键Object1和Object2创建新对象,则可以执行以下操作: 如果要合并它们,例如顶级对象有
Java8在收集器中提供了一个groupingBy函数,但它给出了元素的映射。我需要把地图转换成单个列表。 它还返回结果,但我想要的是对列表本身执行分组操作,这样我就不必重新分配它,因为在lambda表达式中不可能重新分配。 但是nodelistgrouped.values()返回Collection(List)
(注意这不是按字母顺序,蓝莓排在狒狒之前) 当然,只要有一个列表不是空的,我就可以用一个计数器在“superlist”中循环,一个接一个地向resultslist中添加项目: 但最好使用一些优化的LINQ函数来实现这一点。我一直在研究Zip、GroupBy和Aggregate,但无法使它们工作。 那么:有没有一个漂亮的LINQ函数,或者多个函数的组合,可以把它变成漂亮的代码,或者我应该坚持(也许优
问题内容: 我正在使用’Lucene.Net’库,并且有以下问题。 不是创建两个单独的QueryParser对象并使用它们来获得两个Hits对象,而是可以使用一个QueryParser对象在两个字段上进行搜索,这样我只有一个Hits对象,它可以为我提供每个文档的总体得分? 问题答案: 有3种方法可以做到这一点。 第一种方法是手动构造查询,这是QueryParser内部的工作。这是执行此操作最强大的