Skip to content

Instantly share code, notes, and snippets.

@michalc
Last active April 3, 2024 17:26
Show Gist options
  • Select an option

  • Save michalc/a3147997e21665896836e0f4157975cb to your computer and use it in GitHub Desktop.

Select an option

Save michalc/a3147997e21665896836e0f4157975cb to your computer and use it in GitHub Desktop.
Use libsqlite3 directly from Python with ctypes: without using the built-in sqlite3 Python package, and without compiling anything
# From https://stackoverflow.com/a/68876046/1319998, which is itself inspired by https://stackoverflow.com/a/68814418/1319998
from contextlib import contextmanager
from collections import namedtuple
from ctypes import cdll, byref, string_at, c_char_p, c_int, c_double, c_int64, c_void_p
from ctypes.util import find_library
from sys import platform
def query(db_file, sql, params=()):
libsqlite3 = cdll.LoadLibrary(find_library('sqlite3'))
libsqlite3.sqlite3_errstr.restype = c_char_p
libsqlite3.sqlite3_errmsg.restype = c_char_p
libsqlite3.sqlite3_column_name.restype = c_char_p
libsqlite3.sqlite3_column_double.restype = c_double
libsqlite3.sqlite3_column_int64.restype = c_int64
libsqlite3.sqlite3_column_blob.restype = c_void_p
libsqlite3.sqlite3_column_bytes.restype = c_int64
SQLITE_ROW = 100
SQLITE_DONE = 101
SQLITE_TRANSIENT = -1
SQLITE_OPEN_READWRITE = 0x00000002
bind = {
type(0): libsqlite3.sqlite3_bind_int64,
type(0.0): libsqlite3.sqlite3_bind_double,
type(''): lambda pp_stmt, i, value: libsqlite3.sqlite3_bind_text(pp_stmt, i, value.encode('utf-8'), len(value.encode('utf-8')), SQLITE_TRANSIENT),
type(b''): lambda pp_stmt, i, value: libsqlite3.sqlite3_bind_blob(pp_stmt, i, value, len(value), SQLITE_TRANSIENT),
type(None): lambda pp_stmt, i, _: libsqlite3.sqlite3_bind_null(pp_stmt, i),
}
extract = {
1: libsqlite3.sqlite3_column_int64,
2: libsqlite3.sqlite3_column_double,
3: lambda pp_stmt, i: string_at(
libsqlite3.sqlite3_column_blob(pp_stmt, i),
libsqlite3.sqlite3_column_bytes(pp_stmt, i),
).decode(),
4: lambda pp_stmt, i: string_at(
libsqlite3.sqlite3_column_blob(pp_stmt, i),
libsqlite3.sqlite3_column_bytes(pp_stmt, i),
),
5: lambda pp_stmt, i: None,
}
def run(func, *args):
res = func(*args)
if res != 0:
raise Exception(libsqlite3.sqlite3_errstr(res).decode())
def run_with_db(db, func, *args):
if func(*args) != 0:
raise Exception(libsqlite3.sqlite3_errmsg(db).decode())
@contextmanager
def get_db(db_file):
db = c_void_p()
run(libsqlite3.sqlite3_open_v2, db_file.encode(), byref(db), SQLITE_OPEN_READWRITE, None)
try:
yield db
finally:
run_with_db(db, libsqlite3.sqlite3_close, db)
@contextmanager
def get_pp_stmt(db, sql):
pp_stmt = c_void_p()
run_with_db(db, libsqlite3.sqlite3_prepare_v3, db, sql.encode(), -1, 0, byref(pp_stmt), None)
try:
yield pp_stmt
finally:
run_with_db(db, libsqlite3.sqlite3_finalize, pp_stmt)
with \
get_db(db_file) as db, \
get_pp_stmt(db, sql) as pp_stmt:
for i, param in enumerate(params):
run_with_db(db, bind[type(param)], pp_stmt, i + 1, param)
row_constructor = namedtuple('Row', (
libsqlite3.sqlite3_column_name(pp_stmt, i).decode()
for i in range(0, libsqlite3.sqlite3_column_count(pp_stmt))
))
while True:
res = libsqlite3.sqlite3_step(pp_stmt)
if res == SQLITE_DONE:
break
if res != SQLITE_ROW:
raise Exception(libsqlite3.sqlite3_errstr(res).decode())
yield row_constructor(*(
extract[libsqlite3.sqlite3_column_type(pp_stmt, i)](pp_stmt, i)
for i in range(0, len(row_constructor._fields))
))
@michalc
Copy link
Copy Markdown
Author

michalc commented May 12, 2023

No problem!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment