当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流媒体中的拼花数据和分割问题

周祺
2023-03-14

我正在使用Spark结构化流媒体;我的DataFrame具有以下架构

root 
 |-- data: struct (nullable = true) 
 |    |-- zoneId: string (nullable = true) 
 |    |-- deviceId: string (nullable = true) 
 |    |-- timeSinceLast: long (nullable = true) 
 |-- date: date (nullable = true) 

如何使用Parquet格式执行writeStream并写入数据(包含zoneId、deviceId、TimesInclast;除日期外的所有内容)并按日期对数据进行分区?我尝试了以下代码,但partition by子句不起作用

val query1 = df1 
  .writeStream 
  .format("parquet") 
  .option("path", "/Users/abc/hb_parquet/data") 
  .option("checkpointLocation", "/Users/abc/hb_parquet/checkpoint") 
  .partitionBy("data.zoneId") 
  .start() 

共有2个答案

於宾白
2023-03-14

我认为您应该尝试方法重新分区,它可以接受两种参数:

  • 列名
  • 所需分区数

我建议使用重新分区(“日期”)按日期对数据进行分区。

这是一个很好的主题链接:https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

澹台成龙
2023-03-14

如果要按日期进行分区,则必须在方法中使用它。

val query1 = df1 
  .writeStream 
  .format("parquet") 
  .option("path", "/Users/abc/hb_parquet/data") 
  .option("checkpointLocation", "/Users/abc/hb_parquet/checkpoint") 
  .partitionBy("date") 
  .start()

如果要对由<代码>

val df = dataset.withColumn("date", dataset.col("date").cast(DataTypes.DateType))

df.withColumn("year", functions.date_format(df.col("date"), "YYYY"))
  .withColumn("month", functions.date_format(df.col("date"), "MM"))
  .withColumn("day", functions.date_format(df.col("date"), "dd"))
  .writeStream 
  .format("parquet") 
  .option("path", "/Users/abc/hb_parquet/data") 
  .option("checkpointLocation", "/Users/abc/hb_parquet/checkpoint") 
  .partitionBy("year", "month", "day")
  .start()
 类似资料:
  • 我的项目中有一个场景,我正在使用spark-sql-2.4.1版本阅读Kafka主题消息。我能够使用结构化流媒体处理一天。一旦收到数据并进行处理后,我需要将数据保存到hdfs存储中的各个拼花文件中。 我能够存储和读取拼花文件,我保持了15秒到1分钟的触发时间。这些文件的大小非常小,因此会产生许多文件。 这些拼花地板文件需要稍后通过配置单元查询读取。 那么1)该策略在生产环境中有效吗?还是会导致以后

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 在火花批处理作业中,我通常将JSON数据源写入文件,并可以使用DataFrame阅读器的损坏列功能将损坏的数据写入单独的位置,并使用另一个阅读器从同一作业中写入有效数据。(数据写成拼花) 但是在火花结构流中,我首先通过Kafka作为字符串读取流,然后使用from_json来获取我的数据帧。然后from_json使用JsonToSTRts,它在解析器中使用FailFast模式,并且不会将未解析的字符

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我建立了一个管道,从Kafka读取数据,使用Spark结构化流处理数据,然后将拼花文件写入HDFS。数据查询的下游客户端正在使用配置为以配置单元表的形式读取数据的Presto。 Kafka-- 一般来说,这是可行的。当Spark作业运行批处理时发生查询时,就会出现问题。Spark作业在HDFS上创建零长度拼花文件。如果Presto在处理查询的过程中试图打开此文件,则会抛出错误: 查询2017111

  • 我试图从kafka主题获取数据并将其推送到hdfs位置。我面临以下问题。 在每条消息(kafka)之后,hdfs位置都会更新为带有.c000.csv格式的部分文件。我已经在HDFS位置的顶部创建了一个hive表,但是HIVE无法读取从火花结构化流写入的任何数据。 以下是spark结构化流媒体之后的文件格式 以下是我要插入的代码: 谁能帮帮我,为什么要创建这样的文件? 如果我执行dfs-cat/pa