From 7f3ec69cf6b9a3f55fe3e848be2ef25d9f4a15a8 Mon Sep 17 00:00:00 2001
From: Silas Brack
Date: Sat, 7 Mar 2026 13:09:53 +0100
Subject: [PATCH] Simplify

---
 config.bench.toml |  16 ++++
 load_test.py      | 171 ++++++++++++++++++++++++++++++++++++++++
 src/db.rs         | 194 +++++++++++-----------------------------------
 src/error.rs      |   4 -
 src/lib.rs        |   3 +-
 src/rebalance.rs  |   3 +-
 src/rebuild.rs    |   3 +-
 7 files changed, 236 insertions(+), 158 deletions(-)
 create mode 100644 config.bench.toml
 create mode 100644 load_test.py

diff --git a/config.bench.toml b/config.bench.toml
new file mode 100644
index 0000000..f4f05d7
--- /dev/null
+++ b/config.bench.toml
@@ -0,0 +1,16 @@
+[server]
+port = 3000
+replication_factor = 2
+virtual_nodes = 100
+
+[database]
+path = "/tmp/mkv-bench/index.db"
+
+[[volumes]]
+url = "http://localhost:3101"
+
+[[volumes]]
+url = "http://localhost:3102"
+
+[[volumes]]
+url = "http://localhost:3103"
diff --git a/load_test.py b/load_test.py
new file mode 100644
index 0000000..44d4693
--- /dev/null
+++ b/load_test.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python3
+"""
+Load test for mkv or minikeyvalue.
+
+Usage:
+    python3 load_test.py http://localhost:3000    # test mkv
+    python3 load_test.py http://localhost:3001    # test minikeyvalue
+
+Options:
+    --keys N          Number of keys to test (default: 1000)
+    --concurrency N   Number of concurrent requests (default: 50)
+    --size N          Value size in bytes (default: 1024)
+"""
+
+import argparse
+import asyncio
+import os
+import time
+import aiohttp
+
+
+def make_value(size: int) -> bytes:
+    return os.urandom(size)
+
+
+async def run_puts(session, base_url, keys, value, concurrency):
+    """PUT all keys, return (total_time, errors)."""
+    sem = asyncio.Semaphore(concurrency)
+    errors = 0
+
+    async def put_one(key):
+        nonlocal errors
+        async with sem:
+            try:
+                async with session.put(f"{base_url}/{key}", data=value) as resp:
+                    if resp.status not in (200, 201, 204):
+                        errors += 1
+            except Exception:
+                errors += 1
+
+    start = time.monotonic()
+    await asyncio.gather(*(put_one(k) for k in keys))
+    elapsed = time.monotonic() - start
+    return elapsed, errors
+
+
+async def run_gets(session, base_url, keys, concurrency, follow_redirects):
+    """GET all keys, return (total_time, errors)."""
+    sem = asyncio.Semaphore(concurrency)
+    errors = 0
+
+    async def get_one(key):
+        nonlocal errors
+        async with sem:
+            try:
+                async with session.get(
+                    f"{base_url}/{key}",
+                    allow_redirects=follow_redirects,
+                ) as resp:
+                    if follow_redirects:
+                        if resp.status != 200:
+                            errors += 1
+                        else:
+                            await resp.read()
+                    else:
+                        # For redirect-based (mkv), 302 is success
+                        if resp.status not in (200, 302):
+                            errors += 1
+            except Exception:
+                errors += 1
+
+    start = time.monotonic()
+    await asyncio.gather(*(get_one(k) for k in keys))
+    elapsed = time.monotonic() - start
+    return elapsed, errors
+
+
+async def run_deletes(session, base_url, keys, concurrency):
+    """DELETE all keys, return (total_time, errors)."""
+    sem = asyncio.Semaphore(concurrency)
+    errors = 0
+
+    async def delete_one(key):
+        nonlocal errors
+        async with sem:
+            try:
+                async with session.delete(f"{base_url}/{key}") as resp:
+                    if resp.status not in (200, 204):
+                        errors += 1
+            except Exception:
+                errors += 1
+
+    start = time.monotonic()
+    await asyncio.gather(*(delete_one(k) for k in keys))
+    elapsed = time.monotonic() - start
+    return elapsed, errors
+
+
+def print_result(label, count, elapsed, errors):
+    rps = count / elapsed if elapsed > 0 else 0
+    print(f"  {label:12s} {elapsed:7.2f}s {rps:8.0f} req/s  {errors} errors")
+
+
+async def main():
+    parser = argparse.ArgumentParser(description="Load test mkv or minikeyvalue")
+    parser.add_argument("url", help="Base URL (e.g. http://localhost:3000)")
+    parser.add_argument("--keys", type=int, default=1000, help="Number of keys")
+    parser.add_argument("--concurrency", type=int, default=50, help="Concurrent requests")
+    parser.add_argument("--size", type=int, default=1024, help="Value size in bytes")
+    parser.add_argument(
+        "--follow-redirects", action="store_true",
+        help="Follow GET redirects (use for mkv to measure full round-trip)",
+    )
+    parser.add_argument(
+        "--prefix", default="loadtest",
+        help="Key prefix (use different prefixes to avoid collisions)",
+    )
+    args = parser.parse_args()
+
+    base = args.url.rstrip("/")
+    keys = [f"{args.prefix}/key-{i:06d}" for i in range(args.keys)]
+    value = make_value(args.size)
+
+    print(f"Target:      {base}")
+    print(f"Keys:        {args.keys}")
+    print(f"Concurrency: {args.concurrency}")
+    print(f"Value size:  {args.size} bytes")
+    print(f"Follow redir: {args.follow_redirects}")
+    print()
+
+    conn = aiohttp.TCPConnector(limit=args.concurrency + 10)
+    async with aiohttp.ClientSession(connector=conn) as session:
+        # Warmup — check server is reachable
+        try:
+            async with session.get(base) as resp:
+                pass
+        except Exception as e:
+            print(f"ERROR: Cannot reach {base}: {e}")
+            return
+
+        # PUTs
+        put_time, put_err = await run_puts(session, base, keys, value, args.concurrency)
+        print_result("PUT", len(keys), put_time, put_err)
+
+        # GETs
+        get_time, get_err = await run_gets(
+            session, base, keys, args.concurrency, args.follow_redirects
+        )
+        print_result("GET", len(keys), get_time, get_err)
+
+        # Second GET pass (warm)
+        get2_time, get2_err = await run_gets(
+            session, base, keys, args.concurrency, args.follow_redirects
+        )
+        print_result("GET (warm)", len(keys), get2_time, get2_err)
+
+        # DELETEs
+        del_time, del_err = await run_deletes(session, base, keys, args.concurrency)
+        print_result("DELETE", len(keys), del_time, del_err)
+
+    print()
+    total = put_time + get_time + get2_time + del_time
+    total_ops = len(keys) * 4
+    print(f"Total: {total_ops} ops in {total:.2f}s ({total_ops / total:.0f} ops/s)")
+    print()
+    print("Note: PUT/DELETE throughput is bottlenecked by HTTP round-trips")
+    print("to volume servers (nginx), not by the index (SQLite/LevelDB).")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/db.rs b/src/db.rs
index 0e6386b..37b1a5f 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,7 +1,6 @@
 use rusqlite::{params, Connection, OpenFlags};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use tokio::sync::{mpsc, oneshot};
 
 use crate::error::AppError;
 
@@ -151,174 +150,73 @@ pub fn all_records(conn: &Connection) -> Result<Vec<Record>, AppError> {
     Ok(records)
 }
 
-// --- Write commands ---
-
-pub enum WriteCmd {
-    Put {
-        key: String,
-        volumes: Vec<String>,
-        size: Option<u64>,
-        reply: oneshot::Sender<Result<(), AppError>>,
-    },
-    Delete {
-        key: String,
-        reply: oneshot::Sender<Result<(), AppError>>,
-    },
-    BulkPut {
-        records: Vec<(String, Vec<String>, Option<u64>)>,
-        reply: oneshot::Sender<Result<(), AppError>>,
-    },
-}
-
-fn execute_cmd(
-    conn: &Connection,
-    cmd: WriteCmd,
-) -> (Result<(), AppError>, oneshot::Sender<Result<(), AppError>>) {
-    match cmd {
-        WriteCmd::Put {
-            key,
-            volumes,
-            size,
-            reply,
-        } => {
-            let volumes_json = encode_volumes(&volumes);
-            let result = conn
-                .prepare_cached(
-                    "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
-                     ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
-                )
-                .and_then(|mut s| s.execute(params![key, volumes_json, size]))
-                .map(|_| ())
-                .map_err(AppError::from);
-            (result, reply)
-        }
-        WriteCmd::Delete { key, reply } => {
-            let result = conn
-                .prepare_cached("DELETE FROM kv WHERE key = ?1")
-                .and_then(|mut s| s.execute(params![key]))
-                .map(|_| ())
-                .map_err(AppError::from);
-            (result, reply)
-        }
-        WriteCmd::BulkPut { records, reply } => {
-            let result = (|| -> Result<(), AppError> {
-                let mut stmt = conn.prepare_cached(
-                    "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
-                     ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
-                )?;
-                for (key, volumes, size) in &records {
-                    let volumes_json = encode_volumes(volumes);
-                    stmt.execute(params![key, volumes_json, size])?;
-                }
-                Ok(())
-            })();
-            (result, reply)
-        }
-    }
-}
-
 // --- WriterHandle ---
 
 #[derive(Clone)]
 pub struct WriterHandle {
-    tx: mpsc::Sender<WriteCmd>,
+    conn: Arc<Mutex<Connection>>,
 }
 
 impl WriterHandle {
+    pub fn new(path: &str) -> Self {
+        let conn = open_readwrite(path);
+        create_tables(&conn);
+        Self {
+            conn: Arc::new(Mutex::new(conn)),
+        }
+    }
+
     pub async fn put(
         &self,
         key: String,
         volumes: Vec<String>,
        size: Option<u64>,
     ) -> Result<(), AppError> {
-        let (reply_tx, reply_rx) = oneshot::channel();
-        self.tx
-            .send(WriteCmd::Put {
-                key,
-                volumes,
-                size,
-                reply: reply_tx,
-            })
-            .await
-            .map_err(|_| AppError::WriterDead)?;
-        reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
+        let conn = self.conn.clone();
+        tokio::task::spawn_blocking(move || {
+            let conn = conn.lock().unwrap();
+            let volumes_json = encode_volumes(&volumes);
+            conn.prepare_cached(
+                "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
+                 ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
+            )?
+            .execute(params![key, volumes_json, size])?;
+            Ok(())
+        })
+        .await
+        .unwrap()
     }
 
     pub async fn delete(&self, key: String) -> Result<(), AppError> {
-        let (reply_tx, reply_rx) = oneshot::channel();
-        self.tx
-            .send(WriteCmd::Delete {
-                key,
-                reply: reply_tx,
-            })
-            .await
-            .map_err(|_| AppError::WriterDead)?;
-        reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
+        let conn = self.conn.clone();
+        tokio::task::spawn_blocking(move || {
+            let conn = conn.lock().unwrap();
+            conn.prepare_cached("DELETE FROM kv WHERE key = ?1")?
+                .execute(params![key])?;
+            Ok(())
+        })
+        .await
+        .unwrap()
    }
 
     pub async fn bulk_put(
         &self,
         records: Vec<(String, Vec<String>, Option<u64>)>,
     ) -> Result<(), AppError> {
-        let (reply_tx, reply_rx) = oneshot::channel();
-        self.tx
-            .send(WriteCmd::BulkPut {
-                records,
-                reply: reply_tx,
-            })
-            .await
-            .map_err(|_| AppError::WriterDead)?;
-        reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
+        let conn = self.conn.clone();
+        tokio::task::spawn_blocking(move || {
+            let conn = conn.lock().unwrap();
+            let mut stmt = conn.prepare_cached(
+                "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
+                 ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
+            )?;
+            for (key, volumes, size) in &records {
+                let volumes_json = encode_volumes(volumes);
+                stmt.execute(params![key, volumes_json, size])?;
+            }
+            Ok(())
+        })
+        .await
+        .unwrap()
     }
 }
-
-// --- spawn_writer ---
-
-pub fn spawn_writer(path: String) -> (WriterHandle, oneshot::Receiver<()>) {
-    let (tx, mut rx) = mpsc::channel::<WriteCmd>(4096);
-    let (ready_tx, ready_rx) = oneshot::channel();
-
-    std::thread::spawn(move || {
-        let conn = open_readwrite(&path);
-        create_tables(&conn);
-        let _ = ready_tx.send(());
-
-        loop {
-            let Some(first) = rx.blocking_recv() else {
-                break;
-            };
-
-            let mut batch = vec![first];
-            while batch.len() < 512 {
-                match rx.try_recv() {
-                    Ok(cmd) => batch.push(cmd),
-                    Err(_) => break,
-                }
-            }
-
-            let _ = conn.execute_batch("BEGIN");
-            let mut replies: Vec<(Result<(), AppError>, oneshot::Sender<Result<(), AppError>>)> =
-                Vec::with_capacity(batch.len());
-
-            for (i, cmd) in batch.into_iter().enumerate() {
-                let sp = format!("sp{i}");
-                let _ = conn.execute(&format!("SAVEPOINT {sp}"), []);
-                let (result, reply) = execute_cmd(&conn, cmd);
-                if result.is_ok() {
-                    let _ = conn.execute(&format!("RELEASE {sp}"), []);
-                } else {
-                    let _ = conn.execute(&format!("ROLLBACK TO {sp}"), []);
-                    let _ = conn.execute(&format!("RELEASE {sp}"), []);
-                }
-                replies.push((result, reply));
-            }
-
-            let _ = conn.execute_batch("COMMIT");
-            for (result, reply) in replies {
-                let _ = reply.send(result);
-            }
-        }
-    });
-
-    (WriterHandle { tx }, ready_rx)
-}
diff --git a/src/error.rs b/src/error.rs
index 8c46966..75272d0 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -5,8 +5,6 @@ use axum::response::{IntoResponse, Response};
 pub enum AppError {
     NotFound,
     Db(rusqlite::Error),
-    WriterDead,
-    WriterDroppedReply,
     VolumeError(String),
     NoHealthyVolume,
 }
@@ -25,8 +23,6 @@ impl std::fmt::Display for AppError {
         match self {
             AppError::NotFound => write!(f, "not found"),
             AppError::Db(e) => write!(f, "database error: {e}"),
-            AppError::WriterDead => write!(f, "writer dead"),
-            AppError::WriterDroppedReply => write!(f, "writer dropped reply"),
             AppError::VolumeError(msg) => write!(f, "volume error: {msg}"),
             AppError::NoHealthyVolume => write!(f, "no healthy volume available"),
         }
diff --git a/src/lib.rs b/src/lib.rs
index 8689b72..ba245f6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,8 +24,7 @@ pub async fn build_app(config: config::Config) -> axum::Router {
         });
     }
 
-    let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
-    ready_rx.await.expect("writer failed to initialize");
+    let writer = db::WriterHandle::new(db_path);
 
     let num_readers = std::thread::available_parallelism()
         .map(|n| n.get())
diff --git a/src/rebalance.rs b/src/rebalance.rs
index 6bd1df9..bcba766 100644
--- a/src/rebalance.rs
+++ b/src/rebalance.rs
@@ -88,8 +88,7 @@ pub async fn run(config: &Config, dry_run: bool) {
     }
 
     // Open writer for updates
-    let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
-    ready_rx.await.expect("writer failed to initialize");
+    let writer = db::WriterHandle::new(db_path);
 
     let client = VolumeClient::new();
     let mut moved = 0;
diff --git a/src/rebuild.rs b/src/rebuild.rs
index f7e5aa3..b2d9f9d 100644
--- a/src/rebuild.rs
+++ b/src/rebuild.rs
@@ -66,8 +66,7 @@ pub async fn run(config: &Config) {
     let _ = std::fs::remove_file(format!("{db_path}-wal"));
     let _ = std::fs::remove_file(format!("{db_path}-shm"));
 
-    let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
-    ready_rx.await.expect("writer failed to initialize");
+    let writer = db::WriterHandle::new(db_path);
 
     let volume_urls = config.volume_urls();
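
A note on the pattern: the patch replaces the dedicated writer thread (an mpsc
command channel with batched BEGIN/COMMIT and savepoints) with a shared
Mutex<Connection> whose blocking SQLite calls run under
tokio::task::spawn_blocking. Below is a minimal, self-contained sketch of that
pattern, assuming tokio and rusqlite as dependencies; the Writer type, the kv
schema, and the in-memory database are illustrative stand-ins, not this
repository's code:

    // Sketch only: illustrative names and schema, not code from this repo.
    // Assumes Cargo dependencies: tokio (features = ["full"]) and rusqlite.
    use std::sync::{Arc, Mutex};

    use rusqlite::{params, Connection};

    #[derive(Clone)]
    struct Writer {
        conn: Arc<Mutex<Connection>>,
    }

    impl Writer {
        fn new() -> Self {
            let conn = Connection::open_in_memory().unwrap();
            conn.execute_batch("CREATE TABLE kv (key TEXT PRIMARY KEY, val TEXT)")
                .unwrap();
            Self {
                conn: Arc::new(Mutex::new(conn)),
            }
        }

        async fn put(&self, key: String, val: String) -> rusqlite::Result<()> {
            let conn = self.conn.clone();
            // Run the blocking SQLite call off the async worker threads; the
            // mutex serializes writers, mirroring the patch's WriterHandle.
            tokio::task::spawn_blocking(move || {
                let conn = conn.lock().unwrap();
                conn.prepare_cached("INSERT OR REPLACE INTO kv (key, val) VALUES (?1, ?2)")?
                    .execute(params![key, val])?;
                Ok(())
            })
            .await
            .expect("blocking task panicked")
        }
    }

    #[tokio::main]
    async fn main() {
        let w = Writer::new();
        // Concurrent puts from several tasks all funnel through one connection.
        let tasks: Vec<_> = (0..8)
            .map(|i| {
                let w = w.clone();
                tokio::spawn(async move { w.put(format!("k{i}"), "v".into()).await })
            })
            .collect();
        for t in tasks {
            t.await.unwrap().unwrap();
        }
        println!("done");
    }

The trade-off, visible in the removed spawn_writer loop above, is that writes
are no longer grouped into one transaction per batch of up to 512 commands;
each statement now commits on its own, serialized by the mutex.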