当前位置: 首页 > 知识库问答 >
问题:

在包含JSON数据的Parquet文件上创建配置单元表

封瑞
2023-03-14

我努力实现的目标

    < li >从源大JSON文件(employee-sample.json)获取数据 < li >一个简单的spark应用程序,将它作为文本文件读取并存储在parquet(simple-loader.java)中。我不知道JSON文件中有什么,所以我不能放入任何模式,所以我希望读取模式,而不是写入模式。创建了一个parquet文件,其中一列名为“value ”,包含JSON字符串 < li >在parquet文件上创建一个HIVE外部表,当我选择“select * from table”时,我看到一列带有JSON数据。

我真正需要的是创建一个配置单元表,它可以读取“value”列中的JSON数据,并应用schema和emit列,以便根据需要在原始数据上创建各种表。

我在JSON文件上创建了hive表,并提取了列,但这个从拼花和应用JSON模式中提取列正在欺骗我

employee-sample.json

{"name":"Dave", "age" : 30 , "DOB":"1987-01-01"}
{"name":"Steve", "age" : 31 , "DOB":"1986-01-01"}
{"name":"Kumar", "age" : 32 , "DOB":"1985-01-01"}

将JSON转换为镶木地板的简单Spark代码

simple-loader.java

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
            .appName(JsonToParquet.class.getName())
            .master("local[*]").getOrCreate();
    Dataset<String> eventsDataSet = sparkSession.read().textFile("D:\\dev\\employee-sample.json");
    eventsDataSet.createOrReplaceTempView("rawView");
    sparkSession.sqlContext().sql("select string(value) as value from rawView")
            .write()
            .parquet("D:\\dev\\" + UUID.randomUUID().toString());
    sparkSession.close();
}

镶木地板文件上的蜂巢表

CREATE EXTERNAL TABLE EVENTS_RAW (
VALUE STRING)
STORED AS PARQUET
LOCATION 'hdfs://XXXXXX:8020/employee/data_raw';

我尝试通过设置JSON serde,但它只有在数据存储在JSON foram,ROW FORMAT SERDE 'com.proofpoint.hive.serde.JsonSerde'时才有效。

期望格式

CREATE EXTERNAL TABLE EVENTS_DATA (
    NAME STRING,
    AGE STRING,
    DOB STRING)
??????????????????????????????

共有1个答案

东郭思远
2023-03-14

创建配置单元外部表示例:

 public static final String CREATE_EXTERNAL = "CREATE EXTERNAL TABLE %s" +
        " (%s) " +
        " PARTITIONED BY(%s) " +
        " STORED AS %s" +
        " LOCATION '%s'";
/**
 * Will create an external table and recover the partitions
 */
public void createExternalTable(SparkSession sparkSession, StructType schema, String tableName, SparkFormat format, List<StructField> partitions, String tablePath){
    String createQuery = createTableString(schema, tableName, format, partitions, tablePath);
    logger.info("Going to create External table with the following query:\n " + createQuery);
    sparkSession.sql(createQuery);
    logger.debug("Finish to create External table with the following query:\n " + createQuery);
    recoverPartitions(sparkSession, tableName);
}

public String createTableString(StructType schema, String tableName, SparkFormat format, List<StructField> partitions, String tablePath){
    Set<String> partitionNames = partitions.stream().map(struct -> struct.name()).collect(Collectors.toSet());
    String columns = Arrays.stream(schema.fields())
            //Filter the partitions
            .filter(field -> !partitionNames.contains(field.name()))
            //
            .map(HiveTableHelper::fieldToStringBuilder)
            .collect(Collectors.joining(", "));

    String partitionsString = partitions.stream().map(HiveTableHelper::fieldToStringBuilder).collect(Collectors.joining(", "));

    return String.format(CREATE_EXTERNAL, tableName, columns, partitionsString, format.name(), tablePath);
}

/**
 *
 * @param sparkSession
 * @param table
 */
public void recoverPartitions(SparkSession sparkSession, String table){
    String query = "ALTER TABLE " + table + " RECOVER PARTITIONS";
    logger.debug("Start: " + query);
    sparkSession.sql(query);
    sparkSession.catalog().refreshTable(table);
    logger.debug("Finish: " + query);
}

  public static StringBuilder fieldToStringBuilder(StructField field){
    StringBuilder sb = new StringBuilder();
    sb.append(field.name()).append( " ").append(field.dataType().simpleString());
    return sb;
}
 类似资料: