使用WAL模式的好处是执行的sync动作比较少,默认情况下,直接数据写入WAL文件,当数据积攒到1000(默认值)个page时,会触发自动checkpoint。
但是自动checkpoint触发的时候,仍然会执行fdatasync()系统调用,这可能阻塞执行线程几十毫秒到十几秒。如果执行线程还处理UI查询请求,那么UI查询会等待比较长的时间。
为了规避这一问题,可以关闭自动checkpoint,然后启动一个线程,在线程里面执行checkpoint。具体的流程如下。
定义一个结构体,用来存储相关变量,作为一个统一个参数。
struct wal_check_ctx { sqlite3 *db; pthread_t thread; pthread_cond_t cond; pthread_mutex_t mutex; int check; int check_threshold; int exit; }; // 初始化ctx struct wal_check_ctx ctx = { .cond = PTHREAD_COND_INITIALIZER, .mutex = PTHREAD_MUTEX_INITIALIZER, .check_threshold = 1000, };
db是打开的sqlite connection。注意异步线程和主线程可以共用一个connection,可以单独打开一个connection(性能单独打开貌似更好)。
thread是线程id。
cond和mutex用来加锁和唤醒异步checkpoint线程。
check,表示是否需要执行checkpoint。
check_threshold,是执行checkpoint的阈值(WAL文件积攒的page数)
exit表示是否退出异步线程。
1创建线程,注册wal hook
ctx.db = db; ret = pthread_create(&ctx.thread, NULL, wal_check_thread, &ctx); if (ret) { fprintf(stderr, "create wal check thread failed: %s\n", strerror(ret)); return 1; } _ctx = sqlite3_wal_hook(db, wal_check_hook, &ctx); if (!_ctx) return 1;
在主线程,创建一个异步checkpoint线程,并且设置好wal hook,设置wal hook后会关闭自动checkpoint。
2 hook处理函数
int wal_check_hook(void *_ctx, sqlite3 *db, const char *name, int page) { struct wal_check_ctx *ctx = _ctx; printf("page %d\n", page); if (page > ctx->check_threshold) { pthread_mutex_lock(&ctx->mutex); ctx->check = 1; pthread_cond_broadcast(&ctx->cond); pthread_mutex_unlock(&ctx->mutex); } return SQLITE_OK; }
hook函数是由sqlite自动调用的,调用的时机是在主线程插入数据完成,释放write lock之后。
the callback is invoked by SQLite after the commit has taken place and the associated write-lock on the database released, so the implementation may read, write or checkpoint the database as required.
hook函数有一个参数,是WAL文件中当前有多少个page,判断page大于阈值的话。就将check设置为1,然后唤醒异步checkpoint线程。
3 异步checkpoint线程
void *wal_check_thread(void *_ctx) { struct wal_check_ctx *ctx = _ctx; pthread_mutex_lock(&ctx->mutex); while (!ctx->exit) { pthread_cond_wait(&ctx->cond, &ctx->mutex); if (ctx->check) { int nlog, nCkpt; int ret; pthread_mutex_unlock(&ctx->mutex); /* * note: when wal is not truncated, nlog & nCkpt both contains checked pages */ ret = sqlite3_wal_checkpoint_v2(ctx->db, NULL, SQLITE_CHECKPOINT_PASSIVE, &nlog, &nCkpt); if (ret == 0) { printf("nlog %d, nCkpt %d\n", nlog, nCkpt); } else { printf("wal checkpoint failed: %s\n", sqlite3_errstr(ret)); } pthread_mutex_lock(&ctx->mutex); if (ret == 0 && (nlog - nCkpt) < ctx->check_threshold) { ctx->check = 0; } } } pthread_mutex_unlock(&ctx->mutex); return NULL; }
异步线程的工作就是循环等待唤醒,判断check是1的话,就执行checkpoint。这里使用PASSIVE模式,这样就不会阻塞writer。但是PASSIVE模式不一定能成功。会报错:wal checkpoint failed: database table is locked。因为一个writer已经拿到了table lock。
4 主动唤醒
void wake_wal_check(struct wal_check_ctx *ctx) { pthread_mutex_lock(&ctx->mutex); if (ctx->check) pthread_cond_broadcast(&ctx->cond); pthread_mutex_unlock(&ctx->mutex); }
因为sqlite wal hook唤醒执行的时候,可能数据库被writer锁住导致checkpoint失败。因此添加这个接口,在主线程空闲的时候主动调用来唤醒异步checkpoint线程。一个好的调用时机是在主线程每次完成数据库更新操作之后,立即调用此函数。
5 退出异步checkpoint线程
void wait_wal_thread(struct wal_check_ctx *ctx) { int nlog, nCkpt; int ret; pthread_mutex_lock(&ctx->mutex); ctx->exit = 1; pthread_cond_broadcast(&ctx->cond); pthread_mutex_unlock(&ctx->mutex); pthread_join(ctx->thread, NULL); }
将exit设置为1,然后唤醒异步线程,然后等待异步线程结束。
疑问?
执行checkpoint是否会阻塞writer?
参考:
[1] Write-Ahead Log Commit Hook. https://www.sqlite.org/c3ref/wal_hook.html
[2] Configure an auto-checkpoint. https://www.sqlite.org/c3ref/wal_autocheckpoint.html
[3] Checkpoint a database. https://sqlite.org/c3ref/wal_checkpoint_v2.html