我的项目中有一个场景,我正在使用spark-sql-2.4.1版本阅读Kafka主题消息。我能够使用结构化流媒体处理一天。一旦收到数据并进行处理后,我需要将数据保存到hdfs存储中的各个拼花文件中。
我能够存储和读取拼花文件,我保持了15秒到1分钟的触发时间。这些文件的大小非常小,因此会产生许多文件。
这些拼花地板文件需要稍后通过配置单元查询读取。
那么1)该策略在生产环境中有效吗?还是会导致以后出现小文件问题?
2) 处理/设计此类场景(即行业标准)的最佳实践是什么?
3) 这些事情在生产中通常是如何处理的?
谢谢你。
这是一个常见的火花流问题,没有任何固定的答案。我采用了一种基于append思想的非传统方法。当您使用spark 2.4.1时,此解决方案将非常有用。
因此,如果支持像parket或orc这样的柱状文件格式的append,那会更容易,因为新数据可以附加在相同的文件中,并且每个微批处理后文件大小会越来越大。但是,由于不支持,我采用了版本控制方法来实现这一点。在每个微批处理之后,数据都会使用版本分区生成。例如。
/prod/mobility/cdr_data/date=01–01–2010/version=12345/file1.parquet
/prod/mobility/cdr_data/date=01–01–2010/version=23456/file1.parquet
我们所能做的是,在每个微批中,读取旧版本数据,将其与新的流数据合并,然后在新版本的相同路径上再次写入。然后,删除旧版本。这样,在每个微批处理之后,每个分区中都会有一个版本和一个文件。每个分区中的文件大小将继续增长并变得更大。
由于不允许流式数据集和静态数据集的联合,我们可以使用forEachBatch sink(spark中提供
我已经在链接中描述了如何以最佳方式实现这一点。你可能想看看。https://medium.com/@库马尔。拉胡尔。nitk/solving-small-file-problem-in-spark-structured-streaming-a-versioning-approach-73a0153a0a
我们也有类似的问题。在谷歌上搜索了很多之后,似乎普遍接受的方法是编写另一份工作,该工作经常聚合许多小文件,并将它们写入更大的合并文件中的其他地方。这就是我们现在所做的。
顺便说一句:无论如何,您在这里可以做的事情是有限的,因为您拥有的并行性越多,文件的数量就越多,因为每个执行线程都写入自己的文件。他们从不写入共享文件。这似乎是并行处理野兽的本质。
我知道这个问题太老了。我也有类似的问题
我的用例是从Kafka获取数据
下面的代码将获取前一个小时的分区数据,应用重新分区
val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config,session))
class AppListener(config: Config,spark: SparkSession) extends StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
this.synchronized {AppListener.mergeFiles(event.progress.timestamp,spark,config)}
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}
object AppListener {
def mergeFiles(currentTs: String,spark: SparkSession,config:Config):Unit = {
val configs = config.kafka(config.key.get)
if(currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {
println(
s"""
|Current Timestamp : ${currentTs}
|Merge Files : ${Processed.ts.minusHours(1)}
|
|""".stripMargin)
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val ts = Processed.ts.minusHours(1)
val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
val path = new Path(hdfsPath)
if(fs.exists(path)) {
val hdfsFiles = fs.listLocatedStatus(path)
.filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
.map(_.getPath).toList
println(
s"""
|Total files in HDFS location : ${hdfsFiles.length}
| ${hdfsFiles.length > 1}
|""".stripMargin)
if(hdfsFiles.length > 1) {
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total Available files : ${hdfsFiles.length}
|Status : Running
|
|""".stripMargin)
val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
df.repartition(1)
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(s"/tmp${hdfsPath}")
df.cache().unpersist()
spark
.read
.format(configs.writeFormat)
.load(s"/tmp${hdfsPath}")
.write
.format(configs.writeFormat)
.mode("overwrite")
.save(hdfsPath)
Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
println(
s"""
|Merge Small Files
|==============================================
|HDFS Path : ${hdfsPath}
|Total files : ${hdfsFiles.length}
|Status : Completed
|
|""".stripMargin)
}
}
}
}
def apply(config: Config,spark: SparkSession): AppListener = new AppListener(config,spark)
}
object Processed {
var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}
有时数据是巨大的
val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
val dataSize = bytes.toLong
val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt
df.repartition(if(numPartitions == 0) 1 else numPartitions)
.[...]
编辑-1
使用此-spark。会话状态。executePlan(df.queryExecution.logical)。优化计划。stats(spark.sessionState.conf)。sizeInBytes一旦数据帧加载到内存中,我们就可以得到它的实际大小,例如,您可以检查下面的代码。
scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils
scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709
scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB
scala> import sys.process._
import sys.process._
scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r----- 3 svcmxns hdfs 0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r----- 3 svcmxns hdfs 727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
我试图从kafka主题获取数据并将其推送到hdfs位置。我面临以下问题。 在每条消息(kafka)之后,hdfs位置都会更新为带有.c000.csv格式的部分文件。我已经在HDFS位置的顶部创建了一个hive表,但是HIVE无法读取从火花结构化流写入的任何数据。 以下是spark结构化流媒体之后的文件格式 以下是我要插入的代码: 谁能帮帮我,为什么要创建这样的文件? 如果我执行dfs-cat/pa
我正在使用Spark结构化流媒体;我的DataFrame具有以下架构 如何使用Parquet格式执行writeStream并写入数据(包含zoneId、deviceId、TimesInclast;除日期外的所有内容)并按日期对数据进行分区?我尝试了以下代码,但partition by子句不起作用
我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。
我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\
背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va
我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。