Reactor的概念

  • Post author:
  • Post category:其他




一、Reactor的概念

​ Reactor模式是一种事件驱动模式,由一个或多个并发输入源(input),一个消息分发处理器(Initiation Dispatcher),以及每个消息对应的处理器(Request Handler)构成。

​ Reactor在结构上类似于 消费者模式 ,但Reactor与消费者模式不同的是,它并没有像消费者模式一样使用Queue来做缓存,而是当一个Event 输入到 Initiation Dispatcher 之后,该Initiation Dispatcher 会根据 Event 的类型 ,分发给对应的 Request Handler 。

在这里插入图片描述



二、Reactor的优缺点

优点

​ 在Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events中是这么说的:

​ 1.Separation of concerns: The Reactor pattern decouples application-independent demultiplexing and dispatching mechanisms from application-specific hook method functionality. The application-independent mechanisms become reusable components that know how to demultiplex events and dispatch the appropriate hook methods defined by Event Handlers. In contrast, the application-specific functionality in a hook method knows how to perform a particular type of service.

​ 2.Improve modularity, reusability, and configurability of event-driven applications: The pattern decouples application functionality into separate classes. For instance, there are two separate classes in the logging server: one for establishing connections and another for receiving and processing logging records. This decoupling enables the reuse of the connection establishment class for different types of connection-oriented services (such as file transfer, remote login, and video-on-demand). Therefore, modifying or extending the functionality of the logging server only affects the implementation of the logging handler class.

​ 3.Improves application portability: The Initiation Dispatcher’s interface can be reused independently of the OS system calls that perform event demultiplexing. These system calls detect and report the occurrence of one or more events that may occur simultaneously on multiple sources of events. Common sources of events may in- clude I/O handles, timers, and synchronization objects. On UNIX platforms, the event demultiplexing system calls are called select and poll [1]. In the Win32 API [16], theWaitForMultipleObjects system call performs event demultiplexing.

​ 4.Provides coarse-grained concurrency control: The Reactor pattern serializes the invocation of event handlers at the level of event demultiplexing and dispatching within a process or thread. Serialization at the Initiation Dispatcher level often eliminates the need for more complicated synchronization or locking within an application process.

注:上面这些我并没有详细去了解是否是真的这么高效,但我觉得其中最有用的是 解决线程的切换、同步、数据的移动会引起性能问题。也就是说从性能的角度上,它最大的提升就是减少了性能的使用,即不需要每个Client对应一个线程。

缺点

​ 1.相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。

​ 2.Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。



三、Poco Reactor模式

​ 根据上面对 Reactor 的了解,Poco也根据Reactor模式封装了 SocketReactor 。



SocketReactor

​ 在Poco的 Reactor模式中,SocketReactor 充当着 Initiation Dispatcher 的角色,根据Event事件,分发至对应的Handler中执行。

​ 根据调用 addEventHandler ,向Reactor 中,指定socket, 添加 监听的 EventNotification,同时添加 处理该Event的方法 。

​ 调用removeEventHandler 以移除之前添加的监听。

​ 通过 setTimeout 去设置 I/O select 的超时时间,当超过指定时间内都没有收到消息时,则判定为超时,停止 select 方法。



SocketNotification

​ 在 addEventHandler 中,需要设置 EventNotification ,其中EventNotification 指的是 SocketNotification ,Poco的Notification包含以下:



ReadableNotification:

监听 socket 是否变为可读的通知



WritableNotification:

监听 socket 是否在写入的通知



ErrorNotification:

监听 socket 是否发生错误的通知



TimeoutNotification:

在所有的event处理方法中都会注册该监听通知,实现该通知的方法必须重写 onTimeout 方法去实现超时的处理。



IdleNotification:

当 SocketReactor 中没有 socket 进行 Socket :: select() ,则触发事件



ShutdownNotification:

监听socket关闭的通知



关系图

在这里插入图片描述



reactor的简单实现代码

//Header File
#include <Poco/Runnable.h>
#include <Poco/Net/SocketReactor.h>
#include "Poco/Net/SocketNotification.h"
#include <Poco/Net/WebSocket.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/ServerSocket.h>
class ReactorTest : public Poco::Runnable
{
public:
    ReactorTest(string host, uint32_t port);
    ~ReactorTest();

    bool start();
    bool stop();

protected:
	// override from Runnable
    void run();
    // override from SocketReactor
    void onReadable(Poco::Net::ReadableNotification* pNf);
    void onTimeout(Poco::Net::TimeoutNotification* pTf);

private:
    WebSocket*  _ws;

    char _buffer[1024];
    int _flags;
    int _reBit;
    std::string _payload;
    string _host;
    uint32_t _port;

    Poco::Thread _thread;
    std::unique_ptr<Poco::Net::SocketReactor> _reactor;
    std::unique_ptr<Poco::Timestamp> _timer;
    Poco::Timestamp _time;

    const int _reactorTimeout = 50000; //50ms check ticket
 };
//Cpp File
#include "ReactorTest.h"
#include "Poco/Observer.h"
ReactorTest::ReactorTest(string host, uint32_t port) :_host(host), _port(port)
{

}

ReactorTest::~ReactorTest()
{
    stop();
}

bool ReactorTest::start()
{
    try
    {
        const std::string urlPrefix("ws://");   //Need to change to TLS soon after
        Poco::URI  restful_host(urlPrefix + _host);
        HTTPClientSession cs(restful_host.getHost(), _port);
        HTTPRequest request(HTTPRequest::HTTP_GET, "/", "HTTP/1.1");
        HTTPResponse response;
        _ws = new WebSocket(cs, request, response); // Causes the timeout
        if (_callback != nullptr)
        {
            _callback->Connected();
        }
    }
    catch (Exception ex)
    {
      
    }

    if (_ws != nullptr)
    {
        _reactor.reset(new SocketReactor());
        _reactor->setTimeout(Poco::Timespan(_reactorTimeout));
        _reactor->addEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::ReadableNotification>(*this, &MaxWebSocket::onReadable));
        _reactor->addEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::TimeoutNotification>(*this, &MaxWebSocket::onTimeout));
        _thread.start(*this);
        _time.update();
        return true;
    }
    else
    {
        return false;
    }
   
}

bool ReactorTest::stop()
{
    if (_reactor != nullptr)
    {
        _reactor->removeEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::ReadableNotification>(*this, &MaxWebSocket::onReadable));
        _reactor->removeEventHandler(*_ws, Poco::Observer<MaxWebSocket, Poco::Net::TimeoutNotification>(*this, &MaxWebSocket::onTimeout));
        _reactor->stop();
        _thread.join(800);
        _ws->shutdown();
        _reactor = nullptr;
        return true;
    }
    else
    {
        return false;
    }
}

void ReactorTest::run()
{
    _reactor->run();
}
void  ReactorTest::onReadable(Poco::Net::ReadableNotification* pNf)
{
    poco_assert_dbg(pNf->socket() == *_ws);
    pNf->release();
    _reBit = _ws->receiveFrame(_buffer, sizeof(_buffer), _flags);
    if (_reBit < 0)
    {
        return;
    }
    _buffer[_reBit] = '\0';
    std::string resp(_buffer);

    //TODO: Parse json
}
void  ReactorTest::onTimeout(Poco::Net::TimeoutNotification* pTf)
{
    pTf->release();
   //TODO: Timeout handle 。It can also be executed Heartbeat
}



版权声明:本文为nb_zsy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。