系列文章目录
《ZLToolKit源码学习笔记》(1)VS2019源码编译
《ZLToolKit源码学习笔记》(2)工具模块之日志功能分析
《ZLToolKit源码学习笔记》(3)工具模块之终端命令解析
《ZLToolKit源码学习笔记》(4)工具模块之消息广播器
《ZLToolKit源码学习笔记》(6)线程模块之整体框架概述
《ZLToolKit源码学习笔记》(7)线程模块之线程池组件:任务队列与线程组
《ZLToolKit源码学习笔记》(8)线程模块之线程负载计算器
《ZLToolKit源码学习笔记》(9)线程模块之任务执行器
《ZLToolKit源码学习笔记》(11)线程模块之工作线程池WorkThreadPool
《ZLToolKit源码学习笔记》(12)事件轮询模块之整体框架概述
《ZLToolKit源码学习笔记》(13)事件轮询模块之管道的简单封装
《ZLToolKit源码学习笔记》(14)事件轮询模块之定时器
《ZLToolKit源码学习笔记》(15)事件轮询模块之事件轮询器EventPoller(本文)
《ZLToolKit源码学习笔记》(16)网络模块之整体框架概述
《ZLToolKit源码学习笔记》(17)网络模块之基础接口封装类SockUtil
《ZLToolKit源码学习笔记》(18)网络模块之Buffer缓存
《ZLToolKit源码学习笔记》(19)网络模块之套接字封装
《ZLToolKit源码学习笔记》(20)网络模块之TcpServer
《ZLToolKit源码学习笔记》(21)网络模块之TcpClient与Session
《ZLToolKit源码学习笔记》(22)网络模块之UdpServer
前言
一个服务器程序,一般都需要处理三类事件:IO事件、定时事件、信号。而为了方便处理,我们需要统一事件源,比如使用IO复用来管理所有事件。其次,为了实现跨平台,我们需要提供一个平台无关的统一接口,与平台相关的实现在内部完成。比如对于IO复用,linux下有epoll,windows等其它平台有select,这里我们就可以统一封装,对外提供一致的接口。一个事件管理接口,还应该是支持多线程的,本节学习下ZLToolKit中的事件管理工具:EventPoller。支持定时事件、IO事件处理,通过内置的管道事件,也可以实现多线程执行任务的负载均衡。
目录
2.1.3、async_l-添加需要在事件监听线程中执行的任务
一、概述
EventPoller类按照本人的理解,接口大致分为4类:内部管道事件、定时器事件、用户事件(用户可以自己管理的事件,可以是管道、网络socket等)以及线程相关的接口。
EventPoller的事件处理模式为Reactor模式,每一个事件都有其对应的回调,在事件触发后,该回调被调用。
EventPoller的事件管理一般在单独的线程中进行,这可以通过runLoop函数确定。该接口的第一个参数可以设置是否为阻塞的,非阻塞状态下,会创建一个线程,后续的事件监听在线程中进行。linux系统下,事件监听通过epoll管理,非linux平台,通过select管理。
该类虽然是一个单例类(构造函数私有),但是由于其将EventPollerPool作为友元类,所以,使用时,我们一般不直接实例化该类对象,而是通过其友元类EventPollerPool来间接使用。在EventPollerPool中,会根据用户指定的size或者CPU核心数创建多个EventPoller的实例,后续使用时,根据特定条件选择其中一个来调用(比如选择负载最轻的)。
对于EventPollerPool的学习,可以参见《ZLToolKit源码学习笔记》(11)线程模块之工作线程池WorkThreadPool,两者类似。
二、功能分析
2.1、内部管道事件
内部管道事件用于用户工作线程和事件监听线程之间的通信。事件监听线程中,监听了管道的读端,用户工作线程中,使用管道的写端。
基于该管道事件,实现了一个任务队列,可以在该线程中异步的执行任务。_list_task用于存储和管道关联的任务,async_l负责将任务加入到_list_task中,然后向管道中写入数据,触发其读事件,runLoop中监听到该事件后,调用onPipeEvent来执行并清除_list_task中的任务。
2.1.1、构造函数
EventPoller::EventPoller(ThreadPool::Priority priority ) {
_priority = priority;
SockUtil::setNoBlocked(_pipe.readFD());
SockUtil::setNoBlocked(_pipe.writeFD());
#if defined(HAS_EPOLL)
_epoll_fd = epoll_create(EPOLL_SIZE);
if (_epoll_fd == -1) {
throw runtime_error(StrPrinter << "创建epoll文件描述符失败:" << get_uv_errmsg());
}
SockUtil::setCloExec(_epoll_fd);
#endif //HAS_EPOLL
_logger = Logger::Instance().shared_from_this();
_loop_thread_id = this_thread::get_id();
//添加内部管道事件
if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) {
throw std::runtime_error("epoll添加管道失败");
}
}
可以看到,管道的读端fd被添加到事件列表中,当管道读事件被触发时,会执行onPipeEvent函数。
2.1.2、onPipeEvent-管道读事件回调
inline void EventPoller::onPipeEvent() {
TimeTicker();
char buf[1024];
int err = 0;
do {
if (_pipe.read(buf, sizeof(buf)) > 0) {
continue;
}
err = get_uv_error(true);
} while (err != UV_EAGAIN);
decltype(_list_task) _list_swap;
{
lock_guard<mutex> lck(_mtx_task);
_list_swap.swap(_list_task);
}
_list_swap.for_each([&](const Task::Ptr &task) {
try {
(*task)();
} catch (ExitException &) {
_exit_flag = true;
} catch (std::exception &ex) {
ErrorL << "EventPoller执行异步任务捕获到异常:" << ex.what();
}
});
}
该接口在管道的读事件发生后被调用,执行_list_task中的任务。首先读取管道中的数据,然后交换_list_task,即清空其中任务。最后以此执行所有任务。
2.1.3、async_l-添加需要在事件监听线程中执行的任务
Task::Ptr EventPoller::async_l(TaskIn task,bool may_sync, bool first) {
TimeTicker();
if (may_sync && isCurrentThread()) {
task();
return nullptr;
}
auto ret = std::make_shared<Task>(std::move(task));
{
lock_guard<mutex> lck(_mtx_task);
if (first) {
_list_task.emplace_front(ret);
} else {
_list_task.emplace_back(ret);
}
}
//写数据到管道,唤醒主线程
_pipe.write("", 1);
return ret;
}
异步执行的情况下,将任务添加到_list_task中,然后通过向管道中写数据来触发读事件,runLoop事件监听线程中调用onPipeEvent来执行任务。
2.2、定时事件
该部分已经在定时器一节中学习过,可参见《ZLToolKit源码学习笔记》(14)事件轮询模块之定时器。
2.3、用户事件
用户可以通过addEvent、delEvent、modifyEvent三个接口,将自己关心的事件让EventPoller来管理。所有被管理的事件都存放在_event_map中。
unordered_map<int, Poll_Record::Ptr> _event_map;
2.3.1、addEvent-添加事件监听
int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
TimeTicker();
if (!cb) {
WarnL << "PollEventCB 为空!";
return -1;
}
if (isCurrentThread()) {
#if defined(HAS_EPOLL)
struct epoll_event ev = {0};
ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;
ev.data.fd = fd;
int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
if (ret == 0) {
_event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
}
return ret;
#else
#ifndef _WIN32
//win32平台,socket套接字不等于文件描述符,所以可能不适用这个限制
if (fd >= FD_SETSIZE || _event_map.size() >= FD_SETSIZE) {
WarnL << "select最多监听" << FD_SETSIZE << "个文件描述符";
return -1;
}
#endif
Poll_Record::Ptr record(new Poll_Record);
record->event = event;
record->callBack = std::move(cb);
_event_map.emplace(fd, record);
return 0;
#endif //HAS_EPOLL
}
async([this, fd, event, cb]() {
addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb)));
});
return 0;
}
将事件注册到epoll或者select中管理。注意看函数最后的async调用(前边的if (isCurrentThread())判断了是否是事件监听线程),说明该接口实际的操作不是在用户工作线程中执行的,而是转移到了事件监听线程中执行,避免了多线程操作epoll或者select。
每一个事件都有其对应的回调,在事件触发后,该回调被调用。
对于epoll,直接调用epoll系列api进行操作,对于select,由于每次都需要重新添加要监听的事件,所以这里仅将事件放入_event_map中,添加到select的操作在runLoop中执行。节选runLoop中相关代码如下:
set_read.fdZero();
set_write.fdZero();
set_err.fdZero();
max_fd = 0;
for (auto &pr : _event_map) {
if (pr.first > max_fd) {
max_fd = pr.first;
}
if (pr.second->event & Event_Read) {
set_read.fdSet(pr.first);//监听管道可读事件
}
if (pr.second->event & Event_Write) {
set_write.fdSet(pr.first);//监听管道可写事件
}
if (pr.second->event & Event_Error) {
set_err.fdSet(pr.first);//监听管道错误事件
}
}
startSleep();//用于统计当前线程负载情况
ret = zl_select(max_fd + 1, &set_read, &set_write, &set_err, minDelay ? &tv : NULL);
sleepWakeUp();//用于统计当前线程负载情况
delEvent、modifyEvent和addEvent接口类似,不再赘述。
三、runLoop
内部管道事件、定时器事件 、以及用户添加的事件都在该接口中管理,当事件触发时,调用各自对应的回调函数,将事件通知上去。在回调中同步的进行IO操作等耗时较大的工作时,会导致事件不能被及时处理,应尽量避免此种操作,可以使用任务队列的方式,将就绪事件放在任务队列中,然后另外的工作线程从队列中获取任务执行。
文章评论