Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 115 additions & 5 deletions crates/core/src/closet_llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,18 @@ pub fn regenerate_closets(
}

fn regenerate_entry(content: &str, endpoint: &str) -> Result<String, RegenerateError> {
regenerate_entry_with_delay(content, endpoint, INITIAL_DELAY_MS)
}

fn regenerate_entry_with_delay(
content: &str,
endpoint: &str,
initial_delay_ms: u64,
) -> Result<String, RegenerateError> {
let prompt = REGENERATE_PROMPT.replace("{context}", content);

let client = reqwest::blocking::Client::new();
let mut delay_ms = INITIAL_DELAY_MS;
let mut delay_ms = initial_delay_ms;

for attempt in 0..MAX_RETRIES {
let response = client
Expand All @@ -178,11 +186,38 @@ fn regenerate_entry(content: &str, endpoint: &str) -> Result<String, RegenerateE
Ok(resp) => {
let status = resp.status();
if status.is_success() {
let parsed: LlmResponse = resp.json().map_err(RegenerateError::Http)?;
if parsed.response.trim().is_empty() {
return Err(RegenerateError::Empty);
// Read the body as text first so we can retry malformed
// JSON (truncated streams / partial chunks under load on
// local LLM runtimes). Mirrors upstream mempalace 2a0ed0c
// (#1155) which moved JSONDecodeError onto the same
// exponential-backoff path as 429/503.
let body = match resp.text() {
Ok(b) => b,
Err(e) => {
if attempt < MAX_RETRIES - 1 {
std::thread::sleep(std::time::Duration::from_millis(delay_ms));
delay_ms *= 2;
continue;
}
return Err(RegenerateError::Http(e));
}
};
match serde_json::from_str::<LlmResponse>(&body) {
Ok(parsed) => {
if parsed.response.trim().is_empty() {
return Err(RegenerateError::Empty);
}
return Ok(parsed.response.trim().to_string());
}
Err(e) => {
if attempt < MAX_RETRIES - 1 {
std::thread::sleep(std::time::Duration::from_millis(delay_ms));
delay_ms *= 2;
continue;
}
return Err(RegenerateError::InvalidJson(e));
}
}
return Ok(parsed.response.trim().to_string());
}

if status.as_u16() == 429 || status.as_u16() == 503 && attempt < MAX_RETRIES - 1 {
Expand Down Expand Up @@ -231,4 +266,79 @@ mod tests {
let debug = format!("{:?}", stats);
assert!(debug.contains("2"));
}

/// Minimal mock HTTP/1.1 server used by the retry tests below.
///
/// `responses` is consumed in order: each connection gets the next body.
/// The first byte of every body is treated as a 200-OK payload (since
/// upstream's bug only fires on the success branch).
fn spawn_mock_server(responses: Vec<String>) -> (String, std::thread::JoinHandle<()>) {
use std::io::{Read, Write};
use std::net::TcpListener;

let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
let addr = listener.local_addr().unwrap();
let url = format!("http://{}/", addr);

let handle = std::thread::spawn(move || {
for body in responses {
let (mut stream, _) = match listener.accept() {
Ok(pair) => pair,
Err(_) => return,
};
// Read the request (and discard it).
let mut buf = [0u8; 4096];
let _ = stream.read(&mut buf);
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\
Content-Length: {}\r\nConnection: close\r\n\r\n{}",
body.len(),
body
);
let _ = stream.write_all(response.as_bytes());
let _ = stream.flush();
}
});

(url, handle)
}

#[test]
fn test_regenerate_entry_retries_on_invalid_json() {
// Regression for upstream mempalace 2a0ed0c (#1155): the first
// attempt returning malformed JSON must be retried, not bailed.
// Two malformed bodies, then a valid one — must succeed on attempt 3.
let (url, handle) = spawn_mock_server(vec![
"not-json{".to_string(),
"{\"response\":".to_string(),
"{\"response\": \"OK\"}".to_string(),
]);

// Use a small initial delay so the test does not wait the
// production 1s+2s+... backoff schedule.
let result = regenerate_entry_with_delay("hello", &url, 1);
assert!(
result.is_ok(),
"expected retry to succeed, got {:?}",
result
);
assert_eq!(result.unwrap(), "OK");

let _ = handle.join();
}

#[test]
fn test_regenerate_entry_returns_invalid_json_after_all_retries_fail() {
// If every attempt returns malformed JSON, the final error must be
// surfaced as InvalidJson (not Empty / not Http).
let (url, handle) = spawn_mock_server(vec!["not-json".to_string(); MAX_RETRIES as usize]);

let result = regenerate_entry_with_delay("hello", &url, 1);
match result {
Err(RegenerateError::InvalidJson(_)) => {}
other => panic!("expected InvalidJson, got {:?}", other),
}

let _ = handle.join();
}
}
106 changes: 103 additions & 3 deletions crates/core/src/diary_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,27 @@ pub fn ingest_diaries(
})
.unwrap_or("unknown");

// Skip if content hasn't changed
// Skip if content hasn't changed. Hash-based — size alone false-negatives
// on same-length edits (e.g. "teh" → "the"), silently dropping real edits.
let state_key = format!(
"{}|{}",
wing,
diary_path.file_name().unwrap_or_default().to_string_lossy()
);
let curr_size = text.len();
if state.get(&state_key).map(|s| s.size) == Some(curr_size) && !force {
continue;
let curr_hash = format!("{:x}", Sha256::digest(text.as_bytes()));
if !force {
if let Some(prev) = state.get(&state_key) {
if let Some(prev_hash) = prev.content_hash.as_ref() {
if prev_hash == &curr_hash {
continue;
}
} else if prev.size > 0 && prev.size == curr_size {
// Legacy state without content_hash: keep size-based skip so a
// post-upgrade run doesn't re-ingest every untouched diary.
continue;
}
}
}

let now_iso = chrono_now_iso();
Expand Down Expand Up @@ -121,6 +133,7 @@ pub fn ingest_diaries(
state_key,
StateEntry {
size: curr_size,
content_hash: Some(curr_hash),
entry_count: entries.len(),
ingested_at: now_iso,
},
Expand Down Expand Up @@ -148,6 +161,11 @@ pub fn ingest_diaries(
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct StateEntry {
size: usize,
/// sha256 hex digest of the diary file's text content. `None` is the
/// legacy schema (size-only); kept optional so a post-upgrade run does
/// not re-ingest every untouched diary.
#[serde(default, skip_serializing_if = "Option::is_none")]
content_hash: Option<String>,
entry_count: usize,
ingested_at: String,
}
Expand Down Expand Up @@ -294,4 +312,86 @@ mod tests {
let id = diary_drawer_id("diary", "2024-01-15");
assert!(id.starts_with("drawer_diary_"));
}

#[test]
fn test_state_entry_legacy_format_deserializes_without_content_hash() {
// Regression for upstream mempalace 0d1c1fb: legacy state files
// written before the content_hash field existed must still load.
let legacy = r#"{"size": 42, "entry_count": 3, "ingested_at": "2024-01-01T00:00:00"}"#;
let parsed: StateEntry = serde_json::from_str(legacy).unwrap();
assert_eq!(parsed.size, 42);
assert_eq!(parsed.entry_count, 3);
assert!(parsed.content_hash.is_none());
}

#[test]
fn test_state_entry_round_trips_with_content_hash() {
let entry = StateEntry {
size: 10,
content_hash: Some("deadbeef".to_string()),
entry_count: 1,
ingested_at: "2024-01-01T00:00:00".to_string(),
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("content_hash"));
let parsed: StateEntry = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.content_hash.as_deref(), Some("deadbeef"));
}

#[test]
fn test_ingest_diaries_detects_same_size_edit() {
// Regression for upstream mempalace 0d1c1fb (#925): an in-place edit
// that preserves byte length (e.g. "teh" -> "the") was silently
// dropped under the old size-only gate. Switch to content_hash.
let _guard = crate::test_env_lock()
.lock()
.expect("test env lock should be available");
let temp = tempfile::TempDir::new().unwrap();
let prev_home = std::env::var_os("HOME");
// diary_ingest::state_file_for uses $HOME/.mempalace/state — point it
// at the tempdir so the test does not pollute real home.
std::env::set_var("HOME", temp.path());

let palace_path = temp.path().join("palace");
let diary_dir = temp.path().join("diaries");
std::fs::create_dir_all(&diary_dir).unwrap();

// Write a diary file with enough content to clear the >50-byte filter.
let diary_file = diary_dir.join("2024-01-15.md");
let original = "## Notes\n\nteh quick brown fox jumps over the lazy dog and again here.\n";
std::fs::write(&diary_file, original).unwrap();

let stats = ingest_diaries(&diary_dir, Some(&palace_path), "diary", false).unwrap();
assert_eq!(
stats.days_updated, 1,
"first ingest should record the diary"
);

// Second ingest with no change: must skip.
let stats = ingest_diaries(&diary_dir, Some(&palace_path), "diary", false).unwrap();
assert_eq!(
stats.days_updated, 0,
"second ingest with unchanged content should skip"
);

// Same-size edit: "teh" -> "the". Old gate would silently drop this.
let edited = original.replace("teh", "the");
assert_eq!(
edited.len(),
original.len(),
"test fixture must preserve length"
);
std::fs::write(&diary_file, &edited).unwrap();

let stats = ingest_diaries(&diary_dir, Some(&palace_path), "diary", false).unwrap();
assert_eq!(
stats.days_updated, 1,
"same-size edit must be detected via content hash"
);

match prev_home {
Some(h) => std::env::set_var("HOME", h),
None => std::env::remove_var("HOME"),
}
}
}
61 changes: 60 additions & 1 deletion crates/core/src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ fn safe_path_component(name: &str) -> String {
}
}

/// Refuse to write into a path that is itself a symlink.
///
/// Defense-in-depth: a pre-placed symlink at the export target would
/// redirect writes to wherever it points (e.g., system directories).
/// Mirrors the miner's input-side caution.
fn reject_symlink(path: &Path, label: &str) -> anyhow::Result<()> {
if std::fs::symlink_metadata(path)
.map(|m| m.file_type().is_symlink())
.unwrap_or(false)
{
anyhow::bail!(
"refusing to export: {} is a symbolic link ({}). \
Remove the symlink or choose a different output path.",
label,
path.display()
);
}
Ok(())
}

pub struct ExportStats {
pub wings: usize,
pub rooms: usize,
Expand All @@ -39,6 +59,7 @@ pub fn export_palace(palace_path: Option<&Path>, output_dir: &Path) -> anyhow::R
});
}

reject_symlink(output_dir, "output_dir")?;
std::fs::create_dir_all(output_dir)?;
#[cfg(unix)]
{
Expand Down Expand Up @@ -82,8 +103,10 @@ pub fn export_palace(palace_path: Option<&Path>, output_dir: &Path) -> anyhow::R
.and_then(|v| v.as_str())
.unwrap_or("general");

let wing_dir = output_dir.join(safe_path_component(wing));
let safe_wing = safe_path_component(wing);
let wing_dir = output_dir.join(&safe_wing);
if !created_wing_dirs.contains_key(wing) {
reject_symlink(&wing_dir, &format!("wing directory '{}'", safe_wing))?;
std::fs::create_dir_all(&wing_dir)?;
created_wing_dirs.insert(wing.to_string(), true);
}
Expand Down Expand Up @@ -213,3 +236,39 @@ fn chrono_now_date() -> String {
let day = remaining + 1;
format!("{:04}-{:02}-{:02}", y, month, day)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_reject_symlink_allows_regular_path() {
let temp = tempfile::TempDir::new().unwrap();
let regular = temp.path().join("regular");
std::fs::create_dir_all(&regular).unwrap();
assert!(reject_symlink(&regular, "output_dir").is_ok());
}

#[test]
fn test_reject_symlink_allows_missing_path() {
let temp = tempfile::TempDir::new().unwrap();
let missing = temp.path().join("missing");
// A path that does not exist is fine — `create_dir_all` will create it.
assert!(reject_symlink(&missing, "output_dir").is_ok());
}

#[cfg(unix)]
#[test]
fn test_reject_symlink_blocks_symlinked_dir() {
let temp = tempfile::TempDir::new().unwrap();
let target = temp.path().join("real");
std::fs::create_dir_all(&target).unwrap();
let link = temp.path().join("link");
std::os::unix::fs::symlink(&target, &link).unwrap();

let err = reject_symlink(&link, "output_dir").unwrap_err();
let msg = format!("{}", err);
assert!(msg.contains("symbolic link"), "unexpected error: {msg}");
assert!(msg.contains("output_dir"), "unexpected error: {msg}");
}
}
Loading
Loading