epoll 多路复用驱动是异步事件处理,在用户空间它提供了用户数据(epoll_data
),方便事件触发后回调给用户处理。
1. epoll_data
我们来看看 epoll 事件和接口,epoll_data
是用户数据,内核并不会处理,只与对应的 fd 绑定,当 fd 产生事件后,epoll_wait 会回调回来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* 用户数据。*/
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
/* 事件。 */
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
/* 事件控制接口。 */
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/* 事件监控回调接口 */
int epoll_wait(int epfd, struct epoll_event* events, int maxevents. int timeout);
2. 内核
走读 epoll 源码,简单走一下 epoll 事件的添加和回调流程。
2.1. epoll_ctl
监控的 fd 事件信息,被添加到内核红黑树节点进行监控。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/* eventpoll.c */
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event) {
struct epoll_event epds;
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;
return do_epoll_ctl(epfd, op, fd, &epds, false);
}
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock) {
...
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, epds, tf.file, fd, full_check);
}
...
}
...
}
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check) {
...
struct epitem *epi;
...
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
...
/* 监控的事件信息被添加到 epi 红黑树节点。 */
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
...
ep_rbtree_insert(ep, epi);
...
}
2.2. epoll_wait
epoll_wait 是阻塞(超时)等待事件发生的接口,当内核检测到 fd 有事件发生,会唤醒进程,内核将 fd 对应事件从内核拷贝(__put_user)到用户空间处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/* eventpoll.c */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int,
maxevents, int, timeout) {
return do_epoll_wait(epfd, events, maxevents, timeout);
}
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout) {
...
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
...
}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout) {
...
/* 网络驱动检测到有事件发生,唤醒进程,通知用户。 */
write_lock_irq(&ep->lock);
eavail = ep_events_available(ep);
write_unlock_irq(&ep->lock);
if (eavail)
goto send_events;
...
send_events:
res = ep_send_events(ep, events, maxevents);
...
}
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents) {
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
/* 遍历就绪列表,发送就绪事件到用户态。 */
ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
return esed.res;
}
/* 遍历事件就绪列表。 */
static __poll_t ep_scan_ready_list(struct eventpoll *ep,
__poll_t (*sproc)(struct eventpoll *,
struct list_head *,
void *),
void *priv, int depth, bool ep_locked) {
...
/* 就绪列表(ep->rdllist)数据分片到 txlist 进行发送处理。 */
write_lock_irq(&ep->lock);
list_splice_init(&ep->rdllist, &txlist);
WRITE_ONCE(ep->ovflist, NULL);
write_unlock_irq(&ep->lock);
/* ep_send_events_proc */
res = (*sproc)(ep, &txlist, priv);
...
}
static __poll_t ep_send_events_proc(struct eventpoll *ep,
struct list_head *head, void *priv) {
struct ep_send_events_data *esed = priv;
...
struct epoll_event __user *uevent = esed->events;
...
/* epi 是 fd 对应的红黑树节点,遍历就绪列表,拷贝事件。 */
list_for_each_entry_safe(epi, tmp, head, rdllink) {
...
revents = ep_item_poll(epi, &pt, 1);
...
/* 数据从内核空间拷贝到用户空间。 */
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
...
}
...
}
...
}
3. 开源
epoll_data 在用户层是一个 union 值,那么用户应该如何传参?我们参考一下其它开源项目是如何处理的。
3.1. libco
Linux 系统的 epoll_wait 回调数据,data 是一个 stTimeoutItem_t
指针。
传指针缺点:如果程序在 epoll_wait 回调前,把指针释放了,那么 epoll_wait 回调后回传的指针就变成 野指针
了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* co_coroutine.cpp */
void co_eventloop(stCoEpoll_t *ctx, pfn_co_eventloop_t pfn, void *arg) {
...
co_epoll_res *result = ctx->result;
...
for (;;) {
int ret = co_epoll_wait(ctx->iEpollFd, result, stCoEpoll_t::_EPOLL_SIZE, 1);
...
for (int i = 0; i < ret; i++) {
/* data 值是 stTimeoutItem_t 指针。 */
stTimeoutItem_t *item = (stTimeoutItem_t *)result->events[i].data.ptr;
...
}
}
...
}
3.2. muduo
陈硕大神的 muduo,epoll_wait 回调数据,data 是一个 Channel
对象指针,跟上面的情况一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void EPollPoller::update(int operation, Channel* channel) {
struct epoll_event event;
memZero(&event, sizeof event);
event.events = channel->events();
/* 传指针。*/
event.data.ptr = channel;
int fd = channel->fd();
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) {
...
}
}
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels) {
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0) {
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size()) {
events_.resize(events_.size() * 2);
}
}
...
}
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const {
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i) {
/* 返回指针。*/
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
...
}
}
3.3. nginx
nginx epoll_event.data 传的是指针,这样也会存在“野指针”的风险,它与 libco 不同的地方,nginx 处理这个指针,根据内存对齐规则,利用指针末位一个 bit 的标识进行保护,以此来检查 fd 的时效性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
struct ngx_event_s {
...
/* used to detect the stale events in kqueue and epoll */
unsigned instance : 1;
...
};
static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event,
ngx_uint_t flags) {
...
struct epoll_event ee;
...
ee.events = events | (uint32_t)flags;
/* 增加 instance 值。 */
ee.data.ptr = (void *)((uintptr_t)c | ev->instance);
...
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
...
}
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags) {
...
events = epoll_wait(ep, event_list, (int)nevents, timer);
...
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
instance = (uintptr_t)c & 1;
c = (ngx_connection_t *)((uintptr_t)c & (uintptr_t)~1);
rev = c->read;
/* 检查 instance 值。 */
if (c->fd == -1 || rev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
...
}
...
}
3.4. redis
epoll_event.data 传的是 fd。
传 fd 缺点:我们日常使用,fd 并不是用户的唯一标识,因为当旧的 fd 被 close 掉后,它会被系统回收重复使用,导致新来的用户可能重用原来的 fd,如果逻辑处理不好,也可能会出现问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* ae_epoll.c */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
tvp ? (tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
...
struct epoll_event *e = state->events + j;
...
/* data 传的是 fd。 */
eventLoop->fired[j].fd = e->data.fd;
...
}
}
return numevents;
}
3.5. libev
libev epoll_event.data 传的是 fd 和索引的(uint64_t)组合,这样添加一个索引对数据进行保护,感觉更安全一些。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/* ev_epoll.c */
static void epoll_modify(EV_P_ int fd, int oev, int nev) {
struct epoll_event ev;
...
/* store the generation counter in the upper 32 bits, the fd in the
* lower 32 bits */
ev.data.u64 =
(uint64_t)(uint32_t)fd | ((uint64_t)(uint32_t)++anfds[fd].egen << 32);
ev.events = (nev & EV_READ ? EPOLLIN : 0) | (nev & EV_WRITE ? EPOLLOUT : 0);
if (ecb_expect_true(!epoll_ctl(
backend_fd, oev && oldmask != nev ? EPOLL_CTL_MOD : EPOLL_CTL_ADD,
fd, &ev)))
return;
...
}
static void epoll_poll(EV_P_ ev_tstamp timeout) {
...
eventcnt = epoll_wait(backend_fd, epoll_events, epoll_eventmax,
EV_TS_TO_MSEC(timeout));
...
for (i = 0; i < eventcnt; ++i) {
struct epoll_event *ev = epoll_events + i;
int fd = (uint32_t)ev->data.u64; /* mask out the lower 32 bits */
int want = anfds[fd].events;
...
/*
* check for spurious notification.
* this only finds spurious notifications on egen updates
* other spurious notifications will be found by epoll_ctl, below
* we assume that fd is always in range, as we never shrink the
* anfds array
*/
if (ecb_expect_false((uint32_t)anfds[fd].egen !=
(uint32_t)(ev->data.u64 >> 32))) {
/* recreate kernel state */
postfork |= 2;
continue;
}
...
}
}
4. 小结
- 阅读经典开源源码,可以深层次理解作者代码设计思路,通过对比,加深对代码的理解。
- 做底层的事件驱动逻辑,需要缜密的思维,只要逻辑错误,无论传的是啥参数一样会出现错误。
- 现实使用,我们可以参考开源的实现。