diff --git a/src/server.rs b/src/server.rs index 4abeb2c..45c2fd7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,6 +22,60 @@ pub struct AppState { pub config: Arc, } +// --- Pure decision functions --- + +/// Pick the first volume from the record that appears in the healthy set. +fn pick_healthy_volume<'a>( + record_volumes: &'a [String], + healthy: &HashSet, +) -> Option<&'a str> { + record_volumes + .iter() + .find(|v| healthy.contains(v.as_str())) + .map(|v| v.as_str()) +} + +/// Compute the placement plan for a PUT: which path and which volumes. +struct PutPlan { + path: String, + target_volumes: Vec, +} + +fn plan_put(ring: &Ring, key: &str, replication: usize) -> Result { + let path = Ring::key_path(key); + let target_volumes = ring.get_volumes(key, replication); + if target_volumes.len() < replication { + return Err(AppError::VolumeError(format!( + "need {replication} volumes but only {} available", + target_volumes.len() + ))); + } + Ok(PutPlan { + path, + target_volumes, + }) +} + +/// Evaluate fan-out PUT results. Returns the list of volumes that succeeded, +/// or an error if any failed. +fn evaluate_fanout(results: Vec>, volumes: &[String]) -> Result, Vec> { + let mut succeeded = Vec::new(); + let mut errors = Vec::new(); + for (i, result) in results.into_iter().enumerate() { + match result { + Ok(()) => succeeded.push(volumes[i].clone()), + Err(e) => errors.push(e), + } + } + if errors.is_empty() { + Ok(succeeded) + } else { + Err(errors) + } +} + +// --- Handlers --- + /// GET /:key — look up key, redirect to a healthy volume pub async fn get_key( State(state): State, @@ -36,20 +90,14 @@ pub async fn get_key( .await?; let healthy = state.healthy_volumes.read().await; + let vol = pick_healthy_volume(&record.volumes, &healthy).ok_or(AppError::NoHealthyVolume)?; + let location = format!("{vol}{}", record.path); - // Pick the first healthy volume - for vol in &record.volumes { - if healthy.contains(vol) { - let location = format!("{}{}", vol, record.path); - return Ok(( - StatusCode::FOUND, - [(axum::http::header::LOCATION, location)], - ) - .into_response()); - } - } - - Err(AppError::NoHealthyVolume) + Ok(( + StatusCode::FOUND, + [(axum::http::header::LOCATION, location)], + ) + .into_response()) } /// PUT /:key — store blob on volumes, record in index @@ -58,27 +106,17 @@ pub async fn put_key( Path(key): Path, body: Bytes, ) -> Result { - let replication = state.config.server.replication_factor; - let path = Ring::key_path(&key); - - let target_volumes = { + let plan = { let ring = state.ring.read().await; - ring.get_volumes(&key, replication) + plan_put(&ring, &key, state.config.server.replication_factor)? }; - if target_volumes.len() < replication { - return Err(AppError::VolumeError(format!( - "need {replication} volumes but only {} available", - target_volumes.len() - ))); - } - // Fan out PUTs to all target volumes concurrently - let mut handles = Vec::with_capacity(target_volumes.len()); - for vol in &target_volumes { + let mut handles = Vec::with_capacity(plan.target_volumes.len()); + for vol in &plan.target_volumes { let client = state.volume_client.clone(); let vol = vol.clone(); - let path = path.clone(); + let path = plan.path.clone(); let key = key.clone(); let data = body.clone(); handles.push(tokio::spawn(async move { @@ -86,37 +124,33 @@ pub async fn put_key( })); } - let mut succeeded = Vec::new(); - let mut failed = false; - for (i, handle) in handles.into_iter().enumerate() { - match handle.await.unwrap() { - Ok(()) => succeeded.push(target_volumes[i].clone()), - Err(e) => { + let mut results = Vec::with_capacity(handles.len()); + for handle in handles { + results.push(handle.await.unwrap()); + } + + match evaluate_fanout(results, &plan.target_volumes) { + Ok(succeeded_volumes) => { + let size = Some(body.len() as i64); + state + .writer + .put(key, succeeded_volumes, plan.path, size) + .await?; + Ok(StatusCode::CREATED.into_response()) + } + Err(errors) => { + for e in &errors { tracing::error!("PUT to volume failed: {e}"); - failed = true; } + // Rollback: best-effort delete from any volumes that may have succeeded + for vol in &plan.target_volumes { + let _ = state.volume_client.delete(vol, &plan.path).await; + } + Err(AppError::VolumeError( + "not all volume writes succeeded".into(), + )) } } - - if failed { - // Rollback: delete from volumes that succeeded - for vol in &succeeded { - if let Err(e) = state.volume_client.delete(vol, &path).await { - tracing::error!("Rollback DELETE failed: {e}"); - } - } - return Err(AppError::VolumeError( - "not all volume writes succeeded".into(), - )); - } - - let size = Some(body.len() as i64); - state - .writer - .put(key, target_volumes, path, size) - .await?; - - Ok(StatusCode::CREATED.into_response()) } /// DELETE /:key — remove from volumes and index @@ -195,3 +229,83 @@ pub async fn list_keys( let body = keys.join("\n"); Ok((StatusCode::OK, body).into_response()) } + +// --- Tests for pure functions --- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pick_healthy_volume_first_match() { + let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()]; + let healthy: HashSet = ["http://vol2".into(), "http://vol3".into()].into(); + + assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol2")); + } + + #[test] + fn test_pick_healthy_volume_none_healthy() { + let volumes = vec!["http://vol1".into(), "http://vol2".into()]; + let healthy: HashSet = HashSet::new(); + + assert_eq!(pick_healthy_volume(&volumes, &healthy), None); + } + + #[test] + fn test_pick_healthy_volume_all_healthy() { + let volumes = vec!["http://vol1".into(), "http://vol2".into()]; + let healthy: HashSet = ["http://vol1".into(), "http://vol2".into()].into(); + + // Should return the first one + assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol1")); + } + + #[test] + fn test_plan_put_sufficient_volumes() { + let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); + let ring = Ring::new(&volumes, 50); + + let plan = plan_put(&ring, "test-key", 2).unwrap(); + assert_eq!(plan.target_volumes.len(), 2); + assert!(!plan.path.is_empty()); + } + + #[test] + fn test_plan_put_insufficient_volumes() { + let volumes: Vec = vec!["http://vol1".into()]; + let ring = Ring::new(&volumes, 50); + + let result = plan_put(&ring, "test-key", 2); + assert!(result.is_err()); + } + + #[test] + fn test_evaluate_fanout_all_ok() { + let volumes = vec!["http://vol1".into(), "http://vol2".into()]; + let results = vec![Ok(()), Ok(())]; + + let outcome = evaluate_fanout(results, &volumes); + assert_eq!(outcome.unwrap(), volumes); + } + + #[test] + fn test_evaluate_fanout_partial_failure() { + let volumes = vec!["http://vol1".into(), "http://vol2".into()]; + let results = vec![Ok(()), Err("connection refused".into())]; + + let outcome = evaluate_fanout(results, &volumes); + let errors = outcome.unwrap_err(); + assert_eq!(errors.len(), 1); + assert_eq!(errors[0], "connection refused"); + } + + #[test] + fn test_evaluate_fanout_all_fail() { + let volumes = vec!["http://vol1".into(), "http://vol2".into()]; + let results = vec![Err("err1".into()), Err("err2".into())]; + + let outcome = evaluate_fanout(results, &volumes); + assert_eq!(outcome.unwrap_err().len(), 2); + } +}