Compare commits

..

17 commits
v0.1.0 ... main

Author SHA1 Message Date
282d1faad1 ci: rename test job to "Run tests"
All checks were successful
CI / Run tests (push) Successful in 1m30s
CI / Build x86_64-unknown-linux-gnu (push) Successful in 1m13s
CI / Create release (push) Has been skipped
2026-04-03 16:38:57 +02:00
bf6060b6d1 ci: remove unnecessary builds
Some checks failed
CI / Test (push) Successful in 1m29s
CI / Build x86_64-unknown-linux-gnu (push) Has been cancelled
CI / Create release (push) Has been cancelled
2026-04-03 16:36:33 +02:00
9adeea8732 .
All checks were successful
CI / Test (push) Successful in 1s
CI / Build aarch64-unknown-linux-gnu (push) Successful in 3s
CI / Build x86_64-unknown-linux-gnu (push) Successful in 2s
CI / Build x86_64-unknown-linux-musl (push) Successful in 2s
CI / Create release (push) Successful in 8s
2026-04-03 00:35:25 +02:00
e4bc2d0740 Add comment
Some checks failed
CI / Test (push) Successful in 2s
CI / Build aarch64-unknown-linux-gnu (push) Successful in 3s
CI / Build x86_64-unknown-linux-gnu (push) Successful in 3s
CI / Build x86_64-unknown-linux-musl (push) Successful in 3s
CI / Create release (push) Failing after 11s
2026-04-02 23:39:14 +02:00
9b5b7be0de Fix CI/CD 2026-04-02 23:35:15 +02:00
f72cc0c9aa New CI/CD
Some checks failed
CI / Test (push) Failing after 23s
CI / Build aarch64-linux (push) Successful in 1m29s
CI / Build default (push) Successful in 1m13s
CI / Build x86_64-linux-musl (push) Successful in 1m17s
CI / Create release (push) Has been skipped
2026-04-02 23:22:11 +02:00
8e6a96633a Emacs
Some checks failed
CI / Build aarch64-unknown-linux-gnu (push) Has been cancelled
CI / Build x86_64-apple-darwin (push) Has been cancelled
CI / Build aarch64-apple-darwin (push) Has been cancelled
CI / Build x86_64-unknown-linux-gnu (push) Has been cancelled
CI / Build x86_64-unknown-linux-musl (push) Has been cancelled
CI / Build x86_64-pc-windows-msvc (push) Has been cancelled
CI / Create release (push) Has been cancelled
2026-03-08 17:57:21 +01:00
07e1ab0796 Format 2026-03-08 17:55:37 +01:00
f19656486a Add timeout 2026-03-08 17:41:58 +01:00
138ab72240 Idk 2026-03-08 14:06:10 +01:00
d66a01e7da Add support for more architectures 2026-03-08 13:54:42 +01:00
5daa983034 Clean up comments 2026-03-08 13:31:44 +01:00
5cdaeddc0e Clean up tests 2026-03-08 13:31:35 +01:00
fa4dc716db Purify 2026-03-08 13:24:54 +01:00
1d3b9dddf5 Add unit test for failover 2026-03-08 13:09:55 +01:00
1fc59674f5 Allow for reads if one volume is down 2026-03-08 13:08:49 +01:00
640f9afba5 Try giving permission 2026-03-08 11:45:40 +01:00
13 changed files with 300 additions and 91 deletions

View file

@ -0,0 +1,89 @@
# CI for the mkv key/value store: test + build via Nix, release on tags.
name: "CI"
on:
  push:
    branches: ["main"]
    tags: ["v*"]
  pull_request:
    branches: ["main"]

jobs:
  test:
    name: Run tests
    runs-on: native
    steps:
      - uses: actions/checkout@v4
      - name: Run unit tests
        run: nix build --print-build-logs .#checks.x86_64-linux.default

  build:
    name: Build ${{ matrix.target }}
    runs-on: native
    strategy:
      fail-fast: false
      matrix:
        include:
          - package: x86_64-linux
            target: x86_64-unknown-linux-gnu
          # Cross/darwin targets disabled for now; re-enable as runners allow.
          # - package: x86_64-linux-musl
          #   target: x86_64-unknown-linux-musl
          # - package: aarch64-linux
          #   target: aarch64-unknown-linux-gnu
          # - package: default
          #   target: x86_64-apple-darwin
          #   system: x86_64-darwin
          # - package: default
          #   target: aarch64-apple-darwin
          #   system: aarch64-darwin
    steps:
      - uses: actions/checkout@v4
      - name: Build
        run: nix build --print-build-logs .#packages.x86_64-linux.${{ matrix.package }}
      # NOTE(review): conditions use `forgejo.ref` while the release script below
      # reads `GITHUB_REF` — confirm the `forgejo` context is defined on this
      # runner; `github.ref` is the compatibility-standard spelling.
      - name: Prepare release artifact
        if: startsWith(forgejo.ref, 'refs/tags/')
        run: cp result/bin/mkv mkv-${{ matrix.target }}
      - name: Upload artifact
        if: startsWith(forgejo.ref, 'refs/tags/')
        uses: actions/upload-artifact@v3
        with:
          name: mkv-${{ matrix.target }}
          path: mkv-${{ matrix.target }}

  release:
    name: Create release
    needs: [test, build]
    runs-on: native
    if: startsWith(forgejo.ref, 'refs/tags/')
    steps:
      - uses: actions/download-artifact@v3
        with:
          path: artifacts
      # download-artifact unpacks each artifact into its own subdirectory;
      # flatten them into release/ before uploading.
      - name: Collect artifacts
        run: |
          mkdir -p release
          find artifacts -type f -name 'mkv-*' -exec mv {} release/ \;
          ls -la release/
      # Create the release via the Forgejo/Gitea REST API, then attach every
      # collected binary as a release asset.
      - name: Create release
        run: |
          tag="${GITHUB_REF#refs/tags/}"
          curl -X POST \
            -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
            -H "Content-Type: application/json" \
            -d "{\"tag_name\": \"${tag}\", \"name\": \"${tag}\"}" \
            "${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases"
          release_id=$(curl -s \
            -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
            "${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases/tags/${tag}" \
            | jq -r '.id')
          for file in release/*; do
            curl -X POST \
              -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
              -H "Content-Type: application/octet-stream" \
              --data-binary "@${file}" \
              "${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases/${release_id}/assets?name=$(basename $file)"
          done

View file

@ -1,33 +0,0 @@
# Previous GitHub-style CI: single build job on ubuntu-latest with the
# Determinate Systems Nix installer, releasing via softprops/action-gh-release.
name: "CI"
on:
  pull_request:
  push:
    branches: ["main"]
    tags: ["v*"]

jobs:
  build:
    name: Build packages
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        include:
          - system: x86_64-linux
    steps:
      - uses: actions/checkout@v4
      - uses: DeterminateSystems/nix-installer-action@main
      # - uses: DeterminateSystems/magic-nix-cache-action@main
      - run: nix --print-build-logs build .#packages.$SYSTEM.default
        env:
          SYSTEM: ${{ matrix.system }}
      - name: Prepare release artifact
        if: startsWith(github.ref, 'refs/tags/')
        run: cp result/bin/mkv mkv-${{ matrix.system }}
      - name: Create release
        if: startsWith(github.ref, 'refs/tags/')
        uses: softprops/action-gh-release@v2
        with:
          files: mkv-${{ matrix.system }}

2
.gitignore vendored
View file

@ -2,3 +2,5 @@
*.db*
/result
*~
\#*
.\#*

39
Cargo.lock generated
View file

@ -774,6 +774,7 @@ version = "0.1.0"
dependencies = [
"axum",
"clap",
"rand 0.8.5",
"reqwest",
"rusqlite",
"serde",
@ -908,7 +909,7 @@ dependencies = [
"bytes",
"getrandom 0.3.4",
"lru-slab",
"rand",
"rand 0.9.2",
"ring",
"rustc-hash",
"rustls",
@ -949,14 +950,35 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha",
"rand_core",
"rand_chacha 0.9.0",
"rand_core 0.9.5",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.4",
]
[[package]]
@ -966,7 +988,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.9.5",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.17",
]
[[package]]

View file

@ -14,6 +14,7 @@ clap = { version = "4", features = ["derive", "env"] }
tracing = "0.1"
tracing-subscriber = "0.3"
sha2 = "0.10"
rand = "0.8"
[profile.release]
opt-level = 3

View file

@ -12,14 +12,17 @@
darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [
pkgs.libiconv
];
mkv = pkgs.rustPlatform.buildRustPackage {
mkMkv = pkgs': pkgs'.rustPlatform.buildRustPackage {
pname = "mkv";
version = "0.1.0";
cargoLock.lockFile = ./Cargo.lock;
src = pkgs.lib.cleanSource ./.;
buildInputs = darwinBuildInputs;
buildInputs = pkgs'.lib.optionals pkgs'.stdenv.hostPlatform.isDarwin [
pkgs'.libiconv
];
doCheck = false;
};
mkv = mkMkv pkgs;
in
{
devShells.default = pkgs.mkShell {
@ -30,8 +33,21 @@
pkgs.rust-analyzer
];
};
packages.default = mkv;
checks.default = mkv.overrideAttrs { doCheck = true; };
packages = {
default = mkv;
x86_64-linux = mkv;
} // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux {
x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64;
aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform;
};
checks.default = mkv.overrideAttrs {
doCheck = true;
checkPhase = ''
runHook preCheck
cargo test --lib
runHook postCheck
'';
};
}
);
}

View file

@ -30,7 +30,18 @@ fn encode_volumes(v: &[String]) -> String {
serde_json::to_string(v).unwrap()
}
/// A single SQLite connection behind a mutex, used for both reads and writes.
/// Returns the smallest string that is strictly greater than every string
/// beginning with `prefix`, suitable as an exclusive range upper bound.
///
/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None
pub fn prefix_upper_bound(prefix: &str) -> Option<String> {
    let mut bytes = prefix.as_bytes().to_vec();
    // Strip trailing 0xFF bytes, then bump the last remaining byte by one.
    while let Some(last) = bytes.pop() {
        if last == 0xFF {
            continue;
        }
        bytes.push(last + 1);
        return Some(String::from_utf8_lossy(&bytes).into_owned());
    }
    // Every byte was 0xFF (or the prefix was empty): no finite upper bound exists.
    None
}
#[derive(Clone)]
pub struct Db {
conn: Arc<Mutex<Connection>>,
@ -93,19 +104,7 @@ impl Db {
.collect::<Result<Vec<String>, _>>()?;
return Ok(keys);
}
// Compute exclusive upper bound: increment last non-0xFF byte
let upper = {
let mut bytes = prefix.as_bytes().to_vec();
let mut result = None;
while let Some(last) = bytes.pop() {
if last < 0xFF {
bytes.push(last + 1);
result = Some(String::from_utf8_lossy(&bytes).into_owned());
break;
}
}
result
};
let upper = prefix_upper_bound(&prefix);
let keys = match &upper {
Some(end) => {
let mut stmt = conn.prepare_cached(
@ -198,3 +197,25 @@ impl Db {
Ok(records)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prefix_upper_bound_range_correctness() {
        let prefix = "foo";
        let bound = prefix_upper_bound(prefix).unwrap();
        let upper: &str = &bound;

        // Every key sharing the prefix must land inside [prefix, upper).
        for key in ["foo", "foo/bar", "foobar", "foo\x7f"] {
            assert!(key >= prefix && key < upper, "{key} should be in range");
        }

        // Keys that do not share the prefix fall outside the range.
        assert!("fop" >= upper);
        assert!("fon" < prefix);
    }
}

View file

@ -35,6 +35,7 @@ pub enum AppError {
Db(rusqlite::Error),
InsufficientVolumes { need: usize, have: usize },
PartialWrite,
AllVolumesUnreachable,
}
impl From<rusqlite::Error> for AppError {
@ -58,6 +59,7 @@ impl std::fmt::Display for AppError {
write!(f, "need {need} volumes but only {have} available")
}
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"),
}
}
}
@ -70,6 +72,7 @@ impl IntoResponse for AppError {
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY,
};
(status, self.to_string()).into_response()
}

View file

@ -6,6 +6,7 @@ pub mod rebuild;
pub mod server;
use std::sync::Arc;
use std::time::Duration;
const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB
@ -13,6 +14,7 @@ pub struct Args {
pub db_path: String,
pub volumes: Vec<String>,
pub replicas: usize,
pub voltimeout: Duration,
}
pub fn build_app(args: &Args) -> axum::Router {
@ -36,6 +38,7 @@ pub fn build_app(args: &Args) -> axum::Router {
db: db::Db::new(&args.db_path),
volumes: Arc::new(args.volumes.clone()),
replicas: args.replicas,
voltimeout: args.voltimeout,
http: reqwest::Client::new(),
};

View file

@ -1,3 +1,5 @@
use std::time::Duration;
use clap::{Parser, Subcommand};
#[derive(Parser)]
@ -18,6 +20,10 @@ struct Cli {
#[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)]
replicas: usize,
/// Timeout for volume health checks (in milliseconds)
#[arg(long, env = "MKV_VOLTIMEOUT", default_value_t = 1000)]
voltimeout: u64,
#[command(subcommand)]
command: Commands,
}
@ -65,6 +71,7 @@ async fn main() {
db_path: cli.db,
volumes: cli.volumes,
replicas: cli.replicas,
voltimeout: Duration::from_millis(cli.voltimeout),
};
match cli.command {

View file

@ -12,6 +12,25 @@ struct NginxEntry {
size: Option<i64>,
}
/// Folds per-volume key listings into one index of key -> (owning volumes, size).
///
/// If a key has different sizes across volumes, takes the max.
pub fn merge_volume_scans(
    scans: &[(String, Vec<(String, i64)>)],
) -> HashMap<String, (Vec<String>, i64)> {
    let mut merged: HashMap<String, (Vec<String>, i64)> = HashMap::new();
    for (volume, listing) in scans {
        for (key, observed) in listing {
            match merged.get_mut(key) {
                Some((holders, size)) => {
                    holders.push(volume.clone());
                    // Keep the largest size ever observed for this key.
                    if *observed > *size {
                        *size = *observed;
                    }
                }
                None => {
                    merged.insert(key.clone(), (vec![volume.clone()], *observed));
                }
            }
        }
    }
    merged
}
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
let http = reqwest::Client::new();
let mut keys = Vec::new();
@ -57,25 +76,21 @@ pub async fn run(args: &Args) {
let _ = std::fs::remove_file(format!("{db_path}-shm"));
let db = db::Db::new(db_path);
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
let mut scans = Vec::new();
for vol_url in &args.volumes {
eprintln!("Scanning {vol_url}...");
match list_volume_keys(vol_url).await {
Ok(keys) => {
eprintln!(" Found {} keys", keys.len());
for (key, size) in keys {
let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
entry.0.push(vol_url.clone());
if size > entry.1 {
entry.1 = size;
}
}
scans.push((vol_url.clone(), keys));
}
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
}
}
let index = merge_volume_scans(&scans);
let records: Vec<_> = index
.into_iter()
.map(|(k, (v, s))| (k, v, Some(s)))
@ -84,3 +99,22 @@ pub async fn run(args: &Args) {
db.bulk_put(records).await.expect("bulk_put failed");
eprintln!("Rebuilt index with {count} keys");
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_merge_takes_max_size() {
        // Replicas can report different sizes after incomplete writes or
        // corruption; the merged index must keep the largest observed size.
        let scans: Vec<(String, Vec<(String, i64)>)> = [50, 200, 100]
            .iter()
            .enumerate()
            .map(|(i, &size)| (format!("http://vol{}", i + 1), vec![("key".to_string(), size)]))
            .collect();

        let index = merge_volume_scans(&scans);
        let (volumes, size) = index.get("key").unwrap();
        assert_eq!(volumes.len(), 3);
        assert_eq!(*size, 200, "should take maximum size across volumes");
    }
}

View file

@ -3,6 +3,7 @@ use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use std::sync::Arc;
use std::time::Duration;
use crate::db;
use crate::error::{AppError, VolumeError};
@ -12,24 +13,75 @@ pub struct AppState {
pub db: db::Db,
pub volumes: Arc<Vec<String>>,
pub replicas: usize,
pub voltimeout: Duration,
pub http: reqwest::Client,
}
/// Outcome of probing a key's replica volumes in order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeResult {
    /// Redirect URL built from the first replica whose probe succeeded.
    Found(String),
    /// Every probed replica failed.
    AllFailed,
}

/// Pairs `volumes[i]` with `results[i]` and returns the redirect URL for the
/// first healthy pair; surplus volumes without a probe result are ignored.
pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult {
    volumes
        .iter()
        .zip(results.iter().copied())
        .find(|&(_, healthy)| healthy)
        .map_or(ProbeResult::AllFailed, |(vol, _)| {
            ProbeResult::Found(format!("{vol}/{key}"))
        })
}
/// Shuffles `volumes` deterministically from `seed`, so reads spread across
/// replicas while the same seed always produces the same order.
pub fn shuffle_volumes(volumes: Vec<String>, seed: u64) -> Vec<String> {
    use rand::seq::SliceRandom;
    use rand::SeedableRng;

    let mut shuffled = volumes;
    let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
    shuffled.shuffle(&mut rng);
    shuffled
}
/// GET handler: redirects the client (302) to a replica volume that holds `key`.
///
/// Replicas from the index record are probed in a randomized order (seeded from
/// the wall clock) so read load spreads across volumes; probing stops at the
/// first replica whose HEAD request succeeds within `state.voltimeout`.
///
/// Errors: `CorruptRecord` when the index entry lists no volumes,
/// `AllVolumesUnreachable` when every replica fails its health probe.
pub async fn get_key(
    State(state): State<AppState>,
    Path(key): Path<String>,
) -> Result<Response, AppError> {
    let record = state.db.get(&key).await?;
    if record.volumes.is_empty() {
        return Err(AppError::CorruptRecord { key });
    }

    // Seed from current nanos; falls back to 0 if the clock is before the epoch.
    let seed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0);
    let volumes = shuffle_volumes(record.volumes, seed);

    // Probe replicas sequentially, stopping as soon as one answers healthy;
    // `results` may therefore be shorter than `volumes`.
    let mut results = Vec::with_capacity(volumes.len());
    for vol in &volumes {
        let url = format!("{vol}/{key}");
        let healthy = match state.http.head(&url).timeout(state.voltimeout).send().await {
            Ok(resp) if resp.status().is_success() => true,
            Ok(resp) => {
                tracing::warn!("volume {vol} returned {} for {key}", resp.status());
                false
            }
            Err(e) => {
                tracing::warn!("volume {vol} unreachable for {key}: {e}");
                false
            }
        };
        results.push(healthy);
        if healthy {
            break;
        }
    }

    match first_healthy_volume(&key, &volumes, &results) {
        ProbeResult::Found(url) => {
            Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, url)]).into_response())
        }
        ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable),
    }
}
pub async fn put_key(
@ -45,7 +97,6 @@ pub async fn put_key(
});
}
// Fan out PUTs to all target volumes concurrently
let mut handles = Vec::with_capacity(target_volumes.len());
for vol in &target_volumes {
let url = format!("{vol}/{key}");
@ -180,20 +231,3 @@ pub async fn list_keys(
let keys = state.db.list_keys(&query.prefix).await?;
Ok((StatusCode::OK, keys.join("\n")).into_response())
}
#[cfg(test)]
mod tests {
#[test]
fn test_volumes_for_key_sufficient() {
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
assert_eq!(selected.len(), 2);
}
#[test]
fn test_volumes_for_key_insufficient() {
let volumes: Vec<String> = vec!["http://vol1".into()];
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
assert_eq!(selected.len(), 1);
}
}

View file

@ -19,6 +19,7 @@ async fn start_server() -> String {
"http://localhost:3103".into(),
],
replicas: 2,
voltimeout: std::time::Duration::from_secs(1),
};
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();