use std::collections::HashMap; use crate::Args; use crate::db; #[derive(serde::Deserialize)] struct NginxEntry { name: String, #[serde(rename = "type")] entry_type: String, #[serde(default)] size: Option, } /// If a key has different sizes across volumes, takes the max. pub fn merge_volume_scans( scans: &[(String, Vec<(String, i64)>)], ) -> HashMap, i64)> { let mut index: HashMap, i64)> = HashMap::new(); for (vol_url, keys) in scans { for (key, size) in keys { let entry = index .entry(key.clone()) .or_insert_with(|| (Vec::new(), *size)); entry.0.push(vol_url.clone()); if *size > entry.1 { entry.1 = *size; } } } index } async fn list_volume_keys(volume_url: &str) -> Result, String> { let http = reqwest::Client::new(); let mut keys = Vec::new(); let mut dirs = vec![String::new()]; while let Some(prefix) = dirs.pop() { let url = format!("{volume_url}/{prefix}"); let resp = http .get(&url) .send() .await .map_err(|e| format!("GET {url}: {e}"))?; if !resp.status().is_success() { return Err(format!("GET {url}: status {}", resp.status())); } let entries: Vec = resp.json().await.map_err(|e| format!("parse {url}: {e}"))?; for entry in entries { let full_path = if prefix.is_empty() { entry.name.clone() } else { format!("{prefix}{}", entry.name) }; match entry.entry_type.as_str() { "directory" => dirs.push(format!("{full_path}/")), "file" => keys.push((full_path, entry.size.unwrap_or(0))), _ => {} } } } Ok(keys) } pub async fn run(args: &Args) { let db_path = &args.db_path; if let Some(parent) = std::path::Path::new(db_path).parent() { let _ = std::fs::create_dir_all(parent); } let _ = std::fs::remove_file(db_path); let _ = std::fs::remove_file(format!("{db_path}-wal")); let _ = std::fs::remove_file(format!("{db_path}-shm")); let db = db::Db::new(db_path); let mut scans = Vec::new(); for vol_url in &args.volumes { eprintln!("Scanning {vol_url}..."); match list_volume_keys(vol_url).await { Ok(keys) => { eprintln!(" Found {} keys", keys.len()); scans.push((vol_url.clone(), keys)); } Err(e) => eprintln!(" Error scanning {vol_url}: {e}"), } } let index = merge_volume_scans(&scans); let records: Vec<_> = index .into_iter() .map(|(k, (v, s))| (k, v, Some(s))) .collect(); let count = records.len(); db.bulk_put(records).await.expect("bulk_put failed"); eprintln!("Rebuilt index with {count} keys"); } #[cfg(test)] mod tests { use super::*; #[test] fn test_merge_takes_max_size() { // Can happen due to incomplete writes or corruption let scans = vec![ ("http://vol1".to_string(), vec![("key".to_string(), 50)]), ("http://vol2".to_string(), vec![("key".to_string(), 200)]), ("http://vol3".to_string(), vec![("key".to_string(), 100)]), ]; let index = merge_volume_scans(&scans); let (volumes, size) = index.get("key").unwrap(); assert_eq!(volumes.len(), 3); assert_eq!(*size, 200, "should take maximum size across volumes"); } }