[redis 源码走读] 事件 - 定时器

2020-04-06

定时器是 redis 异步处理事件的一个十分重要的功能。

redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。

时间事件可以处理多个定时任务。


理解 redis 定时器,我们带着问题,看看 redis 是怎么处理的:

  • 定时器作用是什么。
  • 定时器实现原理。
  • 单进程里如何同时处理文件事件和时间事件。
  • 如何实现多定时任务。

1. 作用

redis 定时器核心逻辑在 serverCron 函数里。

  • 对设置了过期时间的数据进行检查回收。
  • 异步回收需要关闭的链接(socket)。
  • 检查 fork 的子进程是否已经关闭,处理回收的相关工作。
  • 检查内存数据是否符合 rdb 持久化快照落地条件,fork 子进程进行快照保存。
  • bgsave rdb 生成快照或 bgrewriteaof aof 重写延后操作。
  • 对需要扩容和缩容的哈希表(dict)进行数据迁移。
  • 集群里节点间的断线重连。
  • redis 服务的一些信息统计。

2. 定时器实现原理

redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。

2.1. 事件结构

  • 事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 时间事件
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc; /* 时钟到期事件触发回调处理函数。*/
    aeEventFinalizerProc *finalizerProc; /* 时间事件删除时,触发回调。*/
    void *clientData; /* 扩展参数,异步操作方便数据回调,在 timeProc 通过参数回传。*/
    struct aeTimeEvent *prev; /* 前一个节点。*/
    struct aeTimeEvent *next; /* 后一个节点。*/
} aeTimeEvent;

// 事件管理
typedef struct aeEventLoop {
    ...
    long long timeEventNextId;  // 时间事件下一个 id (通过 ‘++’ 递增)
    ...
    aeTimeEvent *timeEventHead; // 时间事件链表。
    ...
} aeEventLoop;
  • 事件循环。
1
2
3
4
5
6
7
8
9
// 循环处理事件。
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
  • 事件回调函数。
1
2
3
4
5
6
// 时间事件触发处理函数。
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);

// 时间事件处理完毕,被删除时,触发的回调处理。
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
  • 创建时间事件,时间事件通过双向链表管理,新的事件插入到链表头。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc) {
    // 事件 id 递增。
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    // 设置到期时间。
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}
  • 设置事件到期时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    // 当前时间增加到期时间间隔。
    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    if (when_ms >= 1000) {
        when_sec ++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}

2.2. 定时器执行流程

  • redis 启动添加时钟处理事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ae.c
int main(int argc, char **argv) {
    ...
    initServer();
    ...
    aeMain(server.el);
    ...
}

void initServer(void) {
    ...
    // 创建定时事件,绑定回调函数。
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    ...
}

// 时钟回调处理函数
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
}
  • 事件循环处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 循环处理事件。
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

// 处理时间事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    ...
     /* Check time events */
    if (flags & AE_TIME_EVENTS)
        // 处理时间事件
        processed += processTimeEvents(eventLoop);
    ...
}

3. 单进程异步处理事件逻辑

进程通过循环,不停地处理时间事件和文件事件。

redis 单进程处理文件事件和时间事件

  • 在进程的循环中不停地捞出文件和时间事件进行处理 aeProcessEvents
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ae.c
int main(int argc, char **argv) {
    ...
    aeMain(server.el);
    ...
}

// 循环处理事件。
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        ...
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

// 处理事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    ...
    // 先搜索出最快到期的定时器,查看时间戳,文件事件要在定时器到期前从系统内核捞出来处理。
    shortest = aeSearchNearestTimer(eventLoop);
    ...
    // 处理文件事件,等待获取事件时间间隔不能太长,否则定时器事件处理要超时了。
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
        ...
    }
    ...
    // 处理时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    ...
}
  • 对于文件事件,在 linux 系统中,redis 采用了 epoll 多路复用 I/O 事件驱动处理文件事件。通过 epoll_wait 捞出就绪事件进行处理。
1
2
3
4
5
6
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    ...
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    ...
}
  • 先处理完文件事件,再处理时间事件。
1
2
3
static int processTimeEvents(aeEventLoop *eventLoop) {
    ...
}

4. 多定时任务

我们看看 processTimeEvents 是如何处理多个定时任务的:

redis 多定时任务

  • 遍历时间事件链表,先删除已处理,被标识需要删除的事件,再执行到期事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    ...
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    // 遍历时间链表,处理到期执行的时间事件。
    while(te) {
        long now_sec, now_ms;
        long long id;

        // 从时间事件链表中,删除时被标识为删除状态的事件。
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        ...
        // 时间事件到期,执行事件。
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms)) {
            int retval;

            id = te->id;
            // 执行时钟回调函数
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            // 如果回调函数不返回 AE_NOMORE,重新更新该事件的到期时间,等待下次触发,否则标识事件为删除状态。
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}
  • 时间事件定时执行原理。

    到期事件回调处理函数 timeProc,例如 redis 对应的处理函数 serverCronserverCron 返回下一次到期的时间间隔,事件到期时间被(aeAddMillisecondsToNow)修改延后一个时间间隔,下一次到期再重新执行,从而达到时钟定期执行的效果。

1
2
3
4
5
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    // 返回时间间隔,单位毫秒。
    return 1000/server.hz;
}
  • 定时器定时执行频率。

    很多后台任务都在定时器里执行,定时执行频率可以由配置文件的 hz 频率控制,时间事件 (1000/hz) 毫秒执行一次,频率越高,到期时间间隔越小,刷得越快,定时后台任务处理得越快,但是这样也会相应地损耗更多的系统资源,而且定时事件和文件事件是在同一个进程中进行的,这样肯定会影响到文件事件执行。一般情况下,系统默认一秒定时执行 10 次,也就是 hz == 10

1
2
3
4
# redis.conf

# 定时器事件刷新频率 1 < hz < 500,默认 10
hz 10
  • 多定时任务

    事件里有不同的定时任务,它们定时执行的任务有快有慢,那对于多个定时任务,在时间事件触发后,它是如何处理的呢? 可以通过宏 run_with_period 处理。当时间间隔 _ms_ 很小的时候,每次触发时间事件,任务都会执行,否则通过记录事件触发的次数 server.cronloops++,当 hz 触发的事件时间间隔累积起来达到长时间间隔,就执行慢任务。(参考上图)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
struct redisServer {
    ...
    int cronloops;              /* Number of times the cron function run */
    ...
}

#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    // 快任务,时间间隔 <= 时间事件触发时间间隔,执行。
    run_with_period(100) {
        ...
    }
    ...
    // 慢任务,定时时间间隔比较长的,需要通过 server.cronloops 累加达到长时间间隔,才会执行慢任务。
    run_with_period(5000) {
        ...
    }

    // 每次触发定时事件,都会执行。
    /* We need to do a few operations on clients asynchronously. */
    clientsCron();

    /* Handle background operations on Redis databases. */
    databasesCron();
    ...
    server.cronloops++;
    ...
}

5. 效率

  • 添加事件,新的事件是直接添加到链表头,数据并没有排序。
  • 搜索最快过期的时钟事件,遍历查找,时间复杂度 O(N)。

基于上面两个问题,所以时钟过期处理效率是比较低的,但是 redis 里的基本没啥时钟事件,感觉暂时没有优化的必要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* Search the first timer to fire.
 * This operation is useful to know how many time the select can be
 * put in sleep without to delay any event.
 * If there are no timers NULL is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while (te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
            (te->when_sec == nearest->when_sec &&
             te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

6. 小结

  • 定时器是由时间事件组成的,目前,redis 核心的定时事件只有一个,核心逻辑都在 serverCron 函数里。
  • 定时器定时触发频率受配置 hz 影响,可以通过修改该配置项,调整定时处理速度。
  • 多定时任务,其实在同一个定时器时间事件里处理。
  • 定时事件和文件事件都在同一个进程中执行,所以虽然定时事件时间精度是毫秒,但是不一定会十分精确,会受到文件事件处理影响。
  • 定时事件执行还会受到定时器里的任务处理影响,例如系统在一个时间段内很多过期数据,那么系统有可能会分配更多的时间片去处理。
  • 基于 redis 主逻辑都在单进程主线程中实现,所以定时任务不能执行太长的时间,所以很多复杂的定时任务,都会限制处理数量和处理时间。(例如 字典 dict 的扩容和缩容maxmemory 数据淘汰策略 等等。)

7. 后记

通读一个知识点后,知识在脑海中是模糊的,需要通过不同方式去强化清晰这个脑海中的映像。让抽象思维落地,我自己会经常将一些知识点图形化,这样一点一点地将知识碎片拼接起来。

外接显示器