From 2fad27efc6db7045667ffb655072718901f66554 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sat, 7 Mar 2026 10:40:36 +0100 Subject: [PATCH] Add rebalance tool with --dry-run support Pure plan_rebalance() computes diffs between current and desired placement. Execution copies blobs to new volumes, updates index, deletes from old. Skips keys where copy fails. Includes unit tests for planning logic. Co-Authored-By: Claude Opus 4.6 --- src/lib.rs | 1 + src/main.rs | 5 +- src/rebalance.rs | 205 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 src/rebalance.rs diff --git a/src/lib.rs b/src/lib.rs index af9f2f2..8689b72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod error; pub mod hasher; pub mod health; pub mod server; +pub mod rebalance; pub mod rebuild; pub mod volume; diff --git a/src/main.rs b/src/main.rs index 2f49fc6..e337246 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,9 +47,8 @@ async fn main() { Commands::Rebuild => { mkv::rebuild::run(&config).await; } - Commands::Rebalance { dry_run: _ } => { - eprintln!("rebalance not yet implemented"); - std::process::exit(1); + Commands::Rebalance { dry_run } => { + mkv::rebalance::run(&config, dry_run).await; } } } diff --git a/src/rebalance.rs b/src/rebalance.rs new file mode 100644 index 0000000..6bd1df9 --- /dev/null +++ b/src/rebalance.rs @@ -0,0 +1,205 @@ +use crate::config::Config; +use crate::db; +use crate::hasher::Ring; +use crate::volume::VolumeClient; + +/// What needs to happen to a single key during rebalance. +pub struct KeyMove { + pub key: String, + pub size: Option, + pub current_volumes: Vec, + pub desired_volumes: Vec, + pub to_add: Vec, + pub to_remove: Vec, +} + +/// Pure: compute the diff between current and desired placement for all keys. +pub fn plan_rebalance( + records: &[db::Record], + ring: &Ring, + replication: usize, +) -> Vec { + let mut moves = Vec::new(); + for record in records { + let desired = ring.get_volumes(&record.key, replication); + let to_add: Vec = desired + .iter() + .filter(|v| !record.volumes.contains(v)) + .cloned() + .collect(); + let to_remove: Vec = record + .volumes + .iter() + .filter(|v| !desired.contains(v)) + .cloned() + .collect(); + + if !to_add.is_empty() || !to_remove.is_empty() { + moves.push(KeyMove { + key: record.key.clone(), + size: record.size, + current_volumes: record.volumes.clone(), + desired_volumes: desired, + to_add, + to_remove, + }); + } + } + moves +} + +pub async fn run(config: &Config, dry_run: bool) { + let db_path = &config.database.path; + let replication = config.server.replication_factor; + + // Open DB read-only to plan + let conn = rusqlite::Connection::open_with_flags( + db_path, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY, + ) + .expect("failed to open database"); + + let records = db::all_records(&conn).expect("failed to read records"); + drop(conn); + + let ring = Ring::new(&config.volume_urls(), config.server.virtual_nodes); + let moves = plan_rebalance(&records, &ring, replication); + + if moves.is_empty() { + eprintln!("Nothing to rebalance — all keys are already correctly placed."); + return; + } + + let total_bytes: i64 = moves.iter().filter_map(|m| m.size).sum(); + eprintln!( + "{} keys to move ({} bytes)", + moves.len(), + total_bytes + ); + + if dry_run { + for m in &moves { + eprintln!( + " {} : add {:?}, remove {:?}", + m.key, m.to_add, m.to_remove + ); + } + return; + } + + // Open writer for updates + let (writer, ready_rx) = db::spawn_writer(db_path.to_string()); + ready_rx.await.expect("writer failed to initialize"); + + let client = VolumeClient::new(); + let mut moved = 0; + let mut errors = 0; + + for m in &moves { + // Pick a source volume to copy from (any current volume) + let src = &m.current_volumes[0]; + + // Copy to new volumes + let mut copy_ok = true; + for dst in &m.to_add { + match client.get(src, &m.key).await { + Ok(data) => { + if let Err(e) = client.put(dst, &m.key, data).await { + eprintln!(" ERROR copy {} to {}: {}", m.key, dst, e); + copy_ok = false; + errors += 1; + } + } + Err(e) => { + eprintln!(" ERROR read {} from {}: {}", m.key, src, e); + copy_ok = false; + errors += 1; + } + } + } + + if !copy_ok { + continue; // don't update index or delete if copy failed + } + + // Update index with new volume list + writer + .put(m.key.clone(), m.desired_volumes.clone(), m.size) + .await + .expect("failed to update index"); + + // Delete from old volumes + for old in &m.to_remove { + if let Err(e) = client.delete(old, &m.key).await { + eprintln!(" WARN delete {} from {}: {}", m.key, old, e); + } + } + + moved += 1; + } + + eprintln!("Rebalanced {moved}/{} keys ({errors} errors)", moves.len()); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_plan_rebalance_no_change() { + let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); + let ring = Ring::new(&volumes, 100); + + // Create records that are already correctly placed + let records: Vec = (0..100) + .map(|i| { + let key = format!("key-{i}"); + let vols = ring.get_volumes(&key, 2); + db::Record { + key, + volumes: vols, + size: Some(100), + } + }) + .collect(); + + let moves = plan_rebalance(&records, &ring, 2); + assert!(moves.is_empty()); + } + + #[test] + fn test_plan_rebalance_new_volume() { + let volumes3: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); + let ring3 = Ring::new(&volumes3, 100); + + // Place keys on 3-volume ring + let records: Vec = (0..1000) + .map(|i| { + let key = format!("key-{i}"); + let vols = ring3.get_volumes(&key, 2); + db::Record { + key, + volumes: vols, + size: Some(100), + } + }) + .collect(); + + // Build new ring with 4 volumes + let volumes4: Vec = (1..=4).map(|i| format!("http://vol{i}")).collect(); + let ring4 = Ring::new(&volumes4, 100); + + let moves = plan_rebalance(&records, &ring4, 2); + + // Some keys should need to move, but not all + assert!(!moves.is_empty()); + assert!(moves.len() < 800, "too many moves: {}", moves.len()); + + // Every move should involve vol4 (the new volume) + for m in &moves { + let involves_vol4 = m.to_add.iter().any(|v| v == "http://vol4") + || m.to_remove.iter().any(|v| v == "http://vol4"); + assert!(involves_vol4, "move for {} doesn't involve vol4", m.key); + } + } +}