Improve typing and errors, clean up

This commit is contained in:
Silas Brack 2026-03-07 15:24:05 +01:00
parent 07490efc28
commit ec408aff29
5 changed files with 205 additions and 22 deletions

View file

@ -1,11 +1,33 @@
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
/// Errors from individual volume HTTP requests — used for logging, not HTTP responses.
#[derive(Debug)]
pub enum VolumeError {
Request { url: String, source: reqwest::Error },
BadStatus { url: String, status: reqwest::StatusCode },
}
impl std::fmt::Display for VolumeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
VolumeError::Request { url, source } => {
write!(f, "volume request to {url} failed: {source}")
}
VolumeError::BadStatus { url, status } => {
write!(f, "volume {url} returned status {status}")
}
}
}
}
/// Application-level errors that map to HTTP responses.
#[derive(Debug)]
pub enum AppError {
NotFound,
Db(rusqlite::Error),
VolumeError(String),
InsufficientVolumes { need: usize, have: usize },
PartialWrite,
}
impl From<rusqlite::Error> for AppError {
@ -22,17 +44,22 @@ impl std::fmt::Display for AppError {
match self {
AppError::NotFound => write!(f, "not found"),
AppError::Db(e) => write!(f, "database error: {e}"),
AppError::VolumeError(msg) => write!(f, "volume error: {msg}"),
AppError::InsufficientVolumes { need, have } => {
write!(f, "need {need} volumes but only {have} available")
}
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
}
}
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, msg) = match &self {
AppError::NotFound => (StatusCode::NOT_FOUND, self.to_string()),
_ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
let status = match &self {
AppError::NotFound => StatusCode::NOT_FOUND,
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
};
(status, msg).into_response()
(status, self.to_string()).into_response()
}
}

View file

@ -32,6 +32,25 @@ enum Commands {
},
}
async fn shutdown_signal() {
let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)]
{
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler");
tokio::select! {
_ = ctrl_c => tracing::info!("Received SIGINT, shutting down..."),
_ = sigterm.recv() => tracing::info!("Received SIGTERM, shutting down..."),
}
}
#[cfg(not(unix))]
{
ctrl_c.await.expect("failed to listen for ctrl-c");
tracing::info!("Received ctrl-c, shutting down...");
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
@ -49,7 +68,11 @@ async fn main() {
let addr = format!("0.0.0.0:{port}");
tracing::info!("Listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
tracing::info!("Shutdown complete");
}
Commands::Rebuild => {
mkv::rebuild::run(&args).await;

View file

@ -5,7 +5,7 @@ use axum::response::{IntoResponse, Response};
use std::sync::Arc;
use crate::db;
use crate::error::AppError;
use crate::error::{AppError, VolumeError};
#[derive(Clone)]
pub struct AppState {
@ -32,11 +32,10 @@ pub async fn put_key(
) -> Result<Response, AppError> {
let target_volumes = crate::hasher::volumes_for_key(&key, &state.volumes, state.replicas);
if target_volumes.len() < state.replicas {
return Err(AppError::VolumeError(format!(
"need {} volumes but only {} available",
state.replicas,
target_volumes.len()
)));
return Err(AppError::InsufficientVolumes {
need: state.replicas,
have: target_volumes.len(),
});
}
// Fan out PUTs to all target volumes concurrently
@ -47,9 +46,11 @@ pub async fn put_key(
let client = state.http.clone();
let data = body.clone();
async move {
let resp = client.put(&url).body(data).send().await.map_err(|e| format!("PUT {url}: {e}"))?;
let resp = client.put(&url).body(data).send().await.map_err(|e| {
VolumeError::Request { url: url.clone(), source: e }
})?;
if !resp.status().is_success() {
return Err(format!("PUT {url}: status {}", resp.status()));
return Err(VolumeError::BadStatus { url, status: resp.status() });
}
Ok(())
}
@ -57,20 +58,20 @@ pub async fn put_key(
handles.push(handle);
}
let mut all_ok = true;
let mut failed = false;
for handle in handles {
if let Err(e) = handle.await.unwrap() {
tracing::error!("PUT to volume failed: {e}");
all_ok = false;
tracing::error!("{e}");
failed = true;
}
}
if !all_ok {
if failed {
// Rollback: best-effort delete from volumes
for vol in &target_volumes {
let _ = state.http.delete(format!("{vol}/{key}")).send().await;
}
return Err(AppError::VolumeError("not all volume writes succeeded".into()));
return Err(AppError::PartialWrite);
}
let size = Some(body.len() as i64);