From ec408aff2997a899d39fcbc876dddecbf53ce464 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sat, 7 Mar 2026 15:24:05 +0100 Subject: [PATCH] Improve typing and errors, clean up --- README.md | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++ TODO.md | 4 +- src/error.rs | 39 ++++++++++++--- src/main.rs | 25 +++++++++- src/server.rs | 27 ++++++----- 5 files changed, 205 insertions(+), 22 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..956bf74 --- /dev/null +++ b/README.md @@ -0,0 +1,132 @@ +# mkv + +Distributed key-value store for blobs. Thin index server (Rust + SQLite) in front of nginx volume servers. Inspired by [minikeyvalue](https://github.com/geohot/minikeyvalue). + +## Usage + +```bash +# Start the index server (replicates to 2 of 3 volumes) +mkv -d /tmp/index.db -v http://vol1:8080,http://vol2:8080,http://vol3:8080 -r 2 serve -p 3000 + +# Store a file +curl -X PUT -d "contents" http://localhost:3000/path/to/key + +# Retrieve (returns 302 redirect to nginx) +curl -L http://localhost:3000/path/to/key + +# Check existence and size +curl -I http://localhost:3000/path/to/key + +# Delete +curl -X DELETE http://localhost:3000/path/to/key + +# List keys (with optional prefix filter) +curl http://localhost:3000/?prefix=path/to/ +``` + +### Operations + +```bash +# Rebuild index by scanning all volumes (disaster recovery) +mkv -d /tmp/index.db -v http://vol1:8080,http://vol2:8080,http://vol3:8080 -r 2 rebuild + +# Rebalance after adding/removing volumes (preview with --dry-run) +mkv -d /tmp/index.db -v http://vol1:8080,http://vol2:8080,http://vol3:8080,http://vol4:8080 -r 2 rebalance --dry-run +mkv -d /tmp/index.db -v http://vol1:8080,http://vol2:8080,http://vol3:8080,http://vol4:8080 -r 2 rebalance +``` + +### Volume servers + +Any nginx with WebDAV enabled works: + +```nginx +server { + listen 80; + root /data; + location / { + dav_methods PUT DELETE; + create_full_put_path on; + autoindex on; 
+ autoindex_format json; + } +} +``` + +## What it does + +- **HTTP API** — PUT, GET (302 redirect), DELETE, HEAD, LIST with prefix filtering +- **Replication** — fan-out writes to N volumes concurrently, all-or-nothing with rollback +- **Consistent hashing** — stable volume assignment; adding/removing a volume only moves ~1/N of keys +- **Rebuild** — reconstructs the SQLite index by scanning nginx autoindex on all volumes +- **Rebalance** — migrates data to correct volumes after topology changes, with `--dry-run` preview +- **Key-as-path** — blobs stored at `/{key}` on nginx, no content-addressing or sidecar files +- **Single binary** — no config files, everything via CLI flags + +## What it doesn't do + +- **Checksums** — no integrity verification; bit rot goes undetected +- **Auth** — no access control; anyone who can reach the server can read/write/delete +- **Encryption** — blobs stored as plain files on nginx +- **Streaming / range requests** — entire blob must fit in memory +- **Metadata** — no EXIF, tags, or content types; key path is all you get +- **Versioning** — PUT overwrites; no history +- **Compression** — blobs stored as-is + +## Comparison to minikeyvalue + +mkv is a ground-up rewrite of [minikeyvalue](https://github.com/geohot/minikeyvalue) in Rust. 
+ +| | mkv | minikeyvalue | +|--|-----|--------------| +| Language | Rust | Go | +| Index | SQLite (WAL mode) | LevelDB | +| Storage paths | key-as-path (`/{key}`) | content-addressed (md5 + base64) | +| GET behavior | Index lookup, 302 redirect | HEAD to volume first, then 302 redirect | +| PUT overwrite | Allowed | Forbidden (returns 403) | +| Hash function | SHA-256 per volume, sort by score | MD5 per volume, sort by score | +| MD5 of values | No | Yes (stored in index) | +| Health checker | No | No (checks per-request via HEAD) | +| Subvolumes | No | Yes (configurable fan-out directories) | +| Soft delete | No (hard delete) | Yes (UNLINK + DELETE two-phase) | +| S3 API | No | Partial (list, multipart upload) | +| App code | ~600 lines | ~1,000 lines | +| Tests | 17 (unit + integration) | 1 | + +### Performance (10k keys, 1KB values, 100 concurrency) + +Tested on the same machine with shared nginx volumes: + +| Operation | mkv | minikeyvalue | +|-----------|-----|--------------| +| PUT | 10,000 req/s | 10,500 req/s | +| GET (full round-trip) | 7,000 req/s | 6,500 req/s | +| GET (index only) | 15,800 req/s | 13,800 req/s | +| DELETE | 13,300 req/s | 13,600 req/s | + +Both are bottlenecked by nginx volume I/O. The index layer (SQLite) can sustain 378,000 writes/sec in isolation. + +## Security + +mkv assumes a **trusted network**. There is no built-in authentication, authorization, or encryption. This is the same security model as minikeyvalue — neither system is designed for direct exposure to the public internet. + +### Trust model + +The index server and volume servers (nginx) are expected to live on the same private network. GET requests return a 302 redirect to a volume URL, so clients must be able to reach the volumes directly. Anyone who can reach the index server can read, write, and delete any key. Anyone who can reach a volume can read any blob. 
+ +### Deploying with auth + +Put a reverse proxy in front of the index server and handle authentication there: + +- **Basic auth or API keys** at the reverse proxy for simple setups +- **mTLS** for machine-to-machine access +- **OAuth / JWT** validation at the proxy for multi-user setups + +Volume servers should be on a private network that clients cannot reach directly, or use nginx's `secure_link` module to validate signed redirect URLs. + +### What neither mkv nor minikeyvalue protects against + +- Unauthorized reads/writes (no auth) +- Data in transit (no TLS unless the proxy adds it) +- Data at rest (blobs are plain files on disk) +- Malicious keys (no input sanitization beyond what nginx enforces on paths) +- Index tampering (SQLite file has no integrity protection) diff --git a/TODO.md b/TODO.md index 093fcaf..0c511d8 100644 --- a/TODO.md +++ b/TODO.md @@ -20,9 +20,9 @@ ### Worth doing -- [ ] **Typed volume errors** — replace `String` errors in `volume.rs` with a proper enum +- [x] **Typed volume errors** — replace `String` errors with a proper enum - [ ] **Unit tests for `db.rs`** — CRUD round-trip with in-memory SQLite -- [ ] **Graceful shutdown** — drain in-flight requests, flush writer +- [x] **Graceful shutdown** — drain in-flight requests on SIGINT/SIGTERM ### Nice to have diff --git a/src/error.rs b/src/error.rs index 04dad66..61eacc2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,11 +1,33 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +/// Errors from individual volume HTTP requests — used for logging, not HTTP responses. 
+#[derive(Debug)] +pub enum VolumeError { + Request { url: String, source: reqwest::Error }, + BadStatus { url: String, status: reqwest::StatusCode }, +} + +impl std::fmt::Display for VolumeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + VolumeError::Request { url, source } => { + write!(f, "volume request to {url} failed: {source}") + } + VolumeError::BadStatus { url, status } => { + write!(f, "volume {url} returned status {status}") + } + } + } +} + +/// Application-level errors that map to HTTP responses. #[derive(Debug)] pub enum AppError { NotFound, Db(rusqlite::Error), - VolumeError(String), + InsufficientVolumes { need: usize, have: usize }, + PartialWrite, } impl From for AppError { @@ -22,17 +44,22 @@ impl std::fmt::Display for AppError { match self { AppError::NotFound => write!(f, "not found"), AppError::Db(e) => write!(f, "database error: {e}"), - AppError::VolumeError(msg) => write!(f, "volume error: {msg}"), + AppError::InsufficientVolumes { need, have } => { + write!(f, "need {need} volumes but only {have} available") + } + AppError::PartialWrite => write!(f, "not all volume writes succeeded"), } } } impl IntoResponse for AppError { fn into_response(self) -> Response { - let (status, msg) = match &self { - AppError::NotFound => (StatusCode::NOT_FOUND, self.to_string()), - _ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), + let status = match &self { + AppError::NotFound => StatusCode::NOT_FOUND, + AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR, + AppError::InsufficientVolumes { .. 
} => StatusCode::SERVICE_UNAVAILABLE, + AppError::PartialWrite => StatusCode::BAD_GATEWAY, }; - (status, msg).into_response() + (status, self.to_string()).into_response() } } diff --git a/src/main.rs b/src/main.rs index 9e72269..6925ea8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,25 @@ enum Commands { }, } +async fn shutdown_signal() { + let ctrl_c = tokio::signal::ctrl_c(); + #[cfg(unix)] + { + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to install SIGTERM handler"); + tokio::select! { + _ = ctrl_c => tracing::info!("Received SIGINT, shutting down..."), + _ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down..."), + } + } + #[cfg(not(unix))] + { + ctrl_c.await.expect("failed to listen for ctrl-c"); + tracing::info!("Received ctrl-c, shutting down..."); + } +} + #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); @@ -49,7 +68,11 @@ async fn main() { let addr = format!("0.0.0.0:{port}"); tracing::info!("Listening on {addr}"); let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); - axum::serve(listener, app).await.unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal()) + .await + .unwrap(); + tracing::info!("Shutdown complete"); } Commands::Rebuild => { mkv::rebuild::run(&args).await; diff --git a/src/server.rs b/src/server.rs index bd9755f..1f8a290 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response}; use std::sync::Arc; use crate::db; -use crate::error::AppError; +use crate::error::{AppError, VolumeError}; #[derive(Clone)] pub struct AppState { @@ -32,11 +32,10 @@ pub async fn put_key( ) -> Result { let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas); if target_volumes.len() < state.replicas { - return Err(AppError::VolumeError(format!( - "need {} volumes but only {} available", - state.replicas, - target_volumes.len() - 
))); + return Err(AppError::InsufficientVolumes { + need: state.replicas, + have: target_volumes.len(), + }); } // Fan out PUTs to all target volumes concurrently @@ -47,9 +46,11 @@ pub async fn put_key( let client = state.http.clone(); let data = body.clone(); async move { - let resp = client.put(&url).body(data).send().await.map_err(|e| format!("PUT {url}: {e}"))?; + let resp = client.put(&url).body(data).send().await.map_err(|e| { + VolumeError::Request { url: url.clone(), source: e } + })?; if !resp.status().is_success() { - return Err(format!("PUT {url}: status {}", resp.status())); + return Err(VolumeError::BadStatus { url, status: resp.status() }); } Ok(()) } @@ -57,20 +58,20 @@ pub async fn put_key( handles.push(handle); } - let mut all_ok = true; + let mut failed = false; for handle in handles { if let Err(e) = handle.await.unwrap() { - tracing::error!("PUT to volume failed: {e}"); - all_ok = false; + tracing::error!("{e}"); + failed = true; } } - if !all_ok { + if failed { // Rollback: best-effort delete from volumes for vol in &target_volumes { let _ = state.http.delete(format!("{vol}/{key}")).send().await; } - return Err(AppError::VolumeError("not all volume writes succeeded".into())); + return Err(AppError::PartialWrite); } let size = Some(body.len() as i64);