Add integration tests

This commit is contained in:
Silas Brack 2026-03-07 10:17:52 +01:00
parent d7c9192ebb
commit 68ae92e4bf
6 changed files with 392 additions and 83 deletions

32
docker-compose.test.yml Normal file
View file

@ -0,0 +1,32 @@
# Test harness: three identical nginx "volume" servers for the mkv
# integration tests. Ports 3101-3103 match the [[volumes]] entries in
# tests/test_config.toml; each serves /data from its own named volume
# using the WebDAV config in tests/nginx.conf.
services:
  vol1:
    image: nginx:alpine
    volumes:
      - ./tests/nginx.conf:/etc/nginx/conf.d/default.conf
      - vol1_data:/data
    ports:
      - "3101:80"
    # The named volume is root-owned on first mount; hand /data to the
    # nginx user so PUTs can write, then run nginx in the foreground.
    entrypoint: ["/bin/sh", "-c", "chown nginx:nginx /data && exec nginx -g 'daemon off;'"]
  vol2:
    image: nginx:alpine
    volumes:
      - ./tests/nginx.conf:/etc/nginx/conf.d/default.conf
      - vol2_data:/data
    ports:
      - "3102:80"
    entrypoint: ["/bin/sh", "-c", "chown nginx:nginx /data && exec nginx -g 'daemon off;'"]
  vol3:
    image: nginx:alpine
    volumes:
      - ./tests/nginx.conf:/etc/nginx/conf.d/default.conf
      - vol3_data:/data
    ports:
      - "3103:80"
    entrypoint: ["/bin/sh", "-c", "chown nginx:nginx /data && exec nginx -g 'daemon off;'"]
volumes:
  vol1_data:
  vol2_data:
  vol3_data:

69
src/lib.rs Normal file
View file

@ -0,0 +1,69 @@
pub mod config;
pub mod db;
pub mod error;
pub mod hasher;
pub mod health;
pub mod server;
pub mod volume;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Build the axum Router with all state wired up.
///
/// As side effects, spawns the database writer task and the volume
/// health-checker task; both run for the lifetime of the process.
/// Exits the process (status 1) if the database directory cannot be
/// created.
///
/// NOTE(review): the previous doc claimed a writer handle was also
/// returned — only the `Router` is returned; the writer handle is held
/// inside the router's `AppState`.
pub async fn build_app(config: config::Config) -> axum::Router {
    let db_path = &config.database.path;
    // The SQLite file's parent directory may not exist yet (e.g. a /tmp path).
    if let Some(parent) = std::path::Path::new(db_path).parent() {
        std::fs::create_dir_all(parent).unwrap_or_else(|e| {
            eprintln!("Failed to create database directory: {e}");
            std::process::exit(1);
        });
    }
    // Single writer task; block until it signals the database is open.
    let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
    ready_rx.await.expect("writer failed to initialize");
    // Reader pool sized to the core count, falling back to 4.
    let num_readers = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    let reads = db::ReadPool::new(db_path, num_readers);
    let volume_urls = config.volume_urls();
    // Hash ring over the volume URLs; `virtual_nodes` controls granularity.
    let ring = Arc::new(RwLock::new(hasher::Ring::new(
        &volume_urls,
        config.server.virtual_nodes,
    )));
    let volume_client = volume::VolumeClient::new();
    // All volumes start as healthy; the checker task updates the set.
    let healthy_volumes: health::HealthyVolumes =
        Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone())));
    health::spawn_health_checker(
        volume_client.clone(),
        volume_urls,
        healthy_volumes.clone(),
    );
    let state = server::AppState {
        writer,
        reads,
        ring,
        volume_client,
        healthy_volumes,
        config: Arc::new(config),
    };
    // "/" lists keys; "/{*key}" handles per-key GET/PUT/DELETE/HEAD.
    axum::Router::new()
        .route("/", axum::routing::get(server::list_keys))
        .route(
            "/{*key}",
            axum::routing::get(server::get_key)
                .put(server::put_key)
                .delete(server::delete_key)
                .head(server::head_key),
        )
        .with_state(state)
}

View file

@ -1,16 +1,5 @@
mod config;
mod db;
mod error;
mod hasher;
mod health;
mod server;
mod volume;
use clap::{Parser, Subcommand};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Parser)]
#[command(name = "mkv", about = "Distributed key-value store")]
@ -40,13 +29,21 @@ async fn main() {
tracing_subscriber::fmt::init();
let cli = Cli::parse();
let config = config::Config::load(&cli.config).unwrap_or_else(|e| {
let config = mkv::config::Config::load(&cli.config).unwrap_or_else(|e| {
eprintln!("Failed to load config: {e}");
std::process::exit(1);
});
match cli.command {
Commands::Serve => serve(config).await,
Commands::Serve => {
let port = config.server.port;
let app = mkv::build_app(config).await;
let addr = format!("0.0.0.0:{port}");
tracing::info!("Listening on {addr}");
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Commands::Rebuild => {
eprintln!("rebuild not yet implemented");
std::process::exit(1);
@ -57,73 +54,3 @@ async fn main() {
}
}
}
/// Bring up the full mkv server from `config` and serve HTTP until the
/// process exits: storage, background tasks, routing, then `axum::serve`.
async fn serve(config: config::Config) {
    let db_path = &config.database.path;
    // The directory holding the SQLite index may not exist yet.
    if let Some(dir) = std::path::Path::new(db_path).parent() {
        if let Err(e) = std::fs::create_dir_all(dir) {
            eprintln!("Failed to create database directory: {e}");
            std::process::exit(1);
        }
    }
    // Dedicated writer task; wait until it has the database open.
    let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
    ready_rx.await.expect("writer failed to initialize");
    // One reader per core, defaulting to 4 when parallelism is unknown.
    let num_readers = std::thread::available_parallelism().map_or(4, |n| n.get());
    let read_pool = db::ReadPool::new(db_path, num_readers);
    let volume_urls = config.volume_urls();
    let hash_ring = Arc::new(RwLock::new(hasher::Ring::new(
        &volume_urls,
        config.server.virtual_nodes,
    )));
    let blob_client = volume::VolumeClient::new();
    // Every volume begins marked healthy; the checker prunes the set as
    // probes fail.
    let healthy_volumes: health::HealthyVolumes =
        Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone())));
    health::spawn_health_checker(
        blob_client.clone(),
        volume_urls.clone(),
        healthy_volumes.clone(),
    );
    let port = config.server.port;
    let state = server::AppState {
        writer,
        reads: read_pool,
        ring: hash_ring,
        volume_client: blob_client,
        healthy_volumes,
        config: Arc::new(config),
    };
    tracing::info!("Starting mkv server on port {port}");
    tracing::info!(" Readers: {num_readers}");
    tracing::info!(" Volumes: {volume_urls:?}");
    tracing::info!(" Replication factor: {}", state.config.server.replication_factor);
    let app = axum::Router::new()
        .route("/", axum::routing::get(server::list_keys))
        .route(
            "/{*key}",
            axum::routing::get(server::get_key)
                .put(server::put_key)
                .delete(server::delete_key)
                .head(server::head_key),
        )
        .with_state(state);
    let addr = format!("0.0.0.0:{port}");
    let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
    tracing::info!("Listening on {addr}");
    axum::serve(listener, app).await.unwrap();
}

254
tests/integration.rs Normal file
View file

@ -0,0 +1,254 @@
use reqwest::StatusCode;
use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
// Monotonic id so each test gets its own database file.
static TEST_COUNTER: AtomicU32 = AtomicU32::new(0);

/// Start the mkv server in-process on a random port with its own DB.
/// Returns the base URL (e.g. "http://localhost:12345").
async fn start_server() -> String {
    let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
    let db_path = format!("/tmp/mkv-test/index-{id}.db");
    // Clean up any previous test database, including SQLite's WAL and
    // shared-memory sidecar files; missing files are fine.
    for suffix in ["", "-wal", "-shm"] {
        let _ = std::fs::remove_file(format!("{db_path}{suffix}"));
    }
    let mut config =
        mkv::config::Config::load(Path::new("tests/test_config.toml")).expect("load test config");
    config.database.path = db_path;
    // Port 0 asks the OS for any free port. Binding here — before the
    // server task is spawned — means the socket already accepts
    // connections when this function returns (pending connections sit in
    // the TCP backlog until the serve task polls accept), so the old
    // fixed 50 ms startup sleep was unnecessary and has been removed.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    let app = mkv::build_app(config).await;
    tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });
    format!("http://127.0.0.1:{port}")
}
/// HTTP client that surfaces redirects instead of following them, so
/// tests can inspect the `Location` header returned by GET.
fn client() -> reqwest::Client {
    let builder = reqwest::Client::builder().redirect(reqwest::redirect::Policy::none());
    builder.build().unwrap()
}
#[tokio::test]
async fn test_put_and_head() {
    let base = start_server().await;
    let http = client();
    // Create the key.
    let put = http
        .put(format!("{base}/hello"))
        .body("world")
        .send()
        .await
        .unwrap();
    assert_eq!(put.status(), StatusCode::CREATED);
    // HEAD reports metadata: 200 plus the stored blob's byte length.
    let head = http.head(format!("{base}/hello")).send().await.unwrap();
    assert_eq!(head.status(), StatusCode::OK);
    let len = head.headers()["content-length"].to_str().unwrap();
    assert_eq!(len, "5");
}
#[tokio::test]
async fn test_put_and_get_redirect() {
    let base = start_server().await;
    let http = client();
    let put = http
        .put(format!("{base}/redirect-test"))
        .body("some data")
        .send()
        .await
        .unwrap();
    assert_eq!(put.status(), StatusCode::CREATED);
    // GET does not stream the blob itself; it 302s to the owning volume.
    let get = http.get(format!("{base}/redirect-test")).send().await.unwrap();
    assert_eq!(get.status(), StatusCode::FOUND);
    let location = get.headers()["location"].to_str().unwrap().to_owned();
    assert!(
        location.starts_with("http://localhost:310"),
        "location should point to a volume, got: {location}"
    );
    // Chase the redirect by hand and check the stored bytes.
    let blob = reqwest::get(&location).await.unwrap();
    assert_eq!(blob.status(), StatusCode::OK);
    assert_eq!(blob.text().await.unwrap(), "some data");
}
#[tokio::test]
async fn test_get_nonexistent_returns_404() {
    let base = start_server().await;
    // A key that was never PUT must yield 404, not a redirect.
    let status = client()
        .get(format!("{base}/does-not-exist"))
        .send()
        .await
        .unwrap()
        .status();
    assert_eq!(status, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_put_get_delete_get() {
    let base = start_server().await;
    let http = client();
    // Full lifecycle: create, read, delete, read again.
    let created = http
        .put(format!("{base}/delete-me"))
        .body("temporary")
        .send()
        .await
        .unwrap()
        .status();
    assert_eq!(created, StatusCode::CREATED);
    let found = http
        .get(format!("{base}/delete-me"))
        .send()
        .await
        .unwrap()
        .status();
    assert_eq!(found, StatusCode::FOUND);
    let deleted = http
        .delete(format!("{base}/delete-me"))
        .send()
        .await
        .unwrap()
        .status();
    assert_eq!(deleted, StatusCode::NO_CONTENT);
    // After deletion the key is gone.
    let after = http
        .get(format!("{base}/delete-me"))
        .send()
        .await
        .unwrap()
        .status();
    assert_eq!(after, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_delete_nonexistent_returns_404() {
    let base = start_server().await;
    // Deleting a missing key reports 404 rather than a silent 204.
    let status = client()
        .delete(format!("{base}/never-existed"))
        .send()
        .await
        .unwrap()
        .status();
    assert_eq!(status, StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_list_keys() {
    let base = start_server().await;
    let http = client();
    // Seed three keys under "docs/" and one elsewhere.
    for name in ["docs/a", "docs/b", "docs/c", "other/x"] {
        http.put(format!("{base}/{name}"))
            .body("data")
            .send()
            .await
            .unwrap();
    }
    // An unfiltered listing contains keys from both prefixes.
    let all = http.get(format!("{base}/")).send().await.unwrap();
    assert_eq!(all.status(), StatusCode::OK);
    let listing = all.text().await.unwrap();
    assert!(listing.contains("docs/a"));
    assert!(listing.contains("other/x"));
    // With ?prefix= only the matching keys come back, one per line.
    let filtered = http
        .get(format!("{base}/?prefix=docs/"))
        .send()
        .await
        .unwrap();
    assert_eq!(filtered.status(), StatusCode::OK);
    let listing = filtered.text().await.unwrap();
    assert_eq!(listing.lines().count(), 3);
    assert!(!listing.contains("other/x"));
}
#[tokio::test]
async fn test_put_overwrite() {
    let base = start_server().await;
    let http = client();
    // First write.
    http.put(format!("{base}/overwrite"))
        .body("version1")
        .send()
        .await
        .unwrap();
    // A second write to the same key is also reported as 201.
    let second = http
        .put(format!("{base}/overwrite"))
        .body("version2")
        .send()
        .await
        .unwrap();
    assert_eq!(second.status(), StatusCode::CREATED);
    // Metadata now describes the 8-byte replacement, not the original.
    let head = http.head(format!("{base}/overwrite")).send().await.unwrap();
    let len = head.headers()["content-length"].to_str().unwrap();
    assert_eq!(len, "8");
    // And the redirect target serves the new bytes.
    let get = http.get(format!("{base}/overwrite")).send().await.unwrap();
    let location = get.headers()["location"].to_str().unwrap().to_owned();
    let body = reqwest::get(&location).await.unwrap().text().await.unwrap();
    assert_eq!(body, "version2");
}
#[tokio::test]
async fn test_replication_writes_to_multiple_volumes() {
    let base = start_server().await;
    let http = client();
    // replication_factor = 2 in the test config, so this PUT fans out to
    // two volumes behind the scenes.
    http.put(format!("{base}/replicated"))
        .body("replica-data")
        .send()
        .await
        .unwrap();
    // The key is visible via HEAD...
    let head = http.head(format!("{base}/replicated")).send().await.unwrap();
    assert_eq!(head.status(), StatusCode::OK);
    // ...and the blob is retrievable through the redirect.
    let get = http.get(format!("{base}/replicated")).send().await.unwrap();
    assert_eq!(get.status(), StatusCode::FOUND);
    let location = get.headers()["location"].to_str().unwrap().to_owned();
    let body = reqwest::get(&location).await.unwrap().text().await.unwrap();
    assert_eq!(body, "replica-data");
}

11
tests/nginx.conf Normal file
View file

@ -0,0 +1,11 @@
# Minimal nginx config for the test "volume" servers: a writable blob
# store rooted at /data.
server {
    listen 80;
    root /data;
    location / {
        # Accept PUT/DELETE so blobs can be stored and removed over HTTP.
        dav_methods PUT DELETE;
        # Create intermediate directories for keys containing slashes.
        create_full_put_path on;
        # Expose directory listings, formatted as JSON.
        autoindex on;
        autoindex_format json;
    }
}

16
tests/test_config.toml Normal file
View file

@ -0,0 +1,16 @@
# Configuration loaded by tests/integration.rs. The tests override
# database.path per test and bind their own ephemeral port, so `port`
# below is not used by them.
[server]
port = 3100
# Each key is written to this many volumes.
replication_factor = 2
# Virtual nodes per volume on the hash ring.
virtual_nodes = 100

[database]
# SQLite index file; tests substitute a unique per-test path.
path = "/tmp/mkv-test/index.db"

# The three nginx volume servers from docker-compose.test.yml.
[[volumes]]
url = "http://localhost:3101"

[[volumes]]
url = "http://localhost:3102"

[[volumes]]
url = "http://localhost:3103"