diff --git a/src/db.rs b/src/db.rs index 7f76025..0e6386b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -11,7 +11,6 @@ use crate::error::AppError; pub struct Record { pub key: String, pub volumes: Vec, - pub path: String, pub size: Option, } @@ -63,12 +62,9 @@ fn create_tables(conn: &Connection) { CREATE TABLE IF NOT EXISTS kv ( key TEXT PRIMARY KEY, volumes TEXT NOT NULL, - path TEXT NOT NULL, size INTEGER, - created_at INTEGER DEFAULT (unixepoch()), - deleted INTEGER DEFAULT 0 + created_at INTEGER DEFAULT (unixepoch()) ); - CREATE INDEX IF NOT EXISTS idx_kv_deleted ON kv(deleted); ", ) .expect("failed to create tables"); @@ -120,22 +116,19 @@ impl ReadPool { // --- Read query functions --- pub fn get(conn: &Connection, key: &str) -> Result { - let mut stmt = - conn.prepare_cached("SELECT key, volumes, path, size FROM kv WHERE key = ?1 AND deleted = 0")?; + let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv WHERE key = ?1")?; Ok(stmt.query_row(params![key], |row| { let volumes_json: String = row.get(1)?; Ok(Record { key: row.get(0)?, volumes: parse_volumes(&volumes_json), - path: row.get(2)?, - size: row.get(3)?, + size: row.get(2)?, }) })?) } pub fn list_keys(conn: &Connection, prefix: &str) -> Result, AppError> { - let mut stmt = - conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 AND deleted = 0 ORDER BY key")?; + let mut stmt = conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 ORDER BY key")?; let pattern = format!("{prefix}%"); let keys = stmt .query_map(params![pattern], |row| row.get(0))? @@ -144,16 +137,14 @@ pub fn list_keys(conn: &Connection, prefix: &str) -> Result, AppErro } pub fn all_records(conn: &Connection) -> Result, AppError> { - let mut stmt = - conn.prepare_cached("SELECT key, volumes, path, size FROM kv WHERE deleted = 0")?; + let mut stmt = conn.prepare_cached("SELECT key, volumes, size FROM kv")?; let records = stmt .query_map([], |row| { let volumes_json: String = row.get(1)?; Ok(Record { key: row.get(0)?, volumes: parse_volumes(&volumes_json), - path: row.get(2)?, - size: row.get(3)?, + size: row.get(2)?, }) })? .collect::, _>>()?; @@ -166,7 +157,6 @@ pub enum WriteCmd { Put { key: String, volumes: Vec, - path: String, size: Option, reply: oneshot::Sender>, }, @@ -175,27 +165,29 @@ pub enum WriteCmd { reply: oneshot::Sender>, }, BulkPut { - records: Vec<(String, Vec, String, Option)>, + records: Vec<(String, Vec, Option)>, reply: oneshot::Sender>, }, } -fn execute_cmd(conn: &Connection, cmd: WriteCmd) -> (Result<(), AppError>, oneshot::Sender>) { +fn execute_cmd( + conn: &Connection, + cmd: WriteCmd, +) -> (Result<(), AppError>, oneshot::Sender>) { match cmd { WriteCmd::Put { key, volumes, - path, size, reply, } => { let volumes_json = encode_volumes(&volumes); let result = conn .prepare_cached( - "INSERT INTO kv (key, volumes, path, size) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(key) DO UPDATE SET volumes = ?2, path = ?3, size = ?4, deleted = 0", + "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3) + ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3", ) - .and_then(|mut s| s.execute(params![key, volumes_json, path, size])) + .and_then(|mut s| s.execute(params![key, volumes_json, size])) .map(|_| ()) .map_err(AppError::from); (result, reply) @@ -211,12 +203,12 @@ fn execute_cmd(conn: &Connection, cmd: WriteCmd) -> (Result<(), AppError>, onesh WriteCmd::BulkPut { records, reply } => { let result = (|| -> Result<(), AppError> { let mut stmt = conn.prepare_cached( - "INSERT INTO kv (key, volumes, path, size) VALUES (?1, ?2, ?3, ?4) - ON CONFLICT(key) DO UPDATE SET volumes = ?2, path = ?3, size = ?4, deleted = 0", + "INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3) + ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3", )?; - for (key, volumes, path, size) in &records { + for (key, volumes, size) in &records { let volumes_json = encode_volumes(volumes); - stmt.execute(params![key, volumes_json, path, size])?; + stmt.execute(params![key, volumes_json, size])?; } Ok(()) })(); @@ -237,7 +229,6 @@ impl WriterHandle { &self, key: String, volumes: Vec, - path: String, size: Option, ) -> Result<(), AppError> { let (reply_tx, reply_rx) = oneshot::channel(); @@ -245,7 +236,6 @@ impl WriterHandle { .send(WriteCmd::Put { key, volumes, - path, size, reply: reply_tx, }) @@ -268,7 +258,7 @@ impl WriterHandle { pub async fn bulk_put( &self, - records: Vec<(String, Vec, String, Option)>, + records: Vec<(String, Vec, Option)>, ) -> Result<(), AppError> { let (reply_tx, reply_rx) = oneshot::channel(); self.tx diff --git a/src/hasher.rs b/src/hasher.rs index e659b20..32cbffb 100644 --- a/src/hasher.rs +++ b/src/hasher.rs @@ -51,12 +51,6 @@ impl Ring { result } - - /// Compute the content-addressed path for a key: /{first2}/{next2}/{full_hex} - pub fn key_path(key: &str) -> String { - let hex = hex_hash(key); - format!("/{}/{}/{}", &hex[..2], &hex[2..4], &hex) - } } fn hash_key(input: &str) -> u64 { @@ -64,20 +58,6 @@ fn hash_key(input: &str) -> u64 { u64::from_be_bytes(hash[..8].try_into().unwrap()) } -fn hex_hash(input: &str) -> String { - let hash = Sha256::digest(input.as_bytes()); - hex_encode(&hash) -} - -fn hex_encode(bytes: &[u8]) -> String { - let mut s = String::with_capacity(bytes.len() * 2); - for b in bytes { - use std::fmt::Write; - write!(s, "{b:02x}").unwrap(); - } - s -} - #[cfg(test)] mod tests { use super::*; @@ -87,7 +67,6 @@ mod tests { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); let ring = Ring::new(&volumes, 100); - // All 3 volumes should appear in results let selected = ring.get_volumes("test-key", 3); assert_eq!(selected.len(), 3); for v in &volumes { @@ -110,18 +89,10 @@ mod tests { let volumes: Vec = (1..=2).map(|i| format!("http://vol{i}:300{i}")).collect(); let ring = Ring::new(&volumes, 100); - // Requesting more than available should return what's available let selected = ring.get_volumes("key", 5); assert_eq!(selected.len(), 2); } - #[test] - fn test_key_path_format() { - let path = Ring::key_path("hello"); - // SHA256("hello") = 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824 - assert_eq!(path, "/2c/f2/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"); - } - #[test] fn test_ring_even_distribution() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}:300{i}")).collect(); @@ -134,7 +105,6 @@ mod tests { *counts.entry(primary.clone()).or_insert(0u32) += 1; } - // Each volume should get roughly 1000 keys (±30%) for (vol, count) in &counts { assert!( *count > 700 && *count < 1300, diff --git a/src/server.rs b/src/server.rs index 45c2fd7..13c7d43 100644 --- a/src/server.rs +++ b/src/server.rs @@ -35,14 +35,8 @@ fn pick_healthy_volume<'a>( .map(|v| v.as_str()) } -/// Compute the placement plan for a PUT: which path and which volumes. -struct PutPlan { - path: String, - target_volumes: Vec, -} - -fn plan_put(ring: &Ring, key: &str, replication: usize) -> Result { - let path = Ring::key_path(key); +/// Select target volumes for a key, ensuring we have enough for replication. +fn select_volumes(ring: &Ring, key: &str, replication: usize) -> Result, AppError> { let target_volumes = ring.get_volumes(key, replication); if target_volumes.len() < replication { return Err(AppError::VolumeError(format!( @@ -50,15 +44,15 @@ fn plan_put(ring: &Ring, key: &str, replication: usize) -> Result>, volumes: &[String]) -> Result, Vec> { +/// Evaluate fan-out results. Returns the list of volumes that succeeded, +/// or the error messages if any failed. +fn evaluate_fanout( + results: Vec>, + volumes: &[String], +) -> Result, Vec> { let mut succeeded = Vec::new(); let mut errors = Vec::new(); for (i, result) in results.into_iter().enumerate() { @@ -91,7 +85,7 @@ pub async fn get_key( let healthy = state.healthy_volumes.read().await; let vol = pick_healthy_volume(&record.volumes, &healthy).ok_or(AppError::NoHealthyVolume)?; - let location = format!("{vol}{}", record.path); + let location = format!("{vol}/{key}"); Ok(( StatusCode::FOUND, @@ -106,21 +100,20 @@ pub async fn put_key( Path(key): Path, body: Bytes, ) -> Result { - let plan = { + let target_volumes = { let ring = state.ring.read().await; - plan_put(&ring, &key, state.config.server.replication_factor)? + select_volumes(&ring, &key, state.config.server.replication_factor)? }; // Fan out PUTs to all target volumes concurrently - let mut handles = Vec::with_capacity(plan.target_volumes.len()); - for vol in &plan.target_volumes { + let mut handles = Vec::with_capacity(target_volumes.len()); + for vol in &target_volumes { let client = state.volume_client.clone(); let vol = vol.clone(); - let path = plan.path.clone(); let key = key.clone(); let data = body.clone(); handles.push(tokio::spawn(async move { - client.put(&vol, &path, &key, data).await + client.put(&vol, &key, data).await })); } @@ -129,13 +122,10 @@ pub async fn put_key( results.push(handle.await.unwrap()); } - match evaluate_fanout(results, &plan.target_volumes) { + match evaluate_fanout(results, &target_volumes) { Ok(succeeded_volumes) => { let size = Some(body.len() as i64); - state - .writer - .put(key, succeeded_volumes, plan.path, size) - .await?; + state.writer.put(key, succeeded_volumes, size).await?; Ok(StatusCode::CREATED.into_response()) } Err(errors) => { @@ -143,8 +133,8 @@ pub async fn put_key( tracing::error!("PUT to volume failed: {e}"); } // Rollback: best-effort delete from any volumes that may have succeeded - for vol in &plan.target_volumes { - let _ = state.volume_client.delete(vol, &plan.path).await; + for vol in &target_volumes { + let _ = state.volume_client.delete(vol, &key).await; } Err(AppError::VolumeError( "not all volume writes succeeded".into(), @@ -171,9 +161,9 @@ pub async fn delete_key( for vol in &record.volumes { let client = state.volume_client.clone(); let vol = vol.clone(); - let path = record.path.clone(); + let key = key.clone(); handles.push(tokio::spawn( - async move { client.delete(&vol, &path).await }, + async move { client.delete(&vol, &key).await }, )); } @@ -257,27 +247,24 @@ mod tests { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let healthy: HashSet = ["http://vol1".into(), "http://vol2".into()].into(); - // Should return the first one assert_eq!(pick_healthy_volume(&volumes, &healthy), Some("http://vol1")); } #[test] - fn test_plan_put_sufficient_volumes() { + fn test_select_volumes_sufficient() { let volumes: Vec = (1..=3).map(|i| format!("http://vol{i}")).collect(); let ring = Ring::new(&volumes, 50); - let plan = plan_put(&ring, "test-key", 2).unwrap(); - assert_eq!(plan.target_volumes.len(), 2); - assert!(!plan.path.is_empty()); + let selected = select_volumes(&ring, "test-key", 2).unwrap(); + assert_eq!(selected.len(), 2); } #[test] - fn test_plan_put_insufficient_volumes() { + fn test_select_volumes_insufficient() { let volumes: Vec = vec!["http://vol1".into()]; let ring = Ring::new(&volumes, 50); - let result = plan_put(&ring, "test-key", 2); - assert!(result.is_err()); + assert!(select_volumes(&ring, "test-key", 2).is_err()); } #[test] @@ -285,8 +272,7 @@ mod tests { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let results = vec![Ok(()), Ok(())]; - let outcome = evaluate_fanout(results, &volumes); - assert_eq!(outcome.unwrap(), volumes); + assert_eq!(evaluate_fanout(results, &volumes).unwrap(), volumes); } #[test] @@ -294,10 +280,8 @@ mod tests { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let results = vec![Ok(()), Err("connection refused".into())]; - let outcome = evaluate_fanout(results, &volumes); - let errors = outcome.unwrap_err(); - assert_eq!(errors.len(), 1); - assert_eq!(errors[0], "connection refused"); + let errors = evaluate_fanout(results, &volumes).unwrap_err(); + assert_eq!(errors, vec!["connection refused"]); } #[test] @@ -305,7 +289,6 @@ mod tests { let volumes = vec!["http://vol1".into(), "http://vol2".into()]; let results = vec![Err("err1".into()), Err("err2".into())]; - let outcome = evaluate_fanout(results, &volumes); - assert_eq!(outcome.unwrap_err().len(), 2); + assert_eq!(evaluate_fanout(results, &volumes).unwrap_err().len(), 2); } } diff --git a/src/volume.rs b/src/volume.rs index e57d334..8a1aa1c 100644 --- a/src/volume.rs +++ b/src/volume.rs @@ -17,16 +17,9 @@ impl VolumeClient { Self { client } } - /// PUT a blob to a volume server. Also writes a .key sidecar file - /// so the key can be recovered during rebuild. - pub async fn put( - &self, - volume_url: &str, - path: &str, - key: &str, - data: Bytes, - ) -> Result<(), String> { - let url = format!("{volume_url}{path}"); + /// PUT a blob to a volume server at /{key}. + pub async fn put(&self, volume_url: &str, key: &str, data: Bytes) -> Result<(), String> { + let url = format!("{volume_url}/{key}"); let resp = self .client .put(&url) @@ -39,26 +32,12 @@ impl VolumeClient { return Err(format!("PUT {url}: status {}", resp.status())); } - // Write .key sidecar with the original key name - let key_url = format!("{volume_url}{path}.key"); - let resp = self - .client - .put(&key_url) - .body(key.to_string()) - .send() - .await - .map_err(|e| format!("PUT {key_url}: {e}"))?; - - if !resp.status().is_success() { - return Err(format!("PUT {key_url}: status {}", resp.status())); - } - Ok(()) } /// GET a blob from a volume server. - pub async fn get(&self, volume_url: &str, path: &str) -> Result { - let url = format!("{volume_url}{path}"); + pub async fn get(&self, volume_url: &str, key: &str) -> Result { + let url = format!("{volume_url}/{key}"); let resp = self .client .get(&url) @@ -78,9 +57,9 @@ impl VolumeClient { .map_err(|e| format!("GET {url} body: {e}")) } - /// DELETE a blob (and its .key sidecar) from a volume server. - pub async fn delete(&self, volume_url: &str, path: &str) -> Result<(), String> { - let url = format!("{volume_url}{path}"); + /// DELETE a blob from a volume server. + pub async fn delete(&self, volume_url: &str, key: &str) -> Result<(), String> { + let url = format!("{volume_url}/{key}"); let resp = self .client .delete(&url) @@ -93,10 +72,6 @@ impl VolumeClient { return Err(format!("DELETE {url}: status {}", resp.status())); } - // Best-effort delete the .key sidecar - let key_url = format!("{volume_url}{path}.key"); - let _ = self.client.delete(&key_url).send().await; - Ok(()) }