diff --git a/Cargo.lock b/Cargo.lock index 2fc31eb..052121b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -377,23 +377,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" -[[package]] -name = "futures-io" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" - -[[package]] -name = "futures-macro" -version = "0.3.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures-sink" version = "0.3.32" @@ -413,11 +396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", - "futures-io", - "futures-macro", - "futures-sink", "futures-task", - "memchr", "pin-project-lite", "slab", ] @@ -878,7 +857,6 @@ name = "mkv" version = "0.1.0" dependencies = [ "axum", - "bytes", "clap", "reqwest", "rusqlite", @@ -886,7 +864,6 @@ dependencies = [ "serde_json", "sha2", "tokio", - "toml", "tracing", "tracing-subscriber", ] @@ -1082,7 +1059,6 @@ dependencies = [ "bytes", "encoding_rs", "futures-core", - "futures-util", "h2", "http", "http-body", @@ -1104,14 +1080,12 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", "web-sys", ] @@ -1299,15 +1273,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "serde_spanned" -version = "0.6.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" -dependencies = [ - "serde", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1541,47 +1506,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "toml" -version = "0.8.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" -dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit", -] - -[[package]] -name = "toml_datetime" -version = "0.6.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" -dependencies = [ - "serde", -] - -[[package]] -name = "toml_edit" -version = "0.22.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" -dependencies = [ - "indexmap", - "serde", - "serde_spanned", - "toml_datetime", - "toml_write", - "winnow", -] - -[[package]] -name = "toml_write" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" - [[package]] name = "tower" version = "0.5.3" @@ -1872,19 +1796,6 @@ dependencies = [ "wasmparser", ] -[[package]] -name = "wasm-streams" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" -dependencies = [ - "futures-util", - "js-sys", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "wasmparser" version = "0.244.0" @@ -2024,15 +1935,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "winnow" -version = "0.7.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" -dependencies = [ - "memchr", -] - [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/Cargo.toml b/Cargo.toml index bc2fcbc..659e8bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,15 +7,13 @@ edition = "2024" axum = "0.8" tokio = { version = "1", features = ["full"] } rusqlite = { version = "0.35", features = ["bundled"] } -reqwest = { version = "0.12", features = ["stream", "json"] } +reqwest = { version = "0.12", features = ["json"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -toml = "0.8" clap = { version = "4", features = ["derive"] } tracing = "0.1" tracing-subscriber = "0.3" sha2 = "0.10" -bytes = "1" [profile.release] opt-level = 3 diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index 0fe1a3f..0000000 --- a/src/config.rs +++ /dev/null @@ -1,47 +0,0 @@ -use serde::Deserialize; -use std::path::Path; - -#[derive(Debug, Deserialize, Clone)] -pub struct Config { - pub server: ServerConfig, - pub database: DatabaseConfig, - pub volumes: Vec, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct ServerConfig { - pub port: u16, - pub replication_factor: usize, - pub virtual_nodes: usize, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct DatabaseConfig { - pub path: String, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct VolumeConfig { - pub url: String, -} - -impl Config { - pub fn load(path: &Path) -> Result> { - let contents = std::fs::read_to_string(path)?; - let config: Config = toml::from_str(&contents)?; - if config.volumes.is_empty() { - return Err("at least one volume is required".into()); - } - if config.server.replication_factor == 0 { - return Err("replication_factor must be >= 1".into()); - } - if config.server.replication_factor > config.volumes.len() { - return Err("replication_factor exceeds number of volumes".into()); - } - Ok(config) - } - - pub fn volume_urls(&self) -> Vec { - self.volumes.iter().map(|v| v.url.clone()).collect() - } -} diff --git a/src/db.rs b/src/db.rs index 37b1a5f..ee835fd 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,11 +1,8 @@ use rusqlite::{params, Connection, OpenFlags}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use crate::error::AppError; -// --- Record type --- - #[derive(Debug, Clone)] pub struct Record { pub key: String, @@ -13,174 +10,95 @@ pub struct Record { pub size: Option, } -// --- SQLite setup --- - fn apply_pragmas(conn: &Connection) { conn.execute_batch( - " - PRAGMA journal_mode = WAL; - PRAGMA synchronous = NORMAL; - PRAGMA busy_timeout = 5000; - PRAGMA temp_store = memory; - PRAGMA cache_size = -64000; - PRAGMA mmap_size = 268435456; - PRAGMA page_size = 4096; - ", + "PRAGMA journal_mode = WAL; + PRAGMA synchronous = NORMAL; + PRAGMA busy_timeout = 5000; + PRAGMA temp_store = memory; + PRAGMA cache_size = -64000; + PRAGMA mmap_size = 268435456; + PRAGMA page_size = 4096;", ) .expect("failed to set pragmas"); } -fn open_readonly(path: &str) -> Connection { - let conn = Connection::open_with_flags( - path, - OpenFlags::SQLITE_OPEN_READ_ONLY - | OpenFlags::SQLITE_OPEN_NO_MUTEX - | OpenFlags::SQLITE_OPEN_URI, - ) - .expect("failed to open read connection"); - apply_pragmas(&conn); - conn +fn parse_volumes(s: &str) -> Vec { + serde_json::from_str(s).unwrap_or_default() } -fn open_readwrite(path: &str) -> Connection { - let conn = Connection::open_with_flags( - path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_NO_MUTEX - | OpenFlags::SQLITE_OPEN_URI, - ) - .expect("failed to open write connection"); - apply_pragmas(&conn); - conn +fn encode_volumes(v: &[String]) -> String { + serde_json::to_string(v).unwrap() } -fn create_tables(conn: &Connection) { - conn.execute_batch( - " - CREATE TABLE IF NOT EXISTS kv ( - key TEXT PRIMARY KEY, - volumes TEXT NOT NULL, - size INTEGER, - created_at INTEGER DEFAULT (unixepoch()) - ); - ", - ) - .expect("failed to create tables"); -} - -fn parse_volumes(volumes_json: &str) -> Vec { - serde_json::from_str(volumes_json).unwrap_or_default() -} - -fn encode_volumes(volumes: &[String]) -> String { - serde_json::to_string(volumes).unwrap() -} - -// --- ReadPool --- - +/// A single SQLite connection behind a mutex, used for both reads and writes. #[derive(Clone)] -pub struct ReadPool { - conns: Vec>>, - next: Arc, +pub struct Db { + conn: Arc>, } -impl ReadPool { - pub fn new(path: &str, size: usize) -> Self { - let conns = (0..size) - .map(|_| Arc::new(Mutex::new(open_readonly(path)))) - .collect(); - Self { - conns, - next: Arc::new(AtomicUsize::new(0)), - } +impl Db { + pub fn new(path: &str) -> Self { + let conn = Connection::open_with_flags( + path, + OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_NO_MUTEX + | OpenFlags::SQLITE_OPEN_URI, + ) + .expect("failed to open database"); + apply_pragmas(&conn); + conn.execute_batch( + "CREATE TABLE IF NOT EXISTS kv ( + key TEXT PRIMARY KEY, + volumes TEXT NOT NULL, + size INTEGER, + created_at INTEGER DEFAULT (unixepoch()) + );", + ) + .expect("failed to create tables"); + Self { conn: Arc::new(Mutex::new(conn)) } } - pub async fn query(&self, f: F) -> Result - where - T: Send + 'static, - F: FnOnce(&Connection) -> Result + Send + 'static, - { - let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len(); - let conn = self.conns[idx].clone(); + pub async fn get(&self, key: &str) -> Result { + let conn = self.conn.clone(); + let key = key.to_string(); tokio::task::spawn_blocking(move || { let conn = conn.lock().unwrap(); - f(&conn) + let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv WHERE key = ?1")?; + Ok(stmt.query_row(params![key], |row| { + let vj: String = row.get(1)?; + Ok(Record { key: row.get(0)?, volumes: parse_volumes(&vj), size: row.get(2)? }) + })?) }) .await .unwrap() } -} -// --- Read query functions --- - -pub fn get(conn: &Connection, key: &str) -> Result { - let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv WHERE key = ?1")?; - Ok(stmt.query_row(params![key], |row| { - let volumes_json: String = row.get(1)?; - Ok(Record { - key: row.get(0)?, - volumes: parse_volumes(&volumes_json), - size: row.get(2)?, + pub async fn list_keys(&self, prefix: &str) -> Result, AppError> { + let conn = self.conn.clone(); + let pattern = format!("{prefix}%"); + tokio::task::spawn_blocking(move || { + let conn = conn.lock().unwrap(); + let mut stmt = conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 ORDER BY key")?; + let keys = stmt + .query_map(params![pattern], |row| row.get(0))? + .collect::, _>>()?; + Ok(keys) }) - })?) -} - -pub fn list_keys(conn: &Connection, prefix: &str) -> Result, AppError> { - let mut stmt = conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 ORDER BY key")?; - let pattern = format!("{prefix}%"); - let keys = stmt - .query_map(params![pattern], |row| row.get(0))? - .collect::, _>>()?; - Ok(keys) -} - -pub fn all_records(conn: &Connection) -> Result, AppError> { - let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv")?; - let records = stmt - .query_map([], |row| { - let volumes_json: String = row.get(1)?; - Ok(Record { - key: row.get(0)?, - volumes: parse_volumes(&volumes_json), - size: row.get(2)?, - }) - })? - .collect::, _>>()?; - Ok(records) -} - -// --- WriterHandle --- - -#[derive(Clone)] -pub struct WriterHandle { - conn: Arc>, -} - -impl WriterHandle { - pub fn new(path: &str) -> Self { - let conn = open_readwrite(path); - create_tables(&conn); - Self { - conn: Arc::new(Mutex::new(conn)), - } + .await + .unwrap() } - pub async fn put( - &self, - key: String, - volumes: Vec, - size: Option, - ) -> Result<(), AppError> { + pub async fn put(&self, key: String, volumes: Vec, size: Option) -> Result<(), AppError> { let conn = self.conn.clone(); tokio::task::spawn_blocking(move || { let conn = conn.lock().unwrap(); - let volumes_json = encode_volumes(&volumes); conn.prepare_cached( "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3) ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3", )? - .execute(params![key, volumes_json, size])?; + .execute(params![key, encode_volumes(&volumes), size])?; Ok(()) }) .await @@ -199,10 +117,7 @@ impl WriterHandle { .unwrap() } - pub async fn bulk_put( - &self, - records: Vec<(String, Vec, Option)>, - ) -> Result<(), AppError> { + pub async fn bulk_put(&self, records: Vec<(String, Vec, Option)>) -> Result<(), AppError> { let conn = self.conn.clone(); tokio::task::spawn_blocking(move || { let conn = conn.lock().unwrap(); @@ -211,12 +126,23 @@ impl WriterHandle { ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3", )?; for (key, volumes, size) in &records { - let volumes_json = encode_volumes(volumes); - stmt.execute(params![key, volumes_json, size])?; + stmt.execute(params![key, encode_volumes(volumes), size])?; } Ok(()) }) .await .unwrap() } + + pub fn all_records_sync(&self) -> Result, AppError> { + let conn = self.conn.lock().unwrap(); + let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv")?; + let records = stmt + .query_map([], |row| { + let vj: String = row.get(1)?; + Ok(Record { key: row.get(0)?, volumes: parse_volumes(&vj), size: row.get(2)? }) + })? + .collect::, _>>()?; + Ok(records) + } } diff --git a/src/error.rs b/src/error.rs index 75272d0..04dad66 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,6 @@ pub enum AppError { NotFound, Db(rusqlite::Error), VolumeError(String), - NoHealthyVolume, } impl From for AppError { @@ -24,7 +23,6 @@ impl std::fmt::Display for AppError { AppError::NotFound => write!(f, "not found"), AppError::Db(e) => write!(f, "database error: {e}"), AppError::VolumeError(msg) => write!(f, "volume error: {msg}"), - AppError::NoHealthyVolume => write!(f, "no healthy volume available"), } } } @@ -33,7 +31,6 @@ impl IntoResponse for AppError { fn into_response(self) -> Response { let (status, msg) = match &self { AppError::NotFound => (StatusCode::NOT_FOUND, self.to_string()), - AppError::NoHealthyVolume => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()), _ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), }; (status, msg).into_response() diff --git a/src/hasher.rs b/src/hasher.rs index f917db0..9b684ea 100644 --- a/src/hasher.rs +++ b/src/hasher.rs @@ -1,61 +1,18 @@ use sha2::{Digest, Sha256}; -use std::collections::BTreeMap; -pub struct Ring { - nodes: BTreeMap, - virtual_nodes: usize, -} - -impl Ring { - pub fn new(volumes: &[String], virtual_nodes: usize) -> Self { - let mut ring = Self { - nodes: BTreeMap::new(), - virtual_nodes, - }; - for url in volumes { - ring.add_volume(url); - } - ring - } - - pub fn add_volume(&mut self, url: &str) { - for i in 0..self.virtual_nodes { - let hash = hash_key(&format!("{url}:{i}")); - self.nodes.insert(hash, url.to_string()); - } - } - - pub fn remove_volume(&mut self, url: &str) { - self.nodes.retain(|_, v| v != url); - } - - /// Walk the ring clockwise from the key's hash position and return - /// `count` distinct physical volumes. - pub fn get_volumes(&self, key: &str, count: usize) -> Vec { - if self.nodes.is_empty() { - return vec![]; - } - - let hash = hash_key(key); - let mut result = Vec::with_capacity(count); - - // Walk clockwise from hash position - for (_, url) in self.nodes.range(hash..).chain(self.nodes.iter()) { - if !result.contains(url) { - result.push(url.clone()); - } - if result.len() == count { - break; - } - } - - result - } -} - -fn hash_key(input: &str) -> u64 { - let hash = Sha256::digest(input.as_bytes()); - u64::from_be_bytes(hash[..8].try_into().unwrap()) +/// Pick `count` volumes for a key by hashing key+volume, sorting by score. +/// Same idea as minikeyvalue's key2volume — stable in volume name, not position. +pub fn volumes_for_key(key: &str, volumes: &[String], count: usize) -> Vec { + let mut scored: Vec<(u64, &String)> = volumes + .iter() + .map(|v| { + let hash = Sha256::digest(format!("{key}:{v}").as_bytes()); + let score = u64::from_be_bytes(hash[..8].try_into().unwrap()); + (score, v) + }) + .collect(); + scored.sort_by_key(|(score, _)| *score); + scored.into_iter().take(count).map(|(_, v)| v.clone()).collect() } #[cfg(test)] @@ -63,48 +20,29 @@ mod tests { use super::*; #[test] - fn test_ring_distribution() { - let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); - let ring = Ring::new(&volumes, 100); - - let selected = ring.get_volumes("test-key", 3); - assert_eq!(selected.len(), 3); - for v in &volumes { - assert!(selected.contains(v), "missing volume {v}"); - } - } - - #[test] - fn test_ring_deterministic() { - let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); - let ring = Ring::new(&volumes, 100); - - let a = ring.get_volumes("my-key", 2); - let b = ring.get_volumes("my-key", 2); + fn test_deterministic() { + let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); + let a = volumes_for_key("my-key", &volumes, 2); + let b = volumes_for_key("my-key", &volumes, 2); assert_eq!(a, b); } #[test] - fn test_ring_count_capped() { - let volumes: Vec = (1..=2).map(|i| format!("http://vol{i}:300{i}")).collect(); - let ring = Ring::new(&volumes, 100); - - let selected = ring.get_volumes("key", 5); + fn test_count_capped() { + let volumes: Vec = (1..=2).map(|i| format!("http://vol{i}")).collect(); + let selected = volumes_for_key("key", &volumes, 5); assert_eq!(selected.len(), 2); } #[test] - fn test_ring_even_distribution() { - let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); - let ring = Ring::new(&volumes, 100); - + fn test_even_distribution() { + let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); let mut counts = std::collections::HashMap::new(); for i in 0..3000 { let key = format!("key-{i}"); - let primary = &ring.get_volumes(&key, 1)[0]; + let primary = &volumes_for_key(&key, &volumes, 1)[0]; *counts.entry(primary.clone()).or_insert(0u32) += 1; } - for (vol, count) in &counts { assert!( *count > 700 && *count < 1300, @@ -114,65 +52,30 @@ mod tests { } #[test] - fn test_ring_stability_on_add() { - // Adding a 4th volume to a 3-volume ring should only move ~25% of keys + fn test_stability_on_add() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); - let ring_before = Ring::new(&volumes, 100); - let mut volumes4 = volumes.clone(); volumes4.push("http://vol4".into()); - let ring_after = Ring::new(&volumes4, 100); let total = 10000; let mut moved = 0; for i in 0..total { let key = format!("key-{i}"); - let before = &ring_before.get_volumes(&key, 1)[0]; - let after = &ring_after.get_volumes(&key, 1)[0]; + let before = &volumes_for_key(&key, &volumes, 1)[0]; + let after = &volumes_for_key(&key, &volumes4, 1)[0]; if before != after { moved += 1; } } - - // Ideal: 1/4 of keys move (25%). Allow 15%-40%. let pct = moved as f64 / total as f64 * 100.0; assert!( pct > 15.0 && pct < 40.0, - "expected ~25% of keys to move, got {pct:.1}% ({moved}/{total})" + "expected ~25% of keys to move, got {pct:.1}%" ); } #[test] - fn test_ring_stability_on_remove() { - // Removing 1 volume from a 4-volume ring should only move ~25% of keys - let volumes: Vec = (1..=4).map(|i| format!("http://vol{i}")).collect(); - let ring_before = Ring::new(&volumes, 100); - - let volumes3: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); - let ring_after = Ring::new(&volumes3, 100); - - let total = 10000; - let mut moved = 0; - for i in 0..total { - let key = format!("key-{i}"); - let before = &ring_before.get_volumes(&key, 1)[0]; - let after = &ring_after.get_volumes(&key, 1)[0]; - if before != after { - moved += 1; - } - } - - // Ideal: 1/4 of keys move (25%). Allow 15%-40%. - let pct = moved as f64 / total as f64 * 100.0; - assert!( - pct > 15.0 && pct < 40.0, - "expected ~25% of keys to move, got {pct:.1}% ({moved}/{total})" - ); - } - - #[test] - fn test_ring_empty() { - let ring = Ring::new(&[], 100); - assert_eq!(ring.get_volumes("key", 1), Vec::::new()); + fn test_empty() { + assert_eq!(volumes_for_key("key", &[], 1), Vec::::new()); } } diff --git a/src/health.rs b/src/health.rs deleted file mode 100644 index 2637c37..0000000 --- a/src/health.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::collections::HashSet; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::RwLock; - -use crate::volume::VolumeClient; - -pub type HealthyVolumes = Arc>>; - -pub fn spawn_health_checker( - volume_client: VolumeClient, - all_volumes: Vec, - healthy: HealthyVolumes, -) { - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(10)); - loop { - interval.tick().await; - for url in &all_volumes { - let ok = volume_client.check(url).await; - let mut set = healthy.write().await; - if ok { - set.insert(url.clone()); - } else { - if set.remove(url) { - tracing::warn!("Volume {url} is unhealthy"); - } - } - } - } - }); -} diff --git a/src/lib.rs b/src/lib.rs index ba245f6..fa5c1fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,60 +1,31 @@ -pub mod config; pub mod db; pub mod error; pub mod hasher; -pub mod health; pub mod server; pub mod rebalance; pub mod rebuild; -pub mod volume; -use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::RwLock; -/// Build the axum Router with all state wired up. Returns the router and -/// a handle to the writer (caller must keep it alive). -pub async fn build_app(config: config::Config) -> axum::Router { - let db_path = &config.database.path; +pub struct Args { + pub db_path: String, + pub volumes: Vec, + pub replicas: usize, +} - if let Some(parent) = std::path::Path::new(db_path).parent() { +pub fn build_app(args: &Args) -> axum::Router { + if let Some(parent) = std::path::Path::new(&args.db_path).parent() { std::fs::create_dir_all(parent).unwrap_or_else(|e| { eprintln!("Failed to create database directory: {e}"); std::process::exit(1); }); } - let writer = db::WriterHandle::new(db_path); - - let num_readers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(4); - let reads = db::ReadPool::new(db_path, num_readers); - - let volume_urls = config.volume_urls(); - let ring = Arc::new(RwLock::new(hasher::Ring::new( - &volume_urls, - config.server.virtual_nodes, - ))); - - let volume_client = volume::VolumeClient::new(); - - let healthy_volumes: health::HealthyVolumes = - Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone()))); - - health::spawn_health_checker( - volume_client.clone(), - volume_urls, - healthy_volumes.clone(), - ); - let state = server::AppState { - writer, - reads, - ring, - volume_client, - healthy_volumes, - config: Arc::new(config), + db: db::Db::new(&args.db_path), + volumes: Arc::new(args.volumes.clone()), + replicas: args.replicas, + http: reqwest::Client::new(), }; axum::Router::new() diff --git a/src/main.rs b/src/main.rs index e337246..9e72269 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,16 @@ use clap::{Parser, Subcommand}; -use std::path::PathBuf; #[derive(Parser)] #[command(name = "mkv", about = "Distributed key-value store")] struct Cli { - #[arg(short, long, default_value = "config.toml")] - config: PathBuf, + #[arg(short, long, default_value = "/tmp/mkv/index.db")] + db: String, + + #[arg(short, long, required = true, value_delimiter = ',')] + volumes: Vec, + + #[arg(short, long, default_value_t = 2)] + replicas: usize, #[command(subcommand)] command: Commands, @@ -14,7 +19,10 @@ struct Cli { #[derive(Subcommand)] enum Commands { /// Start the index server - Serve, + Serve { + #[arg(short, long, default_value_t = 3000)] + port: u16, + }, /// Rebuild SQLite index from volume servers Rebuild, /// Rebalance data after adding/removing volumes @@ -27,28 +35,27 @@ enum Commands { #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); - let cli = Cli::parse(); - let config = mkv::config::Config::load(&cli.config).unwrap_or_else(|e| { - eprintln!("Failed to load config: {e}"); - std::process::exit(1); - }); + + let args = mkv::Args { + db_path: cli.db, + volumes: cli.volumes, + replicas: cli.replicas, + }; match cli.command { - Commands::Serve => { - let port = config.server.port; - let app = mkv::build_app(config).await; - + Commands::Serve { port } => { + let app = mkv::build_app(&args); let addr = format!("0.0.0.0:{port}"); tracing::info!("Listening on {addr}"); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); axum::serve(listener, app).await.unwrap(); } Commands::Rebuild => { - mkv::rebuild::run(&config).await; + mkv::rebuild::run(&args).await; } Commands::Rebalance { dry_run } => { - mkv::rebalance::run(&config, dry_run).await; + mkv::rebalance::run(&args, dry_run).await; } } } diff --git a/src/rebalance.rs b/src/rebalance.rs index bcba766..e214f15 100644 --- a/src/rebalance.rs +++ b/src/rebalance.rs @@ -1,9 +1,6 @@ -use crate::config::Config; use crate::db; -use crate::hasher::Ring; -use crate::volume::VolumeClient; +use crate::Args; -/// What needs to happen to a single key during rebalance. pub struct KeyMove { pub key: String, pub size: Option, @@ -13,26 +10,12 @@ pub struct KeyMove { pub to_remove: Vec, } -/// Pure: compute the diff between current and desired placement for all keys. -pub fn plan_rebalance( - records: &[db::Record], - ring: &Ring, - replication: usize, -) -> Vec { +pub fn plan_rebalance(records: &[db::Record], volumes: &[String], replication: usize) -> Vec { let mut moves = Vec::new(); for record in records { - let desired = ring.get_volumes(&record.key, replication); - let to_add: Vec = desired - .iter() - .filter(|v| !record.volumes.contains(v)) - .cloned() - .collect(); - let to_remove: Vec = record - .volumes - .iter() - .filter(|v| !desired.contains(v)) - .cloned() - .collect(); + let desired = crate::hasher::volumes_for_key(&record.key, volumes, replication); + let to_add: Vec = desired.iter().filter(|v| !record.volumes.contains(v)).cloned().collect(); + let to_remove: Vec = record.volumes.iter().filter(|v| !desired.contains(v)).cloned().collect(); if !to_add.is_empty() || !to_remove.is_empty() { moves.push(KeyMove { @@ -48,22 +31,10 @@ pub fn plan_rebalance( moves } -pub async fn run(config: &Config, dry_run: bool) { - let db_path = &config.database.path; - let replication = config.server.replication_factor; - - // Open DB read-only to plan - let conn = rusqlite::Connection::open_with_flags( - db_path, - rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY, - ) - .expect("failed to open database"); - - let records = db::all_records(&conn).expect("failed to read records"); - drop(conn); - - let ring = Ring::new(&config.volume_urls(), config.server.virtual_nodes); - let moves = plan_rebalance(&records, &ring, replication); +pub async fn run(args: &Args, dry_run: bool) { + let db = db::Db::new(&args.db_path); + let records = db.all_records_sync().expect("failed to read records"); + let moves = plan_rebalance(&records, &args.volumes, args.replicas); if moves.is_empty() { eprintln!("Nothing to rebalance — all keys are already correctly placed."); @@ -71,44 +42,40 @@ pub async fn run(config: &Config, dry_run: bool) { } let total_bytes: i64 = moves.iter().filter_map(|m| m.size).sum(); - eprintln!( - "{} keys to move ({} bytes)", - moves.len(), - total_bytes - ); + eprintln!("{} keys to move ({} bytes)", moves.len(), total_bytes); if dry_run { for m in &moves { - eprintln!( - " {} : add {:?}, remove {:?}", - m.key, m.to_add, m.to_remove - ); + eprintln!(" {} : add {:?}, remove {:?}", m.key, m.to_add, m.to_remove); } return; } - // Open writer for updates - let writer = db::WriterHandle::new(db_path); - - let client = VolumeClient::new(); + let client = reqwest::Client::new(); let mut moved = 0; let mut errors = 0; for m in &moves { - // Pick a source volume to copy from (any current volume) let src = &m.current_volumes[0]; - - // Copy to new volumes let mut copy_ok = true; + for dst in &m.to_add { - match client.get(src, &m.key).await { - Ok(data) => { - if let Err(e) = client.put(dst, &m.key, data).await { + let src_url = format!("{src}/{}", m.key); + match client.get(&src_url).send().await { + Ok(resp) if resp.status().is_success() => { + let data = resp.bytes().await.unwrap(); + let dst_url = format!("{dst}/{}", m.key); + if let Err(e) = client.put(&dst_url).body(data).send().await { eprintln!(" ERROR copy {} to {}: {}", m.key, dst, e); copy_ok = false; errors += 1; } } + Ok(resp) => { + eprintln!(" ERROR read {} from {}: status {}", m.key, src, resp.status()); + copy_ok = false; + errors += 1; + } Err(e) => { eprintln!(" ERROR read {} from {}: {}", m.key, src, e); copy_ok = false; @@ -117,23 +84,16 @@ pub async fn run(config: &Config, dry_run: bool) { } } - if !copy_ok { - continue; // don't update index or delete if copy failed - } + if !copy_ok { continue; } - // Update index with new volume list - writer - .put(m.key.clone(), m.desired_volumes.clone(), m.size) - .await - .expect("failed to update index"); + db.put(m.key.clone(), m.desired_volumes.clone(), m.size).await.expect("failed to update index"); - // Delete from old volumes for old in &m.to_remove { - if let Err(e) = client.delete(old, &m.key).await { + let url = format!("{old}/{}", m.key); + if let Err(e) = client.delete(&url).send().await { eprintln!(" WARN delete {} from {}: {}", m.key, old, e); } } - moved += 1; } @@ -147,58 +107,33 @@ mod tests { #[test] fn test_plan_rebalance_no_change() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); - let ring = Ring::new(&volumes, 100); - - // Create records that are already correctly placed let records: Vec = (0..100) .map(|i| { let key = format!("key-{i}"); - let vols = ring.get_volumes(&key, 2); - db::Record { - key, - volumes: vols, - size: Some(100), - } + let vols = crate::hasher::volumes_for_key(&key, &volumes, 2); + db::Record { key, volumes: vols, size: Some(100) } }) .collect(); - let moves = plan_rebalance(&records, &ring, 2); + let moves = plan_rebalance(&records, &volumes, 2); assert!(moves.is_empty()); } #[test] fn test_plan_rebalance_new_volume() { let volumes3: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); - let ring3 = Ring::new(&volumes3, 100); - - // Place keys on 3-volume ring let records: Vec = (0..1000) .map(|i| { let key = format!("key-{i}"); - let vols = ring3.get_volumes(&key, 2); - db::Record { - key, - volumes: vols, - size: Some(100), - } + let vols = crate::hasher::volumes_for_key(&key, &volumes3, 2); + db::Record { key, volumes: vols, size: Some(100) } }) .collect(); - // Build new ring with 4 volumes let volumes4: Vec = (1..=4).map(|i| format!("http://vol{i}")).collect(); - let ring4 = Ring::new(&volumes4, 100); + let moves = plan_rebalance(&records, &volumes4, 2); - let moves = plan_rebalance(&records, &ring4, 2); - - // Some keys should need to move, but not all assert!(!moves.is_empty()); assert!(moves.len() < 800, "too many moves: {}", moves.len()); - - // Every move should involve vol4 (the new volume) - for m in &moves { - let involves_vol4 = m.to_add.iter().any(|v| v == "http://vol4") - || m.to_remove.iter().any(|v| v == "http://vol4"); - assert!(involves_vol4, "move for {} doesn't involve vol4", m.key); - } } } diff --git a/src/rebuild.rs b/src/rebuild.rs index b2d9f9d..1ba67ed 100644 --- a/src/rebuild.rs +++ b/src/rebuild.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; -use crate::config::Config; use crate::db; +use crate::Args; #[derive(serde::Deserialize)] struct NginxEntry { @@ -12,36 +12,20 @@ struct NginxEntry { size: Option, } -/// List all keys on a volume by recursively walking nginx autoindex. async fn list_volume_keys(volume_url: &str) -> Result, String> { let http = reqwest::Client::new(); let mut keys = Vec::new(); - let mut dirs = vec![String::new()]; // start at root + let mut dirs = vec![String::new()]; while let Some(prefix) = dirs.pop() { let url = format!("{volume_url}/{prefix}"); - let resp = http - .get(&url) - .send() - .await - .map_err(|e| format!("GET {url}: {e}"))?; - + let resp = http.get(&url).send().await.map_err(|e| format!("GET {url}: {e}"))?; if !resp.status().is_success() { return Err(format!("GET {url}: status {}", resp.status())); } - - let entries: Vec = resp - .json() - .await - .map_err(|e| format!("parse {url}: {e}"))?; - + let entries: Vec = resp.json().await.map_err(|e| format!("parse {url}: {e}"))?; for entry in entries { - let full_path = if prefix.is_empty() { - entry.name.clone() - } else { - format!("{prefix}{}", entry.name) - }; - + let full_path = if prefix.is_empty() { entry.name.clone() } else { format!("{prefix}{}", entry.name) }; match entry.entry_type.as_str() { "directory" => dirs.push(format!("{full_path}/")), "file" => keys.push((full_path, entry.size.unwrap_or(0))), @@ -49,31 +33,24 @@ async fn list_volume_keys(volume_url: &str) -> Result, String } } } - Ok(keys) } -pub async fn run(config: &Config) { - let db_path = &config.database.path; +pub async fn run(args: &Args) { + let db_path = &args.db_path; - // Ensure parent directory exists if let Some(parent) = std::path::Path::new(db_path).parent() { let _ = std::fs::create_dir_all(parent); } - // Delete old database let _ = std::fs::remove_file(db_path); let _ = std::fs::remove_file(format!("{db_path}-wal")); let _ = std::fs::remove_file(format!("{db_path}-shm")); - let writer = db::WriterHandle::new(db_path); - - let volume_urls = config.volume_urls(); - - // key -> (volumes, size) + let db = db::Db::new(db_path); let mut index: HashMap, i64)> = HashMap::new(); - for vol_url in &volume_urls { + for vol_url in &args.volumes { eprintln!("Scanning {vol_url}..."); match list_volume_keys(vol_url).await { Ok(keys) => { @@ -81,26 +58,15 @@ pub async fn run(config: &Config) { for (key, size) in keys { let entry = index.entry(key).or_insert_with(|| (Vec::new(), size)); entry.0.push(vol_url.clone()); - // Use the largest size seen (they should all match) - if size > entry.1 { - entry.1 = size; - } + if size > entry.1 { entry.1 = size; } } } - Err(e) => { - eprintln!(" Error scanning {vol_url}: {e}"); - } + Err(e) => eprintln!(" Error scanning {vol_url}: {e}"), } } - // Batch insert into SQLite - let records: Vec<(String, Vec, Option)> = index - .into_iter() - .map(|(key, (volumes, size))| (key, volumes, Some(size))) - .collect(); - + let records: Vec<_> = index.into_iter().map(|(k, (v, s))| (k, v, Some(s))).collect(); let count = records.len(); - writer.bulk_put(records).await.expect("bulk_put failed"); - + db.bulk_put(records).await.expect("bulk_put failed"); eprintln!("Rebuilt index with {count} keys"); } diff --git a/src/server.rs b/src/server.rs index 13c7d43..bd9755f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,201 +2,113 @@ use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; -use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::RwLock; -use crate::config::Config; use crate::db; use crate::error::AppError; -use crate::hasher::Ring; -use crate::volume::VolumeClient; #[derive(Clone)] pub struct AppState { - pub writer: db::WriterHandle, - pub reads: db::ReadPool, - pub ring: Arc>, - pub volume_client: VolumeClient, - pub healthy_volumes: Arc>>, - pub config: Arc, + pub db: db::Db, + pub volumes: Arc>, + pub replicas: usize, + pub http: reqwest::Client, } -// --- Pure decision functions --- - -/// Pick the first volume from the record that appears in the healthy set. -fn pick_healthy_volume<'a>( - record_volumes: &'a [String], - healthy: &HashSet, -) -> Option<&'a str> { - record_volumes - .iter() - .find(|v| healthy.contains(v.as_str())) - .map(|v| v.as_str()) -} - -/// Select target volumes for a key, ensuring we have enough for replication. -fn select_volumes(ring: &Ring, key: &str, replication: usize) -> Result, AppError> { - let target_volumes = ring.get_volumes(key, replication); - if target_volumes.len() < replication { - return Err(AppError::VolumeError(format!( - "need {replication} volumes but only {} available", - target_volumes.len() - ))); - } - Ok(target_volumes) -} - -/// Evaluate fan-out results. Returns the list of volumes that succeeded, -/// or the error messages if any failed. -fn evaluate_fanout( - results: Vec>, - volumes: &[String], -) -> Result, Vec> { - let mut succeeded = Vec::new(); - let mut errors = Vec::new(); - for (i, result) in results.into_iter().enumerate() { - match result { - Ok(()) => succeeded.push(volumes[i].clone()), - Err(e) => errors.push(e), - } - } - if errors.is_empty() { - Ok(succeeded) - } else { - Err(errors) - } -} - -// --- Handlers --- - -/// GET /:key — look up key, redirect to a healthy volume pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { - let record = state - .reads - .query({ - let key = key.clone(); - move |conn| db::get(conn, &key) - }) - .await?; - - let healthy = state.healthy_volumes.read().await; - let vol = pick_healthy_volume(&record.volumes, &healthy).ok_or(AppError::NoHealthyVolume)?; + let record = state.db.get(&key).await?; + let vol = &record.volumes[0]; let location = format!("{vol}/{key}"); - - Ok(( - StatusCode::FOUND, - [(axum::http::header::LOCATION, location)], - ) - .into_response()) + Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, location)]).into_response()) } -/// PUT /:key — store blob on volumes, record in index pub async fn put_key( State(state): State, Path(key): Path, body: Bytes, ) -> Result { - let target_volumes = { - let ring = state.ring.read().await; - select_volumes(&ring, &key, state.config.server.replication_factor)? - }; + let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas); + if target_volumes.len() < state.replicas { + return Err(AppError::VolumeError(format!( + "need {} volumes but only {} available", + state.replicas, + target_volumes.len() + ))); + } // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { - let client = state.volume_client.clone(); - let vol = vol.clone(); - let key = key.clone(); - let data = body.clone(); - handles.push(tokio::spawn(async move { - client.put(&vol, &key, data).await - })); + let url = format!("{vol}/{key}"); + let handle = tokio::spawn({ + let client = state.http.clone(); + let data = body.clone(); + async move { + let resp = client.put(&url).body(data).send().await.map_err(|e| format!("PUT {url}: {e}"))?; + if !resp.status().is_success() { + return Err(format!("PUT {url}: status {}", resp.status())); + } + Ok(()) + } + }); + handles.push(handle); } - let mut results = Vec::with_capacity(handles.len()); + let mut all_ok = true; for handle in handles { - results.push(handle.await.unwrap()); + if let Err(e) = handle.await.unwrap() { + tracing::error!("PUT to volume failed: {e}"); + all_ok = false; + } } - match evaluate_fanout(results, &target_volumes) { - Ok(succeeded_volumes) => { - let size = Some(body.len() as i64); - state.writer.put(key, succeeded_volumes, size).await?; - Ok(StatusCode::CREATED.into_response()) - } - Err(errors) => { - for e in &errors { - tracing::error!("PUT to volume failed: {e}"); - } - // Rollback: best-effort delete from any volumes that may have succeeded - for vol in &target_volumes { - let _ = state.volume_client.delete(vol, &key).await; - } - Err(AppError::VolumeError( - "not all volume writes succeeded".into(), - )) + if !all_ok { + // Rollback: best-effort delete from volumes + for vol in &target_volumes { + let _ = state.http.delete(format!("{vol}/{key}")).send().await; } + return Err(AppError::VolumeError("not all volume writes succeeded".into())); } + + let size = Some(body.len() as i64); + state.db.put(key, target_volumes, size).await?; + Ok(StatusCode::CREATED.into_response()) } -/// DELETE /:key — remove from volumes and index pub async fn delete_key( State(state): State, Path(key): Path, ) -> Result { - let record = state - .reads - .query({ - let key = key.clone(); - move |conn| db::get(conn, &key) - }) - .await?; + let record = state.db.get(&key).await?; - // Fan out DELETEs concurrently let mut handles = Vec::new(); for vol in &record.volumes { - let client = state.volume_client.clone(); - let vol = vol.clone(); - let key = key.clone(); - handles.push(tokio::spawn( - async move { client.delete(&vol, &key).await }, - )); + let url = format!("{vol}/{key}"); + let client = state.http.clone(); + handles.push(tokio::spawn(async move { client.delete(&url).send().await })); } - for handle in handles { if let Err(e) = handle.await.unwrap() { tracing::error!("DELETE from volume failed: {e}"); } } - // Remove from index regardless of volume DELETE results - state.writer.delete(key).await?; - + state.db.delete(key).await?; Ok(StatusCode::NO_CONTENT.into_response()) } -/// HEAD /:key — check if key exists, return size pub async fn head_key( State(state): State, Path(key): Path, ) -> Result { - let record = state - .reads - .query(move |conn| db::get(conn, &key)) - .await?; - + let record = state.db.get(&key).await?; let mut headers = HeaderMap::new(); if let Some(size) = record.size { - headers.insert( - axum::http::header::CONTENT_LENGTH, - size.to_string().parse().unwrap(), - ); + headers.insert(axum::http::header::CONTENT_LENGTH, size.to_string().parse().unwrap()); } - Ok((StatusCode::OK, headers).into_response()) } @@ -206,89 +118,27 @@ pub struct ListQuery { pub prefix: String, } -/// GET / — list keys with optional prefix filter pub async fn list_keys( State(state): State, Query(query): Query, ) -> Result { - let keys = state - .reads - .query(move |conn| db::list_keys(conn, &query.prefix)) - .await?; - - let body = keys.join("\n"); - Ok((StatusCode::OK, body).into_response()) + let keys = state.db.list_keys(&query.prefix).await?; + Ok((StatusCode::OK, keys.join("\n")).into_response()) } -// --- Tests for pure functions --- - #[cfg(test)] mod tests { - use super::*; - #[test] - fn test_pick_healthy_volume_first_match() { - let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; - let healthy: HashSet = ["http://vol2".into(), "http://vol3".into()].into(); - - assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol2")); - } - - #[test] - fn test_pick_healthy_volume_none_healthy() { - let volumes = vec!["http://vol1".into(), "http://vol2".into()]; - let healthy: HashSet = HashSet::new(); - - assert_eq!(pick_healthy_volume(&volumes, &healthy), None); - } - - #[test] - fn test_pick_healthy_volume_all_healthy() { - let volumes = vec!["http://vol1".into(), "http://vol2".into()]; - let healthy: HashSet = ["http://vol1".into(), "http://vol2".into()].into(); - - assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol1")); - } - - #[test] - fn test_select_volumes_sufficient() { + fn test_volumes_for_key_sufficient() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); - let ring = Ring::new(&volumes, 50); - - let selected = select_volumes(&ring, "test-key", 2).unwrap(); + let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); assert_eq!(selected.len(), 2); } #[test] - fn test_select_volumes_insufficient() { + fn test_volumes_for_key_insufficient() { let volumes: Vec = vec!["http://vol1".into()]; - let ring = Ring::new(&volumes, 50); - - assert!(select_volumes(&ring, "test-key", 2).is_err()); - } - - #[test] - fn test_evaluate_fanout_all_ok() { - let volumes = vec!["http://vol1".into(), "http://vol2".into()]; - let results = vec![Ok(()), Ok(())]; - - assert_eq!(evaluate_fanout(results, &volumes).unwrap(), volumes); - } - - #[test] - fn test_evaluate_fanout_partial_failure() { - let volumes = vec!["http://vol1".into(), "http://vol2".into()]; - let results = vec![Ok(()), Err("connection refused".into())]; - - let errors = evaluate_fanout(results, &volumes).unwrap_err(); - assert_eq!(errors, vec!["connection refused"]); - } - - #[test] - fn test_evaluate_fanout_all_fail() { - let volumes = vec!["http://vol1".into(), "http://vol2".into()]; - let results = vec![Err("err1".into()), Err("err2".into())]; - - assert_eq!(evaluate_fanout(results, &volumes).unwrap_err().len(), 2); + let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); + assert_eq!(selected.len(), 1); } } diff --git a/src/volume.rs b/src/volume.rs deleted file mode 100644 index 8a1aa1c..0000000 --- a/src/volume.rs +++ /dev/null @@ -1,88 +0,0 @@ -use bytes::Bytes; -use std::time::Duration; - -#[derive(Clone)] -pub struct VolumeClient { - client: reqwest::Client, -} - -impl VolumeClient { - pub fn new() -> Self { - let client = reqwest::Client::builder() - .connect_timeout(Duration::from_secs(2)) - .timeout(Duration::from_secs(30)) - .pool_max_idle_per_host(10) - .build() - .expect("failed to build HTTP client"); - Self { client } - } - - /// PUT a blob to a volume server at /{key}. - pub async fn put(&self, volume_url: &str, key: &str, data: Bytes) -> Result<(), String> { - let url = format!("{volume_url}/{key}"); - let resp = self - .client - .put(&url) - .body(data) - .send() - .await - .map_err(|e| format!("PUT {url}: {e}"))?; - - if !resp.status().is_success() { - return Err(format!("PUT {url}: status {}", resp.status())); - } - - Ok(()) - } - - /// GET a blob from a volume server. - pub async fn get(&self, volume_url: &str, key: &str) -> Result { - let url = format!("{volume_url}/{key}"); - let resp = self - .client - .get(&url) - .send() - .await - .map_err(|e| format!("GET {url}: {e}"))?; - - if resp.status() == reqwest::StatusCode::NOT_FOUND { - return Err(format!("GET {url}: not found")); - } - if !resp.status().is_success() { - return Err(format!("GET {url}: status {}", resp.status())); - } - - resp.bytes() - .await - .map_err(|e| format!("GET {url} body: {e}")) - } - - /// DELETE a blob from a volume server. - pub async fn delete(&self, volume_url: &str, key: &str) -> Result<(), String> { - let url = format!("{volume_url}/{key}"); - let resp = self - .client - .delete(&url) - .send() - .await - .map_err(|e| format!("DELETE {url}: {e}"))?; - - // 404 is fine — already gone - if !resp.status().is_success() && resp.status() != reqwest::StatusCode::NOT_FOUND { - return Err(format!("DELETE {url}: status {}", resp.status())); - } - - Ok(()) - } - - /// Health check a volume server. - pub async fn check(&self, volume_url: &str) -> bool { - let url = format!("{volume_url}/"); - self.client - .head(&url) - .timeout(Duration::from_secs(2)) - .send() - .await - .is_ok_and(|r| r.status().is_success()) - } -} diff --git a/tests/integration.rs b/tests/integration.rs index fce1057..7473358 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,37 +1,35 @@ use reqwest::StatusCode; -use std::path::Path; use std::sync::atomic::{AtomicU32, Ordering}; static TEST_COUNTER: AtomicU32 = AtomicU32::new(0); -/// Start the mkv server in-process on a random port with its own DB. -/// Returns the base URL (e.g. "http://localhost:12345"). async fn start_server() -> String { let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed); let db_path = format!("/tmp/mkv-test/index-{id}.db"); - // Clean up any previous test database let _ = std::fs::remove_file(&db_path); let _ = std::fs::remove_file(format!("{db_path}-wal")); let _ = std::fs::remove_file(format!("{db_path}-shm")); - let mut config = - mkv::config::Config::load(Path::new("tests/test_config.toml")).expect("load test config"); - config.database.path = db_path; + let args = mkv::Args { + db_path, + volumes: vec![ + "http://localhost:3101".into(), + "http://localhost:3102".into(), + "http://localhost:3103".into(), + ], + replicas: 2, + }; - // Bind to port 0 to get a random available port let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); - - let app = mkv::build_app(config).await; + let app = mkv::build_app(&args); tokio::spawn(async move { axum::serve(listener, app).await.unwrap(); }); - // Give the server a moment to start tokio::time::sleep(std::time::Duration::from_millis(50)).await; - format!("http://127.0.0.1:{port}") } @@ -47,26 +45,12 @@ async fn test_put_and_head() { let base = start_server().await; let c = client(); - // PUT a key - let resp = c - .put(format!("{base}/hello")) - .body("world") - .send() - .await - .unwrap(); + let resp = c.put(format!("{base}/hello")).body("world").send().await.unwrap(); assert_eq!(resp.status(), StatusCode::CREATED); - // HEAD should return 200 with content-length let resp = c.head(format!("{base}/hello")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.headers() - .get("content-length") - .unwrap() - .to_str() - .unwrap(), - "5" - ); + assert_eq!(resp.headers().get("content-length").unwrap().to_str().unwrap(), "5"); } #[tokio::test] @@ -74,26 +58,15 @@ async fn test_put_and_get_redirect() { let base = start_server().await; let c = client(); - // PUT - let resp = c - .put(format!("{base}/redirect-test")) - .body("some data") - .send() - .await - .unwrap(); + let resp = c.put(format!("{base}/redirect-test")).body("some data").send().await.unwrap(); assert_eq!(resp.status(), StatusCode::CREATED); - // GET should return 302 with Location header pointing to a volume let resp = c.get(format!("{base}/redirect-test")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::FOUND); let location = resp.headers().get("location").unwrap().to_str().unwrap(); - assert!( - location.starts_with("http://localhost:310"), - "location should point to a volume, got: {location}" - ); + assert!(location.starts_with("http://localhost:310"), "got: {location}"); - // Follow the redirect manually and verify the blob content let blob_resp = reqwest::get(location).await.unwrap(); assert_eq!(blob_resp.status(), StatusCode::OK); assert_eq!(blob_resp.text().await.unwrap(), "some data"); @@ -103,7 +76,6 @@ async fn test_put_and_get_redirect() { async fn test_get_nonexistent_returns_404() { let base = start_server().await; let c = client(); - let resp = c.get(format!("{base}/does-not-exist")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } @@ -113,24 +85,15 @@ async fn test_put_get_delete_get() { let base = start_server().await; let c = client(); - // PUT - let resp = c - .put(format!("{base}/delete-me")) - .body("temporary") - .send() - .await - .unwrap(); + let resp = c.put(format!("{base}/delete-me")).body("temporary").send().await.unwrap(); assert_eq!(resp.status(), StatusCode::CREATED); - // GET → 302 let resp = c.get(format!("{base}/delete-me")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::FOUND); - // DELETE → 204 let resp = c.delete(format!("{base}/delete-me")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); - // GET after delete → 404 let resp = c.get(format!("{base}/delete-me")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } @@ -139,12 +102,7 @@ async fn test_put_get_delete_get() { async fn test_delete_nonexistent_returns_404() { let base = start_server().await; let c = client(); - - let resp = c - .delete(format!("{base}/never-existed")) - .send() - .await - .unwrap(); + let resp = c.delete(format!("{base}/never-existed")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND); } @@ -153,28 +111,17 @@ async fn test_list_keys() { let base = start_server().await; let c = client(); - // PUT a few keys with a common prefix for name in ["docs/a", "docs/b", "docs/c", "other/x"] { - c.put(format!("{base}/{name}")) - .body("data") - .send() - .await - .unwrap(); + c.put(format!("{base}/{name}")).body("data").send().await.unwrap(); } - // List all let resp = c.get(format!("{base}/")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); let body = resp.text().await.unwrap(); assert!(body.contains("docs/a")); assert!(body.contains("other/x")); - // List with prefix filter - let resp = c - .get(format!("{base}/?prefix=docs/")) - .send() - .await - .unwrap(); + let resp = c.get(format!("{base}/?prefix=docs/")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); let body = resp.text().await.unwrap(); let lines: Vec<&str> = body.lines().collect(); @@ -187,34 +134,14 @@ async fn test_put_overwrite() { let base = start_server().await; let c = client(); - // PUT v1 - c.put(format!("{base}/overwrite")) - .body("version1") - .send() - .await - .unwrap(); + c.put(format!("{base}/overwrite")).body("version1").send().await.unwrap(); - // PUT v2 (same key) - let resp = c - .put(format!("{base}/overwrite")) - .body("version2") - .send() - .await - .unwrap(); + let resp = c.put(format!("{base}/overwrite")).body("version2").send().await.unwrap(); assert_eq!(resp.status(), StatusCode::CREATED); - // HEAD should reflect new size let resp = c.head(format!("{base}/overwrite")).send().await.unwrap(); - assert_eq!( - resp.headers() - .get("content-length") - .unwrap() - .to_str() - .unwrap(), - "8" - ); + assert_eq!(resp.headers().get("content-length").unwrap().to_str().unwrap(), "8"); - // Follow redirect, verify content is v2 let resp = c.get(format!("{base}/overwrite")).send().await.unwrap(); let location = resp.headers().get("location").unwrap().to_str().unwrap(); let body = reqwest::get(location).await.unwrap().text().await.unwrap(); @@ -226,27 +153,12 @@ async fn test_replication_writes_to_multiple_volumes() { let base = start_server().await; let c = client(); - // PUT a key (replication_factor=2 in test config) - c.put(format!("{base}/replicated")) - .body("replica-data") - .send() - .await - .unwrap(); + c.put(format!("{base}/replicated")).body("replica-data").send().await.unwrap(); - // HEAD to confirm it exists - let resp = c - .head(format!("{base}/replicated")) - .send() - .await - .unwrap(); + let resp = c.head(format!("{base}/replicated")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); - // GET and verify the blob is accessible - let resp = c - .get(format!("{base}/replicated")) - .send() - .await - .unwrap(); + let resp = c.get(format!("{base}/replicated")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::FOUND); let location = resp.headers().get("location").unwrap().to_str().unwrap(); let body = reqwest::get(location).await.unwrap().text().await.unwrap();