文先生的博客 求职,坐标深圳。(wenfh2020@126.com)

高性能服务异步通信逻辑

2020-06-11

最近整理了一下服务程序异步通信逻辑思路。异步逻辑与同步逻辑处理差别比较大,异步逻辑可能涉及多次回调才能完成一个完整的请求处理,逻辑被碎片化,切分成串行的步骤。习惯了写同步逻辑的朋友,有可能思维上转不过来。


1. 逻辑

  • 高性能异步非阻塞服务,底层一般用多路复用 I/O 模型对事件进行管理,例如 Linux 平台的 epoll。
  • epoll 支持异步事件逻辑。epoll_wait 会将就绪事件从内核中取出进行处理。
  • 服务处理事件,每个 fd 对应一个事件处理器 callback 处理取出的 events。
  • callback 逻辑被分散为逻辑步骤 step,这些步骤一般是异步串行处理,时序跟同步差不多,只是异步逻辑可能需要回调多次才能处理完一个完整的逻辑。

高性能异步框架通信流程

设计图来源:《异步服务框架通信流程


2. redis 源码逻辑

正常逻辑一般有 N 个步骤,异步逻辑不同之处,通过 callback 逻辑实现,与同步比较确实有点反人类。callback 回调回来还能定位到原来执行体,关键点在于 privdata

我们看看 redis 的 callback 逻辑。(github 源码

详细请参考:《[redis 源码走读] 事件 - 文件事件》《epoll 多路复用 I/O工作流程

  • 事件结构。
1
2
3
4
5
6
typedef struct redisAeEvents {
    redisAsyncContext *context;
    aeEventLoop *loop;
    int fd;
    int reading, writing;
} redisAeEvents;
  • 添加读事件,将 privdata (redisAeEvents) 与对应事件,对应回调函数绑定。
1
2
3
4
5
6
7
8
static void redisAeAddRead(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (!e->reading) {
        e->reading = 1;
        aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
    }
}
  • 回调。
1
2
3
4
5
6
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    ((void)el); ((void)fd); ((void)mask);

    redisAeEvents *e = (redisAeEvents*)privdata;
    redisAsyncHandleRead(e->context);
}

3. 状态机

用状态机实现异步逻辑是常用做法,异步逻辑本来已经很复杂了,状态机如果设计复杂,那将会增加项目的复杂度。所以状态机用 switch 实现,简简单单就足够了。

下面的测试代码写得比较粗糙,只实现了简单的几个操作,就有几十行代码了。用 (python/golang) 协程,源码可控制在 20 行以内,而且也能一定程度上兼顾性能。

在一些致力于敏捷研发的团队,用 callback 写异步逻辑不是一个明智的做法,非性能瓶颈,不建议使用异步逻辑去写业务。毕竟快速交付项目,推进业务,才是目标。而且很多时候,增加几台机器的成本,远远低于增加一个员工。

github 测试源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
namespace kim {

enum E_STEP {
    E_STEP_PARSE_REQUEST = 0,
    E_STEP_REDIS_SET,
    E_STEP_REDIS_SET_CALLBACK,
    E_STEP_REDIS_GET,
    E_STEP_REDIS_GET_CALLBACK,
};

Cmd::STATUS CmdTestRedis::execute_steps(int err, void* data) {
    int port = 6379;
    std::string host("127.0.0.1");

    switch (get_exec_step()) {
        case E_STEP_PARSE_REQUEST: {
            const HttpMsg* msg = m_req->get_http_msg();
            if (msg == nullptr) {
                return Cmd::STATUS::ERROR;
            }

            LOG_DEBUG("cmd test redis, http path: %s, data: %s",
                      msg->path().c_str(), msg->body().c_str());

            CJsonObject req_data(msg->body());
            if (!req_data.Get("key", m_key) ||
                !req_data.Get("value", m_value)) {
                LOG_ERROR("invalid request data! pls check!");
                return response_http(ERR_FAILED, "invalid request data");
            }
            return execute_next_step(err, data);
        }
        case E_STEP_REDIS_SET: {
            LOG_DEBUG("step redis set, key: %s, value: %s", m_key.c_str(), m_value.c_str());
            std::vector<std::string> rds_cmds{"set", m_key, m_value};
            Cmd::STATUS status = redis_send_to(host, port, rds_cmds);
            if (status == Cmd::STATUS::ERROR) {
                return response_http(ERR_FAILED, "redis failed!");
            }
            set_next_step();
            return status;
        }
        case E_STEP_REDIS_SET_CALLBACK: {
            redisReply* reply = (redisReply*)data;
            if (err != ERR_OK || reply == nullptr ||
                reply->type != REDIS_REPLY_STATUS || strncmp(reply->str, "OK", 2) != 0) {
                LOG_ERROR("redis set data callback failed!");
                return response_http(ERR_FAILED, "redis set data callback failed!");
            }
            LOG_DEBUG("redis set callback result: %s", reply->str);
            return execute_next_step(err, data);
        }
        case E_STEP_REDIS_GET: {
            std::vector<std::string> rds_cmds{"get", m_key};
            Cmd::STATUS status = redis_send_to(host, port, rds_cmds);
            if (status == Cmd::STATUS::ERROR) {
                return response_http(ERR_FAILED, "redis failed!");
            }
            return status;
        }
        case E_STEP_REDIS_GET_CALLBACK: {
            redisReply* reply = (redisReply*)data;
            if (err != ERR_OK || reply == nullptr || reply->type != REDIS_REPLY_STRING) {
                LOG_ERROR("redis get data callback failed!");
                return response_http(ERR_FAILED, "redis set data failed!");
            }
            LOG_DEBUG("redis get callback result: %s, type: %d", reply->str, reply->type);
            CJsonObject rsp_data;
            rsp_data.Add("key", m_key);
            rsp_data.Add("value", m_value);
            return response_http(ERR_OK, "success", rsp_data);
        }
        default: {
            LOG_ERROR("invalid step");
            return response_http(ERR_FAILED, "invalid step!");
        }
    }
}

}  // namespace kim

4. 性能

用 siege 对异步 http 服务进行压力测试。服务单进程单线程支持:长连接 1.5w qps,短连接 1w qps。多进程整体的并发能力将会更大。

数据是通过 Mac 本子本地压测获得的,不同机器,得出的数据可能不一样,进程并发能力与物理机器配置也有直接关系。

  • 长连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ./http_pressure.sh
{       "transactions":                        50000,
        "availability":                       100.00,
        "elapsed_time":                         3.38,
        "data_transferred":                     3.43,
        "response_time":                        0.01,
        "transaction_rate":                 14792.90,
        "throughput":                           1.02,
        "concurrency":                         99.66,
        "successful_transactions":             50000,
        "failed_transactions":                     0,
        "longest_transaction":                  0.02,
        "shortest_transaction":                 0.00
}
  • 短连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# ./http_pressure.sh
{       "transactions":                        10000,
        "availability":                       100.00,
        "elapsed_time":                         0.99,
        "data_transferred":                     0.69,
        "response_time":                        0.01,
        "transaction_rate":                 10101.01,
        "throughput":                           0.69,
        "concurrency":                         97.59,
        "successful_transactions":             10000,
        "failed_transactions":                     0,
        "longest_transaction":                  0.08,
        "shortest_transaction":                 0.00
}

5. 参考


作者公众号
微信公众号,干货持续更新~