Simplify moar

This commit is contained in:
Silas Brack 2026-03-07 13:23:38 +01:00
parent 7f3ec69cf6
commit 07490efc28
14 changed files with 261 additions and 1061 deletions

98
Cargo.lock generated
View file

@ -377,23 +377,6 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-io"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.32"
@ -413,11 +396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"slab",
]
@ -878,7 +857,6 @@ name = "mkv"
version = "0.1.0"
dependencies = [
"axum",
"bytes",
"clap",
"reqwest",
"rusqlite",
@ -886,7 +864,6 @@ dependencies = [
"serde_json",
"sha2",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
]
@ -1082,7 +1059,6 @@ dependencies = [
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
@ -1104,14 +1080,12 @@ dependencies = [
"sync_wrapper",
"tokio",
"tokio-native-tls",
"tokio-util",
"tower",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
]
@ -1299,15 +1273,6 @@ dependencies = [
"serde_core",
]
[[package]]
name = "serde_spanned"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3"
dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -1541,47 +1506,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
]
[[package]]
name = "toml_datetime"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.22.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
dependencies = [
"indexmap",
"serde",
"serde_spanned",
"toml_datetime",
"toml_write",
"winnow",
]
[[package]]
name = "toml_write"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
[[package]]
name = "tower"
version = "0.5.3"
@ -1872,19 +1796,6 @@ dependencies = [
"wasmparser",
]
[[package]]
name = "wasm-streams"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "wasmparser"
version = "0.244.0"
@ -2024,15 +1935,6 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winnow"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945"
dependencies = [
"memchr",
]
[[package]]
name = "wit-bindgen"
version = "0.51.0"

View file

@ -7,15 +7,13 @@ edition = "2024"
axum = "0.8"
tokio = { version = "1", features = ["full"] }
rusqlite = { version = "0.35", features = ["bundled"] }
reqwest = { version = "0.12", features = ["stream", "json"] }
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.8"
clap = { version = "4", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3"
sha2 = "0.10"
bytes = "1"
[profile.release]
opt-level = 3

View file

@ -1,47 +0,0 @@
use serde::Deserialize;
use std::path::Path;
/// Top-level application configuration, deserialized from a TOML file
/// (see [`Config::load`]).
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
    pub server: ServerConfig,
    pub database: DatabaseConfig,
    // Validated to be non-empty in `Config::load`.
    pub volumes: Vec<VolumeConfig>,
}
/// Settings for the index server itself.
#[derive(Debug, Deserialize, Clone)]
pub struct ServerConfig {
    // TCP port the server binds on (0.0.0.0).
    pub port: u16,
    // Number of volumes each key is replicated to; must be >= 1 and
    // <= the number of configured volumes (checked in `Config::load`).
    pub replication_factor: usize,
    // Virtual nodes per physical volume on the consistent-hash ring.
    pub virtual_nodes: usize,
}
/// Location of the SQLite index database.
#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
    // Filesystem path to the SQLite file.
    pub path: String,
}
/// One storage volume server.
#[derive(Debug, Deserialize, Clone)]
pub struct VolumeConfig {
    // Base URL of the volume server, e.g. "http://vol1:3001".
    pub url: String,
}
impl Config {
    /// Load and validate configuration from the TOML file at `path`.
    ///
    /// # Errors
    /// Returns an error if the file cannot be read or parsed, if no
    /// volumes are configured, or if `replication_factor` is zero or
    /// greater than the number of configured volumes.
    pub fn load(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
        let contents = std::fs::read_to_string(path)?;
        let config: Config = toml::from_str(&contents)?;
        // Validate cross-field invariants that serde alone cannot express.
        if config.volumes.is_empty() {
            return Err("at least one volume is required".into());
        }
        if config.server.replication_factor == 0 {
            return Err("replication_factor must be >= 1".into());
        }
        if config.server.replication_factor > config.volumes.len() {
            return Err("replication_factor exceeds number of volumes".into());
        }
        Ok(config)
    }

    /// URLs of all configured volumes, in declaration order.
    pub fn volume_urls(&self) -> Vec<String> {
        self.volumes.iter().map(|v| v.url.clone()).collect()
    }
}

216
src/db.rs
View file

@ -1,11 +1,8 @@
use rusqlite::{params, Connection, OpenFlags};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use crate::error::AppError;
// --- Record type ---
#[derive(Debug, Clone)]
pub struct Record {
pub key: String,
@ -13,174 +10,95 @@ pub struct Record {
pub size: Option<i64>,
}
// --- SQLite setup ---
fn apply_pragmas(conn: &Connection) {
conn.execute_batch(
"
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA busy_timeout = 5000;
PRAGMA temp_store = memory;
PRAGMA cache_size = -64000;
PRAGMA mmap_size = 268435456;
PRAGMA page_size = 4096;
",
"PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA busy_timeout = 5000;
PRAGMA temp_store = memory;
PRAGMA cache_size = -64000;
PRAGMA mmap_size = 268435456;
PRAGMA page_size = 4096;",
)
.expect("failed to set pragmas");
}
fn open_readonly(path: &str) -> Connection {
let conn = Connection::open_with_flags(
path,
OpenFlags::SQLITE_OPEN_READ_ONLY
| OpenFlags::SQLITE_OPEN_NO_MUTEX
| OpenFlags::SQLITE_OPEN_URI,
)
.expect("failed to open read connection");
apply_pragmas(&conn);
conn
/// Decode a JSON array of volume URLs as stored in the `kv.volumes`
/// column; malformed input yields an empty list rather than an error.
fn parse_volumes(s: &str) -> Vec<String> {
    match serde_json::from_str(s) {
        Ok(vols) => vols,
        Err(_) => Vec::new(),
    }
}
fn open_readwrite(path: &str) -> Connection {
let conn = Connection::open_with_flags(
path,
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_NO_MUTEX
| OpenFlags::SQLITE_OPEN_URI,
)
.expect("failed to open write connection");
apply_pragmas(&conn);
conn
/// Encode a list of volume URLs as a JSON array string for storage in
/// the `kv.volumes` column.
fn encode_volumes(v: &[String]) -> String {
    let encoded = serde_json::to_string(v);
    // Serializing a slice of strings cannot fail in practice.
    encoded.unwrap()
}
fn create_tables(conn: &Connection) {
conn.execute_batch(
"
CREATE TABLE IF NOT EXISTS kv (
key TEXT PRIMARY KEY,
volumes TEXT NOT NULL,
size INTEGER,
created_at INTEGER DEFAULT (unixepoch())
);
",
)
.expect("failed to create tables");
}
fn parse_volumes(volumes_json: &str) -> Vec<String> {
serde_json::from_str(volumes_json).unwrap_or_default()
}
fn encode_volumes(volumes: &[String]) -> String {
serde_json::to_string(volumes).unwrap()
}
// --- ReadPool ---
/// A single SQLite connection behind a mutex, used for both reads and writes.
#[derive(Clone)]
pub struct ReadPool {
conns: Vec<Arc<Mutex<Connection>>>,
next: Arc<AtomicUsize>,
pub struct Db {
conn: Arc<Mutex<Connection>>,
}
impl ReadPool {
pub fn new(path: &str, size: usize) -> Self {
let conns = (0..size)
.map(|_| Arc::new(Mutex::new(open_readonly(path))))
.collect();
Self {
conns,
next: Arc::new(AtomicUsize::new(0)),
}
impl Db {
pub fn new(path: &str) -> Self {
let conn = Connection::open_with_flags(
path,
OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_NO_MUTEX
| OpenFlags::SQLITE_OPEN_URI,
)
.expect("failed to open database");
apply_pragmas(&conn);
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS kv (
key TEXT PRIMARY KEY,
volumes TEXT NOT NULL,
size INTEGER,
created_at INTEGER DEFAULT (unixepoch())
);",
)
.expect("failed to create tables");
Self { conn: Arc::new(Mutex::new(conn)) }
}
pub async fn query<T, F>(&self, f: F) -> Result<T, AppError>
where
T: Send + 'static,
F: FnOnce(&Connection) -> Result<T, AppError> + Send + 'static,
{
let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.conns.len();
let conn = self.conns[idx].clone();
pub async fn get(&self, key: &str) -> Result<Record, AppError> {
let conn = self.conn.clone();
let key = key.to_string();
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
f(&conn)
let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv WHERE key = ?1")?;
Ok(stmt.query_row(params![key], |row| {
let vj: String = row.get(1)?;
Ok(Record { key: row.get(0)?, volumes: parse_volumes(&vj), size: row.get(2)? })
})?)
})
.await
.unwrap()
}
}
// --- Read query functions ---
pub fn get(conn: &Connection, key: &str) -> Result<Record, AppError> {
let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv WHERE key = ?1")?;
Ok(stmt.query_row(params![key], |row| {
let volumes_json: String = row.get(1)?;
Ok(Record {
key: row.get(0)?,
volumes: parse_volumes(&volumes_json),
size: row.get(2)?,
pub async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, AppError> {
let conn = self.conn.clone();
let pattern = format!("{prefix}%");
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
let mut stmt = conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 ORDER BY key")?;
let keys = stmt
.query_map(params![pattern], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()?;
Ok(keys)
})
})?)
}
pub fn list_keys(conn: &Connection, prefix: &str) -> Result<Vec<String>, AppError> {
let mut stmt = conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 ORDER BY key")?;
let pattern = format!("{prefix}%");
let keys = stmt
.query_map(params![pattern], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()?;
Ok(keys)
}
pub fn all_records(conn: &Connection) -> Result<Vec<Record>, AppError> {
let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv")?;
let records = stmt
.query_map([], |row| {
let volumes_json: String = row.get(1)?;
Ok(Record {
key: row.get(0)?,
volumes: parse_volumes(&volumes_json),
size: row.get(2)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(records)
}
// --- WriterHandle ---
#[derive(Clone)]
pub struct WriterHandle {
conn: Arc<Mutex<Connection>>,
}
impl WriterHandle {
pub fn new(path: &str) -> Self {
let conn = open_readwrite(path);
create_tables(&conn);
Self {
conn: Arc::new(Mutex::new(conn)),
}
.await
.unwrap()
}
pub async fn put(
&self,
key: String,
volumes: Vec<String>,
size: Option<i64>,
) -> Result<(), AppError> {
pub async fn put(&self, key: String, volumes: Vec<String>, size: Option<i64>) -> Result<(), AppError> {
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
let volumes_json = encode_volumes(&volumes);
conn.prepare_cached(
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
)?
.execute(params![key, volumes_json, size])?;
.execute(params![key, encode_volumes(&volumes), size])?;
Ok(())
})
.await
@ -199,10 +117,7 @@ impl WriterHandle {
.unwrap()
}
pub async fn bulk_put(
&self,
records: Vec<(String, Vec<String>, Option<i64>)>,
) -> Result<(), AppError> {
pub async fn bulk_put(&self, records: Vec<(String, Vec<String>, Option<i64>)>) -> Result<(), AppError> {
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
@ -211,12 +126,23 @@ impl WriterHandle {
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
)?;
for (key, volumes, size) in &records {
let volumes_json = encode_volumes(volumes);
stmt.execute(params![key, volumes_json, size])?;
stmt.execute(params![key, encode_volumes(volumes), size])?;
}
Ok(())
})
.await
.unwrap()
}
pub fn all_records_sync(&self) -> Result<Vec<Record>, AppError> {
        // Read every record in the index, blocking the calling thread.
        // Unlike the async methods on this type, this does not go through
        // spawn_blocking — intended for one-shot CLI flows (e.g. rebalance
        // planning), not request handlers.
        let conn = self.conn.lock().unwrap();
        let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv")?;
        let records = stmt
            .query_map([], |row| {
                // Column 1 holds the JSON-encoded volume list.
                let vj: String = row.get(1)?;
                Ok(Record { key: row.get(0)?, volumes: parse_volumes(&vj), size: row.get(2)? })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        Ok(records)
    }
}

View file

@ -6,7 +6,6 @@ pub enum AppError {
NotFound,
Db(rusqlite::Error),
VolumeError(String),
NoHealthyVolume,
}
impl From<rusqlite::Error> for AppError {
@ -24,7 +23,6 @@ impl std::fmt::Display for AppError {
AppError::NotFound => write!(f, "not found"),
AppError::Db(e) => write!(f, "database error: {e}"),
AppError::VolumeError(msg) => write!(f, "volume error: {msg}"),
AppError::NoHealthyVolume => write!(f, "no healthy volume available"),
}
}
}
@ -33,7 +31,6 @@ impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, msg) = match &self {
AppError::NotFound => (StatusCode::NOT_FOUND, self.to_string()),
AppError::NoHealthyVolume => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()),
_ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
};
(status, msg).into_response()

View file

@ -1,61 +1,18 @@
use sha2::{Digest, Sha256};
use std::collections::BTreeMap;
pub struct Ring {
nodes: BTreeMap<u64, String>,
virtual_nodes: usize,
}
impl Ring {
pub fn new(volumes: &[String], virtual_nodes: usize) -> Self {
let mut ring = Self {
nodes: BTreeMap::new(),
virtual_nodes,
};
for url in volumes {
ring.add_volume(url);
}
ring
}
pub fn add_volume(&mut self, url: &str) {
for i in 0..self.virtual_nodes {
let hash = hash_key(&format!("{url}:{i}"));
self.nodes.insert(hash, url.to_string());
}
}
pub fn remove_volume(&mut self, url: &str) {
self.nodes.retain(|_, v| v != url);
}
/// Walk the ring clockwise from the key's hash position and return
/// `count` distinct physical volumes.
pub fn get_volumes(&self, key: &str, count: usize) -> Vec<String> {
if self.nodes.is_empty() {
return vec![];
}
let hash = hash_key(key);
let mut result = Vec::with_capacity(count);
// Walk clockwise from hash position
for (_, url) in self.nodes.range(hash..).chain(self.nodes.iter()) {
if !result.contains(url) {
result.push(url.clone());
}
if result.len() == count {
break;
}
}
result
}
}
fn hash_key(input: &str) -> u64 {
let hash = Sha256::digest(input.as_bytes());
u64::from_be_bytes(hash[..8].try_into().unwrap())
/// Rank volumes for `key` by hashing "key:volume" and keeping the `count`
/// lowest scores. Placement is stable in volume *name* rather than list
/// position — same idea as minikeyvalue's key2volume.
pub fn volumes_for_key(key: &str, volumes: &[String], count: usize) -> Vec<String> {
    let mut ranked = Vec::with_capacity(volumes.len());
    for vol in volumes {
        let digest = Sha256::digest(format!("{key}:{vol}").as_bytes());
        let score = u64::from_be_bytes(digest[..8].try_into().unwrap());
        ranked.push((score, vol));
    }
    // Stable sort keeps declaration order on (practically impossible) ties.
    ranked.sort_by_key(|&(score, _)| score);
    ranked.into_iter().take(count).map(|(_, vol)| vol.clone()).collect()
}
#[cfg(test)]
@ -63,48 +20,29 @@ mod tests {
use super::*;
#[test]
fn test_ring_distribution() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect();
let ring = Ring::new(&volumes, 100);
let selected = ring.get_volumes("test-key", 3);
assert_eq!(selected.len(), 3);
for v in &volumes {
assert!(selected.contains(v), "missing volume {v}");
}
}
#[test]
fn test_ring_deterministic() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect();
let ring = Ring::new(&volumes, 100);
let a = ring.get_volumes("my-key", 2);
let b = ring.get_volumes("my-key", 2);
fn test_deterministic() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let a = volumes_for_key("my-key", &volumes, 2);
let b = volumes_for_key("my-key", &volumes, 2);
assert_eq!(a, b);
}
#[test]
fn test_ring_count_capped() {
let volumes: Vec<String> = (1..=2).map(|i| format!("http://vol{i}:300{i}")).collect();
let ring = Ring::new(&volumes, 100);
let selected = ring.get_volumes("key", 5);
fn test_count_capped() {
let volumes: Vec<String> = (1..=2).map(|i| format!("http://vol{i}")).collect();
let selected = volumes_for_key("key", &volumes, 5);
assert_eq!(selected.len(), 2);
}
#[test]
fn test_ring_even_distribution() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect();
let ring = Ring::new(&volumes, 100);
fn test_even_distribution() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let mut counts = std::collections::HashMap::new();
for i in 0..3000 {
let key = format!("key-{i}");
let primary = &ring.get_volumes(&key, 1)[0];
let primary = &volumes_for_key(&key, &volumes, 1)[0];
*counts.entry(primary.clone()).or_insert(0u32) += 1;
}
for (vol, count) in &counts {
assert!(
*count > 700 && *count < 1300,
@ -114,65 +52,30 @@ mod tests {
}
#[test]
fn test_ring_stability_on_add() {
// Adding a 4th volume to a 3-volume ring should only move ~25% of keys
fn test_stability_on_add() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let ring_before = Ring::new(&volumes, 100);
let mut volumes4 = volumes.clone();
volumes4.push("http://vol4".into());
let ring_after = Ring::new(&volumes4, 100);
let total = 10000;
let mut moved = 0;
for i in 0..total {
let key = format!("key-{i}");
let before = &ring_before.get_volumes(&key, 1)[0];
let after = &ring_after.get_volumes(&key, 1)[0];
let before = &volumes_for_key(&key, &volumes, 1)[0];
let after = &volumes_for_key(&key, &volumes4, 1)[0];
if before != after {
moved += 1;
}
}
// Ideal: 1/4 of keys move (25%). Allow 15%-40%.
let pct = moved as f64 / total as f64 * 100.0;
assert!(
pct > 15.0 && pct < 40.0,
"expected ~25% of keys to move, got {pct:.1}% ({moved}/{total})"
"expected ~25% of keys to move, got {pct:.1}%"
);
}
#[test]
fn test_ring_stability_on_remove() {
// Removing 1 volume from a 4-volume ring should only move ~25% of keys
let volumes: Vec<String> = (1..=4).map(|i| format!("http://vol{i}")).collect();
let ring_before = Ring::new(&volumes, 100);
let volumes3: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let ring_after = Ring::new(&volumes3, 100);
let total = 10000;
let mut moved = 0;
for i in 0..total {
let key = format!("key-{i}");
let before = &ring_before.get_volumes(&key, 1)[0];
let after = &ring_after.get_volumes(&key, 1)[0];
if before != after {
moved += 1;
}
}
// Ideal: 1/4 of keys move (25%). Allow 15%-40%.
let pct = moved as f64 / total as f64 * 100.0;
assert!(
pct > 15.0 && pct < 40.0,
"expected ~25% of keys to move, got {pct:.1}% ({moved}/{total})"
);
}
#[test]
fn test_ring_empty() {
let ring = Ring::new(&[], 100);
assert_eq!(ring.get_volumes("key", 1), Vec::<String>::new());
fn test_empty() {
assert_eq!(volumes_for_key("key", &[], 1), Vec::<String>::new());
}
}

View file

@ -1,32 +0,0 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use crate::volume::VolumeClient;
/// Shared set of volume URLs currently considered healthy.
pub type HealthyVolumes = Arc<RwLock<HashSet<String>>>;

/// Spawn a background task that probes every volume every 10 seconds
/// and keeps `healthy` in sync with the results.
pub fn spawn_health_checker(
    volume_client: VolumeClient,
    all_volumes: Vec<String>,
    healthy: HealthyVolumes,
) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(10));
        loop {
            interval.tick().await;
            for url in &all_volumes {
                // Probe outside the lock; take the write lock only to update.
                let ok = volume_client.check(url).await;
                let mut set = healthy.write().await;
                if ok {
                    set.insert(url.clone());
                } else {
                    // `remove` returns true only on a healthy -> unhealthy
                    // transition, so the warning is logged once per outage.
                    if set.remove(url) {
                        tracing::warn!("Volume {url} is unhealthy");
                    }
                }
            }
        }
    });
}

View file

@ -1,60 +1,31 @@
pub mod config;
pub mod db;
pub mod error;
pub mod hasher;
pub mod health;
pub mod server;
pub mod rebalance;
pub mod rebuild;
pub mod volume;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Build the axum Router with all state wired up. Returns the router and
/// a handle to the writer (caller must keep it alive).
pub async fn build_app(config: config::Config) -> axum::Router {
let db_path = &config.database.path;
pub struct Args {
pub db_path: String,
pub volumes: Vec<String>,
pub replicas: usize,
}
if let Some(parent) = std::path::Path::new(db_path).parent() {
pub fn build_app(args: &Args) -> axum::Router {
if let Some(parent) = std::path::Path::new(&args.db_path).parent() {
std::fs::create_dir_all(parent).unwrap_or_else(|e| {
eprintln!("Failed to create database directory: {e}");
std::process::exit(1);
});
}
let writer = db::WriterHandle::new(db_path);
let num_readers = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let reads = db::ReadPool::new(db_path, num_readers);
let volume_urls = config.volume_urls();
let ring = Arc::new(RwLock::new(hasher::Ring::new(
&volume_urls,
config.server.virtual_nodes,
)));
let volume_client = volume::VolumeClient::new();
let healthy_volumes: health::HealthyVolumes =
Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone())));
health::spawn_health_checker(
volume_client.clone(),
volume_urls,
healthy_volumes.clone(),
);
let state = server::AppState {
writer,
reads,
ring,
volume_client,
healthy_volumes,
config: Arc::new(config),
db: db::Db::new(&args.db_path),
volumes: Arc::new(args.volumes.clone()),
replicas: args.replicas,
http: reqwest::Client::new(),
};
axum::Router::new()

View file

@ -1,11 +1,16 @@
use clap::{Parser, Subcommand};
use std::path::PathBuf;
#[derive(Parser)]
#[command(name = "mkv", about = "Distributed key-value store")]
struct Cli {
#[arg(short, long, default_value = "config.toml")]
config: PathBuf,
#[arg(short, long, default_value = "/tmp/mkv/index.db")]
db: String,
#[arg(short, long, required = true, value_delimiter = ',')]
volumes: Vec<String>,
#[arg(short, long, default_value_t = 2)]
replicas: usize,
#[command(subcommand)]
command: Commands,
@ -14,7 +19,10 @@ struct Cli {
#[derive(Subcommand)]
enum Commands {
/// Start the index server
Serve,
Serve {
#[arg(short, long, default_value_t = 3000)]
port: u16,
},
/// Rebuild SQLite index from volume servers
Rebuild,
/// Rebalance data after adding/removing volumes
@ -27,28 +35,27 @@ enum Commands {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let cli = Cli::parse();
let config = mkv::config::Config::load(&cli.config).unwrap_or_else(|e| {
eprintln!("Failed to load config: {e}");
std::process::exit(1);
});
let args = mkv::Args {
db_path: cli.db,
volumes: cli.volumes,
replicas: cli.replicas,
};
match cli.command {
Commands::Serve => {
let port = config.server.port;
let app = mkv::build_app(config).await;
Commands::Serve { port } => {
let app = mkv::build_app(&args);
let addr = format!("0.0.0.0:{port}");
tracing::info!("Listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Commands::Rebuild => {
mkv::rebuild::run(&config).await;
mkv::rebuild::run(&args).await;
}
Commands::Rebalance { dry_run } => {
mkv::rebalance::run(&config, dry_run).await;
mkv::rebalance::run(&args, dry_run).await;
}
}
}

View file

@ -1,9 +1,6 @@
use crate::config::Config;
use crate::db;
use crate::hasher::Ring;
use crate::volume::VolumeClient;
use crate::Args;
/// What needs to happen to a single key during rebalance.
pub struct KeyMove {
pub key: String,
pub size: Option<i64>,
@ -13,26 +10,12 @@ pub struct KeyMove {
pub to_remove: Vec<String>,
}
/// Pure: compute the diff between current and desired placement for all keys.
pub fn plan_rebalance(
records: &[db::Record],
ring: &Ring,
replication: usize,
) -> Vec<KeyMove> {
pub fn plan_rebalance(records: &[db::Record], volumes: &[String], replication: usize) -> Vec<KeyMove> {
let mut moves = Vec::new();
for record in records {
let desired = ring.get_volumes(&record.key, replication);
let to_add: Vec<String> = desired
.iter()
.filter(|v| !record.volumes.contains(v))
.cloned()
.collect();
let to_remove: Vec<String> = record
.volumes
.iter()
.filter(|v| !desired.contains(v))
.cloned()
.collect();
let desired = crate::hasher::volumes_for_key(&record.key, volumes, replication);
let to_add: Vec<String> = desired.iter().filter(|v| !record.volumes.contains(v)).cloned().collect();
let to_remove: Vec<String> = record.volumes.iter().filter(|v| !desired.contains(v)).cloned().collect();
if !to_add.is_empty() || !to_remove.is_empty() {
moves.push(KeyMove {
@ -48,22 +31,10 @@ pub fn plan_rebalance(
moves
}
pub async fn run(config: &Config, dry_run: bool) {
let db_path = &config.database.path;
let replication = config.server.replication_factor;
// Open DB read-only to plan
let conn = rusqlite::Connection::open_with_flags(
db_path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
)
.expect("failed to open database");
let records = db::all_records(&conn).expect("failed to read records");
drop(conn);
let ring = Ring::new(&config.volume_urls(), config.server.virtual_nodes);
let moves = plan_rebalance(&records, &ring, replication);
pub async fn run(args: &Args, dry_run: bool) {
let db = db::Db::new(&args.db_path);
let records = db.all_records_sync().expect("failed to read records");
let moves = plan_rebalance(&records, &args.volumes, args.replicas);
if moves.is_empty() {
eprintln!("Nothing to rebalance — all keys are already correctly placed.");
@ -71,44 +42,40 @@ pub async fn run(config: &Config, dry_run: bool) {
}
let total_bytes: i64 = moves.iter().filter_map(|m| m.size).sum();
eprintln!(
"{} keys to move ({} bytes)",
moves.len(),
total_bytes
);
eprintln!("{} keys to move ({} bytes)", moves.len(), total_bytes);
if dry_run {
for m in &moves {
eprintln!(
" {} : add {:?}, remove {:?}",
m.key, m.to_add, m.to_remove
);
eprintln!(" {} : add {:?}, remove {:?}", m.key, m.to_add, m.to_remove);
}
return;
}
// Open writer for updates
let writer = db::WriterHandle::new(db_path);
let client = VolumeClient::new();
let client = reqwest::Client::new();
let mut moved = 0;
let mut errors = 0;
for m in &moves {
// Pick a source volume to copy from (any current volume)
let src = &m.current_volumes[0];
// Copy to new volumes
let mut copy_ok = true;
for dst in &m.to_add {
match client.get(src, &m.key).await {
Ok(data) => {
if let Err(e) = client.put(dst, &m.key, data).await {
let src_url = format!("{src}/{}", m.key);
match client.get(&src_url).send().await {
Ok(resp) if resp.status().is_success() => {
let data = resp.bytes().await.unwrap();
let dst_url = format!("{dst}/{}", m.key);
if let Err(e) = client.put(&dst_url).body(data).send().await {
eprintln!(" ERROR copy {} to {}: {}", m.key, dst, e);
copy_ok = false;
errors += 1;
}
}
Ok(resp) => {
eprintln!(" ERROR read {} from {}: status {}", m.key, src, resp.status());
copy_ok = false;
errors += 1;
}
Err(e) => {
eprintln!(" ERROR read {} from {}: {}", m.key, src, e);
copy_ok = false;
@ -117,23 +84,16 @@ pub async fn run(config: &Config, dry_run: bool) {
}
}
if !copy_ok {
continue; // don't update index or delete if copy failed
}
if !copy_ok { continue; }
// Update index with new volume list
writer
.put(m.key.clone(), m.desired_volumes.clone(), m.size)
.await
.expect("failed to update index");
db.put(m.key.clone(), m.desired_volumes.clone(), m.size).await.expect("failed to update index");
// Delete from old volumes
for old in &m.to_remove {
if let Err(e) = client.delete(old, &m.key).await {
let url = format!("{old}/{}", m.key);
if let Err(e) = client.delete(&url).send().await {
eprintln!(" WARN delete {} from {}: {}", m.key, old, e);
}
}
moved += 1;
}
@ -147,58 +107,33 @@ mod tests {
#[test]
fn test_plan_rebalance_no_change() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let ring = Ring::new(&volumes, 100);
// Create records that are already correctly placed
let records: Vec<db::Record> = (0..100)
.map(|i| {
let key = format!("key-{i}");
let vols = ring.get_volumes(&key, 2);
db::Record {
key,
volumes: vols,
size: Some(100),
}
let vols = crate::hasher::volumes_for_key(&key, &volumes, 2);
db::Record { key, volumes: vols, size: Some(100) }
})
.collect();
let moves = plan_rebalance(&records, &ring, 2);
let moves = plan_rebalance(&records, &volumes, 2);
assert!(moves.is_empty());
}
#[test]
fn test_plan_rebalance_new_volume() {
let volumes3: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let ring3 = Ring::new(&volumes3, 100);
// Place keys on 3-volume ring
let records: Vec<db::Record> = (0..1000)
.map(|i| {
let key = format!("key-{i}");
let vols = ring3.get_volumes(&key, 2);
db::Record {
key,
volumes: vols,
size: Some(100),
}
let vols = crate::hasher::volumes_for_key(&key, &volumes3, 2);
db::Record { key, volumes: vols, size: Some(100) }
})
.collect();
// Build new ring with 4 volumes
let volumes4: Vec<String> = (1..=4).map(|i| format!("http://vol{i}")).collect();
let ring4 = Ring::new(&volumes4, 100);
let moves = plan_rebalance(&records, &volumes4, 2);
let moves = plan_rebalance(&records, &ring4, 2);
// Some keys should need to move, but not all
assert!(!moves.is_empty());
assert!(moves.len() < 800, "too many moves: {}", moves.len());
// Every move should involve vol4 (the new volume)
for m in &moves {
let involves_vol4 = m.to_add.iter().any(|v| v == "http://vol4")
|| m.to_remove.iter().any(|v| v == "http://vol4");
assert!(involves_vol4, "move for {} doesn't involve vol4", m.key);
}
}
}

View file

@ -1,7 +1,7 @@
use std::collections::HashMap;
use crate::config::Config;
use crate::db;
use crate::Args;
#[derive(serde::Deserialize)]
struct NginxEntry {
@ -12,36 +12,20 @@ struct NginxEntry {
size: Option<i64>,
}
/// List all keys on a volume by recursively walking nginx autoindex.
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
let http = reqwest::Client::new();
let mut keys = Vec::new();
let mut dirs = vec![String::new()]; // start at root
let mut dirs = vec![String::new()];
while let Some(prefix) = dirs.pop() {
let url = format!("{volume_url}/{prefix}");
let resp = http
.get(&url)
.send()
.await
.map_err(|e| format!("GET {url}: {e}"))?;
let resp = http.get(&url).send().await.map_err(|e| format!("GET {url}: {e}"))?;
if !resp.status().is_success() {
return Err(format!("GET {url}: status {}", resp.status()));
}
let entries: Vec<NginxEntry> = resp
.json()
.await
.map_err(|e| format!("parse {url}: {e}"))?;
let entries: Vec<NginxEntry> = resp.json().await.map_err(|e| format!("parse {url}: {e}"))?;
for entry in entries {
let full_path = if prefix.is_empty() {
entry.name.clone()
} else {
format!("{prefix}{}", entry.name)
};
let full_path = if prefix.is_empty() { entry.name.clone() } else { format!("{prefix}{}", entry.name) };
match entry.entry_type.as_str() {
"directory" => dirs.push(format!("{full_path}/")),
"file" => keys.push((full_path, entry.size.unwrap_or(0))),
@ -49,31 +33,24 @@ async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String
}
}
}
Ok(keys)
}
/// Rebuild the key index from scratch by scanning every volume server.
// NOTE(review): diff residue — this span contains BOTH the pre-refactor
// (Config/WriterHandle) and post-refactor (Args/Db) versions of several
// lines, plus an embedded diff hunk header. Reconcile to one generation
// before building.
pub async fn run(config: &Config) {
    let db_path = &config.database.path;
pub async fn run(args: &Args) {
    let db_path = &args.db_path;
    // Ensure parent directory exists
    if let Some(parent) = std::path::Path::new(db_path).parent() {
        let _ = std::fs::create_dir_all(parent);
    }
    // Delete old database (including SQLite WAL/SHM sidecar files) so the
    // rebuild starts from an empty index; errors are deliberately ignored.
    let _ = std::fs::remove_file(db_path);
    let _ = std::fs::remove_file(format!("{db_path}-wal"));
    let _ = std::fs::remove_file(format!("{db_path}-shm"));
    let writer = db::WriterHandle::new(db_path);
    let volume_urls = config.volume_urls();
    // key -> (volumes, size)
    let db = db::Db::new(db_path);
    let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
    for vol_url in &volume_urls {
    for vol_url in &args.volumes {
        eprintln!("Scanning {vol_url}...");
        match list_volume_keys(vol_url).await {
            Ok(keys) => {
@ -81,26 +58,15 @@ pub async fn run(config: &Config) {
                // Record every volume that holds the key.
                for (key, size) in keys {
                    let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
                    entry.0.push(vol_url.clone());
                    // Use the largest size seen (they should all match)
                    if size > entry.1 {
                        entry.1 = size;
                    }
                    if size > entry.1 { entry.1 = size; }
                }
            }
            Err(e) => {
                eprintln!(" Error scanning {vol_url}: {e}");
            }
            Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
        }
    }
    // Batch insert into SQLite
    let records: Vec<(String, Vec<String>, Option<i64>)> = index
        .into_iter()
        .map(|(key, (volumes, size))| (key, volumes, Some(size)))
        .collect();
    let records: Vec<_> = index.into_iter().map(|(k, (v, s))| (k, v, Some(s))).collect();
    let count = records.len();
    writer.bulk_put(records).await.expect("bulk_put failed");
    db.bulk_put(records).await.expect("bulk_put failed");
    eprintln!("Rebuilt index with {count} keys");
}

View file

@ -2,201 +2,113 @@ use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::config::Config;
use crate::db;
use crate::error::AppError;
use crate::hasher::Ring;
use crate::volume::VolumeClient;
/// Shared application state handed to every axum handler via `State`.
// NOTE(review): diff residue — this struct currently lists two generations
// of fields: the older writer/reads/ring/volume_client/healthy_volumes/config
// set and the newer db/volumes/replicas/http set. Only one set should remain.
#[derive(Clone)]
pub struct AppState {
    pub writer: db::WriterHandle,
    pub reads: db::ReadPool,
    pub ring: Arc<RwLock<Ring>>,
    pub volume_client: VolumeClient,
    pub healthy_volumes: Arc<RwLock<HashSet<String>>>,
    pub config: Arc<Config>,
    // Newer fields: single DB handle, static volume list, replication count,
    // and a plain reqwest client used directly by the handlers.
    pub db: db::Db,
    pub volumes: Arc<Vec<String>>,
    pub replicas: usize,
    pub http: reqwest::Client,
}
// --- Pure decision functions ---
/// Return the first volume listed in `record_volumes` that is also present
/// in the `healthy` set, or `None` when no listed volume is healthy.
fn pick_healthy_volume<'a>(
    record_volumes: &'a [String],
    healthy: &HashSet<String>,
) -> Option<&'a str> {
    for candidate in record_volumes {
        if healthy.contains(candidate.as_str()) {
            return Some(candidate.as_str());
        }
    }
    None
}
/// Ask the hash ring for the target volumes for `key`, requiring at least
/// `replication` distinct volumes.
///
/// Returns `AppError::VolumeError` when the ring cannot supply enough
/// volumes to satisfy the requested replication factor.
fn select_volumes(ring: &Ring, key: &str, replication: usize) -> Result<Vec<String>, AppError> {
    let targets = ring.get_volumes(key, replication);
    if targets.len() >= replication {
        Ok(targets)
    } else {
        Err(AppError::VolumeError(format!(
            "need {replication} volumes but only {} available",
            targets.len()
        )))
    }
}
/// Partition fan-out results by pairing each result with its volume.
///
/// Returns `Ok` with every volume whose write succeeded when there were no
/// failures; otherwise returns `Err` with all collected error messages.
fn evaluate_fanout(
    results: Vec<Result<(), String>>,
    volumes: &[String],
) -> Result<Vec<String>, Vec<String>> {
    let mut ok_volumes = Vec::new();
    let mut failures = Vec::new();
    for (idx, outcome) in results.into_iter().enumerate() {
        match outcome {
            Ok(()) => ok_volumes.push(volumes[idx].clone()),
            Err(msg) => failures.push(msg),
        }
    }
    if failures.is_empty() {
        Ok(ok_volumes)
    } else {
        Err(failures)
    }
}
// --- Handlers ---
/// GET /:key — look up key, redirect to a healthy volume
// NOTE(review): diff residue — both the older lookup (read pool +
// healthy-volume filtering) and the newer one (state.db + first volume,
// no health check) are present; the later bindings shadow the earlier
// ones. Keep exactly one generation.
pub async fn get_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
) -> Result<Response, AppError> {
    let record = state
        .reads
        .query({
            let key = key.clone();
            move |conn| db::get(conn, &key)
        })
        .await?;
    let healthy = state.healthy_volumes.read().await;
    let vol = pick_healthy_volume(&record.volumes, &healthy).ok_or(AppError::NoHealthyVolume)?;
    let record = state.db.get(&key).await?;
    let vol = &record.volumes[0];
    // 302 redirect pointing the client directly at the volume's blob URL.
    let location = format!("{vol}/{key}");
    Ok((
        StatusCode::FOUND,
        [(axum::http::header::LOCATION, location)],
    )
        .into_response())
    Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, location)]).into_response())
}
/// PUT /:key — store blob on volumes, record in index
// NOTE(review): diff residue — this body mixes the pre-refactor flow
// (ring + VolumeClient + evaluate_fanout) with the post-refactor flow
// (hasher::volumes_for_key + raw reqwest + all_ok flag). As written,
// `handle.await` appears twice per handle, which would not compile;
// reconcile to one generation.
pub async fn put_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
    body: Bytes,
) -> Result<Response, AppError> {
    let target_volumes = {
        let ring = state.ring.read().await;
        select_volumes(&ring, &key, state.config.server.replication_factor)?
    };
    let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas);
    if target_volumes.len() < state.replicas {
        return Err(AppError::VolumeError(format!(
            "need {} volumes but only {} available",
            state.replicas,
            target_volumes.len()
        )));
    }
    // Fan out PUTs to all target volumes concurrently
    let mut handles = Vec::with_capacity(target_volumes.len());
    for vol in &target_volumes {
        let client = state.volume_client.clone();
        let vol = vol.clone();
        let key = key.clone();
        let data = body.clone();
        handles.push(tokio::spawn(async move {
            client.put(&vol, &key, data).await
        }));
        let url = format!("{vol}/{key}");
        let handle = tokio::spawn({
            // Clone the cheap handles the spawned task needs to own.
            let client = state.http.clone();
            let data = body.clone();
            async move {
                let resp = client.put(&url).body(data).send().await.map_err(|e| format!("PUT {url}: {e}"))?;
                if !resp.status().is_success() {
                    return Err(format!("PUT {url}: status {}", resp.status()));
                }
                Ok(())
            }
        });
        handles.push(handle);
    }
    let mut results = Vec::with_capacity(handles.len());
    let mut all_ok = true;
    for handle in handles {
        results.push(handle.await.unwrap());
        if let Err(e) = handle.await.unwrap() {
            tracing::error!("PUT to volume failed: {e}");
            all_ok = false;
        }
    }
    match evaluate_fanout(results, &target_volumes) {
        Ok(succeeded_volumes) => {
            let size = Some(body.len() as i64);
            state.writer.put(key, succeeded_volumes, size).await?;
            Ok(StatusCode::CREATED.into_response())
        }
        Err(errors) => {
            for e in &errors {
                tracing::error!("PUT to volume failed: {e}");
            }
            // Rollback: best-effort delete from any volumes that may have succeeded
            for vol in &target_volumes {
                let _ = state.volume_client.delete(vol, &key).await;
            }
            Err(AppError::VolumeError(
                "not all volume writes succeeded".into(),
            ))
    if !all_ok {
        // Rollback: best-effort delete from volumes
        for vol in &target_volumes {
            let _ = state.http.delete(format!("{vol}/{key}")).send().await;
        }
        return Err(AppError::VolumeError("not all volume writes succeeded".into()));
    }
    // All writes succeeded: record key -> volumes (+ size) in the index.
    let size = Some(body.len() as i64);
    state.db.put(key, target_volumes, size).await?;
    Ok(StatusCode::CREATED.into_response())
}
/// DELETE /:key — remove from volumes and index
// NOTE(review): diff residue — the loop body pushes both the old
// VolumeClient-based task and the new raw-reqwest task for every volume,
// so as written each volume would receive two DELETEs. Keep one.
pub async fn delete_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
) -> Result<Response, AppError> {
    let record = state
        .reads
        .query({
            let key = key.clone();
            move |conn| db::get(conn, &key)
        })
        .await?;
    let record = state.db.get(&key).await?;
    // Fan out DELETEs concurrently
    let mut handles = Vec::new();
    for vol in &record.volumes {
        let client = state.volume_client.clone();
        let vol = vol.clone();
        let key = key.clone();
        handles.push(tokio::spawn(
            async move { client.delete(&vol, &key).await },
        ));
        let url = format!("{vol}/{key}");
        let client = state.http.clone();
        handles.push(tokio::spawn(async move { client.delete(&url).send().await }));
    }
    // Volume failures are logged but do not fail the request.
    for handle in handles {
        if let Err(e) = handle.await.unwrap() {
            tracing::error!("DELETE from volume failed: {e}");
        }
    }
    // Remove from index regardless of volume DELETE results
    state.writer.delete(key).await?;
    state.db.delete(key).await?;
    Ok(StatusCode::NO_CONTENT.into_response())
}
/// HEAD /:key — check if key exists, return size
// NOTE(review): diff residue — the record lookup and the Content-Length
// insert each appear twice (old multi-line form and new one-liner);
// deduplicate before building.
pub async fn head_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
) -> Result<Response, AppError> {
    let record = state
        .reads
        .query(move |conn| db::get(conn, &key))
        .await?;
    let record = state.db.get(&key).await?;
    let mut headers = HeaderMap::new();
    // Only advertise Content-Length when the index actually recorded a size.
    if let Some(size) = record.size {
        headers.insert(
            axum::http::header::CONTENT_LENGTH,
            size.to_string().parse().unwrap(),
        );
        headers.insert(axum::http::header::CONTENT_LENGTH, size.to_string().parse().unwrap());
    }
    Ok((StatusCode::OK, headers).into_response())
}
@ -206,89 +118,27 @@ pub struct ListQuery {
pub prefix: String,
}
/// GET / — list keys with optional prefix filter
// NOTE(review): diff residue — both the read-pool query (old) and the
// state.db call (new) are present; the function has two tail expressions.
// Keep one generation. Response body is newline-joined key names.
pub async fn list_keys(
    State(state): State<AppState>,
    Query(query): Query<ListQuery>,
) -> Result<Response, AppError> {
    let keys = state
        .reads
        .query(move |conn| db::list_keys(conn, &query.prefix))
        .await?;
    let body = keys.join("\n");
    Ok((StatusCode::OK, body).into_response())
    let keys = state.db.list_keys(&query.prefix).await?;
    Ok((StatusCode::OK, keys.join("\n")).into_response())
}
// --- Tests for pure functions ---
// NOTE(review): diff residue — several tests below carry two `fn` name lines
// (old select_volumes/evaluate_fanout names and new volumes_for_key names)
// and mixed bodies; reconcile each test to a single version.
#[cfg(test)]
mod tests {
    use super::*;
    // First volume appearing in the healthy set wins.
    #[test]
    fn test_pick_healthy_volume_first_match() {
        let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()];
        let healthy: HashSet<String> = ["http://vol2".into(), "http://vol3".into()].into();
        assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol2"));
    }
    // Empty healthy set means no volume can be picked.
    #[test]
    fn test_pick_healthy_volume_none_healthy() {
        let volumes = vec!["http://vol1".into(), "http://vol2".into()];
        let healthy: HashSet<String> = HashSet::new();
        assert_eq!(pick_healthy_volume(&volumes, &healthy), None);
    }
    #[test]
    fn test_pick_healthy_volume_all_healthy() {
        let volumes = vec!["http://vol1".into(), "http://vol2".into()];
        let healthy: HashSet<String> = ["http://vol1".into(), "http://vol2".into()].into();
        assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol1"));
    }
    #[test]
    fn test_select_volumes_sufficient() {
    fn test_volumes_for_key_sufficient() {
        let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
        let ring = Ring::new(&volumes, 50);
        let selected = select_volumes(&ring, "test-key", 2).unwrap();
        let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
        assert_eq!(selected.len(), 2);
    }
    #[test]
    fn test_select_volumes_insufficient() {
    fn test_volumes_for_key_insufficient() {
        let volumes: Vec<String> = vec!["http://vol1".into()];
        let ring = Ring::new(&volumes, 50);
        assert!(select_volumes(&ring, "test-key", 2).is_err());
    }
    #[test]
    fn test_evaluate_fanout_all_ok() {
        let volumes = vec!["http://vol1".into(), "http://vol2".into()];
        let results = vec![Ok(()), Ok(())];
        assert_eq!(evaluate_fanout(results, &volumes).unwrap(), volumes);
    }
    // Partial failure surfaces only the failing volumes' messages.
    #[test]
    fn test_evaluate_fanout_partial_failure() {
        let volumes = vec!["http://vol1".into(), "http://vol2".into()];
        let results = vec![Ok(()), Err("connection refused".into())];
        let errors = evaluate_fanout(results, &volumes).unwrap_err();
        assert_eq!(errors, vec!["connection refused"]);
    }
    #[test]
    fn test_evaluate_fanout_all_fail() {
        let volumes = vec!["http://vol1".into(), "http://vol2".into()];
        let results = vec![Err("err1".into()), Err("err2".into())];
        assert_eq!(evaluate_fanout(results, &volumes).unwrap_err().len(), 2);
        let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
        assert_eq!(selected.len(), 1);
    }
}

View file

@ -1,88 +0,0 @@
use bytes::Bytes;
use std::time::Duration;
/// HTTP client for talking to volume servers: blob PUT/GET/DELETE plus a
/// health check. Cloning is cheap — reqwest::Client is internally
/// reference-counted, so clones share one connection pool.
#[derive(Clone)]
pub struct VolumeClient {
    client: reqwest::Client,
}
impl VolumeClient {
    /// Build a client with a short connect timeout, a 30 s overall request
    /// timeout, and a bounded idle-connection pool per host.
    pub fn new() -> Self {
        Self {
            client: reqwest::Client::builder()
                .connect_timeout(Duration::from_secs(2))
                .timeout(Duration::from_secs(30))
                .pool_max_idle_per_host(10)
                .build()
                .expect("failed to build HTTP client"),
        }
    }
    /// PUT a blob to a volume server at /{key}.
    pub async fn put(&self, volume_url: &str, key: &str, data: Bytes) -> Result<(), String> {
        let url = format!("{volume_url}/{key}");
        let resp = self
            .client
            .put(&url)
            .body(data)
            .send()
            .await
            .map_err(|e| format!("PUT {url}: {e}"))?;
        match resp.status().is_success() {
            true => Ok(()),
            false => Err(format!("PUT {url}: status {}", resp.status())),
        }
    }
    /// GET a blob from a volume server.
    pub async fn get(&self, volume_url: &str, key: &str) -> Result<Bytes, String> {
        let url = format!("{volume_url}/{key}");
        let resp = self
            .client
            .get(&url)
            .send()
            .await
            .map_err(|e| format!("GET {url}: {e}"))?;
        let status = resp.status();
        if status == reqwest::StatusCode::NOT_FOUND {
            Err(format!("GET {url}: not found"))
        } else if !status.is_success() {
            Err(format!("GET {url}: status {status}"))
        } else {
            resp.bytes()
                .await
                .map_err(|e| format!("GET {url} body: {e}"))
        }
    }
    /// DELETE a blob from a volume server.
    pub async fn delete(&self, volume_url: &str, key: &str) -> Result<(), String> {
        let url = format!("{volume_url}/{key}");
        let resp = self
            .client
            .delete(&url)
            .send()
            .await
            .map_err(|e| format!("DELETE {url}: {e}"))?;
        let status = resp.status();
        // 404 is fine — already gone
        if status.is_success() || status == reqwest::StatusCode::NOT_FOUND {
            Ok(())
        } else {
            Err(format!("DELETE {url}: status {status}"))
        }
    }
    /// Health check a volume server: HEAD / with a tight 2 s deadline.
    pub async fn check(&self, volume_url: &str) -> bool {
        let url = format!("{volume_url}/");
        match self
            .client
            .head(&url)
            .timeout(Duration::from_secs(2))
            .send()
            .await
        {
            Ok(r) => r.status().is_success(),
            Err(_) => false,
        }
    }
}

View file

@ -1,37 +1,35 @@
use reqwest::StatusCode;
use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
// Monotonic counter so each test gets its own database file.
static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
/// Start the mkv server in-process on a random port with its own DB.
/// Returns the base URL (e.g. "http://localhost:12345").
// NOTE(review): diff residue — both the old Config-based setup and the new
// Args-based setup are present (db_path would be moved twice); keep one.
async fn start_server() -> String {
    let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
    let db_path = format!("/tmp/mkv-test/index-{id}.db");
    // Clean up any previous test database
    let _ = std::fs::remove_file(&db_path);
    let _ = std::fs::remove_file(format!("{db_path}-wal"));
    let _ = std::fs::remove_file(format!("{db_path}-shm"));
    let mut config =
        mkv::config::Config::load(Path::new("tests/test_config.toml")).expect("load test config");
    config.database.path = db_path;
    let args = mkv::Args {
        db_path,
        volumes: vec![
            "http://localhost:3101".into(),
            "http://localhost:3102".into(),
            "http://localhost:3103".into(),
        ],
        replicas: 2,
    };
    // Bind to port 0 to get a random available port
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    let app = mkv::build_app(config).await;
    let app = mkv::build_app(&args);
    tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });
    // Give the server a moment to start
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    format!("http://127.0.0.1:{port}")
}
@ -47,26 +45,12 @@ async fn test_put_and_head() {
let base = start_server().await;
let c = client();
// PUT a key
let resp = c
.put(format!("{base}/hello"))
.body("world")
.send()
.await
.unwrap();
let resp = c.put(format!("{base}/hello")).body("world").send().await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
// HEAD should return 200 with content-length
let resp = c.head(format!("{base}/hello")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers()
.get("content-length")
.unwrap()
.to_str()
.unwrap(),
"5"
);
assert_eq!(resp.headers().get("content-length").unwrap().to_str().unwrap(), "5");
}
#[tokio::test]
@ -74,26 +58,15 @@ async fn test_put_and_get_redirect() {
let base = start_server().await;
let c = client();
// PUT
let resp = c
.put(format!("{base}/redirect-test"))
.body("some data")
.send()
.await
.unwrap();
let resp = c.put(format!("{base}/redirect-test")).body("some data").send().await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
// GET should return 302 with Location header pointing to a volume
let resp = c.get(format!("{base}/redirect-test")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::FOUND);
let location = resp.headers().get("location").unwrap().to_str().unwrap();
assert!(
location.starts_with("http://localhost:310"),
"location should point to a volume, got: {location}"
);
assert!(location.starts_with("http://localhost:310"), "got: {location}");
// Follow the redirect manually and verify the blob content
let blob_resp = reqwest::get(location).await.unwrap();
assert_eq!(blob_resp.status(), StatusCode::OK);
assert_eq!(blob_resp.text().await.unwrap(), "some data");
@ -103,7 +76,6 @@ async fn test_put_and_get_redirect() {
/// A key that was never written must yield 404 on GET.
async fn test_get_nonexistent_returns_404() {
    let base = start_server().await;
    let http = client();
    let url = format!("{base}/does-not-exist");
    let resp = http.get(url).send().await.unwrap();
    assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
@ -113,24 +85,15 @@ async fn test_put_get_delete_get() {
let base = start_server().await;
let c = client();
// PUT
let resp = c
.put(format!("{base}/delete-me"))
.body("temporary")
.send()
.await
.unwrap();
let resp = c.put(format!("{base}/delete-me")).body("temporary").send().await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
// GET → 302
let resp = c.get(format!("{base}/delete-me")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::FOUND);
// DELETE → 204
let resp = c.delete(format!("{base}/delete-me")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
// GET after delete → 404
let resp = c.get(format!("{base}/delete-me")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
@ -139,12 +102,7 @@ async fn test_put_get_delete_get() {
/// DELETE on a key that never existed should return 404.
// NOTE(review): diff residue — the request appears twice (old multi-line
// builder and new one-liner); the shadowed second binding would send the
// DELETE a second time. Keep one.
async fn test_delete_nonexistent_returns_404() {
    let base = start_server().await;
    let c = client();
    let resp = c
        .delete(format!("{base}/never-existed"))
        .send()
        .await
        .unwrap();
    let resp = c.delete(format!("{base}/never-existed")).send().await.unwrap();
    assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
@ -153,28 +111,17 @@ async fn test_list_keys() {
let base = start_server().await;
let c = client();
// PUT a few keys with a common prefix
for name in ["docs/a", "docs/b", "docs/c", "other/x"] {
c.put(format!("{base}/{name}"))
.body("data")
.send()
.await
.unwrap();
c.put(format!("{base}/{name}")).body("data").send().await.unwrap();
}
// List all
let resp = c.get(format!("{base}/")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.text().await.unwrap();
assert!(body.contains("docs/a"));
assert!(body.contains("other/x"));
// List with prefix filter
let resp = c
.get(format!("{base}/?prefix=docs/"))
.send()
.await
.unwrap();
let resp = c.get(format!("{base}/?prefix=docs/")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.text().await.unwrap();
let lines: Vec<&str> = body.lines().collect();
@ -187,34 +134,14 @@ async fn test_put_overwrite() {
let base = start_server().await;
let c = client();
// PUT v1
c.put(format!("{base}/overwrite"))
.body("version1")
.send()
.await
.unwrap();
c.put(format!("{base}/overwrite")).body("version1").send().await.unwrap();
// PUT v2 (same key)
let resp = c
.put(format!("{base}/overwrite"))
.body("version2")
.send()
.await
.unwrap();
let resp = c.put(format!("{base}/overwrite")).body("version2").send().await.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
// HEAD should reflect new size
let resp = c.head(format!("{base}/overwrite")).send().await.unwrap();
assert_eq!(
resp.headers()
.get("content-length")
.unwrap()
.to_str()
.unwrap(),
"8"
);
assert_eq!(resp.headers().get("content-length").unwrap().to_str().unwrap(), "8");
// Follow redirect, verify content is v2
let resp = c.get(format!("{base}/overwrite")).send().await.unwrap();
let location = resp.headers().get("location").unwrap().to_str().unwrap();
let body = reqwest::get(location).await.unwrap().text().await.unwrap();
@ -226,27 +153,12 @@ async fn test_replication_writes_to_multiple_volumes() {
let base = start_server().await;
let c = client();
// PUT a key (replication_factor=2 in test config)
c.put(format!("{base}/replicated"))
.body("replica-data")
.send()
.await
.unwrap();
c.put(format!("{base}/replicated")).body("replica-data").send().await.unwrap();
// HEAD to confirm it exists
let resp = c
.head(format!("{base}/replicated"))
.send()
.await
.unwrap();
let resp = c.head(format!("{base}/replicated")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
// GET and verify the blob is accessible
let resp = c
.get(format!("{base}/replicated"))
.send()
.await
.unwrap();
let resp = c.get(format!("{base}/replicated")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::FOUND);
let location = resp.headers().get("location").unwrap().to_str().unwrap();
let body = reqwest::get(location).await.unwrap().text().await.unwrap();