当前位置: 首页 > 知识库问答 >
问题:

持续火花流输出

莫河
2023-03-14

我正在从一个消息应用程序收集数据,我目前正在使用Flume,它每天发送大约5000万条记录

我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化到hadoop并使用impala进行查询

我尝试的每种方法都有问题。。

方法1-将RDD另存为parquet,将外部配置单元parquet表指向parquet目录

// scala
val ssc =  new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - register it as a spark sql table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)

问题是finalParquet.saveAsParquetFile输出大量文件,从Kafka收到的Dstream以1分钟的批处理大小输出超过200个文件。它输出许多文件的原因是因为计算是分布式的,如另一篇文章所述-如何使saveAsTextFile不将输出拆分为多个文件?

然而,支持的解决方案对我来说似乎不是最优的,例如,正如一个用户所说,只有在数据很少的情况下,才有一个单独的输出文件。

方法2-使用HiveContext。将RDD数据直接插入hive表

# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)

def sendRecord(rdd):

  sql = "INSERT INTO TABLE table select * from beacon_sparktable"

  # 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
  beaconDF = sqlContext.jsonRDD(rdd,schema)

  # 2- Register the DataFrame as a spark sql table.
  beaconDF.registerTempTable("beacon_sparktable")

  # 3 - insert to hive directly from a qry on the spark sql table
  sqlContext.sql(sql);

这可以很好地工作,它直接插入到拼花地板表中,但由于处理时间超过了批次间隔时间,批次的调度会出现延迟。消费者跟不上生产的产品,要加工的批次开始排队。

向蜂巢写信似乎很慢。我尝试调整批处理间隔大小,运行更多的消费者实例。

考虑到多个文件存在问题以及写入hive的潜在延迟,保存Spark流媒体中的大数据的最佳方法是什么?其他人在做什么?

这里也有人问过类似的问题,但他对目录有一个问题,因为目录中有太多的文件,如何让Spark Streaming写入其输出,以便黑斑羚能够读取?

非常感谢您的帮助

共有2个答案

叶修永
2023-03-14

我认为小文件问题可以在一定程度上得到解决。您可能会收到大量基于kafka分区的文件。对我来说,我有12个分区Kafka主题,我使用spark.write.mode("append")编写。parket("/位置/on/hdfs")

现在,根据您的需求,您可以添加或更多以减少文件数量。另一种选择是增加微批次持续时间。例如,如果您可以接受写入日延迟5分钟,则可以使用300秒的微批处理。

对于第二个问题,批次排队只是因为没有启用背压。首先,您应该验证在单个批次中可以处理的最大值是多少。一旦你能绕过这个数字,你就可以设置火花。流动。Kafka。maxRatePerPartition值和spark。流动。背压。enabled=true以启用每个微批次的有限记录数。如果您仍然无法满足需求,那么唯一的选择就是增加主题上的分区,或者增加spark应用html" target="_blank">程序上的资源。

洪博涛
2023-03-14

在解决方案#2中,可以通过每个RDD的分区数来控制创建的文件数。

请参见此示例:

// create a Hive table (assume it's already existing)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create a RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD
val schema = StructType(Seq(
 StructField("id", IntegerType, nullable = false),
 StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_:_*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")

现在,我想您可以使用从Kafka中提取数据的频率,以及每个RDD的分区数量(默认情况下,您的Kafka主题的分区,您可以通过重新分区来减少)。

我使用的是CDH 5.5.1中的Spark 1.5,我使用df得到了相同的结果。写模式(“追加”)。saveAsTable(“test”)或您的SQL字符串。

 类似资料:
  • 更新:为了迭代支持,我不得不转向Flink流。不过还是会和Kafka试试看!

  • 我正在EMR EMR-4.3.0上运行一个spark应用程序,有1个主机和4个节点 它们每一个都有5GB内存和2个核心。 最后Yarn杀死了应用程序主人 错误ApplicationMaster:接收信号15:SIGTERM 1)我是否可以进一步改进num-executors和executor-core的spark-submit选项。

  • 我试图从聚合原理的角度来理解火花流。Spark DF 基于迷你批次,计算在特定时间窗口内出现的迷你批次上完成。 假设我们有数据作为- 然后首先对Window_period_1进行计算,然后对Window_period_2进行计算。如果我需要将新的传入数据与历史数据一起使用,比如说Window_priod_new与Window_pperid_1和Window_perid_2的数据之间的分组函数,我该

  • 如果spark streaming在10秒的批处理间隔中获得50行消息,并且在40.5行消息之后,这10秒就结束了,剩下的时间落入另一个10秒的间隔中,前40.5行的文本是一个RDD被首先处理,在我的用例中,前40行是有意义的,但是下一个。5行没有意义,第二个RDD首先也是这样。5行,我的问题是否有效?。请提供建议如何处理这个问题?。 谢谢比尔。

  • null 触发器是否支持一次追加模式? 这里有一个最小的应用程序来再现这个问题。要旨

  • 我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一