ros melodic源码分析之poll_manager

  • Post author:
  • Post category:其他




一、poll底层操作

涉及源文件 io.cpp io.h,关键源码如下

/**
 * Creates the epoll instance used to watch sockets.
 *
 * @return the new epoll file descriptor, or -1 if epoll_create1() fails
 *         (errno is set by that call).
 */
int create_socket_watcher() {
  // The descriptor has to be declared locally; the excerpt assigned to an
  // undeclared `epfd`.
  const int epfd = ::epoll_create1(0);
  return epfd;
}

/**
 * Registers a file descriptor with an epoll instance.
 *
 * The descriptor is added with an empty event mask; the events of interest
 * are configured afterwards through set_events_on_socket().
 *
 * @param epfd epoll file descriptor to register with
 * @param fd   file descriptor to start watching
 */
void add_socket_to_watcher(int epfd, int fd) {
  struct epoll_event ev;
  bzero(&ev, sizeof(ev));

  ev.events = 0;
  ev.data.fd = fd;

  if (::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev))
  {
    //... error handling for a failed registration goes here (elided in this excerpt)
  }
}

/**
 * Removes a file descriptor from an epoll instance.
 *
 * @param epfd epoll file descriptor the socket was registered with
 * @param fd   file descriptor to stop watching
 */
void del_socket_from_watcher(int epfd, int fd) {
  if (::epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL))
  {
    //... error handling for a failed removal goes here (elided in this excerpt)
  }
}

/**
 * Replaces the event mask watched for an already-registered descriptor.
 *
 * @param epfd   epoll file descriptor the socket is registered with
 * @param fd     file descriptor whose mask is being changed
 * @param events new event mask, passed to epoll unchanged
 */
void set_events_on_socket(int epfd, int fd, int events) {
  struct epoll_event ev;
  bzero(&ev, sizeof(ev));

  ev.events = events;
  ev.data.fd = fd;
  if (::epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev))
  {
    //... error handling for a failed modification goes here (elided in this excerpt)
  }
}

上述代码中

create_socket_watcher创建epoll句柄,

add_socket_to_watcher del_socket_from_watcher实现增加删除监控文件fd,

set_events_on_socket实现监控事件的设置

/**
 * Waits on an epoll instance and reports the descriptors that became ready.
 *
 * @param epfd    epoll file descriptor to wait on
 * @param fds     not used by this epoll-based implementation
 * @param nfds    maximum number of ready descriptors to report in one call
 * @param timeout timeout forwarded unchanged to epoll_wait()
 * @return a vector holding one entry (fd + revents) per ready descriptor;
 *         a null pointer when epoll_wait() fails
 */
pollfd_vector_ptr poll_sockets(int epfd, socket_pollfd *fds, nfds_t nfds, int timeout) {

	(void)fds;

	// Heap-backed buffer instead of a variable-length array: VLAs are not
	// standard C++, are ill-formed for nfds == 0 and can overflow the stack
	// for large nfds.
	std::vector<struct epoll_event> ev(nfds);
	pollfd_vector_ptr ofds;

	int fd_cnt = ::epoll_wait(epfd, ev.empty() ? NULL : &ev[0], static_cast<int>(nfds), timeout);

	if (fd_cnt < 0)
	{
        //...
	}
	else
	{
		// Copy each reported event into the portable pollfd-style result.
		ofds.reset(new std::vector<socket_pollfd>);
		for (int i = 0; i < fd_cnt; i++)
		{
			socket_pollfd pfd;
			pfd.fd = ev[i].data.fd;
			pfd.revents = ev[i].events;
			ofds->push_back(pfd);
		}
	}
	return ofds;
}

上述代码完成了epoll等待事件发生,有事件时返回的ofds中包含监控到的fd以及revents

/**
 * Creates a pipe and switches both ends to non-blocking mode.
 *
 * @param signal_pair receives the read end in [0] and the write end in [1];
 *                    both are set to -1 when the function fails
 * @return 0 on success, -1 if pipe() or fcntl() fails
 */
int create_signal_pair(signal_fd_t signal_pair[2]) {

	// Start from a known-invalid state so a failed call never leaves stale
	// descriptors behind.
	signal_pair[0] = -1;
	signal_pair[1] = -1;

	if (pipe(signal_pair) != 0)
	{
		return -1;
	}

	if (fcntl(signal_pair[0], F_SETFL, O_NONBLOCK) == -1
	    || fcntl(signal_pair[1], F_SETFL, O_NONBLOCK) == -1)
	{
		// Do not leak the pipe when it cannot be made non-blocking.
		close(signal_pair[0]);
		close(signal_pair[1]);
		signal_pair[0] = -1;
		signal_pair[1] = -1;
		return -1;
	}

	// The function is declared to return int and the caller tests the result
	// against 0, so success must be reported explicitly.
	return 0;
}

创建pipe,并设置为O_NONBLOCK



二、poll_set分析

涉及到poll_set.h poll_set.cpp,关键源码如下:

// Tracks a set of sockets together with the events each one is watched for,
// and dispatches a per-socket callback when update() sees one of them fire.
class ROSCPP_DECL PollSet
{
public:
  PollSet();
  ~PollSet();

  // Callback invoked with the events that fired on a socket.
  typedef boost::function<void(int)> SocketUpdateFunc;

  // Add/remove watched sockets and the events watched on them.
  bool addSocket(int sock, const SocketUpdateFunc& update_func, const TransportPtr& transport = TransportPtr());
  bool delSocket(int sock);
  bool addEvents(int sock, int events);
  bool delEvents(int sock, int events);

  // Polls the watched sockets and handles any events that occurred.
  void update(int poll_timeout);

  // Writes one byte to the local signal pipe.
  void signal();

private:

  void createNativePollset();

  // Callback registered for the read end of the local signal pipe.
  void onLocalPipeEvents(int events);

  // Bookkeeping kept for each watched socket.
  struct SocketInfo
  {
    TransportPtr transport_;
    SocketUpdateFunc func_;
    int fd_;
    int events_;
  };
  typedef std::map<int, SocketInfo> M_SocketInfo;
  M_SocketInfo socket_info_;  // watched sockets, keyed by file descriptor
  boost::mutex socket_info_mutex_;
  bool sockets_changed_;

  boost::mutex just_deleted_mutex_;
  typedef std::vector<int> V_int;
  V_int just_deleted_;

  std::vector<socket_pollfd> ufds_;

  boost::mutex signal_mutex_;
  signal_fd_t signal_pipe_[2];// pipe descriptors filled in by create_signal_pair()

  int epfd_;  // epoll descriptor returned by create_socket_watcher()
};

上述声明了poll_set的操作,其实现关键源码如下:

// Builds the epoll watcher and the local signal pipe, then registers the
// pipe's read end so that a byte written by signal() is reported by update().
PollSet::PollSet()
    : sockets_changed_(false)
    , epfd_(create_socket_watcher())  // epoll descriptor for this poll set
{
  // Create the local pipe.
  const int pipe_status = create_signal_pair(signal_pipe_);
  if (pipe_status != 0)
  {
        //...
  }

  // onLocalPipeEvents is the callback for POLLIN on signal_pipe_[0].
  SocketUpdateFunc on_pipe_readable = boost::bind(&PollSet::onLocalPipeEvents, this, _1);
  addSocket(signal_pipe_[0], on_pipe_readable);
  addEvents(signal_pipe_[0], POLLIN);
}
void PollSet::signal()
{
  boost::mutex::scoped_try_lock lock(signal_mutex_);

  if (lock.owns_lock())
  {
    char b = 0;
    if (write_signal(signal_pipe_[1], &b, 1) < 0)
    {
      // do nothing... this prevents warnings on gcc 4.3
    }
  }
}

// Callback for the read end of the signal pipe: on POLLIN, reads the pipe one
// byte at a time until read_signal() stops returning data. The bytes carry no
// information and are discarded.
void PollSet::onLocalPipeEvents(int events)
{
  if (!(events & POLLIN))
  {
    return;
  }

  char discarded;
  for (;;)
  {
    // keep draining until nothing more can be read
    if (read_signal(signal_pipe_[0], &discarded, 1) <= 0)
    {
      break;
    }
  }
}

PollSet构造函数创建了epoll监控句柄,并创建了signal_pipe_:signal函数向pipe写入一个字节,onLocalPipeEvents负责把pipe读空。回调本身不做其他处理,其作用是让阻塞在epoll等待中的update()被唤醒。

bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
{
  SocketInfo info;
  info.fd_ = fd;
  info.events_ = 0;
  info.transport_ = transport;
  info.func_ = update_func;
  {
    boost::mutex::scoped_lock lock(socket_info_mutex_);
    //插入到socket_info_监控map中
    bool b = socket_info_.insert(std::make_pair(fd, info)).second;

    //增加到epoll_监控中
    add_socket_to_watcher(epfd_, fd);
  }
  //触发一次本地pipe
  signal();
  return true;
}

bool PollSet::delSocket(int fd)
{
  boost::mutex::scoped_lock lock(socket_info_mutex_);
  M_SocketInfo::iterator it = socket_info_.find(fd);
  if (it != socket_info_.end())
  {
    socket_info_.erase(it);
    {
      boost::mutex::scoped_lock lock(just_deleted_mutex_);
      just_deleted_.push_back(fd);
    }

    del_socket_from_watcher(epfd_, fd);

    sockets_changed_ = true;
    signal();

    return true;
  }

  return false;
}

bool PollSet::addEvents(int sock, int events)
{
  boost::mutex::scoped_lock lock(socket_info_mutex_);

  M_SocketInfo::iterator it = socket_info_.find(sock);

  //更新监控的event
  it->second.events_ |= events;

  set_events_on_socket(epfd_, sock, it->second.events_);

  sockets_changed_ = true;
  signal();

  return true;
}

// Removes events from the set watched on a socket.
//
// @param sock   file descriptor previously passed to addSocket()
// @param events event bits to clear from the socket's current mask
// @return true on success, false if the socket is not in this poll set
bool PollSet::delEvents(int sock, int events)
{
  boost::mutex::scoped_lock lock(socket_info_mutex_);

  M_SocketInfo::iterator it = socket_info_.find(sock);

  // Bail out before touching `it`: the excerpt dereferenced it after the
  // lookup check even when the socket had not been found.
  if (it == socket_info_.end())
  {
    return false;
  }

  // Clear the requested events.
  it->second.events_ &= ~events;

  set_events_on_socket(epfd_, sock, it->second.events_);

  sockets_changed_ = true;
  signal();

  return true;
}

addSocket delSocket 调用poll底层接口增加删除监控socket

addEvents delEvents 调用poll底层接口增加删除监控socket的event

// Runs one polling pass: waits for events on the watched sockets and invokes
// the callback stored for every socket that has something to report.
//
// @param poll_timeout timeout handed to poll_sockets()
void PollSet::update(int poll_timeout)
{
  // NOTE(review): presumably refreshes ufds_ from socket_info_; its body is not shown here - confirm.
  createNativePollset();
  
  // Poll across the sockets we're servicing
  // Wait on epfd_ via epoll and collect the descriptors that changed.
  boost::shared_ptr<std::vector<socket_pollfd> > ofds = poll_sockets(epfd_, &ufds_.front(), ufds_.size(), poll_timeout);
  if (!ofds)
  {
    // A null result means the poll failed; an interrupted call (EINTR) is not logged.
    if (last_socket_error() != EINTR)
    {
      ROS_ERROR_STREAM("poll failed with error " << last_socket_error_string());
    }
  }
  else
  {
    // No error.
    for (std::vector<socket_pollfd>::iterator it = ofds->begin() ; it != ofds->end(); ++it)
    {
      int fd = it->fd;
      int revents = it->revents;
      SocketUpdateFunc func;
      TransportPtr transport;
      int events = 0;

      // Nothing was reported for this descriptor.
      if (revents == 0)
      {
        continue;
      }
      {
        // A watched event occurred; look up the socket while holding the lock.
        // Note that this inner `it` shadows the loop iterator within this scope.
        boost::mutex::scoped_lock lock(socket_info_mutex_);
        M_SocketInfo::iterator it = socket_info_.find(fd);
        // the socket has been entirely deleted
        if (it == socket_info_.end())
        {
          continue;
        }

        // Found the recorded information in socket_info_.
        const SocketInfo& info = it->second;

        // Store off the function and transport in case the socket is deleted from another thread
        func = info.func_;
        transport = info.transport_;
        events = info.events_;
      }

      // Dispatch when a requested event fired, or on error/hang-up/invalid-fd.
      if (func
          && ((events & revents)
              || (revents & POLLERR)
              || (revents & POLLHUP)
              || (revents & POLLNVAL)))
      {
        // Skip error-type events on a descriptor that was just deleted.
        bool skip = false;
        if (revents & (POLLNVAL|POLLERR|POLLHUP))
        {
          boost::mutex::scoped_lock lock(just_deleted_mutex_);
          if (std::find(just_deleted_.begin(), just_deleted_.end(), fd) != just_deleted_.end())
          {
            skip = true;
          }
        }

        if (!skip)
        {
          // Invoke the callback with the requested events plus any error bits.
          func(revents & (events|POLLERR|POLLHUP|POLLNVAL));
        }
      }
    }
  }

  // The just-deleted list only applies to a single pass; reset it.
  boost::mutex::scoped_lock lock(just_deleted_mutex_);
  just_deleted_.clear();
}

PollSet::update(int poll_timeout)这是poll_set轮转的核心,通过poll_sockets底层操作获取到变化的监控文件以及事件,然后在socket_info_查找到记录的信息,调用回调函数(如果刚刚删除跳过调用回调函数)。

PollSet::update是怎么被调用的呢?请看下一部分。



三、poll_manager分析

涉及到poll_manager.h poll_manager.cpp,关键源码如下:

// Owns a PollSet and a thread that repeatedly fires the registered listeners
// and updates the poll set.
class ROSCPP_DECL PollManager
{
public:
  static const PollManagerPtr& instance();

  PollManager();
  ~PollManager();

  PollSet& getPollSet() { return poll_set_; }

  // Add/remove listeners on the boost::signals2 signal fired by the poll thread.
  boost::signals2::connection addPollThreadListener(const VoidFunc& func);
  void removePollThreadListener(boost::signals2::connection c);

  // start() creates the thread that runs threadFunc().
  void start();
  void shutdown();
private:

  // Loop executed by the poll thread.
  void threadFunc();

  PollSet poll_set_;  // the poll set, held by composition
  // NOTE(review): volatile gives no inter-thread synchronization guarantees in C++ - confirm how shutdown() coordinates with threadFunc().
  volatile bool shutting_down_;

  VoidSignal poll_signal_;  // signal invoked on every loop iteration
  boost::recursive_mutex signal_mutex_;

  boost::thread thread_;
};

以上是PollManager类声明,源码实现如下:

// Returns the shared PollManager, which is held in a function-local static
// and created by the first call.
const PollManagerPtr& PollManager::instance()
{
  static PollManagerPtr singleton(boost::make_shared<PollManager>());
  return singleton;
}

单例模式,返回PollManager句柄

// Clears the shutdown flag and launches the thread that runs threadFunc().
void PollManager::start()
{
  // Reset the flag first so the new thread's loop condition holds on entry.
  shutting_down_ = false;
  thread_ = boost::thread(&PollManager::threadFunc, this);
}

// Body of the poll thread. Until shutdown is requested, each iteration fires
// the poll-thread listeners and then runs one update pass over the poll set.
void PollManager::threadFunc()
{
  // Timeout handed to PollSet::update() on every pass.
  const int poll_timeout = 100;

  for (;;)
  {
    if (shutting_down_)
    {
      break;
    }

    // Fire the listeners managed through addPollThreadListener /
    // removePollThreadListener while holding the signal mutex.
    {
      boost::recursive_mutex::scoped_lock guard(signal_mutex_);
      poll_signal_();
    }

    // A listener may have requested shutdown; do not poll again in that case.
    if (shutting_down_)
    {
      return;
    }

    // Run one update pass over the poll set.
    poll_set_.update(poll_timeout);
  }
}

start会创建线程,线程中循环调用poll_signal_()以及poll_set_.update(100);,这样既完成了boost signals2监听器的调用,也完成了poll_set中所监控socket事件的检测与处理。

// Connects a listener to the signal fired by the poll thread.
//
// @param func listener to connect
// @return the connection object, usable later with removePollThreadListener()
boost::signals2::connection PollManager::addPollThreadListener(const VoidFunc& func)
{
  boost::recursive_mutex::scoped_lock guard(signal_mutex_);
  boost::signals2::connection listener = poll_signal_.connect(func);
  return listener;
}

// Disconnects a listener previously returned by addPollThreadListener().
//
// @param c connection to disconnect
void PollManager::removePollThreadListener(boost::signals2::connection c)
{
  // Hold the signal mutex for the duration of the disconnect.
  boost::recursive_mutex::scoped_lock guard(signal_mutex_);

  c.disconnect();
}

通过boost的signals2增加、删除监听器;threadFunc()中调用poll_signal_();时会触发监听器的func被调用



四、其他模块关系

在其他文件中如何使用poll_manager的?

init.cpp文件中
// Abridged excerpt ("..." marks omitted code): registers checkForShutdown as
// a poll-thread listener and starts the PollManager.
void start()
{
  ...
  PollManager::instance()->addPollThreadListener(checkForShutdown);
  ...
  PollManager::instance()->start();

}

// Abridged excerpt ("..." marks omitted code): obtains the PollManager and
// registers processPublishQueues as a poll-thread listener.
void TopicManager::start()
{
  ...
  poll_manager_ = PollManager::instance();
  ...
  poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
}

// Abridged excerpt ("..." marks omitted code): obtains the PollManager and
// registers removeDroppedConnections as a poll-thread listener, keeping the
// returned connection in poll_conn_.
void ConnectionManager::start()
{
  poll_manager_ = PollManager::instance();
  poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections, 
								this));
  ...
}

// Abridged excerpt (the return statement and the rest of the body are not
// shown): registers sock_ with the poll set, using socketUpdate as its callback.
bool TransportTCP::initializeSocket()
{
    poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, _1), shared_from_this());
}

// Abridged excerpt ("..." marks omitted code): starts watching sock_ for POLLIN.
void TransportTCP::enableRead()
{
    ...
    poll_set_->addEvents(sock_, POLLIN);
    ...
}

// Abridged excerpt ("..." marks omitted code): starts watching sock_ for POLLOUT.
void TransportTCP::enableWrite()
{
    ...
    poll_set_->addEvents(sock_, POLLOUT);
    ...
}

上述代码可以看到:

1.init.cpp中调用了PollManager::instance()->start();,实例化并启动了poll_manager;

2.通过addPollThreadListener增加了checkForShutdown、processPublishQueues、removeDroppedConnections这三个回调,它们会在poll线程中被循环调用,分别用于检测关机、循环处理发布队列、去除无效连接。

3.TransportTCP通过调用addSocket addEvents实现socket监控,完成数据的收发。



五、总结

总的来看,ros中poll_manager实现了对epoll、pipe以及boost signals2的封装:

epoll实现socket监控网络socket的变化,监控数据的收发;

boost signal2监听器实现检测关机、循环处理发布队列、去除无效连接的循环检测。

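pipe用于唤醒poll线程:监控的socket或事件发生变化时(addSocket、delSocket、addEvents、delEvents)会调用signal()向pipe写入一个字节,使阻塞在epoll_wait中的update()及时返回;onLocalPipeEvents只负责把pipe读空,不做其他处理。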



版权声明:本文为weixin_45609638原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。