muduo源码剖析——以三个切片浅析muduo库代码设计的严谨性、高效性与灵活性

  • Post author:
  • Post category:其他




0 前言

陈硕大佬的muduo网络库的源码我已经看了好久了,奈何本人实力有限,每每看到其代码设计的精巧之处只能内心称赞,无法用言语表达出来。实在令人汗颜。最近在看到网络设计部分时有了一些体会,结合自己之前在网络编程方面的积累,特对代码设计中的一些精巧之处做一些总结。

就muduo在多线程并发服务器设计而言,除了其高效的并发服务架构之外,其在代码设计方面的高效性和灵活性可以从下面三个切片得以体现。

在这之前,首先说明muduo的

并发服务器架构

为multi-reactors+thread pool架构,其架构图如下:

简单而言,mainReactor负责处理新连接IO的创建和管理,subReactor+thread pool负责已连接IO的读写事件。其中,为了降低subReactor的压力、提高IO读写效率,可以添加多个subReactor。

在这里插入图片描述



1 切片一:EventLoop线程绑定与跨线程调用

muduo_net设计中,每一个EventLoop对象对应一个reactor,并且每个EventLoop对象绑定一个线程,EventLoop对象中的loop()在内的多数函数只能由该对象所属的线程调用,即EventLoop对象有严格的线程所属特性。某些函数可以由外部线程调用,可以起到激活睡眠或阻塞的EventLoop当前线程的作用。

首先看一下EventLoop类的成员变量(部分):

  typedef std::vector<Channel*> ChannelList;
  
  bool looping_; /* atomic */
  bool quit_; /* atomic */
  bool eventHandling_; /* atomic */
  bool callingPendingFunctors_; /* atomic */
  const pid_t threadId_;		// 当前对象所属线程ID
  Timestamp pollReturnTime_;
  boost::scoped_ptr<Poller> poller_;
  boost::scoped_ptr<TimerQueue> timerQueue_;
  int wakeupFd_;				// 用于保存eventfd创建的file descriptor--进程间通信
  // unlike in TimerQueue, which is an internal class,
  // we don't expose Channel to client.
  // eventfd的wakeupFd_对应的通道 该通道将会纳入poller_来管理
  boost::scoped_ptr<Channel> wakeupChannel_;// EventLoop对象负责wakeupChannel_对象的创建/生命周期
  // Poller返回的活动通道	
  ChannelList activeChannels_;// 这些channel的生存期不由EventLoop管理和负责-->由TcpConnection和TimerQueue管理
  Channel* currentActiveChannel_;	// 当前正在处理的活动通道
  MutexLock mutex_;
  std::vector<Functor> pendingFunctors_; // @BuardedBy mutex_

具体体现如下。



1.1 判断当前线程是否已经拥有所属EventLoop对象

主要通过下面两条实现。

1 使用线程局部变量记录当前线程是否已经拥有所属EventLoop对象

// 当前线程EventLoop对象指针
// 线程局部存储
__thread EventLoop* t_loopInThisThread = 0;// 指向EventLoop对象

2 在创建EventLoop对象前进行判断

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),// 创建一个eventfd
    wakeupChannel_(new Channel(this, wakeupFd_)),// 创建一个通道 并将wawkeupFd_传进Channel
    currentActiveChannel_(NULL)
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  
  /* 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL) */
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;// 指向EventLoop对象
  }
  
  // 绑定eventfd的回调处理函数handleRead()
  wakeupChannel_->setReadCallback(
      boost::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  // 绑定eventfd对应事件--EPOLLIN可读事件
  wakeupChannel_->enableReading();
}



1.2 执行函数前判断当前线程是否为EventLoop对象所属线程

通过assertInLoopThread()和isInLoopThread()函数进行判断。

  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }
// ...
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }



1.3 部分函数可以跨线程调用以唤醒阻塞的EventLoop所属线程

以quit()函数为例,当要停止loop()循环时,此时EventLoop对象所属线程可能正处于poll的阻塞中,因此此时需要借助外部进程调用该quit()同时唤醒处于阻塞中的EventLoop对象所属线程,并退出poll循环。

// 该函数可以跨线程调用
void EventLoop::quit()
{
  quit_ = true;
  if (!isInLoopThread())
  {
    wakeup();
  }
}



2 切片二:IO线程与计算线程的灵活调度

为了充分了利用CPU,在IO线程空间时,可以为loop()线程分配一些计算任务。因此,muduo_net既可以完成IO处理又能进行计算任务。

具体如下。



2.1 poll+handleEvent

该部分主要负责IO事件的监听、响应和处理。

// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
  assert(!looping_);
  // 断言当前处于创建该对象的线程中
  assertInLoopThread();
  looping_ = true;
  quit_ = false;
  LOG_TRACE << "EventLoop " << this << " start looping";

  //::poll(NULL, 0, 5*1000);
  while (!quit_)
  {
    activeChannels_.clear();
    /* IO事件监听、响应和处理 */
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    //++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    
    /* 处理计算任务 */
    doPendingFunctors();// 让IO线程不繁忙的时候也能执行一些计算任务-->避免IO线程一直处于阻塞状态
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}



2.2 queueInLoop+doPendingFunctors

该部分负责计算任务的添加和处理。其中添加计算任务queueInLoop可以由外部线程和EventLoop所属线程添加,执行计算任务doPendingFunctors只能由EventLoop线程处理。

// 将cb添加到队列中
void EventLoop::queueInLoop(const Functor& cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(cb);// 添加外部任务到pendingFunctors_数组
  }

  // 调用queueInLoop的线程不是IO线程需要唤醒(唤醒EventLoop对应的线程 以便该线程及时执行cb函数)
  // 或者调用queueInLoop的线程是EventLoop对应的IO线程,并且此时该线程正在调用pending functor(正在执行计算任务),需要唤醒
  // 只有IO线程的事件回调中调用queueInLoop才不需要唤醒
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}


void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  // 添加互斥锁
  // 互斥访问vector--此时pendingFunctors_位于临界区 不能被其他线程访问
  {
  MutexLockGuard lock(mutex_);
  functors.swap(pendingFunctors_);// swap:交换两容器的内容 pendingFunctors_变为空
  }

  // 为什么functors的执行没有放在临界区?
  // 1 减小临界区长度 减少其他进程queueInloop()的阻塞时间
  // 2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();// 执行函数
  }
  callingPendingFunctors_ = false;
}



3 切片三:线程安全与执行效率

下面通过doPendingFunctors()函数观察muduo在实现线程安全方与保证执行效率之间权衡的设计。

该函数主要包含两个部分:获取待处理计算任务序列、依次执行计算任务。

其中pendingFunctors_存放计算任务的数组作为外部线程和EventLoop线程都能够处理(插入数据)的共享变量,EventLoop线程在获取计算任务序列时需要对pendingFunctors_进行加锁处理。

这其中涉及到锁的作用范围即临界区的作用范围的问题。muduo此处只将对pendingFunctors_的操作置于临界区,而将执行计算任务的操作置于临界区之外。原因主要有两个:


1 减小临界区长度 减少其他进程queueInloop()的阻塞时间;

2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件。

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  /* 1 获取待处理计算任务序列 */
  // 添加互斥锁
  // 互斥访问vector--此时pendingFunctors_位于临界区 不能被其他线程访问(其他线程不能操作pendingFunctors_数组)
  {
  MutexLockGuard lock(mutex_);
  functors.swap(pendingFunctors_);// swap:交换两容器的内容 pendingFunctors_变为空
  }

  /* 依次执行计算任务 */
  // 为什么functors的执行没有放在临界区?
  // 1 减小临界区长度 减少其他进程queueInloop()的阻塞时间
  // 2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();// 执行函数
  }
  callingPendingFunctors_ = false;
}



4 参考材料


https://www.bilibili.com/video/BV11b411q7zr?p=29



版权声明:本文为qq_33429968原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。