Add rebuild tool: scan nginx volumes to reconstruct SQLite index
Walks nginx autoindex JSON recursively, merges replicas across volumes, and bulk-inserts into a fresh database. Also adds reqwest json feature. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
17d5647e16
commit
4f10489b13
4 changed files with 110 additions and 3 deletions
|
|
@ -7,7 +7,7 @@ edition = "2024"
|
||||||
axum = "0.8"
|
axum = "0.8"
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
rusqlite = { version = "0.35", features = ["bundled"] }
|
rusqlite = { version = "0.35", features = ["bundled"] }
|
||||||
reqwest = { version = "0.12", features = ["stream"] }
|
reqwest = { version = "0.12", features = ["stream", "json"] }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
toml = "0.8"
|
toml = "0.8"
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ pub mod error;
|
||||||
pub mod hasher;
|
pub mod hasher;
|
||||||
pub mod health;
|
pub mod health;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
pub mod rebuild;
|
||||||
pub mod volume;
|
pub mod volume;
|
||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
|
||||||
|
|
@ -45,8 +45,7 @@ async fn main() {
|
||||||
axum::serve(listener, app).await.unwrap();
|
axum::serve(listener, app).await.unwrap();
|
||||||
}
|
}
|
||||||
Commands::Rebuild => {
|
Commands::Rebuild => {
|
||||||
eprintln!("rebuild not yet implemented");
|
mkv::rebuild::run(&config).await;
|
||||||
std::process::exit(1);
|
|
||||||
}
|
}
|
||||||
Commands::Rebalance { dry_run: _ } => {
|
Commands::Rebalance { dry_run: _ } => {
|
||||||
eprintln!("rebalance not yet implemented");
|
eprintln!("rebalance not yet implemented");
|
||||||
|
|
|
||||||
107
src/rebuild.rs
Normal file
107
src/rebuild.rs
Normal file
|
|
@ -0,0 +1,107 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use crate::config::Config;
|
||||||
|
use crate::db;
|
||||||
|
|
||||||
|
/// One entry of an nginx `autoindex` JSON directory listing.
///
/// Only the fields the rebuild scan needs are deserialized; any other
/// fields nginx emits are ignored by serde.
#[derive(serde::Deserialize)]
struct NginxEntry {
    // Entry name, relative to the directory being listed.
    name: String,
    // Entry kind as reported by nginx ("file" or "directory" are the
    // values consumed downstream); renamed because `type` is a Rust keyword.
    #[serde(rename = "type")]
    entry_type: String,
    // Size in bytes; defaults to None when the field is absent from the
    // JSON (presumably for directories — confirm against nginx output).
    #[serde(default)]
    size: Option<i64>,
}
|
||||||
|
|
||||||
|
/// List all keys on a volume by recursively walking nginx autoindex.
|
||||||
|
async fn list_volume_keys(volume_url: &str) -> Result<Vec<(String, i64)>, String> {
|
||||||
|
let http = reqwest::Client::new();
|
||||||
|
let mut keys = Vec::new();
|
||||||
|
let mut dirs = vec![String::new()]; // start at root
|
||||||
|
|
||||||
|
while let Some(prefix) = dirs.pop() {
|
||||||
|
let url = format!("{volume_url}/{prefix}");
|
||||||
|
let resp = http
|
||||||
|
.get(&url)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("GET {url}: {e}"))?;
|
||||||
|
|
||||||
|
if !resp.status().is_success() {
|
||||||
|
return Err(format!("GET {url}: status {}", resp.status()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let entries: Vec<NginxEntry> = resp
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("parse {url}: {e}"))?;
|
||||||
|
|
||||||
|
for entry in entries {
|
||||||
|
let full_path = if prefix.is_empty() {
|
||||||
|
entry.name.clone()
|
||||||
|
} else {
|
||||||
|
format!("{prefix}{}", entry.name)
|
||||||
|
};
|
||||||
|
|
||||||
|
match entry.entry_type.as_str() {
|
||||||
|
"directory" => dirs.push(format!("{full_path}/")),
|
||||||
|
"file" => keys.push((full_path, entry.size.unwrap_or(0))),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(keys)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(config: &Config) {
|
||||||
|
let db_path = &config.database.path;
|
||||||
|
|
||||||
|
// Ensure parent directory exists
|
||||||
|
if let Some(parent) = std::path::Path::new(db_path).parent() {
|
||||||
|
let _ = std::fs::create_dir_all(parent);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete old database
|
||||||
|
let _ = std::fs::remove_file(db_path);
|
||||||
|
let _ = std::fs::remove_file(format!("{db_path}-wal"));
|
||||||
|
let _ = std::fs::remove_file(format!("{db_path}-shm"));
|
||||||
|
|
||||||
|
let (writer, ready_rx) = db::spawn_writer(db_path.to_string());
|
||||||
|
ready_rx.await.expect("writer failed to initialize");
|
||||||
|
|
||||||
|
let volume_urls = config.volume_urls();
|
||||||
|
|
||||||
|
// key -> (volumes, size)
|
||||||
|
let mut index: HashMap<String, (Vec<String>, i64)> = HashMap::new();
|
||||||
|
|
||||||
|
for vol_url in &volume_urls {
|
||||||
|
eprintln!("Scanning {vol_url}...");
|
||||||
|
match list_volume_keys(vol_url).await {
|
||||||
|
Ok(keys) => {
|
||||||
|
eprintln!(" Found {} keys", keys.len());
|
||||||
|
for (key, size) in keys {
|
||||||
|
let entry = index.entry(key).or_insert_with(|| (Vec::new(), size));
|
||||||
|
entry.0.push(vol_url.clone());
|
||||||
|
// Use the largest size seen (they should all match)
|
||||||
|
if size > entry.1 {
|
||||||
|
entry.1 = size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!(" Error scanning {vol_url}: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Batch insert into SQLite
|
||||||
|
let records: Vec<(String, Vec<String>, Option<i64>)> = index
|
||||||
|
.into_iter()
|
||||||
|
.map(|(key, (volumes, size))| (key, volumes, Some(size)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let count = records.len();
|
||||||
|
writer.bulk_put(records).await.expect("bulk_put failed");
|
||||||
|
|
||||||
|
eprintln!("Rebuilt index with {count} keys");
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue