当前位置: 首页 > 面试题库 >

将文字值设置为set的新列添加到DataFrame中

李昱
2023-03-14
问题内容
Map<File, Dataset<Row> allWords = ...
StructField[] structFields = new StructField[] {
        new StructField("word", DataTypes.StringType, false, Metadata.empty()),
        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("files", ???, false, Metadata.empty())
};
StructType structType = new StructType(structFields);

Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);

for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {
    Integer fileIndex = files.indexOf(entry.getKey());
    allFilesWords.unionAll(
            allWords.get(entry.getKey()).withColumn("files", ???)
    );
}

在上面的给定代码中,allWords表示从文件到其字数(Row: (string, integer))的映射。现在,我想将所有文件的结果聚合到一个DataFrame中,同时保留提到该单词的原始文件。由于最后,每个单词可能已在多个文件中提及,因此该files列设计为整数类型集(假设文件被映射为整数)。现在,我正在尝试向allWordsDataFrames添加一个新列,然后使用unionAll将它们合并在一起。

但是我不知道如何files使用仅包含一项的集合来定义和初始化新列(在此命名)fileIndex。

多亏了注释中提供的链接,我知道我应该使用它,functions.typedLit但是此函数要求提供第二个参数,但我不知道该为它提供什么。另外,我不知道如何定义列。最后一件事,提供的链接在Python中,而我正在寻找Java API。


问题答案:

我自己找到了解决方案(在一些帮助下):

Map<File, Dataset<Row> allWords = ...
StructField[] structFields = new StructField[] {
        new StructField("word", DataTypes.StringType, false, Metadata.empty()),
        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("files", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty())
};
StructType structType = new StructType(structFields);

Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);
for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {
    Integer fileIndex = files.indexOf(entry.getKey());
    allFilesWords.unionAll(
            allWords.get(entry.getKey())
                    .withColumn("files", functions.typedLit(seq, MyTypeTags.SeqInteger()))
    );
}

问题在于这TypeTag是Scala的编译时工件,根据我在另一个问题中得到的内容,它需要由Scala编译器生成,并且无法用Java生成。因此,我不得不TypeTag在Scala文件中编写我的自定义数据结构,并将其添加到我的Maven Java项目中。为此,我关注了本文。

这是我的MyTypeTags.scala文件:

import scala.reflect.runtime.universe._

object MyTypeTags {
  val SeqInteger = typeTag[Seq[Integer]]
}


 类似资料:
  • 问题内容: 我目前正在尝试从MongoDB中提取数据库,并使用Spark来将其提取到ElasticSearch中。 Mongo数据库具有纬度和经度值,但是ElasticSearch要求将它们强制转换为类型。 Spark中是否可以将and 列复制到or 的新列? 任何帮助表示赞赏! 问题答案: 我假设您从某种平面模式开始,如下所示: 首先让我们创建示例数据: 一种简单的方法是使用udf和case类:

  • 我想添加一个新列,并将其设置为带有的MultIndex,但我收到一个错误。 我的代码: 错误: 文件"/库/框架/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/indexes/base.py",第3078行,get_loc返回自己。_engine.get_loc(键)文件"熊猫/_libs/index.p

  • 问题内容: 与此问题类似,如何将空列添加到数据框?,我想知道向DataFrame添加一列空列表的最佳方法。 我想要做的基本上是初始化一列,然后遍历行以处理其中的一些行,然后在此新列中添加填充列表以替换初始化的值。 例如,如果下面是我的初始DataFrame: 然后,我最终希望得到这样的结果,其中每一行都经过单独处理(显示了示例结果): 当然,如果我尝试像使用其他任何常量一样进行初始化,它会认为我正

  • 以下是我目前的尝试: 我一直在思考如何动态地向列(mpg、cyl、disp)添加(1,2,3)。提前谢了。

  • 问题内容: 我想将属性值添加到查询中的xml字段中。我的例子如下 我希望我的最终结果看起来像这样 问题答案: 如果需要选择数据,则可以使用xquery: 甚至更简单: