Fixes
This commit is contained in:
parent
71abb1ed7d
commit
689b85e6f2
3 changed files with 53 additions and 6 deletions
|
|
@ -27,7 +27,7 @@ curl http://localhost:3000/?prefix=path/to/
|
||||||
### Operations
|
### Operations
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Rebuild index by scanning all volumes (disaster recovery)
|
# Rebuild index by scanning all volumes (stop the server first)
|
||||||
mkv -d /tmp/index.db -v http://vol1:8080,http://vol2:8080,http://vol3:8080 -r 2 rebuild
|
mkv -d /tmp/index.db -v http://vol1:8080,http://vol2:8080,http://vol3:8080 -r 2 rebuild
|
||||||
|
|
||||||
# Rebalance after adding/removing volumes (preview with --dry-run)
|
# Rebalance after adding/removing volumes (preview with --dry-run)
|
||||||
|
|
|
||||||
41
src/db.rs
41
src/db.rs
|
|
@ -76,13 +76,45 @@ impl Db {
|
||||||
|
|
||||||
pub async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, AppError> {
|
pub async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, AppError> {
|
||||||
let conn = self.conn.clone();
|
let conn = self.conn.clone();
|
||||||
let pattern = format!("{prefix}%");
|
let prefix = prefix.to_string();
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
let conn = conn.lock().unwrap();
|
let conn = conn.lock().unwrap();
|
||||||
let mut stmt = conn.prepare_cached("SELECT key FROM kv WHERE key LIKE ?1 ORDER BY key")?;
|
if prefix.is_empty() {
|
||||||
|
let mut stmt = conn.prepare_cached("SELECT key FROM kv ORDER BY key")?;
|
||||||
let keys = stmt
|
let keys = stmt
|
||||||
.query_map(params![pattern], |row| row.get(0))?
|
.query_map([], |row| row.get(0))?
|
||||||
.collect::<Result<Vec<String>, _>>()?;
|
.collect::<Result<Vec<String>, _>>()?;
|
||||||
|
return Ok(keys);
|
||||||
|
}
|
||||||
|
// Compute exclusive upper bound: increment last non-0xFF byte
|
||||||
|
let upper = {
|
||||||
|
let mut bytes = prefix.as_bytes().to_vec();
|
||||||
|
let mut result = None;
|
||||||
|
while let Some(last) = bytes.pop() {
|
||||||
|
if last < 0xFF {
|
||||||
|
bytes.push(last + 1);
|
||||||
|
result = Some(String::from_utf8_lossy(&bytes).into_owned());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result
|
||||||
|
};
|
||||||
|
let keys = match &upper {
|
||||||
|
Some(end) => {
|
||||||
|
let mut stmt = conn.prepare_cached(
|
||||||
|
"SELECT key FROM kv WHERE key >= ?1 AND key < ?2 ORDER BY key",
|
||||||
|
)?;
|
||||||
|
stmt.query_map(params![prefix, end], |row| row.get(0))?
|
||||||
|
.collect::<Result<Vec<String>, _>>()?
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
let mut stmt = conn.prepare_cached(
|
||||||
|
"SELECT key FROM kv WHERE key >= ?1 ORDER BY key",
|
||||||
|
)?;
|
||||||
|
stmt.query_map(params![prefix], |row| row.get(0))?
|
||||||
|
.collect::<Result<Vec<String>, _>>()?
|
||||||
|
}
|
||||||
|
};
|
||||||
Ok(keys)
|
Ok(keys)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|
@ -120,6 +152,7 @@ impl Db {
|
||||||
let conn = self.conn.clone();
|
let conn = self.conn.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
let conn = conn.lock().unwrap();
|
let conn = conn.lock().unwrap();
|
||||||
|
conn.execute_batch("BEGIN")?;
|
||||||
let mut stmt = conn.prepare_cached(
|
let mut stmt = conn.prepare_cached(
|
||||||
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
|
"INSERT INTO kv (key, volumes, size) VALUES (?1, ?2, ?3)
|
||||||
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
|
ON CONFLICT(key) DO UPDATE SET volumes = ?2, size = ?3",
|
||||||
|
|
@ -127,6 +160,8 @@ impl Db {
|
||||||
for (key, volumes, size) in &records {
|
for (key, volumes, size) in &records {
|
||||||
stmt.execute(params![key, encode_volumes(volumes), size])?;
|
stmt.execute(params![key, encode_volumes(volumes), size])?;
|
||||||
}
|
}
|
||||||
|
drop(stmt);
|
||||||
|
conn.execute_batch("COMMIT")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
12
src/lib.rs
12
src/lib.rs
|
|
@ -7,6 +7,8 @@ pub mod rebuild;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
const DEFAULT_BODY_LIMIT: usize = 256 * 1024 * 1024; // 256 MB
|
||||||
|
|
||||||
pub struct Args {
|
pub struct Args {
|
||||||
pub db_path: String,
|
pub db_path: String,
|
||||||
pub volumes: Vec<String>,
|
pub volumes: Vec<String>,
|
||||||
|
|
@ -14,6 +16,15 @@ pub struct Args {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn build_app(args: &Args) -> axum::Router {
|
pub fn build_app(args: &Args) -> axum::Router {
|
||||||
|
if args.replicas > args.volumes.len() {
|
||||||
|
eprintln!(
|
||||||
|
"Error: replication factor ({}) exceeds number of volumes ({})",
|
||||||
|
args.replicas,
|
||||||
|
args.volumes.len()
|
||||||
|
);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(parent) = std::path::Path::new(&args.db_path).parent() {
|
if let Some(parent) = std::path::Path::new(&args.db_path).parent() {
|
||||||
std::fs::create_dir_all(parent).unwrap_or_else(|e| {
|
std::fs::create_dir_all(parent).unwrap_or_else(|e| {
|
||||||
eprintln!("Failed to create database directory: {e}");
|
eprintln!("Failed to create database directory: {e}");
|
||||||
|
|
@ -37,5 +48,6 @@ pub fn build_app(args: &Args) -> axum::Router {
|
||||||
.delete(server::delete_key)
|
.delete(server::delete_key)
|
||||||
.head(server::head_key),
|
.head(server::head_key),
|
||||||
)
|
)
|
||||||
|
.layer(axum::extract::DefaultBodyLimit::max(DEFAULT_BODY_LIMIT))
|
||||||
.with_state(state)
|
.with_state(state)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue