当前位置: 首页 > 工具软件 > Muduo > 使用案例 >

muduo库的高性能日志库(五)——AsyncLogging文件

庄萧迟
2023-12-01

概述

这一部分就是muduo库之所以十分高效的原因,将前端与后端联系起来,实现了多生产者单消费者的异步网络日志库。

在多线程服务器程序当中,异步日志(非阻塞日志)是必须的,因为如果在网络IO线程或业务线程中直接往磁盘写数据的话,写操作偶尔可能阻塞长达数秒之久。这可能导致请求方超时,或耽误发送心跳消息,在分布式系统过更可能发生连锁效应。因此在正常的实现业务处理流程中应该彻底避免磁盘IO,这在one loop per thread模型的非阻塞服务端程序中尤为重要

问题

  1. 在AsyncLogging中的CountDownLatch,有什么用?是否必要呢?

设计结构

使用双缓冲技术,准备两块缓冲A和B,缓冲A接收日志消息,缓冲B将日志消息写入磁盘。当A写满,交换A和B,后端将B写入磁盘文件,前端则往A写的日志消息,如此反复。

  //Buffer使用c++11的std::unique_ptr进行管理,自动管理声明周期,其具备移动语义,
  //提高了缓冲区交换的效率。mutex_用于保护后面4个成员。buffer_存放供后端写入的buffer(目前最多16个)。
  typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; //缓冲区约4MB(4000*1000;)
  typedef std::vector<std::unique_ptr<Buffer>> BufferVector;  //因为要采用多缓冲技术,用vector来管理多个缓冲区
  typedef BufferVector::value_type BufferPtr;   //指针

  const int flushInterval_;
  //atomic修饰变量是原子类型
  std::atomic<bool> running_;
  const string basename_; //日志文件名
  const off_t rollSize_;  //滚动大小
  muduo::Thread thread_;
  muduo::CountDownLatch latch_;
  muduo::MutexLock mutex_;
  muduo::Condition cond_ GUARDED_BY(mutex_);
  BufferPtr currentBuffer_ GUARDED_BY(mutex_);  //当前缓冲区
  BufferPtr nextBuffer_ GUARDED_BY(mutex_);   //预备缓冲区
  BufferVector buffers_ GUARDED_BY(mutex_);   //待写入文件的已填满缓冲,供后端使用

多生产者(产生日志消息)

前端在生产一条日志消息短的时候会调用AsyncLogging::append
在这个函数中,若当前缓冲区currentBuffer_最够大,就直接写入该缓冲区。
currentBuffer_缓冲区剩余空间不够大,使用move将当前缓冲区移动到待写入缓冲区队列当中(Buffers_),则使用移动语义move将备用缓冲区移用为当前缓冲区,然后在进行消息追加。
当前端写入速度过快,一下子把两个缓冲区都用完了,那就重新分配一块buffer,作为当前缓冲区。

//日志发送者(多生产者)
void AsyncLogging::append(const char* logline, int len)
{
  muduo::MutexLockGuard lock(mutex_);
  //判断当前缓冲区是否空间足够
  if (currentBuffer_->avail() > len)
  {
    //直接追加到当前缓冲区当中
    currentBuffer_->append(logline, len);
  }
  //当前缓冲区空间不够
  else
  {
    //使用move将当前缓冲区移动到待写入文件队列当中
    buffers_.push_back(std::move(currentBuffer_));
    //查看备用缓冲区是否还在
    if (nextBuffer_)
    {
      //存在
      //当备用缓冲区移动给当前缓冲区
      currentBuffer_ = std::move(nextBuffer_);
    }
    else得在start结得在start结
    {
      //不存在,new一个新的buffer
      currentBuffer_.reset(new Buffer); // Rarely happens
    }
    currentBuffer_->append(logline, len);
    cond_.notify();   //通知消费者(写端),有一个待写入的已填满缓冲
  }
}

单消费(消费日志消息)

要提前准备两个缓冲区
目的是缩短临界区大小,防止阻塞


//单消费者
void AsyncLogging::threadFunc()
{
  assert(running_ == true);
  latch_.countDown();
  LogFile output(basename_, rollSize_, false); //采用非线程安全
  BufferPtr newBuffer1(new Buffer);
  BufferPtr newBuffer2(new Buffer);
  newBuffer1->bzero();
  newBuffer2->bzero();
  BufferVector buffersToWrite;
  buffersToWrite.reserve(16);
  while (running_)
  {
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());
    //临界区
    {
      muduo::MutexLockGuard lock(mutex_);
      //暂无待写入内容
      if (buffers_.empty())  // unusual usage!
      {
        cond_.waitForSeconds(flushInterval_); //有超时时长
      }
      //将当前缓冲区放入待写入队列当中
      buffers_.push_back(std::move(currentBuffer_));

      //将新的缓冲移动给当前缓冲
      currentBuffer_ = std::move(newBuffer1);

      //交换缓冲区,可以缩短临界区。这样就可以在非临界区完成对待写入队列进行操作
      buffersToWrite.swap(buffers_);

      //保证前端一直有预备缓冲区
      if (!nextBuffer_)
      {
        nextBuffer_ = std::move(newBuffer2);
      }
    }
  
    //出临界区
    assert(!buffersToWrite.empty());
    //短时间写入大量日志,超出待写入缓冲区大小限制,日志堆积,为异常情况
    if (buffersToWrite.size() > 25)
    {
      char buf[256];
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);
      fputs(buf, stderr);
      output.append(buf, static_cast<int>(strlen(buf)));
      //可能是避免浪费,后面还要使用
      buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
    }

    for (const auto& buffer : buffersToWrite)
    {
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      //数据写入
      output.append(buffer->data(), buffer->length());
    }

    if (buffersToWrite.size() > 2)
    {
      // drop non-bzero-ed buffers, avoid trashing
      //丢弃垃圾
      buffersToWrite.resize(2);
    }


   //重新填充newbuffer1和newbuffer2
    if (!newBuffer1)
    {
      assert(!buffersToWrite.empty());
      newBuffer1 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer1->reset();
    }

    if (!newBuffer2)
    {
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }
    //清空
    buffersToWrite.clear();
    output.flush();
  }

  //日志关闭再flush
  output.flush();
}


问题解决

1.这里的CountDownLatch作用就是,让wait()之后的语句在countDown()之后执行,这里就是想在start()结束前进入threadFunc()函数。若不添加CountDownLatch,就可能会导致前端调用第一次的notify()唤醒丢失。也就是生产者notify()以后,消费者(异步线程ThreadFunc)未收到信号,而生产者已将消息产出。

思考一下,这里的CountDownLatch是否必要呢,在我看来是不必要的原因如下

因为每一次notify(),都会把前面收到的信息一起写入文件,极端情况,生产者只notify()一次就退出,那么在flushInterval_之后,之前消息也会被写入,就算不到flushInterval_那么也会在析构函数中进行一次notify()保证无消息漏掉。

综上我认为CountDownLatch并不必要

 类似资料: