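Flink manages memory through the MemoryManager.
The MemoryManager divides the total amount of memory it manages by the size of a memory page to get the number of pages, and hands out memory in units of these pages (memory segments).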
```java
public MemoryManager(long memorySize, int numberOfSlots, int pageSize,
        MemoryType memoryType, boolean preAllocateMemory) {
    // sanity checks
    if (memoryType == null) {
        throw new NullPointerException();
    }
    if (memorySize <= 0) {
        throw new IllegalArgumentException("Size of total memory must be positive.");
    }
    if (pageSize < MIN_PAGE_SIZE) {
        throw new IllegalArgumentException("The page size must be at least " + MIN_PAGE_SIZE + " bytes.");
    }
    if (!MathUtils.isPowerOf2(pageSize)) {
        throw new IllegalArgumentException("The given page size is not a power of two.");
    }
    this.memoryType = memoryType;
    this.memorySize = memorySize;
    this.numberOfSlots = numberOfSlots;
    // assign page size and bit utilities
    this.pageSize = pageSize;
    this.roundingMask = ~((long) (pageSize - 1));
    final long numPagesLong = memorySize / pageSize;
    if (numPagesLong > Integer.MAX_VALUE) {
        throw new IllegalArgumentException("The given number of memory bytes (" + memorySize
                + ") corresponds to more than MAX_INT pages.");
    }
    this.totalNumPages = (int) numPagesLong;
    if (this.totalNumPages < 1) {
        throw new IllegalArgumentException("The given amount of memory amounted to less than one page.");
    }
    this.allocatedSegments = new HashMap<Object, Set<MemorySegment>>();
    this.isPreAllocated = preAllocateMemory;
    this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
    final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0;
    switch (memoryType) {
        case HEAP:
            this.memoryPool = new HybridHeapMemoryPool(memToAllocate, pageSize);
            break;
        case OFF_HEAP:
            if (!preAllocateMemory) {
                LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
                        " the memory type 'taskmanager.memory.off-heap' is set to true.");
            }
            this.memoryPool = new HybridOffHeapMemoryPool(memToAllocate, pageSize);
            break;
        default:
            throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
    }
    LOG.debug("Initialized MemoryManager with total memory size {}, number of slots {}, page size {}, " +
            "memory type {}, pre allocate memory {} and number of non allocated pages {}.",
            memorySize,
            numberOfSlots,
            pageSize,
            memoryType,
            preAllocateMemory,
            numNonAllocatedPages);
}
```
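The constructor makes it clear how each parameter is used in memory management.
```java
private final MemoryType memoryType;
```
memoryType specifies how the managed memory is stored: on the JVM heap (HEAP) or off-heap (OFF_HEAP). A single MemoryManager manages memory of only one of these types.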
```java
private final long memorySize;
```
memorySize is the total amount of memory, in bytes, managed by this MemoryManager.
```java
private final int pageSize;
```
pageSize is the size of each memory page. It has both a default value and a minimum value:
```java
public static final int DEFAULT_PAGE_SIZE = 32 * 1024;
/** The minimal memory page size. Currently set to 4 KiBytes. */
public static final int MIN_PAGE_SIZE = 4 * 1024;
```
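The default is 32 KB and the minimum is 4 KB per page. The constructor additionally requires the page size to be a power of two.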
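```java
private final int totalNumPages;
```
totalNumPages is the total number of pages, i.e. memorySize divided by pageSize. This is integer division, so any remainder smaller than one page is not used; the result must be between 1 and Integer.MAX_VALUE.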
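To make the arithmetic concrete, the following sketch reproduces the constructor's page-count checks in isolation. The class and method names (`PageMath`, `numPages`) are hypothetical; the checks mirror the constructor: page size at least 4 KB and a power of two, integer division, no more than Integer.MAX_VALUE pages, and at least one page.

```java
/** Hypothetical helper reproducing the MemoryManager constructor's page-count checks. */
final class PageMath {
    static final int MIN_PAGE_SIZE = 4 * 1024;      // 4 KB, same value as MemoryManager.MIN_PAGE_SIZE
    static final int DEFAULT_PAGE_SIZE = 32 * 1024; // 32 KB, same value as MemoryManager.DEFAULT_PAGE_SIZE

    private PageMath() {}

    /** True if value is a positive power of two (exactly one bit set). */
    static boolean isPowerOf2(long value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    /** Returns memorySize / pageSize, rounded down, after validating both arguments. */
    static int numPages(long memorySize, int pageSize) {
        if (memorySize <= 0) {
            throw new IllegalArgumentException("Size of total memory must be positive.");
        }
        if (pageSize < MIN_PAGE_SIZE) {
            throw new IllegalArgumentException("Page size must be at least " + MIN_PAGE_SIZE + " bytes.");
        }
        if (!isPowerOf2(pageSize)) {
            throw new IllegalArgumentException("Page size is not a power of two.");
        }
        long pages = memorySize / pageSize; // bytes beyond the last full page are not managed
        if (pages > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Memory size corresponds to more than MAX_INT pages.");
        }
        if (pages < 1) {
            throw new IllegalArgumentException("Memory size is less than one page.");
        }
        return (int) pages;
    }
}
```

For example, 64 MiB with the default 32 KiB page size gives 2048 pages, while 100,000 bytes gives 3 pages and the remaining 1,696 bytes are simply not managed.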
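```java
private final boolean isPreAllocated;
```
isPreAllocated indicates whether the memory is pre-allocated. If it is true, the constructor allocates the entire amount of managed memory up front; otherwise memory is allocated only when it is actually requested. When the MemoryManager manages off-heap memory, pre-allocation is recommended, and the constructor logs a warning if it is disabled.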
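```java
private int numNonAllocatedPages;
```
numNonAllocatedPages is the number of pages the MemoryManager has not allocated yet. With pre-allocation enabled it is 0 from the start and stays 0.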
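With these fields in mind, the constructor logic is straightforward: it validates the arguments, computes the number of pages from the total memory size and the page size, and creates the memory pool that matches the memory type (HybridHeapMemoryPool or HybridOffHeapMemoryPool). In pre-allocated mode the pool is filled with all pages immediately; in lazy mode the pool starts empty and all pages are counted in numNonAllocatedPages.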
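The pre-allocation switch only changes how the same page budget is split between the pool and the "not yet allocated" counter. The sketch below uses a hypothetical `PageBudget` class that reproduces the two ternary expressions from the constructor and sums them the way the availability check in allocatePages() does.

```java
/** Hypothetical sketch of how the MemoryManager constructor splits its page budget. */
final class PageBudget {
    final int totalNumPages;
    final int memToAllocate;        // pages the pool allocates immediately
    final int numNonAllocatedPages; // pages that will be allocated lazily, on request

    PageBudget(int totalNumPages, boolean preAllocateMemory) {
        this.totalNumPages = totalNumPages;
        // The same two expressions as in the MemoryManager constructor.
        this.numNonAllocatedPages = preAllocateMemory ? 0 : totalNumPages;
        this.memToAllocate = preAllocateMemory ? totalNumPages : 0;
    }

    /** Pages that can be handed out right after construction: pooled plus not-yet-allocated. */
    int initiallyAvailable() {
        return memToAllocate + numNonAllocatedPages;
    }
}
```

Either way the sum equals totalNumPages, which is why a request can be checked against "free segments in the pool plus numNonAllocatedPages" without caring which mode is active.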
Other classes obtain memory through allocatePages():
```java
public List<MemorySegment> allocatePages(Object owner, int numPages) throws MemoryAllocationException {
    final ArrayList<MemorySegment> segs = new ArrayList<MemorySegment>(numPages);
    allocatePages(owner, segs, numPages);
    return segs;
}
```
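owner is the object requesting the memory. This overload creates a list, sized for numPages, to hold the memory segments the owner obtains from the MemoryManager, and delegates to the three-argument overload: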
```java
public void allocatePages(Object owner, List<MemorySegment> target, int numPages)
        throws MemoryAllocationException {
    // sanity check
    if (owner == null) {
        throw new IllegalArgumentException("The memory owner must not be null.");
    }
    // reserve array space, if applicable
    if (target instanceof ArrayList) {
        ((ArrayList<MemorySegment>) target).ensureCapacity(numPages);
    }
    // -------------------- BEGIN CRITICAL SECTION -------------------
    synchronized (lock) {
        if (isShutDown) {
            throw new IllegalStateException("Memory manager has been shut down.");
        }
        // in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the
        // lazy case, the 'freeSegments.size()' is zero.
        if (numPages > (memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)) {
            throw new MemoryAllocationException("Could not allocate " + numPages + " pages. Only " +
                    (memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)
                    + " pages are remaining.");
        }
        Set<MemorySegment> segmentsForOwner = allocatedSegments.get(owner);
        if (segmentsForOwner == null) {
            segmentsForOwner = new HashSet<MemorySegment>(numPages);
            allocatedSegments.put(owner, segmentsForOwner);
        }
        if (isPreAllocated) {
            for (int i = numPages; i > 0; i--) {
                MemorySegment segment = memoryPool.requestSegmentFromPool(owner);
                target.add(segment);
                segmentsForOwner.add(segment);
            }
        }
        else {
            for (int i = numPages; i > 0; i--) {
                MemorySegment segment = memoryPool.allocateNewSegment(owner);
                target.add(segment);
                segmentsForOwner.add(segment);
            }
            numNonAllocatedPages -= numPages;
        }
    }
    // -------------------- END CRITICAL SECTION -------------------
}
```
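For thread safety, the MemoryManager performs the allocation while holding a lock. It first checks that the requested number of pages does not exceed the pages still available, which is the number of free segments in the pool plus numNonAllocatedPages; if it does, a MemoryAllocationException is thrown. It then looks up the set of segments this owner already holds, creating an empty set if there is none.
If the MemoryManager is pre-allocated, it takes the segments directly from the pool; otherwise it allocates new segments on demand and decreases numNonAllocatedPages accordingly. Each segment is added both to the caller's target list and to the owner's set.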
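The allocation path can be sketched in plain Java as below. This is a simplified model under stated assumptions, not Flink's code: `Page` is a hypothetical stand-in for MemorySegment backed by a `byte[]`, `SketchAllocator` is a hypothetical class, and an unchecked IllegalStateException replaces Flink's checked MemoryAllocationException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for MemorySegment: a page of memory tagged with its owner. */
final class Page {
    final byte[] memory;
    final Object owner;

    Page(byte[] memory, Object owner) {
        this.memory = memory;
        this.owner = owner;
    }
}

/** Simplified sketch of the allocatePages() logic; not Flink's actual class. */
final class SketchAllocator {
    private final Object lock = new Object();
    private final int pageSize;
    private final boolean isPreAllocated;
    private final ArrayDeque<byte[]> pool = new ArrayDeque<>(); // free pages, filled only in eager mode
    private final Map<Object, Set<Page>> allocatedSegments = new HashMap<>();
    private int numNonAllocatedPages;

    SketchAllocator(int totalNumPages, int pageSize, boolean preAllocate) {
        this.pageSize = pageSize;
        this.isPreAllocated = preAllocate;
        this.numNonAllocatedPages = preAllocate ? 0 : totalNumPages;
        int memToAllocate = preAllocate ? totalNumPages : 0;
        for (int i = 0; i < memToAllocate; i++) {
            pool.add(new byte[pageSize]);
        }
    }

    List<Page> allocatePages(Object owner, int numPages) {
        if (owner == null) {
            throw new IllegalArgumentException("The memory owner must not be null.");
        }
        List<Page> target = new ArrayList<>(numPages);
        synchronized (lock) {
            // Eager mode: numNonAllocatedPages is 0. Lazy mode: the pool is empty.
            int available = pool.size() + numNonAllocatedPages;
            if (numPages > available) {
                // Flink throws the checked MemoryAllocationException here.
                throw new IllegalStateException("Could not allocate " + numPages
                        + " pages. Only " + available + " pages are remaining.");
            }
            Set<Page> forOwner = allocatedSegments.computeIfAbsent(owner, k -> new HashSet<>());
            for (int i = 0; i < numPages; i++) {
                // Eager: take a pooled buffer. Lazy: allocate a fresh one now.
                byte[] memory = isPreAllocated ? pool.remove() : new byte[pageSize];
                Page page = new Page(memory, owner);
                target.add(page);
                forOwner.add(page);
            }
            if (!isPreAllocated) {
                numNonAllocatedPages -= numPages;
            }
        }
        return target;
    }

    int ownedBy(Object owner) {
        synchronized (lock) {
            Set<Page> pages = allocatedSegments.get(owner);
            return pages == null ? 0 : pages.size();
        }
    }

    int remaining() {
        synchronized (lock) {
            return pool.size() + numNonAllocatedPages;
        }
    }
}
```

Because the availability check runs before any bookkeeping, a rejected request leaves both the pool and the owner map untouched.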
Where there is allocation, there must also be release:
```java
public void release(MemorySegment segment) {
    // check if segment is null or has already been freed
    if (segment == null || segment.getOwner() == null) {
        return;
    }
    final Object owner = segment.getOwner();
    // -------------------- BEGIN CRITICAL SECTION -------------------
    synchronized (lock) {
        // prevent double return to this memory manager
        if (segment.isFreed()) {
            return;
        }
        if (isShutDown) {
            throw new IllegalStateException("Memory manager has been shut down.");
        }
        // remove the reference in the map for the owner
        try {
            Set<MemorySegment> segsForOwner = this.allocatedSegments.get(owner);
            if (segsForOwner != null) {
                segsForOwner.remove(segment);
                if (segsForOwner.isEmpty()) {
                    this.allocatedSegments.remove(owner);
                }
            }
            if (isPreAllocated) {
                // release the memory in any case
                memoryPool.returnSegmentToPool(segment);
            }
            else {
                segment.free();
                numNonAllocatedPages++;
            }
        }
        catch (Throwable t) {
            throw new RuntimeException("Error removing book-keeping reference to allocated memory segment.", t);
        }
    }
    // -------------------- END CRITICAL SECTION -------------------
}
```
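Release is also done under the lock; the argument is the memory segment to release. A null segment or one without an owner is ignored, a segment that has already been freed is not returned twice, and an IllegalStateException is thrown if the MemoryManager has been shut down.
The owner is read from the segment itself (segment.getOwner()), not from the pool. The segment is removed from that owner's set, and the owner's entry is dropped entirely once its set is empty.
Finally, if memory is pre-allocated, the segment is returned to the pool; otherwise it is freed directly and numNonAllocatedPages is incremented.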
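The release bookkeeping can be sketched as follows, again as a simplified model rather than Flink's implementation. `TrackedPage` and `ReleasingManager` are hypothetical names; `grant()` is a minimal single-page stand-in for allocatePages() so the sketch can be exercised, and the shutdown check and exception wrapping are omitted.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for MemorySegment; memory == null marks a freed page. */
final class TrackedPage {
    private byte[] memory;
    final Object owner;

    TrackedPage(byte[] memory, Object owner) {
        this.memory = memory;
        this.owner = owner;
    }

    byte[] memory() { return memory; }
    boolean isFreed() { return memory == null; }
    void free() { memory = null; }
}

/** Simplified sketch of the release() logic; not Flink's actual class. */
final class ReleasingManager {
    private final Object lock = new Object();
    private final int pageSize;
    private final boolean isPreAllocated;
    private final ArrayDeque<byte[]> pool = new ArrayDeque<>();
    private final Map<Object, Set<TrackedPage>> allocatedSegments = new HashMap<>();
    private int numNonAllocatedPages;

    ReleasingManager(int totalNumPages, int pageSize, boolean preAllocate) {
        this.pageSize = pageSize;
        this.isPreAllocated = preAllocate;
        this.numNonAllocatedPages = preAllocate ? 0 : totalNumPages;
        for (int i = 0; i < (preAllocate ? totalNumPages : 0); i++) {
            pool.add(new byte[pageSize]);
        }
    }

    /** Hands out one page; a minimal stand-in for allocatePages(). Assumes a page is available. */
    TrackedPage grant(Object owner) {
        synchronized (lock) {
            byte[] memory;
            if (isPreAllocated) {
                memory = pool.remove();
            } else {
                memory = new byte[pageSize];
                numNonAllocatedPages--;
            }
            TrackedPage page = new TrackedPage(memory, owner);
            allocatedSegments.computeIfAbsent(owner, k -> new HashSet<>()).add(page);
            return page;
        }
    }

    void release(TrackedPage page) {
        if (page == null || page.owner == null) {
            return; // nothing this manager tracks
        }
        synchronized (lock) {
            if (page.isFreed()) {
                return; // prevent a double return
            }
            Set<TrackedPage> forOwner = allocatedSegments.get(page.owner);
            if (forOwner != null) {
                forOwner.remove(page);
                if (forOwner.isEmpty()) {
                    allocatedSegments.remove(page.owner); // drop owners with no pages left
                }
            }
            if (isPreAllocated) {
                pool.add(page.memory()); // keep the buffer for reuse
            } else {
                numNonAllocatedPages++;  // give the budget back; the memory is left to the GC
            }
            page.free(); // invalidate the wrapper in both modes
        }
    }

    int remaining() { synchronized (lock) { return pool.size() + numNonAllocatedPages; } }

    boolean hasOwner(Object owner) { synchronized (lock) { return allocatedSegments.containsKey(owner); } }
}
```

Releasing the same page twice is a no-op because the second call sees isFreed() and returns before touching the counters.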
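There are two memory pool implementations: HybridOffHeapMemoryPool for off-heap memory and HybridHeapMemoryPool for on-heap memory.
Taking HybridOffHeapMemoryPool as an example, here is its constructor: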
```java
HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
    this.availableMemory = new ArrayDeque<>(numInitialSegments);
    this.segmentSize = segmentSize;
    for (int i = 0; i < numInitialSegments; i++) {
        this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
    }
}
```
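The pool keeps its memory in a queue (an ArrayDeque of ByteBuffers). The constructor allocates as many direct ByteBuffers as the number of initial segments it is given; in lazy mode the MemoryManager passes 0 (memToAllocate), so the pool starts empty.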
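A minimal, self-contained version of that constructor is shown below so its effect can be checked directly. `DirectBufferPool` and its helper methods are hypothetical; only `ArrayDeque` and `ByteBuffer.allocateDirect` from the JDK are used.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical pool modeled on the HybridOffHeapMemoryPool constructor; not Flink's class. */
final class DirectBufferPool {
    private final ArrayDeque<ByteBuffer> availableMemory;
    private final int segmentSize;

    DirectBufferPool(int numInitialSegments, int segmentSize) {
        this.availableMemory = new ArrayDeque<>(numInitialSegments);
        this.segmentSize = segmentSize;
        // Allocate every initial segment up front as a direct (off-heap) buffer.
        for (int i = 0; i < numInitialSegments; i++) {
            availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    int available() {
        return availableMemory.size();
    }

    /** True if every pooled buffer is direct and exactly segmentSize bytes large. */
    boolean allDirectOfSegmentSize() {
        for (ByteBuffer buf : availableMemory) {
            if (!buf.isDirect() || buf.capacity() != segmentSize) {
                return false;
            }
        }
        return true;
    }
}
```

Constructing it with 0 initial segments mirrors the lazy mode: no off-heap memory is reserved until a segment is actually requested.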
Memory is taken from the pool with requestSegmentFromPool():
```java
@Override
MemorySegment requestSegmentFromPool(Object owner) {
    ByteBuffer buf = availableMemory.remove();
    return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
}
```
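The implementation is simple: it removes a ByteBuffer from the head of the queue and wraps it in a MemorySegment associated with the owner.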
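One detail worth seeing in isolation: `ArrayDeque.remove()` takes the head of the queue and throws NoSuchElementException when the queue is empty, so the pool relies on the availability check in allocatePages() never letting it run dry. The sketch below uses hypothetical `OwnedBuffer` and `RequestingPool` classes in place of MemorySegment and the Flink pool.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical wrapper standing in for the MemorySegment returned to the caller. */
final class OwnedBuffer {
    final ByteBuffer buffer;
    final Object owner;

    OwnedBuffer(ByteBuffer buffer, Object owner) {
        this.buffer = buffer;
        this.owner = owner;
    }
}

/** Hypothetical pool illustrating requestSegmentFromPool(); not Flink's class. */
final class RequestingPool {
    private final ArrayDeque<ByteBuffer> availableMemory = new ArrayDeque<>();

    RequestingPool(int numInitialSegments, int segmentSize) {
        for (int i = 0; i < numInitialSegments; i++) {
            availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    OwnedBuffer request(Object owner) {
        // remove() takes the head of the queue and throws NoSuchElementException if it is empty.
        ByteBuffer buf = availableMemory.remove();
        return new OwnedBuffer(buf, owner);
    }

    ByteBuffer peekHead() {
        return availableMemory.peekFirst();
    }

    int available() {
        return availableMemory.size();
    }
}
```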
Conversely, returnSegmentToPool() returns memory to the pool:
```java
@Override
void returnSegmentToPool(MemorySegment segment) {
    if (segment.getClass() == HybridMemorySegment.class) {
        HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
        ByteBuffer buf = hybridSegment.getOffHeapBuffer();
        availableMemory.add(buf);
        hybridSegment.free();
    }
    else {
        throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
    }
}
```
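It retrieves the ByteBuffer backing the segment, appends it to the tail of the queue, and then frees the wrapper segment that was created when the memory was taken out of the pool. Segments of any other class are rejected with an IllegalArgumentException.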
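The full request/return cycle can be sketched as below. `PooledSegment` (a hypothetical stand-in for HybridMemorySegment) and `RecyclingPool` are not Flink classes. The sketch shows that the buffer must be read before the wrapper is freed, and that a returned buffer goes to the tail, so the queue hands buffers out in FIFO order.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

/** Hypothetical stand-in for HybridMemorySegment: wraps a pooled buffer until freed. */
final class PooledSegment {
    private ByteBuffer buffer;
    final Object owner;

    PooledSegment(ByteBuffer buffer, Object owner) {
        this.buffer = buffer;
        this.owner = owner;
    }

    ByteBuffer getOffHeapBuffer() {
        if (buffer == null) {
            throw new IllegalStateException("segment has been freed");
        }
        return buffer;
    }

    boolean isFreed() { return buffer == null; }

    void free() { buffer = null; }
}

/** Hypothetical pool illustrating the request/return cycle; not Flink's class. */
final class RecyclingPool {
    private final ArrayDeque<ByteBuffer> availableMemory = new ArrayDeque<>();

    RecyclingPool(int numInitialSegments, int segmentSize) {
        for (int i = 0; i < numInitialSegments; i++) {
            availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
        }
    }

    PooledSegment request(Object owner) {
        return new PooledSegment(availableMemory.remove(), owner); // take from the head
    }

    void returnSegment(PooledSegment segment) {
        ByteBuffer buf = segment.getOffHeapBuffer(); // read the buffer before freeing the wrapper
        availableMemory.add(buf);                    // append to the tail of the queue
        segment.free();                              // the wrapper must not be used afterwards
    }

    int available() { return availableMemory.size(); }
}
```

Note that in this sketch the returned buffer is not cleared, so a later owner may see bytes written by a previous one.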