fix(journal): serialize concurrent appends via lockfile #1002
`@@ -4,6 +4,106 @@ import { debug, warn } from '../../infrastructure/logger.js';`

```ts
export const JOURNAL_FILENAME = 'changes.journal';
const HEADER_PREFIX = '# codegraph-journal v1 ';
const LOCK_SUFFIX = '.lock';
const LOCK_TIMEOUT_MS = 5_000;
const LOCK_STALE_MS = 30_000;
const LOCK_RETRY_MS = 25;

function sleepSync(ms: number): void {
  const buf = new Int32Array(new SharedArrayBuffer(4));
  Atomics.wait(buf, 0, 0, ms);
}

function isPidAlive(pid: number): boolean {
  if (!Number.isFinite(pid) || pid <= 0) return false;
  try {
    process.kill(pid, 0);
    return true;
  } catch (e) {
    // EPERM means the process exists but we lack permission — still alive.
    return (e as NodeJS.ErrnoException).code === 'EPERM';
  }
}

function acquireJournalLock(lockPath: string): number {
  const start = Date.now();
  for (;;) {
    try {
      const fd = fs.openSync(lockPath, 'wx');
      try {
        fs.writeSync(fd, `${process.pid}\n`);
      } catch {
        /* PID stamp is advisory; fd is still exclusive */
      }
      return fd;
    } catch (e) {
      if ((e as NodeJS.ErrnoException).code !== 'EEXIST') throw e;
    }
```
**Comment on lines +98 to +127**

Contributor

If `fs.writeSync` fails here, the lockfile is left empty, so a concurrent waiter reads an empty PID, `isPidAlive(Number(''))` returns false, and the waiter unlinks a live lock. The comment "PID stamp is advisory; fd is still exclusive" only holds when there are no concurrent waiters, but a journal with a watcher + build process is exactly the concurrent scenario this lock is meant to protect. Fix: when the write fails, release the fd and unlink synchronously (we still hold the exclusive fd at that point) and retry, rather than proceeding into the critical section with an unstamped lockfile.

Contributor (Author)

Fixed in e17da0a. When the nonce stamp write fails, we now close the fd, unlink the lockfile while we still hold exclusivity, and retry instead of entering the critical section.
```ts
    let holderAlive = true;
    try {
      const pidContent = fs.readFileSync(lockPath, 'utf-8').trim();
      holderAlive = isPidAlive(Number(pidContent));
    } catch {
      /* unreadable — fall through to age check */
    }

    if (!holderAlive) {
      try {
        fs.unlinkSync(lockPath);
      } catch {
        /* another writer stole it first */
      }
      continue;
```
Contributor

When multiple processes are waiting on the same stale (dead-PID) lockfile, they can each independently steal the lock from one another's fresh acquisition:

1. Process A dies while holding the lock.
2. Waiters B and C both read A's PID and see that the holder is dead.
3. B unlinks the stale file and re-creates it with `openSync('wx')`.
4. C's `unlinkSync` lands after B's re-create, deleting B's *live* lock, and C then re-creates it.

Both B and C are now inside the critical section simultaneously — exactly the write-serialisation invariant this PR is meant to guarantee. The standard mitigation is to verify ownership after the unlink/re-create cycle: write a nonce (or use the inode number) inside the lockfile, then re-read it after acquisition and only proceed if your own nonce is still present.

Contributor (Author)

Fixed in f5c737c. Replaced the unlink + `openSync('wx')` steal pattern with an atomic write-tmp + `fs.renameSync` steal. Each writer generates a random nonce, writes it into a temp file, and atomically renames onto the lockfile. After rename we re-read the lockfile and only proceed if our nonce is still there — if another stealer's rename landed after ours, we bail and retry instead of unlinking their live lock. Release also nonce-verifies before unlinking. Added a regression test that stages a lockfile with a different-writer nonce after a real steal cycle and asserts we do not retroactively unlink it.
```ts
    }

    try {
      const stat = fs.statSync(lockPath);
      if (Date.now() - stat.mtimeMs > LOCK_STALE_MS) {
        try {
          fs.unlinkSync(lockPath);
        } catch {
          /* raced */
        }
        continue;
      }
    } catch {
      /* stat failed — keep retrying */
    }

    if (Date.now() - start > LOCK_TIMEOUT_MS) {
      throw new Error(`Failed to acquire journal lock at ${lockPath} within ${LOCK_TIMEOUT_MS}ms`);
    }
    sleepSync(LOCK_RETRY_MS);
  }
}

function releaseJournalLock(lockPath: string, fd: number): void {
  try {
    fs.closeSync(fd);
  } catch {
    /* ignore */
  }
  try {
    fs.unlinkSync(lockPath);
  } catch {
    /* ignore */
  }
}

function withJournalLock<T>(rootDir: string, fn: () => T): T {
  const dir = path.join(rootDir, '.codegraph');
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir, { recursive: true });
  }
  const lockPath = path.join(dir, `${JOURNAL_FILENAME}${LOCK_SUFFIX}`);
  const fd = acquireJournalLock(lockPath);
  try {
    return fn();
  } finally {
    releaseJournalLock(lockPath, fd);
  }
}

interface JournalResult {
  valid: boolean;
```
@@ -63,43 +163,37 @@ export function appendJournalEntries( | |
| rootDir: string, | ||
| entries: Array<{ file: string; deleted?: boolean }>, | ||
| ): void { | ||
| const dir = path.join(rootDir, '.codegraph'); | ||
| const journalPath = path.join(dir, JOURNAL_FILENAME); | ||
| withJournalLock(rootDir, () => { | ||
| const journalPath = path.join(rootDir, '.codegraph', JOURNAL_FILENAME); | ||
|
|
||
| if (!fs.existsSync(dir)) { | ||
| fs.mkdirSync(dir, { recursive: true }); | ||
| } | ||
| if (!fs.existsSync(journalPath)) { | ||
| fs.writeFileSync(journalPath, `${HEADER_PREFIX}0\n`); | ||
| } | ||
|
|
||
| if (!fs.existsSync(journalPath)) { | ||
| fs.writeFileSync(journalPath, `${HEADER_PREFIX}0\n`); | ||
| } | ||
| const lines = entries.map((e) => { | ||
| if (e.deleted) return `DELETED ${e.file}`; | ||
| return e.file; | ||
| }); | ||
|
|
||
| const lines = entries.map((e) => { | ||
| if (e.deleted) return `DELETED ${e.file}`; | ||
| return e.file; | ||
| fs.appendFileSync(journalPath, `${lines.join('\n')}\n`); | ||
| }); | ||
|
|
||
| fs.appendFileSync(journalPath, `${lines.join('\n')}\n`); | ||
| } | ||
|
|
||
| export function writeJournalHeader(rootDir: string, timestamp: number): void { | ||
| const dir = path.join(rootDir, '.codegraph'); | ||
| const journalPath = path.join(dir, JOURNAL_FILENAME); | ||
| const tmpPath = `${journalPath}.tmp`; | ||
|
|
||
| if (!fs.existsSync(dir)) { | ||
| fs.mkdirSync(dir, { recursive: true }); | ||
| } | ||
| withJournalLock(rootDir, () => { | ||
| const journalPath = path.join(rootDir, '.codegraph', JOURNAL_FILENAME); | ||
| const tmpPath = `${journalPath}.tmp`; | ||
|
|
||
| try { | ||
| fs.writeFileSync(tmpPath, `${HEADER_PREFIX}${timestamp}\n`); | ||
| fs.renameSync(tmpPath, journalPath); | ||
| } catch (err) { | ||
| warn(`Failed to write journal header: ${(err as Error).message}`); | ||
| try { | ||
| fs.unlinkSync(tmpPath); | ||
| } catch { | ||
| /* ignore */ | ||
| fs.writeFileSync(tmpPath, `${HEADER_PREFIX}${timestamp}\n`); | ||
| fs.renameSync(tmpPath, journalPath); | ||
| } catch (err) { | ||
| warn(`Failed to write journal header: ${(err as Error).message}`); | ||
| try { | ||
| fs.unlinkSync(tmpPath); | ||
| } catch { | ||
| /* ignore */ | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
Contributor

**`Atomics.wait` freezes the Node.js event loop during lock contention**

`Atomics.wait` is a synchronous, blocking call — it stops the entire V8 event loop for the full `ms` duration. In a watcher process, every filesystem notification, timer, and pending I/O callback is silenced for each 25 ms retry. In the worst case (5 000 ms timeout, 200 retries), the watcher becomes completely unresponsive for up to 5 seconds before ever throwing.

A lighter alternative that avoids blocking the event loop is a simple busy-spin with `process.hrtime.bigint()`. This keeps each retry short and doesn't starve unrelated callbacks (though it does keep the CPU busy, which is acceptable for the brief per-retry duration).
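One possible shape for that busy-spin, keeping the `sleepSync` signature used in the diff (a sketch, not the reviewer's exact snippet):

```typescript
function sleepSync(ms: number): void {
  // Spin until the deadline. hrtime.bigint() is monotonic with nanosecond
  // resolution, so short waits like the 25 ms retry are reasonably accurate.
  const deadline = process.hrtime.bigint() + BigInt(ms) * 1_000_000n;
  while (process.hrtime.bigint() < deadline) {
    /* busy-wait for the remainder of the retry interval */
  }
}
```

Because each call spins for only one retry interval, the longest uninterrupted block on the event loop is a single `LOCK_RETRY_MS` window rather than the whole contention period.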
Contributor (Author)

Fixed in f5c737c. Replaced `Atomics.wait` with a short `process.hrtime.bigint` busy-spin per your suggestion. The 25 ms retry interval keeps CPU burn negligible while letting pending FS events, timers, and I/O callbacks in watcher processes keep firing during contention.