This commit is contained in:
Silas Brack 2026-03-07 13:09:53 +01:00
parent 1461b41a36
commit 7f3ec69cf6
7 changed files with 236 additions and 158 deletions

16
config.bench.toml Normal file
View file

@ -0,0 +1,16 @@
[server]
port = 3000
replication_factor = 2
virtual_nodes = 100
[database]
path = "/tmp/mkv-bench/index.db"
[[volumes]]
url = "http://localhost:3101"
[[volumes]]
url = "http://localhost:3102"
[[volumes]]
url = "http://localhost:3103"

171
load_test.py Normal file
View file

@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""
Load test for mkv or minikeyvalue.
Usage:
python3 load_test.py http://localhost:3000 # test mkv
python3 load_test.py http://localhost:3001 # test minikeyvalue
Options:
--keys N Number of keys to test (default: 1000)
--concurrency N Number of concurrent requests (default: 50)
--size N Value size in bytes (default: 1024)
"""
import argparse
import asyncio
import os
import time
import aiohttp
def make_value(size: int) -> bytes:
    """Return `size` cryptographically random bytes to use as a test payload."""
    payload = os.urandom(size)
    return payload
async def run_puts(session, base_url, keys, value, concurrency):
    """PUT all keys, return (total_time, errors)."""
    # Semaphore caps how many requests are in flight at once.
    gate = asyncio.Semaphore(concurrency)
    failures = 0

    async def upload(key):
        nonlocal failures
        async with gate:
            try:
                async with session.put(f"{base_url}/{key}", data=value) as resp:
                    ok = resp.status in (200, 201, 204)
            except Exception:
                ok = False
            if not ok:
                failures += 1

    t0 = time.monotonic()
    await asyncio.gather(*[upload(k) for k in keys])
    return time.monotonic() - t0, failures
async def run_gets(session, base_url, keys, concurrency, follow_redirects):
    """GET all keys, return (total_time, errors)."""
    # Semaphore caps how many requests are in flight at once.
    gate = asyncio.Semaphore(concurrency)
    failures = 0

    async def fetch(key):
        nonlocal failures
        async with gate:
            try:
                async with session.get(
                    f"{base_url}/{key}",
                    allow_redirects=follow_redirects,
                ) as resp:
                    if follow_redirects:
                        if resp.status == 200:
                            # Drain the body so the full transfer is measured.
                            await resp.read()
                        else:
                            failures += 1
                    elif resp.status not in (200, 302):
                        # Redirect-based servers (mkv) answer 302 on success.
                        failures += 1
            except Exception:
                failures += 1

    t0 = time.monotonic()
    await asyncio.gather(*[fetch(k) for k in keys])
    return time.monotonic() - t0, failures
async def run_deletes(session, base_url, keys, concurrency):
    """DELETE all keys, return (total_time, errors)."""
    # Semaphore caps how many requests are in flight at once.
    gate = asyncio.Semaphore(concurrency)
    failures = 0

    async def remove(key):
        nonlocal failures
        async with gate:
            try:
                async with session.delete(f"{base_url}/{key}") as resp:
                    ok = resp.status in (200, 204)
            except Exception:
                ok = False
            if not ok:
                failures += 1

    t0 = time.monotonic()
    await asyncio.gather(*[remove(k) for k in keys])
    return time.monotonic() - t0, failures
def print_result(label, count, elapsed, errors):
    """Print one aligned benchmark result line: label, wall time, req/s, errors."""
    if elapsed > 0:
        throughput = count / elapsed
    else:
        # Guard against division by zero when a phase finished instantly.
        throughput = 0
    print(f" {label:12s} {elapsed:7.2f}s {throughput:8.0f} req/s {errors} errors")
async def main():
    # Parse CLI options, verify the server is reachable, then run the four
    # benchmark phases (PUT, cold GET, warm GET, DELETE) and print a summary.
    parser = argparse.ArgumentParser(description="Load test mkv or minikeyvalue")
    parser.add_argument("url", help="Base URL (e.g. http://localhost:3000)")
    parser.add_argument("--keys", type=int, default=1000, help="Number of keys")
    parser.add_argument("--concurrency", type=int, default=50, help="Concurrent requests")
    parser.add_argument("--size", type=int, default=1024, help="Value size in bytes")
    parser.add_argument(
        "--follow-redirects", action="store_true",
        help="Follow GET redirects (use for mkv to measure full round-trip)",
    )
    parser.add_argument(
        "--prefix", default="loadtest",
        help="Key prefix (use different prefixes to avoid collisions)",
    )
    args = parser.parse_args()
    base = args.url.rstrip("/")
    # Zero-padded suffix keeps keys unique and lexically ordered.
    keys = [f"{args.prefix}/key-{i:06d}" for i in range(args.keys)]
    # One shared random payload reused for every PUT.
    value = make_value(args.size)
    print(f"Target: {base}")
    print(f"Keys: {args.keys}")
    print(f"Concurrency: {args.concurrency}")
    print(f"Value size: {args.size} bytes")
    print(f"Follow redir:{args.follow_redirects}")
    print()
    # Connector limit gets a little headroom over the semaphore cap so the
    # connection pool itself never becomes the throttle.
    conn = aiohttp.TCPConnector(limit=args.concurrency + 10)
    async with aiohttp.ClientSession(connector=conn) as session:
        # Warmup — check server is reachable
        try:
            async with session.get(base) as resp:
                pass
        except Exception as e:
            print(f"ERROR: Cannot reach {base}: {e}")
            return
        # PUTs
        put_time, put_err = await run_puts(session, base, keys, value, args.concurrency)
        print_result("PUT", len(keys), put_time, put_err)
        # GETs
        get_time, get_err = await run_gets(
            session, base, keys, args.concurrency, args.follow_redirects
        )
        print_result("GET", len(keys), get_time, get_err)
        # Second GET pass (warm) — caches/page cache primed by the first pass
        get2_time, get2_err = await run_gets(
            session, base, keys, args.concurrency, args.follow_redirects
        )
        print_result("GET (warm)", len(keys), get2_time, get2_err)
        # DELETEs
        del_time, del_err = await run_deletes(session, base, keys, args.concurrency)
        print_result("DELETE", len(keys), del_time, del_err)
        print()
        # Aggregate throughput across all four phases (keys ops per phase).
        total = put_time + get_time + get2_time + del_time
        total_ops = len(keys) * 4
        print(f"Total: {total_ops} ops in {total:.2f}s ({total_ops / total:.0f} ops/s)")
        print()
        print("Note: PUT/DELETE throughput is bottlenecked by HTTP round-trips")
        print("to volume servers (nginx), not by the index (SQLite/LevelDB).")
# Entry point: run the async benchmark driver under asyncio's event loop.
if __name__ == "__main__":
    asyncio.run(main())

194
src/db.rs
View file

@ -1,7 +1,6 @@
use rusqlite::{params, Connection, OpenFlags};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use crate::error::AppError;
@ -151,174 +150,73 @@ pub fn all_records(conn: &Connection) -> Result<Vec<Record>, AppError> {
Ok(records)
}
// --- Write commands ---
pub enum WriteCmd {
Put {
key: String,
volumes: Vec<String>,
size: Option<i64>,
reply: oneshot::Sender<Result<(), AppError>>,
},
Delete {
key: String,
reply: oneshot::Sender<Result<(), AppError>>,
},
BulkPut {
records: Vec<(String, Vec<String>, Option<i64>)>,
reply: oneshot::Sender<Result<(), AppError>>,
},
}
fn execute_cmd(
conn: &Connection,
cmd: WriteCmd,
) -> (Result<(), AppError>, oneshot::Sender<Result<(), AppError>>) {
match cmd {
WriteCmd::Put {
key,
volumes,
size,
reply,
} => {
let volumes_json = encode_volumes(&volumes);
let result = conn
.prepare_cached(
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
)
.and_then(|mut s| s.execute(params![key, volumes_json, size]))
.map(|_| ())
.map_err(AppError::from);
(result, reply)
}
WriteCmd::Delete { key, reply } => {
let result = conn
.prepare_cached("DELETE FROM kv WHERE key = ?1")
.and_then(|mut s| s.execute(params![key]))
.map(|_| ())
.map_err(AppError::from);
(result, reply)
}
WriteCmd::BulkPut { records, reply } => {
let result = (|| -> Result<(), AppError> {
let mut stmt = conn.prepare_cached(
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
)?;
for (key, volumes, size) in &records {
let volumes_json = encode_volumes(volumes);
stmt.execute(params![key, volumes_json, size])?;
}
Ok(())
})();
(result, reply)
}
}
}
// --- WriterHandle ---
#[derive(Clone)]
pub struct WriterHandle {
tx: mpsc::Sender<WriteCmd>,
conn: Arc<Mutex<Connection>>,
}
impl WriterHandle {
pub fn new(path: &str) -> Self {
let conn = open_readwrite(path);
create_tables(&conn);
Self {
conn: Arc::new(Mutex::new(conn)),
}
}
pub async fn put(
&self,
key: String,
volumes: Vec<String>,
size: Option<i64>,
) -> Result<(), AppError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx
.send(WriteCmd::Put {
key,
volumes,
size,
reply: reply_tx,
})
.await
.map_err(|_| AppError::WriterDead)?;
reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
let volumes_json = encode_volumes(&volumes);
conn.prepare_cached(
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
)?
.execute(params![key, volumes_json, size])?;
Ok(())
})
.await
.unwrap()
}
pub async fn delete(&self, key: String) -> Result<(), AppError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx
.send(WriteCmd::Delete {
key,
reply: reply_tx,
})
.await
.map_err(|_| AppError::WriterDead)?;
reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
conn.prepare_cached("DELETE FROM kv WHERE key = ?1")?
.execute(params![key])?;
Ok(())
})
.await
.unwrap()
}
pub async fn bulk_put(
&self,
records: Vec<(String, Vec<String>, Option<i64>)>,
) -> Result<(), AppError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.tx
.send(WriteCmd::BulkPut {
records,
reply: reply_tx,
})
.await
.map_err(|_| AppError::WriterDead)?;
reply_rx.await.map_err(|_| AppError::WriterDroppedReply)?
let conn = self.conn.clone();
tokio::task::spawn_blocking(move || {
let conn = conn.lock().unwrap();
let mut stmt = conn.prepare_cached(
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
)?;
for (key, volumes, size) in &records {
let volumes_json = encode_volumes(volumes);
stmt.execute(params![key, volumes_json, size])?;
}
Ok(())
})
.await
.unwrap()
}
}
// --- spawn_writer ---
pub fn spawn_writer(path: String) -> (WriterHandle, oneshot::Receiver<()>) {
let (tx, mut rx) = mpsc::channel::<WriteCmd>(4096);
let (ready_tx, ready_rx) = oneshot::channel();
std::thread::spawn(move || {
let conn = open_readwrite(&path);
create_tables(&conn);
let _ = ready_tx.send(());
loop {
let Some(first) = rx.blocking_recv() else {
break;
};
let mut batch = vec![first];
while batch.len() < 512 {
match rx.try_recv() {
Ok(cmd) => batch.push(cmd),
Err(_) => break,
}
}
let _ = conn.execute_batch("BEGIN");
let mut replies: Vec<(Result<(), AppError>, oneshot::Sender<Result<(), AppError>>)> =
Vec::with_capacity(batch.len());
for (i, cmd) in batch.into_iter().enumerate() {
let sp = format!("sp{i}");
let _ = conn.execute(&format!("SAVEPOINT {sp}"), []);
let (result, reply) = execute_cmd(&conn, cmd);
if result.is_ok() {
let _ = conn.execute(&format!("RELEASE {sp}"), []);
} else {
let _ = conn.execute(&format!("ROLLBACK TO {sp}"), []);
let _ = conn.execute(&format!("RELEASE {sp}"), []);
}
replies.push((result, reply));
}
let _ = conn.execute_batch("COMMIT");
for (result, reply) in replies {
let _ = reply.send(result);
}
}
});
(WriterHandle { tx }, ready_rx)
}

View file

@ -5,8 +5,6 @@ use axum::response::{IntoResponse, Response};
pub enum AppError {
NotFound,
Db(rusqlite::Error),
WriterDead,
WriterDroppedReply,
VolumeError(String),
NoHealthyVolume,
}
@ -25,8 +23,6 @@ impl std::fmt::Display for AppError {
match self {
AppError::NotFound => write!(f, "not found"),
AppError::Db(e) => write!(f, "database error: {e}"),
AppError::WriterDead => write!(f, "writer dead"),
AppError::WriterDroppedReply => write!(f, "writer dropped reply"),
AppError::VolumeError(msg) => write!(f, "volume error: {msg}"),
AppError::NoHealthyVolume => write!(f, "no healthy volume available"),
}

View file

@ -24,8 +24,7 @@ pub async fn build_app(config: config::Config) -> axum::Router {
});
}
let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
ready_rx.await.expect("writer failed to initialize");
let writer = db::WriterHandle::new(db_path);
let num_readers = std::thread::available_parallelism()
.map(|n| n.get())

View file

@ -88,8 +88,7 @@ pub async fn run(config: &Config, dry_run: bool) {
}
// Open writer for updates
let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
ready_rx.await.expect("writer failed to initialize");
let writer = db::WriterHandle::new(db_path);
let client = VolumeClient::new();
let mut moved = 0;

View file

@ -66,8 +66,7 @@ pub async fn run(config: &Config) {
let _ = std::fs::remove_file(format!("{db_path}-wal"));
let _ = std::fs::remove_file(format!("{db_path}-shm"));
let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
ready_rx.await.expect("writer failed to initialize");
let writer = db::WriterHandle::new(db_path);
let volume_urls = config.volume_urls();