#!/usr/bin/env python3 """ Load test for mkv or minikeyvalue. Usage: python3 load_test.py http://localhost:3000 # test mkv python3 load_test.py http://localhost:3001 # test minikeyvalue Options: --keys N Number of keys to test (default: 1000) --concurrency N Number of concurrent requests (default: 50) --size N Value size in bytes (default: 1024) """ import argparse import asyncio import os import time import aiohttp def make_value(size: int) -> bytes: return os.urandom(size) async def run_puts(session, base_url, keys, value, concurrency): """PUT all keys, return (total_time, errors).""" sem = asyncio.Semaphore(concurrency) errors = 0 async def put_one(key): nonlocal errors async with sem: try: async with session.put(f"{base_url}/{key}", data=value) as resp: if resp.status not in (200, 201, 204): errors += 1 except Exception: errors += 1 start = time.monotonic() await asyncio.gather(*(put_one(k) for k in keys)) elapsed = time.monotonic() - start return elapsed, errors async def run_gets(session, base_url, keys, concurrency, follow_redirects): """GET all keys, return (total_time, errors).""" sem = asyncio.Semaphore(concurrency) errors = 0 async def get_one(key): nonlocal errors async with sem: try: async with session.get( f"{base_url}/{key}", allow_redirects=follow_redirects, ) as resp: if follow_redirects: if resp.status != 200: errors += 1 else: await resp.read() else: # For redirect-based (mkv), 302 is success if resp.status not in (200, 302): errors += 1 except Exception: errors += 1 start = time.monotonic() await asyncio.gather(*(get_one(k) for k in keys)) elapsed = time.monotonic() - start return elapsed, errors async def run_deletes(session, base_url, keys, concurrency): """DELETE all keys, return (total_time, errors).""" sem = asyncio.Semaphore(concurrency) errors = 0 async def delete_one(key): nonlocal errors async with sem: try: async with session.delete(f"{base_url}/{key}") as resp: if resp.status not in (200, 204): errors += 1 except Exception: errors += 1 start = time.monotonic() await asyncio.gather(*(delete_one(k) for k in keys)) elapsed = time.monotonic() - start return elapsed, errors def print_result(label, count, elapsed, errors): rps = count / elapsed if elapsed > 0 else 0 print(f" {label:12s} {elapsed:7.2f}s {rps:8.0f} req/s {errors} errors") async def main(): parser = argparse.ArgumentParser(description="Load test mkv or minikeyvalue") parser.add_argument("url", help="Base URL (e.g. http://localhost:3000)") parser.add_argument("--keys", type=int, default=1000, help="Number of keys") parser.add_argument("--concurrency", type=int, default=50, help="Concurrent requests") parser.add_argument("--size", type=int, default=1024, help="Value size in bytes") parser.add_argument( "--follow-redirects", action="store_true", help="Follow GET redirects (use for mkv to measure full round-trip)", ) parser.add_argument( "--prefix", default="loadtest", help="Key prefix (use different prefixes to avoid collisions)", ) args = parser.parse_args() base = args.url.rstrip("/") keys = [f"{args.prefix}/key-{i:06d}" for i in range(args.keys)] value = make_value(args.size) print(f"Target: {base}") print(f"Keys: {args.keys}") print(f"Concurrency: {args.concurrency}") print(f"Value size: {args.size} bytes") print(f"Follow redir:{args.follow_redirects}") print() conn = aiohttp.TCPConnector(limit=args.concurrency + 10) async with aiohttp.ClientSession(connector=conn) as session: # Warmup — check server is reachable try: async with session.get(base) as resp: pass except Exception as e: print(f"ERROR: Cannot reach {base}: {e}") return # PUTs put_time, put_err = await run_puts(session, base, keys, value, args.concurrency) print_result("PUT", len(keys), put_time, put_err) # GETs get_time, get_err = await run_gets( session, base, keys, args.concurrency, args.follow_redirects ) print_result("GET", len(keys), get_time, get_err) # Second GET pass (warm) get2_time, get2_err = await run_gets( session, base, keys, args.concurrency, args.follow_redirects ) print_result("GET (warm)", len(keys), get2_time, get2_err) # DELETEs del_time, del_err = await run_deletes(session, base, keys, args.concurrency) print_result("DELETE", len(keys), del_time, del_err) print() total = put_time + get_time + get2_time + del_time total_ops = len(keys) * 4 print(f"Total: {total_ops} ops in {total:.2f}s ({total_ops / total:.0f} ops/s)") print() print("Note: PUT/DELETE throughput is bottlenecked by HTTP round-trips") print("to volume servers (nginx), not by the index (SQLite/LevelDB).") if __name__ == "__main__": asyncio.run(main())