Linux I/O select、poll、epoll的使用方式和优缺点

Linux I/O select、poll、epoll的使用方式和优缺点

I/O复用

I/O复用使得程序可以同时监听多个文件描述符,对提升程序性能至关重要。
I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的。多个文件描述符同时就绪,如果不采取措施,程序只能顺序以此处理其中的每个文件描述符,使得服务器看起来像是串行工作。如果要实现并发,只能使用多进程或多线程等编程手段。

Linux下实现I/O复用的系统调用主要有select、poll和epoll。

select

select 系统调用的用途:在一段时间内,监听用户感兴趣的文件描述符的可读、可写和异常等事件。
select系统调用的原型:

1
2
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

  1. nfds通常被设置为selcet监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的。
  2. readfds、writefds、exceptfds分别指向可读、可写、异常等事件对应的文件描述符集合。fd_set结构体仅包含一个整数数组,数组中每个元素的每一个位标记一个文件描述符。fd_set能容纳的文件描述符数量由FD_SETSIZE指定,限制了select能同时处理的文件描述符的总量。linux定义了一系列的宏来访问fd_set结构体中的位。

    1
    2
    3
    4
    void FD_CLR(int fd, fd_set *set);
    int FD_ISSET(int fd, fd_set *set);
    void FD_SET(int fd, fd_set *set);
    void FD_ZERO(fd_set *set);
  3. timeout用来设置select函数的超时时间。是timeval结构体类型的指针。内核修改它告诉应用程序select等待了多长时间,这个timeval的返回指不能完全信任,比如调用失败时timeval值是不确定的。

    1
    2
    3
    4
    struct timeval {
    long tv_sec; /* seconds */
    long tv_usec; /* microseconds */
    };

由定义可见,select提供了微秒级的定时方式,如果给timeout变量的tv_sec成员和tv_usec成员都传递0,select将立即返回。select失败时返回-1并设置errno。如果在select等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。

网络编程中socket可读的情况:

  1. socket内核接收缓冲区中的字节数大于或等于其低水位标记SO_REVLOWAT。此时可以无阻塞地读socket,并且读操作返回的字节数大于0。
  2. socket通信的对方关闭连接。此时对该socket的读操作返回0。
  3. 监听socket上有新的连接请求。
  4. socket上有未处理的错误。可用getsockopt来读取和清除错误。

网络编程中socket可写的情况:

  1. socket内核发送缓存区中的可用字节数大于或等于其最低水位标记SO_SNDLOWAT。此时可以无阻塞的写该socket,并且写操作返回的字节数大于0。
  2. socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
  3. socket使用非阻塞connect连接成功或失败(超时)之后。
  4. socket上由未处理的错误。可使用getsockopt来读取和清楚该错误。

网络编程中异常情况只有一种:socket上接收到带外数据。

poll

poll原型:

1
2
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

fds是pollfd结构类型的数组,指定所有感兴趣的文件描述符上发生的可读、可写和异常等时间。

1
2
3
4
5
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};

  1. fd成员指定文件描述符
  2. events告诉poll监听fd的哪些事件
  3. revents由内核修改,通知引用程序fd上实际发生了哪些事件

poll支持的事件:

事件 描述 是否可作为输入 是否可作为输出
POLLIN 数据(包括普通数据和优先数据)可读
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读(Linux)不支持
POLLPRI 高优先级数据可读,比如TCP带外数据
POLLOUT 数据(包括普通数据和优先数据)可写
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLRDHUP TCP连接被对方关闭,或者对方关闭了写操作,由GNU引入
POLLERR 错误
POLLHUP 挂起。比如通道的写端被关闭后,读端描述符将收到POLLHUP事件
POLLNVAL 文件描述符没有打开

nfds参数指定被监听事件集合fds的大小。
timeout参数指定poll的超时时间,单位是毫秒。当timeout为-1时,poll调用将永远阻塞,直到某个事件发生;当timeout为0时,poll调用将立即返回。
poll系统调用的返回值含义与select相同。

epoll

epoll维持一个内核事件表。
epoll是Linux特有的I/O复用函数。
epoll使用一组函数来完成任务。epoll将用户关心的文件描述符上的事件放在内核的一个事件表里。不需要像select和poll每次调用都重复传入文件描述符集或事件集。但epool需要一个额外的文件描述符,来唯一标识内核中的事件表。
内核事件表使用epoll_create函数创建。

1
2
#include <sys/epoll.h>
int epoll_create(int size);

size并不起作用,只是给内核一个提示,告诉其内核事件表的期望大小。
操作内核事件表:

1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

fd参数是要操作的文件描述符,op参数则指定操作类型。操作类型有如下3种:

  1. EPOLL_CTL_ADD 往事件表注册fd上的事件
  2. EPOLL_CTL_MOD 修改fd上的注册事件
  3. EPOLL_CTL_DEL 删除fd上的注册事件

event参数指定事件,是epoll_event结构指针类型。

1
2
3
4
5
6
7
8
9
10
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events epoll事件*/
epoll_data_t data; /* User data variable 用户数据*/
};

epoll支持的事件类型和poll基本相同,表示epoll类型的宏是在poll对应的宏前加上”E”。epoll两个额外的事件类型——EPOLLET和EPOLLONESHOT。
epoll_ctl成功返回0,失败返回-1并设置errno。
epoll_wait函数在一段时间内等待一组文件描述符上的事件。该函数返回就绪的文件描述符的个数,失败返回-1并设置errno。

1
2
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);

maxevents指定最多监听多少个事件,必须大于0。
epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表复制到events所指的数组中。

poll和epoll如何索引就绪文件描述符:
poll必须遍历所有已注册文件描述符并找到其中的就绪者
epoll仅遍历就绪的ret个文件描述符

epoll对文件描述符的操作有两种模式:LT(电平触发)模式和ET(边沿触发)模式LT是默认的工作模式,错误率比较小。LT模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作文件描述符。ET是epoll的高效工作模式,错误率较大。
LT模式下,epoll_wait检测到事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件,下次调用epoll_wait时,epoll_wait还会再次通知该应用程序事件,直到事件被处理。
ET模式下,epoll_wait检测到事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件
每个使用ET模式的文件描述符都应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作就会因为没有后续的事件而一直处于阻塞状态(饥渴状态)。因为ET模式只通知以此,所以需要循环读、循环写,如果时阻塞模式当不能读写的时候会阻塞,所以要设置非阻塞模式。
epoll的EPOLLONESHOT事件,注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上的注册的一个可读、可写或者异常事件,且只触发一次。

select、poll、epoll的对比

系统调用 select poll epoll
事件集合 用户通过3个参数分别出入感兴趣的可读、可写、异常等事件,内核通过对这些参数的在线修改来反馈其中的就绪事件。使得用户每次调用select都要重置这3个参数。 统一处理所有事件类型,因此只需要一个事件参数集合。用户通过pollfd.events传入感兴趣的事件,内核通过修改pollfd.revents反馈其中就绪的事件 内核通过一个事件表直接管理用户感兴趣的所有事件。每次调用epoll_wait时,无须重复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件。
应用程序索引就绪文件描述符的时间复杂度 O(n) O(n) O(1)
最大支持文件描述符数 一般由最大值限制 65535 65535
工作模式 LT LT 支持ET高效模式
内核实现和工作效率 采用轮询方式来检测就绪事件,算法复杂度O(n) 采用轮询方式来检测就绪事件,算法时间复杂度O(n) 采用回调方式来检测就绪事件,算法时间复杂度为O(1)

简单实现

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <string.h>
#include <string>
#include <unistd.h>


using namespace std;

int main()
{
int clisock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(9999);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

if(connect(clisock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
{
cout<<"connect error "<<errno<<endl;
return 0;
}
cout<<"connect succ "<<endl;

fd_set rfds;
fd_set wfds;
struct timeval tv;
tv.tv_sec = 5;
int retval;
char buff[1024] = {0};

string input; //命令行输入
string serinput; //服务器返回

while(1)
{
FD_ZERO(&rfds);
FD_ZERO(&wfds);

FD_SET(0, &rfds);
FD_SET(1, &wfds);
FD_SET(clisock, &rfds);
FD_SET(clisock, &wfds);

retval = select(clisock+1, &rfds, &wfds, NULL, &tv);
if(retval == -1)
{
cout<<"select error "<<errno<<endl;
}
else
{
if(FD_ISSET(0, &rfds))
{
cout<<"可读"<<endl;
cin>>input;
}
if(FD_ISSET(1, &wfds) && !serinput.empty())
{
cout<<"server return: "<<serinput<<endl;
serinput.clear();
}
if(FD_ISSET(clisock, &rfds))
{
char buffer[1024] = {0};
if(recv(clisock, buffer, sizeof(buffer), 0) < 0)
{
close(clisock);
cout<<"close "<<clisock<<endl;
return 0;
}
serinput = string(buffer);
}
if(FD_ISSET(clisock, &wfds) && !input.empty())
{
if(send(clisock, input.c_str(), input.size(), 0) > 0)
{
cout<<"send "<<input<<endl;
}
else
{
close(clisock);
cout<<"close "<<clisock<<endl;
}
input.clear();
}
}
}

return 0;
}

select_server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <string.h>
#include <string>
#include <map>
#include <unistd.h>


using namespace std;

int main()
{
int sersock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(9999);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

int on = 1;
setsockopt(sersock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

if(bind(sersock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
{
cout<<"bind error "<<errno<<endl;
return 0;
}

if(listen(sersock, 5) == -1)
{
cout<<"listen error "<<errno<<endl;
return 0;
}

fd_set rfds;
fd_set wfds;
struct timeval tv;
int retval;
char buff[1024] = {0};

map<int, string> clisock;

while(1)
{
int maxfd = sersock;

FD_ZERO(&rfds);
FD_ZERO(&wfds);

FD_SET(sersock, &rfds);
for( auto pairs : clisock)
{
int c = pairs.first;
string s = pairs.second;
FD_SET(c, &rfds);
if(!s.empty())
{
cout<<"s info: "<<s<<endl;
FD_SET(c, &wfds);
}
cout<<"set fd: "<<c<<endl;
maxfd = c>maxfd ? c : maxfd;
}

tv.tv_sec = 5;
retval = select(maxfd+1, &rfds, &wfds, NULL, &tv);
if(retval < -1)
{
cout<<"select error "<<errno<<endl;
return 0;
}
else
{
if(FD_ISSET(sersock, &rfds))
{
struct sockaddr_in clientaddr;
socklen_t length = sizeof(clientaddr);
int c = accept(sersock, (struct sockaddr*)&clientaddr, &length);
clisock[c] = string("");
cout<<"new conn "<<c<<endl;
}
for( auto &pairs : clisock)
{
int c = pairs.first;
string &s = pairs.second;
if(FD_ISSET(c, &rfds))
{
char buffer[1024] = {0};
if(recv(c, buffer, sizeof(buffer), 0) > 0)
{
clisock[c] = string(buffer);
cout<<"recv: "<<clisock[c]<<endl;
}
else
{
clisock.erase(c);
close(c);
FD_CLR(c, &wfds);
FD_CLR(c, &rfds);
cout<<"recv close conn "<<c<<endl;
}
}
if(FD_ISSET(c, &wfds))
{
if(send(c, s.c_str(), s.size(), 0) > 0)
{
cout<<"send succ: "<<s<<endl;
}
else
{
clisock.erase(c);
cout<<"send close conn "<<c<<endl;
}
s.clear();
}
}
}
}

return 0;
}

poll_server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/poll.h>
#include <string.h>
#include <string>
#include <map>
#include <unistd.h>


using namespace std;

int main()
{
int sersock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(9999);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

int on = 1;
setsockopt(sersock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

if(bind(sersock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
{
cout<<"bind error "<<errno<<endl;
return 0;
}

if(listen(sersock, 5) == -1)
{
cout<<"listen error "<<errno<<endl;
return 0;
}

int retval;
struct pollfd fds[1024] = {-1, 0, 0};
fds[0].fd = sersock;
fds[0].events = POLLRDNORM;

char buff[1024] = {0};
int maxi = 0;

map<int, string> clisock;

for(;;)
{
retval = poll(fds, maxi+1, -1);
if(retval < -1)
{
cout<<"poll error "<<errno<<endl;
return 0;
}
else
{
if(POLLRDNORM == (fds[0].revents & POLLRDNORM))
{
struct sockaddr_in clientaddr;
socklen_t length = sizeof(clientaddr);
int c = accept(sersock, (struct sockaddr*)&clientaddr, &length);
clisock[c] = string("");
fds[++maxi].fd = c;
fds[maxi].events = POLLRDNORM;
fds[maxi].revents = 0;
cout<<"new conn "<<c<<endl;
}
for(int i=1; i<1024; ++i)
{
if(fds[i].fd < 0)
{
continue;
}
int &c = fds[i].fd;
auto &s = clisock[fds[i].fd];
if(POLLRDNORM == (fds[i].revents & POLLRDNORM))
{
fds[i].revents &= ~POLLRDNORM;
char buffer[1024] = {0};
if(recv(c, buffer, sizeof(buffer), 0) > 0)
{
clisock[c] = string(buffer);
cout<<"recv: "<<clisock[c]<<endl;
fds[i].events |= POLLWRNORM;
}
else
{
clisock.erase(c);
close(c);
fds[i].fd = -1;
fds[i].events = 0;
fds[i].revents = 0;
cout<<"recv close conn "<<c<<endl;
}
}
if(POLLWRNORM == (fds[i].revents & POLLWRNORM))
{
fds[i].revents &= ~POLLWRNORM;
if(send(c, s.c_str(), s.size(), 0) > 0)
{
cout<<"send succ: "<<s<<endl;
}
else
{
clisock.erase(c);
close(c);
cout<<"send close conn "<<c<<endl;
}
fds[i].events &= ~POLLWRNORM;
s.clear();
}
fds[i].revents = 0;
}
}
}

return 0;
}

epoll_server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <string.h>
#include <string>
#include <map>
#include <unistd.h>


using namespace std;

int main()
{
int sersock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(9999);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

int on = 1;
setsockopt(sersock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

if(bind(sersock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
{
cout<<"bind error "<<errno<<endl;
return 0;
}

if(listen(sersock, 5) == -1)
{
cout<<"listen error "<<errno<<endl;
return 0;
}

int retval;
int epollfd = epoll_create(1024);
if(epollfd < 0)
{
cout<<"epoll_create error"<<endl;
return 0;
}
struct epoll_event tmpfds;
tmpfds.events = EPOLLRDNORM;
tmpfds.data.fd = sersock;
epoll_ctl(epollfd, EPOLL_CTL_ADD, sersock, &tmpfds);

char buff[1024] = {0};
map<int, string> clisock;

for(;;)
{
struct epoll_event fds[1024];
retval = epoll_wait(epollfd, fds, 1024, -1);
if(retval < -1)
{
cout<<"epoll_wait error "<<errno<<endl;
return 0;
}
else
{
for(int i=0; i<retval; ++i)
{
if(fds[i].data.fd == sersock && (fds[i].events & EPOLLRDNORM) == EPOLLRDNORM)
{
struct sockaddr_in clientaddr;
socklen_t length = sizeof(clientaddr);
int c = accept(sersock, (struct sockaddr*)&clientaddr, &length);
clisock[c] = string("");
tmpfds.data.fd = c;
tmpfds.events = EPOLLRDNORM;
epoll_ctl(epollfd, EPOLL_CTL_ADD, c, &tmpfds);
cout<<"new conn "<<c<<endl;
continue;
}
int c = fds[i].data.fd;
auto &s = clisock[c];
if(fds[i].data.fd != sersock && (fds[i].events & EPOLLRDNORM) == EPOLLRDNORM)
{

tmpfds.data.fd = c;
tmpfds.events = EPOLLWRNORM | EPOLLRDNORM;
epoll_ctl(epollfd, EPOLL_CTL_MOD, c, &tmpfds);
char buffer[1024] = {0};
if(recv(c, buffer, sizeof(buffer), 0) > 0)
{
clisock[c] = string(buffer);
cout<<"recv: "<<clisock[c]<<endl;
}
else
{
clisock.erase(c);
close(c);
epoll_ctl(epollfd, EPOLL_CTL_DEL, c, NULL);
cout<<"recv close conn "<<c<<endl;
}
}
if((fds[i].events & EPOLLWRNORM) == EPOLLWRNORM)
{
tmpfds.data.fd = c;
tmpfds.events = EPOLLRDNORM;
epoll_ctl(epollfd, EPOLL_CTL_MOD, c, &tmpfds);
if(send(c, s.c_str(), s.size(), 0) > 0)
{
cout<<"send succ: "<<s<<endl;
}
else
{
clisock.erase(c);
close(c);
epoll_ctl(epollfd, EPOLL_CTL_DEL, c, NULL);
cout<<"send close conn "<<c<<endl;
}
s.clear();
}
}
}
}

return 0;
}