diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 06f7766..83169c3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,29 +8,82 @@ permissions: contents: write jobs: - build: - name: Build packages - runs-on: ubuntu-latest + build-nix: + name: Build ${{ matrix.target }} + runs-on: ${{ matrix.runner }} strategy: fail-fast: false matrix: include: - - system: x86_64-linux + - target: x86_64-unknown-linux-gnu + runner: ubuntu-latest + system: x86_64-linux + package: default + - target: x86_64-unknown-linux-musl + runner: ubuntu-latest + system: x86_64-linux + package: x86_64-linux-musl + - target: aarch64-unknown-linux-gnu + runner: ubuntu-latest + system: x86_64-linux + package: aarch64-linux + - target: x86_64-apple-darwin + runner: macos-13 + system: x86_64-darwin + package: default + - target: aarch64-apple-darwin + runner: macos-latest + system: aarch64-darwin + package: default steps: - uses: actions/checkout@v4 - uses: DeterminateSystems/nix-installer-action@main - # - uses: DeterminateSystems/magic-nix-cache-action@main - - run: nix --print-build-logs build .#packages.$SYSTEM.default - env: - SYSTEM: ${{ matrix.system }} + - run: nix --print-build-logs build .#packages.${{ matrix.system }}.${{ matrix.package }} - name: Prepare release artifact if: startsWith(github.ref, 'refs/tags/') - run: cp result/bin/mkv mkv-${{ matrix.system }} + run: cp result/bin/mkv mkv-${{ matrix.target }} + + - name: Upload artifact + if: startsWith(github.ref, 'refs/tags/') + uses: actions/upload-artifact@v4 + with: + name: mkv-${{ matrix.target }} + path: mkv-${{ matrix.target }} + + build-windows: + name: Build x86_64-pc-windows-msvc + runs-on: windows-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + + - run: cargo build --release + + - name: Prepare release artifact + if: startsWith(github.ref, 'refs/tags/') + run: cp target/release/mkv.exe mkv-x86_64-pc-windows-msvc.exe + + - name: Upload artifact 
+ if: startsWith(github.ref, 'refs/tags/') + uses: actions/upload-artifact@v4 + with: + name: mkv-x86_64-pc-windows-msvc + path: mkv-x86_64-pc-windows-msvc.exe + + release: + name: Create release + needs: [build-nix, build-windows] + runs-on: ubuntu-latest + if: startsWith(github.ref, 'refs/tags/') + steps: + - uses: actions/download-artifact@v4 + with: + path: artifacts + merge-multiple: true - name: Create release - if: startsWith(github.ref, 'refs/tags/') uses: softprops/action-gh-release@v2 with: - files: mkv-${{ matrix.system }} + files: artifacts/* diff --git a/.gitignore b/.gitignore index d875240..a4e7adf 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ *.db* /result *~ +\#* +.\#* diff --git a/Cargo.lock b/Cargo.lock index 73e1f42..d697dbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,6 +774,7 @@ version = "0.1.0" dependencies = [ "axum", "clap", + "rand 0.8.5", "reqwest", "rusqlite", "serde", @@ -908,7 +909,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand", + "rand 0.9.2", "ring", "rustc-hash", "rustls", @@ -949,14 +950,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.5", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 
+dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -966,7 +988,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.5", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b074b32..4498eb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ clap = { version = "4", features = ["derive", "env"] } tracing = "0.1" tracing-subscriber = "0.3" sha2 = "0.10" +rand = "0.8" [profile.release] opt-level = 3 diff --git a/flake.nix b/flake.nix index 3195bf6..5b47279 100644 --- a/flake.nix +++ b/flake.nix @@ -12,14 +12,17 @@ darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [ pkgs.libiconv ]; - mkv = pkgs.rustPlatform.buildRustPackage { + mkMkv = pkgs': pkgs'.rustPlatform.buildRustPackage { pname = "mkv"; version = "0.1.0"; cargoLock.lockFile = ./Cargo.lock; src = pkgs.lib.cleanSource ./.; - buildInputs = darwinBuildInputs; + buildInputs = pkgs'.lib.optionals pkgs'.stdenv.hostPlatform.isDarwin [ + pkgs'.libiconv + ]; doCheck = false; }; + mkv = mkMkv pkgs; in { devShells.default = pkgs.mkShell { @@ -30,7 +33,12 @@ pkgs.rust-analyzer ]; }; - packages.default = mkv; + packages = { + default = mkv; + } // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux { + x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64; + aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform; + }; checks.default = mkv.overrideAttrs { doCheck = true; }; } ); diff --git a/src/db.rs b/src/db.rs index 24a2687..5ceb9dd 100644 --- a/src/db.rs +++ b/src/db.rs @@ -30,7 +30,18 @@ fn encode_volumes(v: &[String]) -> String { serde_json::to_string(v).unwrap() } -/// 
A single SQLite connection behind a mutex, used for both reads and writes. +/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None +pub fn prefix_upper_bound(prefix: &str) -> Option<String> { + let mut bytes = prefix.as_bytes().to_vec(); + while let Some(last) = bytes.pop() { + if last < 0xFF { + bytes.push(last + 1); + return Some(String::from_utf8_lossy(&bytes).into_owned()); + } + } + None +} + #[derive(Clone)] pub struct Db { conn: Arc<Mutex<rusqlite::Connection>>, @@ -93,19 +104,7 @@ impl Db { .collect::<Result<Vec<String>, _>>()?; return Ok(keys); } - // Compute exclusive upper bound: increment last non-0xFF byte - let upper = { - let mut bytes = prefix.as_bytes().to_vec(); - let mut result = None; - while let Some(last) = bytes.pop() { - if last < 0xFF { - bytes.push(last + 1); - result = Some(String::from_utf8_lossy(&bytes).into_owned()); - break; - } - } - result - }; + let upper = prefix_upper_bound(&prefix); let keys = match &upper { Some(end) => { let mut stmt = conn.prepare_cached( @@ -198,3 +197,25 @@ impl Db { Ok(records) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_prefix_upper_bound_range_correctness() { + let prefix = "foo"; + let upper = prefix_upper_bound(prefix).unwrap(); + let upper = upper.as_str(); + + // in range [prefix, upper) + assert!("foo" >= prefix && "foo" < upper); + assert!("foo/bar" >= prefix && "foo/bar" < upper); + assert!("foobar" >= prefix && "foobar" < upper); + assert!("foo\x7f" >= prefix && "foo\x7f" < upper); + + // out of range + assert!("fop" >= upper); + assert!("fon" < prefix); + } +} diff --git a/src/error.rs index 32e25a4..7536cec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,7 @@ pub enum AppError { Db(rusqlite::Error), InsufficientVolumes { need: usize, have: usize }, PartialWrite, + AllVolumesUnreachable, } impl From<rusqlite::Error> for AppError { @@ -58,6 +59,7 @@ impl std::fmt::Display for AppError { write!(f, "need {need} volumes but only {have} available") } AppError::PartialWrite => write!(f, "not all
volume writes succeeded"), + AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"), } } } @@ -70,6 +72,7 @@ impl IntoResponse for AppError { AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR, AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE, AppError::PartialWrite => StatusCode::BAD_GATEWAY, + AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY, }; (status, self.to_string()).into_response() } diff --git a/src/lib.rs index ba576f6..498c78a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub mod rebuild; pub mod server; use std::sync::Arc; +use std::time::Duration; const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB @@ -13,6 +14,7 @@ pub struct Args { pub db_path: String, pub volumes: Vec<String>, pub replicas: usize, + pub voltimeout: Duration, } pub fn build_app(args: &Args) -> axum::Router { @@ -36,6 +38,7 @@ pub fn build_app(args: &Args) -> axum::Router { db: db::Db::new(&args.db_path), volumes: Arc::new(args.volumes.clone()), replicas: args.replicas, + voltimeout: args.voltimeout, http: reqwest::Client::new(), }; diff --git a/src/main.rs index f838428..7ec4727 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use clap::{Parser, Subcommand}; #[derive(Parser)] @@ -18,6 +20,10 @@ struct Cli { #[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)] replicas: usize, + /// Timeout for volume health checks (in milliseconds) + #[arg(long, env = "MKV_VOLTIMEOUT", default_value_t = 1000)] + voltimeout: u64, + #[command(subcommand)] command: Commands, } @@ -65,6 +71,7 @@ async fn main() { db_path: cli.db, volumes: cli.volumes, replicas: cli.replicas, + voltimeout: Duration::from_millis(cli.voltimeout), }; match cli.command { diff --git a/src/rebuild.rs index f0f3121..71b12f7 100644 --- a/src/rebuild.rs +++ b/src/rebuild.rs @@ -12,6 +12,25 @@ struct NginxEntry { size: Option<i64>, } +/// If a key has different sizes
across volumes, takes the max. +pub fn merge_volume_scans( + scans: &[(String, Vec<(String, i64)>)], +) -> HashMap<String, (Vec<String>, i64)> { + let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new(); + for (vol_url, keys) in scans { + for (key, size) in keys { + let entry = index + .entry(key.clone()) + .or_insert_with(|| (Vec::new(), *size)); + entry.0.push(vol_url.clone()); + if *size > entry.1 { + entry.1 = *size; + } + } + } + index +} + async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> { let http = reqwest::Client::new(); let mut keys = Vec::new(); @@ -57,25 +76,21 @@ pub async fn run(args: &Args) { let _ = std::fs::remove_file(format!("{db_path}-shm")); let db = db::Db::new(db_path); - let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new(); + let mut scans = Vec::new(); for vol_url in &args.volumes { eprintln!("Scanning {vol_url}..."); match list_volume_keys(vol_url).await { Ok(keys) => { eprintln!(" Found {} keys", keys.len()); - for (key, size) in keys { - let entry = index.entry(key).or_insert_with(|| (Vec::new(), size)); - entry.0.push(vol_url.clone()); - if size > entry.1 { - entry.1 = size; - } - } + scans.push((vol_url.clone(), keys)); } Err(e) => eprintln!(" Error scanning {vol_url}: {e}"), } } + let index = merge_volume_scans(&scans); + let records: Vec<_> = index .into_iter() .map(|(k, (v, s))| (k, v, Some(s))) @@ -84,3 +99,22 @@ db.bulk_put(records).await.expect("bulk_put failed"); eprintln!("Rebuilt index with {count} keys"); } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_merge_takes_max_size() { + // Can happen due to incomplete writes or corruption + let scans = vec![ + ("http://vol1".to_string(), vec![("key".to_string(), 50)]), + ("http://vol2".to_string(), vec![("key".to_string(), 200)]), + ("http://vol3".to_string(), vec![("key".to_string(), 100)]), + ]; + let index = merge_volume_scans(&scans); + let (volumes, size) = index.get("key").unwrap(); + assert_eq!(volumes.len(), 3); + assert_eq!(*size, 200, "should
take maximum size across volumes"); + } +} diff --git a/src/server.rs index a4b24d4..c4af12e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,6 +3,7 @@ use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use std::sync::Arc; +use std::time::Duration; use crate::db; use crate::error::{AppError, VolumeError}; @@ -12,24 +13,75 @@ pub struct AppState { pub db: db::Db, pub volumes: Arc<Vec<String>>, pub replicas: usize, + pub voltimeout: Duration, pub http: reqwest::Client, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProbeResult { + Found(String), + AllFailed, +} + +pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult { + for (vol, &healthy) in volumes.iter().zip(results) { + if healthy { + return ProbeResult::Found(format!("{vol}/{key}")); + } + } + ProbeResult::AllFailed +} + +pub fn shuffle_volumes(volumes: Vec<String>, seed: u64) -> Vec<String> { + use rand::SeedableRng; + use rand::seq::SliceRandom; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut vols = volumes; + vols.shuffle(&mut rng); + vols +} + pub async fn get_key( State(state): State<AppState>, Path(key): Path<String>, ) -> Result<Response, AppError> { let record = state.db.get(&key).await?; - let vol = record - .volumes - .first() - .ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?; - let location = format!("{vol}/{key}"); - Ok(( - StatusCode::FOUND, - [(axum::http::header::LOCATION, location)], - ) - .into_response()) + if record.volumes.is_empty() { + return Err(AppError::CorruptRecord { key }); + } + + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let volumes = shuffle_volumes(record.volumes, seed); + + let mut results = Vec::with_capacity(volumes.len()); + for vol in &volumes { + let url = format!("{vol}/{key}"); + let healthy = match state.http.head(&url).timeout(state.voltimeout).send().await { + Ok(resp) if
resp.status().is_success() => true, + Ok(resp) => { + tracing::warn!("volume {vol} returned {} for {key}", resp.status()); + false + } + Err(e) => { + tracing::warn!("volume {vol} unreachable for {key}: {e}"); + false + } + }; + results.push(healthy); + if healthy { + break; + } + } + + match first_healthy_volume(&key, &volumes, &results) { + ProbeResult::Found(url) => { + Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, url)]).into_response()) + } + ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable), + } } pub async fn put_key( @@ -45,7 +97,6 @@ }); } - // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { let url = format!("{vol}/{key}"); @@ -180,20 +231,3 @@ pub async fn list_keys( let keys = state.db.list_keys(&query.prefix).await?; Ok((StatusCode::OK, keys.join("\n")).into_response()) } - -#[cfg(test)] -mod tests { - #[test] - fn test_volumes_for_key_sufficient() { - let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect(); - let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); - assert_eq!(selected.len(), 2); - } - - #[test] - fn test_volumes_for_key_insufficient() { - let volumes: Vec<String> = vec!["http://vol1".into()]; - let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); - assert_eq!(selected.len(), 1); - } -} diff --git a/tests/integration.rs index 890b23a..cfd46ad 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -19,6 +19,7 @@ async fn start_server() -> String { "http://localhost:3103".into(), ], replicas: 2, + voltimeout: std::time::Duration::from_secs(1), }; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();