CCnet | 单线程reactor演变为多线程
ccnet从单线程演变为多线程
ccnet现状:
- epoll
- linux平台
- 事件循环
- 单线程
- reactor
- 监听和新连接的事件都在主线程中
单线程的ccnet
EventLoop
: 整个事件循环。EPollPoller
: 负责事件的收集。Channel
:负责事件的分发。Acceptor
:处理客户端新连接,绑定监听回调时间。TcpConnection
: 包含EventLoop
组件,Channel
组件,接收缓冲区和发送缓冲区,负责数据的收发。TcpServer
:包含EventLoop
组件,Acceptor
组件,客户端连接map
,还有回调接口Buffer
: 缓冲区
*ccnet
的单线程模型只是对epoll进行了封装,然后根据reactor模型在代码上进行了组件的区分,但是整体操作还是单线程,不能充分利用硬件资源。TcpServer
的回调函数中,数据接收和业务处理在同一条线程中。
单线程的Reactor
- Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等。
- Acceptor:处理客户端新连接,并分派请求到处理器中。
- Handler:将自身与事件绑定,执行非阻塞读/写任务。
单线程reactor
的消息处理流程:
Reactor
通过select/poll/epoll
监控连接事件,通过dispatch
进行分发- 如果是连接建立的事件,则由
acceptor
处理,并创建handler
处理后续事件。 - 如果不是建立连接事件,则
Reactor
会分发调用Handler
来响应。 handler
会完成read
->业务处理->send
的完整业务流程。
Reactor
单线程模型只是在代码上进行了组件的区分,但是整体操作还是单线程,不能充分利用硬件资源。handler
业务处理部分没有异步。
对于一些小容量应用场景,可以使用单Reactor
单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:
- 即便
Reactor
线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送。 - 当
Reactor
线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重Reactor
线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。 - 一旦
Reactor
线程意外中断或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
主从Reactor多线程模型
- 从主线程池中随机选择一个
Reactor
线程作为acceptor
线程,用于绑定监听端口,接收客户端连接 acceptor
线程接收客户端连接请求之后创建新的SocketChannel
,将其注册到主线程池的其它Reactor
线程上。- 步骤2完成之后,业务层的链路正式建立,将
SocketChannel
从主线程池的Reactor线程的多路复用器上摘除,重新注册到SubReactor
线程池的线程上,并创建一个Handler
用于处理各种读写事件 - 当有新的事件发生时,
SubReactor
会调用连接对应的Handler
进行响应 Handler
通过Read
读取数据后,会分发给后面的Worker
线程池进行业务处理Worker
线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler进行处理Handler
收到响应结果后通过Send
将响应结果返回给Client
多线程的ccnet
根据主从Reactor
多线程模型,可以想到ccnet
单线程模型中还缺少事件循环线程池EventLoopThreadPool
组件和worker线程池,EventLoopThreadPool
顾名思义,每个线程里都有一个事件循环。ccnet
没有worker线程池。
可以把ccnet
的单线程模型中 TcpServer
组件修改一下,因为他包含事件循环EventLoop
组件和Acceptor
组件,也绑定了 Acceptor
的新连接事件回调,TcpServer
一个 ccnet
多线程模型所需组件的集合,他还需要的组件如下:
EventLoopThread
: on loop peer thread,负责开启一个线程一个事件循环。EventLoopThreadPool
: 是事件循环线程池。TcpServer
: 增加EventLoopThreadPool
组件。 修改TcpServer.cc
的代码:
void TcpServer::newConnection(int sockfd, const TcpAddr& peerAddr)
{
...
EventLoop *io_loop = thread_pool_->getNextLoop(); //增加的行
TcpConnectionPtr conn(new TcpConnection(io_loop, //修改的行
connName,
sockfd,
localaddr,
peerAddr));
...
io_loop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));//修改的行
}
修改TcpServer.h
代码:
class TcpServer
{
...
//thread pool
std::shared_ptr<EventLoopThreadPool> thread_pool_;
...
}
修改example中的EchoServer.cc
代码:
server_.setNumThreads(4); //设置线程数量
详细代码请参考:github: https://github.com/zfunnily/ccnet
--完--
- 原文作者: 留白
- 原文链接: https://zfunnily.github.io/2020/12/ccnet/
- 更新时间:2024-04-16 01:01:05
- 本文声明:转载请标记原文作者及链接