WebRTC学习进阶之路 — 十七、源码分析之WebRTC的数据流水线详解&模块机制核心ProcessThread与ProcessThreadImpl

  • Post author:
  • Post category:其他


WebRTC学习进阶之路系列总目录:https://blog.csdn.net/xiaomucgwlmx/article/details/103204274

对于实时音视频应用来讲,媒体数据从采集到渲染,在数据流水线上依次完成一系列处理。流水线由不同的功能模块组成,彼此分工协作:数据采集模块负责从摄像头/麦克风采集音视频数据,编解码模块负责对数据进行编解码,RTP模块负责数据打包和解包。数据流水线上的数据处理速度是影响应用实时性的最重要因素。与此同时,从服务质量保证角度讲,应用需要知道数据流水线的运行状态,如视频采集模块的实时帧率、当前网络的实时速率、接收端的数据丢包率,等等。各个功能模块可以基于这些运行状态信息作相应调整,从而在质量、速度等方面优化数据流水线的运行,实现更快、更好的用户体验。

WebRTC采用模块机制,把数据流水线上功能相对独立的处理点定义为模块,每个模块专注于自己的任务,模块之间基于数据流进行通信。与此同时,专有线程收集和处理模块内部的运行状态信息,并把这些信息反馈到目标模块,实现模块运行状态监控和服务质量保证。

下面我们来具体看下WebRTC的模块处理机制和数据流水线的原理和实现。

一、WebRTC模块

最新的源码中WebRTC模块虚基类Module定义在webrtc/modules/include/modue.h中,具体内容如下:

namespace webrtc {
 
class ProcessThread;
 
class Module {
 public:
  // Returns the number of milliseconds until the module wants a worker
  // thread to call Process.
  // This method is called on the same worker thread as Process will
  // be called on.
  // TODO(tommi): Almost all implementations of this function, need to know
  // the current tick count.  Consider passing it as an argument.  It could
  // also improve the accuracy of when the next callback occurs since the
  // thread that calls Process() will also have it's tick count reference
  // which might not match with what the implementations use.
  virtual int64_t TimeUntilNextProcess() = 0;
 
  // Process any pending tasks such as timeouts.
  // Called on a worker thread.
  virtual void Process() = 0;
 
  // This method is called when the module is attached to a *running* process
  // thread or detached from one.  In the case of detaching, |process_thread|
  // will be nullptr.
  //
  // This method will be called in the following cases:
  //
  // * Non-null process_thread:
  //   * ProcessThread::RegisterModule() is called while the thread is running.
  //   * ProcessThread::Start() is called and RegisterModule has previously
  //     been called.  The thread will be started immediately after notifying
  //     all modules.
  //
  // * Null process_thread:
  //   * ProcessThread::DeRegisterModule() is called while the thread is
  //     running.
  //   * ProcessThread::Stop() was called and the thread has been stopped.
  //
  // NOTE: This method is not called from the worker thread itself, but from
  //       the thread that registers/deregisters the module or calls Start/Stop.
  virtual void ProcessThreadAttached(ProcessThread* process_thread) {}
 
 protected:
  virtual ~Module() {}
};
}  // namespace webrtc

总共有三个API,源码中如上也有详细的注释,简单来说就是:

TimeUntilNextProcess()用来计算距下次调用处理函数Process()的时间间隔,Called on a worker thread;

Process()是模块的处理函数,负责模块内部运行监控、状态更新和模块间通信,Called on a worker thread;

ProcessThreadAttached()用来把模块挂载到模块处理线程,或者从模块处理线程分离出来,不是从worker thread本身调用此方法,而是从注册/注销模块或调用启动/停止的线程调用该方法。

Module的派生类分布在WebRTC数据流水线上的不同部分,各自承担自己的数据处理和服务质量保证任务。从上边Module中我们可以看到一个叫做ProcessThread的类出现,这个就是下面我们将要说的模块梳理的核心处理线程。

二、WebRTC模块处理线程

WebRTC模块处理线程是模块处理机制的驱动器,它的核心作用是对所有挂载在本线程下的模块,周期性调用其Process()处理函数处理模块内部事务,并处理异步任务。

虚基类ProcessThread

虚基类ProcessThread在webrtc/modules/utility/include/process_thread.h中,具体内容如下:

namespace webrtc {
class Module;
 
// TODO(tommi): ProcessThread probably doesn't need to be a virtual
// interface.  There exists one override besides ProcessThreadImpl,
// MockProcessThread, but when looking at how it is used, it seems
// a nullptr might suffice (or simply an actual ProcessThread instance).
class ProcessThread {
 public:
  virtual ~ProcessThread();
 
  static std::unique_ptr<ProcessThread> Create(const char* thread_name);
 
  // Starts the worker thread.  Must be called from the construction thread.
  virtual void Start() = 0;
 
  // Stops the worker thread.  Must be called from the construction thread.
  virtual void Stop() = 0;
 
  // Wakes the thread up to give a module a chance to do processing right
  // away.  This causes the worker thread to wake up and requery the specified
  // module for when it should be called back. (Typically the module should
  // return 0 from TimeUntilNextProcess on the worker thread at that point).
  // Can be called on any thread.
  virtual void WakeUp(Module* module) = 0;
 
  // Queues a task object to run on the worker thread.  Ownership of the
  // task object is transferred to the ProcessThread and the object will
  // either be deleted after running on the worker thread, or on the
  // construction thread of the ProcessThread instance, if the task did not
  // get a chance to run (e.g. posting the task while shutting down or when
  // the thread never runs).
  virtual void PostTask(std::unique_ptr<QueuedTask> task) = 0;
 
  // Adds a module that will start to receive callbacks on the worker thread.
  // Can be called from any thread.
  virtual void RegisterModule(Module* module, const rtc::Location& from) = 0;
 
  // Removes a previously registered module.
  // Can be called from any thread.
  virtual void DeRegisterModule(Module* module) = 0;
};
 
}  // namespace webrtc

Start()/Stop()函数用来启动和结束线程,必须从构造线程中调用;

WakeUp()函数用来唤醒挂载在本线程下的某个模块,使得该模块有机会马上执行其Process()处理函数;

PostTask()函数投递一个任务给本线程,任务对象的所有权已转移到ProcessThread,如果该任务没有运行的机会(例如,在关闭或线程永不运行时),则该对象将在工作线程被删除;

RegisterModule()和DeRegisterModule()用来向线程注册/注销模块。

派生类ProcessThreadImpl

ProcessThread源码在webrtc/modules/utility/sourcr/process_thread_impl.h中,具体内容如下:

namespace webrtc {
 
class ProcessThreadImpl : public ProcessThread {
 public:
  explicit ProcessThreadImpl(const char* thread_name);
  ~ProcessThreadImpl() override;
 
  void Start() override;
  void Stop() override;
 
  void WakeUp(Module* module) override;
  void PostTask(std::unique_ptr<QueuedTask> task) override;
 
  void RegisterModule(Module* module, const rtc::Location& from) override;
  void DeRegisterModule(Module* module) override;
 
 protected:
  static void Run(void* obj);
  bool Process();
 
 private:
  struct ModuleCallback {
    ModuleCallback() = delete;
    ModuleCallback(ModuleCallback&& cb) = default;
    ModuleCallback(const ModuleCallback& cb) = default;
    ModuleCallback(Module* module, const rtc::Location& location)
        : module(module), location(location) {}
    bool operator==(const ModuleCallback& cb) const {
      return cb.module == module;
    }
 
    Module* const module;
    int64_t next_callback = 0;  // Absolute timestamp.
    const rtc::Location location;
 
   private:
    ModuleCallback& operator=(ModuleCallback&);
  };
 
  typedef std::list<ModuleCallback> ModuleList;
 
  // Warning: For some reason, if |lock_| comes immediately before |modules_|
  // with the current class layout, we will  start to have mysterious crashes
  // on Mac 10.9 debug.  I (Tommi) suspect we're hitting some obscure alignemnt
  // issues, but I haven't figured out what they are, if there are alignment
  // requirements for mutexes on Mac or if there's something else to it.
  // So be careful with changing the layout.
  rtc::CriticalSection lock_;  // Used to guard modules_, tasks_ and stop_.
 
  rtc::ThreadChecker thread_checker_;
  rtc::Event wake_up_;
  // TODO(pbos): Remove unique_ptr and stop recreating the thread.
  std::unique_ptr<rtc::PlatformThread> thread_;
 
  ModuleList modules_;
  std::queue<QueuedTask*> queue_;
  bool stop_;
  const char* thread_name_;
};
 
}  // namespace webrtc

这里的方法即就是实现了基类的几个方法,有几个重要的成员变量我们看下:

rtc::Event wake_up_:唤醒处于等待状态的线程;

ModuleList modules_:注册在本线程下的模块集合;

queue_:投递给本线程的任务集合;

thread_name_:线程名称;

处理线程的核心便是Process()方法,我们来具体看下:

bool ProcessThreadImpl::Process() {
  TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
  int64_t now = rtc::TimeMillis();
  int64_t next_checkpoint = now + (1000 * 60);
 
  {
    rtc::CritScope lock(&lock_);
    if (stop_)
      return false;
    for (ModuleCallback& m : modules_) {
      // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
      // takes and dcheck if it takes too long (e.g. >=10ms).  Ideally this
      // operation should not require taking a lock, so querying all modules
      // should run in a matter of nanoseconds.
      if (m.next_callback == 0)
        m.next_callback = GetNextCallbackTime(m.module, now);
 
      if (m.next_callback <= now ||
          m.next_callback == kCallProcessImmediately) {
        {
          TRACE_EVENT2("webrtc", "ModuleProcess", "function",
                       m.location.function_name(), "file",
                       m.location.file_and_line());
          m.module->Process();
        }
        // Use a new 'now' reference to calculate when the next callback
        // should occur.  We'll continue to use 'now' above for the baseline
        // of calculating how long we should wait, to reduce variance.
        int64_t new_now = rtc::TimeMillis();
        m.next_callback = GetNextCallbackTime(m.module, new_now);
      }
 
      if (m.next_callback < next_checkpoint)
        next_checkpoint = m.next_callback;
    }
 
    while (!queue_.empty()) {
      QueuedTask* task = queue_.front();
      queue_.pop();
      lock_.Leave();
      task->Run();
      delete task;
      lock_.Enter();
    }
  }
 
  int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
  if (time_to_wait > 0)
    wake_up_.Wait(static_cast<int>(time_to_wait));
 
  return true;
}

Process()函数首先处理挂载在本线程下的模块,这也是模块处理线程的核心任务:针对每个模块,计算其下次调用模块Process()处理函数的时刻(调用该模块的TimeUntilNextProcess()函数)。如果时刻是当前时刻,则调用模块的Process()处理函数,并更新下次调用时刻。需要注意,不同模块的执行频率不一样,线程在本轮调用末尾的等待时间和本线程下所有模块的最近下次调用时刻相关。

接下来线程Process()函数处理ProcessTask队列中的异步任务,针对每个任务调用Run()函数,然后任务出队列并销毁。等模块调用和任务都处理完后,则把本次时间片的剩余时间等待完毕,然后返回。如果在等待期间其他线程向本线程Wakeup模块或者邮递一个任务,则线程被立即唤醒并返回,进行下一轮时间片的执行。

ProcessThread与ProcessThreadImpl的关系

在这里插入图片描述

三、WebRTC数据流水线

我们可以把WebRTC看作是一个专注于实时音视频通信的SDK。其对外的API主要负责PeerConnection建立、MediaStream创建、NAT穿透、SDP协商等工作,对内则主要集中于音视频数据的处理,从数据采集到渲染的整个处理过程可以用一个数据流水线来描述,如图所示:

在这里插入图片描述

音视频数据首先从采集端进行采集,一般来说音频数据来自麦克风,视频数据来自摄像头。在某些应用场景下,音频数据来自扬声器,视频数据来自桌面共享。采集端的输出是音视频Raw Data。然后Raw Data到达编码模块,数据被编码器编码成符合语法规则的NAL单元,到达发送端缓冲区PacedSender处。接下来PacedSender把NAL单元发送到RTP模块打包为RTP数据包,最后经过网络模块发送到网络。

在接收端,RTP数据包经过网络模块接收后进行解包得到NAL单元,接下来NAL单元到达接收端缓冲区(JitterBuffer或者NetEQ)进行乱序重排和组帧。一帧完整的数据接收并组帧之后,调用解码模块进行解码,得到该帧数据的Raw Data。最后Raw Data交给渲染模块进行播放/显示。

在数据流水线上,还有一系列模块负责服务质量监控,如丢帧策略,丢包策略,编码器过度使用保护,码率估计,前向纠错,丢包重传,等等。

WebRTC数据流水线上的功能单元被定义为模块,每个模块从上游模块获取输入数据,在本模块进行加工后得到输出数据,交给下游模块进行下一步处理。WebRTC的模块处理机制包括模块和模块处理线程,模块把WebRTC数据流水线上的功能部件封装为模块,模块处理线程驱动模块内部状态更新和模块之间状态传递。模块一般挂载到模块处理线程上,由处理线程驱动模块的处理函数。