最近的nas项目中,开发的相册模块,使用sqlite存储用户数据,包括相册、照片等信息。然而在使用中碰到了sqlite的很多坑。主要是写入和读取速度的问题。
想着写入不是频繁的操作,写完关闭数据库,这样可以节省内存,但是发现这种情况效率非常之低,可能1s只能写入几条数据。
strace发现,每次打开数据库都会执行一次sync操作,需要等待几百毫秒到几秒。
所以,对于守护程序,有频繁读写操作的,需要保持数据库为打开状态。
sqlite是atomic commit and rollback(原子提交和回滚)类型的数据库。默认模式是rollback journal(回滚日志)。但是在2010年的3.7版本引入了一种新的模式WAL,Write-Ahead Log。
rollback journal模式,是把要覆盖的旧数据写入到一个-journal的日志文件。
而WAL模式,则直接把新数据写入到一个-wal文件,这样就完全不动旧数据库文件,当-wal文件大于4M时,再写回到数据库文件。
WAL模式的优点:
大多数情况下它的写入速度都比rollback journal模式显著的快,因为它在有在checkpoint的时候,才执行sync操作。
支持读写并发
synchronous用来设定数据同步到磁盘的模式,也就是调用操作系统sync()接口的方式。
不同的方式会显著影响 atomic, consistent, isolated, and durable (ACID)。
有4种模式:
OFF
完全关闭同步,速度会非常快,但是当程序崩溃或者操作系统崩溃或者断电的场景下,数据库文件可能会损坏。
NORMAL
大多数场景执行sync,对于WAL模式可以平衡写入性能和数据完整性。
FULL
是rollback journal的默认模式。
EXTRA
在FULL的基础上,对父目录也执行sync操作,比如删除-journal的时候,就需要sync父目录,比FULL更严格。
WAL模式用NORMAIL就行了。我们的项目选择了WAL模式,测试写入性能快很多。
和大型商业数据库相比,SQLite是一个并发支持很差的数据库,
sqlite的写粒度是整个数据库,而postgreSQL支持行级锁。
sqlite只能有1个writer,而postgre支持多写并发。
sqlite是一个in-process数据库,其并发有2种类型,一种是多进程,一种是多线程。
多进程本质就是多个连接的并发,因为每个进程会单独打开数据,建立连接。
多线程一般是指单个连接、多个statement。如果多线程都有独立的连接,那就等价于多进程。
多连接情况的并发。
journal rollback模式,不支持并发,读和写也不能并发。只有一个进程能进行读或者写。
进程A正在写,进程B去读,会返回SQLITE_BUSY database is locked
WAL模式,支持一个写,但是可以多个读,而且写不阻塞读。
进程A正在写,进程B也尝试去写,会返回SQLITE_BUSY database is locked
busy timeout
sqlite提供一个busy timeout的机制,可以在获取锁失败的时候,自动去等待重试。
这是一个非常糟糕的机制,因为它没有通知机制,就是简单的重复sleep,重试。
测试两个进程并发写,设置busy timeout非常大,一个进程的写入速度几秒钟才写入一行数据。
单连接,多线程同时读写的并发和多连接是一样的。比如WAL模式,只有一个写,但是可以多个读。
但是由于多个线程是在同一个进程环境,sqlite在操作内部实现了锁,不会出现busy wait。使用者是无感的。
比如测试程序两个线程同时写
$ time ./a.out -c -v -w wr wr -s 100000
enable WAL mode
try create table
wr, start 0 count 0 batch 0
wr, start 100000 count 0 batch 0
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++^C+wr 331 rows
+wr 324 rows
real 0m3.988s
user 0m0.028s
sys 0m0.062s
因此,我们得出了一个重要的结论,单连接、多线程同时读写
在代码上,我们可以像单线程一样,直接编程,sqlite内部会加锁。
参考5,sqlite定义了3种多线程模式,这个就是针对sqlite内部锁的。
single-thread,取消了所有的mutex,只能在单线程场景使用。
multiple-thread,可以多线程,但是database connection或者object,例如prepared statement不要跨线程使用。
Serialized,connection和object都可以跨线程使用,默认是这种模式,内部有完整的锁。
参考
[1] Write-Ahead Logging. https://www.sqlite.org/wal.html
[2] Atomic Commit In SQLite. https://www.sqlite.org/atomiccommit.html
[3] The Rollback Journal. https://www.sqlite.org/lockingv3.html#rollback
[4] PRAGMA schema.synchronous; https://sqlite.org/pragma.html
[5] Using SQLite In Multi-Threaded Applications. https://www.sqlite.org/threadsafe.html
[6] Result and Error Codes. https://www.sqlite.org/rescode.html
[7] Set A Busy Timeout. https://www.sqlite.org/c3ref/busy_timeout.html.
[8] Abusing SQLite to Handle Concurrency. https://blog.skypilot.co/abusing-sqlite-to-handle-concurrency
并发测试程序,可以起多个线程,进行读、写、删除、更新。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
#include <sqlite3.h>
static struct sqlite3 *conn;
static int wal = 0, create = 0, timeout = 0;
static int verbose = 0;
static struct cmd {
pthread_t t;
const char *name;
int start;
int count;
int batch;
int delay;
} cmds[8];
static int n_cmd;
static volatile int sig_exit;
static void usage() {
fprintf(stderr, "./sqlite_batch [sqlite options] <cmd> ...\n"
"\n"
"sqlite options:\n"
"\t-w, WAL mode\n"
"\t-c, try create table\n"
"\t-t <timeout>, sqlite busy timeout\n"
"\t-v, verbose\n"
"\n"
"cmd: <rd|wr|del|update> [cmd options] ...\n"
"cmd options:\n"
"\t-s <start id>\n"
"\t-n <rows>\n"
"\t-b <rows per transaction>\n"
"\t-d <ms> delay milliseconds\n"
"\n"
);
exit(1);
}
static void sig_handler(int signum)
{
switch (signum)
{
case SIGINT:
sig_exit = 1;
break;
};
}
static void install_sig_handler()
{
struct sigaction act = {
.sa_handler = sig_handler,
};
sigaction(SIGINT, &act, NULL);
}
static char *sql_err(int ret)
{
static char errstr[512];
snprintf(errstr, sizeof(errstr), "%d(%s): %s",
ret, sqlite3_errstr(ret), sqlite3_errmsg(conn));
return errstr;
}
static int open_db(const char *path)
{
int ret;
const char *sql = "CREATE TABLE IF NOT EXISTS T (id INTEGER PRIMARY KEY, name TEXT);";
ret = sqlite3_open(path, &conn);
if (ret != SQLITE_OK) {
fprintf(stderr, "sqlite3_open() open %s failed: %s\n", path, sqlite3_errstr(ret));
return -1;
}
if (wal) {
printf("enable WAL mode\n");
ret = sqlite3_exec(conn, "PRAGMA journal_mode=WAL;", 0, 0, 0);
if (ret != SQLITE_OK) {
fprintf(stderr, "enable WAL failed: %s\n", sql_err(ret));
return -1;
}
}
if (timeout) {
printf("sqlite busy timeout %dms\n", timeout);
ret = sqlite3_busy_timeout(conn, timeout);
if (ret != SQLITE_OK)
fprintf(stderr, "sqlite3_busy_timeout failed: %s\n", sql_err(ret));
}
if (create) {
printf("try create table\n");
ret = sqlite3_exec(conn, sql, 0, 0, NULL);
if (ret != SQLITE_OK) {
fprintf(stderr, "create table failed: %s\n", sql_err(ret));
return -1;
}
}
return 0;
}
static void close_db()
{
int ret ;
ret = sqlite3_close(conn);
if (ret != SQLITE_OK)
fprintf(stderr, "sqlite3 close fialed: %s\n", sql_err(ret));
}
static void begin_trans()
{
int ret;
ret = sqlite3_exec(conn, "BEGIN TRANSACTION;", 0, 0, NULL);
if (ret != SQLITE_OK) {
fprintf(stderr, "begin transaction failed: %s\n", sql_err(ret));
exit(1);
}
}
static void end_trans()
{
int ret;
ret = sqlite3_exec(conn, "END TRANSACTION;", 0, 0, NULL);
if (ret != SQLITE_OK) {
fprintf(stderr, "end transaction failed: %s\n", sql_err(ret));
exit(1);
}
}
static struct sqlite3_stmt *prepare_read(int start)
{
sqlite3_stmt *stmt = NULL;
int ret;
const char *sql = "SELECT id, name FROM T where id > ?;";
ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
if (ret != SQLITE_OK) {
fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
return NULL;
}
ret = sqlite3_bind_int(stmt, 1, start);
if (ret != SQLITE_OK) {
fprintf(stderr, "bind failed: %s\n", sql_err(ret));
return NULL;
}
return stmt;
}
static int read_next(struct sqlite3_stmt *stmt, int *id, const char **name)
{
int ret;
ret = sqlite3_step(stmt);
if (ret == SQLITE_DONE)
return 0;
else if (ret != SQLITE_ROW) {
fprintf(stderr, "list album failed: %s\n", sql_err(ret));
return -1;
}
*id = sqlite3_column_int(stmt, 0);
*name = (const char *)sqlite3_column_text(stmt, 1);
return 1;
}
static int insert(int id, const char *name)
{
sqlite3_stmt *stmt = NULL;
int ret;
int err = -1;
const char *sql = "INSERT INTO T (id, name) VALUES(?, ?);";
ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
if (ret != SQLITE_OK) {
fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
return -1;
}
ret = 0;
ret += sqlite3_bind_int(stmt, 1, id);
ret += sqlite3_bind_text(stmt, 2, name, -1, SQLITE_STATIC);
if (ret != SQLITE_OK) {
fprintf(stderr, "bind failed\n");
goto out;
}
ret = sqlite3_step(stmt);
if (ret != SQLITE_DONE) {
fprintf(stderr, "step failed: %s\n", sql_err(ret));
goto out;
}
err = 1;
out:
sqlite3_finalize(stmt);
return err;
}
static int delete(int id)
{
sqlite3_stmt *stmt = NULL;
int ret;
int err = -1;
const char *sql = "DELETE FROM T where id = ?;";
ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
if (ret != SQLITE_OK) {
fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
return -1;
}
ret = 0;
ret += sqlite3_bind_int(stmt, 1, id);
if (ret != SQLITE_OK) {
fprintf(stderr, "bind failed\n");
goto out;
}
ret = sqlite3_step(stmt);
if (ret != SQLITE_DONE) {
fprintf(stderr, "step failed: %s\n", sql_err(ret));
goto out;
}
err = sqlite3_changes(conn);
out:
sqlite3_finalize(stmt);
return err;
}
static int update(int id, const char *name)
{
sqlite3_stmt *stmt = NULL;
int ret;
int err = -1;
const char *sql = "UPDATE T SET name = ? WHERE id = ?;";
ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
if (ret != SQLITE_OK) {
fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
return -1;
}
ret = 0;
ret += sqlite3_bind_text(stmt, 1, name, -1, SQLITE_STATIC);
ret += sqlite3_bind_int(stmt, 2, id);
if (ret != SQLITE_OK) {
fprintf(stderr, "bind failed\n");
goto out;
}
ret = sqlite3_step(stmt);
if (ret != SQLITE_DONE) {
fprintf(stderr, "step failed: %s\n", sql_err(ret));
goto out;
}
err = sqlite3_changes(conn);
out:
sqlite3_finalize(stmt);
return err;
}
static void do_read(struct cmd *cmd)
{
sqlite3_stmt *stmt;
int n = 0;
printf("read, start %d count %d\n", cmd->start, cmd->count);
stmt = prepare_read(cmd->start);
if (!stmt)
return;
while (!sig_exit) {
int ret;
int id;
const char *name;
ret = read_next(stmt, &id, &name);
if (ret <= 0)
break;
if (verbose) {
printf(".");
fflush(stdout);
}
n++;
}
sqlite3_finalize(stmt);
printf("read %d rows\n", n);
}
static void do_write(struct cmd *cmd, char type)
{
int id = cmd->start;
int n = 0;
int batch = 0;
printf("%s, start %d count %d batch %d\n", cmd->name, cmd->start, cmd->count, cmd->batch);
while (!sig_exit) {
int ret;
if (cmd->batch > 1) {
if (batch == 0)
begin_trans();
}
switch (type) {
case '+':
ret = insert(id, "hello world");
break;
case '-':
ret = delete(id);
break;
case '*':
ret = update(id, "aaaaaaaaaaaaaaaaa");
break;
break;
}
if (ret <= 0)
break;
if (verbose) {
printf("%c", type);
fflush(stdout);
}
if (cmd->batch > 1) {
batch++;
if (batch >= cmd->batch) {
end_trans();
batch = 0;
if (verbose) {
printf("|");
fflush(stdout);
}
}
}
id++;
n++;
if (cmd->count && n >= cmd->count)
break;
}
printf("%s %d rows\n", cmd->name, n);
}
static void parse_arg(int argc, char **argv)
{
char *k;
while ((k = *argv++)) {
char *v = *argv;
if (!strcmp(k, "-w"))
wal = 1;
else if (!strcmp(k, "-c"))
create = 1;
else if (!strcmp(k, "-t")) {
timeout = atoi(v);
argv++;
}
else if (!strcmp(k, "-v"))
verbose = 1;
else {
struct cmd *cmd;
if (k[0] != '-') {
cmd = &cmds[n_cmd];
n_cmd++;
cmd->name = k;
}
else {
cmd = &cmds[n_cmd-1];
if (!strcmp(k, "-s"))
cmd->start = atoi(v);
else if (!strcmp(k, "-n"))
cmd->count = atoi(v);
else if (!strcmp(k, "-b"))
cmd->batch = atoi(v);
else if (!strcmp(k, "-d"))
cmd->delay = atoi(v);
argv++;
}
}
}
if (n_cmd == 0)
usage();
}
static void* run_cmd(void *arg)
{
struct cmd *cmd = (struct cmd *)arg;
if (cmd->delay)
usleep(cmd->delay * 1000);
if (!strcmp(cmd->name, "rd"))
do_read(cmd);
else if (!strcmp(cmd->name, "wr"))
do_write(cmd, '+');
else if (!strcmp(cmd->name, "del"))
do_write(cmd, '-');
else if (!strcmp(cmd->name, "update"))
do_write(cmd, '*');
else {
fprintf(stderr, "unknown command\n");
usage();
}
}
int main(int argc, char **argv)
{
int i;
parse_arg(argc-1, argv+1);
install_sig_handler();
i = open_db("test.db");
if (i < 0)
return 1;
for (i = 1; i < n_cmd; i++)
pthread_create(&cmds[i].t, NULL, run_cmd, &cmds[i]);
run_cmd(&cmds[0]);
for (i = 1; i < n_cmd; i++)
pthread_join(cmds[i].t, NULL);
close_db();
return 0;
}