From 2a2afa5f69b471d5ecd2747f9c4a86975bb869f8 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sat, 7 Mar 2026 09:53:24 +0100 Subject: [PATCH] V2 --- Cargo.lock | 1 + Cargo.toml | 1 + src/hasher.rs | 145 +++++++++++++++++++++++++++++++++++++ src/health.rs | 32 ++++++++ src/main.rs | 57 ++++++++++++--- src/server.rs | 197 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/volume.rs | 113 +++++++++++++++++++++++++++++ 7 files changed, 534 insertions(+), 12 deletions(-) create mode 100644 src/hasher.rs create mode 100644 src/health.rs create mode 100644 src/server.rs create mode 100644 src/volume.rs diff --git a/Cargo.lock b/Cargo.lock index 739f9fa..61f724d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -878,6 +878,7 @@ name = "mkv" version = "0.1.0" dependencies = [ "axum", + "bytes", "clap", "reqwest", "rusqlite", diff --git a/Cargo.toml b/Cargo.toml index 131e855..aee013b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ tracing = "0.1" tracing-subscriber = "0.3" tokio-stream = "0.1" sha2 = "0.10" +bytes = "1" [profile.release] opt-level = 3 diff --git a/src/hasher.rs b/src/hasher.rs new file mode 100644 index 0000000..e659b20 --- /dev/null +++ b/src/hasher.rs @@ -0,0 +1,145 @@ +use sha2::{Digest, Sha256}; +use std::collections::BTreeMap; + +pub struct Ring { + nodes: BTreeMap<u64, String>, + virtual_nodes: usize, +} + +impl Ring { + pub fn new(volumes: &[String], virtual_nodes: usize) -> Self { + let mut ring = Self { + nodes: BTreeMap::new(), + virtual_nodes, + }; + for url in volumes { + ring.add_volume(url); + } + ring + } + + pub fn add_volume(&mut self, url: &str) { + for i in 0..self.virtual_nodes { + let hash = hash_key(&format!("{url}:{i}")); + self.nodes.insert(hash, url.to_string()); + } + } + + pub fn remove_volume(&mut self, url: &str) { + self.nodes.retain(|_, v| v != url); + } + + /// Walk the ring clockwise from the key's hash position and return + /// `count` distinct physical volumes. 
+ pub fn get_volumes(&self, key: &str, count: usize) -> Vec<String> { + if self.nodes.is_empty() { + return vec![]; + } + + let hash = hash_key(key); + let mut result = Vec::with_capacity(count); + + // Walk clockwise from hash position + for (_, url) in self.nodes.range(hash..).chain(self.nodes.iter()) { + if !result.contains(url) { + result.push(url.clone()); + } + if result.len() == count { + break; + } + } + + result + } + + /// Compute the content-addressed path for a key: /{first2}/{next2}/{full_hex} + pub fn key_path(key: &str) -> String { + let hex = hex_hash(key); + format!("/{}/{}/{}", &hex[..2], &hex[2..4], &hex) + } +} + +fn hash_key(input: &str) -> u64 { + let hash = Sha256::digest(input.as_bytes()); + u64::from_be_bytes(hash[..8].try_into().unwrap()) +} + +fn hex_hash(input: &str) -> String { + let hash = Sha256::digest(input.as_bytes()); + hex_encode(&hash) +} + +fn hex_encode(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + use std::fmt::Write; + write!(s, "{b:02x}").unwrap(); + } + s +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ring_distribution() { + let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); + let ring = Ring::new(&volumes, 100); + + // All 3 volumes should appear in results + let selected = ring.get_volumes("test-key", 3); + assert_eq!(selected.len(), 3); + for v in &volumes { + assert!(selected.contains(v), "missing volume {v}"); + } + } + + #[test] + fn test_ring_deterministic() { + let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); + let ring = Ring::new(&volumes, 100); + + let a = ring.get_volumes("my-key", 2); + let b = ring.get_volumes("my-key", 2); + assert_eq!(a, b); + } + + #[test] + fn test_ring_count_capped() { + let volumes: Vec<String> = (1..=2).map(|i| format!("http://vol{i}:300{i}")).collect(); + let ring = Ring::new(&volumes, 100); + + // Requesting more than available should return what's available + let 
selected = ring.get_volumes("key", 5); + assert_eq!(selected.len(), 2); + } + + #[test] + fn test_key_path_format() { + let path = Ring::key_path("hello"); + // SHA256("hello") = 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 + assert_eq!(path, "/2c/f2/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"); + } + + #[test] + fn test_ring_even_distribution() { + let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); + let ring = Ring::new(&volumes, 100); + + let mut counts = std::collections::HashMap::new(); + for i in 0..3000 { + let key = format!("key-{i}"); + let primary = &ring.get_volumes(&key, 1)[0]; + *counts.entry(primary.clone()).or_insert(0u32) += 1; + } + + // Each volume should get roughly 1000 keys (±30%) + for (vol, count) in &counts { + assert!( + *count > 700 && *count < 1300, + "volume {vol} got {count} keys, expected ~1000" + ); + } + } +} diff --git a/src/health.rs b/src/health.rs new file mode 100644 index 0000000..2637c37 --- /dev/null +++ b/src/health.rs @@ -0,0 +1,32 @@ +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::RwLock; + +use crate::volume::VolumeClient; + +pub type HealthyVolumes = Arc<RwLock<HashSet<String>>>; + +pub fn spawn_health_checker( + volume_client: VolumeClient, + all_volumes: Vec<String>, + healthy: HealthyVolumes, +) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(10)); + loop { + interval.tick().await; + for url in &all_volumes { + let ok = volume_client.check(url).await; + let mut set = healthy.write().await; + if ok { + set.insert(url.clone()); + } else { + if set.remove(url) { + tracing::warn!("Volume {url} is unhealthy"); + } + } + } + } + }); +} diff --git a/src/main.rs b/src/main.rs index 6d4d9c6..a993ae9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,16 @@ mod config; mod db; mod error; +mod hasher; +mod health; +mod server; +mod volume; use clap::{Parser, Subcommand}; +use 
std::collections::HashSet; use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::RwLock; #[derive(Parser)] #[command(name = "mkv", about = "Distributed key-value store")] @@ -70,27 +77,53 @@ async fn serve(config: config::Config) { .unwrap_or(4); let reads = db::ReadPool::new(db_path, num_readers); + let volume_urls = config.volume_urls(); + let ring = Arc::new(RwLock::new(hasher::Ring::new( + &volume_urls, + config.server.virtual_nodes, + ))); + + let volume_client = volume::VolumeClient::new(); + + // Start with all volumes assumed healthy, health checker will update + let healthy_volumes: health::HealthyVolumes = + Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone()))); + + health::spawn_health_checker( + volume_client.clone(), + volume_urls.clone(), + healthy_volumes.clone(), + ); + let port = config.server.port; - let volumes = config.volume_urls(); + + let state = server::AppState { + writer, + reads, + ring, + volume_client, + healthy_volumes, + config: Arc::new(config), + }; tracing::info!("Starting mkv server on port {port}"); tracing::info!(" Readers: {num_readers}"); - tracing::info!(" Volumes: {volumes:?}"); - tracing::info!( - " Replication factor: {}", - config.server.replication_factor - ); + tracing::info!(" Volumes: {volume_urls:?}"); + tracing::info!(" Replication factor: {}", state.config.server.replication_factor); - // TODO: wire up axum routes, volume client, hasher, health checker let app = axum::Router::new() - .route("/health", axum::routing::get(|| async { "ok" })); + .route("/", axum::routing::get(server::list_keys)) + .route( + "/{*key}", + axum::routing::get(server::get_key) + .put(server::put_key) + .delete(server::delete_key) + .head(server::head_key), + ) + .with_state(state); let addr = format!("0.0.0.0:{port}"); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); tracing::info!("Listening on {addr}"); axum::serve(listener, app).await.unwrap(); - - // Keep these alive (will be used in later phases) - 
drop(writer); - drop(reads); } diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..4abeb2c --- /dev/null +++ b/src/server.rs @@ -0,0 +1,197 @@ +use axum::body::Bytes; +use axum::extract::{Path, Query, State}; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Response}; +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::RwLock; + +use crate::config::Config; +use crate::db; +use crate::error::AppError; +use crate::hasher::Ring; +use crate::volume::VolumeClient; + +#[derive(Clone)] +pub struct AppState { + pub writer: db::WriterHandle, + pub reads: db::ReadPool, + pub ring: Arc<RwLock<Ring>>, + pub volume_client: VolumeClient, + pub healthy_volumes: Arc<RwLock<HashSet<String>>>, + pub config: Arc<Config>, +} + +/// GET /:key — look up key, redirect to a healthy volume +pub async fn get_key( + State(state): State<AppState>, + Path(key): Path<String>, +) -> Result<Response, AppError> { + let record = state + .reads + .query({ + let key = key.clone(); + move |conn| db::get(conn, &key) + }) + .await?; + + let healthy = state.healthy_volumes.read().await; + + // Pick the first healthy volume + for vol in &record.volumes { + if healthy.contains(vol) { + let location = format!("{}{}", vol, record.path); + return Ok(( + StatusCode::FOUND, + [(axum::http::header::LOCATION, location)], + ) + .into_response()); + } + } + + Err(AppError::NoHealthyVolume) +} + +/// PUT /:key — store blob on volumes, record in index +pub async fn put_key( + State(state): State<AppState>, + Path(key): Path<String>, + body: Bytes, +) -> Result<Response, AppError> { + let replication = state.config.server.replication_factor; + let path = Ring::key_path(&key); + + let target_volumes = { + let ring = state.ring.read().await; + ring.get_volumes(&key, replication) + }; + + if target_volumes.len() < replication { + return Err(AppError::VolumeError(format!( + "need {replication} volumes but only {} available", + target_volumes.len() + ))); + } + + // Fan out PUTs to all target volumes concurrently + let mut handles = 
Vec::with_capacity(target_volumes.len()); + for vol in &target_volumes { + let client = state.volume_client.clone(); + let vol = vol.clone(); + let path = path.clone(); + let key = key.clone(); + let data = body.clone(); + handles.push(tokio::spawn(async move { + client.put(&vol, &path, &key, data).await + })); + } + + let mut succeeded = Vec::new(); + let mut failed = false; + for (i, handle) in handles.into_iter().enumerate() { + match handle.await.unwrap() { + Ok(()) => succeeded.push(target_volumes[i].clone()), + Err(e) => { + tracing::error!("PUT to volume failed: {e}"); + failed = true; + } + } + } + + if failed { + // Rollback: delete from volumes that succeeded + for vol in &succeeded { + if let Err(e) = state.volume_client.delete(vol, &path).await { + tracing::error!("Rollback DELETE failed: {e}"); + } + } + return Err(AppError::VolumeError( + "not all volume writes succeeded".into(), + )); + } + + let size = Some(body.len() as i64); + state + .writer + .put(key, target_volumes, path, size) + .await?; + + Ok(StatusCode::CREATED.into_response()) +} + +/// DELETE /:key — remove from volumes and index +pub async fn delete_key( + State(state): State<AppState>, + Path(key): Path<String>, +) -> Result<Response, AppError> { + let record = state + .reads + .query({ + let key = key.clone(); + move |conn| db::get(conn, &key) + }) + .await?; + + // Fan out DELETEs concurrently + let mut handles = Vec::new(); + for vol in &record.volumes { + let client = state.volume_client.clone(); + let vol = vol.clone(); + let path = record.path.clone(); + handles.push(tokio::spawn( + async move { client.delete(&vol, &path).await }, + )); + } + + for handle in handles { + if let Err(e) = handle.await.unwrap() { + tracing::error!("DELETE from volume failed: {e}"); + } + } + + // Remove from index regardless of volume DELETE results + state.writer.delete(key).await?; + + Ok(StatusCode::NO_CONTENT.into_response()) +} + +/// HEAD /:key — check if key exists, return size +pub async fn head_key( + State(state): State<AppState>, + 
Path(key): Path<String>, +) -> Result<Response, AppError> { + let record = state + .reads + .query(move |conn| db::get(conn, &key)) + .await?; + + let mut headers = HeaderMap::new(); + if let Some(size) = record.size { + headers.insert( + axum::http::header::CONTENT_LENGTH, + size.to_string().parse().unwrap(), + ); + } + + Ok((StatusCode::OK, headers).into_response()) +} + +#[derive(serde::Deserialize)] +pub struct ListQuery { + #[serde(default)] + pub prefix: String, +} + +/// GET / — list keys with optional prefix filter +pub async fn list_keys( + State(state): State<AppState>, + Query(query): Query<ListQuery>, +) -> Result<Response, AppError> { + let keys = state + .reads + .query(move |conn| db::list_keys(conn, &query.prefix)) + .await?; + + let body = keys.join("\n"); + Ok((StatusCode::OK, body).into_response()) +} diff --git a/src/volume.rs b/src/volume.rs new file mode 100644 index 0000000..e57d334 --- /dev/null +++ b/src/volume.rs @@ -0,0 +1,113 @@ +use bytes::Bytes; +use std::time::Duration; + +#[derive(Clone)] +pub struct VolumeClient { + client: reqwest::Client, +} + +impl VolumeClient { + pub fn new() -> Self { + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(30)) + .pool_max_idle_per_host(10) + .build() + .expect("failed to build HTTP client"); + Self { client } + } + + /// PUT a blob to a volume server. Also writes a .key sidecar file + /// so the key can be recovered during rebuild. 
+ pub async fn put( + &self, + volume_url: &str, + path: &str, + key: &str, + data: Bytes, + ) -> Result<(), String> { + let url = format!("{volume_url}{path}"); + let resp = self + .client + .put(&url) + .body(data) + .send() + .await + .map_err(|e| format!("PUT {url}: {e}"))?; + + if !resp.status().is_success() { + return Err(format!("PUT {url}: status {}", resp.status())); + } + + // Write .key sidecar with the original key name + let key_url = format!("{volume_url}{path}.key"); + let resp = self + .client + .put(&key_url) + .body(key.to_string()) + .send() + .await + .map_err(|e| format!("PUT {key_url}: {e}"))?; + + if !resp.status().is_success() { + return Err(format!("PUT {key_url}: status {}", resp.status())); + } + + Ok(()) + } + + /// GET a blob from a volume server. + pub async fn get(&self, volume_url: &str, path: &str) -> Result<Bytes, String> { + let url = format!("{volume_url}{path}"); + let resp = self + .client + .get(&url) + .send() + .await + .map_err(|e| format!("GET {url}: {e}"))?; + + if resp.status() == reqwest::StatusCode::NOT_FOUND { + return Err(format!("GET {url}: not found")); + } + if !resp.status().is_success() { + return Err(format!("GET {url}: status {}", resp.status())); + } + + resp.bytes() + .await + .map_err(|e| format!("GET {url} body: {e}")) + } + + /// DELETE a blob (and its .key sidecar) from a volume server. + pub async fn delete(&self, volume_url: &str, path: &str) -> Result<(), String> { + let url = format!("{volume_url}{path}"); + let resp = self + .client + .delete(&url) + .send() + .await + .map_err(|e| format!("DELETE {url}: {e}"))?; + + // 404 is fine — already gone + if !resp.status().is_success() && resp.status() != reqwest::StatusCode::NOT_FOUND { + return Err(format!("DELETE {url}: status {}", resp.status())); + } + + // Best-effort delete the .key sidecar + let key_url = format!("{volume_url}{path}.key"); + let _ = self.client.delete(&key_url).send().await; + + Ok(()) + } + + /// Health check a volume server. 
+ pub async fn check(&self, volume_url: &str) -> bool { + let url = format!("{volume_url}/"); + self.client + .head(&url) + .timeout(Duration::from_secs(2)) + .send() + .await + .is_ok_and(|r| r.status().is_success()) + } +}