diff --git a/Cargo.toml b/Cargo.toml
index 5a8e58b..bc2fcbc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,7 +7,7 @@ edition = "2024"
 axum = "0.8"
 tokio = { version = "1", features = ["full"] }
 rusqlite = { version = "0.35", features = ["bundled"] }
-reqwest = { version = "0.12", features = ["stream"] }
+reqwest = { version = "0.12", features = ["stream", "json"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 toml = "0.8"
diff --git a/src/lib.rs b/src/lib.rs
index ecdc948..af9f2f2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,7 @@ pub mod error;
 pub mod hasher;
 pub mod health;
 pub mod server;
+pub mod rebuild;
 pub mod volume;
 
 use std::collections::HashSet;
diff --git a/src/main.rs b/src/main.rs
index 37a045f..2f49fc6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -45,8 +45,7 @@ async fn main() {
             axum::serve(listener, app).await.unwrap();
         }
         Commands::Rebuild => {
-            eprintln!("rebuild not yet implemented");
-            std::process::exit(1);
+            mkv::rebuild::run(&config).await;
         }
         Commands::Rebalance { dry_run: _ } => {
             eprintln!("rebalance not yet implemented");
diff --git a/src/rebuild.rs b/src/rebuild.rs
new file mode 100644
index 0000000..f7e5aa3
--- /dev/null
+++ b/src/rebuild.rs
@@ -0,0 +1,107 @@
+use std::collections::HashMap;
+
+use crate::config::Config;
+use crate::db;
+
+/// One entry from nginx's `autoindex_format json` listing.
+#[derive(serde::Deserialize)]
+struct NginxEntry {
+    name: String,
+    // "type" is a reserved word in Rust, so map the JSON field explicitly.
+    #[serde(rename = "type")]
+    entry_type: String,
+    // Directories carry no "size" field; default to None rather than erroring.
+    #[serde(default)]
+    size: Option<i64>,
+}
+
+/// List all keys on a volume by recursively walking nginx autoindex.
+///
+/// Returns `(relative_path, size_bytes)` pairs, or a human-readable error
+/// string describing the first request/parse failure encountered.
+async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
+    let http = reqwest::Client::new();
+    let mut keys = Vec::new();
+    let mut dirs = vec![String::new()]; // start at root
+
+    // Iterative DFS over directories; each prefix already ends in '/'.
+    while let Some(prefix) = dirs.pop() {
+        let url = format!("{volume_url}/{prefix}");
+        let resp = http
+            .get(&url)
+            .send()
+            .await
+            .map_err(|e| format!("GET {url}: {e}"))?;
+
+        if !resp.status().is_success() {
+            return Err(format!("GET {url}: status {}", resp.status()));
+        }
+
+        let entries: Vec<NginxEntry> = resp
+            .json()
+            .await
+            .map_err(|e| format!("parse {url}: {e}"))?;
+
+        for entry in entries {
+            let full_path = if prefix.is_empty() {
+                entry.name.clone()
+            } else {
+                format!("{prefix}{}", entry.name)
+            };
+
+            match entry.entry_type.as_str() {
+                "directory" => dirs.push(format!("{full_path}/")),
+                "file" => keys.push((full_path, entry.size.unwrap_or(0))),
+                _ => {}
+            }
+        }
+    }
+
+    Ok(keys)
+}
+
+/// Rebuild the SQLite index from scratch by scanning every volume.
+/// Destroys the existing database (including WAL/SHM sidecars) first.
+pub async fn run(config: &Config) {
+    let db_path = &config.database.path;
+
+    // Ensure parent directory exists
+    if let Some(parent) = std::path::Path::new(db_path).parent() {
+        let _ = std::fs::create_dir_all(parent);
+    }
+
+    // Delete old database
+    let _ = std::fs::remove_file(db_path);
+    let _ = std::fs::remove_file(format!("{db_path}-wal"));
+    let _ = std::fs::remove_file(format!("{db_path}-shm"));
+
+    let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
+    ready_rx.await.expect("writer failed to initialize");
+
+    let volume_urls = config.volume_urls();
+
+    // key -> (volumes, size)
+    let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
+
+    // A scan failure on one volume is logged but does not abort the rebuild;
+    // keys only on that volume will simply be missing from the new index.
+    for vol_url in &volume_urls {
+        eprintln!("Scanning {vol_url}...");
+        match list_volume_keys(vol_url).await {
+            Ok(keys) => {
+                eprintln!(" Found {} keys", keys.len());
+                for (key, size) in keys {
+                    let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
+                    entry.0.push(vol_url.clone());
+                    // Use the largest size seen (they should all match)
+                    if size > entry.1 {
+                        entry.1 = size;
+                    }
+                }
+            }
+            Err(e) => {
+                eprintln!(" Error scanning {vol_url}: {e}");
+            }
+        }
+    }
+
+    // Batch insert into SQLite
+    let records: Vec<(String, Vec<String>, Option<i64>)> = index
+        .into_iter()
+        .map(|(key, (volumes, size))| (key, volumes, Some(size)))
+        .collect();
+
+    let count = records.len();
+    writer.bulk_put(records).await.expect("bulk_put failed");
+
+    eprintln!("Rebuilt index with {count} keys");
+}