文先生的博客 求职,坐标深圳。(wenfh2020@126.com)

[libco] libco 定时器(时间轮)

2021-03-30

libco 定时器核心数据结构:数组 + 链表,有点像哈希表,通过空间换时间。libco 定时器也被称为时间轮,我们看看这个 “轮” 是怎么转的。


1. 概述

libco 定时器核心数据结构:数组 + 双向链表(下面左图)。

数组默认大小为:60 * 1000,以毫秒为单位,主要保存一分钟以内到期的事件数据。

相同到期时间的事件,会保存在双向链表里,当时间到期时,到期事件链表会一起取出来。

当然超过一分钟的到期事件也支持保存,通过取模路由,有可能与一分钟以内到期的数据耦合在一起,一起取出来后,再检查,没到期的重新写回去即可。

一般的应用场景,超时事件也是 1 分钟以内,超过这个时间事件的处理方案,虽然有点笨,但是代码维护代价比较小。


libco 定时器也被称为时间轮(右图):因为数组数据结构,下标是以毫秒为单位,当前时间下标 stTimeout_t.llStartIdx,沿着这个下标,随着时间推移,顺时针读写数据。

图片来源:processon


2. 定时器源码分析

  • 数据结构。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct stCoEpoll_t {
    ...
    struct stTimeout_t *pTimeout; /* 定时器。 */
    ...
};

/* 定时器。 */
struct stTimeout_t {
    stTimeoutItemLink_t *pItems; /* 数组。 */
    int iItemSize;               /* 数组大小。 */
    unsigned long long ullStart; /* 上一次获取到期事件的时间。 */
    long long llStartIdx;        /* 上一次获取到期事件的数组下标。 */
};

/* 到期事件双向链表。 */
struct stTimeoutItemLink_t {
    stTimeoutItem_t *head;
    stTimeoutItem_t *tail;
};
  • 初始化定时器数据结构,数组大小 60000。一般应用场景,定时任务都在 1 分钟(60000 毫秒)以内,那么超过一分钟的怎么办?看下面 AddTimeout 的源码实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
stCoEpoll_t *AllocEpoll() {
    ...
    ctx->pTimeout = AllocTimeout(60 * 1000);
    ...
}

stTimeout_t *AllocTimeout(int iSize) {
    stTimeout_t *lp = (stTimeout_t *)calloc(1, sizeof(stTimeout_t));
    lp->iItemSize = iSize;
    lp->pItems = (stTimeoutItemLink_t *)calloc(
        1, sizeof(stTimeoutItemLink_t) * lp->iItemSize);
    lp->ullStart = GetTickMS();
    lp->llStartIdx = 0;
    return lp;
}
  • 添加到期事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int AddTimeout(stTimeout_t *apTimeout, stTimeoutItem_t *apItem,
               unsigned long long allNow) {
    ...
    unsigned long long diff = apItem->ullExpireTime - apTimeout->ullStart;
    if (diff >= (unsigned long long)apTimeout->iItemSize) {
        /* 超过一分钟的事件,应该排在轮子尾部,最后才检查的,
         * 因为轮子是“圆”的,所以尾部就是当前数组下标(llStartIdx)的前一个下标。
         * (这里有点绕...) */
        diff = apTimeout->iItemSize - 1;
        ...
    }

    /* 通过取模,将定时器事件存储到数组对应的双向链表里。 */
    AddTail(apTimeout->pItems + (apTimeout->llStartIdx + diff) % apTimeout->iItemSize,
            apItem);
    return 0;
}
  • 获取到期事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/* 获取到期事件。 */
inline void TakeAllTimeout(stTimeout_t *apTimeout, unsigned long long allNow,
                           stTimeoutItemLink_t *apResult) {
    ...
    /* 处理当前时间与上一次处理时间间隔内到期的时间事件。 */
    int cnt = allNow - apTimeout->ullStart + 1;
    if (cnt > apTimeout->iItemSize) {
        cnt = apTimeout->iItemSize;
    }
    ...
    for (int i = 0; i < cnt; i++) {
        int idx = (apTimeout->llStartIdx + i) % apTimeout->iItemSize;
        Join<stTimeoutItem_t, stTimeoutItemLink_t>(apResult, apTimeout->pItems + idx);
    }
    /* 刷新当前数据。 */
    apTimeout->ullStart = allNow;
    apTimeout->llStartIdx += cnt - 1;
}

/* 事件循环,处理事件。 */
void co_eventloop(stCoEpoll_t *ctx, pfn_co_eventloop_t pfn, void *arg) {
    ...
    for (;;) {
        int ret = co_epoll_wait(ctx->iEpollFd, result, stCoEpoll_t::_EPOLL_SIZE, 1);
        ...
        /* 取出当前时间到期事件。 */
        unsigned long long now = GetTickMS();
        TakeAllTimeout(ctx->pTimeout, now, timeout);
        ...
        /* 标识这个时间是到期事件,因为 fd 事件和时间事件耦合在一起了(!^_^)。 */
        stTimeoutItem_t *lp = timeout->head;
        while (lp) {
            lp->bTimeout = true;
            lp = lp->pNext;
        }

        Join<stTimeoutItem_t, stTimeoutItemLink_t>(active, timeout);

        lp = active->head;
        while (lp) {
            /* 遍历处理活跃事件。(fd 事件,到期事件。) */
            PopHead<stTimeoutItem_t, stTimeoutItemLink_t>(active);
            /* 处理时间时间,到期的先处理,没到期的重新添加回去。 */
            if (lp->bTimeout && now < lp->ullExpireTime) {
                /* 因为时间轮数组只有 60000 个下标,一般存储 1 分钟以内的到期事件,
                 * 但是超过 1 分钟到期的事件也支持存储,这样有可能这些事件经过取模,
                 * 与 1 分钟以内到期事件存储在同一个双向链表里,被取出来了,
                 * 所以这些没到期的事件,应该重新存回去。 */
                int ret = AddTimeout(ctx->pTimeout, lp, now);
                if (!ret) {
                    lp->bTimeout = false;
                    lp = active->head;
                    continue;
                }
            }
            ...
            lp = active->head;
        }
    }
}

3. 缺点

  • 时间轮数组默认大小是 60 * 1000。以毫秒为单位,最大时间间隔是一分钟,但是如果处理超过一分钟的过期事件,处理效率就降低了,例如 session,它的过期时间就经常超过 1 分钟。
  • libco 的定时器设计主要是为了它内部协程切换使用,添加一个定时事件以后,很难通过查找方式,删除指定时间事件,只有等到事件触发了,事件才会从双向链表中删除。
  • 定时器对外部使用不友好,貌似只有创建协程,在协程处理函数内部通过 poll 实现。而且通过协程去实现定时器,成本太大了,一个协程结构体就几 K 的内存空间 😔,所以我自己在处理 session 过期问题,不得不造了个轻量级的定时器轮子:基于 stl map 的定时器
  • 综合上述几个问题,这也很好解析了,为啥有的开源通过小堆去维护定时器,例如 libev。

4. 小结

libco 的定时器设计非常优秀,在一定的范围内,非常高效,虽然有些小缺点,但瑕不掩瑜。


5. 参考


作者公众号
微信公众号,干货持续更新~