use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use std::sync::Arc; use crate::db; use crate::error::{AppError, VolumeError}; #[derive(Clone)] pub struct AppState { pub db: db::Db, pub volumes: Arc>, pub replicas: usize, pub http: reqwest::Client, } pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; let vol = record .volumes .first() .ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?; let location = format!("{vol}/{key}"); Ok(( StatusCode::FOUND, [(axum::http::header::LOCATION, location)], ) .into_response()) } pub async fn put_key( State(state): State, Path(key): Path, body: Bytes, ) -> Result { let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas); if target_volumes.len() < state.replicas { return Err(AppError::InsufficientVolumes { need: state.replicas, have: target_volumes.len(), }); } // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { let url = format!("{vol}/{key}"); let handle = tokio::spawn({ let client = state.http.clone(); let data = body.clone(); async move { let resp = client.put(&url).body(data).send().await.map_err(|e| { VolumeError::Request { url: url.clone(), source: e, } })?; if !resp.status().is_success() { return Err(VolumeError::BadStatus { url, status: resp.status(), }); } Ok(()) } }); handles.push(handle); } let mut failed = false; for handle in handles { match handle.await { Ok(Err(e)) => { tracing::error!("{e}"); failed = true; } Err(e) => { tracing::error!("volume write task failed: {e}"); failed = true; } Ok(Ok(())) => {} } } if failed { // Rollback: best-effort delete from volumes for vol in &target_volumes { let _ = state.http.delete(format!("{vol}/{key}")).send().await; } return Err(AppError::PartialWrite); } let size = Some(body.len() as i64); if let Err(e) = state .db .put(key.clone(), target_volumes.clone(), size) .await { for vol in &target_volumes { let _ = state.http.delete(format!("{vol}/{key}")).send().await; } return Err(e); } Ok(StatusCode::CREATED.into_response()) } pub async fn delete_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; let mut handles = Vec::new(); for vol in &record.volumes { let url = format!("{vol}/{key}"); let handle = tokio::spawn({ let client = state.http.clone(); async move { let resp = client .delete(&url) .send() .await .map_err(|e| VolumeError::Request { url: url.clone(), source: e, })?; if !resp.status().is_success() { return Err(VolumeError::BadStatus { url, status: resp.status(), }); } Ok(()) } }); handles.push(handle); } for handle in handles { match handle.await { Ok(Err(e)) => tracing::error!("{e}"), Err(e) => tracing::error!("volume delete task failed: {e}"), Ok(Ok(())) => {} } } state.db.delete(key).await?; Ok(StatusCode::NO_CONTENT.into_response()) } pub async fn head_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; let mut headers = HeaderMap::new(); if let Some(size) = record.size { headers.insert( axum::http::header::CONTENT_LENGTH, size.to_string().parse().unwrap(), ); } Ok((StatusCode::OK, headers).into_response()) } #[derive(serde::Deserialize)] pub struct ListQuery { #[serde(default)] pub prefix: String, } pub async fn list_keys( State(state): State, Query(query): Query, ) -> Result { let keys = state.db.list_keys(&query.prefix).await?; Ok((StatusCode::OK, keys.join("\n")).into_response()) } #[cfg(test)] mod tests { #[test] fn test_volumes_for_key_sufficient() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); assert_eq!(selected.len(), 2); } #[test] fn test_volumes_for_key_insufficient() { let volumes: Vec = vec!["http://vol1".into()]; let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); assert_eq!(selected.len(), 1); } }