Cigna优化Spark Streaming实时处理应用
1 框架一览
事件处理的架构图如下所示。
2 优化总结
当我们第一次部署整个方案时,kafka
和flume
组件都执行得非常好,但是spark streaming
应用需要花费4-8分钟来处理单个batch
。这个延迟的原因有两点,一是我们使用DataFrame
来强化数据,而强化数据需要从hive
中读取大量的数据;
二是我们的参数配置不理想。
为了优化我们的处理时间,我们从两方面着手改进:第一,缓存合适的数据和分区;第二,改变配置参数优化spark应用。运行spark应用的spark-submit
命令如下所示。通过参数优化和代码改进,我们显著减少了处理时间,处理时间从4-8分钟降到了低于25秒。
/opt/app/dev/spark-1.5.2/bin/spark-submit
--jars
/opt/cloudera/parcels/CDH/jars/zkclient-0.3.jar,/opt/cloudera/parcels/CDH/jars/kafka_2.10-0.8.1.1.jar,
/opt/app/dev/jars/datanucleus-core-3.2.2.jar,/opt/app/dev/jars/datanucleus-api-jdo-3.2.1.jar,/opt/app/dev/jars/datanucleus-rdbms-3.2.1.jar
--files /opt/app/dev/spark-1.5.2/conf/hive-site.xml,/opt/app/dev/jars/log4j-eir.properties
--queue spark_service_pool
--master yarn
--deploy-mode cluster
--conf "spark.ui.showConsoleProgress=false"
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties"
--conf "spark.sql.tungsten.enabled=false"
--conf "spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory"
--conf "spark.eventLog.enabled=true"
--conf "spark.sql.codegen=false"
--conf "spark.sql.unsafe.enabled=false"
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties"
--conf "spark.streaming.backpressure.enabled=true"
--conf "spark.locality.wait=1s"
--conf "spark.streaming.blockInterval=1500ms"
--conf "spark.shuffle.consolidateFiles=true"
--driver-memory 10G
--executor-memory 8G
--executor-cores 20
--num-executors 20
--class com.bigdata.streaming.OurApp /opt/app/dev/jars/OurStreamingApplication.jar external_props.conf
下面我们将详细介绍这些改变的参数。
2.1 driver选项
这里需要注意的是,driver
运行在spark on yarn
的集群模式下。因为spark streaming
应用是一个长期运行的任务,生成的日志文件会很大。为了解决这个问题,我们限制了写入日志的消息的条数,
并且用RollingFileAppender
限制了它们的大小。我们也关闭了spark.ui.showConsoleProgress
选项来禁用控制台日志消息。
通过测试,我们的driver
因为永久代空间填满而频繁发生内存耗尽(永久代空间是类、方法等存储的地方,不会被重新分配)。将永久代空间的大小升高到6G可以解决这个问题。
spark.driver.extraJavaOptions=-XX:MaxPermSize=6G
2.2 垃圾回收
因为我们的spark streaming
应用程序是一个长期运行的进程,在处理一段时间之后,我们注意到GC
暂停时间过长,我们想在后台减少或者保持这个时间。调整UseConcMarkSweepGC
参数是一个技巧。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties"
2.3 禁用Tungsten
Tungsten
是spark
执行引擎主要的改进。但是它的第一个版本是有问题的,所以我们暂时禁用它。
spark.sql.tungsten.enabled=false
spark.sql.codegen=false
spark.sql.unsafe.enabled=false
2.4 启用反压
Spark Streaming
在批处理时间大于批间隔时间时会出现问题。换一句话说,就是spark
读取数据的速度慢于kafka
数据到达的速度。如果按照这个吞吐量执行过长的时间,它会造成不稳定的情况。
即接收executor
的内存溢出。设置下面的参数解决这个问题。
spark.streaming.backpressure.enabled=true
2.5 调整本地化和块配置
下面的两个参数是互补的。一个决定了数据本地化到task
或者executor
等待的时间,另外一个被spark streaming receiver
使用对数据进行组块。块越大越好,但是如果数据没有本地化到executor
,它将会通过网络移动到
任务执行的地方。我们必须在这两个参数间找到一个好的平衡,因为我们不想数据块太大,并且也不想等待本地化太长时间。我们希望所有的任务都在几秒内完成。
因此,我们改变本地化选项从3s到1s,我们也改变块间隔为1.5s。
--conf "spark.locality.wait=1s"
--conf "spark.streaming.blockInterval=1500ms"
2.6 合并临时文件
在ext4
文件系统中,推荐开启这个功能。因为这会产生更少的临时文件。
--conf "spark.shuffle.consolidateFiles=true"
2.7 开启executor配置
在你配置kafka Dstream
时,你能够指定并发消费线程的数量。然而,kafka Dstream
的消费者会运行在相同的spark driver
节点上面。因此,为了从多台机器上面并行消费kafka topic
,
我们必须实例化多个Dstream
。虽然可以在处理之前合并相应的RDD
,但是运行多个应用程序实例,把它们都作为相同kafka consumer group
的一部分。
为了达到这个目的,我们设置20个executor
,并且每个executor
有20个核。
--executor-memory 8G
--executor-cores 20
--num-executors 20
2.8 缓存方法
使用RDD
之前缓存RDD
,但是记住在下次迭代之前从缓存中删除它。缓存那些需要使用多次的数据非常有用。然而,不要使分区数目过大。保持分区数目较低可以减少,最小化调度延迟。下面的公式是我们使用的分区数的计算公式。
# of executors * # of cores = # of partitions
参考文献
【1】How Cigna Tuned Its Spark Streaming App for Real-time Processing with Apache Kafka