Initial commit

This commit is contained in:
Silas Brack 2026-03-07 09:53:12 +01:00
commit 8d32777f9f
8 changed files with 2790 additions and 0 deletions

334
src/db.rs Normal file
View file

@ -0,0 +1,334 @@
use rusqlite::{params, Connection, OpenFlags};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use crate::error::AppError;
// --- Record type ---
#[derive(Debug, Clone)]
pub struct Record {
pub key: String,
pub volumes: Vec<String>,
pub path: String,
pub size: Option<i64>,
}
// --- SQLite setup ---
fn apply_pragmas(conn: &Connection) {
conn.execute_batch(
"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA busy_timeout = 5000;
PRAGMA temp_store = memory;
PRAGMA cache_size = -64000;
PRAGMA mmap_size = 268435456;
PRAGMA page_size = 4096;
",
)
.expect("failed to set pragmas");
}
fn open_readonly(path: &str) -> Connection {
let conn = Connection::open_with_flags(
path,
OpenFlags::SQLITE_OPEN_READ_ONLY
| OpenFlags::SQLITE_OPEN_NO_MUTEX
| OpenFlags::SQLITE_OPEN_URI,
)
.expect("failed to open read connection");
apply_pragmas(&conn);
conn
}
fn open_readwrite(path: &str) -> Connection {
let conn = Connection::open_with_flags(
path,
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_NO_MUTEX
| OpenFlags::SQLITE_OPEN_URI,
)
.expect("failed to open write connection");
apply_pragmas(&conn);
conn
}
fn create_tables(conn: &Connection) {
conn.execute_batch(
"
CREATE TABLE IF NOT EXISTS kv (
key TEXT PRIMARY KEY,
volumes TEXT NOT NULL,
path TEXT NOT NULL,
size INTEGER,
created_at INTEGER DEFAULT (unixepoch()),
deleted INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_kv_deleted ON kv(deleted);
",
)
.expect("failed to create tables");
}
fn parse_volumes(volumes_json: &str) -> Vec<String> {
serde_json::from_str(volumes_json).unwrap_or_default()
}
fn encode_volumes(volumes: &[String]) -> String {
serde_json::to_string(volumes).unwrap()
}
// --- ReadPool ---
#[derive(Clone)]
pub struct ReadPool {
conns: Vec<Arc<Mutex<Connection>>>,
next: Arc<AtomicUsize>,
}
impl ReadPool {
pub fn new(path: &str, size: usize) -> Self {
let conns = (0..size)
.map(|_| Arc::new(Mutex::new(open_readonly(path))))
.collect();
Self {
conns,
next: Arc::new(AtomicUsize::new(0)),
}
}
pub async fn query<T, F>(&self, f: F) -> Result<T, AppError>
where
T: Send + 'static,
F: FnOnce(&Connection) -> Result<T, AppError> + Send + 'static,
{
let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
let conn = self.conns[idx].clone();
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
f(&conn)
})
.await
.unwrap()
}
}
// --- Read query functions ---
pub fn get(conn: &Connection, key: &str) -> Result<Record, AppError> {
let mut stmt =
conn.prepare_cached("SELECT key, volumes, path, size FROM kv WHERE key = ?1 AND deleted = 0")?;
Ok(stmt.query_row(params![key], |row| {
let volumes_json: String = row.get(1)?;
Ok(Record {
key: row.get(0)?,
volumes: parse_volumes(&volumes_json),
path: row.get(2)?,
size: row.get(3)?,
})
})?)
}
pub fn list_keys(conn: &Connection, prefix: &str) -> Result<Vec<String>, AppError> {
let mut stmt =
conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 AND deleted = 0 ORDER BY key")?;
let pattern = format!("{prefix}%");
let keys = stmt
.query_map(params![pattern], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()?;
Ok(keys)
}
pub fn all_records(conn: &Connection) -> Result<Vec<Record>, AppError> {
let mut stmt =
conn.prepare_cached("SELECT key, volumes, path, size FROM kv WHERE deleted = 0")?;
let records = stmt
.query_map([], |row| {
let volumes_json: String = row.get(1)?;
Ok(Record {
key: row.get(0)?,
volumes: parse_volumes(&volumes_json),
path: row.get(2)?,
size: row.get(3)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(records)
}
// --- Write commands ---
pub enum WriteCmd {
Put {
key: String,
volumes: Vec<String>,
path: String,
size: Option<i64>,
reply: oneshot::Sender<Result<(), AppError>>,
},
Delete {
key: String,
reply: oneshot::Sender<Result<(), AppError>>,
},
BulkPut {
records: Vec<(String, Vec<String>, String, Option<i64>)>,
reply: oneshot::Sender<Result<(), AppError>>,
},
}
fn execute_cmd(conn: &Connection, cmd: WriteCmd) -> (Result<(), AppError>, oneshot::Sender<Result<(), AppError>>) {
match cmd {
WriteCmd::Put {
key,
volumes,
path,
size,
reply,
} => {
let volumes_json = encode_volumes(&volumes);
let result = conn
.prepare_cached(
"INSERT INTO kv (key, volumes, path, size) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, path = ?3, size = ?4, deleted = 0",
)
.and_then(|mut s| s.execute(params![key, volumes_json, path, size]))
.map(|_| ())
.map_err(AppError::from);
(result, reply)
}
WriteCmd::Delete { key, reply } => {
let result = conn
.prepare_cached("DELETE FROM kv WHERE key = ?1")
.and_then(|mut s| s.execute(params![key]))
.map(|_| ())
.map_err(AppError::from);
(result, reply)
}
WriteCmd::BulkPut { records, reply } => {
let result = (|| -> Result<(), AppError> {
let mut stmt = conn.prepare_cached(
"INSERT INTO kv (key, volumes, path, size) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, path = ?3, size = ?4, deleted = 0",
)?;
for (key, volumes, path, size) in &records {
let volumes_json = encode_volumes(volumes);
stmt.execute(params![key, volumes_json, path, size])?;
}
Ok(())
})();
(result, reply)
}
}
}
// --- WriterHandle ---
#[derive(Clone)]
pub struct WriterHandle {
tx: mpsc::Sender<WriteCmd>,
}
impl WriterHandle {
pub async fn put(
&self,
key: String,
volumes: Vec<String>,
path: String,
size: Option<i64>,
) -> Result<(), AppError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx
.send(WriteCmd::Put {
key,
volumes,
path,
size,
reply: reply_tx,
})
.await
.map_err(|_| AppError::WriterDead)?;
reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
}
pub async fn delete(&self, key: String) -> Result<(), AppError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx
.send(WriteCmd::Delete {
key,
reply: reply_tx,
})
.await
.map_err(|_| AppError::WriterDead)?;
reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
}
pub async fn bulk_put(
&self,
records: Vec<(String, Vec<String>, String, Option<i64>)>,
) -> Result<(), AppError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx
.send(WriteCmd::BulkPut {
records,
reply: reply_tx,
})
.await
.map_err(|_| AppError::WriterDead)?;
reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
}
}
// --- spawn_writer ---
pub fn spawn_writer(path: String) -> (WriterHandle, oneshot::Receiver<()>) {
let (tx, mut rx) = mpsc::channel::<WriteCmd>(4096);
let (ready_tx, ready_rx) = oneshot::channel();
std::thread::spawn(move || {
let conn = open_readwrite(&path);
create_tables(&conn);
let _ = ready_tx.send(());
loop {
let Some(first) = rx.blocking_recv() else {
break;
};
let mut batch = vec![first];
while batch.len() < 512 {
match rx.try_recv() {
Ok(cmd) => batch.push(cmd),
Err(_) => break,
}
}
let _ = conn.execute_batch("BEGIN");
let mut replies: Vec<(Result<(), AppError>, oneshot::Sender<Result<(), AppError>>)> =
Vec::with_capacity(batch.len());
for (i, cmd) in batch.into_iter().enumerate() {
let sp = format!("sp{i}");
let _ = conn.execute(&format!("SAVEPOINT {sp}"), []);
let (result, reply) = execute_cmd(&conn, cmd);
if result.is_ok() {
let _ = conn.execute(&format!("RELEASE {sp}"), []);
} else {
let _ = conn.execute(&format!("ROLLBACK TO {sp}"), []);
let _ = conn.execute(&format!("RELEASE {sp}"), []);
}
replies.push((result, reply));
}
let _ = conn.execute_batch("COMMIT");
for (result, reply) in replies {
let _ = reply.send(result);
}
}
});
(WriterHandle { tx }, ready_rx)
}