use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use std::collections::HashSet; use std::sync::Arc; use tokio::sync::RwLock; use crate::config::Config; use crate::db; use crate::error::AppError; use crate::hasher::Ring; use crate::volume::VolumeClient; #[derive(Clone)] pub struct AppState { pub writer: db::WriterHandle, pub reads: db::ReadPool, pub ring: Arc>, pub volume_client: VolumeClient, pub healthy_volumes: Arc>>, pub config: Arc, } // --- Pure decision functions --- /// Pick the first volume from the record that appears in the healthy set. fn pick_healthy_volume<'a>( record_volumes: &'a [String], healthy: &HashSet, ) -> Option<&'a str> { record_volumes .iter() .find(|v| healthy.contains(v.as_str())) .map(|v| v.as_str()) } /// Select target volumes for a key, ensuring we have enough for replication. fn select_volumes(ring: &Ring, key: &str, replication: usize) -> Result, AppError> { let target_volumes = ring.get_volumes(key, replication); if target_volumes.len() < replication { return Err(AppError::VolumeError(format!( "need {replication} volumes but only {} available", target_volumes.len() ))); } Ok(target_volumes) } /// Evaluate fan-out results. Returns the list of volumes that succeeded, /// or the error messages if any failed. fn evaluate_fanout( results: Vec>, volumes: &[String], ) -> Result, Vec> { let mut succeeded = Vec::new(); let mut errors = Vec::new(); for (i, result) in results.into_iter().enumerate() { match result { Ok(()) => succeeded.push(volumes[i].clone()), Err(e) => errors.push(e), } } if errors.is_empty() { Ok(succeeded) } else { Err(errors) } } // --- Handlers --- /// GET /:key — look up key, redirect to a healthy volume pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { let record = state .reads .query({ let key = key.clone(); move |conn| db::get(conn, &key) }) .await?; let healthy = state.healthy_volumes.read().await; let vol = pick_healthy_volume(&record.volumes, &healthy).ok_or(AppError::NoHealthyVolume)?; let location = format!("{vol}/{key}"); Ok(( StatusCode::FOUND, [(axum::http::header::LOCATION, location)], ) .into_response()) } /// PUT /:key — store blob on volumes, record in index pub async fn put_key( State(state): State, Path(key): Path, body: Bytes, ) -> Result { let target_volumes = { let ring = state.ring.read().await; select_volumes(&ring, &key, state.config.server.replication_factor)? }; // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { let client = state.volume_client.clone(); let vol = vol.clone(); let key = key.clone(); let data = body.clone(); handles.push(tokio::spawn(async move { client.put(&vol, &key, data).await })); } let mut results = Vec::with_capacity(handles.len()); for handle in handles { results.push(handle.await.unwrap()); } match evaluate_fanout(results, &target_volumes) { Ok(succeeded_volumes) => { let size = Some(body.len() as i64); state.writer.put(key, succeeded_volumes, size).await?; Ok(StatusCode::CREATED.into_response()) } Err(errors) => { for e in &errors { tracing::error!("PUT to volume failed: {e}"); } // Rollback: best-effort delete from any volumes that may have succeeded for vol in &target_volumes { let _ = state.volume_client.delete(vol, &key).await; } Err(AppError::VolumeError( "not all volume writes succeeded".into(), )) } } } /// DELETE /:key — remove from volumes and index pub async fn delete_key( State(state): State, Path(key): Path, ) -> Result { let record = state .reads .query({ let key = key.clone(); move |conn| db::get(conn, &key) }) .await?; // Fan out DELETEs concurrently let mut handles = Vec::new(); for vol in &record.volumes { let client = state.volume_client.clone(); let vol = vol.clone(); let key = key.clone(); handles.push(tokio::spawn( async move { client.delete(&vol, &key).await }, )); } for handle in handles { if let Err(e) = handle.await.unwrap() { tracing::error!("DELETE from volume failed: {e}"); } } // Remove from index regardless of volume DELETE results state.writer.delete(key).await?; Ok(StatusCode::NO_CONTENT.into_response()) } /// HEAD /:key — check if key exists, return size pub async fn head_key( State(state): State, Path(key): Path, ) -> Result { let record = state .reads .query(move |conn| db::get(conn, &key)) .await?; let mut headers = HeaderMap::new(); if let Some(size) = record.size { headers.insert( axum::http::header::CONTENT_LENGTH, size.to_string().parse().unwrap(), ); } Ok((StatusCode::OK, headers).into_response()) } #[derive(serde::Deserialize)] pub struct ListQuery { #[serde(default)] pub prefix: String, } /// GET / — list keys with optional prefix filter pub async fn list_keys( State(state): State, Query(query): Query, ) -> Result { let keys = state .reads .query(move |conn| db::list_keys(conn, &query.prefix)) .await?; let body = keys.join("\n"); Ok((StatusCode::OK, body).into_response()) } // --- Tests for pure functions --- #[cfg(test)] mod tests { use super::*; #[test] fn test_pick_healthy_volume_first_match() { let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; let healthy: HashSet = ["http://vol2".into(), "http://vol3".into()].into(); assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol2")); } #[test] fn test_pick_healthy_volume_none_healthy() { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let healthy: HashSet = HashSet::new(); assert_eq!(pick_healthy_volume(&volumes, &healthy), None); } #[test] fn test_pick_healthy_volume_all_healthy() { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let healthy: HashSet = ["http://vol1".into(), "http://vol2".into()].into(); assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol1")); } #[test] fn test_select_volumes_sufficient() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); let ring = Ring::new(&volumes, 50); let selected = select_volumes(&ring, "test-key", 2).unwrap(); assert_eq!(selected.len(), 2); } #[test] fn test_select_volumes_insufficient() { let volumes: Vec = vec!["http://vol1".into()]; let ring = Ring::new(&volumes, 50); assert!(select_volumes(&ring, "test-key", 2).is_err()); } #[test] fn test_evaluate_fanout_all_ok() { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let results = vec![Ok(()), Ok(())]; assert_eq!(evaluate_fanout(results, &volumes).unwrap(), volumes); } #[test] fn test_evaluate_fanout_partial_failure() { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let results = vec![Ok(()), Err("connection refused".into())]; let errors = evaluate_fanout(results, &volumes).unwrap_err(); assert_eq!(errors, vec!["connection refused"]); } #[test] fn test_evaluate_fanout_all_fail() { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let results = vec![Err("err1".into()), Err("err2".into())]; assert_eq!(evaluate_fanout(results, &volumes).unwrap_err().len(), 2); } }