Add rebalance tool with --dry-run support

Pure plan_rebalance() computes diffs between current and desired placement.
Execution copies blobs to new volumes, updates index, deletes from old.
Skips keys where copy fails. Includes unit tests for planning logic.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Silas Brack 2026-03-07 10:40:36 +01:00
parent ef6f025c3a
commit 2fad27efc6
3 changed files with 208 additions and 3 deletions

View file

@ -4,6 +4,7 @@ pub mod error;
pub mod hasher;
pub mod health;
pub mod server;
pub mod rebalance;
pub mod rebuild;
pub mod volume;

View file

@ -47,9 +47,8 @@ async fn main() {
Commands::Rebuild => {
mkv::rebuild::run(&config).await;
}
Commands::Rebalance { dry_run: _ } => {
eprintln!("rebalance not yet implemented");
std::process::exit(1);
Commands::Rebalance { dry_run } => {
mkv::rebalance::run(&config, dry_run).await;
}
}
}

205
src/rebalance.rs Normal file
View file

@ -0,0 +1,205 @@
use crate::config::Config;
use crate::db;
use crate::hasher::Ring;
use crate::volume::VolumeClient;
/// What needs to happen to a single key during rebalance.
pub struct KeyMove {
pub key: String,
pub size: Option<i64>,
pub current_volumes: Vec<String>,
pub desired_volumes: Vec<String>,
pub to_add: Vec<String>,
pub to_remove: Vec<String>,
}
/// Pure: compute the diff between current and desired placement for all keys.
pub fn plan_rebalance(
records: &[db::Record],
ring: &Ring,
replication: usize,
) -> Vec<KeyMove> {
let mut moves = Vec::new();
for record in records {
let desired = ring.get_volumes(&record.key, replication);
let to_add: Vec<String> = desired
.iter()
.filter(|v| !record.volumes.contains(v))
.cloned()
.collect();
let to_remove: Vec<String> = record
.volumes
.iter()
.filter(|v| !desired.contains(v))
.cloned()
.collect();
if !to_add.is_empty() || !to_remove.is_empty() {
moves.push(KeyMove {
key: record.key.clone(),
size: record.size,
current_volumes: record.volumes.clone(),
desired_volumes: desired,
to_add,
to_remove,
});
}
}
moves
}
pub async fn run(config: &Config, dry_run: bool) {
let db_path = &config.database.path;
let replication = config.server.replication_factor;
// Open DB read-only to plan
let conn = rusqlite::Connection::open_with_flags(
db_path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
)
.expect("failed to open database");
let records = db::all_records(&conn).expect("failed to read records");
drop(conn);
let ring = Ring::new(&config.volume_urls(), config.server.virtual_nodes);
let moves = plan_rebalance(&records, &ring, replication);
if moves.is_empty() {
eprintln!("Nothing to rebalance — all keys are already correctly placed.");
return;
}
let total_bytes: i64 = moves.iter().filter_map(|m| m.size).sum();
eprintln!(
"{} keys to move ({} bytes)",
moves.len(),
total_bytes
);
if dry_run {
for m in &moves {
eprintln!(
" {} : add {:?}, remove {:?}",
m.key, m.to_add, m.to_remove
);
}
return;
}
// Open writer for updates
let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
ready_rx.await.expect("writer failed to initialize");
let client = VolumeClient::new();
let mut moved = 0;
let mut errors = 0;
for m in &moves {
// Pick a source volume to copy from (any current volume)
let src = &m.current_volumes[0];
// Copy to new volumes
let mut copy_ok = true;
for dst in &m.to_add {
match client.get(src, &m.key).await {
Ok(data) => {
if let Err(e) = client.put(dst, &m.key, data).await {
eprintln!(" ERROR copy {} to {}: {}", m.key, dst, e);
copy_ok = false;
errors += 1;
}
}
Err(e) => {
eprintln!(" ERROR read {} from {}: {}", m.key, src, e);
copy_ok = false;
errors += 1;
}
}
}
if !copy_ok {
continue; // don't update index or delete if copy failed
}
// Update index with new volume list
writer
.put(m.key.clone(), m.desired_volumes.clone(), m.size)
.await
.expect("failed to update index");
// Delete from old volumes
for old in &m.to_remove {
if let Err(e) = client.delete(old, &m.key).await {
eprintln!(" WARN delete {} from {}: {}", m.key, old, e);
}
}
moved += 1;
}
eprintln!("Rebalanced {moved}/{} keys ({errors} errors)", moves.len());
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_plan_rebalance_no_change() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let ring = Ring::new(&volumes, 100);
// Create records that are already correctly placed
let records: Vec<db::Record> = (0..100)
.map(|i| {
let key = format!("key-{i}");
let vols = ring.get_volumes(&key, 2);
db::Record {
key,
volumes: vols,
size: Some(100),
}
})
.collect();
let moves = plan_rebalance(&records, &ring, 2);
assert!(moves.is_empty());
}
#[test]
fn test_plan_rebalance_new_volume() {
let volumes3: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let ring3 = Ring::new(&volumes3, 100);
// Place keys on 3-volume ring
let records: Vec<db::Record> = (0..1000)
.map(|i| {
let key = format!("key-{i}");
let vols = ring3.get_volumes(&key, 2);
db::Record {
key,
volumes: vols,
size: Some(100),
}
})
.collect();
// Build new ring with 4 volumes
let volumes4: Vec<String> = (1..=4).map(|i| format!("http://vol{i}")).collect();
let ring4 = Ring::new(&volumes4, 100);
let moves = plan_rebalance(&records, &ring4, 2);
// Some keys should need to move, but not all
assert!(!moves.is_empty());
assert!(moves.len() < 800, "too many moves: {}", moves.len());
// Every move should involve vol4 (the new volume)
for m in &moves {
let involves_vol4 = m.to_add.iter().any(|v| v == "http://vol4")
|| m.to_remove.iter().any(|v| v == "http://vol4");
assert!(involves_vol4, "move for {} doesn't involve vol4", m.key);
}
}
}