ILD

sqlite mode and concurrency
作者:Yuan Jianpeng 邮箱:yuanjp89@163.com
发布时间:2026-1-28 站点:Inside Linux Development

最近的nas项目中,开发的相册模块,使用sqlite存储用户数据,包括相册、照片等信息。然而在使用中碰到了sqlite的很多坑。主要是写入和读取速度的问题。


1 写完关闭连接

想着写入不是频繁的操作,写完关闭数据库,这样可以节省内存,但是发现这种情况效率非常之低,可能1s只能写入几条数据。

strace发现,每次打开数据库都会执行一次sync操作,需要等待几百毫秒到几秒。

所以,对于守护程序,有频繁读写操作的,需要保持数据库为打开状态。


2 WAL模式

sqlite是atomic commit and rollback(原子提交和回滚)类型的数据库。默认模式是rollback journal(回滚日志)。但是在2010年的3.7版本引入了一种新的模式WAL,Write-Ahead Log。


rollback journal模式,是把要覆盖的旧数据写入到一个-journal的日志文件。

而WAL模式,则直接把新数据写入到一个-wal文件,这样就完全不动旧数据库文件,当-wal文件大于4M时,再写回到数据库文件。


WAL模式的优点:

大多数情况下它的写入速度都比rollback journal模式显著的快,因为它在有在checkpoint的时候,才执行sync操作。

支持读写并发


3 synchronous

synchronous用来设定数据同步到磁盘的模式,也就是调用操作系统sync()接口的方式。

不同的方式会显著影响 atomic, consistent, isolated, and durable (ACID)。


有4种模式:

OFF

完全关闭同步,速度会非常快,但是当程序崩溃或者操作系统崩溃或者断电的场景下,数据库文件可能会损坏。


NORMAL

大多数场景执行sync,对于WAL模式可以平衡写入性能和数据完整性。


FULL

是rollback journal的默认模式。


EXTRA

在FULL的基础上,对父目录也执行sync操作,比如删除-journal的时候,就需要sync父目录,比FULL更严格。


WAL模式用NORMAIL就行了。我们的项目选择了WAL模式,测试写入性能快很多。


4 concurrency

和大型商业数据库相比,SQLite是一个并发支持很差的数据库,

sqlite的写粒度是整个数据库,而postgreSQL支持行级锁。

sqlite只能有1个writer,而postgre支持多写并发。


sqlite是一个in-process数据库,其并发有2种类型,一种是多进程,一种是多线程。

多进程本质就是多个连接的并发,因为每个进程会单独打开数据,建立连接。

多线程一般是指单个连接、多个statement。如果多线程都有独立的连接,那就等价于多进程。


multiple connection

多连接情况的并发。


journal rollback模式,不支持并发,读和写也不能并发。只有一个进程能进行读或者写。

进程A正在写,进程B去读,会返回SQLITE_BUSY database is locked


WAL模式,支持一个写,但是可以多个读,而且写不阻塞读。

进程A正在写,进程B也尝试去写,会返回SQLITE_BUSY database is locked


busy timeout

sqlite提供一个busy timeout的机制,可以在获取锁失败的时候,自动去等待重试。

这是一个非常糟糕的机制,因为它没有通知机制,就是简单的重复sleep,重试。

测试两个进程并发写,设置busy timeout非常大,一个进程的写入速度几秒钟才写入一行数据。


one conneciton, multiple operation

单连接,多线程同时读写的并发和多连接是一样的。比如WAL模式,只有一个写,但是可以多个读。

但是由于多个线程是在同一个进程环境,sqlite在操作内部实现了锁,不会出现busy wait。使用者是无感的。


比如测试程序两个线程同时写

$ time ./a.out -c -v -w wr  wr -s 100000

enable WAL mode

try create table

wr, start 0 count 0 batch 0

wr, start 100000 count 0 batch 0

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++^C+wr 331 rows

+wr 324 rows


real    0m3.988s

user    0m0.028s

sys     0m0.062s


因此,我们得出了一个重要的结论,单连接、多线程同时读写

在代码上,我们可以像单线程一样,直接编程,sqlite内部会加锁。


参考5,sqlite定义了3种多线程模式,这个就是针对sqlite内部锁的。

single-thread,取消了所有的mutex,只能在单线程场景使用。

multiple-thread,可以多线程,但是database connection或者object,例如prepared statement不要跨线程使用。

Serialized,connection和object都可以跨线程使用,默认是这种模式,内部有完整的锁。



参考

[1] Write-Ahead Logging. https://www.sqlite.org/wal.html

[2] Atomic Commit In SQLite. https://www.sqlite.org/atomiccommit.html

[3] The Rollback Journal. https://www.sqlite.org/lockingv3.html#rollback

[4] PRAGMA schema.synchronous; https://sqlite.org/pragma.html

[5] Using SQLite In Multi-Threaded Applications. https://www.sqlite.org/threadsafe.html

[6] Result and Error Codes. https://www.sqlite.org/rescode.html

[7] Set A Busy Timeout. https://www.sqlite.org/c3ref/busy_timeout.html.

[8] Abusing SQLite to Handle Concurrency. https://blog.skypilot.co/abusing-sqlite-to-handle-concurrency


并发测试程序,可以起多个线程,进行读、写、删除、更新。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
#include <sqlite3.h>

static struct sqlite3 *conn;
static int wal = 0, create = 0, timeout = 0;
static int verbose = 0;

static struct cmd {
	pthread_t t;
	const char *name;
	int start;
	int count;
	int batch;
	int delay;
} cmds[8];
static int n_cmd;

static volatile int sig_exit;

static void usage() {
	fprintf(stderr, "./sqlite_batch [sqlite options] <cmd> ...\n"
			"\n"
			"sqlite options:\n"
			"\t-w, WAL mode\n"
			"\t-c, try create table\n"
			"\t-t <timeout>, sqlite busy timeout\n"
			"\t-v, verbose\n"
			"\n"
			"cmd: <rd|wr|del|update> [cmd options] ...\n"
			"cmd options:\n"
			"\t-s <start id>\n"
			"\t-n <rows>\n"
			"\t-b <rows per transaction>\n"
			"\t-d <ms> delay milliseconds\n"
			"\n"
	);
	exit(1);
}

static void sig_handler(int signum)
{
	switch (signum)
	{
	case SIGINT:
		sig_exit = 1;
		break;
	};
}

static void install_sig_handler()
{
	struct sigaction act = {
		.sa_handler = sig_handler,
	};

	sigaction(SIGINT, &act, NULL);
}

static char *sql_err(int ret)
{
	static char errstr[512];
	snprintf(errstr, sizeof(errstr), "%d(%s): %s",
			ret, sqlite3_errstr(ret), sqlite3_errmsg(conn));
	return errstr;
}

static int open_db(const char *path)
{
	int ret;
	const char *sql = "CREATE TABLE IF NOT EXISTS T (id INTEGER PRIMARY KEY, name TEXT);";

	ret = sqlite3_open(path, &conn);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "sqlite3_open() open %s failed: %s\n", path, sqlite3_errstr(ret));
		return -1;
	}

	if (wal) {
		printf("enable WAL mode\n");

		ret = sqlite3_exec(conn, "PRAGMA journal_mode=WAL;", 0, 0, 0);
		if (ret != SQLITE_OK) {
			fprintf(stderr, "enable WAL failed: %s\n", sql_err(ret));
			return -1;
		}
	}

	if (timeout) {
		printf("sqlite busy timeout %dms\n", timeout);

		ret = sqlite3_busy_timeout(conn, timeout);
		if (ret != SQLITE_OK)
			fprintf(stderr, "sqlite3_busy_timeout failed: %s\n", sql_err(ret));
	}

	if (create) {
		printf("try create table\n");

		ret = sqlite3_exec(conn, sql, 0, 0, NULL);
		if (ret != SQLITE_OK) {
			fprintf(stderr, "create table failed: %s\n", sql_err(ret));
			return -1;
		}
	}

	return 0;
}

static void close_db()
{
	int ret ;

	ret = sqlite3_close(conn);
	if (ret != SQLITE_OK)
		fprintf(stderr, "sqlite3 close fialed: %s\n", sql_err(ret));
}

static void begin_trans()
{
	int ret;

	ret = sqlite3_exec(conn, "BEGIN TRANSACTION;", 0, 0, NULL);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "begin transaction failed: %s\n", sql_err(ret));
		exit(1);
	}
}

static void end_trans()
{
	int ret;

	ret = sqlite3_exec(conn, "END TRANSACTION;", 0, 0, NULL);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "end transaction failed: %s\n", sql_err(ret));
		exit(1);
	}
}

static struct sqlite3_stmt *prepare_read(int start)
{
	sqlite3_stmt *stmt = NULL;
	int ret;
	const char *sql = "SELECT id, name FROM T where id > ?;";

	ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
		return NULL;
	}

	ret = sqlite3_bind_int(stmt, 1, start);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "bind failed: %s\n", sql_err(ret));
		return NULL;
	}

	return stmt;
}

static int read_next(struct sqlite3_stmt *stmt, int *id, const char **name)
{
	int ret;

	ret = sqlite3_step(stmt);
	if (ret == SQLITE_DONE)
		return 0;

	else if (ret != SQLITE_ROW) {
		fprintf(stderr, "list album failed: %s\n", sql_err(ret));
		return -1;
	}

	*id = sqlite3_column_int(stmt, 0);
	*name = (const char *)sqlite3_column_text(stmt, 1);

	return 1;
}

static int insert(int id, const char *name)
{
	sqlite3_stmt *stmt = NULL;
	int ret;
	int err = -1;
	const char *sql = "INSERT INTO T (id, name) VALUES(?, ?);";

	ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
		return -1;
	}

	ret = 0;
	ret += sqlite3_bind_int(stmt, 1, id);
	ret += sqlite3_bind_text(stmt, 2, name, -1, SQLITE_STATIC);

	if (ret != SQLITE_OK) {
		fprintf(stderr, "bind failed\n");
		goto out;
	}

	ret = sqlite3_step(stmt);
	if (ret != SQLITE_DONE) {
		fprintf(stderr, "step failed: %s\n", sql_err(ret));
		goto out;
	}

	err = 1;

out:
	sqlite3_finalize(stmt);
	return err;
}

static int delete(int id)
{
	sqlite3_stmt *stmt = NULL;
	int ret;
	int err = -1;
	const char *sql = "DELETE FROM T where id = ?;";

	ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
		return -1;
	}

	ret = 0;
	ret += sqlite3_bind_int(stmt, 1, id);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "bind failed\n");
		goto out;
	}

	ret = sqlite3_step(stmt);
	if (ret != SQLITE_DONE) {
		fprintf(stderr, "step failed: %s\n", sql_err(ret));
		goto out;
	}

	err = sqlite3_changes(conn);

out:
	sqlite3_finalize(stmt);
	return err;
}

static int update(int id, const char *name)
{
	sqlite3_stmt *stmt = NULL;
	int ret;
	int err = -1;
	const char *sql = "UPDATE T SET name = ? WHERE id = ?;";

	ret = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL);
	if (ret != SQLITE_OK) {
		fprintf(stderr, "prepare statement failed: %s\n", sql_err(ret));
		return -1;
	}

	ret = 0;
	ret += sqlite3_bind_text(stmt, 1, name, -1, SQLITE_STATIC);
	ret += sqlite3_bind_int(stmt, 2, id);

	if (ret != SQLITE_OK) {
		fprintf(stderr, "bind failed\n");
		goto out;
	}

	ret = sqlite3_step(stmt);
	if (ret != SQLITE_DONE) {
		fprintf(stderr, "step failed: %s\n", sql_err(ret));
		goto out;
	}

	err = sqlite3_changes(conn);

out:
	sqlite3_finalize(stmt);
	return err;
}

static void do_read(struct cmd *cmd)
{
	sqlite3_stmt *stmt;
	int n = 0;

	printf("read, start %d count %d\n", cmd->start, cmd->count);

	stmt = prepare_read(cmd->start);
	if (!stmt)
		return;

	while (!sig_exit) {
		int ret;
		int id;
		const char *name;

		ret = read_next(stmt, &id, &name);
		if (ret <= 0)
			break;

		if (verbose) {
			printf(".");
			fflush(stdout);
		}

		n++;
	}

	sqlite3_finalize(stmt);
	printf("read %d rows\n", n);
}

static void do_write(struct cmd *cmd, char type)
{
	int id = cmd->start;
	int n = 0;
	int batch = 0;

	printf("%s, start %d count %d batch %d\n", cmd->name, cmd->start, cmd->count, cmd->batch);

	while (!sig_exit) {
		int ret;

		if (cmd->batch > 1) {
			if (batch == 0)
				begin_trans();
		}

		switch (type) {
		case '+':
			ret = insert(id, "hello world");
			break;
		case '-':
			ret = delete(id);
			break;
		case '*':
			ret = update(id, "aaaaaaaaaaaaaaaaa");
			break;
			break;
		}
		if (ret <= 0)
			break;

		if (verbose) {
			printf("%c", type);
			fflush(stdout);
		}

		if (cmd->batch > 1) {
			batch++;
			if (batch >= cmd->batch) {
				end_trans();
				batch = 0;

				if (verbose) {
					printf("|");
					fflush(stdout);
				}
			}
		}

		id++;
		n++;
		if (cmd->count && n >= cmd->count)
			break;
	}

	printf("%s %d rows\n", cmd->name, n);
}

static void parse_arg(int argc, char **argv)
{
	char *k;

	while ((k = *argv++)) {
		char *v = *argv;

		if (!strcmp(k, "-w"))
			wal = 1;

		else if (!strcmp(k, "-c"))
			create = 1;

		else if (!strcmp(k, "-t")) {
			timeout = atoi(v);
			argv++;
		}

		else if (!strcmp(k, "-v"))
			verbose = 1;

		else {
			struct cmd *cmd;

			if (k[0] != '-') {
				cmd = &cmds[n_cmd];
				n_cmd++;
				cmd->name = k;
			}
			else {
				cmd = &cmds[n_cmd-1];

				if (!strcmp(k, "-s"))
					cmd->start = atoi(v);

				else if (!strcmp(k, "-n"))
					cmd->count = atoi(v);

				else if (!strcmp(k, "-b"))
					cmd->batch = atoi(v);

				else if (!strcmp(k, "-d"))
					cmd->delay = atoi(v);

				argv++;
			}
		}
	}

	if (n_cmd == 0)
		usage();
}

static void* run_cmd(void *arg)
{
	struct cmd *cmd = (struct cmd *)arg;

	if (cmd->delay)
		usleep(cmd->delay * 1000);

	if (!strcmp(cmd->name, "rd"))
		do_read(cmd);

	else if (!strcmp(cmd->name, "wr"))
		do_write(cmd, '+');

	else if (!strcmp(cmd->name, "del"))
		do_write(cmd, '-');

	else if (!strcmp(cmd->name, "update"))
		do_write(cmd, '*');

	else {
		fprintf(stderr, "unknown command\n");
		usage();
	}
}

int main(int argc, char **argv)
{
	int i;

	parse_arg(argc-1, argv+1);

	install_sig_handler();

	i = open_db("test.db");
	if (i < 0)
		return 1;

	for (i = 1; i < n_cmd; i++)
		pthread_create(&cmds[i].t, NULL, run_cmd, &cmds[i]);

	run_cmd(&cmds[0]);

	for (i = 1; i < n_cmd; i++)
		pthread_join(cmds[i].t, NULL);

	close_db();
	return 0;
}


Copyright © linuxdev.cc 2017-2024. Some Rights Reserved.