当前位置: 首页 > 知识库问答 >
问题:

如何将任何分隔的文本文件转换为拼花/拼花-使用spark sql将列编号/结构动态更改为拼花/拼花?

钱渊
2023-03-14

我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。

共有2个答案

魏康安
2023-03-14

我在spark 2.1.0-spark SQL中编写了这段代码

使用的输入

1238769|Michael|Hoffman|50000|New York
1238769|Michael1|Hoffman1|50000|New York1
1238770|Michael2|Hoffman2|50000|New York2
1238771|Michael3|Hoffman3|50000|New York3
1238772|Michael4|Hoffman4|50000|New York4
1238773|Michael5|Hoffman5|50000|New York5
1238774|Michael6|Hoffman6|50000|New York6
1238775|Michael7|Hoffman7|50000|New York7
1238776|Michael8|Hoffman8|50000|New York8
1238777|Michael9|Hoffman9|50000|New York9

在此示例中,我将把管道(“|”)文本文件转换为拼花

步骤#1:读取输入变量

//creating spark session
val spark = SparkSession.builder().appName("Text to Parquet").master("local[*]").getOrCreate()
import spark.implicits._

//Assigning values to the variables
val input_location = args(0).trim.toString()
val delimiter = "\\|" //You can make it dynamic by passing it as an argument
val selectColString_location = args(1).trim().toString()
val output_location = args(2).trim().toString()

步骤#2:读取输入文本数据并根据分隔符进行拆分

//Reading data from text file
val input_rdd = spark.sparkContext.textFile(input_location)

//Split the input data using the delimiter(we are suing pipe(\\|) as delimiter for this example)
val input_array_rdd:RDD[Array[String]] = input_rdd.map(x => x.split(delimiter, -1))

步骤#3:使用toDF将步骤#2中创建的rdd转换为数据帧,其中只有一列-col,它将是一个数组列

//Converting input_array_rdd into dataframe with only one column - col
val input_df:DataFrame = input_array_rdd.toDF("col")

//Creating temp table on top of input_df with the name TABLE1
input_df.createOrReplaceTempView("TABLE1")

步骤#4:使用临时表-TABLE1和数组列-coll根据输入结构准备选择语句

select cast(col[0] as bigint) as cust_id, col[1] as first_name, col[2] as last_name, cast(col[3] as decimal(18,6)) as amount, col[4] as city from table1

步骤#5:从文件中读取select语句并执行它以生成输出

//Reading the selectColString, remember we are reading only the first row from the file
//Select SQL should be only one row in the selectColString.txt file
val sqlColString = spark.sparkContext.textFile(selectColString_location).first().toString()
//Generating the output using the colString
val output_df = spark.sql(sqlColString)

步骤#6:将输出写入拼花

output_df.write.mode(SaveMode.Overwrite).parquet(output_location)

输出拼花架构

root
 |-- cust_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- amount: decimal(18,6) (nullable = true)
 |-- city: string (nullable = true)

通过这个程序,我们可以根据输入的文本修改selectColString文件,将所有文本文件转换为拼花地板。

Github代码链接:https://github.com/sangamgavini/ReusableCodes/tree/master/src/main/scala/com/sangam/TexttoParquet

松鸣
2023-03-14

在分析了你的问题陈述后,我做出以下假设,

1. data source can be anything, primarily HDFS
2. delimiter can be anything
3. you're maintaining  structure for each source. 
4. file does not contains header

建议:这里的问题是,如果您的数据不包含头,则必须生成StructType。想出一些结构可能是json结构来定义您的数据源。然后使用scala使用jackson加载和解析json。或者只需将column\u map传递给您的程序。

Example: 
{
    "inputLocation": "",
    "delimiter" : ",",
    "column_map" : "col1, datatype; col12, datatype;col1, datatype; col12, datatype"
    "outputLocation": ""
}

现在,使用column\u map动态生成结构类型。

object GenerateStructType {

  import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}

  def generateStrucType(columnsList: Seq[String]): StructType = {

    val res=columnsList.map( columnDetail => {
      val  columnName = columnDetail.split(",")(0).trim
      val  columnType = columnDetail.split(",")(1).trim

      columnType match {
        case "String" => StructField(columnName,StringType,true)
        case "Bool" => StructField(columnName,BooleanType,true)
        case _ => StructField(columnName,StringType,true)

      }
    })
    StructType(res)
  }

  def main(args: Array[String]): Unit = {
    val columnMap=  "col1, datatype; col12, datatype;col1, datatype; col12, datatype"

    val result= GenerateStructType.generateStrucType(    columnMap.split(";"))
    println(result)
  }

}

动态生成的StructType:

StructType(StructField(col1,StringType,true), StructField(col12,StringType,true), StructField(col1,StringType,true), StructField(col12,StringType,true))

加载数据时使用结构类型。

希望这有帮助......

 类似资料:
  • 我有以下代码,它从Marketo系统中获取一些数据 这将返回给我以下数据 我想做的是,保存这个返回到一个拼花文件。但是当我用下面的代码尝试时,我收到了一条错误消息。 我做错了什么?

  • 我是Spark的新手。我尝试在本地模式(windows)下使用spark java将csv文件保存为parquet。我得到了这个错误。 原因:org.apache.spark.Spark异常:写入行时任务失败 我引用了其他线程并禁用了spark推测 set("spark.speculation "," false ") 我还是会出错。我在csv中只使用了两个专栏进行测试。 输入: 我的代码: 请帮

  • 我们有一个以红移方式处理数据的用例。但我想在S3中创建这些表的备份,以便使用Spectrum查询这些表。 为了将表从Redshift移动到S3,我使用了一个胶水ETL。我已经为AWS红移创建了一个爬虫程序。胶水作业将数据转换为拼花地板,并将其存储在S3中,按日期进行分区。然后,另一个爬虫会对S3文件进行爬行,以再次对数据进行编目。 如何消除第二个爬虫并在作业本身中执行此操作?

  • 我正在从Impala迁移到SparkSQL,使用以下代码读取一个表: 我如何调用上面的SparkSQL,这样它就可以返回这样的东西:

  • 则错误如下: AttributeError:“property”对象没有属性“parquet”

  • 我有AWS胶水ETL作业,每15分钟运行一次,每次在S3中生成一个拼花文件。 我需要创建另一个作业来运行每小时结束,以使用AWS Glue ETL pyspark代码将S3中的所有4个拼花文件合并为1个拼花文件。 有人试过吗?建议和最佳做法? 提前感谢!