mkv/src/rebuild.rs
2026-03-08 17:55:37 +01:00

120 lines
3.6 KiB
Rust

use std::collections::HashMap;
use crate::Args;
use crate::db;
/// One element of the JSON array returned when a volume directory is listed.
///
/// NOTE(review): the name suggests this mirrors nginx's JSON autoindex output;
/// confirm against the volume server configuration.
#[derive(serde::Deserialize)]
struct NginxEntry {
    /// Entry name; the lister appends it to the current directory prefix.
    name: String,
    /// Entry kind, read from the JSON field `type`. The lister acts on
    /// `"directory"` and `"file"` and ignores any other value.
    #[serde(rename = "type")]
    entry_type: String,
    /// Reported size, if any. Absent in the JSON means `None`; the lister
    /// treats a missing size on a file as 0.
    #[serde(default)]
    size: Option<i64>,
}
/// Combines per-volume key listings into one index keyed by object key.
///
/// Each element of `scans` pairs a volume URL with the `(key, size)` entries
/// found on that volume. The returned map associates every key with the list
/// of volume URLs that reported it (in the order the scans were given) and a
/// single size.
///
/// If a key has different sizes across volumes, takes the max.
pub fn merge_volume_scans(
    scans: &[(String, Vec<(String, i64)>)],
) -> HashMap<String, (Vec<String>, i64)> {
    let mut merged: HashMap<String, (Vec<String>, i64)> = HashMap::new();

    // Flatten the nested listing into (volume, key, size) triples, preserving
    // the original scan order so volume lists come out in the same order.
    let sightings = scans
        .iter()
        .flat_map(|(volume, keys)| keys.iter().map(move |(key, size)| (volume, key, *size)));

    for (volume, key, size) in sightings {
        let (volumes, largest) = merged
            .entry(key.clone())
            .or_insert_with(|| (Vec::new(), size));
        volumes.push(volume.clone());
        *largest = (*largest).max(size);
    }

    merged
}
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
let http = reqwest::Client::new();
let mut keys = Vec::new();
let mut dirs = vec![String::new()];
while let Some(prefix) = dirs.pop() {
let url = format!("{volume_url}/{prefix}");
let resp = http
.get(&url)
.send()
.await
.map_err(|e| format!("GET {url}: {e}"))?;
if !resp.status().is_success() {
return Err(format!("GET {url}: status {}", resp.status()));
}
let entries: Vec<NginxEntry> =
resp.json().await.map_err(|e| format!("parse {url}: {e}"))?;
for entry in entries {
let full_path = if prefix.is_empty() {
entry.name.clone()
} else {
format!("{prefix}{}", entry.name)
};
match entry.entry_type.as_str() {
"directory" => dirs.push(format!("{full_path}/")),
"file" => keys.push((full_path, entry.size.unwrap_or(0))),
_ => {}
}
}
}
Ok(keys)
}
/// Rebuilds the index database from scratch by scanning every volume.
///
/// Steps, in order:
/// 1. Ensure the parent directory of `args.db_path` exists and remove any
///    existing database file along with its `-wal` and `-shm` companions
///    (all best-effort; failures are ignored).
/// 2. Open a fresh database at `args.db_path`.
/// 3. List the keys of each volume in `args.volumes`. A volume whose scan
///    fails is reported on stderr and left out of the rebuilt index.
/// 4. Merge the scans and write all records in one bulk insert.
///
/// # Panics
///
/// Panics if the bulk insert fails.
pub async fn run(args: &Args) {
    let db_path = &args.db_path;

    if let Some(parent) = std::path::Path::new(db_path).parent() {
        let _ = std::fs::create_dir_all(parent);
    }

    // Start from a clean slate: drop the old database and its sidecar files.
    let _ = std::fs::remove_file(db_path);
    for suffix in ["-wal", "-shm"] {
        let _ = std::fs::remove_file(format!("{db_path}{suffix}"));
    }

    let db = db::Db::new(db_path);

    let mut scans = Vec::new();
    for vol_url in &args.volumes {
        eprintln!("Scanning {vol_url}...");
        let keys = match list_volume_keys(vol_url).await {
            Ok(keys) => keys,
            Err(e) => {
                eprintln!(" Error scanning {vol_url}: {e}");
                continue;
            }
        };
        eprintln!(" Found {} keys", keys.len());
        scans.push((vol_url.clone(), keys));
    }

    let records: Vec<_> = merge_volume_scans(&scans)
        .into_iter()
        .map(|(key, (volumes, size))| (key, volumes, Some(size)))
        .collect();
    let count = records.len();

    db.bulk_put(records).await.expect("bulk_put failed");
    eprintln!("Rebuilt index with {count} keys");
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A key reported by several volumes with differing sizes must be indexed
    /// once, list every volume, and carry the largest reported size.
    #[test]
    fn test_merge_takes_max_size() {
        // Can happen due to incomplete writes or corruption
        let reported: [(&str, i64); 3] = [
            ("http://vol1", 50),
            ("http://vol2", 200),
            ("http://vol3", 100),
        ];
        let scans: Vec<(String, Vec<(String, i64)>)> = reported
            .iter()
            .map(|&(volume, size)| (volume.to_string(), vec![("key".to_string(), size)]))
            .collect();

        let index = merge_volume_scans(&scans);

        let (volumes, size) = &index["key"];
        assert_eq!(volumes.len(), 3);
        assert_eq!(*size, 200, "should take maximum size across volumes");
    }
}