From 68ae92e4bf07b48e5264aa2ae2fc1241ff460b67 Mon Sep 17 00:00:00 2001 From: Silas Brack Date: Sat, 7 Mar 2026 10:17:52 +0100 Subject: [PATCH] Add integration tests --- docker-compose.test.yml | 32 +++++ src/lib.rs | 69 +++++++++++ src/main.rs | 93 ++------------- tests/integration.rs | 254 ++++++++++++++++++++++++++++++++++++++++ tests/nginx.conf | 11 ++ tests/test_config.toml | 16 +++ 6 files changed, 392 insertions(+), 83 deletions(-) create mode 100644 docker-compose.test.yml create mode 100644 src/lib.rs create mode 100644 tests/integration.rs create mode 100644 tests/nginx.conf create mode 100644 tests/test_config.toml diff --git a/docker-compose.test.yml b/docker-compose.test.yml new file mode 100644 index 0000000..b287081 --- /dev/null +++ b/docker-compose.test.yml @@ -0,0 +1,32 @@ +services: + vol1: + image: nginx:alpine + volumes: + - ./tests/nginx.conf:/etc/nginx/conf.d/default.conf + - vol1_data:/data + ports: + - "3101:80" + entrypoint: ["/bin/sh", "-c", "chown nginx:nginx /data && exec nginx -g 'daemon off;'"] + + vol2: + image: nginx:alpine + volumes: + - ./tests/nginx.conf:/etc/nginx/conf.d/default.conf + - vol2_data:/data + ports: + - "3102:80" + entrypoint: ["/bin/sh", "-c", "chown nginx:nginx /data && exec nginx -g 'daemon off;'"] + + vol3: + image: nginx:alpine + volumes: + - ./tests/nginx.conf:/etc/nginx/conf.d/default.conf + - vol3_data:/data + ports: + - "3103:80" + entrypoint: ["/bin/sh", "-c", "chown nginx:nginx /data && exec nginx -g 'daemon off;'"] + +volumes: + vol1_data: + vol2_data: + vol3_data: diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..ecdc948 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,69 @@ +pub mod config; +pub mod db; +pub mod error; +pub mod hasher; +pub mod health; +pub mod server; +pub mod volume; + +use std::collections::HashSet; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Build the axum Router with all state wired up. 
All state, including the
+/// database writer, is owned by the returned router.
+pub async fn build_app(config: config::Config) -> axum::Router {
+    let db_path = &config.database.path;
+
+    if let Some(parent) = std::path::Path::new(db_path).parent() {
+        std::fs::create_dir_all(parent).unwrap_or_else(|e| {
+            eprintln!("Failed to create database directory: {e}");
+            std::process::exit(1);
+        });
+    }
+
+    let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
+    ready_rx.await.expect("writer failed to initialize");
+
+    let num_readers = std::thread::available_parallelism()
+        .map(|n| n.get())
+        .unwrap_or(4);
+    let reads = db::ReadPool::new(db_path, num_readers);
+
+    let volume_urls = config.volume_urls();
+    let ring = Arc::new(RwLock::new(hasher::Ring::new(
+        &volume_urls,
+        config.server.virtual_nodes,
+    )));
+
+    let volume_client = volume::VolumeClient::new();
+
+    let healthy_volumes: health::HealthyVolumes =
+        Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone())));
+
+    health::spawn_health_checker(
+        volume_client.clone(),
+        volume_urls,
+        healthy_volumes.clone(),
+    );
+
+    let state = server::AppState {
+        writer,
+        reads,
+        ring,
+        volume_client,
+        healthy_volumes,
+        config: Arc::new(config),
+    };
+
+    axum::Router::new()
+        .route("/", axum::routing::get(server::list_keys))
+        .route(
+            "/{*key}",
+            axum::routing::get(server::get_key)
+                .put(server::put_key)
+                .delete(server::delete_key)
+                .head(server::head_key),
+        )
+        .with_state(state)
+}
diff --git a/src/main.rs b/src/main.rs
index a993ae9..37a045f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,16 +1,5 @@
-mod config;
-mod db;
-mod error;
-mod hasher;
-mod health;
-mod server;
-mod volume;
-
 use clap::{Parser, Subcommand};
-use std::collections::HashSet;
 use std::path::PathBuf;
-use std::sync::Arc;
-use tokio::sync::RwLock;
 
 #[derive(Parser)]
 #[command(name = "mkv", about = "Distributed key-value store")]
@@ -40,13 +29,21 @@ async fn main() {
     tracing_subscriber::fmt::init();
 
     let cli = Cli::parse();
 
- let config = config::Config::load(&cli.config).unwrap_or_else(|e| { + let config = mkv::config::Config::load(&cli.config).unwrap_or_else(|e| { eprintln!("Failed to load config: {e}"); std::process::exit(1); }); match cli.command { - Commands::Serve => serve(config).await, + Commands::Serve => { + let port = config.server.port; + let app = mkv::build_app(config).await; + + let addr = format!("0.0.0.0:{port}"); + tracing::info!("Listening on {addr}"); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + axum::serve(listener, app).await.unwrap(); + } Commands::Rebuild => { eprintln!("rebuild not yet implemented"); std::process::exit(1); @@ -57,73 +54,3 @@ async fn main() { } } } - -async fn serve(config: config::Config) { - let db_path = &config.database.path; - - // Ensure parent directory exists - if let Some(parent) = std::path::Path::new(db_path).parent() { - std::fs::create_dir_all(parent).unwrap_or_else(|e| { - eprintln!("Failed to create database directory: {e}"); - std::process::exit(1); - }); - } - - let (writer, ready_rx) = db::spawn_writer(db_path.to_string()); - ready_rx.await.expect("writer failed to initialize"); - - let num_readers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(4); - let reads = db::ReadPool::new(db_path, num_readers); - - let volume_urls = config.volume_urls(); - let ring = Arc::new(RwLock::new(hasher::Ring::new( - &volume_urls, - config.server.virtual_nodes, - ))); - - let volume_client = volume::VolumeClient::new(); - - // Start with all volumes assumed healthy, health checker will update - let healthy_volumes: health::HealthyVolumes = - Arc::new(RwLock::new(HashSet::from_iter(volume_urls.clone()))); - - health::spawn_health_checker( - volume_client.clone(), - volume_urls.clone(), - healthy_volumes.clone(), - ); - - let port = config.server.port; - - let state = server::AppState { - writer, - reads, - ring, - volume_client, - healthy_volumes, - config: Arc::new(config), - }; - - 
tracing::info!("Starting mkv server on port {port}"); - tracing::info!(" Readers: {num_readers}"); - tracing::info!(" Volumes: {volume_urls:?}"); - tracing::info!(" Replication factor: {}", state.config.server.replication_factor); - - let app = axum::Router::new() - .route("/", axum::routing::get(server::list_keys)) - .route( - "/{*key}", - axum::routing::get(server::get_key) - .put(server::put_key) - .delete(server::delete_key) - .head(server::head_key), - ) - .with_state(state); - - let addr = format!("0.0.0.0:{port}"); - let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); - tracing::info!("Listening on {addr}"); - axum::serve(listener, app).await.unwrap(); -} diff --git a/tests/integration.rs b/tests/integration.rs new file mode 100644 index 0000000..fce1057 --- /dev/null +++ b/tests/integration.rs @@ -0,0 +1,254 @@ +use reqwest::StatusCode; +use std::path::Path; +use std::sync::atomic::{AtomicU32, Ordering}; + +static TEST_COUNTER: AtomicU32 = AtomicU32::new(0); + +/// Start the mkv server in-process on a random port with its own DB. +/// Returns the base URL (e.g. "http://localhost:12345"). 
+async fn start_server() -> String { + let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed); + let db_path = format!("/tmp/mkv-test/index-{id}.db"); + + // Clean up any previous test database + let _ = std::fs::remove_file(&db_path); + let _ = std::fs::remove_file(format!("{db_path}-wal")); + let _ = std::fs::remove_file(format!("{db_path}-shm")); + + let mut config = + mkv::config::Config::load(Path::new("tests/test_config.toml")).expect("load test config"); + config.database.path = db_path; + + // Bind to port 0 to get a random available port + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let app = mkv::build_app(config).await; + + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + // Give the server a moment to start + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + format!("http://127.0.0.1:{port}") +} + +fn client() -> reqwest::Client { + reqwest::Client::builder() + .redirect(reqwest::redirect::Policy::none()) + .build() + .unwrap() +} + +#[tokio::test] +async fn test_put_and_head() { + let base = start_server().await; + let c = client(); + + // PUT a key + let resp = c + .put(format!("{base}/hello")) + .body("world") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + // HEAD should return 200 with content-length + let resp = c.head(format!("{base}/hello")).send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + resp.headers() + .get("content-length") + .unwrap() + .to_str() + .unwrap(), + "5" + ); +} + +#[tokio::test] +async fn test_put_and_get_redirect() { + let base = start_server().await; + let c = client(); + + // PUT + let resp = c + .put(format!("{base}/redirect-test")) + .body("some data") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + // GET should return 302 with Location header pointing to a volume + let resp = 
c.get(format!("{base}/redirect-test")).send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::FOUND); + + let location = resp.headers().get("location").unwrap().to_str().unwrap(); + assert!( + location.starts_with("http://localhost:310"), + "location should point to a volume, got: {location}" + ); + + // Follow the redirect manually and verify the blob content + let blob_resp = reqwest::get(location).await.unwrap(); + assert_eq!(blob_resp.status(), StatusCode::OK); + assert_eq!(blob_resp.text().await.unwrap(), "some data"); +} + +#[tokio::test] +async fn test_get_nonexistent_returns_404() { + let base = start_server().await; + let c = client(); + + let resp = c.get(format!("{base}/does-not-exist")).send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn test_put_get_delete_get() { + let base = start_server().await; + let c = client(); + + // PUT + let resp = c + .put(format!("{base}/delete-me")) + .body("temporary") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + // GET → 302 + let resp = c.get(format!("{base}/delete-me")).send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::FOUND); + + // DELETE → 204 + let resp = c.delete(format!("{base}/delete-me")).send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + + // GET after delete → 404 + let resp = c.get(format!("{base}/delete-me")).send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn test_delete_nonexistent_returns_404() { + let base = start_server().await; + let c = client(); + + let resp = c + .delete(format!("{base}/never-existed")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn test_list_keys() { + let base = start_server().await; + let c = client(); + + // PUT a few keys with a common prefix + for name in ["docs/a", "docs/b", "docs/c", "other/x"] { + 
c.put(format!("{base}/{name}")) + .body("data") + .send() + .await + .unwrap(); + } + + // List all + let resp = c.get(format!("{base}/")).send().await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.text().await.unwrap(); + assert!(body.contains("docs/a")); + assert!(body.contains("other/x")); + + // List with prefix filter + let resp = c + .get(format!("{base}/?prefix=docs/")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.text().await.unwrap(); + let lines: Vec<&str> = body.lines().collect(); + assert_eq!(lines.len(), 3); + assert!(!body.contains("other/x")); +} + +#[tokio::test] +async fn test_put_overwrite() { + let base = start_server().await; + let c = client(); + + // PUT v1 + c.put(format!("{base}/overwrite")) + .body("version1") + .send() + .await + .unwrap(); + + // PUT v2 (same key) + let resp = c + .put(format!("{base}/overwrite")) + .body("version2") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + // HEAD should reflect new size + let resp = c.head(format!("{base}/overwrite")).send().await.unwrap(); + assert_eq!( + resp.headers() + .get("content-length") + .unwrap() + .to_str() + .unwrap(), + "8" + ); + + // Follow redirect, verify content is v2 + let resp = c.get(format!("{base}/overwrite")).send().await.unwrap(); + let location = resp.headers().get("location").unwrap().to_str().unwrap(); + let body = reqwest::get(location).await.unwrap().text().await.unwrap(); + assert_eq!(body, "version2"); +} + +#[tokio::test] +async fn test_replication_writes_to_multiple_volumes() { + let base = start_server().await; + let c = client(); + + // PUT a key (replication_factor=2 in test config) + c.put(format!("{base}/replicated")) + .body("replica-data") + .send() + .await + .unwrap(); + + // HEAD to confirm it exists + let resp = c + .head(format!("{base}/replicated")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + // GET 
and verify the blob is accessible + let resp = c + .get(format!("{base}/replicated")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::FOUND); + let location = resp.headers().get("location").unwrap().to_str().unwrap(); + let body = reqwest::get(location).await.unwrap().text().await.unwrap(); + assert_eq!(body, "replica-data"); +} diff --git a/tests/nginx.conf b/tests/nginx.conf new file mode 100644 index 0000000..6bc98e8 --- /dev/null +++ b/tests/nginx.conf @@ -0,0 +1,11 @@ +server { + listen 80; + root /data; + + location / { + dav_methods PUT DELETE; + create_full_put_path on; + autoindex on; + autoindex_format json; + } +} diff --git a/tests/test_config.toml b/tests/test_config.toml new file mode 100644 index 0000000..d00eac0 --- /dev/null +++ b/tests/test_config.toml @@ -0,0 +1,16 @@ +[server] +port = 3100 +replication_factor = 2 +virtual_nodes = 100 + +[database] +path = "/tmp/mkv-test/index.db" + +[[volumes]] +url = "http://localhost:3101" + +[[volumes]] +url = "http://localhost:3102" + +[[volumes]] +url = "http://localhost:3103"