Compare commits
No commits in common. "8e6a96633a27079a3a0a49f10b0bdca862ab466e" and "640f9afba5135099e67e2fe7ec4f487dc37063f9" have entirely different histories.
8e6a96633a
...
640f9afba5
12 changed files with 68 additions and 266 deletions
75
.github/workflows/ci.yml
vendored
75
.github/workflows/ci.yml
vendored
|
|
@ -8,82 +8,29 @@ permissions:
|
||||||
contents: write
|
contents: write
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build-nix:
|
build:
|
||||||
name: Build ${{ matrix.target }}
|
name: Build packages
|
||||||
runs-on: ${{ matrix.runner }}
|
runs-on: ubuntu-latest
|
||||||
strategy:
|
strategy:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
include:
|
include:
|
||||||
- target: x86_64-unknown-linux-gnu
|
- system: x86_64-linux
|
||||||
runner: ubuntu-latest
|
|
||||||
system: x86_64-linux
|
|
||||||
package: default
|
|
||||||
- target: x86_64-unknown-linux-musl
|
|
||||||
runner: ubuntu-latest
|
|
||||||
system: x86_64-linux
|
|
||||||
package: x86_64-linux-musl
|
|
||||||
- target: aarch64-unknown-linux-gnu
|
|
||||||
runner: ubuntu-latest
|
|
||||||
system: x86_64-linux
|
|
||||||
package: aarch64-linux
|
|
||||||
- target: x86_64-apple-darwin
|
|
||||||
runner: macos-13
|
|
||||||
system: x86_64-darwin
|
|
||||||
package: default
|
|
||||||
- target: aarch64-apple-darwin
|
|
||||||
runner: macos-latest
|
|
||||||
system: aarch64-darwin
|
|
||||||
package: default
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
- uses: DeterminateSystems/nix-installer-action@main
|
- uses: DeterminateSystems/nix-installer-action@main
|
||||||
|
# - uses: DeterminateSystems/magic-nix-cache-action@main
|
||||||
|
|
||||||
- run: nix --print-build-logs build .#packages.${{ matrix.system }}.${{ matrix.package }}
|
- run: nix --print-build-logs build .#packages.$SYSTEM.default
|
||||||
|
env:
|
||||||
|
SYSTEM: ${{ matrix.system }}
|
||||||
|
|
||||||
- name: Prepare release artifact
|
- name: Prepare release artifact
|
||||||
if: startsWith(github.ref, 'refs/tags/')
|
if: startsWith(github.ref, 'refs/tags/')
|
||||||
run: cp result/bin/mkv mkv-${{ matrix.target }}
|
run: cp result/bin/mkv mkv-${{ matrix.system }}
|
||||||
|
|
||||||
- name: Upload artifact
|
|
||||||
if: startsWith(github.ref, 'refs/tags/')
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: mkv-${{ matrix.target }}
|
|
||||||
path: mkv-${{ matrix.target }}
|
|
||||||
|
|
||||||
build-windows:
|
|
||||||
name: Build x86_64-pc-windows-msvc
|
|
||||||
runs-on: windows-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- uses: dtolnay/rust-toolchain@stable
|
|
||||||
|
|
||||||
- run: cargo build --release
|
|
||||||
|
|
||||||
- name: Prepare release artifact
|
|
||||||
if: startsWith(github.ref, 'refs/tags/')
|
|
||||||
run: cp target/release/mkv.exe mkv-x86_64-pc-windows-msvc.exe
|
|
||||||
|
|
||||||
- name: Upload artifact
|
|
||||||
if: startsWith(github.ref, 'refs/tags/')
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: mkv-x86_64-pc-windows-msvc
|
|
||||||
path: mkv-x86_64-pc-windows-msvc.exe
|
|
||||||
|
|
||||||
release:
|
|
||||||
name: Create release
|
|
||||||
needs: [build-nix, build-windows]
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
if: startsWith(github.ref, 'refs/tags/')
|
|
||||||
steps:
|
|
||||||
- uses: actions/download-artifact@v4
|
|
||||||
with:
|
|
||||||
path: artifacts
|
|
||||||
merge-multiple: true
|
|
||||||
|
|
||||||
- name: Create release
|
- name: Create release
|
||||||
|
if: startsWith(github.ref, 'refs/tags/')
|
||||||
uses: softprops/action-gh-release@v2
|
uses: softprops/action-gh-release@v2
|
||||||
with:
|
with:
|
||||||
files: artifacts/*
|
files: mkv-${{ matrix.system }}
|
||||||
|
|
|
||||||
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -2,5 +2,3 @@
|
||||||
*.db*
|
*.db*
|
||||||
/result
|
/result
|
||||||
*~
|
*~
|
||||||
\#*
|
|
||||||
.\#*
|
|
||||||
|
|
|
||||||
39
Cargo.lock
generated
39
Cargo.lock
generated
|
|
@ -774,7 +774,6 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"axum",
|
"axum",
|
||||||
"clap",
|
"clap",
|
||||||
"rand 0.8.5",
|
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rusqlite",
|
"rusqlite",
|
||||||
"serde",
|
"serde",
|
||||||
|
|
@ -909,7 +908,7 @@ dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"getrandom 0.3.4",
|
"getrandom 0.3.4",
|
||||||
"lru-slab",
|
"lru-slab",
|
||||||
"rand 0.9.2",
|
"rand",
|
||||||
"ring",
|
"ring",
|
||||||
"rustc-hash",
|
"rustc-hash",
|
||||||
"rustls",
|
"rustls",
|
||||||
|
|
@ -950,35 +949,14 @@ version = "5.3.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
|
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "rand"
|
|
||||||
version = "0.8.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
"rand_chacha 0.3.1",
|
|
||||||
"rand_core 0.6.4",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rand"
|
name = "rand"
|
||||||
version = "0.9.2"
|
version = "0.9.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
|
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"rand_chacha 0.9.0",
|
"rand_chacha",
|
||||||
"rand_core 0.9.5",
|
"rand_core",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "rand_chacha"
|
|
||||||
version = "0.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
|
|
||||||
dependencies = [
|
|
||||||
"ppv-lite86",
|
|
||||||
"rand_core 0.6.4",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
@ -988,16 +966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ppv-lite86",
|
"ppv-lite86",
|
||||||
"rand_core 0.9.5",
|
"rand_core",
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "rand_core"
|
|
||||||
version = "0.6.4"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
|
||||||
dependencies = [
|
|
||||||
"getrandom 0.2.17",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,6 @@ clap = { version = "4", features = ["derive", "env"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
sha2 = "0.10"
|
sha2 = "0.10"
|
||||||
rand = "0.8"
|
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
opt-level = 3
|
opt-level = 3
|
||||||
|
|
|
||||||
14
flake.nix
14
flake.nix
|
|
@ -12,17 +12,14 @@
|
||||||
darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [
|
darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [
|
||||||
pkgs.libiconv
|
pkgs.libiconv
|
||||||
];
|
];
|
||||||
mkMkv = pkgs': pkgs'.rustPlatform.buildRustPackage {
|
mkv = pkgs.rustPlatform.buildRustPackage {
|
||||||
pname = "mkv";
|
pname = "mkv";
|
||||||
version = "0.1.0";
|
version = "0.1.0";
|
||||||
cargoLock.lockFile = ./Cargo.lock;
|
cargoLock.lockFile = ./Cargo.lock;
|
||||||
src = pkgs.lib.cleanSource ./.;
|
src = pkgs.lib.cleanSource ./.;
|
||||||
buildInputs = pkgs'.lib.optionals pkgs'.stdenv.hostPlatform.isDarwin [
|
buildInputs = darwinBuildInputs;
|
||||||
pkgs'.libiconv
|
|
||||||
];
|
|
||||||
doCheck = false;
|
doCheck = false;
|
||||||
};
|
};
|
||||||
mkv = mkMkv pkgs;
|
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
devShells.default = pkgs.mkShell {
|
devShells.default = pkgs.mkShell {
|
||||||
|
|
@ -33,12 +30,7 @@
|
||||||
pkgs.rust-analyzer
|
pkgs.rust-analyzer
|
||||||
];
|
];
|
||||||
};
|
};
|
||||||
packages = {
|
packages.default = mkv;
|
||||||
default = mkv;
|
|
||||||
} // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux {
|
|
||||||
x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64;
|
|
||||||
aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform;
|
|
||||||
};
|
|
||||||
checks.default = mkv.overrideAttrs { doCheck = true; };
|
checks.default = mkv.overrideAttrs { doCheck = true; };
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
||||||
49
src/db.rs
49
src/db.rs
|
|
@ -30,18 +30,7 @@ fn encode_volumes(v: &[String]) -> String {
|
||||||
serde_json::to_string(v).unwrap()
|
serde_json::to_string(v).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None
|
/// A single SQLite connection behind a mutex, used for both reads and writes.
|
||||||
pub fn prefix_upper_bound(prefix: &str) -> Option<String> {
|
|
||||||
let mut bytes = prefix.as_bytes().to_vec();
|
|
||||||
while let Some(last) = bytes.pop() {
|
|
||||||
if last < 0xFF {
|
|
||||||
bytes.push(last + 1);
|
|
||||||
return Some(String::from_utf8_lossy(&bytes).into_owned());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Db {
|
pub struct Db {
|
||||||
conn: Arc<Mutex<Connection>>,
|
conn: Arc<Mutex<Connection>>,
|
||||||
|
|
@ -104,7 +93,19 @@ impl Db {
|
||||||
.collect::<Result<Vec<String>, _>>()?;
|
.collect::<Result<Vec<String>, _>>()?;
|
||||||
return Ok(keys);
|
return Ok(keys);
|
||||||
}
|
}
|
||||||
let upper = prefix_upper_bound(&prefix);
|
// Compute exclusive upper bound: increment last non-0xFF byte
|
||||||
|
let upper = {
|
||||||
|
let mut bytes = prefix.as_bytes().to_vec();
|
||||||
|
let mut result = None;
|
||||||
|
while let Some(last) = bytes.pop() {
|
||||||
|
if last < 0xFF {
|
||||||
|
bytes.push(last + 1);
|
||||||
|
result = Some(String::from_utf8_lossy(&bytes).into_owned());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result
|
||||||
|
};
|
||||||
let keys = match &upper {
|
let keys = match &upper {
|
||||||
Some(end) => {
|
Some(end) => {
|
||||||
let mut stmt = conn.prepare_cached(
|
let mut stmt = conn.prepare_cached(
|
||||||
|
|
@ -197,25 +198,3 @@ impl Db {
|
||||||
Ok(records)
|
Ok(records)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_prefix_upper_bound_range_correctness() {
|
|
||||||
let prefix = "foo";
|
|
||||||
let upper = prefix_upper_bound(prefix).unwrap();
|
|
||||||
let upper = upper.as_str();
|
|
||||||
|
|
||||||
// in range [prefix, upper)
|
|
||||||
assert!("foo" >= prefix && "foo" < upper);
|
|
||||||
assert!("foo/bar" >= prefix && "foo/bar" < upper);
|
|
||||||
assert!("foobar" >= prefix && "foobar" < upper);
|
|
||||||
assert!("foo\x7f" >= prefix && "foo\x7f" < upper);
|
|
||||||
|
|
||||||
// out of range
|
|
||||||
assert!("fop" >= upper);
|
|
||||||
assert!("fon" < prefix);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,6 @@ pub enum AppError {
|
||||||
Db(rusqlite::Error),
|
Db(rusqlite::Error),
|
||||||
InsufficientVolumes { need: usize, have: usize },
|
InsufficientVolumes { need: usize, have: usize },
|
||||||
PartialWrite,
|
PartialWrite,
|
||||||
AllVolumesUnreachable,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<rusqlite::Error> for AppError {
|
impl From<rusqlite::Error> for AppError {
|
||||||
|
|
@ -59,7 +58,6 @@ impl std::fmt::Display for AppError {
|
||||||
write!(f, "need {need} volumes but only {have} available")
|
write!(f, "need {need} volumes but only {have} available")
|
||||||
}
|
}
|
||||||
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
|
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
|
||||||
AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -72,7 +70,6 @@ impl IntoResponse for AppError {
|
||||||
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
|
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
|
||||||
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
|
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
|
||||||
AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY,
|
|
||||||
};
|
};
|
||||||
(status, self.to_string()).into_response()
|
(status, self.to_string()).into_response()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ pub mod rebuild;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB
|
const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB
|
||||||
|
|
||||||
|
|
@ -14,7 +13,6 @@ pub struct Args {
|
||||||
pub db_path: String,
|
pub db_path: String,
|
||||||
pub volumes: Vec<String>,
|
pub volumes: Vec<String>,
|
||||||
pub replicas: usize,
|
pub replicas: usize,
|
||||||
pub voltimeout: Duration,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build_app(args: &Args) -> axum::Router {
|
pub fn build_app(args: &Args) -> axum::Router {
|
||||||
|
|
@ -38,7 +36,6 @@ pub fn build_app(args: &Args) -> axum::Router {
|
||||||
db: db::Db::new(&args.db_path),
|
db: db::Db::new(&args.db_path),
|
||||||
volumes: Arc::new(args.volumes.clone()),
|
volumes: Arc::new(args.volumes.clone()),
|
||||||
replicas: args.replicas,
|
replicas: args.replicas,
|
||||||
voltimeout: args.voltimeout,
|
|
||||||
http: reqwest::Client::new(),
|
http: reqwest::Client::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,3 @@
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
|
|
@ -20,10 +18,6 @@ struct Cli {
|
||||||
#[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)]
|
#[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)]
|
||||||
replicas: usize,
|
replicas: usize,
|
||||||
|
|
||||||
/// Timeout for volume health checks (in milliseconds)
|
|
||||||
#[arg(long, env = "MKV_VOLTIMEOUT", default_value_t = 1000)]
|
|
||||||
voltimeout: u64,
|
|
||||||
|
|
||||||
#[command(subcommand)]
|
#[command(subcommand)]
|
||||||
command: Commands,
|
command: Commands,
|
||||||
}
|
}
|
||||||
|
|
@ -71,7 +65,6 @@ async fn main() {
|
||||||
db_path: cli.db,
|
db_path: cli.db,
|
||||||
volumes: cli.volumes,
|
volumes: cli.volumes,
|
||||||
replicas: cli.replicas,
|
replicas: cli.replicas,
|
||||||
voltimeout: Duration::from_millis(cli.voltimeout),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
match cli.command {
|
match cli.command {
|
||||||
|
|
|
||||||
|
|
@ -12,25 +12,6 @@ struct NginxEntry {
|
||||||
size: Option<i64>,
|
size: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If a key has different sizes across volumes, takes the max.
|
|
||||||
pub fn merge_volume_scans(
|
|
||||||
scans: &[(String, Vec<(String, i64)>)],
|
|
||||||
) -> HashMap<String, (Vec<String>, i64)> {
|
|
||||||
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
|
|
||||||
for (vol_url, keys) in scans {
|
|
||||||
for (key, size) in keys {
|
|
||||||
let entry = index
|
|
||||||
.entry(key.clone())
|
|
||||||
.or_insert_with(|| (Vec::new(), *size));
|
|
||||||
entry.0.push(vol_url.clone());
|
|
||||||
if *size > entry.1 {
|
|
||||||
entry.1 = *size;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
index
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
|
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
|
||||||
let http = reqwest::Client::new();
|
let http = reqwest::Client::new();
|
||||||
let mut keys = Vec::new();
|
let mut keys = Vec::new();
|
||||||
|
|
@ -76,21 +57,25 @@ pub async fn run(args: &Args) {
|
||||||
let _ = std::fs::remove_file(format!("{db_path}-shm"));
|
let _ = std::fs::remove_file(format!("{db_path}-shm"));
|
||||||
|
|
||||||
let db = db::Db::new(db_path);
|
let db = db::Db::new(db_path);
|
||||||
|
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
|
||||||
|
|
||||||
let mut scans = Vec::new();
|
|
||||||
for vol_url in &args.volumes {
|
for vol_url in &args.volumes {
|
||||||
eprintln!("Scanning {vol_url}...");
|
eprintln!("Scanning {vol_url}...");
|
||||||
match list_volume_keys(vol_url).await {
|
match list_volume_keys(vol_url).await {
|
||||||
Ok(keys) => {
|
Ok(keys) => {
|
||||||
eprintln!(" Found {} keys", keys.len());
|
eprintln!(" Found {} keys", keys.len());
|
||||||
scans.push((vol_url.clone(), keys));
|
for (key, size) in keys {
|
||||||
|
let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
|
||||||
|
entry.0.push(vol_url.clone());
|
||||||
|
if size > entry.1 {
|
||||||
|
entry.1 = size;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
|
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let index = merge_volume_scans(&scans);
|
|
||||||
|
|
||||||
let records: Vec<_> = index
|
let records: Vec<_> = index
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, (v, s))| (k, v, Some(s)))
|
.map(|(k, (v, s))| (k, v, Some(s)))
|
||||||
|
|
@ -99,22 +84,3 @@ pub async fn run(args: &Args) {
|
||||||
db.bulk_put(records).await.expect("bulk_put failed");
|
db.bulk_put(records).await.expect("bulk_put failed");
|
||||||
eprintln!("Rebuilt index with {count} keys");
|
eprintln!("Rebuilt index with {count} keys");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_merge_takes_max_size() {
|
|
||||||
// Can happen due to incomplete writes or corruption
|
|
||||||
let scans = vec![
|
|
||||||
("http://vol1".to_string(), vec![("key".to_string(), 50)]),
|
|
||||||
("http://vol2".to_string(), vec![("key".to_string(), 200)]),
|
|
||||||
("http://vol3".to_string(), vec![("key".to_string(), 100)]),
|
|
||||||
];
|
|
||||||
let index = merge_volume_scans(&scans);
|
|
||||||
let (volumes, size) = index.get("key").unwrap();
|
|
||||||
assert_eq!(volumes.len(), 3);
|
|
||||||
assert_eq!(*size, 200, "should take maximum size across volumes");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@ use axum::extract::{Path, Query, State};
|
||||||
use axum::http::{HeaderMap, StatusCode};
|
use axum::http::{HeaderMap, StatusCode};
|
||||||
use axum::response::{IntoResponse, Response};
|
use axum::response::{IntoResponse, Response};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use crate::db;
|
use crate::db;
|
||||||
use crate::error::{AppError, VolumeError};
|
use crate::error::{AppError, VolumeError};
|
||||||
|
|
@ -13,75 +12,24 @@ pub struct AppState {
|
||||||
pub db: db::Db,
|
pub db: db::Db,
|
||||||
pub volumes: Arc<Vec<String>>,
|
pub volumes: Arc<Vec<String>>,
|
||||||
pub replicas: usize,
|
pub replicas: usize,
|
||||||
pub voltimeout: Duration,
|
|
||||||
pub http: reqwest::Client,
|
pub http: reqwest::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
||||||
pub enum ProbeResult {
|
|
||||||
Found(String),
|
|
||||||
AllFailed,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult {
|
|
||||||
for (vol, &healthy) in volumes.iter().zip(results) {
|
|
||||||
if healthy {
|
|
||||||
return ProbeResult::Found(format!("{vol}/{key}"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ProbeResult::AllFailed
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn shuffle_volumes(volumes: Vec<String>, seed: u64) -> Vec<String> {
|
|
||||||
use rand::SeedableRng;
|
|
||||||
use rand::seq::SliceRandom;
|
|
||||||
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
|
|
||||||
let mut vols = volumes;
|
|
||||||
vols.shuffle(&mut rng);
|
|
||||||
vols
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_key(
|
pub async fn get_key(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
Path(key): Path<String>,
|
Path(key): Path<String>,
|
||||||
) -> Result<Response, AppError> {
|
) -> Result<Response, AppError> {
|
||||||
let record = state.db.get(&key).await?;
|
let record = state.db.get(&key).await?;
|
||||||
if record.volumes.is_empty() {
|
let vol = record
|
||||||
return Err(AppError::CorruptRecord { key });
|
.volumes
|
||||||
}
|
.first()
|
||||||
|
.ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?;
|
||||||
let seed = std::time::SystemTime::now()
|
let location = format!("{vol}/{key}");
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
Ok((
|
||||||
.map(|d| d.as_nanos() as u64)
|
StatusCode::FOUND,
|
||||||
.unwrap_or(0);
|
[(axum::http::header::LOCATION, location)],
|
||||||
let volumes = shuffle_volumes(record.volumes, seed);
|
)
|
||||||
|
.into_response())
|
||||||
let mut results = Vec::with_capacity(volumes.len());
|
|
||||||
for vol in &volumes {
|
|
||||||
let url = format!("{vol}/{key}");
|
|
||||||
let healthy = match state.http.head(&url).timeout(state.voltimeout).send().await {
|
|
||||||
Ok(resp) if resp.status().is_success() => true,
|
|
||||||
Ok(resp) => {
|
|
||||||
tracing::warn!("volume {vol} returned {} for {key}", resp.status());
|
|
||||||
false
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!("volume {vol} unreachable for {key}: {e}");
|
|
||||||
false
|
|
||||||
}
|
|
||||||
};
|
|
||||||
results.push(healthy);
|
|
||||||
if healthy {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match first_healthy_volume(&key, &volumes, &results) {
|
|
||||||
ProbeResult::Found(url) => {
|
|
||||||
Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, url)]).into_response())
|
|
||||||
}
|
|
||||||
ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn put_key(
|
pub async fn put_key(
|
||||||
|
|
@ -97,6 +45,7 @@ pub async fn put_key(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fan out PUTs to all target volumes concurrently
|
||||||
let mut handles = Vec::with_capacity(target_volumes.len());
|
let mut handles = Vec::with_capacity(target_volumes.len());
|
||||||
for vol in &target_volumes {
|
for vol in &target_volumes {
|
||||||
let url = format!("{vol}/{key}");
|
let url = format!("{vol}/{key}");
|
||||||
|
|
@ -231,3 +180,20 @@ pub async fn list_keys(
|
||||||
let keys = state.db.list_keys(&query.prefix).await?;
|
let keys = state.db.list_keys(&query.prefix).await?;
|
||||||
Ok((StatusCode::OK, keys.join("\n")).into_response())
|
Ok((StatusCode::OK, keys.join("\n")).into_response())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
#[test]
|
||||||
|
fn test_volumes_for_key_sufficient() {
|
||||||
|
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
|
||||||
|
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
|
||||||
|
assert_eq!(selected.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_volumes_for_key_insufficient() {
|
||||||
|
let volumes: Vec<String> = vec!["http://vol1".into()];
|
||||||
|
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
|
||||||
|
assert_eq!(selected.len(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ async fn start_server() -> String {
|
||||||
"http://localhost:3103".into(),
|
"http://localhost:3103".into(),
|
||||||
],
|
],
|
||||||
replicas: 2,
|
replicas: 2,
|
||||||
voltimeout: std::time::Duration::from_secs(1),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue