Linux I/O 复用 select源码解析

Linux I/O select 源码分析

select

select系统调用原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
struct timeval {
long tv_sec; //秒
long tv_usec; //微秒 10E-6秒
};
struct timespec {
long tv_sec; //秒
long tv_nsec; //纳秒 10E-9秒
};
typedef struct{
unsigned long fds_bits [1024/(8 * sizeof(unsigned long))]; //unsigned long fds_bits[32]
//32*4*8 = 1024
}fd_set;

select实现在fs/select.c中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//select主流程函数
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
struct timespec end_time, *to = NULL;
struct timeval tv;
int ret;
if (tvp) { //超时时间非空
if (copy_from_user(&tv, tvp, sizeof(tv))) //把超时时间从用户空间复制到内核空间
return -EFAULT;
to = &end_time;
//计算timespec类型的超时时间
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
ret = core_sys_select(n, inp, outp, exp, to); //select实现的核心函数
//复制剩余的超时时间拷贝到用户空间
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
return ret; //返回就绪的文件描述符的个数
}

core_sys_select实现select事件监听的主要功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec *end_time)
{
fd_set_bits fds;

/*
typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;
*/

void *bits;
int ret, max_fds;
unsigned int size;
struct fdtable *fdt;

/*
struct fdtable { //进程的文件描述符表
unsigned int max_fds;
struct file ** fd; //文件对象指针数组,长度放在max_fds中
fd_set *close_on_exec;
fd_set *open_fds;
struct rcu_head rcu;
struct fdtable *next;
};
*/

long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; //long stack_fds[64];

ret = -EINVAL;
if (n < 0)
goto out_nofds;

rcu_read_lock(); //加锁
//取得进程中的fdtable(文件描述符表)
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds; //取得系统当前允许的最大文件描述符
rcu_read_unlock(); //解锁
if (n > max_fds) //如果传入的n大于当前进程最大的文件描述符,修改n
n = max_fds;
size = FDS_BYTES(n); //按一个文件描述符一个bit位计算一个字段所需字节数

/*
#define FDS_BITPERLONG (8*sizeof(long))
#define FDS_LONGS(nr) (((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)
#define FDS_BYTES(nr) (FDS_LONGS(nr)*sizeof(long))
*/

bits = stack_fds; //bits先指向栈上空间,小对象使用栈上空间
if (size > sizeof(stack_fds) / 6) { //栈上空间不足,全部使用堆上空间,使用kmalloc申请空间
ret = -ENOMEM;
bits = kmalloc(6 * size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
//从用户空间拷贝fd_set到内核空间,调用copy_from_user()
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
//将存放返回状态的字段清0
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
ret = do_select(n, &fds, end_time); //核心函数
if (ret < 0) //有错误
goto out;
if (!ret) { //ret == 0,没有设备就绪
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
//将结果从内核空间拷贝到用户空间,调用__copy_to_user()
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
//如果申请了空间kmalloc,释放空间
if (bits != stack_fds)
kfree(bits);
out_nofds:
return ret; //返回就绪的文件描述符的个数
}

一些select内核实现需要了解的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
struct poll_wqueues {    //非常重要,一个调用select()的应用程序只存在一个poll_wqueues结构体,                    //用来统一辅佐实现这个进程中所有待检测的fd的轮询工作
poll_table pt;
struct poll_table_page *table; //动态申请的物理页挂在它上
struct task_struct *polling_task; //保存当前调用select的用户进程struct task_struct结构体
//struct task_struct即进程控制块PCB结构
int triggered; // 当前用户进程被唤醒后置成1,以免该进程接着进睡眠
int error; //错误码
int inline_index; //数组inline_entries的引用下标
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
typedef struct poll_table_struct {
poll_queue_proc qproc;
unsigned long key;
} poll_table;
struct poll_table_page { //申请的物理页都会将起始地址强制转换成该结构体指针
struct poll_table_page * next; //指向下一个申请的物理页
struct poll_table_entry * entry; //指向entries[]中首个待分配(空的)poll_table_entry结构
struct poll_table_entry entries[0]; //该page页后边都是待分配的poll_table_entry结构体
};
struct poll_table_entry { //每个fd的都有一个该结构体
struct file *filp; //指向特定fd对应的file结构体
unsigned long key; //等待特定fd对应硬件设备的事件掩码
wait_queue_t wait; //代表调用select()的应用进程,等待在fd对应设备的特定事件(读或 //写)的等待队列头上的等待队列项
wait_queue_head_t *wait_address; //设备驱动程序中特定事件的等待队列头
};
typedef struct __wait_queue wait_queue_t;
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);
struct __wait_queue { //等待队列项
unsigned int flags; //标记该等待的进程是互斥进程还是非互斥进程
#define WQ_FLAG_EXCLUSIVE 0x01 //互斥进程
void *private; //指向poll_wqueues
wait_queue_func_t func; //等待队列的回调函数
struct list_head task_list; //链表的前向和后向指针
};
struct __wait_queue_head { //等待队列头
spinlock_t lock; //锁
struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
struct list_head {
struct list_head *next, *prev;
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;

int retval, i, timed_out = 0;
unsigned long slack = 0;

rcu_read_lock();
//检查fds中fd的有效性(要求fd必须打开),并获取当前最大的fd
retval = max_select_fd(n, fds);
rcu_read_unlock();

if (retval < 0)
return retval;
n = retval;
//初始化table,设置函数指针table.pt.qproc为__pollwait
poll_initwait(&table);

wait = &table.pt;
//超时时间设为0时
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait = NULL; //不执行__pollwait
timed_out = 1; //timed_out设置为1,表示已超时。
}

if (end_time && !timed_out) //设置的超时时间不为0
//超时时间转换
slack = estimate_accuracy(end_time);

retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
//遍历所有的描述符(n个),i文件描述符 从这里可以看出select使用了遍历的方式
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
//file_operations定义了一系列对字符设备的操作方法
const struct file_operations *f_op = NULL;
struct file *file = NULL;
//检查当前的位置中的描述符
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex; //组合要监测的事件
if (all_bits == 0) { //没有需要监测的描述符,下一个位置
// #define __NFDBITS (8 * sizeof(unsigned long))
i += __NFDBITS;
continue;
}
//每次循环32次 4字节 32个位 使用位图的方式
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) { //第一次bit==1
int fput_needed;
if (i >= n) //是否超出了最大待检测的fd
break;
//不需要监听描述符i,直接跳过
if (!(bit & all_bits))
continue;
//根据i指定的索引,从当前进程描述符中取得文件结构,并增加引用计数
file = fget_light(i, &fput_needed);
if (file) { //file存在
f_op = file->f_op;
//#define DEFAULT_POLLMASK (POLLIN | POLLOUT | POLLRDNORM | POLLWRNORM)
mask = DEFAULT_POLLMASK;
if (f_op && f_op->poll) {
//设置当前fd待检测的事件掩码,wait->key
wait_key_set(wait, in, out, bit);

//获取当前的就绪状态,并添加进程到文件的对应等待队列中
mask = (*f_op->poll)(file, wait);
}
//释放通过fget_light得到的file指针,实际就是减少引用计数
fput_light(file, fput_needed);
//检测文件i是否已有事件就绪
//mask是每一个 (*f_op->poll)()程序返回的设备状态掩码
if ((mask & POLLIN_SET) && (in & bit)) {
//mask&POLLIN_SET该文件是否可写,in&bit应用程序是否关心该文件可写
res_in |= bit; //fd对应的设备可写
retval++; //就绪的文件描述符个数加1
//如果已有就绪事件就不再向其他文件的等待队列中添加回调函数。
//避免重复执行__pollwait()
wait = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait = NULL;
}
}
}
//根据poll()的结果,写回到输出字段里面
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait = NULL; //避免重复执行__pollwait()
//有设备就绪或有异常 || 超时 || 有终止信号出现
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
//第一轮循环,时间的转换timespec到ktime
if (end_time && !to) {
expire = timespec_to_ktime(*end_time);
to = &expire;
}
//当前进程从这里进入睡眠,等待直到超时,或由注册的函数唤醒超时,返回0;被唤醒,返回-EINTR
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1; //超时后,将timed_out设为1,再遍历一次文件描述符后,退出循环
}
//清除等待队列
poll_freewait(&table);
return retval; //返回就绪的文件描述符的个数
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->polling_task = current;
pwq->triggered = 0;
pwq->error = 0;
pwq->table = NULL;
pwq->inline_index = 0;
}
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc; //__pollwait
pt->key = ~0UL; /* all events enabled */
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
#define POLLEX_SET (POLLPRI)
static inline void wait_key_set(poll_table *wait, unsigned long in,
unsigned long out, unsigned long bit)
{
if (wait) {
wait->key = POLLEX_SET;
if (in & bit)
wait->key |= POLLIN_SET;
if (out & bit)
wait->key |= POLLOUT_SET;
}
}

poll 方法是3 个系统调用的后端:poll, epoll, 和select, 都用作查询对一个或多个文件描述符的读或写是否会阻塞.poll 方法应当返回一个位掩码指示是否非阻塞的读或写是可能的,并且,可能地,提供给内核信息用来使调用进程睡眠直到I/O 变为可能.如果一个驱动的poll 方法为NULL, 设备假定为不阻塞地可读可写.
首先,对可能引起设备文件状态变化的等待队列调用poll_wait(),将对应的等待队列头添加到poll_table.然后,返回表示是否能对设备进行无阻塞读写访问的掩码。把当前进程添加到wait_address参数指定的等待列表中。需要注意的是这个函数是不会引起阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
unsigned int (*poll) (struct file *, struct poll_table_struct *);        //设备驱动
static unsigned int evdev_poll(struct file *file, poll_table *wait)
{
struct evdev_client *client = file->private_data;
struct evdev *evdev = client->evdev;
poll_wait(file, &evdev->wait, wait);
return ((client->head == client->tail) ? 0 : (POLLIN | POLLRDNORM)) |
(evdev->exist ? 0 : (POLLHUP | POLLERR));
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
p->qproc(filp, wait_address, p); //__pollwait
}
//filp特定fd对应的file结构体指针
//wait 特定fd对应的硬件驱动程序中的等待队列头指针
//p 调用select()的应用程序中poll_wqueues结构体的poll_table项
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,poll_table *p)
{
//通过container_of宏得到结构体poll_wqueues的地址
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
//调用poll_get_entry()得到一个poll_table_entry结构体
//空间不足,动态申请物理内存页,以链表的形式挂在poll_wqueues.table上统一管理。
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
get_file(filp);
entry->filp = filp; // 保存对应的file结构体
entry->wait_address = wait_address; // 保存来自设备驱动程序的等待队列头
entry->key = p->key; // 保存对该fd关心的事件掩码

// 初始化等待队列项,pollwake是唤醒该等待队列项时候调用的函数
init_waitqueue_func_entry(&entry->wait, pollwake);

//static inline void init_waitqueue_func_entry(wait_queue_t *q,wait_queue_func_t func)
//{
// q->flags = 0; //设为非互斥进程
// q->private = NULL;
// q->func = func;
//}

//等待队列项private指向poll_wqueues
entry->wait.private = pwq;
//将该等待队列项添加到从驱动程序中传递过来的等待队列头中去
//即:设置等待的进程为非互斥进程,并将其添加进等待队列头的队头中
add_wait_queue(wait_address, &entry->wait);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void poll_freewait(struct poll_wqueues *pwq)
{
struct poll_table_page * p = pwq->table;
int i;
for (i = 0; i < pwq->inline_index; i++)
free_poll_entry(pwq->inline_entries + i);
while (p) {
struct poll_table_entry * entry;
struct poll_table_page *old;
entry = p->entry;
do {
entry--;
free_poll_entry(entry);
} while (entry > p->entries);
old = p;
p = p->next;
free_page((unsigned long) old);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int poll_schedule_timeout(struct poll_wqueues *pwq, int state,
ktime_t *expires, unsigned long slack)
{
int rc = -EINTR;

set_current_state(state); //设置当前进程的状态state = TASK_INTERRUPTIBLE,有可能在尚 //未到期时由其它信号唤醒进程从而导致函数返回
if (!pwq->triggered) //只要有一个fd对应的设备将当前进程唤醒,将会把triggered设置为1
rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);
__set_current_state(TASK_RUNNING);
set_mb(pwq->triggered, 0);
return rc;
}
//唤醒的时候调用pollwake
static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct poll_table_entry *entry;
//取得文件对应的poll_table_entry
entry = container_of(wait, struct poll_table_entry, wait);
//过滤不关心的事件,防止应用进程被误唤醒
if (key && !((unsigned long)key & entry->key))
return 0;
//唤醒
return __pollwake(wait, mode, sync, key);
}
static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct poll_wqueues *pwq = wait->private;
//定义一个wait_queue_t类型的变量dummy_wait,并初始化
DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);

//#define __WAITQUEUE_INITIALIZER(name, tsk) { \
// .private = tsk, \
// .func = default_wake_function, \
// .task_list = { NULL, NULL } }

//#define DECLARE_WAITQUEUE(name, tsk) \
// wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)

smp_wmb();
pwq->triggered = 1; //标记为已触发
//唤醒函数
return default_wake_function(&dummy_wait, mode, sync, key);
}

select系统调用的具体实现过程图解:

select结构体依赖关系