当前位置: 首页 > 知识库问答 >
问题:

如何使用scala[复制]排列Spark中的行和列

西门智
2023-03-14

我想从文本文件中获取以下格式

first line
column1;column2;column3
column1;column2;column3
last line

要将其转换为没有第一行和最后一行的数据帧,我跳过了第一行和最后一行,但随后我变成了一行和onw列中的其余文本如何排列这些行?我还有一个数据框架的模式

var textFile = sc.textFile("*.txt")
val header = textFile.first()
val total = textFile.count()
var rows = textFile.zipWithIndex().filter(x => x._2 < total - 1).map(x => x._1).filter(x => x !=  header)

val schema = StructType(Array(
  StructField("col1", IntegerType, true),
  StructField("col2", StringType, true),
  StructField("col3", StringType, true),
  StructField("col4", StringType, true)
))

共有1个答案

康赞
2023-03-14

您应该执行以下操作(为了清晰起见,请发表评论)

//creating schema
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("col1", StringType, true),
  StructField("col2", StringType, true),
  StructField("col3", StringType, true)
))

//reading text file and finding total lines
val textFile = sc.textFile("*.txt")
val total = textFile.count()

//indexing lines for filtering the first and the last lines
import org.apache.spark.sql.Row
val rows = textFile.zipWithIndex()
    .filter(x => x._2 != 0 && x._2 < total - 1)
  .map(x => Row.fromSeq(x._1.split(";").toSeq))   //converting the lines to Row of sequences

//finally creating dataframe
val df = sqlContext.createDataFrame(rows, schema)
df.show(false)

这应该能给你

+-------+-------+-------+
|col1   |col2   |col3   |
+-------+-------+-------+
|column1|column2|column3|
|column1|column2|column3|
+-------+-------+-------+
 类似资料:
  • 阅读Spark method sortByKey: 是否可能只返回“N”个数量的结果。因此,与其返回所有结果,不如返回前10名。我可以将已排序的集合转换为数组,并使用方法,但既然这是一个O(N)操作,有没有更有效的方法?

  • 我想使用spark dataframe将行转换为列。 我的桌子是这样的 我想把它转换成 我用了下面的代码:- 但我得到的结果是-- 任何人都可以帮助得到渴望的结果。

  • 我在Spark中有一个数据框架,其中包含许多列和我定义的udf。我想要返回相同的数据帧,除了一列被转换。此外,我的udf接收字符串并返回时间戳。有一个简单的方法可以做到这一点吗?我试过了 但这返回一个RDD,并且只返回转换后的列。

  • 问题内容: 有没有一种方法可以限制使用spark sql 2.2.0从jdbc源获取的记录数? 我正在处理将一个大于200M的记录从一个MS Sql Server表移动(和转换)到另一个MS Sql表的任务: 在工作的同时,很明显,它首先要从数据库中加载所有200M条记录,首先要花18分钟的时间,然后将我希望用于测试和开发目的的有限数量的记录返回给我。 在take(…)和load()之间切换会产生

  • scala/火花在火花外壳中使用udf函数在数据框列中进行数组操作 < code>df.printSchema 样本数据: 在udf.jar,我有这个函数来获取上限日期在date_arr根据x: 添加jar到火花外壳: 在火花外壳,我有HiveContext作为,并创建函数: 当我进行查询时:,期望有一个这样的数据帧: 但是,它会抛出以下错误: 组织 apache.spark.sql.Analys

  • 在第一个窗口row_number1到4中,新的秩(新列)将是 在第一个窗口row_number5到8中,新的秩(新列)将是 在第一个窗口中,Row_Number9要Rest,新的秩(新列)将是 但这给了我: 此外,尝试了。rowsbetween(-3,0)但这也给我带来了错误: