In the previous article we covered how the RSet works: whenever an object reference changes, the corresponding RSet has to be updated. To avoid slowing down the Mutator, RSet updates are normally performed asynchronously, and this asynchronous update relies on the DCQS (Dirty Card Queue Set).
This article analyzes how the DCQS works.
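Before diving into the HotSpot sources, here is a minimal conceptual sketch of the data flow (the class names below are illustrative, not the actual JDK types): each Java (Mutator) thread owns a thread-local dirty card queue (DCQ); non-Java threads share a global DCQ protected by a lock; when a DCQ buffer fills up it is handed to the global dirty card queue set (DCQS), from which Refine threads later drain buffers and update RSets.

// Simplified model of the DCQ/DCQS flow; names are illustrative, not HotSpot's.
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

using CardPtr = volatile signed char*;          // a card table entry

struct DirtyCardBuffer {                        // one full DCQ buffer
    std::vector<CardPtr> cards;
};

class DirtyCardQueueSetModel {                  // the global DCQS
    std::mutex _lock;
    std::deque<DirtyCardBuffer> _completed;     // buffers waiting for Refine threads
public:
    void enqueue_completed(DirtyCardBuffer buf) {
        std::lock_guard<std::mutex> g(_lock);
        _completed.push_back(std::move(buf));   // the real code also notifies Refine threads
    }
};

class DirtyCardQueueModel {                     // one Mutator thread's DCQ
    std::vector<CardPtr> _buf;
    const size_t _capacity = 256;               // plays the role of G1UpdateBufferSize
    DirtyCardQueueSetModel& _qset;
public:
    explicit DirtyCardQueueModel(DirtyCardQueueSetModel& qset) : _qset(qset) {}
    void enqueue(CardPtr card) {
        _buf.push_back(card);
        if (_buf.size() == _capacity) {         // buffer full: hand it to the DCQS
            _qset.enqueue_completed(DirtyCardBuffer{std::move(_buf)});
            _buf.clear();
        }
    }
};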
The JVM declares a global structure, G1BarrierSet, which contains two queue sets: a DirtyCardQueueSet used for the DCQS and a G1SATBMarkQueueSet used for SATB marking.
g1BarrierSet.hpp
class G1BarrierSet: public CardTableBarrierSet {
  friend class VMStructs;
 private:
  BufferNode::Allocator _satb_mark_queue_buffer_allocator;
  BufferNode::Allocator _dirty_card_queue_buffer_allocator;
  G1SATBMarkQueueSet _satb_mark_queue_set;
  DirtyCardQueueSet _dirty_card_queue_set;
};
g1BarrierSet.cpp
G1BarrierSet::G1BarrierSet(G1CardTable* card_table) :
  CardTableBarrierSet(make_barrier_set_assembler<G1BarrierSetAssembler>(),
                      make_barrier_set_c1<G1BarrierSetC1>(),
                      make_barrier_set_c2<G1BarrierSetC2>(),
                      card_table,
                      BarrierSet::FakeRtti(BarrierSet::G1BarrierSet)),
  _satb_mark_queue_buffer_allocator(G1SATBBufferSize, SATB_Q_FL_lock),
  _dirty_card_queue_buffer_allocator(G1UpdateBufferSize, DirtyCardQ_FL_lock),
  _satb_mark_queue_set(),
  _dirty_card_queue_set()
{}
The DCQ enqueue logic also lives in g1BarrierSet.cpp:
void G1BarrierSet::write_ref_field_post_slow(volatile jbyte* byte) {
  // In the slow path, we know a card is not young
  assert(*byte != G1CardTable::g1_young_card_val(), "slow path invoked without filtering");
  OrderAccess::storeload();
  if (*byte != G1CardTable::dirty_card_val()) {
    *byte = G1CardTable::dirty_card_val();
    Thread* thr = Thread::current();
    if (thr->is_Java_thread()) {
      G1ThreadLocalData::dirty_card_queue(thr).enqueue(byte);
    } else {
      MutexLockerEx x(Shared_DirtyCardQ_lock,
                      Mutex::_no_safepoint_check_flag);
      _dirty_card_queue_set.shared_dirty_card_queue()->enqueue(byte);
    }
  }
}
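For context, this slow path is only reached after the inline post-write barrier has filtered out the common cases. Roughly, and as a simplified sketch only (the filtering is really spread across the interpreter/JIT barrier code, and the helper names and constants below are made up for illustration), the barrier skips same-region writes, null stores, and cards that are already marked young:

// Simplified sketch of the G1 post-write barrier filtering; all helpers here
// are illustrative stand-ins, not the actual HotSpot identifiers.
#include <cstddef>
#include <cstdint>

typedef signed char jbyte;                          // HotSpot's jbyte
static const jbyte g1_young_card_val = 32;          // placeholder value, not the real constant
static const size_t region_size = size_t(1) << 21;  // assume 2 MB heap regions

// Two addresses are in the same heap region iff they agree above the region-size bits.
static bool same_heap_region(const void* a, const void* b) {
    return ((uintptr_t)a ^ (uintptr_t)b) < region_size;
}

// Stand-in for the slow path shown above: dirty the card and enqueue it into the DCQ.
static void write_ref_field_post_slow(volatile jbyte* /*card*/) {}

void post_write_barrier(void* field, void* new_val, volatile jbyte* card) {
    if (same_heap_region(field, new_val)) return;   // same-region write needs no RSet entry
    if (new_val == NULL) return;                    // storing null creates no reference
    if (*card == g1_young_card_val) return;         // young regions are scanned wholesale at GC time
    write_ref_field_post_slow(card);                // otherwise: dirty the card and enqueue it
}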
Enqueueing ultimately reaches DirtyCardQueue::enqueue. DirtyCardQueue is a subclass of PtrQueue, so the call actually lands in PtrQueue::enqueue. (The _active check below matters mainly for SATB queues, which are only active during concurrent marking; dirty card queues stay active all the time.)
ptrQueue.hpp
void enqueue(void* ptr) {
  if (!_active) return;
  else enqueue_known_active(ptr);
}
ptrQueue.cpp
void PtrQueue::enqueue_known_active(void* ptr) {
  while (_index == 0) {
    handle_zero_index();
  }
  assert(_buf != NULL, "postcondition");
  assert(index() > 0, "postcondition");
  assert(index() <= capacity(), "invariant");
  _index -= _element_size;
  _buf[index()] = ptr;
}
void PtrQueue::handle_zero_index() {
  assert(index() == 0, "precondition");
  // This thread records the full buffer and allocates a new one (while
  // holding the lock if there is one).
  if (_buf != NULL) {
    if (!should_enqueue_buffer()) {
      assert(index() > 0, "the buffer can only be re-used if it's not full");
      return;
    }
    if (_lock) {
      assert(_lock->owned_by_self(), "Required.");
      BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
      _buf = NULL; // clear shared _buf field
      qset()->enqueue_complete_buffer(node);
      assert(_buf == NULL, "multiple enqueuers appear to be racing");
    } else {
      BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
      if (qset()->process_or_enqueue_complete_buffer(node)) {
        // Recycle the buffer. No allocation.
        assert(_buf == BufferNode::make_buffer_from_node(node), "invariant");
        assert(capacity() == qset()->buffer_size(), "invariant");
        reset();
        return;
      }
    }
  }
  // Set capacity in case this is the first allocation.
  set_capacity(qset()->buffer_size());
  // Allocate a new buffer.
  _buf = qset()->allocate_buffer();
  reset();
}
bool PtrQueueSet::process_or_enqueue_complete_buffer(BufferNode* node) {
  if (Thread::current()->is_Java_thread()) {
    // If the number of buffers exceeds the limit, make this Java
    // thread do the processing itself. We don't lock to access
    // buffer count or padding; it is fine to be imprecise here. The
    // add of padding could overflow, which is treated as unlimited.
    size_t limit = _max_completed_buffers + _completed_buffers_padding;
    if ((_n_completed_buffers > limit) && (limit >= _max_completed_buffers)) {
      if (mut_process_buffer(node)) {
        // Successfully processed; return true to allow buffer reuse.
        return true;
      }
    }
  }
  // The buffer will be enqueued. The caller will have to get a new one.
  enqueue_complete_buffer(node);
  return false;
}
void PtrQueueSet::enqueue_complete_buffer(BufferNode* cbn) {
  MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
  cbn->set_next(NULL);
  if (_completed_buffers_tail == NULL) {
    assert(_completed_buffers_head == NULL, "Well-formedness");
    _completed_buffers_head = cbn;
    _completed_buffers_tail = cbn;
  } else {
    _completed_buffers_tail->set_next(cbn);
    _completed_buffers_tail = cbn;
  }
  _n_completed_buffers++;
  if (!_process_completed &&
      (_n_completed_buffers > _process_completed_buffers_threshold)) {
    _process_completed = true;
    if (_notify_when_complete) {
      _cbl_mon->notify();
    }
  }
  DEBUG_ONLY(assert_completed_buffer_list_len_correct_locked());
}
enqueue_known_active
If the buffer is full (_index == 0), call handle_zero_index until there is room; otherwise subtract _element_size from _index and store the card pointer at the new index. The buffer is filled from the high end towards slot 0, so a zero index means "full" (see the sketch below).
handle_zero_index
If this is the shared (global) DCQ, i.e. _lock is set, hand the full buffer to the DCQS and allocate a new buffer for the shared DCQ; otherwise call process_or_enqueue_complete_buffer to deal with the full thread-local DCQ.
process_or_enqueue_complete_buffer
If the number of completed buffers in the DCQS already exceeds the limit, call mut_process_buffer so the Mutator thread processes the reference updates itself; otherwise call enqueue_complete_buffer and leave the work to the Refine threads.
enqueue_complete_buffer
The logic is straightforward: append the DCQ buffer to the completed-buffer list; if processing has not yet been signalled and the number of completed buffers exceeds the threshold, notify a Refine thread through the global monitor (_cbl_mon).
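The down-counting index is worth a small illustration: in PtrQueue, _index is kept in bytes and starts at the buffer capacity, each enqueue subtracts _element_size, and index() converts it back to an element slot, so _index == 0 means the buffer is full. A minimal sketch of the same idea (illustrative, not PtrQueue itself, and counting in elements rather than bytes):

// Minimal model of PtrQueue's down-counting index; illustrative only.
#include <cassert>
#include <cstddef>

class DownCountingBuffer {
    void** _buf;
    size_t _capacity;   // number of slots
    size_t _index;      // next free slot counted from the end; 0 means full
public:
    explicit DownCountingBuffer(size_t capacity)
        : _buf(new void*[capacity]), _capacity(capacity), _index(capacity) {}
    ~DownCountingBuffer() { delete[] _buf; }

    bool full() const { return _index == 0; }

    // Mirrors enqueue_known_active: decrement the index, then store at it.
    void push(void* p) {
        assert(!full() && "handle_zero_index would run here");
        _index -= 1;            // the real PtrQueue subtracts _element_size (bytes)
        _buf[_index] = p;
    }
};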
mut_process_buffer is a virtual function; its G1 implementation is in dirtyCardQueue.cpp:
bool DirtyCardQueueSet::mut_process_buffer(BufferNode* node) {
  guarantee(_free_ids != NULL, "must be");
  uint worker_i = _free_ids->claim_par_id(); // temporarily claim an id
  G1RefineCardConcurrentlyClosure cl;
  bool result = apply_closure_to_buffer(&cl, node, true, worker_i);
  _free_ids->release_par_id(worker_i); // release the id
  if (result) {
    assert_fully_consumed(node, buffer_size());
    Atomic::inc(&_processed_buffers_mut);
  }
  return result;
}
Whether the processing is done by a Refine thread or by a Mutator thread, it ultimately goes through apply_closure_to_buffer. The Mutator path is simple: the buffer is processed synchronously on the spot. Let us first look at the Refine thread path.
The JVM defines ConcurrentGCThread (concurrentGCThread.hpp) as the base class for concurrent GC threads; its run method drives run_service:
class ConcurrentGCThread: public NamedThread {
  friend class VMStructs;

  bool volatile _should_terminate;
  bool _has_terminated;

 public:
  ConcurrentGCThread();

  bool should_terminate() { return _should_terminate; }
  bool has_terminated()   { return _has_terminated; }
};

void ConcurrentGCThread::run() {
  initialize_in_thread();
  wait_for_universe_init();
  run_service();
  terminate();
}
The Refine thread's main loop is in run_service in g1ConcurrentRefineThread.cpp:
void G1ConcurrentRefineThread::run_service() {
  _vtime_start = os::elapsedVTime();

  while (!should_terminate()) {
    // Wait for work
    wait_for_completed_buffers();
    if (should_terminate()) {
      break;
    }

    size_t buffers_processed = 0;
    log_debug(gc, refine)("Activated worker %d, on threshold: " SIZE_FORMAT ", current: " SIZE_FORMAT,
                          _worker_id, _cr->activation_threshold(_worker_id),
                          G1BarrierSet::dirty_card_queue_set().completed_buffers_num());

    {
      SuspendibleThreadSetJoiner sts_join;

      while (!should_terminate()) {
        if (sts_join.should_yield()) {
          sts_join.yield();
          continue; // Re-check for termination after yield delay.
        }

        if (!_cr->do_refinement_step(_worker_id)) {
          break;
        }
        ++buffers_processed;
      }
    }

    deactivate();
    log_debug(gc, refine)("Deactivated worker %d, off threshold: " SIZE_FORMAT
                          ", current: " SIZE_FORMAT ", processed: " SIZE_FORMAT,
                          _worker_id, _cr->deactivation_threshold(_worker_id),
                          G1BarrierSet::dirty_card_queue_set().completed_buffers_num(),
                          buffers_processed);

    if (os::supports_vtime()) {
      _vtime_accum = (os::elapsedVTime() - _vtime_start);
    } else {
      _vtime_accum = 0.0;
    }
  }

  log_debug(gc, refine)("Stopping %d", _worker_id);
}
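do_refinement_step is where a Refine thread actually claims work from the DCQS and refines the cards in it. Roughly, and only as a simplified sketch (the names below are made up; the real logic is in G1ConcurrentRefine and DirtyCardQueueSet), each step pops one completed buffer and applies a refinement routine to every card recorded in it:

// Simplified sketch of one refinement step; identifiers are illustrative only.
#include <deque>
#include <mutex>
#include <vector>

typedef signed char jbyte;

struct CompletedBuffer { std::vector<volatile jbyte*> cards; };

static std::mutex                  completed_lock;
static std::deque<CompletedBuffer> completed_buffers;   // stands in for the DCQS list

// Stand-in for refining one card: scan it and update the affected RSets.
static void refine_card_concurrently(volatile jbyte* /*card*/, unsigned /*worker_id*/) {}

// One refinement step: claim one completed buffer and refine every card in it.
static bool do_refinement_step_sketch(unsigned worker_id) {
    CompletedBuffer buf;
    {
        std::lock_guard<std::mutex> g(completed_lock);
        if (completed_buffers.empty()) {
            return false;      // nothing to do: run_service() above breaks out and deactivates
        }
        buf = std::move(completed_buffers.front());
        completed_buffers.pop_front();
    }
    for (volatile jbyte* card : buf.cards) {
        refine_card_concurrently(card, worker_id);
    }
    return true;               // processed one buffer; the caller keeps looping
}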
The card size is defined in cardTable.hpp:
// Constants
enum SomePublicConstants {
  card_shift         = 9,
  card_size          = 1 << card_shift,
  card_size_in_words = card_size / sizeof(HeapWord)
};
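With card_shift = 9, each card covers 2^9 = 512 bytes of heap, i.e. 64 HeapWords on a 64-bit JVM, and a heap address maps to its card simply by shifting right by card_shift. A small worked example of this mapping (illustrative; heap base and address are made up, and byte_for/addr_for are only analogues of the CardTable methods):

// Illustrative card arithmetic: with card_shift = 9 a card covers 512 bytes.
#include <cstdio>

int main() {
    const unsigned long long card_shift = 9;
    const unsigned long long card_size  = 1ULL << card_shift;               // 512 bytes
    const unsigned long long heap_base  = 0x700000000ULL;                   // assumed heap base
    const unsigned long long addr       = heap_base + 5000;                 // address being refined

    unsigned long long card_index = (addr - heap_base) >> card_shift;       // byte_for() analogue
    unsigned long long card_start = heap_base + (card_index << card_shift); // addr_for() analogue
    unsigned long long card_end   = card_start + card_size;

    // 5000 bytes into the heap falls into card 9, which covers [4608, 5120) within the heap.
    std::printf("card %llu covers [%#llx, %#llx)\n", card_index, card_start, card_end);
    return 0;
}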
The logic that actually scans a dirty card and updates the RSet is do_card_ptr in g1RemSet.cpp:
  bool do_card_ptr(jbyte* card_ptr, uint worker_i) {
    // The only time we care about recording cards that
    // contain references that point into the collection set
    // is during RSet updating within an evacuation pause.
    // In this case worker_i should be the id of a GC worker thread.
    assert(SafepointSynchronize::is_at_safepoint(), "not during an evacuation pause");

    bool card_scanned = _g1rs->refine_card_during_gc(card_ptr, _update_rs_cl);

    if (card_scanned) {
      _update_rs_cl->trim_queue_partially();
      _cards_scanned++;
    } else {
      _cards_skipped++;
    }
    return true;
  }

  size_t cards_scanned() const { return _cards_scanned; }
  size_t cards_skipped() const { return _cards_skipped; }
};
bool G1RemSet::refine_card_during_gc(jbyte* card_ptr,
                                     G1ScanObjsDuringUpdateRSClosure* update_rs_cl) {
  assert(_g1h->is_gc_active(), "Only call during GC");

  // Construct the region representing the card.
  HeapWord* card_start = _ct->addr_for(card_ptr);
  // And find the region containing it.
  uint const card_region_idx = _g1h->addr_to_region(card_start);

  HeapWord* scan_limit = _scan_state->scan_top(card_region_idx);
  if (scan_limit == NULL) {
    // This is a card into an uncommitted region. We need to bail out early as we
    // should not access the corresponding card table entry.
    return false;
  }

  check_card_ptr(card_ptr, _ct);

  // If the card is no longer dirty, nothing to do. This covers cards that were already
  // scanned as parts of the remembered sets.
  if (*card_ptr != G1CardTable::dirty_card_val()) {
    return false;
  }

  // We claim lazily (so races are possible but they're benign), which reduces the
  // number of potential duplicate scans (multiple threads may enqueue the same card twice).
  *card_ptr = G1CardTable::clean_card_val() | G1CardTable::claimed_card_val();

  _scan_state->add_dirty_region(card_region_idx);
  if (scan_limit <= card_start) {
    // If the card starts above the area in the region containing objects to scan, skip it.
    return false;
  }

  // Don't use addr_for(card_ptr + 1) which can ask for
  // a card beyond the heap.
  HeapWord* card_end = card_start + G1CardTable::card_size_in_words;
  MemRegion dirty_region(card_start, MIN2(scan_limit, card_end));
  assert(!dirty_region.is_empty(), "sanity");

  HeapRegion* const card_region = _g1h->region_at(card_region_idx);
  assert(!card_region->is_young(), "Should not scan card in young region %u", card_region_idx);
  bool card_processed = card_region->oops_on_card_seq_iterate_careful<true>(dirty_region, update_rs_cl);
  assert(card_processed, "must be");
  return true;
}
To sum up: when a Java (Mutator) thread modifies an object reference and the write barrier cannot filter the update out (for example, because the card being written is not in a young region), the card address is written into the thread's DCQ. When a DCQ buffer fills up it is added to the DCQS, and the Refine threads consume it in the background to update the RSet. If the number of completed buffers in the DCQS exceeds the configured limit, the Mutator thread helps out by processing a buffer itself.
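As a practical footnote, the sizes and thread counts involved in this pipeline are tunable JVM options: -XX:G1UpdateBufferSize controls the DCQ buffer capacity passed to the allocator in the G1BarrierSet constructor above, and -XX:G1ConcRefinementThreads controls how many Refine threads are started. For example (MyApp is just a placeholder main class):

java -XX:+UseG1GC -XX:G1ConcRefinementThreads=4 -XX:G1UpdateBufferSize=256 MyApp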
JDK 12 source code: https://hg.openjdk.java.net/jdk/jdk12