Allow for reads if one volume is down
This commit is contained in:
parent
640f9afba5
commit
1fc59674f5
4 changed files with 70 additions and 14 deletions
39
Cargo.lock
generated
39
Cargo.lock
generated
|
|
@ -774,6 +774,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"axum",
|
"axum",
|
||||||
"clap",
|
"clap",
|
||||||
|
"rand 0.8.5",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rusqlite",
|
"rusqlite",
|
||||||
"serde",
|
"serde",
|
||||||
|
|
@ -908,7 +909,7 @@ dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"getrandom 0.3.4",
|
"getrandom 0.3.4",
|
||||||
"lru-slab",
|
"lru-slab",
|
||||||
"rand",
|
"rand 0.9.2",
|
||||||
"ring",
|
"ring",
|
||||||
"rustc-hash",
|
"rustc-hash",
|
||||||
"rustls",
|
"rustls",
|
||||||
|
|
@ -949,14 +950,35 @@ version = "5.3.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
|
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand"
|
||||||
|
version = "0.8.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"rand_chacha 0.3.1",
|
||||||
|
"rand_core 0.6.4",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rand"
|
name = "rand"
|
||||||
version = "0.9.2"
|
version = "0.9.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
|
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"rand_chacha",
|
"rand_chacha 0.9.0",
|
||||||
"rand_core",
|
"rand_core 0.9.5",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand_chacha"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
|
||||||
|
dependencies = [
|
||||||
|
"ppv-lite86",
|
||||||
|
"rand_core 0.6.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
@ -966,7 +988,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ppv-lite86",
|
"ppv-lite86",
|
||||||
"rand_core",
|
"rand_core 0.9.5",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand_core"
|
||||||
|
version = "0.6.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
||||||
|
dependencies = [
|
||||||
|
"getrandom 0.2.17",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ clap = { version = "4", features = ["derive", "env"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
sha2 = "0.10"
|
sha2 = "0.10"
|
||||||
|
rand = "0.8"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
opt-level = 3
|
opt-level = 3
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ pub enum AppError {
|
||||||
Db(rusqlite::Error),
|
Db(rusqlite::Error),
|
||||||
InsufficientVolumes { need: usize, have: usize },
|
InsufficientVolumes { need: usize, have: usize },
|
||||||
PartialWrite,
|
PartialWrite,
|
||||||
|
AllVolumesUnreachable,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<rusqlite::Error> for AppError {
|
impl From<rusqlite::Error> for AppError {
|
||||||
|
|
@ -58,6 +59,7 @@ impl std::fmt::Display for AppError {
|
||||||
write!(f, "need {need} volumes but only {have} available")
|
write!(f, "need {need} volumes but only {have} available")
|
||||||
}
|
}
|
||||||
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
|
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
|
||||||
|
AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -70,6 +72,7 @@ impl IntoResponse for AppError {
|
||||||
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
|
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
|
||||||
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
|
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
|
||||||
|
AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY,
|
||||||
};
|
};
|
||||||
(status, self.to_string()).into_response()
|
(status, self.to_string()).into_response()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,17 +19,38 @@ pub async fn get_key(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Path(key): Path<String>,
|
Path(key): Path<String>,
|
||||||
) -> Result<Response, AppError> {
|
) -> Result<Response, AppError> {
|
||||||
|
use rand::seq::SliceRandom;
|
||||||
|
|
||||||
let record = state.db.get(&key).await?;
|
let record = state.db.get(&key).await?;
|
||||||
let vol = record
|
if record.volumes.is_empty() {
|
||||||
.volumes
|
return Err(AppError::CorruptRecord { key });
|
||||||
.first()
|
}
|
||||||
.ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?;
|
|
||||||
let location = format!("{vol}/{key}");
|
// Shuffle volumes for load balancing
|
||||||
Ok((
|
let mut volumes = record.volumes.clone();
|
||||||
StatusCode::FOUND,
|
volumes.shuffle(&mut rand::thread_rng());
|
||||||
[(axum::http::header::LOCATION, location)],
|
|
||||||
)
|
// Probe each volume until we find one that's reachable
|
||||||
.into_response())
|
for vol in &volumes {
|
||||||
|
let url = format!("{vol}/{key}");
|
||||||
|
match state.http.head(&url).send().await {
|
||||||
|
Ok(resp) if resp.status().is_success() => {
|
||||||
|
return Ok((
|
||||||
|
StatusCode::FOUND,
|
||||||
|
[(axum::http::header::LOCATION, url)],
|
||||||
|
)
|
||||||
|
.into_response());
|
||||||
|
}
|
||||||
|
Ok(resp) => {
|
||||||
|
tracing::warn!("volume {vol} returned {} for {key}", resp.status());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("volume {vol} unreachable for {key}: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(AppError::AllVolumesUnreachable)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn put_key(
|
pub async fn put_key(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue