mysql
非阻塞异步 client 比较小众,能搜索出来的资料不多。
只要做单线程的异步服务,就绕不开 mysql 数据库操作。很幸运,mariadb
提供了异步接口,在 github 上找到一个项目(mysql_async)是结合 libev 实现的异步项目,正合我意!接下来对其进行改造。
1. 异步接口文档
Mariadb 提供异步接口,官网文档:《Non-blocking API Reference》(链接可能需要翻墙)。
2. 安装
异步 client driver 需要依赖 mariadb 的 mariadb-connector-c
,下面是源码安装步骤流程。
- Linux
1
2
3
4
5
6
sudo yum -y install git gcc openssl-devel make cmake
git clone https://github.com/MariaDB/mariadb-connector-c.git
mkdir build && cd build
cmake ../mariadb-connector-c/ -DCMAKE_INSTALL_PREFIX=/usr
make
sudo make install
- MacOS
mariadb-connector-c
依赖 openssl
库,根据你的安装路径设置依赖关系:OPENSSL_ROOT_DIR
, OPENSSL_LIBRARIES
。
1
2
3
4
5
wget http://mariadb.mirror.iweb.com//connector-c-3.1.9/mariadb-connector-c-3.1.9-src.tar.gz
tar zxf mariadb-connector-c-3.1.9-src.tar.gz
mkdir build && cd build
sudo cmake ../mariadb-connector-c-3.1.9-src/ -DCMAKE_INSTALL_PREFIX=/usr/local -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl -DOPENSSL_LIBRARIES=/usr/local/opt/openssl/lib
sudo make && make install
3. 性能
测试数据: 100,000。
测试场景:单线程。
测试结果:看数据表吧,因为读写 sql 命令比较简单,测试结果只作参考吧。
- Mac (8 核,16G 内存)
links | driver | read / s | write / s |
---|---|---|---|
1 | sync | 18913.9 | 2706.23 |
1 | async | 13576.3 | 3773.74 |
5 | async | 35166.9 | 12635.7 |
10 | async | 40861.2 | 17500.7 |
- Centos(双核,4G 内存)
links | driver | read / s | write / s |
---|---|---|---|
1 | sync | 6730.01 | 6985.49 |
1 | async | 5379.34 | 5827.66 |
2 | async | 8009.77 | 8774.84 |
5 | async | 8788.27 | 9544.37 |
4. 源码
4.1. 原理
虽然是异步非阻塞操作,mysql 不像 redis(pipeline) 那样支持批量命令处理。
异步 client 发送命令,每个命令需要等待 mysql 返回结果后,才能再发送下一个,所以单链接的异步处理本质上也是串行的,在 Linux 上,异步与同步比较,并没有什么优势可言。
但是异步处理,是非阻塞的,单线程能支持多个链接“并行”工作,这样 单线程的吞吐性能
得到提升。
测试项目的异步链接池基于 libev
对链接事件进行管理,我们来看看读数据的流程逻辑:
4.2. 配置
数据库链接信息,写在 json 配置文件里。
1
2
3
4
5
6
7
8
9
10
11
12
{
"database": {
"test": {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "root123!@#",
"charset": "utf8mb4",
"max_conn_cnt": 5
}
}
}
4.3. 连接池接口
尽量简化连接池接口,只有 3 个对外接口:初始化,读数据,写数据。
详细连接池源码可以查看 github
1
2
3
4
5
6
7
8
9
10
/* 回调接口定义. */
typedef void(MysqlExecCallbackFn)(const MysqlAsyncConn*, sql_task_t* task);
typedef void(MysqlQueryCallbackFn)(const MysqlAsyncConn*, sql_task_t* task, MysqlResult* res);
/* 初始化数据库信息,读取配置,加载数据库连接信息。*/
bool init(CJsonObject& config);
/* 写数据接口。node 参数是 json 配置里的 database 信息。*/
bool async_exec(const char* node, MysqlExecCallbackFn* fn, const char* sql, void* privdata = nullptr);
/* 读数据接口。node 参数是 json 配置里的 database 信息。*/
bool async_query(const char* node, MysqlQueryCallbackFn* fn, const char* sql, void* privdata = nullptr);
4.4. 状态机工作流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
bool MysqlAsyncConn::init(const db_info_t* db_info, struct ev_loop* loop) {
...
/* 设置 mysql client 异步属性。 */
mysql_options(&m_mysql, MYSQL_OPT_NONBLOCK, 0);
...
}
void MysqlAsyncConn::wait_for_mysql(struct ev_loop* loop, ev_io* w, int event) {
switch (m_state) {
case STATE::CONNECT_WAITING:
connect_wait(loop, w, event);
break;
case STATE::WAIT_OPERATE:
operate_wait();
break;
case STATE::QUERY_WAITING:
query_wait(loop, w, event);
break;
case STATE::EXECSQL_WAITING:
exec_sql_wait(loop, w, event);
break;
case STATE::STORE_WAITING:
store_result_wait(loop, w, event);
break;
case STATE::PING_WAITING:
ping_wait(loop, w, event);
break;
default:
LOG_ERROR("invalid state: %d", m_state);
break;
}
}
4.5. 测试源码
- 详细测试源码可以查看 github
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static void mysql_exec_callback(const kim::MysqlAsyncConn* c, kim::sql_task_t* task) {...}
static void mysql_query_callback(const kim::MysqlAsyncConn* c, kim::sql_task_t* task, kim::MysqlResult* res) {...}
int main(int args, char** argv) {
...
struct ev_loop* loop = EV_DEFAULT;
kim::DBMgr* pool = new kim::DBMgr(m_logger, loop);
...
for (int i = 0; i < g_test_cnt; i++) {
if (g_is_write) {
snprintf(sql, sizeof(sql),
"insert into mytest.test_async_mysql (value) values ('%s %d');", "hello world", i);
if (!pool->async_exec("test", &mysql_exec_callback, sql)) {
LOG_ERROR("exec sql failed! sql: %s", sql);
return 1;
}
} else {
snprintf(sql, sizeof(sql), "select value from mytest.test_async_mysql where id = 1;");
if (!pool->async_query("test", &mysql_query_callback, sql)) {
LOG_ERROR("quert sql failed! sql: %s", sql);
return 1;
}
}
}
...
ev_run(loop, 0);
...
}
5. 小结
- mysql client 异步读写需要 mariadb client 支持。
- mysql 异步与同步 client,单连接性能差距不大,区别在于:异步是非阻塞的,同步是阻塞的。
- mariadb 异步 client 使用复杂度还是有点高,需要造轮子,这不是一件简单的事。
- 如果你正在使用鹅厂的轻量级协程库:libco,使用同步的 mysql client 能达到异步效果:《libco 协程库学习,测试连接 mysql》,但是当你实际使用,可能又会遇到新的坑,太难了…
- 我认为无论多牛的技术,首先需要使用简单才行,所以折腾过 C/C++,你才会发现为啥越来越多人拥抱 golang;它有强大的生态,一个
go get
就能轻松获得一个高质量的数据库连接池🙃,而且性能还不错。