Reactor模式以及Poco SocketReactor 源码浅析

"reactor"

Posted by leiyiming on September 23, 2017

常见的五种IO模式


服务器端编程经常需要构造高性能的IO模型,Unix下可用的IO模型有五种:

  1. 阻塞式IO
  2. 非阻塞式IO
  3. IO复用(select/poll/epoll)
  4. 信号驱动式IO(SIGIO)
  5. 异步IO(Asynchronous IO)

同步和异步

描述的是用户线程调用IO操作相关的系统调用时,是否需要等待内核IO操作完成。

区别

  • 同步IO操作:需要等待内核IO操作完成。用户线程在调用系统调用时,会向内核发起IO操作的请求,内核会立即执行IO操作,只有IO操作完成时才会返回。

  • 异步IO操作:不需要等待内核IO操作完成。用户线程调用系统调用时,向内核发起IO操作请求后,立即返回。内核接收到请求后会自行执行IO操作,当数据就绪时,直接将数据从内核缓冲区复制到用户缓冲区,并通知用户线程。

异步IO需要操作系统支持,目前有Lunix 的 AIO 和 Windows 的 IOCP 。

阻塞与非阻塞式IO

描述的是内核的IO操作是否需要等待数据就绪。

区别

 
阻 塞IO 内核会等待直到网络数据到达并复制到应用进程的缓冲区中或者发生错误才返回,期间会挂起线程 内核会直到将用户数据复制到内核缓冲区中或者发生错误才返回,期间会挂起线程
非阻塞IO 如果有数据到达,内核将会复制到用户缓冲区并返回,如果没有数据到达,则将立即返回一个EWOULDBLOCK错误 如果内核缓冲区有空间,则将其复制到内核缓冲区,如果内核缓冲区已满,则立即返回一个EWOULDBLOCK错误
     

通常在实际使用中,阻塞IO会挂起用户线程,不会造成负担,但非阻塞IO需要用户去进行轮询才能保证数据及时收到,会耗费大量CPU时间。

阻塞IO流程图:

非阻塞IO流程图:

2

IO复用

IO复用的关键在于使用多路分离器(select/poll/epoll)去监听 socket 是否就绪,并返回相应的事件和文件描述符。

IO多路复用模型的流程图如下图所示,使用select可以避免非阻塞IO模型中轮询等待的问题。用户首先将需要监视的socket添加到select中,当数据到达时,select被激活,select函数返回,用户线程发起read请求,读取数据并继续执行。

3

从流程上来看,IO 复用并没有比同步阻塞模型有更大的优势,甚至还多了添加监视socket,以及调用select函数的额外操作。但是,使用 IO 复用最大的优势是可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,不断地调用多路分离函数获得就绪的文件描述符,然后进行IO操作,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

Reactor 模式


在上述的多路IO复用模型中,其实用户线程可以通过辅助线程去等待select函数的返回结果,当select函数返回时,辅助线程通过消息通知用户线程,用户现场根据消息来确定就绪的socket,然后发起读写请求。Reactor模式 的原理就与此类似,其流程图如下:

4

用户线程通过向 Reactor 注册事件处理器,并立即返回执行后续操作,Reactor进行调用 select 请求,并轮询其结果,当select函数返回即有可读的socket时,Reactor就通知用户线程,执行注册的事件处理器进行数据的读取、处理工作。其类图如下:

5

Handle:Linux上的文件操作符,Windows上的句柄,在网络编程中通常是Socket。这里统称事件源。

Event Demutiplexer:多路复用器。由操作系统提供的IO多路复用机制,比如 select/epoll 。使用时用户需要将其关心的事件源注册到多路复用器上,当有事件发生时,多路复用器会从多个事件源中选出对应的事件源。

Reactor:事件管理的接口,提供对外的注册事件和注销事件的接口,内部则使用多路复用器注册和注销事件;运行时启动事件循环,当有事件进入就绪状态时,通知事件处理器,通过注册事件的回调函数处理事件。

Enevt Handler:事件处理的接口类,每个接口对应了一种类型的事件,供Reactor在相应事件发生时调用,执行相应的事件处理。

Poco 中的 Reactor


Poco中实现了Reactor模式,并整合了Acceptor-Connetor模式的服务器。下面是Poco中Reactor包的重要的类的概述:

SocketReactor:模式中的Reactor,提供了注册事件和注销事件的接口。

SocketNotifier:模式内部用于通知事件处理器的类。

SocketAcceptor:Acceptor-Connetor模式的Acceptor。

SocketConnector:Acceptor-Connetor模式的Connetor。

ParallelSocketAcceptor:多线程版的 SocketAcceptor,与单线程版唯一的区别在于构造和轮询中被声明和使用的 SocketReactor 对象的个数(缺省值等于处理器个数)。

SocketNotification:SocketReactor 生成的所有消息的基类。一共有六个派生类,分别是: ErrorNotification,IdleNotification,ReadableNotification,ShutdownNotification,TimeoutNotification,WritableNotification。

使用方法


Reactor

SocketReactor 提供了注册事件和注销事件的接口,第一个参数代表了事件源,第二个参数代表了注册的事件处理器,这里使用了Poco的Observer。当某个 socket 就绪时,Observer就会接收到相应的消息,来根据不同的消息类型调用不同的回调函数进行处理。

void addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer);
	/// Registers an event handler with the SocketReactor.
	///
	/// Usage:
	///     Poco::Observer<MyEventHandler, SocketNotification> obs(*this, &MyEventHandler::handleMyEvent);
	///     reactor.addEventHandler(obs);

void removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer);
	/// Unregisters an event handler with the SocketReactor.
	///
	/// Usage:
	///     Poco::Observer<MyEventHandler, SocketNotification> obs(*this, &MyEventHandler::handleMyEvent);
	///     reactor.removeEventHandler(obs);

SocketAcceptor

在Poco中使用了 SocketAcceptor 来监听客户端的连接,其采用了经典的Acceptor-Connetor模式,将客户端服务端解耦为三个组件,分别是:AcceptorsConnectorsService Handlers

SocketAcceptor 的工作原理:它有一个重要的模板参数 class ServiceHandler 。构造时需要传入 SocketReactor 和 ServerSocket 对象的引用,然后向 SocketReactor 对象注册传入的 ServerSocket 和对 ReadableNotification 关心的 Observer,这个 Observer 在内部会注册一个 onAccept 回调函数。

当有客户端发起连接请求时,ServerSocket 被 select 选中,Observer 接收到 ReadableNotification 消息并调用 onAccept ,然后创建一个 ServiceHandler 对象。

ServiceHandler 对象的作用就是和客户端进行通信,其在Reactor模式中对应的就是Event Handler,只不过 Poco 中使用了自己的消息机制,不需要用户自己通过多态来实现消息通知,所以用户需要做的只是实现 ServiceHandler 。

下面的代码就是开启服务器的代码,分别声明一个 ServerSocket 对象,一个 SocketReactor 对象,一个 ParallelSocketAcceptor 对象,然后调用 SocketReactor 的 run 方法就可以了,当有客户端连接时,会自动生成一个 ServerHandler 对象处理连接。

Poco::Net::ServerSocket serverSocket(4569);

Poco::Net::SocketReactor reactor;

Poco::Net::ParallelSocketAcceptor<ServerHandler, Poco::Net::SocketReactor> acceptor(serverSocket, reactor);

reactor.run();

ServiceHandler

ServiceHandler 的责任是提供处理消息的回调函数,并向 SocketReactor 注册 Observer。当对应事件发生时,SocketReactor 能根据被 select 的 socket 向某个 Observer 发出特定的消息,对应的回调函数就能够执行相应的处理。

首先,设计 ServiceHandler 时,它的构造函数必须只含有 StreamSocket 和 ServiceReactor 类型的引用参数。例如:

MyServiceHandler(const StreamSocket& socket, ServiceReactor& reactor)

其次,ServiceHandler 必须提供相应 SocketNotification 的 Observer 并添加回调函数,并且需要调用 SocketReactor 的 addEventHandler 方法来注册。这样就可以正常使用了Poco的Reactor模式了。

ServerHandler 示例声明:

class ServerHandler : public Poco::RefCountedObject {
public:
    ServerHandler(Poco::Net::StreamSocket& socket, Poco::Net::SocketReactor& reactor);
    ~ServerHandler();

    //回调函数
    void OnReadable(Poco::Net::ReadableNotification* pNf);
    void OnWriteable(Poco::Net::WritableNotification* pNf);
    void OnError(Poco::Net::ErrorNotification* pNf);
    void OnTimeout(Poco::Net::TimeoutNotification* pNf);
    void OnShutdown(Poco::Net::ShutdownNotification* pNf);
private:
    Poco::Net::StreamSocket         _socket;
    Poco::Net::SocketReactor&       _reactor;
  
    Poco::Observer<ServerHandler, Poco::Net::ReadableNotification> _or;
    Poco::Observer<ServerHandler, Poco::Net::WritableNotification> _ow;
    Poco::Observer<ServerHandler, Poco::Net::ErrorNotification>    _oe;
    Poco::Observer<ServerHandler, Poco::Net::TimeoutNotification>  _ot;
    Poco::Observer<ServerHandler, Poco::Net::ShutdownNotification> _os;
};

实现:

ServerHandler::ServerHandler(StreamSocket & socket, SocketReactor & reactor)
    :_logger(Poco::Logger::get("ReactorServer.ServerHandler"))
    , _socket(socket)
    , _reactor(reactor)
    , _or(*this, &ServerHandler::OnReadable)
    , _ow(*this, &ServerHandler::OnWriteable)
    , _oe(*this, &ServerHandler::OnError)
    , _ot(*this, &ServerHandler::OnTimeout)
    , _os(*this, &ServerHandler::OnShutdown)
{
    _address = socket.peerAddress().toString();

    AddReactorEventHandlers();

    _socket.setNoDelay(true);
    _socket.setBlocking(false);
}

ServerHandler::~ServerHandler()
{
    RemoveReactorEventHandlers();
}

void ServerHandler::AddReactorEventHandlers()
{
    _reactor.addEventHandler(_socket, _oe);
    _reactor.addEventHandler(_socket, _os);
    _reactor.addEventHandler(_socket, _or);
    _reactor.addEventHandler(_socket, _ow);
}

void ServerHandler::RemoveReactorEventHandlers()
{
    _reactor.removeEventHandler(_socket, _oe);
    _reactor.removeEventHandler(_socket, _os);
    _reactor.removeEventHandler(_socket, _or);
    _reactor.removeEventHandler(_socket, _ow);
}

至此,Poco Reactor 基本的使用方法是这些了,关键在于实现 ServerHandler 并向 SocketReactor 注册,通过 SocketAcceptor 监听客户端连接,并自动生成 ServerHandler 实例。ServerHandler 中还可以根据用户自己的需求来进行扩展,比如实现读写缓冲区,解析包,心跳等等。

具体的项目移位链接:ReactorServer

SocketReactor 核心代码解读


Reactor 模式依赖于操作系统提供的 select 操作,select 能够轮询检查多个 Socket 的状态,包括检查可读状态,可写状态以及错误信息状态。当某个 Socket 的某一个状态就绪时,select 能够将其标识符置1,使用者可以根据标识符来判断 Socket 的状态。

在 Poco 的 Net 包中,Poco::NET::Socket 封装了操作系统提供 select 方法,并声明为静态函数,以供其他类使用,其声明如下:

static int select(
  SocketList & readList,
  SocketList & writeList,
  SocketList & exceptList,
  const Poco::Timespan & timeout
);

SocketListtypedef std::vector<Socket> SocketList; 声明。readList、writeList、exceptList 分别是需要轮询状态的 Sokcet 集合。

SocketReactor 会先将通过其 addEventHandler 方法注册的 Socket 按照是否注册消息类型来构建三个集合:

for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
{
  if (it->second->accepts(_pReadableNotification))
  {
    readable.push_back(it->first);
    nSockets++;
  }
  if (it->second->accepts(_pWritableNotification))
  {
    writable.push_back(it->first);
    nSockets++;
  }
  if (it->second->accepts(_pErrorNotification))
  {
    except.push_back(it->first);
    nSockets++;
  }
}

然后,通过 Poco::NET::Socket::select 方法将集合中准备就绪的 Socket 挑出(底层通过std::swap交换),然后向 Socket 的 Observer 发送相应通知,如果没有就绪的话就进入等待:

if (nSockets == 0)
{
  onIdle();
  Thread::trySleep(_timeout.totalMilliseconds());
}
else if (Socket::select(readable, writable, except, _timeout))
{
  onBusy();

  for (Socket::SocketList::iterator it = readable.begin(); it != readable.end(); ++it)
  	dispatch(*it, _pReadableNotification);
  for (Socket::SocketList::iterator it = writable.begin(); it != writable.end(); ++it)
  	dispatch(*it, _pWritableNotification);
  for (Socket::SocketList::iterator it = except.begin(); it != except.end(); ++it)
  	dispatch(*it, _pErrorNotification);
}

参考文献


高性能IO模型浅析

Poco官方文档之Network Programming

[libevent之Reactor模式