From 1fc59674f54eb90f73d0de2b5d68c71c65d49e27 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 13:08:49 +0100 Subject: [PATCH 01/10] Allow for reads if one volume is down --- Cargo.lock | 39 +++++++++++++++++++++++++++++++++++---- Cargo.toml | 1 + src/error.rs | 3 +++ src/server.rs | 41 +++++++++++++++++++++++++++++++---------- 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73e1f42..d697dbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,6 +774,7 @@ version = "0.1.0" dependencies = [ "axum", "clap", + "rand 0.8.5", "reqwest", "rusqlite", "serde", @@ -908,7 +909,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand", + "rand 0.9.2", "ring", "rustc-hash", "rustls", @@ -949,14 +950,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.5", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -966,7 +988,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.5", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b074b32..4498eb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ clap = { version = "4", features = ["derive", "env"] } tracing = "0.1" tracing-subscriber = "0.3" sha2 = "0.10" +rand = "0.8" [profile.release] opt-level = 3 diff --git a/src/error.rs b/src/error.rs index 32e25a4..7536cec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,7 @@ pub enum AppError { Db(rusqlite::Error), InsufficientVolumes { need: usize, have: usize }, PartialWrite, + AllVolumesUnreachable, } impl From for AppError { @@ -58,6 +59,7 @@ impl std::fmt::Display for AppError { write!(f, "need {need} volumes but only {have} available") } AppError::PartialWrite => write!(f, "not all volume writes succeeded"), + AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"), } } } @@ -70,6 +72,7 @@ impl IntoResponse for AppError { AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR, AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE, AppError::PartialWrite => StatusCode::BAD_GATEWAY, + AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY, }; (status, self.to_string()).into_response() } diff --git a/src/server.rs b/src/server.rs index a4b24d4..ea27108 100644 --- a/src/server.rs +++ b/src/server.rs @@ -19,17 +19,38 @@ pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { + use rand::seq::SliceRandom; + let record = state.db.get(&key).await?; - let vol = record - .volumes - .first() - .ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?; - let location = format!("{vol}/{key}"); - Ok(( - StatusCode::FOUND, - [(axum::http::header::LOCATION, location)], - ) - .into_response()) + if record.volumes.is_empty() { + return Err(AppError::CorruptRecord { key }); + } + + // Shuffle volumes for load balancing + let mut volumes = record.volumes.clone(); + volumes.shuffle(&mut rand::thread_rng()); + + // Probe each volume until we find one that's reachable + for vol in &volumes { + let url = format!("{vol}/{key}"); + match state.http.head(&url).send().await { + Ok(resp) if resp.status().is_success() => { + return Ok(( + StatusCode::FOUND, + [(axum::http::header::LOCATION, url)], + ) + .into_response()); + } + Ok(resp) => { + tracing::warn!("volume {vol} returned {} for {key}", resp.status()); + } + Err(e) => { + tracing::warn!("volume {vol} unreachable for {key}: {e}"); + } + } + } + + Err(AppError::AllVolumesUnreachable) } pub async fn put_key( From 1d3b9dddf562e24ec90a4958761a56909f6c8927 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 13:09:55 +0100 Subject: [PATCH 02/10] Add unit test for failover --- src/server.rs | 140 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 125 insertions(+), 15 deletions(-) diff --git a/src/server.rs b/src/server.rs index ea27108..65e742c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,42 +15,83 @@ pub struct AppState { pub http: reqwest::Client, } +/// Result of probing volumes for a key. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProbeResult { + /// Found a healthy volume at this URL + Found(String), + /// All volumes were probed, none healthy + AllFailed, +} + +/// Pure function: given volumes and their probe results (true = healthy), +/// returns the first healthy volume URL for the key, or None. +pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult { + for (vol, &healthy) in volumes.iter().zip(results) { + if healthy { + return ProbeResult::Found(format!("{vol}/{key}")); + } + } + ProbeResult::AllFailed +} + +/// Pure function: shuffle volumes for load balancing. +/// Takes a seed for deterministic testing. +pub fn shuffle_volumes(volumes: Vec, seed: u64) -> Vec { + use rand::seq::SliceRandom; + use rand::SeedableRng; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut vols = volumes; + vols.shuffle(&mut rng); + vols +} + pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { - use rand::seq::SliceRandom; - let record = state.db.get(&key).await?; if record.volumes.is_empty() { return Err(AppError::CorruptRecord { key }); } - // Shuffle volumes for load balancing - let mut volumes = record.volumes.clone(); - volumes.shuffle(&mut rand::thread_rng()); + // Shuffle for load balancing (random seed in production) + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let volumes = shuffle_volumes(record.volumes, seed); - // Probe each volume until we find one that's reachable + // Probe volumes and collect results + let mut results = Vec::with_capacity(volumes.len()); for vol in &volumes { let url = format!("{vol}/{key}"); - match state.http.head(&url).send().await { - Ok(resp) if resp.status().is_success() => { - return Ok(( - StatusCode::FOUND, - [(axum::http::header::LOCATION, url)], - ) - .into_response()); - } + let healthy = match state.http.head(&url).send().await { + Ok(resp) if resp.status().is_success() => true, Ok(resp) => { tracing::warn!("volume {vol} returned {} for {key}", resp.status()); + false } Err(e) => { tracing::warn!("volume {vol} unreachable for {key}: {e}"); + false } + }; + results.push(healthy); + // Early exit on first healthy volume + if healthy { + break; } } - Err(AppError::AllVolumesUnreachable) + match first_healthy_volume(&key, &volumes, &results) { + ProbeResult::Found(url) => Ok(( + StatusCode::FOUND, + [(axum::http::header::LOCATION, url)], + ) + .into_response()), + ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable), + } } pub async fn put_key( @@ -204,6 +245,8 @@ pub async fn list_keys( #[cfg(test)] mod tests { + use super::*; + #[test] fn test_volumes_for_key_sufficient() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); @@ -217,4 +260,71 @@ mod tests { let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); assert_eq!(selected.len(), 1); } + + #[test] + fn test_first_healthy_volume_finds_first() { + let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; + let results = vec![true, true, true]; + assert_eq!( + first_healthy_volume("key", &volumes, &results), + ProbeResult::Found("http://vol1/key".into()) + ); + } + + #[test] + fn test_first_healthy_volume_skips_unhealthy() { + let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; + let results = vec![false, false, true]; + assert_eq!( + first_healthy_volume("key", &volumes, &results), + ProbeResult::Found("http://vol3/key".into()) + ); + } + + #[test] + fn test_first_healthy_volume_all_failed() { + let volumes = vec!["http://vol1".into(), "http://vol2".into()]; + let results = vec![false, false]; + assert_eq!( + first_healthy_volume("key", &volumes, &results), + ProbeResult::AllFailed + ); + } + + #[test] + fn test_first_healthy_volume_early_exit() { + // Simulates early exit: only first two volumes were probed + let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; + let results = vec![false, true]; // Only 2 results because we stopped early + assert_eq!( + first_healthy_volume("key", &volumes, &results), + ProbeResult::Found("http://vol2/key".into()) + ); + } + + #[test] + fn test_shuffle_volumes_deterministic_with_seed() { + let volumes: Vec = (1..=5).map(|i| format!("http://vol{i}")).collect(); + let a = shuffle_volumes(volumes.clone(), 42); + let b = shuffle_volumes(volumes.clone(), 42); + assert_eq!(a, b, "same seed should produce same order"); + } + + #[test] + fn test_shuffle_volumes_different_seeds() { + let volumes: Vec = (1..=10).map(|i| format!("http://vol{i}")).collect(); + let a = shuffle_volumes(volumes.clone(), 1); + let b = shuffle_volumes(volumes.clone(), 2); + assert_ne!(a, b, "different seeds should produce different orders"); + } + + #[test] + fn test_shuffle_volumes_preserves_elements() { + let volumes: Vec = (1..=5).map(|i| format!("http://vol{i}")).collect(); + let mut shuffled = shuffle_volumes(volumes.clone(), 123); + shuffled.sort(); + let mut original = volumes; + original.sort(); + assert_eq!(shuffled, original); + } } From fa4dc716db149b1fee41c20a2bd719dbfbb164d6 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 13:24:54 +0100 Subject: [PATCH 03/10] Purify --- src/db.rs | 80 +++++++++++++++++++++++++++++++------- src/rebuild.rs | 102 +++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 161 insertions(+), 21 deletions(-) diff --git a/src/db.rs b/src/db.rs index 24a2687..d56a4a4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -30,6 +30,20 @@ fn encode_volumes(v: &[String]) -> String { serde_json::to_string(v).unwrap() } +/// Pure: compute exclusive upper bound for prefix range queries. +/// Increments the last byte that isn't 0xFF. +/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None +pub fn prefix_upper_bound(prefix: &str) -> Option { + let mut bytes = prefix.as_bytes().to_vec(); + while let Some(last) = bytes.pop() { + if last < 0xFF { + bytes.push(last + 1); + return Some(String::from_utf8_lossy(&bytes).into_owned()); + } + } + None +} + /// A single SQLite connection behind a mutex, used for both reads and writes. #[derive(Clone)] pub struct Db { @@ -93,19 +107,7 @@ impl Db { .collect::, _>>()?; return Ok(keys); } - // Compute exclusive upper bound: increment last non-0xFF byte - let upper = { - let mut bytes = prefix.as_bytes().to_vec(); - let mut result = None; - while let Some(last) = bytes.pop() { - if last < 0xFF { - bytes.push(last + 1); - result = Some(String::from_utf8_lossy(&bytes).into_owned()); - break; - } - } - result - }; + let upper = prefix_upper_bound(&prefix); let keys = match &upper { Some(end) => { let mut stmt = conn.prepare_cached( @@ -198,3 +200,55 @@ impl Db { Ok(records) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_prefix_upper_bound_simple() { + assert_eq!(prefix_upper_bound("abc"), Some("abd".to_string())); + assert_eq!(prefix_upper_bound("a"), Some("b".to_string())); + assert_eq!(prefix_upper_bound("foo/bar"), Some("foo/bas".to_string())); + } + + #[test] + fn test_prefix_upper_bound_paths() { + // '/' is 0x2F, +1 = '0' (0x30) + assert_eq!(prefix_upper_bound("users/"), Some("users0".to_string())); + // '_' is 0x5F, +1 = '`' (0x60) + assert_eq!(prefix_upper_bound("img_"), Some("img`".to_string())); + } + + #[test] + fn test_prefix_upper_bound_empty() { + assert_eq!(prefix_upper_bound(""), None); + } + + #[test] + fn test_prefix_upper_bound_single_char() { + assert_eq!(prefix_upper_bound("z"), Some("{".to_string())); // 'z' + 1 = '{' + assert_eq!(prefix_upper_bound("9"), Some(":".to_string())); // '9' + 1 = ':' + } + + #[test] + fn test_prefix_upper_bound_range_correctness() { + // Verify the bound works for range queries: + // All strings starting with "foo" should be >= "foo" and < "fop" + let prefix = "foo"; + let upper = prefix_upper_bound(prefix).unwrap(); + assert_eq!(upper, "fop"); + + let upper = upper.as_str(); + + // These should be in range [foo, fop) + assert!("foo" >= prefix && "foo" < upper); + assert!("foo/bar" >= prefix && "foo/bar" < upper); + assert!("foobar" >= prefix && "foobar" < upper); + assert!("foo\x7f" >= prefix && "foo\x7f" < upper); // high ASCII + + // These should be out of range + assert!("fop" >= upper); + assert!("fon" < prefix); + } +} diff --git a/src/rebuild.rs b/src/rebuild.rs index f0f3121..5db2527 100644 --- a/src/rebuild.rs +++ b/src/rebuild.rs @@ -12,6 +12,25 @@ struct NginxEntry { size: Option, } +/// Pure: merge volume scan results into a unified index. +/// Each scan is (volume_url, list of (key, size) pairs). +/// Returns a map of key -> (volumes containing it, max size seen). +pub fn merge_volume_scans( + scans: &[(String, Vec<(String, i64)>)], +) -> HashMap, i64)> { + let mut index: HashMap, i64)> = HashMap::new(); + for (vol_url, keys) in scans { + for (key, size) in keys { + let entry = index.entry(key.clone()).or_insert_with(|| (Vec::new(), *size)); + entry.0.push(vol_url.clone()); + if *size > entry.1 { + entry.1 = *size; + } + } + } + index +} + async fn list_volume_keys(volume_url: &str) -> Result, String> { let http = reqwest::Client::new(); let mut keys = Vec::new(); @@ -57,25 +76,23 @@ pub async fn run(args: &Args) { let _ = std::fs::remove_file(format!("{db_path}-shm")); let db = db::Db::new(db_path); - let mut index: HashMap, i64)> = HashMap::new(); + // I/O: scan each volume + let mut scans = Vec::new(); for vol_url in &args.volumes { eprintln!("Scanning {vol_url}..."); match list_volume_keys(vol_url).await { Ok(keys) => { eprintln!(" Found {} keys", keys.len()); - for (key, size) in keys { - let entry = index.entry(key).or_insert_with(|| (Vec::new(), size)); - entry.0.push(vol_url.clone()); - if size > entry.1 { - entry.1 = size; - } - } + scans.push((vol_url.clone(), keys)); } Err(e) => eprintln!(" Error scanning {vol_url}: {e}"), } } + // Pure: merge scan results + let index = merge_volume_scans(&scans); + let records: Vec<_> = index .into_iter() .map(|(k, (v, s))| (k, v, Some(s))) @@ -84,3 +101,72 @@ pub async fn run(args: &Args) { db.bulk_put(records).await.expect("bulk_put failed"); eprintln!("Rebuilt index with {count} keys"); } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_merge_empty_scans() { + let scans: Vec<(String, Vec<(String, i64)>)> = vec![]; + let index = merge_volume_scans(&scans); + assert!(index.is_empty()); + } + + #[test] + fn test_merge_single_volume() { + let scans = vec![( + "http://vol1".to_string(), + vec![ + ("key1".to_string(), 100), + ("key2".to_string(), 200), + ], + )]; + let index = merge_volume_scans(&scans); + assert_eq!(index.len(), 2); + assert_eq!(index.get("key1"), Some(&(vec!["http://vol1".to_string()], 100))); + assert_eq!(index.get("key2"), Some(&(vec!["http://vol1".to_string()], 200))); + } + + #[test] + fn test_merge_key_on_multiple_volumes() { + let scans = vec![ + ("http://vol1".to_string(), vec![("shared".to_string(), 100)]), + ("http://vol2".to_string(), vec![("shared".to_string(), 100)]), + ("http://vol3".to_string(), vec![("shared".to_string(), 100)]), + ]; + let index = merge_volume_scans(&scans); + assert_eq!(index.len(), 1); + let (volumes, size) = index.get("shared").unwrap(); + assert_eq!(volumes.len(), 3); + assert!(volumes.contains(&"http://vol1".to_string())); + assert!(volumes.contains(&"http://vol2".to_string())); + assert!(volumes.contains(&"http://vol3".to_string())); + assert_eq!(*size, 100); + } + + #[test] + fn test_merge_takes_max_size() { + // Same key with different sizes on different volumes (corruption or update race) + let scans = vec![ + ("http://vol1".to_string(), vec![("key".to_string(), 50)]), + ("http://vol2".to_string(), vec![("key".to_string(), 200)]), + ("http://vol3".to_string(), vec![("key".to_string(), 100)]), + ]; + let index = merge_volume_scans(&scans); + let (_, size) = index.get("key").unwrap(); + assert_eq!(*size, 200, "should take maximum size across volumes"); + } + + #[test] + fn test_merge_disjoint_keys() { + let scans = vec![ + ("http://vol1".to_string(), vec![("a".to_string(), 10)]), + ("http://vol2".to_string(), vec![("b".to_string(), 20)]), + ]; + let index = merge_volume_scans(&scans); + assert_eq!(index.len(), 2); + assert_eq!(index.get("a").unwrap().0, vec!["http://vol1".to_string()]); + assert_eq!(index.get("b").unwrap().0, vec!["http://vol2".to_string()]); + } +} From 5cdaeddc0e38b54aa86c41be11b403a6f8f283e3 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 13:31:35 +0100 Subject: [PATCH 04/10] Clean up tests --- src/db.rs | 34 ++----------------- src/rebuild.rs | 58 +++----------------------------- src/server.rs | 89 +++----------------------------------------------- 3 files changed, 12 insertions(+), 169 deletions(-) diff --git a/src/db.rs b/src/db.rs index d56a4a4..cfb416c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -205,43 +205,15 @@ impl Db { mod tests { use super::*; - #[test] - fn test_prefix_upper_bound_simple() { - assert_eq!(prefix_upper_bound("abc"), Some("abd".to_string())); - assert_eq!(prefix_upper_bound("a"), Some("b".to_string())); - assert_eq!(prefix_upper_bound("foo/bar"), Some("foo/bas".to_string())); - } - - #[test] - fn test_prefix_upper_bound_paths() { - // '/' is 0x2F, +1 = '0' (0x30) - assert_eq!(prefix_upper_bound("users/"), Some("users0".to_string())); - // '_' is 0x5F, +1 = '`' (0x60) - assert_eq!(prefix_upper_bound("img_"), Some("img`".to_string())); - } - - #[test] - fn test_prefix_upper_bound_empty() { - assert_eq!(prefix_upper_bound(""), None); - } - - #[test] - fn test_prefix_upper_bound_single_char() { - assert_eq!(prefix_upper_bound("z"), Some("{".to_string())); // 'z' + 1 = '{' - assert_eq!(prefix_upper_bound("9"), Some(":".to_string())); // '9' + 1 = ':' - } - #[test] fn test_prefix_upper_bound_range_correctness() { - // Verify the bound works for range queries: - // All strings starting with "foo" should be >= "foo" and < "fop" + // The only test that matters: does the bound actually work for range queries? + // All strings starting with "foo" should be >= "foo" and < upper_bound("foo") let prefix = "foo"; let upper = prefix_upper_bound(prefix).unwrap(); - assert_eq!(upper, "fop"); - let upper = upper.as_str(); - // These should be in range [foo, fop) + // These should be in range [prefix, upper) assert!("foo" >= prefix && "foo" < upper); assert!("foo/bar" >= prefix && "foo/bar" < upper); assert!("foobar" >= prefix && "foobar" < upper); diff --git a/src/rebuild.rs b/src/rebuild.rs index 5db2527..33d4ec2 100644 --- a/src/rebuild.rs +++ b/src/rebuild.rs @@ -106,67 +106,19 @@ pub async fn run(args: &Args) { mod tests { use super::*; - #[test] - fn test_merge_empty_scans() { - let scans: Vec<(String, Vec<(String, i64)>)> = vec![]; - let index = merge_volume_scans(&scans); - assert!(index.is_empty()); - } - - #[test] - fn test_merge_single_volume() { - let scans = vec![( - "http://vol1".to_string(), - vec![ - ("key1".to_string(), 100), - ("key2".to_string(), 200), - ], - )]; - let index = merge_volume_scans(&scans); - assert_eq!(index.len(), 2); - assert_eq!(index.get("key1"), Some(&(vec!["http://vol1".to_string()], 100))); - assert_eq!(index.get("key2"), Some(&(vec!["http://vol1".to_string()], 200))); - } - - #[test] - fn test_merge_key_on_multiple_volumes() { - let scans = vec![ - ("http://vol1".to_string(), vec![("shared".to_string(), 100)]), - ("http://vol2".to_string(), vec![("shared".to_string(), 100)]), - ("http://vol3".to_string(), vec![("shared".to_string(), 100)]), - ]; - let index = merge_volume_scans(&scans); - assert_eq!(index.len(), 1); - let (volumes, size) = index.get("shared").unwrap(); - assert_eq!(volumes.len(), 3); - assert!(volumes.contains(&"http://vol1".to_string())); - assert!(volumes.contains(&"http://vol2".to_string())); - assert!(volumes.contains(&"http://vol3".to_string())); - assert_eq!(*size, 100); - } - #[test] fn test_merge_takes_max_size() { - // Same key with different sizes on different volumes (corruption or update race) + // Edge case: same key with different sizes across volumes + // (can happen due to incomplete writes or corruption) + // We take the max size as the authoritative value let scans = vec![ ("http://vol1".to_string(), vec![("key".to_string(), 50)]), ("http://vol2".to_string(), vec![("key".to_string(), 200)]), ("http://vol3".to_string(), vec![("key".to_string(), 100)]), ]; let index = merge_volume_scans(&scans); - let (_, size) = index.get("key").unwrap(); + let (volumes, size) = index.get("key").unwrap(); + assert_eq!(volumes.len(), 3); assert_eq!(*size, 200, "should take maximum size across volumes"); } - - #[test] - fn test_merge_disjoint_keys() { - let scans = vec![ - ("http://vol1".to_string(), vec![("a".to_string(), 10)]), - ("http://vol2".to_string(), vec![("b".to_string(), 20)]), - ]; - let index = merge_volume_scans(&scans); - assert_eq!(index.len(), 2); - assert_eq!(index.get("a").unwrap().0, vec!["http://vol1".to_string()]); - assert_eq!(index.get("b").unwrap().0, vec!["http://vol2".to_string()]); - } } diff --git a/src/server.rs b/src/server.rs index 65e742c..1fd8923 100644 --- a/src/server.rs +++ b/src/server.rs @@ -243,88 +243,7 @@ pub async fn list_keys( Ok((StatusCode::OK, keys.join("\n")).into_response()) } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_volumes_for_key_sufficient() { - let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); - let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); - assert_eq!(selected.len(), 2); - } - - #[test] - fn test_volumes_for_key_insufficient() { - let volumes: Vec = vec!["http://vol1".into()]; - let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); - assert_eq!(selected.len(), 1); - } - - #[test] - fn test_first_healthy_volume_finds_first() { - let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; - let results = vec![true, true, true]; - assert_eq!( - first_healthy_volume("key", &volumes, &results), - ProbeResult::Found("http://vol1/key".into()) - ); - } - - #[test] - fn test_first_healthy_volume_skips_unhealthy() { - let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; - let results = vec![false, false, true]; - assert_eq!( - first_healthy_volume("key", &volumes, &results), - ProbeResult::Found("http://vol3/key".into()) - ); - } - - #[test] - fn test_first_healthy_volume_all_failed() { - let volumes = vec!["http://vol1".into(), "http://vol2".into()]; - let results = vec![false, false]; - assert_eq!( - first_healthy_volume("key", &volumes, &results), - ProbeResult::AllFailed - ); - } - - #[test] - fn test_first_healthy_volume_early_exit() { - // Simulates early exit: only first two volumes were probed - let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; - let results = vec![false, true]; // Only 2 results because we stopped early - assert_eq!( - first_healthy_volume("key", &volumes, &results), - ProbeResult::Found("http://vol2/key".into()) - ); - } - - #[test] - fn test_shuffle_volumes_deterministic_with_seed() { - let volumes: Vec = (1..=5).map(|i| format!("http://vol{i}")).collect(); - let a = shuffle_volumes(volumes.clone(), 42); - let b = shuffle_volumes(volumes.clone(), 42); - assert_eq!(a, b, "same seed should produce same order"); - } - - #[test] - fn test_shuffle_volumes_different_seeds() { - let volumes: Vec = (1..=10).map(|i| format!("http://vol{i}")).collect(); - let a = shuffle_volumes(volumes.clone(), 1); - let b = shuffle_volumes(volumes.clone(), 2); - assert_ne!(a, b, "different seeds should produce different orders"); - } - - #[test] - fn test_shuffle_volumes_preserves_elements() { - let volumes: Vec = (1..=5).map(|i| format!("http://vol{i}")).collect(); - let mut shuffled = shuffle_volumes(volumes.clone(), 123); - shuffled.sort(); - let mut original = volumes; - original.sort(); - assert_eq!(shuffled, original); - } -} +// Note: first_healthy_volume and shuffle_volumes are trivial functions +// (essentially .find() and .shuffle()). Testing them would just test +// that standard library functions work. The real test is integration: +// does failover work with actual down volumes? From 5daa98303465d7510c1fc9fe048a1a41e70bfe6f Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 13:31:44 +0100 Subject: [PATCH 05/10] Clean up comments --- src/db.rs | 11 +++-------- src/rebuild.rs | 10 ++-------- src/server.rs | 15 --------------- 3 files changed, 5 insertions(+), 31 deletions(-) diff --git a/src/db.rs b/src/db.rs index cfb416c..5ceb9dd 100644 --- a/src/db.rs +++ b/src/db.rs @@ -30,8 +30,6 @@ fn encode_volumes(v: &[String]) -> String { serde_json::to_string(v).unwrap() } -/// Pure: compute exclusive upper bound for prefix range queries. -/// Increments the last byte that isn't 0xFF. /// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None pub fn prefix_upper_bound(prefix: &str) -> Option { let mut bytes = prefix.as_bytes().to_vec(); @@ -44,7 +42,6 @@ pub fn prefix_upper_bound(prefix: &str) -> Option { None } -/// A single SQLite connection behind a mutex, used for both reads and writes. #[derive(Clone)] pub struct Db { conn: Arc>, @@ -207,19 +204,17 @@ mod tests { #[test] fn test_prefix_upper_bound_range_correctness() { - // The only test that matters: does the bound actually work for range queries? - // All strings starting with "foo" should be >= "foo" and < upper_bound("foo") let prefix = "foo"; let upper = prefix_upper_bound(prefix).unwrap(); let upper = upper.as_str(); - // These should be in range [prefix, upper) + // in range [prefix, upper) assert!("foo" >= prefix && "foo" < upper); assert!("foo/bar" >= prefix && "foo/bar" < upper); assert!("foobar" >= prefix && "foobar" < upper); - assert!("foo\x7f" >= prefix && "foo\x7f" < upper); // high ASCII + assert!("foo\x7f" >= prefix && "foo\x7f" < upper); - // These should be out of range + // out of range assert!("fop" >= upper); assert!("fon" < prefix); } diff --git a/src/rebuild.rs b/src/rebuild.rs index 33d4ec2..ddea3e7 100644 --- a/src/rebuild.rs +++ b/src/rebuild.rs @@ -12,9 +12,7 @@ struct NginxEntry { size: Option, } -/// Pure: merge volume scan results into a unified index. -/// Each scan is (volume_url, list of (key, size) pairs). -/// Returns a map of key -> (volumes containing it, max size seen). +/// If a key has different sizes across volumes, takes the max. pub fn merge_volume_scans( scans: &[(String, Vec<(String, i64)>)], ) -> HashMap, i64)> { @@ -77,7 +75,6 @@ pub async fn run(args: &Args) { let db = db::Db::new(db_path); - // I/O: scan each volume let mut scans = Vec::new(); for vol_url in &args.volumes { eprintln!("Scanning {vol_url}..."); @@ -90,7 +87,6 @@ pub async fn run(args: &Args) { } } - // Pure: merge scan results let index = merge_volume_scans(&scans); let records: Vec<_> = index @@ -108,9 +104,7 @@ mod tests { #[test] fn test_merge_takes_max_size() { - // Edge case: same key with different sizes across volumes - // (can happen due to incomplete writes or corruption) - // We take the max size as the authoritative value + // Can happen due to incomplete writes or corruption let scans = vec![ ("http://vol1".to_string(), vec![("key".to_string(), 50)]), ("http://vol2".to_string(), vec![("key".to_string(), 200)]), diff --git a/src/server.rs b/src/server.rs index 1fd8923..c251b1b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,17 +15,12 @@ pub struct AppState { pub http: reqwest::Client, } -/// Result of probing volumes for a key. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ProbeResult { - /// Found a healthy volume at this URL Found(String), - /// All volumes were probed, none healthy AllFailed, } -/// Pure function: given volumes and their probe results (true = healthy), -/// returns the first healthy volume URL for the key, or None. pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult { for (vol, &healthy) in volumes.iter().zip(results) { if healthy { @@ -35,8 +30,6 @@ pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult::AllFailed } -/// Pure function: shuffle volumes for load balancing. -/// Takes a seed for deterministic testing. pub fn shuffle_volumes(volumes: Vec, seed: u64) -> Vec { use rand::seq::SliceRandom; use rand::SeedableRng; @@ -55,14 +48,12 @@ pub async fn get_key( return Err(AppError::CorruptRecord { key }); } - // Shuffle for load balancing (random seed in production) let seed = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_nanos() as u64) .unwrap_or(0); let volumes = shuffle_volumes(record.volumes, seed); - // Probe volumes and collect results let mut results = Vec::with_capacity(volumes.len()); for vol in &volumes { let url = format!("{vol}/{key}"); @@ -78,7 +69,6 @@ pub async fn get_key( } }; results.push(healthy); - // Early exit on first healthy volume if healthy { break; } @@ -107,7 +97,6 @@ pub async fn put_key( }); } - // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { let url = format!("{vol}/{key}"); @@ -243,7 +232,3 @@ pub async fn list_keys( Ok((StatusCode::OK, keys.join("\n")).into_response()) } -// Note: first_healthy_volume and shuffle_volumes are trivial functions -// (essentially .find() and .shuffle()). Testing them would just test -// that standard library functions work. The real test is integration: -// does failover work with actual down volumes? From d66a01e7dabb776f71f2c25f14120af9359a1031 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 13:54:42 +0100 Subject: [PATCH 06/10] Add support for more architectures --- .github/workflows/ci.yml | 75 ++++++++++++++++++++++++++++++++++------ flake.nix | 11 ++++-- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 06f7766..83169c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,29 +8,82 @@ permissions: contents: write jobs: - build: - name: Build packages - runs-on: ubuntu-latest + build-nix: + name: Build ${{ matrix.target }} + runs-on: ${{ matrix.runner }} strategy: fail-fast: false matrix: include: - - system: x86_64-linux + - target: x86_64-unknown-linux-gnu + runner: ubuntu-latest + system: x86_64-linux + package: default + - target: x86_64-unknown-linux-musl + runner: ubuntu-latest + system: x86_64-linux + package: x86_64-linux-musl + - target: aarch64-unknown-linux-gnu + runner: ubuntu-latest + system: x86_64-linux + package: aarch64-linux + - target: x86_64-apple-darwin + runner: macos-13 + system: x86_64-darwin + package: default + - target: aarch64-apple-darwin + runner: macos-latest + system: aarch64-darwin + package: default steps: - uses: actions/checkout@v4 - uses: DeterminateSystems/nix-installer-action@main - # - uses: DeterminateSystems/magic-nix-cache-action@main - - run: nix --print-build-logs build .#packages.$SYSTEM.default - env: - SYSTEM: ${{ matrix.system }} + - run: nix --print-build-logs build .#packages.${{ matrix.system }}.${{ matrix.package }} - name: Prepare release artifact if: startsWith(github.ref, 'refs/tags/') - run: cp result/bin/mkv mkv-${{ matrix.system }} + run: cp result/bin/mkv mkv-${{ matrix.target }} + + - name: Upload artifact + if: startsWith(github.ref, 'refs/tags/') + uses: actions/upload-artifact@v4 + with: + name: mkv-${{ matrix.target }} + path: mkv-${{ matrix.target }} + + build-windows: + name: Build x86_64-pc-windows-msvc + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + + - run: cargo build --release + + - name: Prepare release artifact + if: startsWith(github.ref, 'refs/tags/') + run: cp target/release/mkv.exe mkv-x86_64-pc-windows-msvc.exe + + - name: Upload artifact + if: startsWith(github.ref, 'refs/tags/') + uses: actions/upload-artifact@v4 + with: + name: mkv-x86_64-pc-windows-msvc + path: mkv-x86_64-pc-windows-msvc.exe + + release: + name: Create release + needs: [build-nix, build-windows] + runs-on: ubuntu-latest + if: startsWith(github.ref, 'refs/tags/') + steps: + - uses: actions/download-artifact@v4 + with: + path: artifacts + merge-multiple: true - name: Create release - if: startsWith(github.ref, 'refs/tags/') uses: softprops/action-gh-release@v2 with: - files: mkv-${{ matrix.system }} + files: artifacts/* diff --git a/flake.nix b/flake.nix index 3195bf6..25667d5 100644 --- a/flake.nix +++ b/flake.nix @@ -12,14 +12,17 @@ darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [ pkgs.libiconv ]; - mkv = pkgs.rustPlatform.buildRustPackage { + mkMkv = pkgs': pkgs'.rustPlatform.buildRustPackage { pname = "mkv"; version = "0.1.0"; cargoLock.lockFile = ./Cargo.lock; src = pkgs.lib.cleanSource ./.; - buildInputs = darwinBuildInputs; + buildInputs = pkgs'.lib.optionals pkgs'.stdenv.hostPlatform.isDarwin [ + pkgs'.libiconv + ]; doCheck = false; }; + mkv = mkMkv pkgs; in { devShells.default = pkgs.mkShell { @@ -33,5 +36,9 @@ packages.default = mkv; checks.default = mkv.overrideAttrs { doCheck = true; }; } + // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux { + packages.x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64; + packages.aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform; + } ); } From 138ab7224010b819fa11ac38e6e148461c552f2e Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 14:06:10 +0100 Subject: [PATCH 07/10] Idk --- flake.nix | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/flake.nix b/flake.nix index 25667d5..5b47279 100644 --- a/flake.nix +++ b/flake.nix @@ -33,12 +33,13 @@ pkgs.rust-analyzer ]; }; - packages.default = mkv; + packages = { + default = mkv; + } // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux { + x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64; + aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform; + }; checks.default = mkv.overrideAttrs { doCheck = true; }; } - // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux { - packages.x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64; - packages.aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform; - } ); } From f19656486a6caee2147272908b9cce525bf56eba Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 17:41:58 +0100 Subject: [PATCH 08/10] Add timeout --- src/lib.rs | 3 +++ src/main.rs | 7 +++++++ src/server.rs | 4 +++- tests/integration.rs | 1 + 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index ba576f6..498c78a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub mod rebuild; pub mod server; use std::sync::Arc; +use std::time::Duration; const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB @@ -13,6 +14,7 @@ pub struct Args { pub db_path: String, pub volumes: Vec, pub replicas: usize, + pub voltimeout: Duration, } pub fn build_app(args: &Args) -> axum::Router { @@ -36,6 +38,7 @@ pub fn build_app(args: &Args) -> axum::Router { db: db::Db::new(&args.db_path), volumes: Arc::new(args.volumes.clone()), replicas: args.replicas, + voltimeout: args.voltimeout, http: reqwest::Client::new(), }; diff --git a/src/main.rs b/src/main.rs index f838428..7ec4727 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use clap::{Parser, Subcommand}; #[derive(Parser)] @@ -18,6 +20,10 @@ struct Cli { #[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)] replicas: usize, + /// Timeout for volume health checks (in milliseconds) + #[arg(long, env = "MKV_VOLTIMEOUT", default_value_t = 1000)] + voltimeout: u64, + #[command(subcommand)] command: Commands, } @@ -65,6 +71,7 @@ async fn main() { db_path: cli.db, volumes: cli.volumes, replicas: cli.replicas, + voltimeout: Duration::from_millis(cli.voltimeout), }; match cli.command { diff --git a/src/server.rs b/src/server.rs index c251b1b..cf391fc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,6 +3,7 @@ use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use std::sync::Arc; +use std::time::Duration; use crate::db; use crate::error::{AppError, VolumeError}; @@ -12,6 +13,7 @@ pub struct AppState { pub db: db::Db, pub volumes: Arc>, pub replicas: usize, + pub voltimeout: Duration, pub http: reqwest::Client, } @@ -57,7 +59,7 @@ pub async fn get_key( let mut results = Vec::with_capacity(volumes.len()); for vol in &volumes { let url = format!("{vol}/{key}"); - let healthy = match state.http.head(&url).send().await { + let healthy = match state.http.head(&url).timeout(state.voltimeout).send().await { Ok(resp) if resp.status().is_success() => true, Ok(resp) => { tracing::warn!("volume {vol} returned {} for {key}", resp.status()); diff --git a/tests/integration.rs b/tests/integration.rs index 890b23a..cfd46ad 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -19,6 +19,7 @@ async fn start_server() -> String { "http://localhost:3103".into(), ], replicas: 2, + voltimeout: std::time::Duration::from_secs(1), }; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); From 07e1ab0796a9cbc69020322713ad5dd4184daffb Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 17:55:37 +0100 Subject: [PATCH 09/10] Format --- .gitignore | 2 ++ src/rebuild.rs | 4 +++- src/server.rs | 11 ++++------- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index d875240..68606f8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ *.db* /result *~ +#* +#*# diff --git a/src/rebuild.rs b/src/rebuild.rs index ddea3e7..71b12f7 100644 --- a/src/rebuild.rs +++ b/src/rebuild.rs @@ -19,7 +19,9 @@ pub fn merge_volume_scans( let mut index: HashMap, i64)> = HashMap::new(); for (vol_url, keys) in scans { for (key, size) in keys { - let entry = index.entry(key.clone()).or_insert_with(|| (Vec::new(), *size)); + let entry = index + .entry(key.clone()) + .or_insert_with(|| (Vec::new(), *size)); entry.0.push(vol_url.clone()); if *size > entry.1 { entry.1 = *size; diff --git a/src/server.rs b/src/server.rs index cf391fc..c4af12e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -33,8 +33,8 @@ pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> } pub fn shuffle_volumes(volumes: Vec, seed: u64) -> Vec { - use rand::seq::SliceRandom; use rand::SeedableRng; + use rand::seq::SliceRandom; let mut rng = rand::rngs::StdRng::seed_from_u64(seed); let mut vols = volumes; vols.shuffle(&mut rng); @@ -77,11 +77,9 @@ pub async fn get_key( } match first_healthy_volume(&key, &volumes, &results) { - ProbeResult::Found(url) => Ok(( - StatusCode::FOUND, - [(axum::http::header::LOCATION, url)], - ) - .into_response()), + ProbeResult::Found(url) => { + Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, url)]).into_response()) + } ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable), } } @@ -233,4 +231,3 @@ pub async fn list_keys( let keys = state.db.list_keys(&query.prefix).await?; Ok((StatusCode::OK, keys.join("\n")).into_response()) } - From 8e6a96633a27079a3a0a49f10b0bdca862ab466e Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sun, 8 Mar 2026 17:57:21 +0100 Subject: [PATCH 10/10] Emacs --- .gitignore | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 68606f8..a4e7adf 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,5 @@ *.db* /result *~ -#* -#*# +\#* +.\#*