当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流媒体运行期间Presto提供的“不是拼花地板文件(太小)”

颛孙炜
2023-03-14

我建立了一个管道,从Kafka读取数据,使用Spark结构化流处理数据,然后将拼花文件写入HDFS。数据查询的下游客户端正在使用配置为以配置单元表的形式读取数据的Presto。

Kafka--

一般来说,这是可行的。当Spark作业运行批处理时发生查询时,就会出现问题。Spark作业在HDFS上创建零长度拼花文件。如果Presto在处理查询的过程中试图打开此文件,则会抛出错误:

查询20171116_170937_07282_489cc失败:打开Hive拆分hdfs://namenode错误:50071/hive/仓库/表/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet(偏移量=0,长度=0):hdfs://namenode:50071/hive/仓库/表/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet不是拼花文件(太小)

文件此时确实为零字节,因此错误严格正确,但这不是我希望管道的行为。我希望能够连续写入适当的HDFS文件夹,而不会干扰Presto查询。

作业的Spark scala代码如下所示:

val FilesOnDisk = 1
Spark
  .initKafkaStream("fleet_profile_test")
  .filter(_.name.contains(job.kafkaTag))
  .flatMap(job.parser)
  .coalesce(FilesOnDisk)
  .writeStream
  .trigger(ProcessingTime("1 hours"))
  .outputMode("append")
  .queryName(job.queryName)
  .format("parquet")
  .option("path", job.outputFilesPath)
  .start()

作业在小时的顶部:00开始。该文件在HDFS上首先在:05时作为零长度文件可见。直到在:21时完全写入它才会更新,就在作业完成之前。这使得表在Presto 25%的时间内实际上无法使用。

每个文件只有500kB多一点,所以我不希望文件的物理写入需要很长时间。据我所知,拼花文件的元数据位于文件的末尾,因此编写更大文件的人会遇到更多的麻烦。

在解决这个Presto错误的同时,人们使用了哪些策略来集成Spark结构化流媒体和Presto?

共有1个答案

仲鸿风
2023-03-14

您可以尝试说服Presto(或Presto团队)忽略空文件,但这不会有帮助,因为编写文件的程序(此处为Spark)最终将刷新部分数据,并且文件将显示为部分、非空且格式不正确,因此也会导致错误。

防止Presto(或其他读取表数据的程序)看到部分文件的方法是将文件组装在不同的位置,然后原子地将文件移动到正确的位置。

 类似资料:
  • 我正在使用Spark结构化流媒体;我的DataFrame具有以下架构 如何使用Parquet格式执行writeStream并写入数据(包含zoneId、deviceId、TimesInclast;除日期外的所有内容)并按日期对数据进行分区?我尝试了以下代码,但partition by子句不起作用

  • 我试图从kafka主题获取数据并将其推送到hdfs位置。我面临以下问题。 在每条消息(kafka)之后,hdfs位置都会更新为带有.c000.csv格式的部分文件。我已经在HDFS位置的顶部创建了一个hive表,但是HIVE无法读取从火花结构化流写入的任何数据。 以下是spark结构化流媒体之后的文件格式 以下是我要插入的代码: 谁能帮帮我,为什么要创建这样的文件? 如果我执行dfs-cat/pa

  • 我试图利用火花分区。我试图做这样的事情 这里的问题每个分区都会创建大量的镶木地板文件,如果我尝试从根目录读取,则会导致读取缓慢。 为了避免这种情况,我试过 但是,这将创建每个分区中镶木地板文件的数目。现在我的分区大小不同了。因此,理想情况下,我希望每个分区都有单独的合并。然而,这看起来并不容易。我需要访问所有分区合并到一定数量并存储在单独的位置。 我应该如何使用分区来避免写入后出现许多文件?

  • 我的项目中有一个场景,我正在使用spark-sql-2.4.1版本阅读Kafka主题消息。我能够使用结构化流媒体处理一天。一旦收到数据并进行处理后,我需要将数据保存到hdfs存储中的各个拼花文件中。 我能够存储和读取拼花文件,我保持了15秒到1分钟的触发时间。这些文件的大小非常小,因此会产生许多文件。 这些拼花地板文件需要稍后通过配置单元查询读取。 那么1)该策略在生产环境中有效吗?还是会导致以后

  • 我是Spark的初学者,试图理解Spark数据帧的机制。当从csv和parquet加载数据时,我比较了spark sql dataframe上sql查询的性能。我的理解是,一旦数据加载到spark数据框中,数据的来源(csv或parquet)应该无关紧要。然而,我看到了两者之间的显著性能差异。我使用以下命令加载数据,并对其编写查询。 请解释差异的原因。

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\