This commit is contained in:
Silas Brack 2026-03-07 09:53:24 +01:00
parent 8d32777f9f
commit 2a2afa5f69
7 changed files with 534 additions and 12 deletions

1
Cargo.lock generated
View file

@@ -878,6 +878,7 @@ name = "mkv"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"axum", "axum",
"bytes",
"clap", "clap",
"reqwest", "reqwest",
"rusqlite", "rusqlite",

View file

@@ -16,6 +16,7 @@ tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
tokio-stream = "0.1" tokio-stream = "0.1"
sha2 = "0.10" sha2 = "0.10"
bytes = "1"
[profile.release] [profile.release]
opt-level = 3 opt-level = 3

145
src/hasher.rs Normal file
View file

@@ -0,0 +1,145 @@
use sha2::{Digest, Sha256};
use std::collections::BTreeMap;
/// Consistent-hash ring mapping keys to physical volume URLs.
pub struct Ring {
    // Hash position on the 64-bit ring -> volume URL.
    // One entry per virtual node, so each physical volume appears
    // `virtual_nodes` times.
    nodes: BTreeMap<u64, String>,
    // Number of virtual nodes created per physical volume when added.
    virtual_nodes: usize,
}
impl Ring {
pub fn new(volumes: &[String], virtual_nodes: usize) -> Self {
let mut ring = Self {
nodes: BTreeMap::new(),
virtual_nodes,
};
for url in volumes {
ring.add_volume(url);
}
ring
}
pub fn add_volume(&mut self, url: &str) {
for i in 0..self.virtual_nodes {
let hash = hash_key(&format!("{url}:{i}"));
self.nodes.insert(hash, url.to_string());
}
}
pub fn remove_volume(&mut self, url: &str) {
self.nodes.retain(|_, v| v != url);
}
/// Walk the ring clockwise from the key's hash position and return
/// `count` distinct physical volumes.
pub fn get_volumes(&self, key: &str, count: usize) -> Vec<String> {
if self.nodes.is_empty() {
return vec![];
}
let hash = hash_key(key);
let mut result = Vec::with_capacity(count);
// Walk clockwise from hash position
for (_, url) in self.nodes.range(hash..).chain(self.nodes.iter()) {
if !result.contains(url) {
result.push(url.clone());
}
if result.len() == count {
break;
}
}
result
}
/// Compute the content-addressed path for a key: /{first2}/{next2}/{full_hex}
pub fn key_path(key: &str) -> String {
let hex = hex_hash(key);
format!("/{}/{}/{}", &hex[..2], &hex[2..4], &hex)
}
}
/// Map a string to a position on the 64-bit ring: the big-endian
/// interpretation of the first 8 bytes of its SHA-256 digest.
fn hash_key(input: &str) -> u64 {
    let digest = Sha256::digest(input.as_bytes());
    let mut prefix = [0u8; 8];
    prefix.copy_from_slice(&digest[..8]);
    u64::from_be_bytes(prefix)
}
/// Full lowercase-hex SHA-256 digest of `input`.
fn hex_hash(input: &str) -> String {
    hex_encode(&Sha256::digest(input.as_bytes()))
}
/// Render a byte slice as lowercase hexadecimal (two digits per byte).
fn hex_encode(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `n` volume URLs of the form "http://volN:300N".
    fn make_volumes(n: u32) -> Vec<String> {
        (1..=n).map(|i| format!("http://vol{i}:300{i}")).collect()
    }

    #[test]
    fn test_ring_distribution() {
        let volumes = make_volumes(3);
        let ring = Ring::new(&volumes, 100);
        // All 3 volumes should appear in results
        let selected = ring.get_volumes("test-key", 3);
        assert_eq!(selected.len(), 3);
        for v in &volumes {
            assert!(selected.contains(v), "missing volume {v}");
        }
    }

    #[test]
    fn test_ring_deterministic() {
        // The same key must always map to the same ordered volume list.
        let ring = Ring::new(&make_volumes(3), 100);
        assert_eq!(ring.get_volumes("my-key", 2), ring.get_volumes("my-key", 2));
    }

    #[test]
    fn test_ring_count_capped() {
        let ring = Ring::new(&make_volumes(2), 100);
        // Requesting more than available should return what's available
        assert_eq!(ring.get_volumes("key", 5).len(), 2);
    }

    #[test]
    fn test_key_path_format() {
        // SHA256("hello") = 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
        assert_eq!(
            Ring::key_path("hello"),
            "/2c/f2/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
    }

    #[test]
    fn test_ring_even_distribution() {
        let ring = Ring::new(&make_volumes(3), 100);
        let mut counts = std::collections::HashMap::new();
        for i in 0..3000 {
            let primary = ring.get_volumes(&format!("key-{i}"), 1)[0].clone();
            *counts.entry(primary).or_insert(0u32) += 1;
        }
        // Each volume should get roughly 1000 keys (±30%)
        for (vol, count) in &counts {
            assert!(
                *count > 700 && *count < 1300,
                "volume {vol} got {count} keys, expected ~1000"
            );
        }
    }
}

32
src/health.rs Normal file
View file

@@ -0,0 +1,32 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use crate::volume::VolumeClient;
/// Shared, concurrently updated set of volume base URLs that passed
/// their most recent health check.
pub type HealthyVolumes = Arc<RwLock<HashSet<String>>>;
/// Spawn a background task that probes every volume every 10 seconds
/// and keeps `healthy` in sync with the results.
///
/// Volumes that fail a probe are removed from the healthy set (with a
/// warning on the healthy -> unhealthy transition only) and re-added as
/// soon as a probe succeeds again. The task runs for the lifetime of
/// the process.
pub fn spawn_health_checker(
    volume_client: VolumeClient,
    all_volumes: Vec<String>,
    healthy: HealthyVolumes,
) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(10));
        loop {
            interval.tick().await;
            for url in &all_volumes {
                // Probe outside the lock so a slow/unreachable volume
                // doesn't block readers of the healthy set.
                let ok = volume_client.check(url).await;
                let mut set = healthy.write().await;
                if ok {
                    set.insert(url.clone());
                } else if set.remove(url) {
                    // remove() returning true means this is the first
                    // failed probe since the volume was healthy.
                    tracing::warn!("Volume {url} is unhealthy");
                }
            }
        }
    });
}

View file

@@ -1,9 +1,16 @@
mod config; mod config;
mod db; mod db;
mod error; mod error;
mod hasher;
mod health;
mod server;
mod volume;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Parser)] #[derive(Parser)]
#[command(name = "mkv", about = "Distributed key-value store")] #[command(name = "mkv", about = "Distributed key-value store")]
@@ -70,27 +77,53 @@ async fn serve(config: config::Config) {
.unwrap_or(4); .unwrap_or(4);
let reads = db::ReadPool::new(db_path, num_readers); let reads = db::ReadPool::new(db_path, num_readers);
let volume_urls = config.volume_urls();
let ring = Arc::new(RwLock::new(hasher::Ring::new(
&volume_urls,
config.server.virtual_nodes,
)));
let volume_client = volume::VolumeClient::new();
// Start with all volumes assumed healthy, health checker will update
let healthy_volumes: health::HealthyVolumes =
Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone())));
health::spawn_health_checker(
volume_client.clone(),
volume_urls.clone(),
healthy_volumes.clone(),
);
let port = config.server.port; let port = config.server.port;
let volumes = config.volume_urls();
let state = server::AppState {
writer,
reads,
ring,
volume_client,
healthy_volumes,
config: Arc::new(config),
};
tracing::info!("Starting mkv server on port {port}"); tracing::info!("Starting mkv server on port {port}");
tracing::info!(" Readers: {num_readers}"); tracing::info!(" Readers: {num_readers}");
tracing::info!(" Volumes: {volumes:?}"); tracing::info!(" Volumes: {volume_urls:?}");
tracing::info!( tracing::info!(" Replication factor: {}", state.config.server.replication_factor);
" Replication factor: {}",
config.server.replication_factor
);
// TODO: wire up axum routes, volume client, hasher, health checker
let app = axum::Router::new() let app = axum::Router::new()
.route("/health", axum::routing::get(|| async { "ok" })); .route("/", axum::routing::get(server::list_keys))
.route(
"/{*key}",
axum::routing::get(server::get_key)
.put(server::put_key)
.delete(server::delete_key)
.head(server::head_key),
)
.with_state(state);
let addr = format!("0.0.0.0:{port}"); let addr = format!("0.0.0.0:{port}");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
tracing::info!("Listening on {addr}"); tracing::info!("Listening on {addr}");
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
// Keep these alive (will be used in later phases)
drop(writer);
drop(reads);
} }

197
src/server.rs Normal file
View file

@@ -0,0 +1,197 @@
use axum::body::Bytes;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::config::Config;
use crate::db;
use crate::error::AppError;
use crate::hasher::Ring;
use crate::volume::VolumeClient;
/// Shared application state handed (cloned) to every axum handler.
#[derive(Clone)]
pub struct AppState {
    // Single-writer handle for index mutations (PUT/DELETE records).
    pub writer: db::WriterHandle,
    // Pool of read-only index connections for lookups and listings.
    pub reads: db::ReadPool,
    // Consistent-hash ring used to pick target volumes for new keys.
    pub ring: Arc<RwLock<Ring>>,
    // HTTP client for talking to volume servers.
    pub volume_client: VolumeClient,
    // Volume URLs currently passing health checks (maintained by the
    // background health checker).
    pub healthy_volumes: Arc<RwLock<HashSet<String>>>,
    // Parsed server configuration (replication factor, ports, ...).
    pub config: Arc<Config>,
}
/// GET /:key — look up key, redirect to a healthy volume
pub async fn get_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
) -> Result<Response, AppError> {
    let lookup_key = key.clone();
    let record = state
        .reads
        .query(move |conn| db::get(conn, &lookup_key))
        .await?;

    // Redirect to the first replica that is currently healthy.
    let healthy = state.healthy_volumes.read().await;
    match record.volumes.iter().find(|v| healthy.contains(*v)) {
        Some(vol) => {
            let location = format!("{}{}", vol, record.path);
            Ok((
                StatusCode::FOUND,
                [(axum::http::header::LOCATION, location)],
            )
                .into_response())
        }
        None => Err(AppError::NoHealthyVolume),
    }
}
/// PUT /:key — store blob on volumes, record in index
///
/// Writes the blob to `replication_factor` volumes chosen from the
/// ring. All writes must succeed; on any failure the successful copies
/// are rolled back and an error is returned, so the index never
/// references a blob that is missing replicas.
pub async fn put_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
    body: Bytes,
) -> Result<Response, AppError> {
    let replication = state.config.server.replication_factor;
    let path = Ring::key_path(&key);

    let target_volumes = {
        let ring = state.ring.read().await;
        ring.get_volumes(&key, replication)
    };
    if target_volumes.len() < replication {
        return Err(AppError::VolumeError(format!(
            "need {replication} volumes but only {} available",
            target_volumes.len()
        )));
    }

    // Fan out PUTs to all target volumes concurrently
    let mut handles = Vec::with_capacity(target_volumes.len());
    for vol in &target_volumes {
        let client = state.volume_client.clone();
        let vol = vol.clone();
        let path = path.clone();
        let key = key.clone();
        let data = body.clone();
        handles.push(tokio::spawn(async move {
            client.put(&vol, &path, &key, data).await
        }));
    }

    let mut succeeded = Vec::new();
    let mut failed = false;
    for (i, handle) in handles.into_iter().enumerate() {
        // Treat a JoinError (panicked/cancelled write task) as an
        // ordinary failure instead of panicking the whole handler, so
        // the rollback below still runs.
        match handle.await {
            Ok(Ok(())) => succeeded.push(target_volumes[i].clone()),
            Ok(Err(e)) => {
                tracing::error!("PUT to volume failed: {e}");
                failed = true;
            }
            Err(e) => {
                tracing::error!("PUT to volume failed: {e}");
                failed = true;
            }
        }
    }

    if failed {
        // Rollback: delete from volumes that succeeded
        for vol in &succeeded {
            if let Err(e) = state.volume_client.delete(vol, &path).await {
                tracing::error!("Rollback DELETE failed: {e}");
            }
        }
        return Err(AppError::VolumeError(
            "not all volume writes succeeded".into(),
        ));
    }

    let size = Some(body.len() as i64);
    state
        .writer
        .put(key, target_volumes, path, size)
        .await?;

    Ok(StatusCode::CREATED.into_response())
}
/// DELETE /:key — remove from volumes and index
pub async fn delete_key(
State(state): State<AppState>,
Path(key): Path<String>,
) -> Result<Response, AppError> {
let record = state
.reads
.query({
let key = key.clone();
move |conn| db::get(conn, &key)
})
.await?;
// Fan out DELETEs concurrently
let mut handles = Vec::new();
for vol in &record.volumes {
let client = state.volume_client.clone();
let vol = vol.clone();
let path = record.path.clone();
handles.push(tokio::spawn(
async move { client.delete(&vol, &path).await },
));
}
for handle in handles {
if let Err(e) = handle.await.unwrap() {
tracing::error!("DELETE from volume failed: {e}");
}
}
// Remove from index regardless of volume DELETE results
state.writer.delete(key).await?;
Ok(StatusCode::NO_CONTENT.into_response())
}
/// HEAD /:key — check if key exists, return size
pub async fn head_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
) -> Result<Response, AppError> {
    let record = state
        .reads
        .query(move |conn| db::get(conn, &key))
        .await?;

    // Advertise the stored blob size when the index recorded one.
    let mut headers = HeaderMap::new();
    if let Some(size) = record.size {
        let value = size.to_string().parse().unwrap();
        headers.insert(axum::http::header::CONTENT_LENGTH, value);
    }
    Ok((StatusCode::OK, headers).into_response())
}
/// Query parameters accepted by the key-listing endpoint.
#[derive(serde::Deserialize)]
pub struct ListQuery {
    // Only keys starting with this prefix are returned; defaults to ""
    // (all keys) when the parameter is absent.
    #[serde(default)]
    pub prefix: String,
}
/// GET / — list keys with optional prefix filter
pub async fn list_keys(
    State(state): State<AppState>,
    Query(query): Query<ListQuery>,
) -> Result<Response, AppError> {
    let matching = state
        .reads
        .query(move |conn| db::list_keys(conn, &query.prefix))
        .await?;
    // One key per line in the plain-text response body.
    Ok((StatusCode::OK, matching.join("\n")).into_response())
}

113
src/volume.rs Normal file
View file

@@ -0,0 +1,113 @@
use bytes::Bytes;
use std::time::Duration;
/// HTTP client wrapper for talking to volume (blob storage) servers.
#[derive(Clone)]
pub struct VolumeClient {
    // reqwest::Client is internally reference-counted, so cloning
    // VolumeClient shares a single connection pool.
    client: reqwest::Client,
}
impl VolumeClient {
    /// Build a client with short connect / generous request timeouts and
    /// a small per-host idle connection pool.
    pub fn new() -> Self {
        let http = reqwest::Client::builder()
            .connect_timeout(Duration::from_secs(2))
            .timeout(Duration::from_secs(30))
            .pool_max_idle_per_host(10)
            .build()
            .expect("failed to build HTTP client");
        Self { client: http }
    }

    /// PUT a blob to a volume server. Also writes a .key sidecar file
    /// so the key can be recovered during rebuild.
    pub async fn put(
        &self,
        volume_url: &str,
        path: &str,
        key: &str,
        data: Bytes,
    ) -> Result<(), String> {
        let url = format!("{volume_url}{path}");
        let response = self
            .client
            .put(&url)
            .body(data)
            .send()
            .await
            .map_err(|e| format!("PUT {url}: {e}"))?;
        if !response.status().is_success() {
            return Err(format!("PUT {url}: status {}", response.status()));
        }

        // Write .key sidecar with the original key name
        let key_url = format!("{volume_url}{path}.key");
        let response = self
            .client
            .put(&key_url)
            .body(key.to_string())
            .send()
            .await
            .map_err(|e| format!("PUT {key_url}: {e}"))?;
        if response.status().is_success() {
            Ok(())
        } else {
            Err(format!("PUT {key_url}: status {}", response.status()))
        }
    }

    /// GET a blob from a volume server.
    pub async fn get(&self, volume_url: &str, path: &str) -> Result<Bytes, String> {
        let url = format!("{volume_url}{path}");
        let response = self
            .client
            .get(&url)
            .send()
            .await
            .map_err(|e| format!("GET {url}: {e}"))?;
        let status = response.status();
        if status == reqwest::StatusCode::NOT_FOUND {
            return Err(format!("GET {url}: not found"));
        }
        if !status.is_success() {
            return Err(format!("GET {url}: status {status}"));
        }
        response
            .bytes()
            .await
            .map_err(|e| format!("GET {url} body: {e}"))
    }

    /// DELETE a blob (and its .key sidecar) from a volume server.
    pub async fn delete(&self, volume_url: &str, path: &str) -> Result<(), String> {
        let url = format!("{volume_url}{path}");
        let response = self
            .client
            .delete(&url)
            .send()
            .await
            .map_err(|e| format!("DELETE {url}: {e}"))?;
        let status = response.status();
        // 404 is fine — already gone
        if !(status.is_success() || status == reqwest::StatusCode::NOT_FOUND) {
            return Err(format!("DELETE {url}: status {status}"));
        }

        // Best-effort delete the .key sidecar
        let key_url = format!("{volume_url}{path}.key");
        let _ = self.client.delete(&key_url).send().await;
        Ok(())
    }

    /// Health check a volume server.
    pub async fn check(&self, volume_url: &str) -> bool {
        let probe = format!("{volume_url}/");
        let result = self
            .client
            .head(&probe)
            .timeout(Duration::from_secs(2))
            .send()
            .await;
        matches!(result, Ok(r) if r.status().is_success())
    }
}