目录
epoll的工作方式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET) ;
epoll默认状态下就是LT工作模式 ,select和poll其实也是工作在LT模式下,epoll既可以支持LT, 也可以支持ET.
这个LT ET的名字是从物理学方面来的; 可以看到LT很长,ET水平看就是一个点;
LT模式
- 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分
- 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait
仍然会立刻返回并通知socket读事件就绪- 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
- 支持阻塞读写和非阻塞读写
通俗说就是,一个sock想你发消息或者你向它发,由于种种原因这部分消息只读取了一部分,那么epoll就会不断地返回这个sock就绪,直到与这个sock通信这次结束,才会恢复正常机制;
ET模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式
- 当epoll检测到socket上事件就绪时, 必须立刻处理
- 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,
epoll_wait 不会再返回了- 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会
- ET的性能比LT性能更高( epoll_wait 返回的次数少了很多), Nginx默认采用ET模式使用epoll.
- 支支持非阻塞读写
通俗说就是,ET模式的epoll_wait,不管这次处理完没完,都不会再进行第二次了,就能让其他更多的sock都能就绪即拷贝,不在一个身上浪费很多时间.显然是更高效的
同时ET这种只来一次的无脑模式,在上层也就意味着想要读取数据完整,我们程序员就得被逼利用循环读取等方式来读取数据;
对比LT和ET
- LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把
所有的数据都处理完. - 相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到
每次就绪的文件描述符都立刻处理(完整的处理), 不让这个就绪被重复提示的话, 其实性能也是一样的. - 另一方面, ET 的代码复杂程度更高了.
高效原因:假设当sock1 和 sock2同时向server发数据;
- LT:sock1的数据发了一半,堵塞了,那我epoll_wait会直接返回,继续读sock1这个就绪事件;
- ET:我只搞你一遍,上层用户while轮询的读取的,如果你不堵塞,我while轮询直接读完整,如果你堵塞了,我读一部分放在你自己的缓冲区,然后break出while,下次再接着读你另一半,这样就不影响后续sock2的通信了,高效;
理解ET模式需要非阻塞文件描述符
- 使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 “工程实践” 上的要求
假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第
二个10k请求 ;
如果服务端写的代码是阻塞式的read, 并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来,
参考 man 手册的说明, 可能被信号打断), 剩下的9k数据就会待在缓冲区中.,下次epoll_wait立马又返回这个sock就绪,又针对这9k再io,对于其他sock来说就是低效;
此时由于 epoll 是ET模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回 (起到了高效的作用)
但是问题来了:套娃问题
- 务器只读到1k个数据, 要10k读完才会给客户端返回响应数据.
- 客户端要读到服务器的响应, 才会发送下一个请求
- 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.
所以, 为了解决上述问题(阻塞read可能会会让程序挂起,不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区;保证一定能把完整的请求都读出来.
而如果是LT模式没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 接着立刻返回没读完的文件描述符读就绪
epoll的使用场景
epoll的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll的性能可能适得其反 ;
具体要根据需求和场景特点来决定使用哪种IO模型.
对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll ;
eg(例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网APP的入口服务器, 这样的服务器就很适合epoll )这也是能看出互联网企业服务器底层常用epoll的原因
如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用epoll就并不合适.
Reactor设计模式
Reactor简介
Reactor 释义“反应堆”,是一种事件驱动机制
- 事件驱动(event handling)
- 可以处理一个或多个输入源(one or more inputs)
- 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理
结合ET模式+epoll的多路复用:
- 同步的等待多个事件源到达(采用epoll()实现)
- 将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)
- 分解的事件以及对应的事件服务应用从分派服务中分离出去(handler函数指针)
形象化为打地鼠游戏:
这一个个的Events其实就是epoll所需要监听的等待事件,如果有就绪发生,就跟地鼠冒出来一样,我们ET模式下的Reactor直接通过Dispatch将事件分发出去,我们handler处理函数被逼循环式的直到把他砸下去(循环拷贝直到没数据了),注意,Reactor分发事件出去以后,就脱离了Reactor体系,具体的业务逻辑修改handler即可,你也可以引入线程池,多进程等这类多并发技术进一步提升效率;
上图黄色就是分发出去的handler;
用Reactor模式设计一个计算器server
我们写的是一个单Reactor单线程的简易reactor,目的是理解reactor模式的工作框架,当然可以根据需要,在处理业务逻辑的地方引入线程池等技术提高效率,也可以创建多进程多Reactor,这里不深究;
// 单进程基于epoll的ET非阻塞形式设计的一个Reactor模式
// 检测事件就绪 + 对数据的读写 + 对数据的分析处理
ReactorServer.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include <unistd.h>
void f(std::string s) {
std::cout << s << std::endl; }//TEST
namespace ns_reactor
{
class Event; //事件类
typedef void (*callback_t)(Event &); //回调
class Reactor; //包含epoll和一个{sock:event}的map;
class Event
{
public:
int sock_;
Reactor *r_; // 指向该Event对应的Reactor,
std::string inbuffer_; //对应的sock,私有的读取缓冲区
std::string outbuffer_; //对应的sock,私有的发送缓冲区
callback_t recv_callback_; //对应的sock,读回调
callback_t send_callback_; //对应的sock,写回调
callback_t error_callback_; //对应的sock,异常回调
public:
Event() : sock_(-1), r_(nullptr)
{
recv_callback_ = nullptr;
send_callback_ = nullptr;
error_callback_ = nullptr;
}
void RegisterCallback(callback_t _recv, callback_t _send, callback_t _error) // Accept后 给非listen _sock搞回调方式
{
recv_callback_ = _recv;
send_callback_ = _send;
error_callback_ = _error;
}
~Event() {
}//这里不能close(ev.sock_)噢,我们在ev的的error处理中关了sock,这里再析构会出错;
};
class Reactor
{
int epfd_; // EPOLL
std::unordered_map<int, Event> events_; // sock : Event
public:
Reactor() : epfd_(-1)
{
}
void InitReactor()
{
epfd_ = epoll_create(128);
if (epfd_ < 0)
{
std::cerr << "epoll_create error" << std::endl;
exit(1);
}
}
void AddEvent(const Event &ev, uint32_t events) //将需要监听事件add入epoll,别忘了reactor的map也要加一份
{
epoll_event tmp;
tmp.data.fd = ev.sock_;
tmp.events = events;
if (epoll_ctl(epfd_, EPOLL_CTL_ADD, ev.sock_, &tmp) < 0) // ADD入epoll
{
std::cerr << "add error" << std::endl;
exit(-1);
}
events_.insert({
ev.sock_, ev}); //插入map
std::cout << "添加事件成功, sock: " << ev.sock_ << std::endl;
}
void DelEvent(const Event &ev)
{
epoll_ctl(epfd_, EPOLL_CTL_DEL, ev.sock_, nullptr); // DEL入epoll
events_.erase(ev.sock_); //从map删掉
}
void EnableReadWrite(int sock, bool in, bool out)
{
epoll_event ev;
ev.data.fd = sock;
ev.events = (in == true ? EPOLLIN : 0) | (out == true ? EPOLLOUT : 0);
epoll_ctl(epfd_, EPOLL_CTL_MOD, sock, &ev);
}
void Dispatcher(int timeout) //派发 wait拿到 就绪了以后 根据类型甩IO任务;
{
epoll_event revs[128];
int n = epoll_wait(epfd_, revs, 128, timeout);
for (int i = 0; i < n; i++)
{
int sock = revs[i].data.fd;
uint32_t events = revs[i].events;
//有bug??
if (events & EPOLLIN)
{
if (events_[sock].recv_callback_)
events_[sock].recv_callback_(events_[sock]);
}
if (events & EPOLLOUT)
{
if (events_[sock].send_callback_)
events_[sock].send_callback_(events_[sock]);
}
}
}
~Reactor()
{
if (epfd_ >= 0)
close(epfd_);
}
};
}
ReactorServer.cc
#include "ReactorServer.hpp"
#include "sock.hpp"
#include "Accepter.hpp"
using namespace ns_reactor;
using namespace ns_sock;
int main()
{
// 建立epoll对象; 创建套接字 -> ET模式 建立连接 ->调派发
Reactor *R = new Reactor();//单Reactor 全局就一个。
R->InitReactor();
int listen_sock = Sock::Socket();
Sock::Bind(listen_sock, 8080); // Bind封装了error处理
Sock::Listen(listen_sock);
SetNoBlock(listen_sock);
//有listen_sock了
Event ev;
ev.sock_ = listen_sock;
ev.r_ = R;
// Accepter链接管理器 也相当于一种回调函数; listensock-->的epollin == Accepter链接管理器
ev.RegisterCallback(Accepter, nullptr, nullptr);
R->AddEvent(ev, EPOLLIN | EPOLLET);
// listen搞完 链接自动被管理了,进入事件派发器,服务器正式开始运作;
int timeout = 1000;
while (true)
{
R->Dispatcher(timeout); // epoll_wait派发!
}
return 0;
}
sock.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <strings.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
void SetNoBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); //文件描述符的属性取出来存入fl中
if (fl < 0)
{
//执行失败返回-1并报错
perror("fcntl");
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK); //设置fl | O_NONBLOCK 类似位图填充类型设置
}
namespace ns_sock
{
enum
{
SOCKET_ERR = 2,
BIND_ERR,
LISTEN_ERR
};
const int g_backlog = 5;
class Sock
{
public:
static int Socket()
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
std::cerr << "socket error!" << std::endl;
exit(SOCKET_ERR);
}
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); //将sock搞成非阻塞的 满足ET模式
return sock;
}
static void Bind(const int &sock, const u_int16_t &port)
{
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
std::cerr << "bind error!" << std::endl;
exit(BIND_ERR);
}
}
static void Listen(const int &sock)
{
if (listen(sock, g_backlog) < 0)
{
std::cerr << "bind error!" << std::endl;
exit(LISTEN_ERR);
}
}
static void SetNonBlock(int sock)
{
int fl = fcntl(sock, F_GETFL);
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}
};
}
Accepter.hpp
#pragma once
#include <iostream>
#include <string>
#include <cerrno>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "ReactorServer.hpp"
#include "Callback.hpp"
using namespace ns_reactor;
void Accepter(Event &event) //只会被listensock调;
{
std::cout << "Accepter 回调方法被调用" << std::endl;
// ET 模式 连接事件到来,在一个时间段,有很多的连接到来的!保证读完那就得while读干净!
while (1)
{
sockaddr_in peer;
socklen_t len = sizeof(peer);
int listen_sock = event.sock_;
int sock = accept(listen_sock, (sockaddr *)&peer, &len);
// SetNoBlock(sock);//设置 ET 模式 注意 如果在accept前设置又bug 会显示accept错误。。。。 accept轮询返回,放这里的话会出问题 自己研究;
if (sock > 0)
{
SetNoBlock(sock); //设置 ET 模式 注意 如果在accept前设置又bug 会显示accept错误。。。。
Event ev;
ev.r_ = event.r_;
ev.sock_ = sock;
ev.RegisterCallback(Recver, Sender, Errorer); //为到来的sock注册三个Request Handler
(event.r_)->AddEvent(ev, EPOLLIN | EPOLLET); //甩入eopll中监听
}
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//非阻塞sock中 返回这两个errno不是出错了,而是 暂时没链接了--(这批 ET 模式下 得 连接读完了)!
break;
}
else if (errno == EINTR)
{
//当前的accept调用,被信号中断,没出错,并不代表底层没有新的链接了
continue;
}
else if (sock < 0)
{
//真正出错了
std::cerr << "accept error" << std::endl;
continue; //读别的链接
}
}
}
Callback.hpp
#pragma once
#include "ReactorServer.hpp"
#include "Util.hpp"
using namespace ns_reactor;
/*************************************** * return: * 0: 本轮读取完毕 * -1: 读取过程中出错了 * * sock: 要读取的fd * out: 输出型参数 * * ************************************/
//读好了直接将返回的 内容 放入该sock的 out缓冲区作为输出型参数本质是 sock的 string in
int RecvHepler(int sock, std::string *out)
{
while (1) // ET模式 被逼循环直到完;
{
char buffer[1024]; //临时的
ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (s > 0)
{
//读到了,但是不能保证读完了
buffer[s] = 0; //上'\0',进而可以string操作
(*out) += buffer;
}
else if (s < 0) //出错,但是读完也是存在于出错中的一种 errno == EAGAIN || errno == EWOULDBLOCK
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
return 0; //读完,没数据了
else if (errno == EINTR)
continue; //信号中断 继续读
else
return -1; //真有错了,error
}
else // s == 0
return -1; //被关闭,同上有错
}
}
//
void Recver(Event &event)
{
//讲 读数据 和 业务(数据)操作 过程解耦;
if (-1 == RecvHepler(event.sock_, &(event.inbuffer_)))
{
//出错 or 被关闭 --同一给error_handler
if (event.error_callback_)
event.error_callback_(event);
return;
}
//读取过程完成了,下来就是业务过程;
// 往后我们所写的内容,已经和Reactor无关了!全部都是数据分析与处理
//定制协议:
// 1+1X2*3X3*5X7*9X9*10X9*
// X: 叫做报文和报文之间的分割符
// 类似1+1:一个完整报文,协议解析, 解决粘包问题
std::cout<<event.inbuffer_<<std::endl;
std::vector<std::string> packages; //存"1+1" "2*2"这种等待处理的子段;
Util::StringSplit(event.inbuffer_, &packages, "X");
for (int i = 0; i < packages.size(); i++) //
{
for (int j = 0; j < packages[i].size(); j++)
{
char op = packages[i][j];
if (op == '+' || op == '-' || op == '*' || op == '/' || op == '%') //
{
int a = atoi(packages[i].substr(0, j).c_str()); // string -> char*c
int b = atoi(packages[i].substr(j + 1).c_str());
int ret;
std::cout << a << " " << b << " " << op << std::endl;
switch (op)
{
case '+':
ret = a + b;
break;
case '-':
ret = a - b;
break;
case '*':
ret = a * b;
break;
case '/':
ret = a / b;
break;
case '%':
ret = a % b;
break;
}
//构造回复报文
std::string respond;
respond += packages[i];
respond += "=";
respond += std::to_string(ret);
respond += "X"; //添加报文和报文的分隔符
// 5. 发送的核心:a. 不是我们调用send! 是甩给Reactor!b. 还需要将报文添加到outbuffer中即可!
event.outbuffer_ += respond;
// 6. 多路转接中,一般EPOLLIN是常设的, 而EPOLLOUT是按需设置的
(event.r_)->EnableReadWrite(event.sock_, true, true);
}
}
// out string搞定可以监听写事件了;
}
// std::cout<<"计算结果:"<<event.outbuffer_<<std::endl;
}
// send_string: 输入输出型参数
// ret > 0 : 缓冲区数据全部发完
// ret == 0 : 本轮发送完, 但是缓冲区还有数据
// ret < 0 : 发送失败
int SenderHepler(Event &event)
{
int total = event.outbuffer_.size(); //还剩多少i
while (1)
{
int s = send(event.sock_, event.outbuffer_.c_str(), total, 0);
if (s > 0)
{
if (total <= s)
{
//发成功 全完
event.outbuffer_.erase(0);
return 1;
}
else if (total > s)
{
event.outbuffer_.erase(0, s); //头删
total -= s;
continue;
}
}
else if (s < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//全部写完
event.outbuffer_.erase(0);
return 1;
}
else if (errno == EINTR)
continue; //信号中断 继续读
else
{
//真出错;
return -1;
}
}
else
{
return -1;
}
}
}
void Sender(Event &event)
{
if (-1 == SenderHepler(event))
{
if (event.error_callback_)
event.error_callback_(event);
return;
}
(event.r_)->EnableReadWrite(event.sock_, true, false); //写完之后关掉了
}
void Errorer(Event &event) // error 我们就关sock 然后退出
{
std::cout << "call Errorer... closed sock me too" << std::endl;
event.r_->DelEvent(event);
close(event.sock_); //关闭该文件描述符
}
Util.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
class Util
{
// in 解析完 还得删了,不然占空间;
public:
static void StringSplit(std::string &in, std::vector<std::string> *out, const std::string &sep)
{
// 1+1X 2*3X 3*3X 45*
while (true)
{
auto pos = in.find(sep); //返回sep的index
if (pos == std::string::npos)
{
//没有找到分割符 结束分包;
break;
}
std::string sub = in.substr(0, pos); //[)
out->push_back(sub); // out尾插
in.erase(0, pos + sep.size()); // in头删
}
}
};
运行结果
client:
server:
总结
ET和LT是IO的两种模式;
Reactor是一种上层设计,效率不同模式,重点是事件派发,与ET,LT无关;
区别:
Reactor 模式:非阻塞同步 : 这里的同步io是指IO等待由内核完成,就绪了通知你你来拷贝;(早期服务器)Linux特别常用;
Proactor前摄式;非阻塞异步 : 这里的异步io是指IO等待+拷贝数据都直接由内核完成; (显然异步在实际场景更高效;)
我们上面设计的单Reactor单线程的简易的reactor,是基于Epoll的ET模式下的Reactor,因为将io拷贝操作也写了接口,交给了Reactor管理,因此来说算是一个 半同步半异步的Reactor 反应堆模式;
文章评论