kimserver 是多进程框架,在分布式系统里,多进程节点之间是如何进行通信的,客户端与分布式服务集群的通信流程是怎么样的,本章主要讲解这些问题。
1. 流程
1.1. 客户端与服务通信
1.2. 服务节点通信
A 节点与 B 节点数据透传 –> A1 与 B1 子进程建立通信。
- A1 创建 socket fd。
- A1 连接 B 节点 ip / port –> A1 连接 B0。
- A1 connect 异步返回结果,触发读写事件。
- A1 与 B0 连接成功,A1 发送连接信息(type / ip / port / index)给 B0。
- B0 接收到 A1 发的数据,将 fd 透传给对应的子进程 B1,A1 与 B1 连接成功。(《[kimserver] 父子进程传输文件描述符》
- B1 将自己的 type / ip / port / index 信息回传给 A1。
- A1 收到 B1 回包,将 B1 的 fd 保存起来。
- A1 与 B1 的通道被打通后,发送缓冲区里等待发送的的业务数据包。
2. 源码
核心逻辑在 sys_cmd.h/sys_cmd.cpp
文件里实现。
kimserver 作为异步服务,核心功能是把异步的逻辑封装在 Cmd
沙盒里,但是系统内部节点通信逻辑复杂,逻辑牵涉到多种数据结构调用,而且分开多个 Cmd
模块,让逻辑更加零散,维护起来,会让人云里雾绕。
所以笔者,将系统的父子进程异步通信逻辑集中在一个文件里实现,逻辑相对清晰,而且方便维护。
2.1. 实现逻辑
2.2. 接入
节点间相互连接的接口调用,主要参考 network.cpp/auto_send
函数的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/* network.cpp */
bool Network::auto_send(const std::string& host, int port, int worker_index,
const MsgHead& head, const MsgBody& body) {
...
/* 创建 socket,等待连接。 */
fd = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if (fd == -1) {
LOG_ERROR("client connect server failed! errstr: %s", m_errstr);
return false;
}
/* 创建连接对象。 */
c = create_conn(fd);
if (c == nullptr) {
close_fd(fd);
LOG_ERROR("create conn failed! fd: %d", fd);
return false;
}
...
/* 关注连接读事件。 */
w = m_events->add_read_event(fd, c->get_ev_io(), this);
if (w == nullptr) {
LOG_ERROR("add read event failed! fd: %d", fd);
goto error;
}
/* 关注连接写事件。 */
w = m_events->add_write_event(fd, w, this);
if (w == nullptr) {
LOG_ERROR("add write event failed! fd: %d", fd);
goto error;
}
...
/* 设置连接状态为准备连接,需要先建立节点间的通信才算真正连接成功。 */
c->set_state(Connection::STATE::TRY_CONNECT);
/* 将需要发送的数据,添加进等待发送缓存,当连接成功后,进行发送。 */
if (c->conn_write_waiting(head, body) == Codec::STATUS::ERR) {
LOG_ERROR("write waiting data failed! fd: %d", fd);
goto error;
}
/* 添加连接超时时钟。 */
if (!add_io_timer(c, 1.5)) {
LOG_ERROR("add io timer failed! fd: %d", fd);
goto error;
}
/* A1 connect to B1, and save B1's connection.
* 记录节点连接信息。*/
node_id = format_nodes_id(host, port, worker_index);
m_node_conns[node_id] = c;
c->set_node_id(node_id);
/* 启动链接。 */
connect(fd, (struct sockaddr*)&saddr, sizeof(struct sockaddr));
...
}
2.3. 协议处理流程
节点相互连接协议通信实现,详细信息,参考源码实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/* auto_send(...)
* A1 contact with B1. (auto_send func)
*
* A1: node A's worker.
* B0: node B's manager.
* B1: node B's worker.
*
* process_sys_message(.)
* 1. A1 connect to B0. (inner host : inner port)
* 2. A1 send CMD_REQ_CONNECT_TO_WORKER to B0.
* 3. B0 send CMD_RSP_CONNECT_TO_WORKER to A1.
* 4. B0 transfer A1's fd to B1.
* 5. A1 send CMD_REQ_TELL_WORKER to B1.
* 6. B1 send CMD_RSP_TELL_WORKER A1.
* 7. A1 send waiting buffer to B1.
* 8. B1 send ack to A1.
*/
Cmd::STATUS SysCmd::process(Request& req) {
return (m_net->is_manager()) ? process_manager_msg(req) : process_worker_msg(req);
}
Cmd::STATUS SysCmd::process_manager_msg(Request& req) {
LOG_TRACE("process manager message.");
switch (req.msg_head()->cmd()) {
case CMD_REQ_CONNECT_TO_WORKER: {
return on_req_connect_to_worker(req);
}
...
default: {
return Cmd::STATUS::UNKOWN;
}
}
}
Cmd::STATUS SysCmd::process_worker_msg(Request& req) {
/* worker. */
LOG_TRACE("process worker's msg, head cmd: %d, seq: %u",
req.msg_head()->cmd(), req.msg_head()->seq());
switch (req.msg_head()->cmd()) {
case CMD_RSP_CONNECT_TO_WORKER: {
return on_rsp_connect_to_worker(req);
}
case CMD_REQ_TELL_WORKER: {
return on_req_tell_worker(req);
}
case CMD_RSP_TELL_WORKER: {
return on_rsp_tell_worker(req);
}
...
default: {
return Cmd::STATUS::UNKOWN;
}
}
}