diff --git a/README.md b/README.md index 8fc89de..022621d 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ curl http://localhost:3000/?prefix=path/to/ ### Operations ```bash -# Rebuild index by scanning all volumes (disaster recovery) +# Rebuild index by scanning all volumes (stop the server first) mkv -d /tmp/index.db -v http://vol1:8080,http://vol2:8080,http://vol3:8080 -r 2 rebuild # Rebalance after adding/removing volumes (preview with --dry-run) diff --git a/src/db.rs b/src/db.rs index f928d03..cd65587 100644 --- a/src/db.rs +++ b/src/db.rs @@ -76,13 +76,45 @@ impl Db { pub async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, AppError> { let conn = self.conn.clone(); - let pattern = format!("{prefix}%"); + let prefix = prefix.to_string(); tokio::task::spawn_blocking(move || { let conn = conn.lock().unwrap(); - let mut stmt = conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 ORDER BY key")?; - let keys = stmt - .query_map(params![pattern], |row| row.get(0))? - .collect::<Result<Vec<String>, _>>()?; + if prefix.is_empty() { + let mut stmt = conn.prepare_cached("SELECT key FROM kv ORDER BY key")?; + let keys = stmt + .query_map([], |row| row.get(0))? + .collect::<Result<Vec<String>, _>>()?; + return Ok(keys); + } + // Compute exclusive upper bound: increment last non-0xFF byte + let upper = { + let mut bytes = prefix.as_bytes().to_vec(); + let mut result = None; + while let Some(last) = bytes.pop() { + if last < 0xFF { + bytes.push(last + 1); + result = Some(String::from_utf8_lossy(&bytes).into_owned()); + break; + } + } + result + }; + let keys = match &upper { + Some(end) => { + let mut stmt = conn.prepare_cached( + "SELECT key FROM kv WHERE key >= ?1 AND key < ?2 ORDER BY key", + )?; + stmt.query_map(params![prefix, end], |row| row.get(0))? + .collect::<Result<Vec<String>, _>>()? + } + None => { + let mut stmt = conn.prepare_cached( + "SELECT key FROM kv WHERE key >= ?1 ORDER BY key", + )?; + stmt.query_map(params![prefix], |row| row.get(0))? + .collect::<Result<Vec<String>, _>>()? 
+ } + }; Ok(keys) }) .await @@ -120,6 +152,7 @@ impl Db { let conn = self.conn.clone(); tokio::task::spawn_blocking(move || { let conn = conn.lock().unwrap(); + conn.execute_batch("BEGIN")?; let mut stmt = conn.prepare_cached( "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3) ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3", @@ -127,6 +160,8 @@ impl Db { for (key, volumes, size) in &records { stmt.execute(params![key, encode_volumes(volumes), size])?; } + drop(stmt); + conn.execute_batch("COMMIT")?; Ok(()) }) .await diff --git a/src/lib.rs b/src/lib.rs index fa5c1fa..4e80a27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,8 @@ pub mod rebuild; use std::sync::Arc; +const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB + pub struct Args { pub db_path: String, pub volumes: Vec<String>, @@ -14,6 +16,15 @@ pub struct Args { } pub fn build_app(args: &Args) -> axum::Router { + if args.replicas > args.volumes.len() { + eprintln!( + "Error: replication factor ({}) exceeds number of volumes ({})", + args.replicas, + args.volumes.len() + ); + std::process::exit(1); + } + if let Some(parent) = std::path::Path::new(&args.db_path).parent() { std::fs::create_dir_all(parent).unwrap_or_else(|e| { eprintln!("Failed to create database directory: {e}"); @@ -37,5 +48,6 @@ pub fn build_app(args: &Args) -> axum::Router { .delete(server::delete_key) .head(server::head_key), ) + .layer(axum::extract::DefaultBodyLimit::max(DEFAULT_BODY_LIMIT)) .with_state(state) }