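MemoryManager is the parent class of StaticMemoryManager (the static memory manager) and UnifiedMemoryManager (the unified, dynamically adjusted memory manager).
MemoryManager sets up the on-heap and off-heap memory pools; the on-heap sizes are supplied by the subclasses. It also provides the methods for acquiring and releasing storage memory, execution memory, and unroll memory.
Create memory pools for storage and execution, both on-heap and off-heap: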
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
Grow the on-heap storage pool by onHeapStorageMemory and the on-heap execution pool by onHeapExecutionMemory. The actual values depend on whether StaticMemoryManager or UnifiedMemoryManager is in use:
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
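Size the off-heap storage and execution pools. The total comes from spark.memory.offHeap.size (default 0), the storage share is set by spark.memory.storageFraction (default 0.5), and the remainder goes to execution: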
protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
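The split above is plain arithmetic, so it can be checked in isolation. The following is a minimal sketch: OffHeapSplit and its split method are hypothetical names introduced for illustration and are not part of Spark; only the formula is taken from the listing above.

```scala
// Hypothetical helper, not part of Spark: reproduces the off-heap split arithmetic.
object OffHeapSplit {
  /** Returns (storageBytes, executionBytes) for a given off-heap budget. */
  def split(maxOffHeapMemory: Long, storageFraction: Double): (Long, Long) = {
    // Same expression as offHeapStorageMemory in the listing above.
    val storage = (maxOffHeapMemory * storageFraction).toLong
    // Execution receives whatever storage does not take.
    val execution = maxOffHeapMemory - storage
    (storage, execution)
  }
}
```

For example, OffHeapSplit.split(1024L * 1024 * 1024, 0.5) splits a 1 GiB budget into 536870912 bytes for each pool. With the default spark.memory.offHeap.size of 0, both pools stay empty.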
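Acquiring and releasing memory
Unroll memory is the memory used to cache block data that arrives in iterator form, while the iterator is being unrolled (materialized) into memory.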
/**
 * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
 *
 * @return whether all N bytes were successfully granted.
 */
// Acquire numBytes bytes of memory to store the given block; returns false on failure
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
 * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
 *
 * This extra method allows subclasses to differentiate behavior between acquiring storage
 * memory and acquiring unroll memory. For instance, the memory management model in Spark
 * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
 *
 * @return whether all N bytes were successfully granted.
 */
// Acquire numBytes bytes of unroll memory to unroll the given block
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
 * Try to acquire up to `numBytes` of execution memory for the current task and return the
 * number of bytes obtained, or 0 if none can be allocated.
 *
 * This call may block until there is enough free memory in some situations, to make sure each
 * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
 * active tasks) before it is forced to spill. This can happen if the number of tasks increase
 * but an older task had a lot of memory already.
 */
// Acquire up to numBytes bytes of execution memory for the running task
private[memory]
def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long
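The 1 / 2N guarantee in the comment above can be made concrete with a small calculation. This is only an illustrative sketch: TaskShare and minBytesPerTask are hypothetical names, not Spark APIs, and the code computes the bound alone without modelling the blocking behaviour.

```scala
// Hypothetical helper, not part of Spark: the minimum share a task can ramp up to
// before being forced to spill, per the "1 / 2N of the total memory pool" rule.
object TaskShare {
  def minBytesPerTask(poolSize: Long, numActiveTasks: Int): Long = {
    require(numActiveTasks > 0, "need at least one active task")
    poolSize / (2L * numActiveTasks)
  }
}
```

With a pool of 1000 bytes and 4 active tasks, TaskShare.minBytesPerTask(1000L, 4) is 125. The bound shrinks as N grows, which is why a task that already holds a lot of memory can make newer tasks wait when the number of tasks increases.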
/**
 * Release numBytes of execution memory belonging to the given task.
 */
// Release numBytes bytes of execution memory belonging to the given task
private[memory]
def releaseExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
  }
}
/**
 * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
 *
 * @return the number of bytes freed.
 */
// Release all memory held by the given task, mark the task inactive, and return the number of bytes freed
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
  onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
    offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
/**
 * Release N bytes of storage memory.
 */
// Release numBytes bytes of storage memory
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
  memoryMode match {
    case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
    case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
  }
}
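The acquire/release pattern above, which locks the manager and then dispatches to the pool matching the memory mode, can be tried out with a toy model. Everything in the sketch below (ToyMemoryMode, ToyStoragePool, ToyMemoryManager) is a hypothetical stand-in written for illustration. It performs no eviction and is not how Spark's StorageMemoryPool is implemented.

```scala
// Toy stand-ins for illustration only; these are not Spark's classes.
sealed trait ToyMemoryMode
case object OnHeap extends ToyMemoryMode
case object OffHeap extends ToyMemoryMode

final class ToyStoragePool {
  private var poolSize = 0L
  private var used = 0L

  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, "delta must be non-negative")
    poolSize += delta
  }

  def memoryFree: Long = poolSize - used

  /** Grants the request only if all numBytes fit; this sketch never evicts. */
  def acquireMemory(numBytes: Long): Boolean =
    if (numBytes <= memoryFree) { used += numBytes; true } else false

  /** Releases at most what is currently in use. */
  def releaseMemory(numBytes: Long): Unit = used -= math.min(numBytes, used)
}

final class ToyMemoryManager(onHeapBytes: Long, offHeapBytes: Long) {
  private val onHeapPool = new ToyStoragePool
  private val offHeapPool = new ToyStoragePool
  onHeapPool.incrementPoolSize(onHeapBytes)
  offHeapPool.incrementPoolSize(offHeapBytes)

  // Pick the pool for the requested mode, mirroring the match in the listing above.
  private def pool(mode: ToyMemoryMode): ToyStoragePool = mode match {
    case OnHeap  => onHeapPool
    case OffHeap => offHeapPool
  }

  // All pool access goes through the manager's lock, as with @GuardedBy("this").
  def acquireStorageMemory(numBytes: Long, mode: ToyMemoryMode): Boolean =
    synchronized { pool(mode).acquireMemory(numBytes) }

  def releaseStorageMemory(numBytes: Long, mode: ToyMemoryMode): Unit =
    synchronized { pool(mode).releaseMemory(numBytes) }

  def freeStorageMemory(mode: ToyMemoryMode): Long =
    synchronized { pool(mode).memoryFree }
}
```

A manager created with new ToyMemoryManager(100L, 50L) grants an 80-byte on-heap request, refuses a further 30-byte on-heap request, and leaves the off-heap pool untouched. Releasing the 80 bytes restores the on-heap pool to 100 free bytes.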