use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use std::sync::Arc; use crate::db; use crate::error::{AppError, VolumeError}; #[derive(Clone)] pub struct AppState { pub db: db::Db, pub volumes: Arc>, pub replicas: usize, pub http: reqwest::Client, } /// Result of probing volumes for a key. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ProbeResult { /// Found a healthy volume at this URL Found(String), /// All volumes were probed, none healthy AllFailed, } /// Pure function: given volumes and their probe results (true = healthy), /// returns the first healthy volume URL for the key, or None. pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult { for (vol, &healthy) in volumes.iter().zip(results) { if healthy { return ProbeResult::Found(format!("{vol}/{key}")); } } ProbeResult::AllFailed } /// Pure function: shuffle volumes for load balancing. /// Takes a seed for deterministic testing. pub fn shuffle_volumes(volumes: Vec, seed: u64) -> Vec { use rand::seq::SliceRandom; use rand::SeedableRng; let mut rng = rand::rngs::StdRng::seed_from_u64(seed); let mut vols = volumes; vols.shuffle(&mut rng); vols } pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; if record.volumes.is_empty() { return Err(AppError::CorruptRecord { key }); } // Shuffle for load balancing (random seed in production) let seed = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_nanos() as u64) .unwrap_or(0); let volumes = shuffle_volumes(record.volumes, seed); // Probe volumes and collect results let mut results = Vec::with_capacity(volumes.len()); for vol in &volumes { let url = format!("{vol}/{key}"); let healthy = match state.http.head(&url).send().await { Ok(resp) if resp.status().is_success() => true, Ok(resp) => { tracing::warn!("volume {vol} returned {} for {key}", resp.status()); false } Err(e) => { tracing::warn!("volume {vol} unreachable for {key}: {e}"); false } }; results.push(healthy); // Early exit on first healthy volume if healthy { break; } } match first_healthy_volume(&key, &volumes, &results) { ProbeResult::Found(url) => Ok(( StatusCode::FOUND, [(axum::http::header::LOCATION, url)], ) .into_response()), ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable), } } pub async fn put_key( State(state): State, Path(key): Path, body: Bytes, ) -> Result { let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas); if target_volumes.len() < state.replicas { return Err(AppError::InsufficientVolumes { need: state.replicas, have: target_volumes.len(), }); } // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { let url = format!("{vol}/{key}"); let handle = tokio::spawn({ let client = state.http.clone(); let data = body.clone(); async move { let resp = client.put(&url).body(data).send().await.map_err(|e| { VolumeError::Request { url: url.clone(), source: e, } })?; if !resp.status().is_success() { return Err(VolumeError::BadStatus { url, status: resp.status(), }); } Ok(()) } }); handles.push(handle); } let mut failed = false; for handle in handles { match handle.await { Ok(Err(e)) => { tracing::error!("{e}"); failed = true; } Err(e) => { tracing::error!("volume write task failed: {e}"); failed = true; } Ok(Ok(())) => {} } } if failed { // Rollback: best-effort delete from volumes for vol in &target_volumes { let _ = state.http.delete(format!("{vol}/{key}")).send().await; } return Err(AppError::PartialWrite); } let size = Some(body.len() as i64); if let Err(e) = state .db .put(key.clone(), target_volumes.clone(), size) .await { for vol in &target_volumes { let _ = state.http.delete(format!("{vol}/{key}")).send().await; } return Err(e); } Ok(StatusCode::CREATED.into_response()) } pub async fn delete_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; let mut handles = Vec::new(); for vol in &record.volumes { let url = format!("{vol}/{key}"); let handle = tokio::spawn({ let client = state.http.clone(); async move { let resp = client .delete(&url) .send() .await .map_err(|e| VolumeError::Request { url: url.clone(), source: e, })?; if !resp.status().is_success() { return Err(VolumeError::BadStatus { url, status: resp.status(), }); } Ok(()) } }); handles.push(handle); } for handle in handles { match handle.await { Ok(Err(e)) => tracing::error!("{e}"), Err(e) => tracing::error!("volume delete task failed: {e}"), Ok(Ok(())) => {} } } state.db.delete(key).await?; Ok(StatusCode::NO_CONTENT.into_response()) } pub async fn head_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; let mut headers = HeaderMap::new(); if let Some(size) = record.size { headers.insert( axum::http::header::CONTENT_LENGTH, size.to_string().parse().unwrap(), ); } Ok((StatusCode::OK, headers).into_response()) } #[derive(serde::Deserialize)] pub struct ListQuery { #[serde(default)] pub prefix: String, } pub async fn list_keys( State(state): State, Query(query): Query, ) -> Result { let keys = state.db.list_keys(&query.prefix).await?; Ok((StatusCode::OK, keys.join("\n")).into_response()) } #[cfg(test)] mod tests { use super::*; #[test] fn test_volumes_for_key_sufficient() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); assert_eq!(selected.len(), 2); } #[test] fn test_volumes_for_key_insufficient() { let volumes: Vec = vec!["http://vol1".into()]; let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); assert_eq!(selected.len(), 1); } #[test] fn test_first_healthy_volume_finds_first() { let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; let results = vec![true, true, true]; assert_eq!( first_healthy_volume("key", &volumes, &results), ProbeResult::Found("http://vol1/key".into()) ); } #[test] fn test_first_healthy_volume_skips_unhealthy() { let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; let results = vec![false, false, true]; assert_eq!( first_healthy_volume("key", &volumes, &results), ProbeResult::Found("http://vol3/key".into()) ); } #[test] fn test_first_healthy_volume_all_failed() { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let results = vec![false, false]; assert_eq!( first_healthy_volume("key", &volumes, &results), ProbeResult::AllFailed ); } #[test] fn test_first_healthy_volume_early_exit() { // Simulates early exit: only first two volumes were probed let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; let results = vec![false, true]; // Only 2 results because we stopped early assert_eq!( first_healthy_volume("key", &volumes, &results), ProbeResult::Found("http://vol2/key".into()) ); } #[test] fn test_shuffle_volumes_deterministic_with_seed() { let volumes: Vec = (1..=5).map(|i| format!("http://vol{i}")).collect(); let a = shuffle_volumes(volumes.clone(), 42); let b = shuffle_volumes(volumes.clone(), 42); assert_eq!(a, b, "same seed should produce same order"); } #[test] fn test_shuffle_volumes_different_seeds() { let volumes: Vec = (1..=10).map(|i| format!("http://vol{i}")).collect(); let a = shuffle_volumes(volumes.clone(), 1); let b = shuffle_volumes(volumes.clone(), 2); assert_ne!(a, b, "different seeds should produce different orders"); } #[test] fn test_shuffle_volumes_preserves_elements() { let volumes: Vec = (1..=5).map(|i| format!("http://vol{i}")).collect(); let mut shuffled = shuffle_volumes(volumes.clone(), 123); shuffled.sort(); let mut original = volumes; original.sort(); assert_eq!(shuffled, original); } }