use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use std::sync::Arc; use crate::db; use crate::error::{AppError, VolumeError}; #[derive(Clone)] pub struct AppState { pub db: db::Db, pub volumes: Arc>, pub replicas: usize, pub http: reqwest::Client, } /// Result of probing volumes for a key. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ProbeResult { /// Found a healthy volume at this URL Found(String), /// All volumes were probed, none healthy AllFailed, } /// Pure function: given volumes and their probe results (true = healthy), /// returns the first healthy volume URL for the key, or None. pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult { for (vol, &healthy) in volumes.iter().zip(results) { if healthy { return ProbeResult::Found(format!("{vol}/{key}")); } } ProbeResult::AllFailed } /// Pure function: shuffle volumes for load balancing. /// Takes a seed for deterministic testing. pub fn shuffle_volumes(volumes: Vec, seed: u64) -> Vec { use rand::seq::SliceRandom; use rand::SeedableRng; let mut rng = rand::rngs::StdRng::seed_from_u64(seed); let mut vols = volumes; vols.shuffle(&mut rng); vols } pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; if record.volumes.is_empty() { return Err(AppError::CorruptRecord { key }); } // Shuffle for load balancing (random seed in production) let seed = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_nanos() as u64) .unwrap_or(0); let volumes = shuffle_volumes(record.volumes, seed); // Probe volumes and collect results let mut results = Vec::with_capacity(volumes.len()); for vol in &volumes { let url = format!("{vol}/{key}"); let healthy = match state.http.head(&url).send().await { Ok(resp) if resp.status().is_success() => true, Ok(resp) => { tracing::warn!("volume {vol} returned {} for {key}", resp.status()); false } Err(e) => { tracing::warn!("volume {vol} unreachable for {key}: {e}"); false } }; results.push(healthy); // Early exit on first healthy volume if healthy { break; } } match first_healthy_volume(&key, &volumes, &results) { ProbeResult::Found(url) => Ok(( StatusCode::FOUND, [(axum::http::header::LOCATION, url)], ) .into_response()), ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable), } } pub async fn put_key( State(state): State, Path(key): Path, body: Bytes, ) -> Result { let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas); if target_volumes.len() < state.replicas { return Err(AppError::InsufficientVolumes { need: state.replicas, have: target_volumes.len(), }); } // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { let url = format!("{vol}/{key}"); let handle = tokio::spawn({ let client = state.http.clone(); let data = body.clone(); async move { let resp = client.put(&url).body(data).send().await.map_err(|e| { VolumeError::Request { url: url.clone(), source: e, } })?; if !resp.status().is_success() { return Err(VolumeError::BadStatus { url, status: resp.status(), }); } Ok(()) } }); handles.push(handle); } let mut failed = false; for handle in handles { match handle.await { Ok(Err(e)) => { tracing::error!("{e}"); failed = true; } Err(e) => { tracing::error!("volume write task failed: {e}"); failed = true; } Ok(Ok(())) => {} } } if failed { // Rollback: best-effort delete from volumes for vol in &target_volumes { let _ = state.http.delete(format!("{vol}/{key}")).send().await; } return Err(AppError::PartialWrite); } let size = Some(body.len() as i64); if let Err(e) = state .db .put(key.clone(), target_volumes.clone(), size) .await { for vol in &target_volumes { let _ = state.http.delete(format!("{vol}/{key}")).send().await; } return Err(e); } Ok(StatusCode::CREATED.into_response()) } pub async fn delete_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; let mut handles = Vec::new(); for vol in &record.volumes { let url = format!("{vol}/{key}"); let handle = tokio::spawn({ let client = state.http.clone(); async move { let resp = client .delete(&url) .send() .await .map_err(|e| VolumeError::Request { url: url.clone(), source: e, })?; if !resp.status().is_success() { return Err(VolumeError::BadStatus { url, status: resp.status(), }); } Ok(()) } }); handles.push(handle); } for handle in handles { match handle.await { Ok(Err(e)) => tracing::error!("{e}"), Err(e) => tracing::error!("volume delete task failed: {e}"), Ok(Ok(())) => {} } } state.db.delete(key).await?; Ok(StatusCode::NO_CONTENT.into_response()) } pub async fn head_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; let mut headers = HeaderMap::new(); if let Some(size) = record.size { headers.insert( axum::http::header::CONTENT_LENGTH, size.to_string().parse().unwrap(), ); } Ok((StatusCode::OK, headers).into_response()) } #[derive(serde::Deserialize)] pub struct ListQuery { #[serde(default)] pub prefix: String, } pub async fn list_keys( State(state): State, Query(query): Query, ) -> Result { let keys = state.db.list_keys(&query.prefix).await?; Ok((StatusCode::OK, keys.join("\n")).into_response()) } // Note: first_healthy_volume and shuffle_volumes are trivial functions // (essentially .find() and .shuffle()). Testing them would just test // that standard library functions work. The real test is integration: // does failover work with actual down volumes?