Serialization plays an important role in the performance of any distributed application. Spark provides two serialization libraries:
Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
Kryo serialization: Spark can also use the Kryo library to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization, but it does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance. You can switch to using Kryo by initializing your job with a SparkConf and calling conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"). This setting configures the serializer used not only for shuffling data between worker nodes but also when serializing RDDs to disk. The only reason Kryo is not the default is the custom registration requirement, but we recommend trying it in any network-intensive application. Since Spark 2.0.0, Spark internally uses the Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
To register your own custom classes with Kryo, use the registerKryoClasses method.
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
The Kryo documentation describes more advanced registration options, such as adding custom serialization code.
If your objects are large, you may also need to increase the spark.kryoserializer.buffer config. This value needs to be large enough to hold the largest object you will serialize.
Finally, if you don’t register your custom classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.
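Putting the pieces above together, here is a minimal sketch of a Kryo setup; the app name and the buffer values are illustrative assumptions, and MyClass1/MyClass2 stand in for your own classes as in the snippet above.
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: enable Kryo, register application classes, and raise the buffer
// limits for large objects (values below are placeholders, not recommendations).
val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "128k")       // initial per-core buffer
  .set("spark.kryoserializer.buffer.max", "128m")   // must hold the largest object you serialize
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)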
spark.memory.fraction expresses the fraction of JVM heap space used for execution and storage (default 0.6). The remaining 40% is reserved for user data structures and Spark's internal metadata, and safeguards against OOM errors. spark.memory.storageFraction expresses the size of R as a fraction of that region (default 0.5); R is the storage space whose cached blocks are immune to eviction by execution. To measure how much memory a dataset takes, create an RDD, put it into cache, and look at the "Storage" page in the web UI.
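As a sketch, both fractions can be set on the SparkConf; the 0.7 and 0.4 values below are arbitrary assumptions for illustration, not recommendations.
// Sketch: adjusting the unified memory regions (defaults are 0.6 and 0.5).
val conf = new SparkConf()
  .setAppName("memory-tuning-example")
  .set("spark.memory.fraction", "0.7")        // share of heap for execution + storage
  .set("spark.memory.storageFraction", "0.4") // portion of that region protected for cached blocks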
To estimate the memory consumption of a particular object, use SizeEstimator's estimate method.
To reduce memory consumption, avoid pointer-based data structures and wrapper objects where possible.
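As an illustrative sketch (not from the original text), a primitive array avoids the per-element object headers, pointers, and boxing that wrapper-based collections carry.
// Wrapper-heavy: a List of boxed java.lang.Integer values; each element
// pays for an object header plus a pointer.
val boxed: List[Integer] = (1 to 1000000).map(Int.box).toList

// Compact alternative: a primitive Int array stores raw 4-byte values contiguously.
val primitive: Array[Int] = (1 to 1000000).toArray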
For distributed "reduce" operations such as groupByKey and reduceByKey, the level of parallelism is determined by the input RDD (the largest parent RDD's number of partitions). You can pass the level of parallelism as a second argument to these operations. In general, we recommend 2-3 tasks per CPU core in your cluster.
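A minimal sketch of both ways to set it; pairs is an assumed RDD of key/value pairs and the partition count of 200 is arbitrary.
// Sketch: explicit level of parallelism for a reduce-side operation.
val counts = pairs.reduceByKey(_ + _, 200)   // use 200 partitions for this shuffle

// Or set a cluster-wide default, roughly 2-3x the total number of cores:
conf.set("spark.default.parallelism", "200")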
Spark's shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, and that hash table can often be large. The simplest fix is to increase the level of parallelism so that each task's input set is smaller. Because tasks share the executor JVM and the cost of launching a task is low, you can safely set the level of parallelism to more than the number of cores in your cluster.
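As a sketch, this just means requesting more shuffle partitions; the counts below and the pairs/bigRdd names are arbitrary assumptions.
// Sketch: more, smaller tasks keep each per-task hash table small.
val grouped = pairs.groupByKey(800)          // 800 shuffle partitions

// The same idea applied to an existing RDD before a wide operation:
val repartitioned = bigRdd.repartition(800)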
Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, which is a significant per-job cost. If your tasks use any large object from the driver program, such as a static lookup table, consider turning it into a broadcast variable. In general, tasks larger than about 20 KB are probably worth optimizing.
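A minimal sketch of broadcasting a driver-side lookup table; the table contents and the events RDD are illustrative assumptions.
// Sketch: ship a large driver-side table to each executor once,
// instead of serializing it with every task.
val lookupTable: Map[Int, String] = Map(1 -> "a", 2 -> "b")   // imagine this is large
val broadcastTable = sc.broadcast(lookupTable)

val enriched = events.map { id =>
  // Executors read the shared, read-only copy via .value
  (id, broadcastTable.value.getOrElse(id, "unknown"))
}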
Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together, computation tends to be fast, and it is usually faster to ship serialized code to the data than to move the data to the code. Spark schedules tasks based on this principle, using the following locality levels, ordered from closest to farthest:
PROCESS_LOCAL: data is in the same JVM as the running code. This is the best locality possible.
NODE_LOCAL: data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes.
NO_PREF: data is accessed equally quickly from anywhere and has no locality preference.
RACK_LOCAL: data is on the same rack of servers but on a different server, so it needs to be sent over the network, typically through a single switch.
ANY: data is elsewhere on the network and not in the same rack.
Based on this principle, Spark typically waits a bit in the hope that a busy CPU on the node holding the data frees up; once that wait timeout expires, it starts moving the data to a free CPU elsewhere. If your tasks are long, see poor locality, and the job is noticeably slow because of data movement, you can increase the spark.locality wait parameters (see the configuration page for details), but the defaults usually work well.
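A sketch of raising these timeouts; the 6s and 10s values are arbitrary assumptions, and the full list of spark.locality.wait settings is on the configuration page.
// Sketch: wait longer before scheduling a task at a less local level.
conf.set("spark.locality.wait", "6s")         // base wait shared by all locality levels
conf.set("spark.locality.wait.node", "10s")   // override just the node-local fallback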
Feel free to ask on the Spark mailing list about other tuning best practices.