Make more functional
This commit is contained in:
parent
23a075382f
commit
d7c9192ebb
1 changed files with 169 additions and 55 deletions
224
src/server.rs
224
src/server.rs
|
|
@ -22,6 +22,60 @@ pub struct AppState {
|
||||||
pub config: Arc<Config>,
|
pub config: Arc<Config>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Pure decision functions ---
|
||||||
|
|
||||||
|
/// Pick the first volume from the record that appears in the healthy set.
|
||||||
|
fn pick_healthy_volume<'a>(
|
||||||
|
record_volumes: &'a [String],
|
||||||
|
healthy: &HashSet<String>,
|
||||||
|
) -> Option<&'a str> {
|
||||||
|
record_volumes
|
||||||
|
.iter()
|
||||||
|
.find(|v| healthy.contains(v.as_str()))
|
||||||
|
.map(|v| v.as_str())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compute the placement plan for a PUT: which path and which volumes.
|
||||||
|
struct PutPlan {
|
||||||
|
path: String,
|
||||||
|
target_volumes: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn plan_put(ring: &Ring, key: &str, replication: usize) -> Result<PutPlan, AppError> {
|
||||||
|
let path = Ring::key_path(key);
|
||||||
|
let target_volumes = ring.get_volumes(key, replication);
|
||||||
|
if target_volumes.len() < replication {
|
||||||
|
return Err(AppError::VolumeError(format!(
|
||||||
|
"need {replication} volumes but only {} available",
|
||||||
|
target_volumes.len()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Ok(PutPlan {
|
||||||
|
path,
|
||||||
|
target_volumes,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Evaluate fan-out PUT results. Returns the list of volumes that succeeded,
|
||||||
|
/// or an error if any failed.
|
||||||
|
fn evaluate_fanout(results: Vec<Result<(), String>>, volumes: &[String]) -> Result<Vec<String>, Vec<String>> {
|
||||||
|
let mut succeeded = Vec::new();
|
||||||
|
let mut errors = Vec::new();
|
||||||
|
for (i, result) in results.into_iter().enumerate() {
|
||||||
|
match result {
|
||||||
|
Ok(()) => succeeded.push(volumes[i].clone()),
|
||||||
|
Err(e) => errors.push(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if errors.is_empty() {
|
||||||
|
Ok(succeeded)
|
||||||
|
} else {
|
||||||
|
Err(errors)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Handlers ---
|
||||||
|
|
||||||
/// GET /:key — look up key, redirect to a healthy volume
|
/// GET /:key — look up key, redirect to a healthy volume
|
||||||
pub async fn get_key(
|
pub async fn get_key(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
|
|
@ -36,20 +90,14 @@ pub async fn get_key(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let healthy = state.healthy_volumes.read().await;
|
let healthy = state.healthy_volumes.read().await;
|
||||||
|
let vol = pick_healthy_volume(&record.volumes, &healthy).ok_or(AppError::NoHealthyVolume)?;
|
||||||
|
let location = format!("{vol}{}", record.path);
|
||||||
|
|
||||||
// Pick the first healthy volume
|
Ok((
|
||||||
for vol in &record.volumes {
|
StatusCode::FOUND,
|
||||||
if healthy.contains(vol) {
|
[(axum::http::header::LOCATION, location)],
|
||||||
let location = format!("{}{}", vol, record.path);
|
)
|
||||||
return Ok((
|
.into_response())
|
||||||
StatusCode::FOUND,
|
|
||||||
[(axum::http::header::LOCATION, location)],
|
|
||||||
)
|
|
||||||
.into_response());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(AppError::NoHealthyVolume)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// PUT /:key — store blob on volumes, record in index
|
/// PUT /:key — store blob on volumes, record in index
|
||||||
|
|
@ -58,27 +106,17 @@ pub async fn put_key(
|
||||||
Path(key): Path<String>,
|
Path(key): Path<String>,
|
||||||
body: Bytes,
|
body: Bytes,
|
||||||
) -> Result<Response, AppError> {
|
) -> Result<Response, AppError> {
|
||||||
let replication = state.config.server.replication_factor;
|
let plan = {
|
||||||
let path = Ring::key_path(&key);
|
|
||||||
|
|
||||||
let target_volumes = {
|
|
||||||
let ring = state.ring.read().await;
|
let ring = state.ring.read().await;
|
||||||
ring.get_volumes(&key, replication)
|
plan_put(&ring, &key, state.config.server.replication_factor)?
|
||||||
};
|
};
|
||||||
|
|
||||||
if target_volumes.len() < replication {
|
|
||||||
return Err(AppError::VolumeError(format!(
|
|
||||||
"need {replication} volumes but only {} available",
|
|
||||||
target_volumes.len()
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fan out PUTs to all target volumes concurrently
|
// Fan out PUTs to all target volumes concurrently
|
||||||
let mut handles = Vec::with_capacity(target_volumes.len());
|
let mut handles = Vec::with_capacity(plan.target_volumes.len());
|
||||||
for vol in &target_volumes {
|
for vol in &plan.target_volumes {
|
||||||
let client = state.volume_client.clone();
|
let client = state.volume_client.clone();
|
||||||
let vol = vol.clone();
|
let vol = vol.clone();
|
||||||
let path = path.clone();
|
let path = plan.path.clone();
|
||||||
let key = key.clone();
|
let key = key.clone();
|
||||||
let data = body.clone();
|
let data = body.clone();
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
|
|
@ -86,37 +124,33 @@ pub async fn put_key(
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut succeeded = Vec::new();
|
let mut results = Vec::with_capacity(handles.len());
|
||||||
let mut failed = false;
|
for handle in handles {
|
||||||
for (i, handle) in handles.into_iter().enumerate() {
|
results.push(handle.await.unwrap());
|
||||||
match handle.await.unwrap() {
|
}
|
||||||
Ok(()) => succeeded.push(target_volumes[i].clone()),
|
|
||||||
Err(e) => {
|
match evaluate_fanout(results, &plan.target_volumes) {
|
||||||
|
Ok(succeeded_volumes) => {
|
||||||
|
let size = Some(body.len() as i64);
|
||||||
|
state
|
||||||
|
.writer
|
||||||
|
.put(key, succeeded_volumes, plan.path, size)
|
||||||
|
.await?;
|
||||||
|
Ok(StatusCode::CREATED.into_response())
|
||||||
|
}
|
||||||
|
Err(errors) => {
|
||||||
|
for e in &errors {
|
||||||
tracing::error!("PUT to volume failed: {e}");
|
tracing::error!("PUT to volume failed: {e}");
|
||||||
failed = true;
|
|
||||||
}
|
}
|
||||||
|
// Rollback: best-effort delete from any volumes that may have succeeded
|
||||||
|
for vol in &plan.target_volumes {
|
||||||
|
let _ = state.volume_client.delete(vol, &plan.path).await;
|
||||||
|
}
|
||||||
|
Err(AppError::VolumeError(
|
||||||
|
"not all volume writes succeeded".into(),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if failed {
|
|
||||||
// Rollback: delete from volumes that succeeded
|
|
||||||
for vol in &succeeded {
|
|
||||||
if let Err(e) = state.volume_client.delete(vol, &path).await {
|
|
||||||
tracing::error!("Rollback DELETE failed: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Err(AppError::VolumeError(
|
|
||||||
"not all volume writes succeeded".into(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let size = Some(body.len() as i64);
|
|
||||||
state
|
|
||||||
.writer
|
|
||||||
.put(key, target_volumes, path, size)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(StatusCode::CREATED.into_response())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// DELETE /:key — remove from volumes and index
|
/// DELETE /:key — remove from volumes and index
|
||||||
|
|
@ -195,3 +229,83 @@ pub async fn list_keys(
|
||||||
let body = keys.join("\n");
|
let body = keys.join("\n");
|
||||||
Ok((StatusCode::OK, body).into_response())
|
Ok((StatusCode::OK, body).into_response())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Tests for pure functions ---
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pick_healthy_volume_first_match() {
|
||||||
|
let volumes = vec!["http://vol1".into(), "http://vol2".into(), "http://vol3".into()];
|
||||||
|
let healthy: HashSet<String> = ["http://vol2".into(), "http://vol3".into()].into();
|
||||||
|
|
||||||
|
assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pick_healthy_volume_none_healthy() {
|
||||||
|
let volumes = vec!["http://vol1".into(), "http://vol2".into()];
|
||||||
|
let healthy: HashSet<String> = HashSet::new();
|
||||||
|
|
||||||
|
assert_eq!(pick_healthy_volume(&volumes, &healthy), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pick_healthy_volume_all_healthy() {
|
||||||
|
let volumes = vec!["http://vol1".into(), "http://vol2".into()];
|
||||||
|
let healthy: HashSet<String> = ["http://vol1".into(), "http://vol2".into()].into();
|
||||||
|
|
||||||
|
// Should return the first one
|
||||||
|
assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_plan_put_sufficient_volumes() {
|
||||||
|
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
|
||||||
|
let ring = Ring::new(&volumes, 50);
|
||||||
|
|
||||||
|
let plan = plan_put(&ring, "test-key", 2).unwrap();
|
||||||
|
assert_eq!(plan.target_volumes.len(), 2);
|
||||||
|
assert!(!plan.path.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_plan_put_insufficient_volumes() {
|
||||||
|
let volumes: Vec<String> = vec!["http://vol1".into()];
|
||||||
|
let ring = Ring::new(&volumes, 50);
|
||||||
|
|
||||||
|
let result = plan_put(&ring, "test-key", 2);
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_evaluate_fanout_all_ok() {
|
||||||
|
let volumes = vec!["http://vol1".into(), "http://vol2".into()];
|
||||||
|
let results = vec![Ok(()), Ok(())];
|
||||||
|
|
||||||
|
let outcome = evaluate_fanout(results, &volumes);
|
||||||
|
assert_eq!(outcome.unwrap(), volumes);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_evaluate_fanout_partial_failure() {
|
||||||
|
let volumes = vec!["http://vol1".into(), "http://vol2".into()];
|
||||||
|
let results = vec![Ok(()), Err("connection refused".into())];
|
||||||
|
|
||||||
|
let outcome = evaluate_fanout(results, &volumes);
|
||||||
|
let errors = outcome.unwrap_err();
|
||||||
|
assert_eq!(errors.len(), 1);
|
||||||
|
assert_eq!(errors[0], "connection refused");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_evaluate_fanout_all_fail() {
|
||||||
|
let volumes = vec!["http://vol1".into(), "http://vol2".into()];
|
||||||
|
let results = vec![Err("err1".into()), Err("err2".into())];
|
||||||
|
|
||||||
|
let outcome = evaluate_fanout(results, &volumes);
|
||||||
|
assert_eq!(outcome.unwrap_err().len(), 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue