This commit is contained in:
Silas Brack 2026-03-07 17:27:54 +01:00
parent dc1f4bd19d
commit 2c66fa50d8
9 changed files with 1125 additions and 960 deletions

382
src/db.rs
View file

@ -1,182 +1,200 @@
use rusqlite::{params, Connection, OpenFlags}; use rusqlite::{Connection, OpenFlags, params};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crate::error::AppError; use crate::error::AppError;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Record { pub struct Record {
pub key: String, pub key: String,
pub volumes: Vec<String>, pub volumes: Vec<String>,
pub size: Option<i64>, pub size: Option<i64>,
} }
fn apply_pragmas(conn: &Connection) { fn apply_pragmas(conn: &Connection) {
conn.execute_batch( conn.execute_batch(
"PRAGMA journal_mode = WAL; "PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL; PRAGMA synchronous = NORMAL;
PRAGMA busy_timeout = 5000; PRAGMA busy_timeout = 5000;
PRAGMA temp_store = memory; PRAGMA temp_store = memory;
PRAGMA cache_size = -64000; PRAGMA cache_size = -64000;
PRAGMA mmap_size = 268435456;", PRAGMA mmap_size = 268435456;",
) )
.expect("failed to set pragmas"); .expect("failed to set pragmas");
} }
fn parse_volumes(s: &str) -> Vec<String> { fn parse_volumes(s: &str) -> Vec<String> {
serde_json::from_str(s).unwrap_or_default() serde_json::from_str(s).unwrap_or_default()
} }
fn encode_volumes(v: &[String]) -> String { fn encode_volumes(v: &[String]) -> String {
serde_json::to_string(v).unwrap() serde_json::to_string(v).unwrap()
} }
/// A single SQLite connection behind a mutex, used for both reads and writes. /// A single SQLite connection behind a mutex, used for both reads and writes.
#[derive(Clone)] #[derive(Clone)]
pub struct Db { pub struct Db {
conn: Arc<Mutex<Connection>>, conn: Arc<Mutex<Connection>>,
} }
impl Db { impl Db {
pub fn new(path: &str) -> Self { pub fn new(path: &str) -> Self {
let conn = Connection::open_with_flags( let conn = Connection::open_with_flags(
path, path,
OpenFlags::SQLITE_OPEN_READ_WRITE OpenFlags::SQLITE_OPEN_READ_WRITE
| OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_CREATE
| OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_NO_MUTEX
| OpenFlags::SQLITE_OPEN_URI, | OpenFlags::SQLITE_OPEN_URI,
) )
.expect("failed to open database"); .expect("failed to open database");
apply_pragmas(&conn); apply_pragmas(&conn);
conn.execute_batch( conn.execute_batch(
"CREATE TABLE IF NOT EXISTS kv ( "CREATE TABLE IF NOT EXISTS kv (
key TEXT PRIMARY KEY, key TEXT PRIMARY KEY,
volumes TEXT NOT NULL, volumes TEXT NOT NULL,
size INTEGER, size INTEGER,
created_at INTEGER DEFAULT (unixepoch()) created_at INTEGER DEFAULT (unixepoch())
);", );",
) )
.expect("failed to create tables"); .expect("failed to create tables");
Self { conn: Arc::new(Mutex::new(conn)) } Self {
} conn: Arc::new(Mutex::new(conn)),
}
pub async fn get(&self, key: &str) -> Result<Record, AppError> { }
let conn = self.conn.clone();
let key = key.to_string(); pub async fn get(&self, key: &str) -> Result<Record, AppError> {
tokio::task::spawn_blocking(move || { let conn = self.conn.clone();
let conn = conn.lock().unwrap(); let key = key.to_string();
let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv WHERE key = ?1")?; tokio::task::spawn_blocking(move || {
Ok(stmt.query_row(params![key], |row| { let conn = conn.lock().unwrap();
let vj: String = row.get(1)?; let mut stmt =
Ok(Record { key: row.get(0)?, volumes: parse_volumes(&vj), size: row.get(2)? }) conn.prepare_cached("SELECT key, volumes, size FROM kv WHERE key = ?1")?;
})?) Ok(stmt.query_row(params![key], |row| {
}) let vj: String = row.get(1)?;
.await Ok(Record {
.unwrap() key: row.get(0)?,
} volumes: parse_volumes(&vj),
size: row.get(2)?,
pub async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, AppError> { })
let conn = self.conn.clone(); })?)
let prefix = prefix.to_string(); })
tokio::task::spawn_blocking(move || { .await
let conn = conn.lock().unwrap(); .unwrap()
if prefix.is_empty() { }
let mut stmt = conn.prepare_cached("SELECT key FROM kv ORDER BY key")?;
let keys = stmt pub async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, AppError> {
.query_map([], |row| row.get(0))? let conn = self.conn.clone();
.collect::<Result<Vec<String>, _>>()?; let prefix = prefix.to_string();
return Ok(keys); tokio::task::spawn_blocking(move || {
} let conn = conn.lock().unwrap();
// Compute exclusive upper bound: increment last non-0xFF byte if prefix.is_empty() {
let upper = { let mut stmt = conn.prepare_cached("SELECT key FROM kv ORDER BY key")?;
let mut bytes = prefix.as_bytes().to_vec(); let keys = stmt
let mut result = None; .query_map([], |row| row.get(0))?
while let Some(last) = bytes.pop() { .collect::<Result<Vec<String>, _>>()?;
if last < 0xFF { return Ok(keys);
bytes.push(last + 1); }
result = Some(String::from_utf8_lossy(&bytes).into_owned()); // Compute exclusive upper bound: increment last non-0xFF byte
break; let upper = {
} let mut bytes = prefix.as_bytes().to_vec();
} let mut result = None;
result while let Some(last) = bytes.pop() {
}; if last < 0xFF {
let keys = match &upper { bytes.push(last + 1);
Some(end) => { result = Some(String::from_utf8_lossy(&bytes).into_owned());
let mut stmt = conn.prepare_cached( break;
"SELECT key FROM kv WHERE key >= ?1 AND key < ?2 ORDER BY key", }
)?; }
stmt.query_map(params![prefix, end], |row| row.get(0))? result
.collect::<Result<Vec<String>, _>>()? };
} let keys = match &upper {
None => { Some(end) => {
let mut stmt = conn.prepare_cached( let mut stmt = conn.prepare_cached(
"SELECT key FROM kv WHERE key >= ?1 ORDER BY key", "SELECT key FROM kv WHERE key >= ?1 AND key < ?2 ORDER BY key",
)?; )?;
stmt.query_map(params![prefix], |row| row.get(0))? stmt.query_map(params![prefix, end], |row| row.get(0))?
.collect::<Result<Vec<String>, _>>()? .collect::<Result<Vec<String>, _>>()?
} }
}; None => {
Ok(keys) let mut stmt =
}) conn.prepare_cached("SELECT key FROM kv WHERE key >= ?1 ORDER BY key")?;
.await stmt.query_map(params![prefix], |row| row.get(0))?
.unwrap() .collect::<Result<Vec<String>, _>>()?
} }
};
pub async fn put(&self, key: String, volumes: Vec<String>, size: Option<i64>) -> Result<(), AppError> { Ok(keys)
let conn = self.conn.clone(); })
tokio::task::spawn_blocking(move || { .await
let conn = conn.lock().unwrap(); .unwrap()
conn.prepare_cached( }
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3", pub async fn put(
)? &self,
.execute(params![key, encode_volumes(&volumes), size])?; key: String,
Ok(()) volumes: Vec<String>,
}) size: Option<i64>,
.await ) -> Result<(), AppError> {
.unwrap() let conn = self.conn.clone();
} tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
pub async fn delete(&self, key: String) -> Result<(), AppError> { conn.prepare_cached(
let conn = self.conn.clone(); "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
tokio::task::spawn_blocking(move || { ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
let conn = conn.lock().unwrap(); )?
conn.prepare_cached("DELETE FROM kv WHERE key = ?1")? .execute(params![key, encode_volumes(&volumes), size])?;
.execute(params![key])?; Ok(())
Ok(()) })
}) .await
.await .unwrap()
.unwrap() }
}
pub async fn delete(&self, key: String) -> Result<(), AppError> {
pub async fn bulk_put(&self, records: Vec<(String, Vec<String>, Option<i64>)>) -> Result<(), AppError> { let conn = self.conn.clone();
let conn = self.conn.clone(); tokio::task::spawn_blocking(move || {
tokio::task::spawn_blocking(move || { let conn = conn.lock().unwrap();
let conn = conn.lock().unwrap(); conn.prepare_cached("DELETE FROM kv WHERE key = ?1")?
conn.execute_batch("BEGIN")?; .execute(params![key])?;
let mut stmt = conn.prepare_cached( Ok(())
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3) })
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3", .await
)?; .unwrap()
for (key, volumes, size) in &records { }
stmt.execute(params![key, encode_volumes(volumes), size])?;
} pub async fn bulk_put(
drop(stmt); &self,
conn.execute_batch("COMMIT")?; records: Vec<(String, Vec<String>, Option<i64>)>,
Ok(()) ) -> Result<(), AppError> {
}) let conn = self.conn.clone();
.await tokio::task::spawn_blocking(move || {
.unwrap() let conn = conn.lock().unwrap();
} conn.execute_batch("BEGIN")?;
let mut stmt = conn.prepare_cached(
pub fn all_records_sync(&self) -> Result<Vec<Record>, AppError> { "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
let conn = self.conn.lock().unwrap(); ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv")?; )?;
let records = stmt for (key, volumes, size) in &records {
.query_map([], |row| { stmt.execute(params![key, encode_volumes(volumes), size])?;
let vj: String = row.get(1)?; }
Ok(Record { key: row.get(0)?, volumes: parse_volumes(&vj), size: row.get(2)? }) drop(stmt);
})? conn.execute_batch("COMMIT")?;
.collect::<Result<Vec<_>, _>>()?; Ok(())
Ok(records) })
} .await
} .unwrap()
}
pub fn all_records_sync(&self) -> Result<Vec<Record>, AppError> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv")?;
let records = stmt
.query_map([], |row| {
let vj: String = row.get(1)?;
Ok(Record {
key: row.get(0)?,
volumes: parse_volumes(&vj),
size: row.get(2)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(records)
}
}

View file

@ -1,70 +1,76 @@
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
/// Errors from individual volume HTTP requests — used for logging, not HTTP responses. /// Errors from individual volume HTTP requests — used for logging, not HTTP responses.
#[derive(Debug)] #[derive(Debug)]
pub enum VolumeError { pub enum VolumeError {
Request { url: String, source: reqwest::Error }, Request {
BadStatus { url: String, status: reqwest::StatusCode }, url: String,
} source: reqwest::Error,
},
impl std::fmt::Display for VolumeError { BadStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { url: String,
match self { status: reqwest::StatusCode,
VolumeError::Request { url, source } => { },
write!(f, "volume request to {url} failed: {source}") }
}
VolumeError::BadStatus { url, status } => { impl std::fmt::Display for VolumeError {
write!(f, "volume {url} returned status {status}") fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
} match self {
} VolumeError::Request { url, source } => {
} write!(f, "volume request to {url} failed: {source}")
} }
VolumeError::BadStatus { url, status } => {
/// Application-level errors that map to HTTP responses. write!(f, "volume {url} returned status {status}")
#[derive(Debug)] }
pub enum AppError { }
NotFound, }
CorruptRecord { key: String }, }
Db(rusqlite::Error),
InsufficientVolumes { need: usize, have: usize }, /// Application-level errors that map to HTTP responses.
PartialWrite, #[derive(Debug)]
} pub enum AppError {
NotFound,
impl From<rusqlite::Error> for AppError { CorruptRecord { key: String },
fn from(e: rusqlite::Error) -> Self { Db(rusqlite::Error),
match e { InsufficientVolumes { need: usize, have: usize },
rusqlite::Error::QueryReturnedNoRows => AppError::NotFound, PartialWrite,
other => AppError::Db(other), }
}
} impl From<rusqlite::Error> for AppError {
} fn from(e: rusqlite::Error) -> Self {
match e {
impl std::fmt::Display for AppError { rusqlite::Error::QueryReturnedNoRows => AppError::NotFound,
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { other => AppError::Db(other),
match self { }
AppError::NotFound => write!(f, "not found"), }
AppError::CorruptRecord { key } => { }
write!(f, "corrupt record for key {key}: no volumes")
} impl std::fmt::Display for AppError {
AppError::Db(e) => write!(f, "database error: {e}"), fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
AppError::InsufficientVolumes { need, have } => { match self {
write!(f, "need {need} volumes but only {have} available") AppError::NotFound => write!(f, "not found"),
} AppError::CorruptRecord { key } => {
AppError::PartialWrite => write!(f, "not all volume writes succeeded"), write!(f, "corrupt record for key {key}: no volumes")
} }
} AppError::Db(e) => write!(f, "database error: {e}"),
} AppError::InsufficientVolumes { need, have } => {
write!(f, "need {need} volumes but only {have} available")
impl IntoResponse for AppError { }
fn into_response(self) -> Response { AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
let status = match &self { }
AppError::NotFound => StatusCode::NOT_FOUND, }
AppError::CorruptRecord { .. } => StatusCode::INTERNAL_SERVER_ERROR, }
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE, impl IntoResponse for AppError {
AppError::PartialWrite => StatusCode::BAD_GATEWAY, fn into_response(self) -> Response {
}; let status = match &self {
(status, self.to_string()).into_response() AppError::NotFound => StatusCode::NOT_FOUND,
} AppError::CorruptRecord { .. } => StatusCode::INTERNAL_SERVER_ERROR,
} AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
};
(status, self.to_string()).into_response()
}
}

View file

@ -1,81 +1,85 @@
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
/// Pick `count` volumes for a key by hashing key+volume, sorting by score. /// Pick `count` volumes for a key by hashing key+volume, sorting by score.
/// Same idea as minikeyvalue's key2volume — stable in volume name, not position. /// Same idea as minikeyvalue's key2volume — stable in volume name, not position.
pub fn volumes_for_key(key: &str, volumes: &[String], count: usize) -> Vec<String> { pub fn volumes_for_key(key: &str, volumes: &[String], count: usize) -> Vec<String> {
let mut scored: Vec<(u64, &String)> = volumes let mut scored: Vec<(u64, &String)> = volumes
.iter() .iter()
.map(|v| { .map(|v| {
let hash = Sha256::digest(format!("{key}:{v}").as_bytes()); let hash = Sha256::digest(format!("{key}:{v}").as_bytes());
let score = u64::from_be_bytes(hash[..8].try_into().unwrap()); let score = u64::from_be_bytes(hash[..8].try_into().unwrap());
(score, v) (score, v)
}) })
.collect(); .collect();
scored.sort_by_key(|(score, _)| *score); scored.sort_by_key(|(score, _)| *score);
scored.into_iter().take(count).map(|(_, v)| v.clone()).collect() scored
} .into_iter()
.take(count)
#[cfg(test)] .map(|(_, v)| v.clone())
mod tests { .collect()
use super::*; }
#[test] #[cfg(test)]
fn test_deterministic() { mod tests {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect(); use super::*;
let a = volumes_for_key("my-key", &volumes, 2);
let b = volumes_for_key("my-key", &volumes, 2); #[test]
assert_eq!(a, b); fn test_deterministic() {
} let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let a = volumes_for_key("my-key", &volumes, 2);
#[test] let b = volumes_for_key("my-key", &volumes, 2);
fn test_count_capped() { assert_eq!(a, b);
let volumes: Vec<String> = (1..=2).map(|i| format!("http://vol{i}")).collect(); }
let selected = volumes_for_key("key", &volumes, 5);
assert_eq!(selected.len(), 2); #[test]
} fn test_count_capped() {
let volumes: Vec<String> = (1..=2).map(|i| format!("http://vol{i}")).collect();
#[test] let selected = volumes_for_key("key", &volumes, 5);
fn test_even_distribution() { assert_eq!(selected.len(), 2);
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect(); }
let mut counts = std::collections::HashMap::new();
for i in 0..3000 { #[test]
let key = format!("key-{i}"); fn test_even_distribution() {
let primary = &volumes_for_key(&key, &volumes, 1)[0]; let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
*counts.entry(primary.clone()).or_insert(0u32) += 1; let mut counts = std::collections::HashMap::new();
} for i in 0..3000 {
for (vol, count) in &counts { let key = format!("key-{i}");
assert!( let primary = &volumes_for_key(&key, &volumes, 1)[0];
*count > 700 && *count < 1300, *counts.entry(primary.clone()).or_insert(0u32) += 1;
"volume {vol} got {count} keys, expected ~1000" }
); for (vol, count) in &counts {
} assert!(
} *count > 700 && *count < 1300,
"volume {vol} got {count} keys, expected ~1000"
#[test] );
fn test_stability_on_add() { }
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect(); }
let mut volumes4 = volumes.clone();
volumes4.push("http://vol4".into()); #[test]
fn test_stability_on_add() {
let total = 10000; let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let mut moved = 0; let mut volumes4 = volumes.clone();
for i in 0..total { volumes4.push("http://vol4".into());
let key = format!("key-{i}");
let before = &volumes_for_key(&key, &volumes, 1)[0]; let total = 10000;
let after = &volumes_for_key(&key, &volumes4, 1)[0]; let mut moved = 0;
if before != after { for i in 0..total {
moved += 1; let key = format!("key-{i}");
} let before = &volumes_for_key(&key, &volumes, 1)[0];
} let after = &volumes_for_key(&key, &volumes4, 1)[0];
let pct = moved as f64 / total as f64 * 100.0; if before != after {
assert!( moved += 1;
pct > 15.0 && pct < 40.0, }
"expected ~25% of keys to move, got {pct:.1}%" }
); let pct = moved as f64 / total as f64 * 100.0;
} assert!(
pct > 15.0 && pct < 40.0,
#[test] "expected ~25% of keys to move, got {pct:.1}%"
fn test_empty() { );
assert_eq!(volumes_for_key("key", &[], 1), Vec::<String>::new()); }
}
} #[test]
fn test_empty() {
assert_eq!(volumes_for_key("key", &[], 1), Vec::<String>::new());
}
}

View file

@ -1,53 +1,53 @@
pub mod db; pub mod db;
pub mod error; pub mod error;
pub mod hasher; pub mod hasher;
pub mod server; pub mod rebalance;
pub mod rebalance; pub mod rebuild;
pub mod rebuild; pub mod server;
use std::sync::Arc; use std::sync::Arc;
const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB
pub struct Args { pub struct Args {
pub db_path: String, pub db_path: String,
pub volumes: Vec<String>, pub volumes: Vec<String>,
pub replicas: usize, pub replicas: usize,
} }
pub fn build_app(args: &Args) -> axum::Router { pub fn build_app(args: &Args) -> axum::Router {
if args.replicas > args.volumes.len() { if args.replicas > args.volumes.len() {
eprintln!( eprintln!(
"Error: replication factor ({}) exceeds number of volumes ({})", "Error: replication factor ({}) exceeds number of volumes ({})",
args.replicas, args.replicas,
args.volumes.len() args.volumes.len()
); );
std::process::exit(1); std::process::exit(1);
} }
if let Some(parent) = std::path::Path::new(&args.db_path).parent() { if let Some(parent) = std::path::Path::new(&args.db_path).parent() {
std::fs::create_dir_all(parent).unwrap_or_else(|e| { std::fs::create_dir_all(parent).unwrap_or_else(|e| {
eprintln!("Failed to create database directory: {e}"); eprintln!("Failed to create database directory: {e}");
std::process::exit(1); std::process::exit(1);
}); });
} }
let state = server::AppState { let state = server::AppState {
db: db::Db::new(&args.db_path), db: db::Db::new(&args.db_path),
volumes: Arc::new(args.volumes.clone()), volumes: Arc::new(args.volumes.clone()),
replicas: args.replicas, replicas: args.replicas,
http: reqwest::Client::new(), http: reqwest::Client::new(),
}; };
axum::Router::new() axum::Router::new()
.route("/", axum::routing::get(server::list_keys)) .route("/", axum::routing::get(server::list_keys))
.route( .route(
"/{*key}", "/{*key}",
axum::routing::get(server::get_key) axum::routing::get(server::get_key)
.put(server::put_key) .put(server::put_key)
.delete(server::delete_key) .delete(server::delete_key)
.head(server::head_key), .head(server::head_key),
) )
.layer(axum::extract::DefaultBodyLimit::max(DEFAULT_BODY_LIMIT)) .layer(axum::extract::DefaultBodyLimit::max(DEFAULT_BODY_LIMIT))
.with_state(state) .with_state(state)
} }

View file

@ -6,7 +6,13 @@ struct Cli {
#[arg(short, long, env = "MKV_DB", default_value = "/tmp/mkv/index.db")] #[arg(short, long, env = "MKV_DB", default_value = "/tmp/mkv/index.db")]
db: String, db: String,
#[arg(short, long, env = "MKV_VOLUMES", required = true, value_delimiter = ',')] #[arg(
short,
long,
env = "MKV_VOLUMES",
required = true,
value_delimiter = ','
)]
volumes: Vec<String>, volumes: Vec<String>,
#[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)] #[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)]
@ -36,9 +42,8 @@ async fn shutdown_signal() {
let ctrl_c = tokio::signal::ctrl_c(); let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)] #[cfg(unix)]
{ {
let mut sigterm = let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) .expect("failed to install SIGTERM handler");
.expect("failed to install SIGTERM handler");
tokio::select! { tokio::select! {
_ = ctrl_c => tracing::info!("Received SIGINT, shutting down..."), _ = ctrl_c => tracing::info!("Received SIGINT, shutting down..."),
_ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down..."), _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down..."),

View file

@ -1,159 +1,194 @@
use crate::db; use crate::Args;
use crate::Args; use crate::db;
pub struct KeyMove { pub struct KeyMove {
pub key: String, pub key: String,
pub size: Option<i64>, pub size: Option<i64>,
pub current_volumes: Vec<String>, pub current_volumes: Vec<String>,
pub desired_volumes: Vec<String>, pub desired_volumes: Vec<String>,
pub to_add: Vec<String>, pub to_add: Vec<String>,
pub to_remove: Vec<String>, pub to_remove: Vec<String>,
} }
pub fn plan_rebalance(records: &[db::Record], volumes: &[String], replication: usize) -> Vec<KeyMove> { pub fn plan_rebalance(
let mut moves = Vec::new(); records: &[db::Record],
for record in records { volumes: &[String],
let desired = crate::hasher::volumes_for_key(&record.key, volumes, replication); replication: usize,
let to_add: Vec<String> = desired.iter().filter(|v| !record.volumes.contains(v)).cloned().collect(); ) -> Vec<KeyMove> {
let to_remove: Vec<String> = record.volumes.iter().filter(|v| !desired.contains(v)).cloned().collect(); let mut moves = Vec::new();
for record in records {
if !to_add.is_empty() || !to_remove.is_empty() { let desired = crate::hasher::volumes_for_key(&record.key, volumes, replication);
moves.push(KeyMove { let to_add: Vec<String> = desired
key: record.key.clone(), .iter()
size: record.size, .filter(|v| !record.volumes.contains(v))
current_volumes: record.volumes.clone(), .cloned()
desired_volumes: desired, .collect();
to_add, let to_remove: Vec<String> = record
to_remove, .volumes
}); .iter()
} .filter(|v| !desired.contains(v))
} .cloned()
moves .collect();
}
if !to_add.is_empty() || !to_remove.is_empty() {
pub async fn run(args: &Args, dry_run: bool) { moves.push(KeyMove {
let db = db::Db::new(&args.db_path); key: record.key.clone(),
let records = db.all_records_sync().expect("failed to read records"); size: record.size,
let moves = plan_rebalance(&records, &args.volumes, args.replicas); current_volumes: record.volumes.clone(),
desired_volumes: desired,
if moves.is_empty() { to_add,
eprintln!("Nothing to rebalance — all keys are already correctly placed."); to_remove,
return; });
} }
}
let total_bytes: i64 = moves.iter().filter_map(|m| m.size).sum(); moves
eprintln!("{} keys to move ({} bytes)", moves.len(), total_bytes); }
if dry_run { pub async fn run(args: &Args, dry_run: bool) {
for m in &moves { let db = db::Db::new(&args.db_path);
eprintln!(" {} : add {:?}, remove {:?}", m.key, m.to_add, m.to_remove); let records = db.all_records_sync().expect("failed to read records");
} let moves = plan_rebalance(&records, &args.volumes, args.replicas);
return;
} if moves.is_empty() {
eprintln!("Nothing to rebalance — all keys are already correctly placed.");
let client = reqwest::Client::new(); return;
let mut moved = 0; }
let mut errors = 0;
let total_bytes: i64 = moves.iter().filter_map(|m| m.size).sum();
for m in &moves { eprintln!("{} keys to move ({} bytes)", moves.len(), total_bytes);
let Some(src) = m.current_volumes.first() else {
eprintln!(" SKIP {} : no source volume", m.key); if dry_run {
errors += 1; for m in &moves {
continue; eprintln!(" {} : add {:?}, remove {:?}", m.key, m.to_add, m.to_remove);
}; }
let mut copy_ok = true; return;
}
for dst in &m.to_add {
let src_url = format!("{src}/{}", m.key); let client = reqwest::Client::new();
match client.get(&src_url).send().await { let mut moved = 0;
Ok(resp) if resp.status().is_success() => { let mut errors = 0;
let data = match resp.bytes().await {
Ok(b) => b, for m in &moves {
Err(e) => { let Some(src) = m.current_volumes.first() else {
eprintln!(" ERROR read body {} from {}: {}", m.key, src, e); eprintln!(" SKIP {} : no source volume", m.key);
copy_ok = false; errors += 1;
errors += 1; continue;
break; };
} let mut copy_ok = true;
};
let dst_url = format!("{dst}/{}", m.key); for dst in &m.to_add {
match client.put(&dst_url).body(data).send().await { let src_url = format!("{src}/{}", m.key);
Ok(resp) if !resp.status().is_success() => { match client.get(&src_url).send().await {
eprintln!(" ERROR copy {} to {}: status {}", m.key, dst, resp.status()); Ok(resp) if resp.status().is_success() => {
copy_ok = false; let data = match resp.bytes().await {
errors += 1; Ok(b) => b,
} Err(e) => {
Err(e) => { eprintln!(" ERROR read body {} from {}: {}", m.key, src, e);
eprintln!(" ERROR copy {} to {}: {}", m.key, dst, e); copy_ok = false;
copy_ok = false; errors += 1;
errors += 1; break;
} }
Ok(_) => {} };
} let dst_url = format!("{dst}/{}", m.key);
} match client.put(&dst_url).body(data).send().await {
Ok(resp) => { Ok(resp) if !resp.status().is_success() => {
eprintln!(" ERROR read {} from {}: status {}", m.key, src, resp.status()); eprintln!(
copy_ok = false; " ERROR copy {} to {}: status {}",
errors += 1; m.key,
} dst,
Err(e) => { resp.status()
eprintln!(" ERROR read {} from {}: {}", m.key, src, e); );
copy_ok = false; copy_ok = false;
errors += 1; errors += 1;
} }
} Err(e) => {
} eprintln!(" ERROR copy {} to {}: {}", m.key, dst, e);
copy_ok = false;
if !copy_ok { continue; } errors += 1;
}
db.put(m.key.clone(), m.desired_volumes.clone(), m.size).await.expect("failed to update index"); Ok(_) => {}
}
for old in &m.to_remove { }
let url = format!("{old}/{}", m.key); Ok(resp) => {
if let Err(e) = client.delete(&url).send().await { eprintln!(
eprintln!(" WARN delete {} from {}: {}", m.key, old, e); " ERROR read {} from {}: status {}",
} m.key,
} src,
moved += 1; resp.status()
} );
copy_ok = false;
eprintln!("Rebalanced {moved}/{} keys ({errors} errors)", moves.len()); errors += 1;
} }
Err(e) => {
#[cfg(test)] eprintln!(" ERROR read {} from {}: {}", m.key, src, e);
mod tests { copy_ok = false;
use super::*; errors += 1;
}
#[test] }
fn test_plan_rebalance_no_change() { }
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let records: Vec<db::Record> = (0..100) if !copy_ok {
.map(|i| { continue;
let key = format!("key-{i}"); }
let vols = crate::hasher::volumes_for_key(&key, &volumes, 2);
db::Record { key, volumes: vols, size: Some(100) } db.put(m.key.clone(), m.desired_volumes.clone(), m.size)
}) .await
.collect(); .expect("failed to update index");
let moves = plan_rebalance(&records, &volumes, 2); for old in &m.to_remove {
assert!(moves.is_empty()); let url = format!("{old}/{}", m.key);
} if let Err(e) = client.delete(&url).send().await {
eprintln!(" WARN delete {} from {}: {}", m.key, old, e);
#[test] }
fn test_plan_rebalance_new_volume() { }
let volumes3: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect(); moved += 1;
let records: Vec<db::Record> = (0..1000) }
.map(|i| {
let key = format!("key-{i}"); eprintln!("Rebalanced {moved}/{} keys ({errors} errors)", moves.len());
let vols = crate::hasher::volumes_for_key(&key, &volumes3, 2); }
db::Record { key, volumes: vols, size: Some(100) }
}) #[cfg(test)]
.collect(); mod tests {
use super::*;
let volumes4: Vec<String> = (1..=4).map(|i| format!("http://vol{i}")).collect();
let moves = plan_rebalance(&records, &volumes4, 2); #[test]
fn test_plan_rebalance_no_change() {
assert!(!moves.is_empty()); let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
assert!(moves.len() < 800, "too many moves: {}", moves.len()); let records: Vec<db::Record> = (0..100)
} .map(|i| {
} let key = format!("key-{i}");
let vols = crate::hasher::volumes_for_key(&key, &volumes, 2);
db::Record {
key,
volumes: vols,
size: Some(100),
}
})
.collect();
let moves = plan_rebalance(&records, &volumes, 2);
assert!(moves.is_empty());
}
#[test]
fn test_plan_rebalance_new_volume() {
let volumes3: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let records: Vec<db::Record> = (0..1000)
.map(|i| {
let key = format!("key-{i}");
let vols = crate::hasher::volumes_for_key(&key, &volumes3, 2);
db::Record {
key,
volumes: vols,
size: Some(100),
}
})
.collect();
let volumes4: Vec<String> = (1..=4).map(|i| format!("http://vol{i}")).collect();
let moves = plan_rebalance(&records, &volumes4, 2);
assert!(!moves.is_empty());
assert!(moves.len() < 800, "too many moves: {}", moves.len());
}
}

View file

@ -1,72 +1,86 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::db; use crate::Args;
use crate::Args; use crate::db;
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct NginxEntry { struct NginxEntry {
name: String, name: String,
#[serde(rename = "type")] #[serde(rename = "type")]
entry_type: String, entry_type: String,
#[serde(default)] #[serde(default)]
size: Option<i64>, size: Option<i64>,
} }
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> { async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
let http = reqwest::Client::new(); let http = reqwest::Client::new();
let mut keys = Vec::new(); let mut keys = Vec::new();
let mut dirs = vec![String::new()]; let mut dirs = vec![String::new()];
while let Some(prefix) = dirs.pop() { while let Some(prefix) = dirs.pop() {
let url = format!("{volume_url}/{prefix}"); let url = format!("{volume_url}/{prefix}");
let resp = http.get(&url).send().await.map_err(|e| format!("GET {url}: {e}"))?; let resp = http
if !resp.status().is_success() { .get(&url)
return Err(format!("GET {url}: status {}", resp.status())); .send()
} .await
let entries: Vec<NginxEntry> = resp.json().await.map_err(|e| format!("parse {url}: {e}"))?; .map_err(|e| format!("GET {url}: {e}"))?;
for entry in entries { if !resp.status().is_success() {
let full_path = if prefix.is_empty() { entry.name.clone() } else { format!("{prefix}{}", entry.name) }; return Err(format!("GET {url}: status {}", resp.status()));
match entry.entry_type.as_str() { }
"directory" => dirs.push(format!("{full_path}/")), let entries: Vec<NginxEntry> =
"file" => keys.push((full_path, entry.size.unwrap_or(0))), resp.json().await.map_err(|e| format!("parse {url}: {e}"))?;
_ => {} for entry in entries {
} let full_path = if prefix.is_empty() {
} entry.name.clone()
} } else {
Ok(keys) format!("{prefix}{}", entry.name)
} };
match entry.entry_type.as_str() {
pub async fn run(args: &Args) { "directory" => dirs.push(format!("{full_path}/")),
let db_path = &args.db_path; "file" => keys.push((full_path, entry.size.unwrap_or(0))),
_ => {}
if let Some(parent) = std::path::Path::new(db_path).parent() { }
let _ = std::fs::create_dir_all(parent); }
} }
Ok(keys)
let _ = std::fs::remove_file(db_path); }
let _ = std::fs::remove_file(format!("{db_path}-wal"));
let _ = std::fs::remove_file(format!("{db_path}-shm")); pub async fn run(args: &Args) {
let db_path = &args.db_path;
let db = db::Db::new(db_path);
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new(); if let Some(parent) = std::path::Path::new(db_path).parent() {
let _ = std::fs::create_dir_all(parent);
for vol_url in &args.volumes { }
eprintln!("Scanning {vol_url}...");
match list_volume_keys(vol_url).await { let _ = std::fs::remove_file(db_path);
Ok(keys) => { let _ = std::fs::remove_file(format!("{db_path}-wal"));
eprintln!(" Found {} keys", keys.len()); let _ = std::fs::remove_file(format!("{db_path}-shm"));
for (key, size) in keys {
let entry = index.entry(key).or_insert_with(|| (Vec::new(), size)); let db = db::Db::new(db_path);
entry.0.push(vol_url.clone()); let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
if size > entry.1 { entry.1 = size; }
} for vol_url in &args.volumes {
} eprintln!("Scanning {vol_url}...");
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"), match list_volume_keys(vol_url).await {
} Ok(keys) => {
} eprintln!(" Found {} keys", keys.len());
for (key, size) in keys {
let records: Vec<_> = index.into_iter().map(|(k, (v, s))| (k, v, Some(s))).collect(); let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
let count = records.len(); entry.0.push(vol_url.clone());
db.bulk_put(records).await.expect("bulk_put failed"); if size > entry.1 {
eprintln!("Rebuilt index with {count} keys"); entry.1 = size;
} }
}
}
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
}
}
let records: Vec<_> = index
.into_iter()
.map(|(k, (v, s))| (k, v, Some(s)))
.collect();
let count = records.len();
db.bulk_put(records).await.expect("bulk_put failed");
eprintln!("Rebuilt index with {count} keys");
}

View file

@ -1,173 +1,199 @@
use axum::body::Bytes; use axum::body::Bytes;
use axum::extract::{Path, Query, State}; use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode}; use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use std::sync::Arc; use std::sync::Arc;
use crate::db; use crate::db;
use crate::error::{AppError, VolumeError}; use crate::error::{AppError, VolumeError};
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
pub db: db::Db, pub db: db::Db,
pub volumes: Arc<Vec<String>>, pub volumes: Arc<Vec<String>>,
pub replicas: usize, pub replicas: usize,
pub http: reqwest::Client, pub http: reqwest::Client,
} }
pub async fn get_key( pub async fn get_key(
State(state): State<AppState>, State(state): State<AppState>,
Path(key): Path<String>, Path(key): Path<String>,
) -> Result<Response, AppError> { ) -> Result<Response, AppError> {
let record = state.db.get(&key).await?; let record = state.db.get(&key).await?;
let vol = record let vol = record
.volumes .volumes
.first() .first()
.ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?; .ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?;
let location = format!("{vol}/{key}"); let location = format!("{vol}/{key}");
Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, location)]).into_response()) Ok((
} StatusCode::FOUND,
[(axum::http::header::LOCATION, location)],
pub async fn put_key( )
State(state): State<AppState>, .into_response())
Path(key): Path<String>, }
body: Bytes,
) -> Result<Response, AppError> { pub async fn put_key(
let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas); State(state): State<AppState>,
if target_volumes.len() < state.replicas { Path(key): Path<String>,
return Err(AppError::InsufficientVolumes { body: Bytes,
need: state.replicas, ) -> Result<Response, AppError> {
have: target_volumes.len(), let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas);
}); if target_volumes.len() < state.replicas {
} return Err(AppError::InsufficientVolumes {
need: state.replicas,
// Fan out PUTs to all target volumes concurrently have: target_volumes.len(),
let mut handles = Vec::with_capacity(target_volumes.len()); });
for vol in &target_volumes { }
let url = format!("{vol}/{key}");
let handle = tokio::spawn({ // Fan out PUTs to all target volumes concurrently
let client = state.http.clone(); let mut handles = Vec::with_capacity(target_volumes.len());
let data = body.clone(); for vol in &target_volumes {
async move { let url = format!("{vol}/{key}");
let resp = client.put(&url).body(data).send().await.map_err(|e| { let handle =
VolumeError::Request { url: url.clone(), source: e } tokio::spawn({
})?; let client = state.http.clone();
if !resp.status().is_success() { let data = body.clone();
return Err(VolumeError::BadStatus { url, status: resp.status() }); async move {
} let resp = client.put(&url).body(data).send().await.map_err(|e| {
Ok(()) VolumeError::Request {
} url: url.clone(),
}); source: e,
handles.push(handle); }
} })?;
if !resp.status().is_success() {
let mut failed = false; return Err(VolumeError::BadStatus {
for handle in handles { url,
match handle.await { status: resp.status(),
Ok(Err(e)) => { });
tracing::error!("{e}"); }
failed = true; Ok(())
} }
Err(e) => { });
tracing::error!("volume write task failed: {e}"); handles.push(handle);
failed = true; }
}
Ok(Ok(())) => {} let mut failed = false;
} for handle in handles {
} match handle.await {
Ok(Err(e)) => {
if failed { tracing::error!("{e}");
// Rollback: best-effort delete from volumes failed = true;
for vol in &target_volumes { }
let _ = state.http.delete(format!("{vol}/{key}")).send().await; Err(e) => {
} tracing::error!("volume write task failed: {e}");
return Err(AppError::PartialWrite); failed = true;
} }
Ok(Ok(())) => {}
let size = Some(body.len() as i64); }
if let Err(e) = state.db.put(key.clone(), target_volumes.clone(), size).await { }
for vol in &target_volumes {
let _ = state.http.delete(format!("{vol}/{key}")).send().await; if failed {
} // Rollback: best-effort delete from volumes
return Err(e); for vol in &target_volumes {
} let _ = state.http.delete(format!("{vol}/{key}")).send().await;
Ok(StatusCode::CREATED.into_response()) }
} return Err(AppError::PartialWrite);
}
pub async fn delete_key(
State(state): State<AppState>, let size = Some(body.len() as i64);
Path(key): Path<String>, if let Err(e) = state
) -> Result<Response, AppError> { .db
let record = state.db.get(&key).await?; .put(key.clone(), target_volumes.clone(), size)
.await
let mut handles = Vec::new(); {
for vol in &record.volumes { for vol in &target_volumes {
let url = format!("{vol}/{key}"); let _ = state.http.delete(format!("{vol}/{key}")).send().await;
let handle = tokio::spawn({ }
let client = state.http.clone(); return Err(e);
async move { }
let resp = client.delete(&url).send().await.map_err(|e| { Ok(StatusCode::CREATED.into_response())
VolumeError::Request { url: url.clone(), source: e } }
})?;
if !resp.status().is_success() { pub async fn delete_key(
return Err(VolumeError::BadStatus { url, status: resp.status() }); State(state): State<AppState>,
} Path(key): Path<String>,
Ok(()) ) -> Result<Response, AppError> {
} let record = state.db.get(&key).await?;
});
handles.push(handle); let mut handles = Vec::new();
} for vol in &record.volumes {
for handle in handles { let url = format!("{vol}/{key}");
match handle.await { let handle = tokio::spawn({
Ok(Err(e)) => tracing::error!("{e}"), let client = state.http.clone();
Err(e) => tracing::error!("volume delete task failed: {e}"), async move {
Ok(Ok(())) => {} let resp = client
} .delete(&url)
} .send()
.await
state.db.delete(key).await?; .map_err(|e| VolumeError::Request {
Ok(StatusCode::NO_CONTENT.into_response()) url: url.clone(),
} source: e,
})?;
pub async fn head_key( if !resp.status().is_success() {
State(state): State<AppState>, return Err(VolumeError::BadStatus {
Path(key): Path<String>, url,
) -> Result<Response, AppError> { status: resp.status(),
let record = state.db.get(&key).await?; });
let mut headers = HeaderMap::new(); }
if let Some(size) = record.size { Ok(())
headers.insert(axum::http::header::CONTENT_LENGTH, size.to_string().parse().unwrap()); }
} });
Ok((StatusCode::OK, headers).into_response()) handles.push(handle);
} }
for handle in handles {
#[derive(serde::Deserialize)] match handle.await {
pub struct ListQuery { Ok(Err(e)) => tracing::error!("{e}"),
#[serde(default)] Err(e) => tracing::error!("volume delete task failed: {e}"),
pub prefix: String, Ok(Ok(())) => {}
} }
}
pub async fn list_keys(
State(state): State<AppState>, state.db.delete(key).await?;
Query(query): Query<ListQuery>, Ok(StatusCode::NO_CONTENT.into_response())
) -> Result<Response, AppError> { }
let keys = state.db.list_keys(&query.prefix).await?;
Ok((StatusCode::OK, keys.join("\n")).into_response()) pub async fn head_key(
} State(state): State<AppState>,
Path(key): Path<String>,
#[cfg(test)]
mod tests {
    /// With more volumes than requested replicas, selection returns exactly
    /// `replicas` volumes.
    #[test]
    fn test_volumes_for_key_sufficient() {
        let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
        let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
        assert_eq!(selected.len(), 2);
    }

    /// With fewer volumes than requested replicas, selection returns every
    /// volume it has rather than failing.
    #[test]
    fn test_volumes_for_key_insufficient() {
        let volumes: Vec<String> = vec!["http://vol1".into()];
        let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
        assert_eq!(selected.len(), 1);
    }
}

View file

@ -1,166 +1,223 @@
use reqwest::StatusCode; use reqwest::StatusCode;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
static TEST_COUNTER: AtomicU32 = AtomicU32::new(0); static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);
async fn start_server() -> String { async fn start_server() -> String {
let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed); let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
let db_path = format!("/tmp/mkv-test/index-{id}.db"); let db_path = format!("/tmp/mkv-test/index-{id}.db");
let _ = std::fs::remove_file(&db_path); let _ = std::fs::remove_file(&db_path);
let _ = std::fs::remove_file(format!("{db_path}-wal")); let _ = std::fs::remove_file(format!("{db_path}-wal"));
let _ = std::fs::remove_file(format!("{db_path}-shm")); let _ = std::fs::remove_file(format!("{db_path}-shm"));
let args = mkv::Args { let args = mkv::Args {
db_path, db_path,
volumes: vec![ volumes: vec![
"http://localhost:3101".into(), "http://localhost:3101".into(),
"http://localhost:3102".into(), "http://localhost:3102".into(),
"http://localhost:3103".into(), "http://localhost:3103".into(),
], ],
replicas: 2, replicas: 2,
}; };
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port(); let port = listener.local_addr().unwrap().port();
let app = mkv::build_app(&args); let app = mkv::build_app(&args);
tokio::spawn(async move { tokio::spawn(async move {
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
}); });
tokio::time::sleep(std::time::Duration::from_millis(50)).await; tokio::time::sleep(std::time::Duration::from_millis(50)).await;
format!("http://127.0.0.1:{port}") format!("http://127.0.0.1:{port}")
} }
/// An HTTP client that does NOT follow redirects, so tests can inspect the
/// 302 Location header instead of being transparently forwarded.
fn client() -> reqwest::Client {
    reqwest::Client::builder()
        .redirect(reqwest::redirect::Policy::none())
        .build()
        .unwrap()
}
#[tokio::test] #[tokio::test]
async fn test_put_and_head() { async fn test_put_and_head() {
let base = start_server().await; let base = start_server().await;
let c = client(); let c = client();
let resp = c.put(format!("{base}/hello")).body("world").send().await.unwrap(); let resp = c
assert_eq!(resp.status(), StatusCode::CREATED); .put(format!("{base}/hello"))
.body("world")
let resp = c.head(format!("{base}/hello")).send().await.unwrap(); .send()
assert_eq!(resp.status(), StatusCode::OK); .await
assert_eq!(resp.headers().get("content-length").unwrap().to_str().unwrap(), "5"); .unwrap();
} assert_eq!(resp.status(), StatusCode::CREATED);
#[tokio::test] let resp = c.head(format!("{base}/hello")).send().await.unwrap();
async fn test_put_and_get_redirect() { assert_eq!(resp.status(), StatusCode::OK);
let base = start_server().await; assert_eq!(
let c = client(); resp.headers()
.get("content-length")
let resp = c.put(format!("{base}/redirect-test")).body("some data").send().await.unwrap(); .unwrap()
assert_eq!(resp.status(), StatusCode::CREATED); .to_str()
.unwrap(),
let resp = c.get(format!("{base}/redirect-test")).send().await.unwrap(); "5"
assert_eq!(resp.status(), StatusCode::FOUND); );
}
let location = resp.headers().get("location").unwrap().to_str().unwrap();
assert!(location.starts_with("http://localhost:310"), "got: {location}"); #[tokio::test]
async fn test_put_and_get_redirect() {
let blob_resp = reqwest::get(location).await.unwrap(); let base = start_server().await;
assert_eq!(blob_resp.status(), StatusCode::OK); let c = client();
assert_eq!(blob_resp.text().await.unwrap(), "some data");
} let resp = c
.put(format!("{base}/redirect-test"))
#[tokio::test] .body("some data")
async fn test_get_nonexistent_returns_404() { .send()
let base = start_server().await; .await
let c = client(); .unwrap();
let resp = c.get(format!("{base}/does-not-exist")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::CREATED);
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
} let resp = c.get(format!("{base}/redirect-test")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::FOUND);
#[tokio::test]
async fn test_put_get_delete_get() { let location = resp.headers().get("location").unwrap().to_str().unwrap();
let base = start_server().await; assert!(
let c = client(); location.starts_with("http://localhost:310"),
"got: {location}"
let resp = c.put(format!("{base}/delete-me")).body("temporary").send().await.unwrap(); );
assert_eq!(resp.status(), StatusCode::CREATED);
let blob_resp = reqwest::get(location).await.unwrap();
let resp = c.get(format!("{base}/delete-me")).send().await.unwrap(); assert_eq!(blob_resp.status(), StatusCode::OK);
assert_eq!(resp.status(), StatusCode::FOUND); assert_eq!(blob_resp.text().await.unwrap(), "some data");
}
let resp = c.delete(format!("{base}/delete-me")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT); #[tokio::test]
async fn test_get_nonexistent_returns_404() {
let resp = c.get(format!("{base}/delete-me")).send().await.unwrap(); let base = start_server().await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND); let c = client();
} let resp = c
.get(format!("{base}/does-not-exist"))
#[tokio::test] .send()
async fn test_delete_nonexistent_returns_404() { .await
let base = start_server().await; .unwrap();
let c = client(); assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let resp = c.delete(format!("{base}/never-existed")).send().await.unwrap(); }
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
} #[tokio::test]
async fn test_put_get_delete_get() {
#[tokio::test] let base = start_server().await;
async fn test_list_keys() { let c = client();
let base = start_server().await;
let c = client(); let resp = c
.put(format!("{base}/delete-me"))
for name in ["docs/a", "docs/b", "docs/c", "other/x"] { .body("temporary")
c.put(format!("{base}/{name}")).body("data").send().await.unwrap(); .send()
} .await
.unwrap();
let resp = c.get(format!("{base}/")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::CREATED);
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.text().await.unwrap(); let resp = c.get(format!("{base}/delete-me")).send().await.unwrap();
assert!(body.contains("docs/a")); assert_eq!(resp.status(), StatusCode::FOUND);
assert!(body.contains("other/x"));
let resp = c.delete(format!("{base}/delete-me")).send().await.unwrap();
let resp = c.get(format!("{base}/?prefix=docs/")).send().await.unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT);
assert_eq!(resp.status(), StatusCode::OK);
let body = resp.text().await.unwrap(); let resp = c.get(format!("{base}/delete-me")).send().await.unwrap();
let lines: Vec<&str> = body.lines().collect(); assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(lines.len(), 3); }
assert!(!body.contains("other/x"));
} #[tokio::test]
async fn test_delete_nonexistent_returns_404() {
#[tokio::test] let base = start_server().await;
async fn test_put_overwrite() { let c = client();
let base = start_server().await; let resp = c
let c = client(); .delete(format!("{base}/never-existed"))
.send()
c.put(format!("{base}/overwrite")).body("version1").send().await.unwrap(); .await
.unwrap();
let resp = c.put(format!("{base}/overwrite")).body("version2").send().await.unwrap(); assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(resp.status(), StatusCode::CREATED); }
let resp = c.head(format!("{base}/overwrite")).send().await.unwrap(); #[tokio::test]
assert_eq!(resp.headers().get("content-length").unwrap().to_str().unwrap(), "8"); async fn test_list_keys() {
let base = start_server().await;
let resp = c.get(format!("{base}/overwrite")).send().await.unwrap(); let c = client();
let location = resp.headers().get("location").unwrap().to_str().unwrap();
let body = reqwest::get(location).await.unwrap().text().await.unwrap(); for name in ["docs/a", "docs/b", "docs/c", "other/x"] {
assert_eq!(body, "version2"); c.put(format!("{base}/{name}"))
} .body("data")
.send()
#[tokio::test] .await
async fn test_replication_writes_to_multiple_volumes() { .unwrap();
let base = start_server().await; }
let c = client();
let resp = c.get(format!("{base}/")).send().await.unwrap();
c.put(format!("{base}/replicated")).body("replica-data").send().await.unwrap(); assert_eq!(resp.status(), StatusCode::OK);
let body = resp.text().await.unwrap();
let resp = c.head(format!("{base}/replicated")).send().await.unwrap(); assert!(body.contains("docs/a"));
assert_eq!(resp.status(), StatusCode::OK); assert!(body.contains("other/x"));
let resp = c.get(format!("{base}/replicated")).send().await.unwrap(); let resp = c.get(format!("{base}/?prefix=docs/")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::FOUND); assert_eq!(resp.status(), StatusCode::OK);
let location = resp.headers().get("location").unwrap().to_str().unwrap(); let body = resp.text().await.unwrap();
let body = reqwest::get(location).await.unwrap().text().await.unwrap(); let lines: Vec<&str> = body.lines().collect();
assert_eq!(body, "replica-data"); assert_eq!(lines.len(), 3);
} assert!(!body.contains("other/x"));
}
#[tokio::test]
async fn test_put_overwrite() {
let base = start_server().await;
let c = client();
c.put(format!("{base}/overwrite"))
.body("version1")
.send()
.await
.unwrap();
let resp = c
.put(format!("{base}/overwrite"))
.body("version2")
.send()
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::CREATED);
let resp = c.head(format!("{base}/overwrite")).send().await.unwrap();
assert_eq!(
resp.headers()
.get("content-length")
.unwrap()
.to_str()
.unwrap(),
"8"
);
let resp = c.get(format!("{base}/overwrite")).send().await.unwrap();
let location = resp.headers().get("location").unwrap().to_str().unwrap();
let body = reqwest::get(location).await.unwrap().text().await.unwrap();
assert_eq!(body, "version2");
}
#[tokio::test]
async fn test_replication_writes_to_multiple_volumes() {
let base = start_server().await;
let c = client();
c.put(format!("{base}/replicated"))
.body("replica-data")
.send()
.await
.unwrap();
let resp = c.head(format!("{base}/replicated")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let resp = c.get(format!("{base}/replicated")).send().await.unwrap();
assert_eq!(resp.status(), StatusCode::FOUND);
let location = resp.headers().get("location").unwrap().to_str().unwrap();
let body = reqwest::get(location).await.unwrap().text().await.unwrap();
assert_eq!(body, "replica-data");
}