Compare commits
No commits in common. "main" and "v0.1.0" have entirely different histories.
13 changed files with 91 additions and 300 deletions
|
|
@ -1,89 +0,0 @@
|
|||
name: "CI"
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: ["main"]
|
||||
tags: ["v*"]
|
||||
pull_request:
|
||||
branches: ["main"]
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: Run tests
|
||||
runs-on: native
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Run unit tests
|
||||
run: nix build --print-build-logs .#checks.x86_64-linux.default
|
||||
|
||||
build:
|
||||
name: Build ${{ matrix.target }}
|
||||
runs-on: native
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- package: x86_64-linux
|
||||
target: x86_64-unknown-linux-gnu
|
||||
# - package: x86_64-linux-musl
|
||||
# target: x86_64-unknown-linux-musl
|
||||
# - package: aarch64-linux
|
||||
# target: aarch64-unknown-linux-gnu
|
||||
# - package: default
|
||||
# target: x86_64-apple-darwin
|
||||
# system: x86_64-darwin
|
||||
# - package: default
|
||||
# target: aarch64-apple-darwin
|
||||
# system: aarch64-darwin
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Build
|
||||
run: nix build --print-build-logs .#packages.x86_64-linux.${{ matrix.package }}
|
||||
|
||||
- name: Prepare release artifact
|
||||
if: startsWith(forgejo.ref, 'refs/tags/')
|
||||
run: cp result/bin/mkv mkv-${{ matrix.target }}
|
||||
|
||||
- name: Upload artifact
|
||||
if: startsWith(forgejo.ref, 'refs/tags/')
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: mkv-${{ matrix.target }}
|
||||
path: mkv-${{ matrix.target }}
|
||||
|
||||
release:
|
||||
name: Create release
|
||||
needs: [test, build]
|
||||
runs-on: native
|
||||
if: startsWith(forgejo.ref, 'refs/tags/')
|
||||
steps:
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
path: artifacts
|
||||
|
||||
- name: Collect artifacts
|
||||
run: |
|
||||
mkdir -p release
|
||||
find artifacts -type f -name 'mkv-*' -exec mv {} release/ \;
|
||||
ls -la release/
|
||||
|
||||
- name: Create release
|
||||
run: |
|
||||
tag="${GITHUB_REF#refs/tags/}"
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"tag_name\": \"${tag}\", \"name\": \"${tag}\"}" \
|
||||
"${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases"
|
||||
release_id=$(curl -s \
|
||||
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
|
||||
"${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases/tags/${tag}" \
|
||||
| jq -r '.id')
|
||||
for file in release/*; do
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
|
||||
-H "Content-Type: application/octet-stream" \
|
||||
--data-binary "@${file}" \
|
||||
"${{ env.GITHUB_SERVER_URL }}/api/v1/repos/${{ github.repository }}/releases/${release_id}/assets?name=$(basename $file)"
|
||||
done
|
||||
33
.github/workflows/ci.yml
vendored
Normal file
33
.github/workflows/ci.yml
vendored
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
name: "CI"
|
||||
on:
|
||||
pull_request:
|
||||
push:
|
||||
branches: ["main"]
|
||||
tags: ["v*"]
|
||||
jobs:
|
||||
build:
|
||||
name: Build packages
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- system: x86_64-linux
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: DeterminateSystems/nix-installer-action@main
|
||||
# - uses: DeterminateSystems/magic-nix-cache-action@main
|
||||
|
||||
- run: nix --print-build-logs build .#packages.$SYSTEM.default
|
||||
env:
|
||||
SYSTEM: ${{ matrix.system }}
|
||||
|
||||
- name: Prepare release artifact
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
run: cp result/bin/mkv mkv-${{ matrix.system }}
|
||||
|
||||
- name: Create release
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
uses: softprops/action-gh-release@v2
|
||||
with:
|
||||
files: mkv-${{ matrix.system }}
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -2,5 +2,3 @@
|
|||
*.db*
|
||||
/result
|
||||
*~
|
||||
\#*
|
||||
.\#*
|
||||
|
|
|
|||
39
Cargo.lock
generated
39
Cargo.lock
generated
|
|
@ -774,7 +774,6 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"axum",
|
||||
"clap",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"rusqlite",
|
||||
"serde",
|
||||
|
|
@ -909,7 +908,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"getrandom 0.3.4",
|
||||
"lru-slab",
|
||||
"rand 0.9.2",
|
||||
"rand",
|
||||
"ring",
|
||||
"rustc-hash",
|
||||
"rustls",
|
||||
|
|
@ -950,35 +949,14 @@ version = "5.3.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"rand_chacha 0.3.1",
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.9.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
|
||||
dependencies = [
|
||||
"rand_chacha 0.9.0",
|
||||
"rand_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_chacha"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core 0.6.4",
|
||||
"rand_chacha",
|
||||
"rand_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -988,16 +966,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
||||
dependencies = [
|
||||
"ppv-lite86",
|
||||
"rand_core 0.9.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_core"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
||||
dependencies = [
|
||||
"getrandom 0.2.17",
|
||||
"rand_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ clap = { version = "4", features = ["derive", "env"] }
|
|||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
sha2 = "0.10"
|
||||
rand = "0.8"
|
||||
|
||||
[profile.release]
|
||||
opt-level = 3
|
||||
|
|
|
|||
24
flake.nix
24
flake.nix
|
|
@ -12,17 +12,14 @@
|
|||
darwinBuildInputs = pkgs.lib.optionals pkgs.stdenv.hostPlatform.isDarwin [
|
||||
pkgs.libiconv
|
||||
];
|
||||
mkMkv = pkgs': pkgs'.rustPlatform.buildRustPackage {
|
||||
mkv = pkgs.rustPlatform.buildRustPackage {
|
||||
pname = "mkv";
|
||||
version = "0.1.0";
|
||||
cargoLock.lockFile = ./Cargo.lock;
|
||||
src = pkgs.lib.cleanSource ./.;
|
||||
buildInputs = pkgs'.lib.optionals pkgs'.stdenv.hostPlatform.isDarwin [
|
||||
pkgs'.libiconv
|
||||
];
|
||||
buildInputs = darwinBuildInputs;
|
||||
doCheck = false;
|
||||
};
|
||||
mkv = mkMkv pkgs;
|
||||
in
|
||||
{
|
||||
devShells.default = pkgs.mkShell {
|
||||
|
|
@ -33,21 +30,8 @@
|
|||
pkgs.rust-analyzer
|
||||
];
|
||||
};
|
||||
packages = {
|
||||
default = mkv;
|
||||
x86_64-linux = mkv;
|
||||
} // pkgs.lib.optionalAttrs pkgs.stdenv.hostPlatform.isLinux {
|
||||
x86_64-linux-musl = mkMkv pkgs.pkgsCross.musl64;
|
||||
aarch64-linux = mkMkv pkgs.pkgsCross.aarch64-multiplatform;
|
||||
};
|
||||
checks.default = mkv.overrideAttrs {
|
||||
doCheck = true;
|
||||
checkPhase = ''
|
||||
runHook preCheck
|
||||
cargo test --lib
|
||||
runHook postCheck
|
||||
'';
|
||||
};
|
||||
packages.default = mkv;
|
||||
checks.default = mkv.overrideAttrs { doCheck = true; };
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
|
|||
49
src/db.rs
49
src/db.rs
|
|
@ -30,18 +30,7 @@ fn encode_volumes(v: &[String]) -> String {
|
|||
serde_json::to_string(v).unwrap()
|
||||
}
|
||||
|
||||
/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None
|
||||
pub fn prefix_upper_bound(prefix: &str) -> Option<String> {
|
||||
let mut bytes = prefix.as_bytes().to_vec();
|
||||
while let Some(last) = bytes.pop() {
|
||||
if last < 0xFF {
|
||||
bytes.push(last + 1);
|
||||
return Some(String::from_utf8_lossy(&bytes).into_owned());
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// A single SQLite connection behind a mutex, used for both reads and writes.
|
||||
#[derive(Clone)]
|
||||
pub struct Db {
|
||||
conn: Arc<Mutex<Connection>>,
|
||||
|
|
@ -104,7 +93,19 @@ impl Db {
|
|||
.collect::<Result<Vec<String>, _>>()?;
|
||||
return Ok(keys);
|
||||
}
|
||||
let upper = prefix_upper_bound(&prefix);
|
||||
// Compute exclusive upper bound: increment last non-0xFF byte
|
||||
let upper = {
|
||||
let mut bytes = prefix.as_bytes().to_vec();
|
||||
let mut result = None;
|
||||
while let Some(last) = bytes.pop() {
|
||||
if last < 0xFF {
|
||||
bytes.push(last + 1);
|
||||
result = Some(String::from_utf8_lossy(&bytes).into_owned());
|
||||
break;
|
||||
}
|
||||
}
|
||||
result
|
||||
};
|
||||
let keys = match &upper {
|
||||
Some(end) => {
|
||||
let mut stmt = conn.prepare_cached(
|
||||
|
|
@ -197,25 +198,3 @@ impl Db {
|
|||
Ok(records)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_prefix_upper_bound_range_correctness() {
|
||||
let prefix = "foo";
|
||||
let upper = prefix_upper_bound(prefix).unwrap();
|
||||
let upper = upper.as_str();
|
||||
|
||||
// in range [prefix, upper)
|
||||
assert!("foo" >= prefix && "foo" < upper);
|
||||
assert!("foo/bar" >= prefix && "foo/bar" < upper);
|
||||
assert!("foobar" >= prefix && "foobar" < upper);
|
||||
assert!("foo\x7f" >= prefix && "foo\x7f" < upper);
|
||||
|
||||
// out of range
|
||||
assert!("fop" >= upper);
|
||||
assert!("fon" < prefix);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,7 +35,6 @@ pub enum AppError {
|
|||
Db(rusqlite::Error),
|
||||
InsufficientVolumes { need: usize, have: usize },
|
||||
PartialWrite,
|
||||
AllVolumesUnreachable,
|
||||
}
|
||||
|
||||
impl From<rusqlite::Error> for AppError {
|
||||
|
|
@ -59,7 +58,6 @@ impl std::fmt::Display for AppError {
|
|||
write!(f, "need {need} volumes but only {have} available")
|
||||
}
|
||||
AppError::PartialWrite => write!(f, "not all volume writes succeeded"),
|
||||
AppError::AllVolumesUnreachable => write!(f, "all volume replicas are unreachable"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -72,7 +70,6 @@ impl IntoResponse for AppError {
|
|||
AppError::Db(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
AppError::InsufficientVolumes { .. } => StatusCode::SERVICE_UNAVAILABLE,
|
||||
AppError::PartialWrite => StatusCode::BAD_GATEWAY,
|
||||
AppError::AllVolumesUnreachable => StatusCode::BAD_GATEWAY,
|
||||
};
|
||||
(status, self.to_string()).into_response()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ pub mod rebuild;
|
|||
pub mod server;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB
|
||||
|
||||
|
|
@ -14,7 +13,6 @@ pub struct Args {
|
|||
pub db_path: String,
|
||||
pub volumes: Vec<String>,
|
||||
pub replicas: usize,
|
||||
pub voltimeout: Duration,
|
||||
}
|
||||
|
||||
pub fn build_app(args: &Args) -> axum::Router {
|
||||
|
|
@ -38,7 +36,6 @@ pub fn build_app(args: &Args) -> axum::Router {
|
|||
db: db::Db::new(&args.db_path),
|
||||
volumes: Arc::new(args.volumes.clone()),
|
||||
replicas: args.replicas,
|
||||
voltimeout: args.voltimeout,
|
||||
http: reqwest::Client::new(),
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
|
||||
#[derive(Parser)]
|
||||
|
|
@ -20,10 +18,6 @@ struct Cli {
|
|||
#[arg(short, long, env = "MKV_REPLICAS", default_value_t = 2)]
|
||||
replicas: usize,
|
||||
|
||||
/// Timeout for volume health checks (in milliseconds)
|
||||
#[arg(long, env = "MKV_VOLTIMEOUT", default_value_t = 1000)]
|
||||
voltimeout: u64,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
|
@ -71,7 +65,6 @@ async fn main() {
|
|||
db_path: cli.db,
|
||||
volumes: cli.volumes,
|
||||
replicas: cli.replicas,
|
||||
voltimeout: Duration::from_millis(cli.voltimeout),
|
||||
};
|
||||
|
||||
match cli.command {
|
||||
|
|
|
|||
|
|
@ -12,25 +12,6 @@ struct NginxEntry {
|
|||
size: Option<i64>,
|
||||
}
|
||||
|
||||
/// If a key has different sizes across volumes, takes the max.
|
||||
pub fn merge_volume_scans(
|
||||
scans: &[(String, Vec<(String, i64)>)],
|
||||
) -> HashMap<String, (Vec<String>, i64)> {
|
||||
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
|
||||
for (vol_url, keys) in scans {
|
||||
for (key, size) in keys {
|
||||
let entry = index
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| (Vec::new(), *size));
|
||||
entry.0.push(vol_url.clone());
|
||||
if *size > entry.1 {
|
||||
entry.1 = *size;
|
||||
}
|
||||
}
|
||||
}
|
||||
index
|
||||
}
|
||||
|
||||
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
|
||||
let http = reqwest::Client::new();
|
||||
let mut keys = Vec::new();
|
||||
|
|
@ -76,21 +57,25 @@ pub async fn run(args: &Args) {
|
|||
let _ = std::fs::remove_file(format!("{db_path}-shm"));
|
||||
|
||||
let db = db::Db::new(db_path);
|
||||
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
|
||||
|
||||
let mut scans = Vec::new();
|
||||
for vol_url in &args.volumes {
|
||||
eprintln!("Scanning {vol_url}...");
|
||||
match list_volume_keys(vol_url).await {
|
||||
Ok(keys) => {
|
||||
eprintln!(" Found {} keys", keys.len());
|
||||
scans.push((vol_url.clone(), keys));
|
||||
for (key, size) in keys {
|
||||
let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
|
||||
entry.0.push(vol_url.clone());
|
||||
if size > entry.1 {
|
||||
entry.1 = size;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
let index = merge_volume_scans(&scans);
|
||||
|
||||
let records: Vec<_> = index
|
||||
.into_iter()
|
||||
.map(|(k, (v, s))| (k, v, Some(s)))
|
||||
|
|
@ -99,22 +84,3 @@ pub async fn run(args: &Args) {
|
|||
db.bulk_put(records).await.expect("bulk_put failed");
|
||||
eprintln!("Rebuilt index with {count} keys");
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_merge_takes_max_size() {
|
||||
// Can happen due to incomplete writes or corruption
|
||||
let scans = vec![
|
||||
("http://vol1".to_string(), vec![("key".to_string(), 50)]),
|
||||
("http://vol2".to_string(), vec![("key".to_string(), 200)]),
|
||||
("http://vol3".to_string(), vec![("key".to_string(), 100)]),
|
||||
];
|
||||
let index = merge_volume_scans(&scans);
|
||||
let (volumes, size) = index.get("key").unwrap();
|
||||
assert_eq!(volumes.len(), 3);
|
||||
assert_eq!(*size, 200, "should take maximum size across volumes");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ use axum::extract::{Path, Query, State};
|
|||
use axum::http::{HeaderMap, StatusCode};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::db;
|
||||
use crate::error::{AppError, VolumeError};
|
||||
|
|
@ -13,75 +12,24 @@ pub struct AppState {
|
|||
pub db: db::Db,
|
||||
pub volumes: Arc<Vec<String>>,
|
||||
pub replicas: usize,
|
||||
pub voltimeout: Duration,
|
||||
pub http: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ProbeResult {
|
||||
Found(String),
|
||||
AllFailed,
|
||||
}
|
||||
|
||||
pub fn first_healthy_volume(key: &str, volumes: &[String], results: &[bool]) -> ProbeResult {
|
||||
for (vol, &healthy) in volumes.iter().zip(results) {
|
||||
if healthy {
|
||||
return ProbeResult::Found(format!("{vol}/{key}"));
|
||||
}
|
||||
}
|
||||
ProbeResult::AllFailed
|
||||
}
|
||||
|
||||
pub fn shuffle_volumes(volumes: Vec<String>, seed: u64) -> Vec<String> {
|
||||
use rand::SeedableRng;
|
||||
use rand::seq::SliceRandom;
|
||||
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
|
||||
let mut vols = volumes;
|
||||
vols.shuffle(&mut rng);
|
||||
vols
|
||||
}
|
||||
|
||||
pub async fn get_key(
|
||||
State(state): State<AppState>,
|
||||
Path(key): Path<String>,
|
||||
) -> Result<Response, AppError> {
|
||||
let record = state.db.get(&key).await?;
|
||||
if record.volumes.is_empty() {
|
||||
return Err(AppError::CorruptRecord { key });
|
||||
}
|
||||
|
||||
let seed = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_nanos() as u64)
|
||||
.unwrap_or(0);
|
||||
let volumes = shuffle_volumes(record.volumes, seed);
|
||||
|
||||
let mut results = Vec::with_capacity(volumes.len());
|
||||
for vol in &volumes {
|
||||
let url = format!("{vol}/{key}");
|
||||
let healthy = match state.http.head(&url).timeout(state.voltimeout).send().await {
|
||||
Ok(resp) if resp.status().is_success() => true,
|
||||
Ok(resp) => {
|
||||
tracing::warn!("volume {vol} returned {} for {key}", resp.status());
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("volume {vol} unreachable for {key}: {e}");
|
||||
false
|
||||
}
|
||||
};
|
||||
results.push(healthy);
|
||||
if healthy {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
match first_healthy_volume(&key, &volumes, &results) {
|
||||
ProbeResult::Found(url) => {
|
||||
Ok((StatusCode::FOUND, [(axum::http::header::LOCATION, url)]).into_response())
|
||||
}
|
||||
ProbeResult::AllFailed => Err(AppError::AllVolumesUnreachable),
|
||||
}
|
||||
let vol = record
|
||||
.volumes
|
||||
.first()
|
||||
.ok_or_else(|| AppError::CorruptRecord { key: key.clone() })?;
|
||||
let location = format!("{vol}/{key}");
|
||||
Ok((
|
||||
StatusCode::FOUND,
|
||||
[(axum::http::header::LOCATION, location)],
|
||||
)
|
||||
.into_response())
|
||||
}
|
||||
|
||||
pub async fn put_key(
|
||||
|
|
@ -97,6 +45,7 @@ pub async fn put_key(
|
|||
});
|
||||
}
|
||||
|
||||
// Fan out PUTs to all target volumes concurrently
|
||||
let mut handles = Vec::with_capacity(target_volumes.len());
|
||||
for vol in &target_volumes {
|
||||
let url = format!("{vol}/{key}");
|
||||
|
|
@ -231,3 +180,20 @@ pub async fn list_keys(
|
|||
let keys = state.db.list_keys(&query.prefix).await?;
|
||||
Ok((StatusCode::OK, keys.join("\n")).into_response())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn test_volumes_for_key_sufficient() {
|
||||
let volumes: Vec<String> = (1..=3).map(|i| format!("http://vol{i}")).collect();
|
||||
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
|
||||
assert_eq!(selected.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_volumes_for_key_insufficient() {
|
||||
let volumes: Vec<String> = vec!["http://vol1".into()];
|
||||
let selected = crate::hasher::volumes_for_key("test-key", &volumes, 2);
|
||||
assert_eq!(selected.len(), 1);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ async fn start_server() -> String {
|
|||
"http://localhost:3103".into(),
|
||||
],
|
||||
replicas: 2,
|
||||
voltimeout: std::time::Duration::from_secs(1),
|
||||
};
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue