ros源码分析(二)topic调用了publish之后发生了什么?

  • Post author:
  • Post category:其他


隔了很久才写第二篇实在是抱歉。继续从ros wiki的角度下手,最开始接触的publisher和subscriber的demo十分经典,而进程间通信这一过程本身就是它的最大卖点,发布器、接收器这样的名词以及advertise、publish这样简单易懂的API更是焕然一新,回过头来研究它的实现方式,是从了解、掌握到熟悉ros的必要过程。

publisher的实现,在demo中的实现为如下,那么NodeHandle就是第一步需要了解的,第二步将是Publisher的实现以及publish函数的内部调用。

NodeHandle n;
Publisher pub = n.advertise<std_msgs::String>("topic",1000);

一、NodeHandle的各种准备工作

1.构造函数;先转到ros_comm/roscpp下面的node_handle.cpp,它的构造函数为:

NodeHandle::NodeHandle(const std::string& ns, const M_string& remappings)
  : namespace_(this_node::getNamespace())
  , callback_queue_(0)
  , collection_(0)
{
  std::string tilde_resolved_ns;

  // 在这里,是之前一直疑惑发布的话题名称为什么有些带了节点名有些没有的起因,如果给句柄命名并以'~'开头,它就将节点名加上'/'加在话题名之前
  if (!ns.empty() && ns[0] == '~')
    tilde_resolved_ns = names::resolve(ns);
  else
    tilde_resolved_ns = ns;

  construct(tilde_resolved_ns, true);
  
  //remappings所对应的是一个维护节点名称字符串的哈希表
  initRemappings(remappings);
}

其中的construct函数,首先是调用了ros::start()函数,它位于init.cpp中,在此需要关注的一部分内容如下,这些管理者均为单例模式:

TopicManager::instance()->start();
ServiceManager::instance()->start();
ConnectionManager::instance()->start();
PollManager::instance()->start();
XMLRPCManager::instance()->start();
const TopicManagerPtr& TopicManager::instance()
{
  //凡是类名加上Ptr的均是对象的智能指针,而它们都是静态对象,在每一个句柄对象的实现都是增加引用次数而已
  static TopicManagerPtr topic_manager = boost::make_shared<TopicManager>();
  return topic_manager;
}

①TopicManager的start函数,它从其他三个类中得到了这个类的实例,建立进程间通信需要这三个类的协助。

void TopicManager::start()
{
  boost::mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
  shutting_down_ = false;

  poll_manager_ = PollManager::instance();//负责处理发布消息的缓冲队列的消息塞给网络连接
  connection_manager_ = ConnectionManager::instance();//负责建立进程间的网络通信连接
  xmlrpc_manager_ = XMLRPCManager::instance();//负责通知rosmaster

  xmlrpc_manager_->bind("publisherUpdate", boost::bind(&TopicManager::pubUpdateCallback, this, _1, _2));
  xmlrpc_manager_->bind("requestTopic", boost::bind(&TopicManager::requestTopicCallback, this, _1, _2));
  xmlrpc_manager_->bind("getBusStats", boost::bind(&TopicManager::getBusStatsCallback, this, _1, _2));
  xmlrpc_manager_->bind("getBusInfo", boost::bind(&TopicManager::getBusInfoCallback, this, _1, _2));
  xmlrpc_manager_->bind("getSubscriptions", boost::bind(&TopicManager::getSubscriptionsCallback, this, _1, _2));
  xmlrpc_manager_->bind("getPublications", boost::bind(&TopicManager::getPublicationsCallback, this, _1, _2));

//建立信号与槽的联系,具体来说是使processPublishQueue被定时调用 poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));
}

②ConnectionManager位于connection_manager.cpp,它实实在在地执行了进程间TCP的通信,是真正的执行者,这一部分在下面将仔细研读。

void ConnectionManager::start()
{
  poll_manager_ = PollManager::instance();
  poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections, 
								this));

  // Bring up the TCP listener socket
  tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
  if (!tcpserver_transport_->listen(network::getTCPROSPort(), 
				    MAX_TCPROS_CONN_QUEUE, 
				    boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
  {
    ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
    ROS_BREAK();
  }

  // Bring up the UDP listener socket
  udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
  if (!udpserver_transport_->createIncoming(0, true))
  {
    ROS_FATAL("Listen failed");
    ROS_BREAK();
  }
}

③PollManager是用来管理发布器的消息发布的,所利用的是信号与槽的机制,在publish部分将有大用。

void PollManager::start()
{
  shutting_down_ = false;
  thread_ = boost::thread(&PollManager::threadFunc, this);
}

写到这里,nodehandle与rosmaster的关联在哪呢?从上一部分的探索来看,rosmaster才是掌管整体的话题库的管理者,nodehandle注册的话题名必须要得到master的认可,而这里并没有与rosmaster相关联的实体。因此引入第四点,也就是XMLRPCmanager。它通过xml为形式进行客户端与服务端的连接。

打开XMLRPCmanager的start函数,便可以看到一个循环在不断轮转,如下所示,它不断地将新注册的话题与发布器信息添加到处理队列中,而xmlrpc存在一个处理器,也在不断发布处理队列的消息给rosmaster。

void XMLRPCManager::serverThreadFunc()
{
  disableAllSignalsInThisThread();

  while(!shutting_down_)
  {
    {
      boost::mutex::scoped_lock lock(added_connections_mutex_);
      S_ASyncXMLRPCConnection::iterator it = added_connections_.begin();
      S_ASyncXMLRPCConnection::iterator end = added_connections_.end();
      for (; it != end; ++it)
      {
        (*it)->addToDispatch(server_.get_dispatch());
        connections_.insert(*it);
      }

      added_connections_.clear();
    }
   。。。。

nodehandle这一端是客户端,至于server端,我们在第一部分的roscore的分析可知,在rosmaster启动时,通过rosgraph的ThreadingXMLRPCServer启动了服务端,这样便实现了Python编写的rosmaster与C++编写的nodehandle的联系。

2.第二个要分析的是advertise函数;它的函数原型为:

template <class M>
    Publisher advertise(const std::string& topic, uint32_t queue_size, bool latch = false)
    {
      AdvertiseOptions ops;
      ops.template init<M>(topic, queue_size);
      ops.latch = latch;
      return advertise(ops);
    }

先定位到advertise_options.h,它具备的要素如下,很多费解的名称均为typedef,位于forward.h中。

  template <class M>
  void init(const std::string& _topic, uint32_t _queue_size,
            const SubscriberStatusCallback& _connect_cb = SubscriberStatusCallback(),
            const SubscriberStatusCallback& _disconnect_cb = SubscriberStatusCallback())
  {
    topic = _topic;//话题名称
    queue_size = _queue_size;//消息缓冲队列的长度
    connect_cb = _connect_cb;//
    disconnect_cb = _disconnect_cb;
    md5sum = message_traits::md5sum<M>();//校验码
    datatype = message_traits::datatype<M>();
    message_definition = message_traits::definition<M>();
    has_header = message_traits::hasHeader<M>();
  }

advertise函数首先解决了进程内部的通信问题。可能存在比较费解的声明~

typedef boost::function<void(const SingleSubscriberPublisher&)> SubscriberStatusCallback;
Publisher NodeHandle::advertise(AdvertiseOptions& ops)
{
  //考察nodehandle是私有的还是公有的,决定最终的话题名
  ops.topic = resolveName(ops.topic);
  if (ops.callback_queue == 0)
  {
    if (callback_queue_)
    {
      ops.callback_queue = callback_queue_;
    }
    else
    {
      ops.callback_queue = getGlobalCallbackQueue();
    }
  }
  //进程内部进行话题传输
  SubscriberCallbacksPtr callbacks(boost::make_shared<SubscriberCallbacks>(ops.connect_cb, ops.disconnect_cb, 
                                                                           ops.tracked_object, ops.callback_queue));

  if (TopicManager::instance()->advertise(ops, callbacks))
  {
    Publisher pub(ops.topic, ops.md5sum, ops.datatype, *this, callbacks);

    {
      boost::mutex::scoped_lock lock(collection_->mutex_);
      collection_->pubs_.push_back(pub.impl_);
    }

    return pub;
  }

  return Publisher();
}

接下来每个层级所调用的函数都值得仔细端详 。首先是上文中的advertise所带入的topicmanager,它先申请了一个发布器pub,再建立一个临时的xmlrpc客户端。

bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
{
  if (ops.datatype == "*")
  {
    std::stringstream ss;
    ss << "Advertising with * as the datatype is not allowed.  Topic [" << ops.topic << "]";
    throw InvalidParameterException(ss.str());
  }

  if (ops.md5sum == "*")
  {
    std::stringstream ss;
    ss << "Advertising with * as the md5sum is not allowed.  Topic [" << ops.topic << "]";
    throw InvalidParameterException(ss.str());
  }

  if (ops.md5sum.empty())
  {
    throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty md5sum");
  }

  if (ops.datatype.empty())
  {
    throw InvalidParameterException("Advertising on topic [" + ops.topic + "] with an empty datatype");
  }

  if (ops.message_definition.empty())
  {
    ROS_WARN("Advertising on topic [%s] with an empty message definition.  Some tools (e.g. rosbag) may not work correctly.", ops.topic.c_str());
  }

  PublicationPtr pub;

  {
    boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);

    if (isShuttingDown())
    {
      return false;
    }
    //从已发布的话题中查找有没有未初始化的发布器(废物利用?)
    pub = lookupPublicationWithoutLock(ops.topic);
    if (pub && pub->getNumCallbacks() == 0)
    {
      pub.reset();
    }
    
    //不能是非空指针!
    if (pub)
    {
      //校验码要一致
      if (pub->getMD5Sum() != ops.md5sum)
      {
        ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
                  ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
        return false;
      }
      //将callback对象加入到本进程内的监听队列中
      pub->addCallbacks(callbacks);

      return true;
    }
    //当然,正常来说是进程间的通信,所以程序不会在上面返回的,这里新申请一个发布器
    pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, ops.latch, ops.has_header));
    pub->addCallbacks(callbacks);
    advertised_topics_.push_back(pub);
  }


  {
    boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
    advertised_topic_names_.push_back(ops.topic);
  }

  // Check whether we've already subscribed to this topic.  If so, we'll do
  // the self-subscription here, to avoid the deadlock that would happen if
  // the ROS thread later got the publisherUpdate with its own XMLRPC URI.
  // The assumption is that advertise() is called from somewhere other
  // than the ROS thread.
  bool found = false;
  SubscriptionPtr sub;
  {
    boost::mutex::scoped_lock lock(subs_mutex_);

    for (L_Subscription::iterator s = subscriptions_.begin();
         s != subscriptions_.end() && !found; ++s)
    {
      if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
      {
        found = true;
        sub = *s;
        break;
      }
    }
  }
  //本进程内部的简单链接
  if(found)
  {
    sub->addLocalConnection(pub);
  }
  //制作一个xmlrpc的客户端,把话题名、节点名作为参数传入,告诉master
  XmlRpcValue args, result, payload;
  args[0] = this_node::getName();
  args[1] = ops.topic;
  args[2] = ops.datatype;
  args[3] = xmlrpc_manager_->getServerURI();
  master::execute("registerPublisher", args, result, payload, true);

  return true;
}

一般来说,进程内的通信不需要master参与,而进程间的情况,上述函数会运行到末尾,从而进入到master::execute函数。

bool execute(const std::string& method, const XmlRpc::XmlRpcValue& request, XmlRpc::XmlRpcValue& response, XmlRpc::XmlRpcValue& payload, bool wait_for_master)
{
  ros::SteadyTime start_time = ros::SteadyTime::now();

  //获得我方的端口号
  std::string master_host = getHost();
  uint32_t master_port = getPort();

  //创建或复用一个客户端,复用的原则是缓存队列中在一段时间内有第二次的调用,否则会被删掉
  XmlRpc::XmlRpcClient *c = XMLRPCManager::instance()->getXMLRPCClient(master_host, master_port, "/");
  bool printed = false;
  bool slept = false;
  bool ok = true;
  bool b = false;
  do
  {
    {
#if defined(__APPLE__)
      boost::mutex::scoped_lock lock(g_xmlrpc_call_mutex);
#endif
      //重头戏,将节点信息化为request发给server
      b = c->execute(method.c_str(), request, response);
    }

    ok = !ros::isShuttingDown() && !XMLRPCManager::instance()->isShuttingDown();

    if (!b && ok)
    {
      if (!printed && wait_for_master)
      {
        ROS_ERROR("[%s] Failed to contact master at [%s:%d].  %s", method.c_str(), master_host.c_str(), master_port, wait_for_master ? "Retrying..." : "");
        printed = true;
      }

      if (!wait_for_master)
      {
        XMLRPCManager::instance()->releaseXMLRPCClient(c);
        return false;
      }

      if (!g_retry_timeout.isZero() && (ros::SteadyTime::now() - start_time) >= g_retry_timeout)
      {
        ROS_ERROR("[%s] Timed out trying to connect to the master after [%f] seconds", method.c_str(), g_retry_timeout.toSec());
        XMLRPCManager::instance()->releaseXMLRPCClient(c);
        return false;
      }

      ros::WallDuration(0.05).sleep();
      slept = true;
    }
    else
    {
      if (!XMLRPCManager::instance()->validateXmlrpcResponse(method, response, payload))
      {
        XMLRPCManager::instance()->releaseXMLRPCClient(c);

        return false;
      }

      break;
    }

    ok = !ros::isShuttingDown() && !XMLRPCManager::instance()->isShuttingDown();
  } while(ok);

  if (ok && slept)
  {
    ROS_INFO("Connected to master at [%s:%d]", master_host.c_str(), master_port);
  }

  XMLRPCManager::instance()->releaseXMLRPCClient(c);

  return b;
}

重头戏在于 XmlRpcClient::execute函数,它在不断地将新建的request加入到队列中,然后在XmlRpcDispatch中,有一个work函数,它不断地将累积的询问送给server,将异步的思想贯彻到底。

bool
XmlRpcClient::execute(const char* method, XmlRpcValue const& params, XmlRpcValue& result)
{
  XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s (_connectionState %d).", method, _connectionState);

  // This is not a thread-safe operation, if you want to do multithreading, use separate
  // clients for each thread. If you want to protect yourself from multiple threads
  // accessing the same client, replace this code with a real mutex.
  if (_executing)
    return false;

  _executing = true;
  ClearFlagOnExit cf(_executing);

  _sendAttempts = 0;
  _isFault = false;
  //建立client与server的连接
  if ( ! setupConnection())
    return false;
  //将params也就是新增的话题信息加入到request_中统一发送
  if ( ! generateRequest(method, params))
    return false;

  result.clear();
  double msTime = -1.0;   // Process until exit is called
  _disp.work(msTime);

  if (_connectionState != IDLE || ! parseResponse(result))
    return false;

  XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s completed.", method);
  _response = "";
  return true;
}
    // The xml-encoded request, http header of response, and response xml
    std::string _request;
    std::string _header;
    std::string _response;

因此,advertise函数的最终目的是将新注册的话题与节点信息告诉rosmaster并在master的哈希表中得以存储,节点信息中包含了节点名以及它的端口号,这样便可以方便地建立进程间的连接。

二、Publisher的publish函数,它充分使用了第一部分中初始化的成员。在第一部分结束后,rosmaster已经获悉了新注册了的话题和节点,接下来的话题发布将会继续彻底贯彻异步通信这一原则。

//SerializedMessage的声明位于roscpp_core/roscpp_serialization中,它用来将结构化的rosmsg与socket通信消息相互转化

void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
  //当我们先于advertise而调用本函数,则会报这个错误
  if (!impl_)
  {
    ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
    return;
  }

  if (!impl_->isValid())
  {
    ROS_ASSERT_MSG(false, "Call to publish() on an invalid Publisher (topic [%s])", impl_->topic_.c_str());
    return;
  }

  TopicManager::instance()->publish(impl_->topic_, serfunc, m);
}

因此转到TopicManager::publish函数:

void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);

  if (isShuttingDown())
  {
    return;
  }

  PublicationPtr p = lookupPublicationWithoutLock(topic);//根据话题捕捉一个发布器
  if (p->hasSubscribers() || p->isLatching())//如果拥有接收器或该话题是持续拉起的状态,每个publisher所拥有的接收器数量在nodehandle的subscribe函数中会一一注册于容器中,我们将在第三篇中对它详细展开
  {
    ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());

    // Determine what kinds of subscribers we're publishing to.  If they're intraprocess with the same C++ type we can
    // do a no-copy publish.
    //需要判断接收器是否是来自同一本地,进程内通信可以避免远程服务调用的所有开销
    bool nocopy = false;
    bool serialize = false;

    // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
    if (m.type_info && m.message)
    {
      p->getPublishTypes(serialize, nocopy, *m.type_info);
    }
    else
    {
      serialize = true;
    }

    if (!nocopy)
    {
      m.message.reset();
      m.type_info = 0;
    }

    if (serialize || p->isLatching())
    {
      SerializedMessage m2 = serfunc();
      m.buf = m2.buf;
      m.num_bytes = m2.num_bytes;
      m.message_start = m2.message_start;
    }

    p->publish(m);//将msg扔进消息队列中先不管了

    // If we're not doing a serialized publish we don't need to signal the pollset.  The write()
    // call inside signal() is actually relatively expensive when doing a nocopy publish.
    if (serialize)//也就是说,如果是一个进程内的通信,就直接在进程所共享的队列里存取就行,但是进程间通信就需要使用到pollmanager的信号槽机制了
    {
      poll_manager_->getPollSet().signal();
    }
  }
  else
  {
    p->incrementSequence();
  }
}

那么,需要展开阅读一番最后pollset的signal的部分,它使用的是进程间的管道通信,而不是socket的传送。

inline ssize_t write_signal(const signal_fd_t &signal, const void *buffer, const size_t &nbyte) {
		return write(signal, buffer, nbyte);
	}

这个信号在PollSet构造时调用的addSocket函数中就已经注册在系统中了,具体的过程等看完《Unix系统》再详细解读。

之后,再转到Publication类,可以看到topic_manager中继续调用的publish函数:

void Publication::publish(SerializedMessage& m)
{
  if (m.message)
  {
    boost::mutex::scoped_lock lock(subscriber_links_mutex_);
    V_SubscriberLink::const_iterator it = subscriber_links_.begin();
    V_SubscriberLink::const_iterator end = subscriber_links_.end();
    for (; it != end; ++it)
    {
      const SubscriberLinkPtr& sub = *it;
      if (sub->isIntraprocess())//关于继承关系的恶趣味,当然,当接收器是同一本地的进程内就直接扔进对方的消息队列中
      {
        sub->enqueueMessage(m, false, true);
      }
    }

    m.message.reset();
  }

  if (m.buf)
  {
    boost::mutex::scoped_lock lock(publish_queue_mutex_);
    publish_queue_.push_back(m);//这样,扔进去就不用管了
  }
}

接下来我们将从以下两个方面来详细阅读这个publish函数的深入调用。

1.IntraProcessSubscriberLink::enqueueMessage函数

IntraProcessPublisherLinkPtr subscriber_;//误导性质强的声明

也就是说,它实际上只是调用了IntraProcessPublisherLink::handleMessage,进一步地,转到Subscription类:

uint32_t Subscription::handleMessage(const SerializedMessage& m, bool ser, bool nocopy, const boost::shared_ptr<M_string>& connection_header, const PublisherLinkPtr& link)
{
  boost::mutex::scoped_lock lock(callbacks_mutex_);

  uint32_t drops = 0;

  // Cache the deserializers by type info.  If all the subscriptions are the same type this has the same performance as before.  If
  // there are subscriptions with different C++ type (but same ROS message type), this now works correctly rather than passing
  // garbage to the messages with different C++ types than the first one.
  cached_deserializers_.clear();

  ros::Time receipt_time = ros::Time::now();

  for (V_CallbackInfo::iterator cb = callbacks_.begin();
       cb != callbacks_.end(); ++cb)
  {
    const CallbackInfoPtr& info = *cb;

    ROS_ASSERT(info->callback_queue_);

    const std::type_info* ti = &info->helper_->getTypeInfo();

    if ((nocopy && m.type_info && *ti == *m.type_info) || (ser && (!m.type_info || *ti != *m.type_info)))
    {
      MessageDeserializerPtr deserializer;//开始将结构化的rosmsg解析为数据传输的特殊格式

      V_TypeAndDeserializer::iterator des_it = cached_deserializers_.begin();
      V_TypeAndDeserializer::iterator des_end = cached_deserializers_.end();
      for (; des_it != des_end; ++des_it)
      {
        if (*des_it->first == *ti)
        {
          deserializer = des_it->second;
          break;
        }
      }

      if (!deserializer)
      {
        deserializer = boost::make_shared<MessageDeserializer>(info->helper_, m, connection_header);
        cached_deserializers_.push_back(std::make_pair(ti, deserializer));
      }

      bool was_full = false;
      bool nonconst_need_copy = false;
      if (callbacks_.size() > 1)
      {
        nonconst_need_copy = true;
      }

      info->subscription_queue_->push(info->helper_, deserializer, info->has_tracked_object_, info->tracked_object_, nonconst_need_copy, receipt_time, &was_full);

      if (was_full)
      {
        ++drops;
      }
      else
      {
        info->callback_queue_->addCallback(info->subscription_queue_, (uint64_t)info.get());
      }
    }
  }

  // measure statistics
  statistics_.callback(connection_header, name_, link->getCallerID(), m, link->getStats().bytes_received_, receipt_time, drops > 0);

  // If this link is latched, store off the message so we can immediately pass it to new subscribers later
  if (link->isLatched())
  {
    LatchInfo li;
    li.connection_header = connection_header;
    li.link = link;
    li.message = m;
    li.receipt_time = receipt_time;
    latched_messages_[link] = li;
  }

  cached_deserializers_.clear();

  return drops;
}

2.当消息被放置在publish_queue_,扔进去之后,接管的是poll manager,在构造函数中,就已经说明,在循环中会不断调用Publication::enqueueMessage函数。

void TopicManager::processPublishQueues()
{
  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);

  V_Publication::iterator it = advertised_topics_.begin();
  V_Publication::iterator end = advertised_topics_.end();
  for (; it != end; ++it)
  {
    const PublicationPtr& pub = *it;
    pub->processPublishQueue();
  }
}

在topic manager开始工作时,poll manager将该函数作为信号的槽,与它连接的是poll_signal_这个信号。

poll_manager_->addPollThreadListener(boost::bind(&TopicManager::processPublishQueues, this));

接下来,需要了解一下signal2的知识。signals2基于Boost的另一个库signals,实现了线程安全的观察者模式。在signals2中,观察者模式被称为信号/插槽,它是一种函数回调机制,一个信号关联了多个插槽,当信号发出时,所有关联它的插槽都会被调用。

boost::signals2::connection PollManager::addPollThreadListener(const VoidFunc& func)
{
  boost::recursive_mutex::scoped_lock lock(signal_mutex_);
  return poll_signal_.connect(func);
}

也就是说,在之前提到的PollManager::threadFunc函数轮转过程当中,会不断发生信号,使TopicManager::processPublishQueues函数响应。同时,会调用PollSet::update函数。因此再转到PollSet类。

PollSet类其实是对socket连接事件的直接管理,我们处理它必须格外小心。

①createNativePollset

PollSet::update函数接着调用了poll_sockets函数,这一块的定义在io.cpp中,在我看来是利用poll()函数对所需要的文件描述符加以监听,这个函数如果返回负数或0则说明出错了,如果是poll()正常返回,则我们记录下相应的文件描述符。

typedef struct pollfd socket_pollfd;

pollfd包含在Linux系统中socket事件的信息,具体上有文件描述符与事件掩码。

②func

这个在两个地方出现过,另一个是ConnectionManager::removeDroppedConnections,用来清理连接的。

在解决这个问题之后,进一步地,connection_manager管理着真正的连接问题,它用一个set管理着所有的连接,维护它即是新增或删除条目的过程。当然,为了把异步贯彻到底,不在通信建立之前,是根本不知道谁在收听这个话题的。

typedef std::set<ConnectionPtr> S_Connection;

那么, connect manager在什么地方被激发的呢?这在一开始阅读的时候也很令人费解,与advertise的master联系一样,是理解的难点之一。其实,它与之前的publish函数的联系就是下面通过poll_manager来联通的,当它开始工作时,

void ConnectionManager::start()
{
  poll_manager_ = PollManager::instance();
  poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections,
								this));

  // Bring up the TCP listener socket
  tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
  if (!tcpserver_transport_->listen(network::getTCPROSPort(),
				    MAX_TCPROS_CONN_QUEUE,
				    boost::bind(&ConnectionManager::tcprosAcceptConnection, this, _1)))
  {
    ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
    ROS_BREAK();
  }

  // Bring up the UDP listener socket
  udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
  if (!udpserver_transport_->createIncoming(0, true))
  {
    ROS_FATAL("Listen failed");
    ROS_BREAK();
  }
}

bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
{
  is_server_ = true;
  accept_cb_ = accept_cb;

  if (s_use_ipv6_)
  {
    sock_ = socket(AF_INET6, SOCK_STREAM, 0);
    sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
    address->sin6_family = AF_INET6;
    address->sin6_addr = isOnlyLocalhostAllowed() ? 
                         in6addr_loopback : 
                         in6addr_any;
    address->sin6_port = htons(port);
    sa_len_ = sizeof(sockaddr_in6);
  }
  else
  {
    sock_ = socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in *address = (sockaddr_in *)&server_address_;
    address->sin_family = AF_INET;
    address->sin_addr.s_addr = isOnlyLocalhostAllowed() ? 
                               htonl(INADDR_LOOPBACK) : 
                               INADDR_ANY;
    address->sin_port = htons(port);
    sa_len_ = sizeof(sockaddr_in);
  }

  if (sock_ <= 0)
  {
    ROS_ERROR("socket() failed with error [%s]", last_socket_error_string());
    return false;
  }


  if (bind(sock_, (sockaddr *)&server_address_, sa_len_) < 0)
  {
    ROS_ERROR("bind() failed with error [%s]", last_socket_error_string());
    return false;
  }

  ::listen(sock_, backlog);
  getsockname(sock_, (sockaddr *)&server_address_, &sa_len_);

  switch (server_address_.ss_family)
  {
    case AF_INET:
      server_port_ = ntohs(((sockaddr_in *)&server_address_)->sin_port);
      break;
    case AF_INET6:
      server_port_ = ntohs(((sockaddr_in6 *)&server_address_)->sin6_port);
      break;
  }

  if (!initializeSocket())
  {
    return false;
  }

  if (!(flags_ & SYNCHRONOUS))
  {
    enableRead();
  }

  return true;
}

void ConnectionManager::addConnection(const ConnectionPtr& conn)
{
  boost::mutex::scoped_lock lock(connections_mutex_);

  connections_.insert(conn);
  conn->addDropListener(boost::bind(&ConnectionManager::onConnectionDropped, this, _1));
}

当初始化连接时,它将会把TCP通信的读写动作进行注册。

void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
{
  ROS_ASSERT(transport);

  transport_ = transport;
  header_func_ = header_func;
  is_server_ = is_server;

  transport_->setReadCallback(boost::bind(&Connection::onReadable, this, _1));
  transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, _1));
  transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, _1));

  if (header_func)
  {
    read(4, boost::bind(&Connection::onHeaderLengthRead, this, _1, _2, _3, _4));
  }
}

void Connection::readTransport()
{
  boost::recursive_mutex::scoped_try_lock lock(read_mutex_);

  if (!lock.owns_lock() || dropped_ || reading_)
  {
    return;
  }

  reading_ = true;

  while (!dropped_ && has_read_callback_)
  {
    ROS_ASSERT(read_buffer_);
    uint32_t to_read = read_size_ - read_filled_;
    if (to_read > 0)
    {
      int32_t bytes_read = transport_->read(read_buffer_.get() + read_filled_, to_read);
      ROS_DEBUG_NAMED("superdebug", "Connection read %d bytes", bytes_read);
      if (dropped_)
      {
        return;
      }
      else if (bytes_read < 0)
      {
        // Bad read, throw away results and report error
        ReadFinishedFunc callback;
        callback = read_callback_;
        read_callback_.clear();
        read_buffer_.reset();
        uint32_t size = read_size_;
        read_size_ = 0;
        read_filled_ = 0;
        has_read_callback_ = 0;

        if (callback)
        {
          callback(shared_from_this(), read_buffer_, size, false);
        }

        break;
      }

      read_filled_ += bytes_read;
    }

    ROS_ASSERT((int32_t)read_size_ >= 0);
    ROS_ASSERT((int32_t)read_filled_ >= 0);
    ROS_ASSERT_MSG(read_filled_ <= read_size_, "read_filled_ = %d, read_size_ = %d", read_filled_, read_size_);

    if (read_filled_ == read_size_ && !dropped_)
    {
      ReadFinishedFunc callback;
      uint32_t size;
      boost::shared_array<uint8_t> buffer;

      ROS_ASSERT(has_read_callback_);

      // store off the read info in case another read() call is made from within the callback
      callback = read_callback_;
      size = read_size_;
      buffer = read_buffer_;
      read_callback_.clear();
      read_buffer_.reset();
      read_size_ = 0;
      read_filled_ = 0;
      has_read_callback_ = 0;

      ROS_DEBUG_NAMED("superdebug", "Calling read callback");
      callback(shared_from_this(), buffer, size, true);
    }
    else
    {
      break;
    }
  }

  if (!has_read_callback_)
  {
    transport_->disableRead();
  }

  reading_ = false;
}

void Connection::writeTransport()
{
  boost::recursive_mutex::scoped_try_lock lock(write_mutex_);

  if (!lock.owns_lock() || dropped_ || writing_)
  {
    return;
  }

  writing_ = true;
  bool can_write_more = true;

  while (has_write_callback_ && can_write_more && !dropped_)
  {
    uint32_t to_write = write_size_ - write_sent_;
    ROS_DEBUG_NAMED("superdebug", "Connection writing %d bytes", to_write);
    int32_t bytes_sent = transport_->write(write_buffer_.get() + write_sent_, to_write);
    ROS_DEBUG_NAMED("superdebug", "Connection wrote %d bytes", bytes_sent);

    if (bytes_sent < 0)
    {
      writing_ = false;
      return;
    }

    write_sent_ += bytes_sent;

    if (bytes_sent < (int)write_size_ - (int)write_sent_)
    {
      can_write_more = false;
    }

    if (write_sent_ == write_size_ && !dropped_)
    {
      WriteFinishedFunc callback;

      {
        boost::mutex::scoped_lock lock(write_callback_mutex_);
        ROS_ASSERT(has_write_callback_);
        // Store off a copy of the callback in case another write() call happens in it
        callback = write_callback_;
        write_callback_ = WriteFinishedFunc();
        write_buffer_ = boost::shared_array<uint8_t>();
        write_sent_ = 0;
        write_size_ = 0;
        has_write_callback_ = 0;
      }

      ROS_DEBUG_NAMED("superdebug", "Calling write callback");
      callback(shared_from_this());
    }
  }

  {
    boost::mutex::scoped_lock lock(write_callback_mutex_);
    if (!has_write_callback_)
    {
      transport_->disableWrite();
    }
  }

  writing_ = false;
}

果然已经摸清楚CSDN吞草稿的尿性了。。。sigh,在这一部分的话题发布环节,有一些没有搞清楚的,我们将继续探讨,并且在第三部分(接收器)与ros::spin函数一并进行补充。另外,作为菜狗的我,在写博客的同时也在阅读《Linux环境高级编程》和《Linux网络通信》,里面的一些感悟会让我写的内容越来越详实、正确,与大家共勉~



版权声明:本文为weixin_42048023原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。