io多路复用 可以同时检测多个网络IO是否准备就绪,将就绪消息的代码段与处理 消息的代码段分离,从而达到高效操作就绪fd的作用。
io 多路复用只负责检测 io,不负责操作 io
其中select、poll在各系统中基本都有实现, linux有epoll, windows下则有iocp。
select、poll 、epoll共同点:提供检测fd就绪的机制,使其可以在一个线程内处理多个客户端消息。
1 select
优点:
-
调用简单,大多操作系统都有支持。
缺陷:
-
每次调用select,要把 fdset 从用户态拷贝到内核态
-
调用select 阻塞后,内核要通过遍历的方式来检查fd 的就绪状态
-
由于上述缺陷select 限制了 maxfd 的大小为 1024,即只能监听1024个事件。
-
select触发后,用户线程还需要所有的fd 检测哪些fd 已准备
2 poll
poll 的机制与 select 一样,只是取消了1024的限制。
3 epoll
epoll在内核中有一个存放socket fd的数据结构(红黑树),允许进程监听多个 socket 的事件,epoll通过 epoll_ctl 把感兴趣的 fd 添加进去。每次调用epoll_wait 检测事件后,只返回就绪的事件通知用户进程处理。
优点:
-
只有第一次调用 epoll_ctl 添加感兴趣的 fd 到内核的 epoll 实例
-
内核只需要关注用户感兴趣的事件,并使用异步事件通知
-
内核直接返回已就绪的 fd 集合
缺陷:
-
对于大量网络连接,并且大量活跃连接的情况,与epoll 和 select 的表现是差不多的。因为要对所有活跃的链接进行通知。
-
支持的操作系统有限,目前linux下应用比较广泛。
4 实例应用
以下分别使用select、poll 、epoll实现一个tcp服务,使用linux系统,支持多个客户端连接,实现echo消息回传的功能。
4.1 select 代码
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <netinet/in.h>
#define BUF_MAX 1024
#define BUF_LEN 64
int main(int argc, char **argv){
printf("-----select---\n");
char buf[BUF_LEN] = {0};
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
printf("socket error\n");
return -1;
}
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(6789);
if(bind(listen_fd,(struct sockaddr*)&saddr,sizeof(saddr)) == -1)
{
printf("bind error\n");
return -1;
}
listen(listen_fd,5);
printf("listen %d success\n",listen_fd);
//----------------------------------------
fd_set rfds,set_rfds;
FD_ZERO(&rfds);
FD_ZERO(&set_rfds);
FD_SET(listen_fd,&set_rfds);
int max_fd = listen_fd;
struct sockaddr_in client;
socklen_t len = sizeof(client);
while(1)
{
rfds = set_rfds; // select rfds为输入输出参数,所以每次检测前重新设置rfds
printf("select blocking\n");
int nready = select(max_fd+1,&rfds,NULL,NULL,NULL);
if(nready <= 0)
{
continue;
}
printf("select count %d\n",nready);
//连接
if(FD_ISSET(listen_fd,&rfds))
{
printf("new connect wait\n");
int client_fd = accept(listen_fd,(struct sockaddr*)&client,&len);
if(client_fd == -1)
{
printf("accept error\n");
return -1;
}
printf("new connect in %d\n",client_fd);
FD_SET(client_fd,&set_rfds);
if(client_fd > max_fd) max_fd = client_fd;
if (--nready == 0) continue;
}
//业务
//多个客户端共用一个buf
int index = listen_fd+1;
printf("index=%d,max_fd=%d\n",index,max_fd);
for(;index < max_fd+1; index++) //可能出问题(client_fd < listen_fd),可将连接的所有fd存入容器。
{
if(FD_ISSET(index,&rfds))
{
printf("recv ...\n");
len = recv(index,buf,BUF_LEN,0);
if(len > 0)
{
buf[len] = '\0';
printf("recv buf:%s\n",buf);
send(index,buf,BUF_LEN,0);
}else{
printf("disconnect\n");
FD_CLR(index,&set_rfds);
close(index);
}
//nready 可提前退出 continue
}
}
}
close(listen_fd);
return 0;
}
4.2 poll 代码
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/poll.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#define BUF_MAX 1024
#define BUF_LEN 64
#define POLL_SIZE 1024
int main(int argc, char **argv){
printf("----poll----\n");
char buf[BUF_LEN] = {0};
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
printf("socket error\n");
return -1;
}
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(6789);
if(bind(listen_fd,(struct sockaddr*)&saddr,sizeof(saddr)) == -1)
{
printf("bind error\n");
return -1;
}
listen(listen_fd,5);
printf("listen %d success\n",listen_fd);
//----------------------------------------
pollfd pfds[POLL_SIZE]= {0};
pfds[0].fd = listen_fd;
pfds[0].events = POLLIN;
int max_fd = listen_fd;
for(int i=1;i<POLL_SIZE;i++)
{
pfds[i].fd=-1;
}
struct sockaddr_in client;
socklen_t len = sizeof(client);
while(1)
{
printf("poll blocking\n");
int nready = poll(pfds,max_fd+1,-1);
if(nready == 0)
{
continue;
}else if(nready < 0)
{
printf("poll error\n");
break;
}
printf("poll count %d\n",nready);
//连接
if(pfds[0].revents & POLLIN)
{
printf("new connect wait\n");
int client_fd = accept(listen_fd,(struct sockaddr*)&client,&len);
if(client_fd == -1)
{
printf("accept error\n");
return -1;
}
printf("new connect in %d\n",client_fd);
pfds[client_fd].fd = client_fd;
pfds[client_fd].events = POLLIN;
if(client_fd > max_fd) max_fd = client_fd;
if(--nready == 0) continue;
}
//业务
//多个客户端共用一个buf
int index = listen_fd+1;
printf("index=%d,max_fd=%d\n",index,max_fd);
for(;index < max_fd+1; index++) //可能出问题(client_fd < listen_fd),将连接的所有fd存入容器。
{
memset(buf,0,BUF_LEN);
if(pfds[index].revents & POLLIN)
{
printf("recv ...\n");
int rlen = recv(index,buf,BUF_LEN,0); //注意单位
if(rlen > 0)
{
//buf[len] = '\0';
printf("recv rlen:%d,buf:%s\n",rlen,buf);
send(index,buf,BUF_LEN,0);
}else if(rlen == 0){
printf("disconnect\n");
pfds[index].fd = -1;
close(index);
}
//nready 提前退出
if (--nready == 0) break;
}
}
/*
*/
}
close(listen_fd);
return 0;
}
4.3 epoll代码
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#define BUF_MAX 1024
#define BUF_LEN 64
#define POLL_SIZE 1024
int main(int argc, char **argv){
printf("----epoll----\n");
char buf[BUF_LEN] = {0};
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
printf("socket error\n");
return -1;
}
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons(6789);
if(bind(listen_fd,(struct sockaddr*)&saddr,sizeof(saddr)) == -1)
{
printf("bind error\n");
return -1;
}
listen(listen_fd,5);
printf("listen %d success\n",listen_fd);
//----------------------------------------
int epollfd = epoll_create(1);
struct epoll_event event,events[POLL_SIZE] = {0};
event.events = EPOLLIN;
event.data.fd = listen_fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,listen_fd,&event);
struct sockaddr_in client;
socklen_t len = sizeof(client);
while(1)
{
printf("epoll blocking\n");
int nready = epoll_wait(epollfd,events,POLL_SIZE,30000);
if(nready == -1)
{
continue;
}
printf("epoll count %d\n",nready);
for(int index = 0; index < nready; index++){
//连接
int ready_fd = events[index].data.fd;
if(listen_fd == ready_fd)
{
printf("new connect wait\n");
int client_fd = accept(listen_fd,(struct sockaddr*)&client,&len);
if(client_fd == -1)
{
printf("accept error\n");
return -1;
}
printf("new connect client<%d>\n",client_fd);
event.events = EPOLLIN;
event.data.fd = client_fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,client_fd,&event);
if(--nready == 0) continue;
}else if(events[index].events & EPOLLIN)
{
memset(buf,0,BUF_LEN);
printf("recv ...\n");
int rlen = recv(ready_fd,buf,BUF_LEN,0); //注意rlen类型
printf("recv client:%d,rlen:%d,buf:%s\n",events[index].data.fd,rlen,buf);
if(rlen > 0)
{
send(index,buf,BUF_LEN,0);
}else if(rlen == 0){
printf("disconnect\n");
event.events = EPOLLIN;
event.data.fd = ready_fd;
epoll_ctl(epollfd,EPOLL_CTL_DEL,ready_fd,&event);
close(index);
}
//nready 提前退出
if (--nready == 0) break;
}
}
/*
*/
}
close(listen_fd);
return 0;
}
4.4 测试结果
文章评论