当前位置: 首页 > 工具软件 > LevelDB > 使用案例 >

leveldb之write

谢俊力
2023-12-01

leveldb 不论写入还是删除都会调用该部分程序。该部分程序的看似短小但是思想却是相当经典经典。下面我们来分析下这块代码。

一般来说对于多线程我们都会通过锁机制来实现写入数据的正确性,但是每次都进行独占锁开销是很大的。下面的这块代码真的是颠覆了我的认识。

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;
//这部分代码相当精彩也很难理解
  MutexLock l(&mutex_);     //当多线程写入的时候我们都先进行锁定该区域 
  writers_.push_back(&w);   //将写入内容入队列
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();      //如果本次写入不是队列的头部,而且没有写入则等待
  }
  if (w.done) {        //写完后会把它设置成true 可能其他写入的时候顺带的把自己也写了那么直接返回,实现批量写入。
    return w.status;      
  }<span style="white-space:pre">	</span>
//进入下面的逻辑时队列内容是不会改变的因为上面是锁定的
  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);                  //如果是队列首元素则会进行组装。将后面的没有写入的也顺带写入<span style="font-family: Arial, Helvetica, sans-serif;">last_write</span>
<span style="font-family: Arial, Helvetica, sans-serif;">r   最后指向的是该次批量时最后一个被组装的元素</span>

    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();                                                     //解锁,这时候不会影响后面的写入,因为其他的肯定不是队列首元素所以上面机会wait
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));     //写log
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);            //写memtable
      }
      mutex_.Lock();                       
    }                                                                      //上面ulock到lock之间会有很多写入被添加进写入队列。
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {  //自己本本身不需要Signal 因为根本就没有wait,或者说是被其他线程已经唤醒了
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;                         //这里表示我们已经弹出了本次写入的最后一个元素
  }

  // Notify new head of write queue
  if (!writers_.empty()) {                                   //在此通知下一个对列首元素可以写入了
    writers_.front()->cv.Signal();
  }

  return status;
}
//下面是进行批量组装的过程
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != NULL);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != NULL) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *reuslt
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}


 类似资料: