Purify
This commit is contained in:
parent
1d3b9dddf5
commit
fa4dc716db
2 changed files with 161 additions and 21 deletions
80
src/db.rs
80
src/db.rs
|
|
@ -30,6 +30,20 @@ fn encode_volumes(v: &[String]) -> String {
|
||||||
serde_json::to_string(v).unwrap()
|
serde_json::to_string(v).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pure: compute the exclusive upper bound for prefix range queries.
///
/// Drops trailing 0xFF bytes, then bumps the last remaining byte by one.
/// Examples: "abc" -> Some("abd"), "ab\xff" -> Some("ac"), "\xff\xff" -> None
pub fn prefix_upper_bound(prefix: &str) -> Option<String> {
    let mut tail = prefix.as_bytes().to_vec();
    loop {
        match tail.pop() {
            // Nothing left to bump: prefix was empty or consisted solely of 0xFF.
            None => return None,
            // 0xFF cannot be incremented without overflow; discard and keep scanning.
            Some(0xFF) => continue,
            Some(byte) => {
                tail.push(byte + 1);
                // NOTE(review): lossy conversion is exact for ASCII prefixes; bumping a
                // byte inside a multi-byte UTF-8 sequence could yield replacement chars.
                return Some(String::from_utf8_lossy(&tail).into_owned());
            }
        }
    }
}
|
||||||
|
|
||||||
/// A single SQLite connection behind a mutex, used for both reads and writes.
|
/// A single SQLite connection behind a mutex, used for both reads and writes.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Db {
|
pub struct Db {
|
||||||
|
|
@ -93,19 +107,7 @@ impl Db {
|
||||||
.collect::<Result<Vec<String>, _>>()?;
|
.collect::<Result<Vec<String>, _>>()?;
|
||||||
return Ok(keys);
|
return Ok(keys);
|
||||||
}
|
}
|
||||||
// Compute exclusive upper bound: increment last non-0xFF byte
|
let upper = prefix_upper_bound(&prefix);
|
||||||
let upper = {
|
|
||||||
let mut bytes = prefix.as_bytes().to_vec();
|
|
||||||
let mut result = None;
|
|
||||||
while let Some(last) = bytes.pop() {
|
|
||||||
if last < 0xFF {
|
|
||||||
bytes.push(last + 1);
|
|
||||||
result = Some(String::from_utf8_lossy(&bytes).into_owned());
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
result
|
|
||||||
};
|
|
||||||
let keys = match &upper {
|
let keys = match &upper {
|
||||||
Some(end) => {
|
Some(end) => {
|
||||||
let mut stmt = conn.prepare_cached(
|
let mut stmt = conn.prepare_cached(
|
||||||
|
|
@ -198,3 +200,55 @@ impl Db {
|
||||||
Ok(records)
|
Ok(records)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Unit tests for the pure `prefix_upper_bound` helper.
#[cfg(test)]
mod tests {
    use super::*;

    // Happy path: the last byte is below 0xFF and is simply incremented.
    #[test]
    fn test_prefix_upper_bound_simple() {
        assert_eq!(prefix_upper_bound("abc"), Some("abd".to_string()));
        assert_eq!(prefix_upper_bound("a"), Some("b".to_string()));
        assert_eq!(prefix_upper_bound("foo/bar"), Some("foo/bas".to_string()));
    }

    // Path-like prefixes: the incremented byte need not be alphanumeric.
    #[test]
    fn test_prefix_upper_bound_paths() {
        // '/' is 0x2F, +1 = '0' (0x30)
        assert_eq!(prefix_upper_bound("users/"), Some("users0".to_string()));
        // '_' is 0x5F, +1 = '`' (0x60)
        assert_eq!(prefix_upper_bound("img_"), Some("img`".to_string()));
    }

    // Empty prefix has no byte to increment, so there is no finite upper bound.
    #[test]
    fn test_prefix_upper_bound_empty() {
        assert_eq!(prefix_upper_bound(""), None);
    }

    #[test]
    fn test_prefix_upper_bound_single_char() {
        assert_eq!(prefix_upper_bound("z"), Some("{".to_string())); // 'z' + 1 = '{'
        assert_eq!(prefix_upper_bound("9"), Some(":".to_string())); // '9' + 1 = ':'
    }

    // End-to-end property: every string with the prefix sorts inside [prefix, upper).
    #[test]
    fn test_prefix_upper_bound_range_correctness() {
        // Verify the bound works for range queries:
        // All strings starting with "foo" should be >= "foo" and < "fop"
        let prefix = "foo";
        let upper = prefix_upper_bound(prefix).unwrap();
        assert_eq!(upper, "fop");

        let upper = upper.as_str();

        // These should be in range [foo, fop)
        assert!("foo" >= prefix && "foo" < upper);
        assert!("foo/bar" >= prefix && "foo/bar" < upper);
        assert!("foobar" >= prefix && "foobar" < upper);
        assert!("foo\x7f" >= prefix && "foo\x7f" < upper); // 0x7F, highest ASCII byte

        // These should be out of range
        assert!("fop" >= upper);
        assert!("fon" < prefix);
    }
}
|
||||||
|
|
|
||||||
102
src/rebuild.rs
102
src/rebuild.rs
|
|
@ -12,6 +12,25 @@ struct NginxEntry {
|
||||||
size: Option<i64>,
|
size: Option<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pure: merge volume scan results into a unified index.
///
/// Each scan is (volume_url, list of (key, size) pairs).
/// Returns a map of key -> (volumes containing it, max size seen).
pub fn merge_volume_scans(
    scans: &[(String, Vec<(String, i64)>)],
) -> HashMap<String, (Vec<String>, i64)> {
    let mut merged: HashMap<String, (Vec<String>, i64)> = HashMap::new();
    for (volume, listing) in scans.iter() {
        for (key, size) in listing.iter() {
            match merged.get_mut(key) {
                // Key already seen on some volume: record this volume too and
                // keep the largest size observed so far.
                Some((volumes, max_size)) => {
                    volumes.push(volume.clone());
                    *max_size = (*max_size).max(*size);
                }
                // First sighting of this key.
                None => {
                    merged.insert(key.clone(), (vec![volume.clone()], *size));
                }
            }
        }
    }
    merged
}
|
||||||
|
|
||||||
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
|
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
|
||||||
let http = reqwest::Client::new();
|
let http = reqwest::Client::new();
|
||||||
let mut keys = Vec::new();
|
let mut keys = Vec::new();
|
||||||
|
|
@ -57,25 +76,23 @@ pub async fn run(args: &Args) {
|
||||||
let _ = std::fs::remove_file(format!("{db_path}-shm"));
|
let _ = std::fs::remove_file(format!("{db_path}-shm"));
|
||||||
|
|
||||||
let db = db::Db::new(db_path);
|
let db = db::Db::new(db_path);
|
||||||
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
|
|
||||||
|
|
||||||
|
// I/O: scan each volume
|
||||||
|
let mut scans = Vec::new();
|
||||||
for vol_url in &args.volumes {
|
for vol_url in &args.volumes {
|
||||||
eprintln!("Scanning {vol_url}...");
|
eprintln!("Scanning {vol_url}...");
|
||||||
match list_volume_keys(vol_url).await {
|
match list_volume_keys(vol_url).await {
|
||||||
Ok(keys) => {
|
Ok(keys) => {
|
||||||
eprintln!(" Found {} keys", keys.len());
|
eprintln!(" Found {} keys", keys.len());
|
||||||
for (key, size) in keys {
|
scans.push((vol_url.clone(), keys));
|
||||||
let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
|
|
||||||
entry.0.push(vol_url.clone());
|
|
||||||
if size > entry.1 {
|
|
||||||
entry.1 = size;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
|
Err(e) => eprintln!(" Error scanning {vol_url}: {e}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pure: merge scan results
|
||||||
|
let index = merge_volume_scans(&scans);
|
||||||
|
|
||||||
let records: Vec<_> = index
|
let records: Vec<_> = index
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(k, (v, s))| (k, v, Some(s)))
|
.map(|(k, (v, s))| (k, v, Some(s)))
|
||||||
|
|
@ -84,3 +101,72 @@ pub async fn run(args: &Args) {
|
||||||
db.bulk_put(records).await.expect("bulk_put failed");
|
db.bulk_put(records).await.expect("bulk_put failed");
|
||||||
eprintln!("Rebuilt index with {count} keys");
|
eprintln!("Rebuilt index with {count} keys");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Unit tests for the pure `merge_volume_scans` helper.
#[cfg(test)]
mod tests {
    use super::*;

    // No scans at all: the merged index must be empty.
    #[test]
    fn test_merge_empty_scans() {
        let scans: Vec<(String, Vec<(String, i64)>)> = vec![];
        let index = merge_volume_scans(&scans);
        assert!(index.is_empty());
    }

    // One volume: each key maps to exactly that volume with its reported size.
    #[test]
    fn test_merge_single_volume() {
        let scans = vec![(
            "http://vol1".to_string(),
            vec![
                ("key1".to_string(), 100),
                ("key2".to_string(), 200),
            ],
        )];
        let index = merge_volume_scans(&scans);
        assert_eq!(index.len(), 2);
        assert_eq!(index.get("key1"), Some(&(vec!["http://vol1".to_string()], 100)));
        assert_eq!(index.get("key2"), Some(&(vec!["http://vol1".to_string()], 200)));
    }

    // Replicated key: every hosting volume is collected under the single key.
    #[test]
    fn test_merge_key_on_multiple_volumes() {
        let scans = vec![
            ("http://vol1".to_string(), vec![("shared".to_string(), 100)]),
            ("http://vol2".to_string(), vec![("shared".to_string(), 100)]),
            ("http://vol3".to_string(), vec![("shared".to_string(), 100)]),
        ];
        let index = merge_volume_scans(&scans);
        assert_eq!(index.len(), 1);
        let (volumes, size) = index.get("shared").unwrap();
        assert_eq!(volumes.len(), 3);
        assert!(volumes.contains(&"http://vol1".to_string()));
        assert!(volumes.contains(&"http://vol2".to_string()));
        assert!(volumes.contains(&"http://vol3".to_string()));
        assert_eq!(*size, 100);
    }

    // Conflicting sizes resolve to the maximum across volumes.
    #[test]
    fn test_merge_takes_max_size() {
        // Same key with different sizes on different volumes (corruption or update race)
        let scans = vec![
            ("http://vol1".to_string(), vec![("key".to_string(), 50)]),
            ("http://vol2".to_string(), vec![("key".to_string(), 200)]),
            ("http://vol3".to_string(), vec![("key".to_string(), 100)]),
        ];
        let index = merge_volume_scans(&scans);
        let (_, size) = index.get("key").unwrap();
        assert_eq!(*size, 200, "should take maximum size across volumes");
    }

    // Non-overlapping keys stay independent, each tied to its own volume.
    #[test]
    fn test_merge_disjoint_keys() {
        let scans = vec![
            ("http://vol1".to_string(), vec![("a".to_string(), 10)]),
            ("http://vol2".to_string(), vec![("b".to_string(), 20)]),
        ];
        let index = merge_volume_scans(&scans);
        assert_eq!(index.len(), 2);
        assert_eq!(index.get("a").unwrap().0, vec!["http://vol1".to_string()]);
        assert_eq!(index.get("b").unwrap().0, vec!["http://vol2".to_string()]);
    }
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue