diff --git a/.forgejo/workflows/ci.yaml b/.forgejo/workflows/ci.yaml deleted file mode 100644 index 4ced5db..0000000 --- a/.forgejo/workflows/ci.yaml +++ /dev/null @@ -1,89 +0,0 @@ -name: "CI" - -on: - push: - branches: ["main"] - tags: ["v*"] - pull_request: - branches: ["main"] - -jobs: - test: - name: Run tests - runs-on: native - steps: - - uses: actions/checkout@v4 - - name: Run unit tests - run: nix build --print-build-logs .#checks.x86_64-linux.default - - build: - name: Build ${{ matrix.target }} - runs-on: native - strategy: - fail-fast: false - matrix: - include: - - package: x86_64-linux - target: x86_64-unknown-linux-gnu - # - package: x86_64-linux-musl - # target: x86_64-unknown-linux-musl - # - package: aarch64-linux - # target: aarch64-unknown-linux-gnu - # - package: default - # target: x86_64-apple-darwin - # system: x86_64-darwin - # - package: default - # target: aarch64-apple-darwin - # system: aarch64-darwin - steps: - - uses: actions/checkout@v4 - - - name: Build - run: nix build --print-build-logs .#packages.x86_64-linux.${{ matrix.package }} - - - name: Prepare release artifact - if: startsWith(forgejo.ref, 'refs/tags/') - run: cp result/bin/mkv mkv-${{ matrix.target }} - - - name: Upload artifact - if: startsWith(forgejo.ref, 'refs/tags/') - uses: actions/upload-artifact@v3 - with: - name: mkv-${{ matrix.target }} - path: mkv-${{ matrix.target }} - - release: - name: Create release - needs: [test, build] - runs-on: native - if: startsWith(forgejo.ref, 'refs/tags/') - steps: - - uses: actions/download-artifact@v3 - with: - path: artifacts - - - name: Collect artifacts - run: | - mkdir -p release - find artifacts -type f -name 'mkv-*' -exec mv {} release/ \; - ls -la release/ - - - name: Create release - run: | - tag="${GITHUB_REF#refs/tags/}" - curl -X POST \ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - -H "Content-Type: application/json" \ - -d "{\"tag_name\": \"${tag}\", \"name\": \"${tag}\"}" \ - "${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases" - release_id=$(curl -s \ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - "${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases/tags/${tag}" \ - | jq -r '.id') - for file in release/*; do - curl -X POST \ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - -H "Content-Type: application/octet-stream" \ - --data-binary "@${file}" \ - "${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases/${release_id}/assets?name=$(basename $file)" - done diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..e45b5f9 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,33 @@ +name: "CI" +on: + pull_request: + push: + branches: ["main"] + tags: ["v*"] +jobs: + build: + name: Build packages + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - system: x86_64-linux + steps: + - uses: actions/checkout@v4 + - uses: DeterminateSystems/nix-installer-action@main + # - uses: DeterminateSystems/magic-nix-cache-action@main + + - run: nix --print-build-logs build .#packages.$SYSTEM.default + env: + SYSTEM: ${{ matrix.system }} + + - name: Prepare release artifact + if: startsWith(github.ref, 'refs/tags/') + run: cp result/bin/mkv mkv-${{ matrix.system }} + + - name: Create release + if: startsWith(github.ref, 'refs/tags/') + uses: softprops/action-gh-release@v2 + with: + files: mkv-${{ matrix.system }} diff --git a/.gitignore b/.gitignore index a4e7adf..d875240 100644 --- a/.gitignore +++ b/.gitignore @@ -2,5 +2,3 @@ *.db* /result *~ -\#* -.\#* diff --git a/Cargo.lock b/Cargo.lock index d697dbc..73e1f42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,7 +774,6 @@ version = "0.1.0" dependencies = [ "axum", "clap", - "rand 0.8.5", "reqwest", "rusqlite", "serde", @@ -909,7 +908,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand 0.9.2", + "rand", "ring", "rustc-hash", "rustls", @@ -950,35 +949,14 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha 0.9.0", - "rand_core 0.9.5", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.4", + "rand_chacha", + "rand_core", ] [[package]] @@ -988,16 +966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.5", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom 0.2.17", + "rand_core", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4498eb2..b074b32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ clap = { version = "4", features = ["derive", "env"] } tracing = "0.1" tracing-subscriber = "0.3" sha2 = "0.10" -rand = "0.8" [profile.release] opt-level = 3 diff --git a/flake.nix b/flake.nix index 109ed01..3195bf6 100644 --- a/flake.nix +++ b/flake.nix @@ -12,17 +12,14 @@ darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [ pkgs.libiconv ]; - mkMkv = pkgs': pkgs'.rustPlatform.buildRustPackage { + mkv = pkgs.rustPlatform.buildRustPackage { pname = "mkv"; version = "0.1.0"; cargoLock.lockFile = ./Cargo.lock; src = pkgs.lib.cleanSource ./.; - buildInputs = pkgs'.lib.optionals pkgs'.stdenv.hostPlatform.isDarwin [ - pkgs'.libiconv - ]; + buildInputs = darwinBuildInputs; doCheck = false; }; - mkv = mkMkv pkgs; in { devShells.default = pkgs.mkShell { @@ -33,21 +30,8 @@ pkgs.rust-analyzer ]; }; - packages = { - default = mkv; - x86_64-linux = mkv; - } // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux { - x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64; - aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform; - }; - checks.default = mkv.overrideAttrs { - doCheck = true; - checkPhase = '' - runHook preCheck - cargo test --lib - runHook postCheck - ''; - }; + packages.default = mkv; + checks.default = mkv.overrideAttrs { doCheck = true; }; } ); } diff --git a/src/db.rs b/src/db.rs index 5ceb9dd..24a2687 100644 --- a/src/db.rs +++ b/src/db.rs @@ -30,18 +30,7 @@ fn encode_volumes(v: &[String]) -> String { serde_json::to_string(v).unwrap() } -/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None -pub fn prefix_upper_bound(prefix: &str) -> Option { - let mut bytes = prefix.as_bytes().to_vec(); - while let Some(last) = bytes.pop() { - if last < 0xFF { - bytes.push(last + 1); - return Some(String::from_utf8_lossy(&bytes).into_owned()); - } - } - None -} - +/// A single SQLite connection behind a mutex, used for both reads and writes. #[derive(Clone)] pub struct Db { conn: Arc>, @@ -104,7 +93,19 @@ impl Db { .collect::, _>>()?; return Ok(keys); } - let upper = prefix_upper_bound(&prefix); + // Compute exclusive upper bound: increment last non-0xFF byte + let upper = { + let mut bytes = prefix.as_bytes().to_vec(); + let mut result = None; + while let Some(last) = bytes.pop() { + if last < 0xFF { + bytes.push(last + 1); + result = Some(String::from_utf8_lossy(&bytes).into_owned()); + break; + } + } + result + }; let keys = match &upper { Some(end) => { let mut stmt = conn.prepare_cached( @@ -197,25 +198,3 @@ impl Db { Ok(records) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_prefix_upper_bound_range_correctness() { - let prefix = "foo"; - let upper = prefix_upper_bound(prefix).unwrap(); - let upper = upper.as_str(); - - // in range [prefix, upper) - assert!("foo" >= prefix && "foo" < upper); - assert!("foo/bar" >= prefix && "foo/bar" < upper); - assert!("foobar" >= prefix && "foobar" < upper); - assert!("foo\x7f" >= prefix && "foo\x7f" < upper); - - // out of range - assert!("fop" >= upper); - assert!("fon" < prefix); - } -} diff --git a/src/error.rs b/src/error.rs index 7536cec..32e25a4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,7 +35,6 @@ pub enum AppError { Db(rusqlite::Error), InsufficientVolumes { need: usize, have: usize }, PartialWrite, - AllVolumesUnreachable, } impl From for AppError { @@ -59,7 +58,6 @@ impl std::fmt::Display for AppError { write!(f, "need {need} volumes but only {have} available") } AppError::PartialWrite => write!(f, "not all volume writes succeeded"), - AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"), } } } @@ -72,7 +70,6 @@ impl IntoResponse for AppError { AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR, AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE, AppError::PartialWrite => StatusCode::BAD_GATEWAY, - AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY, }; (status, self.to_string()).into_response() } diff --git a/src/lib.rs b/src/lib.rs index 498c78a..ba576f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,6 @@ pub mod rebuild; pub mod server; use std::sync::Arc; -use std::time::Duration; const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB @@ -14,7 +13,6 @@ pub struct Args { pub db_path: String, pub volumes: Vec, pub replicas: usize, - pub voltimeout: Duration, } pub fn build_app(args: &Args) -> axum::Router { @@ -38,7 +36,6 @@ pub fn build_app(args: &Args) -> axum::Router { db: db::Db::new(&args.db_path), volumes: Arc::new(args.volumes.clone()), replicas: args.replicas, - voltimeout: args.voltimeout, http: reqwest::Client::new(), }; diff --git a/src/main.rs b/src/main.rs index 7ec4727..f838428 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use clap::{Parser, Subcommand}; #[derive(Parser)] @@ -20,10 +18,6 @@ struct Cli { #[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)] replicas: usize, - /// Timeout for volume health checks (in milliseconds) - #[arg(long, env = "MKV_VOLTIMEOUT", default_value_t = 1000)] - voltimeout: u64, - #[command(subcommand)] command: Commands, } @@ -71,7 +65,6 @@ async fn main() { db_path: cli.db, volumes: cli.volumes, replicas: cli.replicas, - voltimeout: Duration::from_millis(cli.voltimeout), }; match cli.command { diff --git a/src/rebuild.rs b/src/rebuild.rs index 71b12f7..f0f3121 100644 --- a/src/rebuild.rs +++ b/src/rebuild.rs @@ -12,25 +12,6 @@ struct NginxEntry { size: Option, } -/// If a key has different sizes across volumes, takes the max. -pub fn merge_volume_scans( - scans: &[(String, Vec<(String, i64)>)], -) -> HashMap, i64)> { - let mut index: HashMap, i64)> = HashMap::new(); - for (vol_url, keys) in scans { - for (key, size) in keys { - let entry = index - .entry(key.clone()) - .or_insert_with(|| (Vec::new(), *size)); - entry.0.push(vol_url.clone()); - if *size > entry.1 { - entry.1 = *size; - } - } - } - index -} - async fn list_volume_keys(volume_url: &str) -> Result, String> { let http = reqwest::Client::new(); let mut keys = Vec::new(); @@ -76,21 +57,25 @@ pub async fn run(args: &Args) { let _ = std::fs::remove_file(format!("{db_path}-shm")); let db = db::Db::new(db_path); + let mut index: HashMap, i64)> = HashMap::new(); - let mut scans = Vec::new(); for vol_url in &args.volumes { eprintln!("Scanning {vol_url}..."); match list_volume_keys(vol_url).await { Ok(keys) => { eprintln!(" Found {} keys", keys.len()); - scans.push((vol_url.clone(), keys)); + for (key, size) in keys { + let entry = index.entry(key).or_insert_with(|| (Vec::new(), size)); + entry.0.push(vol_url.clone()); + if size > entry.1 { + entry.1 = size; + } + } } Err(e) => eprintln!(" Error scanning {vol_url}: {e}"), } } - let index = merge_volume_scans(&scans); - let records: Vec<_> = index .into_iter() .map(|(k, (v, s))| (k, v, Some(s))) @@ -99,22 +84,3 @@ pub async fn run(args: &Args) { db.bulk_put(records).await.expect("bulk_put failed"); eprintln!("Rebuilt index with {count} keys"); } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_merge_takes_max_size() { - // Can happen due to incomplete writes or corruption - let scans = vec![ - ("http://vol1".to_string(), vec![("key".to_string(), 50)]), - ("http://vol2".to_string(), vec![("key".to_string(), 200)]), - ("http://vol3".to_string(), vec![("key".to_string(), 100)]), - ]; - let index = merge_volume_scans(&scans); - let (volumes, size) = index.get("key").unwrap(); - assert_eq!(volumes.len(), 3); - assert_eq!(*size, 200, "should take maximum size across volumes"); - } -} diff --git a/src/server.rs b/src/server.rs index c4af12e..a4b24d4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,7 +3,6 @@ use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, StatusCode}; use axum::response::{IntoResponse, Response}; use std::sync::Arc; -use std::time::Duration; use crate::db; use crate::error::{AppError, VolumeError}; @@ -13,75 +12,24 @@ pub struct AppState { pub db: db::Db, pub volumes: Arc>, pub replicas: usize, - pub voltimeout: Duration, pub http: reqwest::Client, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ProbeResult { - Found(String), - AllFailed, -} - -pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult { - for (vol, &healthy) in volumes.iter().zip(results) { - if healthy { - return ProbeResult::Found(format!("{vol}/{key}")); - } - } - ProbeResult::AllFailed -} - -pub fn shuffle_volumes(volumes: Vec, seed: u64) -> Vec { - use rand::SeedableRng; - use rand::seq::SliceRandom; - let mut rng = rand::rngs::StdRng::seed_from_u64(seed); - let mut vols = volumes; - vols.shuffle(&mut rng); - vols -} - pub async fn get_key( State(state): State, Path(key): Path, ) -> Result { let record = state.db.get(&key).await?; - if record.volumes.is_empty() { - return Err(AppError::CorruptRecord { key }); - } - - let seed = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - let volumes = shuffle_volumes(record.volumes, seed); - - let mut results = Vec::with_capacity(volumes.len()); - for vol in &volumes { - let url = format!("{vol}/{key}"); - let healthy = match state.http.head(&url).timeout(state.voltimeout).send().await { - Ok(resp) if resp.status().is_success() => true, - Ok(resp) => { - tracing::warn!("volume {vol} returned {} for {key}", resp.status()); - false - } - Err(e) => { - tracing::warn!("volume {vol} unreachable for {key}: {e}"); - false - } - }; - results.push(healthy); - if healthy { - break; - } - } - - match first_healthy_volume(&key, &volumes, &results) { - ProbeResult::Found(url) => { - Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, url)]).into_response()) - } - ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable), - } + let vol = record + .volumes + .first() + .ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?; + let location = format!("{vol}/{key}"); + Ok(( + StatusCode::FOUND, + [(axum::http::header::LOCATION, location)], + ) + .into_response()) } pub async fn put_key( @@ -97,6 +45,7 @@ pub async fn put_key( }); } + // Fan out PUTs to all target volumes concurrently let mut handles = Vec::with_capacity(target_volumes.len()); for vol in &target_volumes { let url = format!("{vol}/{key}"); @@ -231,3 +180,20 @@ pub async fn list_keys( let keys = state.db.list_keys(&query.prefix).await?; Ok((StatusCode::OK, keys.join("\n")).into_response()) } + +#[cfg(test)] +mod tests { + #[test] + fn test_volumes_for_key_sufficient() { + let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); + let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); + assert_eq!(selected.len(), 2); + } + + #[test] + fn test_volumes_for_key_insufficient() { + let volumes: Vec = vec!["http://vol1".into()]; + let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2); + assert_eq!(selected.len(), 1); + } +} diff --git a/tests/integration.rs b/tests/integration.rs index cfd46ad..890b23a 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -19,7 +19,6 @@ async fn start_server() -> String { "http://localhost:3103".into(), ], replicas: 2, - voltimeout: std::time::Duration::from_secs(1), }; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();