Compare commits

..

No commits in common. "8e6a96633a27079a3a0a49f10b0bdca862ab466e" and "640f9afba5135099e67e2fe7ec4f487dc37063f9" have entirely different histories.

12 changed files with 68 additions and 266 deletions

View file

@ -8,82 +8,29 @@ permissions:
contents: write
jobs:
build-nix:
name: Build ${{ matrix.target }}
runs-on: ${{ matrix.runner }}
build:
name: Build packages
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- target: x86_64-unknown-linux-gnu
runner: ubuntu-latest
system: x86_64-linux
package: default
- target: x86_64-unknown-linux-musl
runner: ubuntu-latest
system: x86_64-linux
package: x86_64-linux-musl
- target: aarch64-unknown-linux-gnu
runner: ubuntu-latest
system: x86_64-linux
package: aarch64-linux
- target: x86_64-apple-darwin
runner: macos-13
system: x86_64-darwin
package: default
- target: aarch64-apple-darwin
runner: macos-latest
system: aarch64-darwin
package: default
- system: x86_64-linux
steps:
- uses: actions/checkout@v4
- uses: DeterminateSystems/nix-installer-action@main
# - uses: DeterminateSystems/magic-nix-cache-action@main
- run: nix --print-build-logs build .#packages.${{ matrix.system }}.${{ matrix.package }}
- run: nix --print-build-logs build .#packages.$SYSTEM.default
env:
SYSTEM: ${{ matrix.system }}
- name: Prepare release artifact
if: startsWith(github.ref, 'refs/tags/')
run: cp result/bin/mkv mkv-${{ matrix.target }}
- name: Upload artifact
if: startsWith(github.ref, 'refs/tags/')
uses: actions/upload-artifact@v4
with:
name: mkv-${{ matrix.target }}
path: mkv-${{ matrix.target }}
build-windows:
name: Build x86_64-pc-windows-msvc
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo build --release
- name: Prepare release artifact
if: startsWith(github.ref, 'refs/tags/')
run: cp target/release/mkv.exe mkv-x86_64-pc-windows-msvc.exe
- name: Upload artifact
if: startsWith(github.ref, 'refs/tags/')
uses: actions/upload-artifact@v4
with:
name: mkv-x86_64-pc-windows-msvc
path: mkv-x86_64-pc-windows-msvc.exe
release:
name: Create release
needs: [build-nix, build-windows]
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/tags/')
steps:
- uses: actions/download-artifact@v4
with:
path: artifacts
merge-multiple: true
run: cp result/bin/mkv mkv-${{ matrix.system }}
- name: Create release
if: startsWith(github.ref, 'refs/tags/')
uses: softprops/action-gh-release@v2
with:
files: artifacts/*
files: mkv-${{ matrix.system }}

2
.gitignore vendored
View file

@ -2,5 +2,3 @@
*.db*
/result
*~
\#*
.\#*

39
Cargo.lock generated
View file

@ -774,7 +774,6 @@ version = "0.1.0"
dependencies = [
"axum",
"clap",
"rand 0.8.5",
"reqwest",
"rusqlite",
"serde",
@ -909,7 +908,7 @@ dependencies = [
"bytes",
"getrandom 0.3.4",
"lru-slab",
"rand 0.9.2",
"rand",
"ring",
"rustc-hash",
"rustls",
@ -950,35 +949,14 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.5",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.4",
"rand_chacha",
"rand_core",
]
[[package]]
@ -988,16 +966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.5",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.17",
"rand_core",
]
[[package]]

View file

@ -14,7 +14,6 @@ clap = { version = "4", features = ["derive", "env"] }
tracing = "0.1"
tracing-subscriber = "0.3"
sha2 = "0.10"
rand = "0.8"
[profile.release]
opt-level = 3

View file

@ -12,17 +12,14 @@
darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [
pkgs.libiconv
];
mkMkv = pkgs': pkgs'.rustPlatform.buildRustPackage {
mkv = pkgs.rustPlatform.buildRustPackage {
pname = "mkv";
version = "0.1.0";
cargoLock.lockFile = ./Cargo.lock;
src = pkgs.lib.cleanSource ./.;
buildInputs = pkgs'.lib.optionals pkgs'.stdenv.hostPlatform.isDarwin [
pkgs'.libiconv
];
buildInputs = darwinBuildInputs;
doCheck = false;
};
mkv = mkMkv pkgs;
in
{
devShells.default = pkgs.mkShell {
@ -33,12 +30,7 @@
pkgs.rust-analyzer
];
};
packages = {
default = mkv;
} // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux {
x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64;
aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform;
};
packages.default = mkv;
checks.default = mkv.overrideAttrs { doCheck = true; };
}
);

View file

@ -30,18 +30,7 @@ fn encode_volumes(v: &[String]) -> String {
serde_json::to_string(v).unwrap()
}
/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None
pub fn prefix_upper_bound(prefix: &str) -> Option<String> {
let mut bytes = prefix.as_bytes().to_vec();
while let Some(last) = bytes.pop() {
if last < 0xFF {
bytes.push(last + 1);
return Some(String::from_utf8_lossy(&bytes).into_owned());
}
}
None
}
/// A single SQLite connection behind a mutex, used for both reads and writes.
#[derive(Clone)]
pub struct Db {
conn: Arc<Mutex<Connection>>,
@ -104,7 +93,19 @@ impl Db {
.collect::<Result<Vec<String>, _>>()?;
return Ok(keys);
}
let upper = prefix_upper_bound(&prefix);
// Compute exclusive upper bound: increment last non-0xFF byte
let upper = {
let mut bytes = prefix.as_bytes().to_vec();
let mut result = None;
while let Some(last) = bytes.pop() {
if last < 0xFF {
bytes.push(last + 1);
result = Some(String::from_utf8_lossy(&bytes).into_owned());
break;
}
}
result
};
let keys = match &upper {
Some(end) => {
let mut stmt = conn.prepare_cached(
@ -197,25 +198,3 @@ impl Db {
Ok(records)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_prefix_upper_bound_range_correctness() {
let prefix = "foo";
let upper = prefix_upper_bound(prefix).unwrap();
let upper = upper.as_str();
// in range [prefix, upper)
assert!("foo" >= prefix && "foo" < upper);
assert!("foo/bar" >= prefix && "foo/bar" < upper);
assert!("foobar" >= prefix && "foobar" < upper);
assert!("foo\x7f" >= prefix && "foo\x7f" < upper);
// out of range
assert!("fop" >= upper);
assert!("fon" < prefix);
}
}

View file

@ -35,7 +35,6 @@ pub enum AppError {
Db(rusqlite::Error),
InsufficientVolumes { need: usize, have: usize },
PartialWrite,
AllVolumesUnreachable,
}
impl From<rusqlite::Error> for AppError {
@ -59,7 +58,6 @@ impl std::fmt::Display for AppError {
write!(f, "need {need} volumes but only {have} available")
}
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"),
}
}
}
@ -72,7 +70,6 @@ impl IntoResponse for AppError {
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY,
};
(status, self.to_string()).into_response()
}

View file

@ -6,7 +6,6 @@ pub mod rebuild;
pub mod server;
use std::sync::Arc;
use std::time::Duration;
const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB
@ -14,7 +13,6 @@ pub struct Args {
pub db_path: String,
pub volumes: Vec<String>,
pub replicas: usize,
pub voltimeout: Duration,
}
pub fn build_app(args: &Args) -> axum::Router {
@ -38,7 +36,6 @@ pub fn build_app(args: &Args) -> axum::Router {
db: db::Db::new(&args.db_path),
volumes: Arc::new(args.volumes.clone()),
replicas: args.replicas,
voltimeout: args.voltimeout,
http: reqwest::Client::new(),
};

View file

@ -1,5 +1,3 @@
use std::time::Duration;
use clap::{Parser, Subcommand};
#[derive(Parser)]
@ -20,10 +18,6 @@ struct Cli {
#[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)]
replicas: usize,
/// Timeout for volume health checks (in milliseconds)
#[arg(long, env = "MKV_VOLTIMEOUT", default_value_t = 1000)]
voltimeout: u64,
#[command(subcommand)]
command: Commands,
}
@ -71,7 +65,6 @@ async fn main() {
db_path: cli.db,
volumes: cli.volumes,
replicas: cli.replicas,
voltimeout: Duration::from_millis(cli.voltimeout),
};
match cli.command {

View file

@ -12,25 +12,6 @@ struct NginxEntry {
size: Option<i64>,
}
/// If a key has different sizes across volumes, takes the max.
pub fn merge_volume_scans(
scans: &[(String, Vec<(String, i64)>)],
) -> HashMap<String, (Vec<String>, i64)> {
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
for (vol_url, keys) in scans {
for (key, size) in keys {
let entry = index
.entry(key.clone())
.or_insert_with(|| (Vec::new(), *size));
entry.0.push(vol_url.clone());
if *size > entry.1 {
entry.1 = *size;
}
}
}
index
}
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
let http = reqwest::Client::new();
let mut keys = Vec::new();
@ -76,21 +57,25 @@ pub async fn run(args: &Args) {
let _ = std::fs::remove_file(format!("{db_path}-shm"));
let db = db::Db::new(db_path);
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
let mut scans = Vec::new();
for vol_url in &args.volumes {
eprintln!("Scanning {vol_url}...");
match list_volume_keys(vol_url).await {
Ok(keys) => {
eprintln!(" Found {} keys", keys.len());
scans.push((vol_url.clone(), keys));
for (key, size) in keys {
let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
entry.0.push(vol_url.clone());
if size > entry.1 {
entry.1 = size;
}
}
}
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
}
}
let index = merge_volume_scans(&scans);
let records: Vec<_> = index
.into_iter()
.map(|(k, (v, s))| (k, v, Some(s)))
@ -99,22 +84,3 @@ pub async fn run(args: &Args) {
db.bulk_put(records).await.expect("bulk_put failed");
eprintln!("Rebuilt index with {count} keys");
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_merge_takes_max_size() {
// Can happen due to incomplete writes or corruption
let scans = vec![
("http://vol1".to_string(), vec![("key".to_string(), 50)]),
("http://vol2".to_string(), vec![("key".to_string(), 200)]),
("http://vol3".to_string(), vec![("key".to_string(), 100)]),
];
let index = merge_volume_scans(&scans);
let (volumes, size) = index.get("key").unwrap();
assert_eq!(volumes.len(), 3);
assert_eq!(*size, 200, "should take maximum size across volumes");
}
}

View file

@ -3,7 +3,6 @@ use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use std::sync::Arc;
use std::time::Duration;
use crate::db;
use crate::error::{AppError, VolumeError};
@ -13,75 +12,24 @@ pub struct AppState {
pub db: db::Db,
pub volumes: Arc<Vec<String>>,
pub replicas: usize,
pub voltimeout: Duration,
pub http: reqwest::Client,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeResult {
Found(String),
AllFailed,
}
pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult {
for (vol, &healthy) in volumes.iter().zip(results) {
if healthy {
return ProbeResult::Found(format!("{vol}/{key}"));
}
}
ProbeResult::AllFailed
}
pub fn shuffle_volumes(volumes: Vec<String>, seed: u64) -> Vec<String> {
use rand::SeedableRng;
use rand::seq::SliceRandom;
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
let mut vols = volumes;
vols.shuffle(&mut rng);
vols
}
pub async fn get_key(
State(state): State<AppState>,
Path(key): Path<String>,
) -> Result<Response, AppError> {
let record = state.db.get(&key).await?;
if record.volumes.is_empty() {
return Err(AppError::CorruptRecord { key });
}
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let volumes = shuffle_volumes(record.volumes, seed);
let mut results = Vec::with_capacity(volumes.len());
for vol in &volumes {
let url = format!("{vol}/{key}");
let healthy = match state.http.head(&url).timeout(state.voltimeout).send().await {
Ok(resp) if resp.status().is_success() => true,
Ok(resp) => {
tracing::warn!("volume {vol} returned {} for {key}", resp.status());
false
}
Err(e) => {
tracing::warn!("volume {vol} unreachable for {key}: {e}");
false
}
};
results.push(healthy);
if healthy {
break;
}
}
match first_healthy_volume(&key, &volumes, &results) {
ProbeResult::Found(url) => {
Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, url)]).into_response())
}
ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable),
}
let vol = record
.volumes
.first()
.ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?;
let location = format!("{vol}/{key}");
Ok((
StatusCode::FOUND,
[(axum::http::header::LOCATION, location)],
)
.into_response())
}
pub async fn put_key(
@ -97,6 +45,7 @@ pub async fn put_key(
});
}
// Fan out PUTs to all target volumes concurrently
let mut handles = Vec::with_capacity(target_volumes.len());
for vol in &target_volumes {
let url = format!("{vol}/{key}");
@ -231,3 +180,20 @@ pub async fn list_keys(
let keys = state.db.list_keys(&query.prefix).await?;
Ok((StatusCode::OK, keys.join("\n")).into_response())
}
#[cfg(test)]
mod tests {
#[test]
fn test_volumes_for_key_sufficient() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
assert_eq!(selected.len(), 2);
}
#[test]
fn test_volumes_for_key_insufficient() {
let volumes: Vec<String> = vec!["http://vol1".into()];
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
assert_eq!(selected.len(), 1);
}
}

View file

@ -19,7 +19,6 @@ async fn start_server() -> String {
"http://localhost:3103".into(),
],
replicas: 2,
voltimeout: std::time::Duration::from_secs(1),
};
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();