惊群效应理解

2019-09-25

惊群效应理解:多个进程或线程阻塞等待同一个事件,当事件到来时,它们被同时唤醒,但最终只有一个进程或线程能获得资源,其余的只能重新进入等待。通俗点说:往鸡群里扔一颗稻谷,鸡群一拥而上争抢,只有一只能抢到,其它的都白忙一场。


1. 现象

可以通过火焰图观察 accept 的调用情况,或者用 strace 命令跟踪底层系统调用。下面的脚本可以获取指定进程的火焰图。

1
2
3
4
5
6
7
8
9
10
11
12
#!/bin/sh
# Record a 60-second, 99 Hz call-graph profile of the given process with perf
# and render it as a flame graph (perf.svg) in the current directory.
# Requires perf, stackcollapse-perf.pl and flamegraph.pl on PATH.
set -e

if [ $# -lt 1 ]; then
    echo "usage: $0 <pid>" >&2
    exit 1
fi

pid=$1

rm -f perf.*
perf record -F 99 -p "$pid" -g -- sleep 60
# POSIX sh has no "&>": dash parses "cmd &> f" as "cmd &" plus "> f", which
# backgrounds cmd and truncates f. Redirect stdout explicitly instead.
perf script -i perf.data > perf.unfold
stackcollapse-perf.pl perf.unfold > perf.folded
flamegraph.pl perf.folded > perf.svg

火焰图参考:软件性能检测–火焰图🔥


2. 结果

大量进程或线程被无效唤醒后又重新进入睡眠,会引起频繁的进程/线程切换;每次切换内核都需要保存和恢复上下文(寄存器等),频繁切换会造成无谓的系统资源消耗。


3. 解决方案

解决 epoll 的惊群问题:

  1. 代码层面同步加锁,保证同一时刻只有一个进程在监听 socket 上等待新连接(参考 nginx 源码中的 accept_mutex)。
  2. 设置 socket 选项 SO_REUSEPORT(Linux 3.9 起支持,在内核层面解决:每个进程各自创建监听 socket 并绑定同一端口,由内核把新连接分发给其中一个。这个方案简单,可参考 nginx 的 reuseport 配置)。

4. 原理

当使用 epoll 事件模型处理高并发时,多个进程或线程在 epoll_wait 上阻塞等待网络事件;当有新的 client 连接进来时,这些 epoll_wait 会被同时唤醒并争抢这个连接,随后各自调用 accept 处理。由于监听 socket 是非阻塞的,争抢失败的 accept 会返回 -1,errno 为 EAGAIN。


5. 测试

server 端源码采用 epoll 事件模型,client 端直接用 telnet 连接即可(例如 telnet 127.0.0.1 1234)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define MAXEVENTS 64
#define PROCESS_NUM 5
#define PORT 1234

/*
 * Blocking accept() demo: one listening socket on PORT is shared by
 * PROCESS_NUM forked children. Each child loops on accept(), replies to the
 * client with "accept pid = <pid>", logs the success and closes the
 * connection. The parent waits for the children.
 */
void test_accept() {
    printf("test_accept...\n");

    int fd = socket(PF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));  // sin_zero must not hold garbage
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(PORT);

    int iReuse = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &iReuse, sizeof(iReuse)) < 0) {
        perror("setsockopt SO_REUSEADDR");
    }
    if (bind(fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind");
        close(fd);
        return;
    }
    if (listen(fd, 1024) < 0) {
        perror("listen");
        close(fd);
        return;
    }

    // Flush before fork() so buffered output is not duplicated in every
    // child when stdout is a pipe or a file.
    fflush(stdout);

    for (int i = 0; i < PROCESS_NUM; i++) {
        pid_t pid = fork();
        if (pid < 0) {
            perror("fork");
            break;
        }
        if (pid == 0) {
            char sendbuff[1024];
            for (;;) {
                int connfd = accept(fd, NULL, NULL);
                if (connfd < 0) {
                    // Never send to / close an invalid fd; just try again.
                    if (errno != EINTR) {
                        perror("accept");
                    }
                    continue;
                }
                snprintf(sendbuff, sizeof(sendbuff), "accept pid = %d\n",
                         getpid());

                // Text protocol: do not put the C string terminator on the wire.
                send(connfd, sendbuff, strlen(sendbuff), 0);
                printf("process %d accept success\n", getpid());
                close(connfd);
            }
        }
    }

    // Reap every child, not just the first one to exit.
    while (wait(NULL) > 0) {
    }
    close(fd);
}

/*
 * Creates an IPv4 TCP socket with SO_REUSEADDR and SO_REUSEPORT enabled and
 * binds it to INADDR_ANY:uiPort (uiPort in host byte order). SO_REUSEPORT is
 * what lets several processes each bind their own socket to the same port.
 *
 * Returns the bound socket fd, or -1 if socket() or bind() fails. On failure
 * no fd is leaked and errno describes the error.
 */
int sock_creat_bind(u_short uiPort) {
    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_fd < 0) {
        return -1;
    }

    // Best effort, as before: if an option is unsupported, the consequence
    // shows up as a bind() failure, which is reported below.
    int iReuse = 1;
    if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &iReuse, sizeof(iReuse)) < 0) {
        perror("setsockopt SO_REUSEADDR");
    }
    if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEPORT, &iReuse, sizeof(iReuse)) < 0) {
        perror("setsockopt SO_REUSEPORT");
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(uiPort);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    // Returning an unbound fd would let a later listen() silently auto-bind
    // it to a random ephemeral port, so treat bind() failure as fatal here.
    if (bind(sock_fd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        int saved_errno = errno;
        close(sock_fd);
        errno = saved_errno;
        return -1;
    }
    return sock_fd;
}

/*
 * Sets O_NONBLOCK on fd while preserving its other file status flags.
 * Returns 0 on success, or -1 on failure (after printing a perror message).
 */
int make_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags < 0) {
        // Without this check, -1 | O_NONBLOCK would try to set every flag bit.
        perror("fcntl get");
        return -1;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl set");
        return -1;
    }
    return 0;
}

/*
 * Creates a non-blocking listening TCP socket bound to PORT (via
 * sock_creat_bind) with a SOMAXCONN backlog.
 *
 * Non-blocking so that a process that loses the race for a pending connection
 * gets an error from accept() instead of blocking in it.
 *
 * Returns the fd, or -1 on failure. The socket is closed on every failure path.
 */
int create_listen_socket() {
    int sock_fd = sock_creat_bind(PORT);
    if (sock_fd < 0) {
        perror("socket and bind");
        return -1;
    }

    if (make_nonblocking(sock_fd) < 0) {
        perror("make non blocking");
        close(sock_fd);
        return -1;
    }

    if (listen(sock_fd, SOMAXCONN) < 0) {
        perror("listen");
        close(sock_fd);
        return -1;
    }

    return sock_fd;
}

/*
 * Event loop of one SO_REUSEPORT worker process.
 *
 * Creates its own listening socket on PORT and registers it with a private
 * epoll instance. It then accepts a connection each time the socket becomes
 * readable, logging every epoll_wait wake-up and every accept() result.
 *
 * Exits the process with status 1 if setup fails. Returns, after releasing its
 * resources, only if epoll_wait fails with an error other than EINTR or the
 * listening socket reports an error.
 */
void worker(int iIndex) {
    printf("epoll accept....worker index: %d\n", iIndex);

    int sock_fd = create_listen_socket();
    if (sock_fd == -1) {
        printf("create socket fail, worker index: %d\n", iIndex);
        exit(1);
    }

    int epoll_fd = epoll_create(MAXEVENTS);
    if (epoll_fd < 0) {
        perror("epoll_create");
        exit(1);
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.data.fd = sock_fd;
    event.events = EPOLLIN;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0) {
        perror("epoll_ctl");
        exit(1);
    }

    struct epoll_event *events =
        (struct epoll_event *)calloc(MAXEVENTS, sizeof(*events));
    if (events == NULL) {
        perror("calloc");
        exit(1);
    }

    int running = 1;
    while (running) {
        int num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
        if (num < 0) {
            if (errno == EINTR) {
                continue;
            }
            perror("epoll_wait");
            break;
        }
        printf("process %d return from epoll_wait\n", getpid());
        for (int i = 0; i < num; i++) {
            unsigned int ev = events[i].events;
            if ((ev & (EPOLLERR | EPOLLHUP)) || !(ev & EPOLLIN)) {
                fprintf(stderr, "epoll error\n");
                // The listening socket is the only registered fd, so there is
                // nothing left to wait on; it is closed once, below.
                running = 0;
                break;
            }
            if (events[i].data.fd == sock_fd) {
                struct sockaddr in_addr;
                socklen_t in_len = sizeof(in_addr);
                int conn_fd = accept(sock_fd, &in_addr, &in_len);
                if (conn_fd < 0) {
                    printf("process %d accept failed!\n", getpid());
                } else {
                    printf("process %d accept successful!\n", getpid());
                    // Demo server: close instead of leaking one fd per connection.
                    close(conn_fd);
                }
            }
        }
    }

    free(events);
    close(epoll_fd);
    close(sock_fd);
}

/*
 * Thundering-herd demo with a shared epoll instance.
 *
 * The parent creates one non-blocking listening socket on PORT and one epoll
 * instance watching it, then forks PROCESS_NUM children. Each child inherits
 * both fds and blocks in epoll_wait() on the same epoll instance, so one
 * incoming connection may wake several children. Only one of their accept()
 * calls can obtain the connection; the others fail. Each child logs every
 * wake-up and every accept() result. The parent waits for all children.
 */
void test_epoll_accept() {
    printf("test_epoll_accept...\n");

    int sock_fd = create_listen_socket();
    if (sock_fd == -1) {
        printf("create socket fail\n");
        exit(1);
    }

    int epoll_fd = epoll_create(MAXEVENTS);
    if (epoll_fd < 0) {
        perror("epoll_create");
        exit(1);
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.data.fd = sock_fd;
    event.events = EPOLLIN;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event) < 0) {
        perror("epoll_ctl");
        exit(1);
    }

    struct epoll_event *events =
        (struct epoll_event *)calloc(MAXEVENTS, sizeof(*events));
    if (events == NULL) {
        perror("calloc");
        exit(1);
    }

    // Flush before fork() so buffered output is not duplicated in every
    // child when stdout is a pipe or a file.
    fflush(stdout);

    for (int k = 0; k < PROCESS_NUM; k++) {
        pid_t pid = fork();
        if (pid < 0) {
            perror("fork");
            break;
        }
        if (pid > 0) {
            // Parent: keep forking; only the children run the event loop.
            printf("fork process pid: %d\n", (int)pid);
            fflush(stdout);
            continue;
        }

        // Child: serve connections on the shared epoll instance. Always
        // exit afterwards so a child never falls back into this fork loop.
        int running = 1;
        while (running) {
            int num = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
            if (num < 0) {
                if (errno == EINTR) {
                    continue;
                }
                perror("epoll_wait");
                break;
            }
            printf("process %d return from epoll_wait\n", getpid());
            for (int i = 0; i < num; i++) {
                unsigned int ev = events[i].events;
                if ((ev & (EPOLLERR | EPOLLHUP)) || !(ev & EPOLLIN)) {
                    fprintf(stderr, "epoll error\n");
                    // Only the listening socket is registered; stop this child.
                    running = 0;
                    break;
                }
                if (events[i].data.fd == sock_fd) {
                    struct sockaddr in_addr;
                    socklen_t in_len = sizeof(in_addr);
                    int conn_fd = accept(sock_fd, &in_addr, &in_len);
                    if (conn_fd < 0) {
                        // Typically a child that was woken but lost the race.
                        printf("process %d accept failed!\n", getpid());
                    } else {
                        printf("process %d accept successful!\n", getpid());
                        // Demo server: close instead of leaking one fd per connection.
                        close(conn_fd);
                    }
                }
            }
        }
        free(events);
        close(epoll_fd);
        close(sock_fd);
        exit(0);
    }

    // Reap every child, not just the first one to exit.
    while (wait(NULL) > 0) {
    }

    free(events);
    close(epoll_fd);
    close(sock_fd);
}

/*
 * SO_REUSEPORT demo: forks PROCESS_NUM children, each of which runs worker()
 * with its own listening socket and epoll instance on PORT. A child that
 * returns from worker() sleeps one second and then returns to the caller.
 * The parent waits until every child has terminated.
 */
void test_epoll_reuseport_accept() {
    printf("test_epoll_reuseport_accept... \n");

    // Flush before fork() so the banner is not duplicated in every child
    // when stdout is a pipe or a file.
    fflush(stdout);

    for (int i = 0; i < PROCESS_NUM; i++) {
        pid_t pid = fork();
        if (pid < 0) {
            perror("fork");
            break;
        }
        if (pid == 0) {
            worker(i);
            sleep(1);
            return;
        }
    }

    printf("parent, pid: %d\n", getpid());
    // Reap every child, not just the first one to exit.
    while (wait(NULL) > 0) {
    }
}

/*
 * fork() ordering demo: the parent forks PROCESS_NUM children in a loop.
 * Each child prints its pid and loop index and returns immediately. The
 * parent logs each iteration and finally prints an "end parent" line.
 */
void test_fork() {
    for (int i = 0; i < PROCESS_NUM; i++) {
        // Flush before fork() so lines still sitting in the stdio buffer are
        // not printed a second time by the child when stdout is redirected.
        fflush(stdout);
        pid_t pid = fork();
        if (pid == 0) {
            printf("child, pid: %d, index: %d\n", getpid(), i);
            return;
        }
        printf("parent, pid: %d, index: %d\n", getpid(), i);
    }

    printf("end parent, pid: %d\n", getpid());
}

int main(int argc, char *argv[]) {
    // test_fork();
    // test_accept();
    // test_epoll_accept();
    test_epoll_reuseport_accept();
    return 0;
}

6. 参考