Linux I/O 复用 epoll源码解析

Linux I/O epoll 源码分析

目录

epoll

epoll的简单原理

  1. 执行epoll_create时,创建了红黑树和就绪list链表。
  2. 执行epoll_ctl时,如果增加fd(socket),则检查在红黑树中是否存在,存在立即返回,不存在则添加到红黑树上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪list链表中插入数据。
  3. 执行epoll_wait时立刻返回准备就绪链表里的数据即可。

epoll_create

epoll_create主要做两件事

  1. 创建并初始化一个eventpoll结构体变量
  2. 创建epoll的file结构,并指定file的private_data指针指向刚创建的eventpoll变量,这样,只要根据epoll文件描述符epfd就可以拿到file进而就拿到了eventpoll变量了,该eventpoll就是epoll_ctl和epoll_wait工作的场所
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//epoll文件系统的相关实现
//epoll文件系统初始化,在系统启动时会调用 .init段
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
//限制可添加到epoll的最多的描述符数量
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST;
//检查递归检查队列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
//在内核创建内存池
//epoll用kmem_cache_create(slab分配器)分别用来分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
error = ep_alloc(&ep); //为ep在堆上申请空间并初始化 eventpoll
if (error < 0)
return error;
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));//会调用alloc_fd()分配文件描述符
if (fd < 0) {
error = fd;
goto out_free_ep;
}
//创建与eventpoll结构体相对应的file结构体,匿名文件,ep保存在file->private_data结构体中
//static const struct file_operations eventpoll_fops = {
// .release = ep_eventpoll_release,
// .poll = ep_eventpoll_poll
//};
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));

if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
//建立文件描述符和file之间的联系
fd_install(fd, file);
ep->file = file;
return fd;

out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep); //释放ep eventpoll
return error;
}

epoll_ctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//epfd    epoll内核事件表的文件描述符
//op 要进行的操作类型
//fd 要监测的文件描述符
//event 要监测的事件
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;

error = -EFAULT;
//判断参数的合法性,将event从用户空间复制到内核空间
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;

error = -EBADF;
file = fget(epfd); //根据文件描述符得到相应的文件对象,内核事件表
if (!file)
goto error_return;

tfile = fget(fd); //要监测的文件
if (!tfile)
goto error_fput;

error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll) //要监测文件设备poll方法的检查
goto error_tgt_fput;
error = -EINVAL;
if (file == tfile || !is_file_epoll(file))
goto error_tgt_fput;
ep = file->private_data; //获取epoll对应的eventpoll结构体

if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
if (ep_loop_check(ep, tfile) != 0) { //循环检测
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tfile->f_tfile_llink, &tfile_check_list);
}

mutex_lock_nested(&ep->mtx, 0); //互斥锁加锁
//防止重复添加(在ep的红黑树中查找是否存在这个fd),存在返回epitem,不存在返回NULL
epi = ep_find(ep, tfile, fd);

error = -EINVAL;
switch (op) { //不同的操作
case EPOLL_CTL_ADD:
if (!epi) { //红黑树上不存在这个节点
epds.events |= POLLERR | POLLHUP; //确保“出错,连接挂起”被当作感兴趣的事件
error = ep_insert(ep, &epds, tfile, fd); //添加
} else
error = -EEXIST;
clear_tfile_check_list(); //清空需要监测的文件的检查列表
break;
case EPOLL_CTL_DEL:
if (epi) //红黑树上存在这个节点
error = ep_remove(ep, epi); //删除
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx); //互斥锁,解锁

error_tgt_fput:
if (did_lock_epmutex)
mutex_unlock(&epmutex);

fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;
//struct ep_pqueue { poll_table pt; struct epitem *epi; };

if (unlikely(atomic_read(&ep->user->epoll_watches) >=
max_user_watches))
return -ENOSPC;
//初始化分配epi epitem
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;

//epi初始化
INIT_LIST_HEAD(&epi->rdllink); //就绪的事件链表
INIT_LIST_HEAD(&epi->fllink); //连接被监听的文件
INIT_LIST_HEAD(&epi->pwqlist); //poll等待队列
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;

//初始化的epq
epq.epi = epi;
//epq.pt->qproc = ep_ptable_queue_proc 回调函数的设置
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

//内部会调用ep_ptable_queue_proc, 在文件对应的wait queue head 上注册回调函数
//并返回当前文件的状态 
revents = tfile->f_op->poll(tfile, &epq.pt);

error = -ENOMEM;
if (epi->nwait < 0) //f_op->poll过程出错
goto error_unregister;

spin_lock(&tfile->f_lock);
//添加当前的epitem到要监测文件的tfile->f_ep_links链表
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
//插入epi到红黑树
ep_rbtree_insert(ep, epi);

error = -EINVAL;
if (reverse_path_check()) //唤醒风暴检查
goto error_remove_epi;

spin_lock_irqsave(&ep->lock, flags);
//文件已经就绪,插入到epitem的就绪链表rdllist
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);

if (waitqueue_active(&ep->wq))
//通知epoll_wait,调用回调函数唤醒epoll_wait上的进程
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait)) //先不通知调用eventpoll_poll的进程
pwake++;
}

spin_unlock_irqrestore(&ep->lock, flags);
atomic_inc(&ep->user->epoll_watches);

if (pwake)
ep_poll_safewake(&ep->poll_wait); //安全通知调用eventpoll_poll的进程

return 0;

error_remove_epi:
spin_lock(&tfile->f_lock);
//删除文件上的epi
if (ep_is_linked(&epi->fllink))
list_del_init(&epi->fllink);
spin_unlock(&tfile->f_lock);
//从红黑树中删除
rb_erase(&epi->rbn, &ep->rbr);

error_unregister:
//从文件的wait_queue中删除,释放epitem关联的所有epoll_entry
ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);
//释放epi
kmem_cache_free(epi_cache, epi);
return error;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// tfile->f_op->poll(tfile, &epq.pt)   ->   poll_wait()    ->   ep_ptable_queue_proc()
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
//pwq初始化,pwq->wait.func = ep_poll_callback; //唤醒回调函数
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead; //监测文件的等待队列头
pwq->base = epi; //指向epitem
add_wait_queue(whead, &pwq->wait); //将等待队列项,加入到等待队列中去
list_add_tail(&pwq->llink, &epi->pwqlist); //将等待队列项保存到epi->pwqlist链表中
epi->nwait++; //poll操作中事件的个数加1
} else {
epi->nwait = -1; //标识调用出错
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int ep_remove(struct eventpoll *ep, struct epitem *epi)
{
unsigned long flags;
struct file *file = epi->ffd.file; //获得要删除文件描述符的文件结构

ep_unregister_pollwait(ep, epi); //删除被监听文件的等待队列项

spin_lock(&file->f_lock);
if (ep_is_linked(&epi->fllink))
list_del_init(&epi->fllink); //从epitem.fllink中删除
spin_unlock(&file->f_lock);

rb_erase(&epi->rbn, &ep->rbr); //从红黑树中删除

spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink); //从epitem.rdllink中删除(就绪队列)
spin_unlock_irqrestore(&ep->lock, flags);

kmem_cache_free(epi_cache, epi); //释放空间

atomic_dec(&ep->user->epoll_watches);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)
{
int pwake = 0;
unsigned int revents;

epi->event.events = event->events;
epi->event.data = event->data;

smp_mb();

revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL); //获得事件掩码

if (revents & event->events) {
spin_lock_irq(&ep->lock);
if (!ep_is_linked(&epi->rdllink)) {
//将已就绪的等待队列项,加入到ep->rdllist链表
list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irq(&ep->lock);
}

if (pwake)
ep_poll_safewake(&ep->poll_wait);

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
//检查输入数据有效性
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;

error = -EINVAL;
if (!is_file_epoll(file))
goto error_fput;

ep = file->private_data; //获得eventpoll结构

error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
//timeout < 0 或者timeout >= EP_MAX_MSTIMEO
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
spin_lock_irqsave(&ep->lock, flags);

res = 0;
if (list_empty(&ep->rdllist)) { //事件就绪队列为空
init_waitqueue_entry(&wait, current); //对当前进程设置等待项
//static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
//{
// q->flags = 0;
// q->private = p;
// q->func = default_wake_function;
//}
wait.flags |= WQ_FLAG_EXCLUSIVE; //标记等待的进程是互斥进程
__add_wait_queue(&ep->wq, &wait); //加入ep_wait()等待队列

for (;;) {
//进程状态的设置,TASK_INTERRUPTIBLE表示进程可以被信号和wake_up()唤醒
set_current_state(TASK_INTERRUPTIBLE);
//就绪队列为不空 或者 睡眠时间为0
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) { //收到信号
res = -EINTR;
break;
}

spin_unlock_irqrestore(&ep->lock, flags);
//进入睡眠等待ep_poll_callback()将当前进程唤醒或超时,返回值是剩余的时间。
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
} //end for( ; ; )
__remove_wait_queue(&ep->wq, &wait); //将等待队列项从ep_wait()等待队列中移除

set_current_state(TASK_RUNNING); //表示TASK_RUNNING运行或就绪状态
} //end if
//ep->rdllist不空 或者 ep->ovflist不空 eavail为true
eavail = ! list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
//如果没有被信号中断,并且有事件就绪,但发送成功的文件描述符为0,并且没有超时
//则跳转到retry标签处,重新等待文件状态就绪
if ( !res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res; //返回获取的事件的个数或者错误码
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//发送结果到用户空间
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents; //需要检测的文件描述符个数
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0);
}
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *, struct list_head *, void *),
void *priv,
int depth)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);

mutex_lock_nested(&ep->mtx, depth);

spin_lock_irqsave(&ep->lock, flags);
//将ep->rdllist链表添加到txlist链表中去,使ep->rdllist链表为空
list_splice_init(&ep->rdllist, &txlist);
//标记ep_scan_ready_list()准备向用户空间传递事件,当有唤醒函数ep_poll_callback()调用时
//将就绪的文件描述符的epitem实例加入到ovflist链表中。
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);

error = (*sproc)(ep, &txlist, priv); //ep_send_events_proc

spin_lock_irqsave(&ep->lock, flags);
//在执行上面代码期间,又有可能有就绪事件,这样就进入ep->ovflist队列,
//将ovflist链表中的就绪文件描述符加入到rdllist #define EP_UNACTIVE_PTR ((void *) -1L)
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
//如果epi->rdllink为空,将epi加入到ep->rdllist中
if (!ep_is_linked(&epi->rdllink)) //list_empty(&epi->rdllink)
list_add_tail(&epi->rdllink, &ep->rdllist);
}
ep->ovflist = EP_UNACTIVE_PTR; //标记向用户空间传递事件结束

list_splice(&txlist, &ep->rdllist); //如果txlist不为空,将txlist链表加入到rdllist链表中
if ( ! list_empty(&ep->rdllist)) { //如果文件描述符就绪链表ep->rdllist不为空
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); //如果ep->wq不空,唤醒ep->wq上的进程
if (waitqueue_active(&ep->poll_wait))
pwake++; //如果ep->poll_wait不空
}
spin_unlock_irqrestore(&ep->lock, flags);

mutex_unlock(&ep->mtx);
if (pwake)
ep_poll_safewake(&ep->poll_wait); //通知调用了poll的进程
return error; //返回发送的就绪的文件描述符个数或错误码
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;

for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents; ) {
epi = list_first_entry(head, struct epitem, rdllink); //得到epitem结构体

list_del_init(&epi->rdllink); //从就绪链表中删除该epitem
//立即返回当前文件的就绪事件
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events;

if (revents) {
//将就绪事件的poll_event发送至用户空间
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head); //发送失败,将epi重新加入head
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
//#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
if (epi->event.events & EPOLLONESHOT) //如果文件描述符检测EPOLLONESHOT
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
//未设置为ET(非边缘触发)模式,就将epi->rdllink加入到ep->rdllist链表中
list_add_tail(&epi->rdllink, &ep->rdllist);
}
} //end if revents
} //end for
return eventcnt; //返回发送的就绪的文件描述符个数
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//ep_poll_callback唤醒回调函数
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait); //得到epitem
struct eventpoll *ep = epi->ep; //得到eventpoll

if ((unsigned long)key & POLLFREE) { //有错误发生
ep_pwq_from_wait(wait)->whead = NULL; //等待队列头置为NULL
list_del_init(&wait->task_list); //从等待队列中删除等待队列项
}

spin_lock_irqsave(&ep->lock, flags);
//#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
if (!(epi->event.events & ~EP_PRIVATE_BITS))//有非EPOLLONESHOT | EPOLLET不懂!!!
goto out_unlock;

if (key && !((unsigned long) key & epi->event.events))
goto out_unlock; //没有事件发生

//ep->ovflist != EP_UNACTIVE_PTR成立时,ep_scan_ready_list()正在向用户空间传递事件。
//如果当前进程正在向用户空间传递事件,则将当前的事件对应的epitem实例加入到ovflist链表中。
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
//如果epi->next不等于EP_UNACTIVE_PTR,
//则说明已经添加到ovflist链表中,就不用再添加了
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist; //把epi放到ovflist链表中
ep->ovflist = epi;
}
goto out_unlock;
}

if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist); //把epitem放到strcut eventpoll的rdllist中去
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); //唤醒调用epoll_wait()函数时睡眠的进程
if (waitqueue_active(&ep->poll_wait))
pwake++;

out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);

if (pwake)
ep_poll_safewake(&ep->poll_wait);

return 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//由于epoll自身也是文件系统,其描述符也可以被poll/select/epoll监视,因此需要实现poll方法
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
{
int pollflags;
struct eventpoll *ep = file->private_data;
//把等待队列项插入到ep->poll_wait
poll_wait(file, &ep->poll_wait, wait);
//扫描就绪的文件列表,调用每个文件上的poll检查是否真的就绪,然后复制到用户空间,
//文件列表中有可能有epoll文件,调用poll的时候有可能产生递归,
//调用所以用ep_call_nested包装一下,防止死循环和过深的调用
pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
ep_poll_readyevents_proc, ep, ep, current);

return pollflags != -1 ? pollflags : 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests)
{
return ep_scan_ready_list(priv, ep_read_events_proc, NULL, call_nests + 1);
}
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv,
int depth)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);

mutex_lock_nested(&ep->mtx, depth);
spin_lock_irqsave(&ep->lock, flags);
//移动ep->rdllist到新的链表txlist
list_splice_init(&ep->rdllist, &txlist);
ep->ovflist = NULL; //改变ovflist的状态
spin_unlock_irqrestore(&ep->lock, flags);

error = (*sproc)(ep, &txlist, priv); //ep_read_events_proc

spin_lock_irqsave(&ep->lock, flags);
//调用ep_read_events_proc时可能出现了新的事件,遍历这些新的事件将其插入到rdllink
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
//epi不在rdllink,插入
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
}
ep->ovflist = EP_UNACTIVE_PTR; //还原ep->ovflist的状态
list_splice(&txlist, &ep->rdllist); //将处理后的txlist链接到rdllist

if (!list_empty(&ep->rdllist)) {
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); //唤醒epoll_wait
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);

mutex_unlock(&ep->mtx);
if (pwake)
ep_poll_safewake(&ep->poll_wait); //安全唤醒外部的事件通知机制
return error;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct epitem *epi, *tmp;

list_for_each_entry_safe(epi, tmp, head, rdllink) {
if (epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events)
//只要有一个文件描述符就绪返回可读 或 普通数据可写
return POLLIN | POLLRDNORM;
else {
//文件描述符未就绪,但在就绪队列中,将其移除
list_del_init(&epi->rdllink);
}
}
return 0;
}
1
2
3
4
5
6
7
8
9
static void ep_poll_safewake(wait_queue_head_t *wq)
{
int this_cpu = get_cpu();

ep_call_nested(&poll_safewake_ncalls, EP_MAX_NESTS,
ep_poll_wakeup_proc, NULL, wq, (void *) (long) this_cpu);

put_cpu();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int ep_poll_wakeup_proc(void *priv, void *cookie, int call_nests)
{
ep_wake_up_nested((wait_queue_head_t *) cookie, POLLIN,
1 + call_nests);
return 0;
}

static inline void ep_wake_up_nested(wait_queue_head_t *wqueue,
unsigned long events, int subclass)
{
//唤醒所有正在等待此epfd的select/epoll/poll等
//如果唤醒的是epoll就可能唤醒所有其他的epoll,产生连锁反应
wake_up_poll(wqueue, events);
}

epoll中的递归死循环和深度检查

如果epoll之间相互监视就有可能导致死循环。epoll的实现中,所有可能产生递归调用的函数都由函函数ep_call_nested进行包裹,递归调用过程中出现死循环或递归过深就会打破死循环和递归调用直接返回。该函数的实现依赖于一个外部的全局链表nested_call_node(不同的函数调用使用不同的节点),每次调用可能发生递归的函数(nproc)就向链表中添加一个包含当前函数调用上下文ctx(进程,CPU,或epoll文件)和处理的对象标识cookie的节点,通过检测是否有相同的节点就可以知道是否发生了死循环,检查链表中同一上下文包含的节点个数就可以知道递归的深度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//限制epoll中直接或间接递归调用的深度并防止死循环
//ctx:任务运行上下文(进程,CPU等)
//cokie:每个任务的标识
//priv:任务运行需要的私有数据
static int ep_call_nested(struct nested_calls *ncalls, int max_nests,
int (*nproc)(void *, void *, int), void *priv,
void *cookie, void *ctx)
{
int error, call_nests = 0;
unsigned long flags;
struct list_head *lsthead = &ncalls->tasks_call_list;
struct nested_call_node *tncur;
struct nested_call_node tnode;

spin_lock_irqsave(&ncalls->lock, flags);
//检查原有的嵌套调用链表ncalls,查看是否有深度超过限制的情况
list_for_each_entry(tncur, lsthead, llink) {
//同一上下文(ctx)中有相同的任务(cookie)说明产生了死循环
//同一上下文的递归深度call_nests超过限制
if (tncur->ctx == ctx &&
(tncur->cookie == cookie || ++call_nests > max_nests)) {
error = -1;
goto out_unlock;
}
}
//将当前的任务请求添加到调用列表
tnode.ctx = ctx;
tnode.cookie = cookie;
list_add(&tnode.llink, lsthead);

spin_unlock_irqrestore(&ncalls->lock, flags);
//nproc 可能会导致递归调用(直接或间接)ep_call_nested ,如果发生递归调用, 那么在此函数返回之前, ncalls 又会被加入额外的节点, 这样通过前面的检测就可以知道递归调用的深度 
error = (*nproc)(priv, cookie, call_nests);
spin_lock_irqsave(&ncalls->lock, flags);
//从链表中删除当前任务
list_del(&tnode.llink);
out_unlock:
spin_unlock_irqrestore(&ncalls->lock, flags);

return error;
}

循环检测

循环检查(ep_loop_check),该函数递归调用ep_loop_check_proc利用ep_call_nested来实现epoll之间相互监视的死循环。因为ep_call_nested中已经对死循环和过深的递归做了检查,实际的ep_loop_check_proc的实现只是递归调用自己。其中的visited_list和visited标记完全是为了优化处理速度,如果没有visited_list和visited标记函数也是能够工作的。该函数中得上下文就是当前的进程,cookie就是正在遍历的epoll结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int ep_loop_check(struct eventpoll *ep, struct file *file)
{
int ret;
struct eventpoll *ep_cur, *ep_next;

ret = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, file, ep, current);
//清除链表和标志
list_for_each_entry_safe(ep_cur, ep_next, &visited_list,
visited_list_link) {
ep_cur->visited = 0;
list_del(&ep_cur->visited_list_link);
}
return ret;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct eventpoll *ep = file->private_data;
struct eventpoll *ep_tovisit;
struct rb_node *rbp;
struct epitem *epi;

mutex_lock_nested(&ep->mtx, call_nests + 1);
ep->visited = 1; //标志当前为已遍历
list_add(&ep->visited_list_link, &visited_list);
//遍历所有ep监视的文件
for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
if (unlikely(is_file_epoll(epi->ffd.file))) {
ep_tovisit = epi->ffd.file->private_data;
if (ep_tovisit->visited) //跳过先前已遍历的,避免循环检查
continue;
//所有ep监视的未遍历的epoll
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
ep_loop_check_proc, epi->ffd.file,
ep_tovisit, current);
if (error != 0)
break;
} else {
//文件不在file->tfile_check_list中,添加
//最外层的epoll需要检查epoll监视的文件
if (list_empty(&epi->ffd.file->f_tfile_llink))
list_add(&epi->ffd.file->f_tfile_llink,
&tfile_check_list);
}
}
mutex_unlock(&ep->mtx);
return error;
}

唤醒风暴

当文件状态发生改变时,会唤醒监听在其上的epoll文件,而这个epoll文件还可能唤醒其他的epoll文件,这种连续的唤醒就形成了一个唤醒路径,所有的唤醒路径就形成了一个有向图。如果文件对应的epoll唤醒有向图的节点过多,那么文件状态的改变就会唤醒所有的这些epoll(可能会唤醒很多进程,这样的开销是很大的),而实际上一个文件经过少数epoll处理以后就可能从就绪转到未就绪,剩余的epoll虽然认为文件已就绪而实际上经过某些处理后已不可用。epoll的实现中考虑到了此问题,在每次添加新文件到epoll中时,就会首先检查是否会出现这样的唤醒风暴。
该函数的实现逻辑是这样的,递归调用reverse_path_check_proc遍历监听在当前文件上的epoll文件,在reverse_pach_check_proc中统计并检查不同路径深度上epoll的个数,从而避免产生唤醒风暴。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int reverse_path_check(void)
{
int length = 0;
int error = 0;
struct file *current_file;
//遍历全局tfile_chack_list中的文件,第一级
list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {
length++;
//初始化
path_count_init();
//限制递归的深度,并检查每个深度上唤醒的epoll数量
error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,
reverse_path_check_proc, current_file,
current_file, current);
if (error)
break;
}
return error;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)
{
int error = 0;
struct file *file = priv;
struct file *child_file;
struct epitem *epi;
list_for_each_entry(epi, &file->f_ep_links, fllink) {
//遍历监视file的epoll
child_file = epi->ep->file;
if (is_file_epoll(child_file)) {
if (list_empty(&child_file->f_ep_links)) {
//没有其他的epoll监视当前的这个epoll,已经是叶子了
if (path_count_inc(call_nests)) {
error = -1;
break;
}
} else {
//遍历监视这个epoll文件的epoll,递归调用
error = ep_call_nested(&poll_loop_ncalls,
EP_MAX_NESTS,
reverse_path_check_proc,
child_file, child_file,
current);
}
if (error != 0)
break;
} else {
printk(KERN_ERR "reverse_path_check_proc: "
"file is not an ep!\n");
}
}
return error;
}
#define PATH_ARR_SIZE 5
//深度限制
static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };
//计算出来的深度
static int path_count[PATH_ARR_SIZE];

static int path_count_inc(int nests)
{
if (nests == 0)
return 0;
if (++path_count[nests] > path_limits[nests])
return -1;
return 0;
}

static void path_count_init(void)
{
int i;
for (i = 0; i < PATH_ARR_SIZE; i++)
path_count[i] = 0;
}

epoll_create和epoll_ctl函数调用过程

epoll_wait函数调用主要过程

epoll结构体间关系