Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions Hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@ export class Hub {
}
}

/**
* Dispatches file changes from one peer to all other peers in the same group.
*
* This is the central coordination point for synchronization.
*
* Flow:
* 1. A peer detects a change (via file watcher or database watcher)
* 2. The peer calls this dispatch method via dispatchToHub
* 3. Hub iterates through all peers in the same group
* 4. For each peer (except the source):
* - If data is false (deletion), calls peer.delete()
* - Otherwise, calls peer.put() to write the file/document
* 5. Each peer's put()/delete() method will:
* - Check isRepeating() to avoid redundant writes
* - Write to its storage if needed
* - Its own watcher will detect the write but skip re-dispatching (due to cache)
*
* This design prevents infinite loops:
* - Source peer is excluded from the dispatch
* - Each peer's cache prevents re-dispatching its own writes
*
* @param source - The peer that detected the change
* @param path - File path (global format)
* @param data - File data or false for deletion
*/
async dispatch(source: Peer, path: string, data: FileData | false) {
for (const peer of this.peers) {
if (peer !== source && (source.config.group ?? "") === (peer.config.group ?? "")) {
Expand Down
103 changes: 95 additions & 8 deletions Peer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { join as joinPosix } from "jsr:@std/path/posix";
import { join as joinPosix } from "@std/path/posix";
import type { FileInfo } from "./lib/src/API/DirectFileManipulatorV2.ts";

import { FilePathWithPrefix, LOG_LEVEL, LOG_LEVEL_DEBUG, LOG_LEVEL_INFO } from "./lib/src/common/types.ts";
Expand All @@ -25,26 +25,113 @@ export abstract class Peer {
return ret;
}
/**
 * Converts a peer-side path into the global path format by stripping this
 * peer's configured `baseDir` prefix.
 *
 * Steps:
 * 1. Drop a single leading "_" from the source path, if present.
 * 2. Normalize both `baseDir` and the path: backslashes become forward
 *    slashes and one leading "./" is removed.
 * 3. Give a non-empty `baseDir` a trailing "/" so that only a whole
 *    directory prefix matches (e.g. "blog/" does not match "blogger/x.md").
 * 4. Remove the `baseDir` prefix from the path when it is present.
 *
 * @param pathSrc - Path as seen by this peer (may carry a "_" prefix, a "./" prefix, or backslashes)
 * @returns The path relative to `baseDir`, always using forward slashes
 */
toGlobalPath(pathSrc: string) {
    // Shared normalization for baseDir and the input path, so both are
    // compared in the same representation.
    const toForwardSlashed = (value: string) => {
        const slashed = value.replace(/\\/g, '/');
        return slashed.startsWith('./') ? slashed.substring(2) : slashed;
    };

    const path = pathSrc.startsWith("_") ? pathSrc.substring(1) : pathSrc;

    let normalizedBaseDir = toForwardSlashed(this.config.baseDir);
    // Ensure trailing slash for proper prefix matching; an empty baseDir
    // stays empty so that nothing is stripped.
    if (normalizedBaseDir && !normalizedBaseDir.endsWith('/')) {
        normalizedBaseDir += '/';
    }

    let normalizedPath = toForwardSlashed(path);
    if (normalizedBaseDir && normalizedPath.startsWith(normalizedBaseDir)) {
        normalizedPath = normalizedPath.substring(normalizedBaseDir.length);
    }

    // this.debugLog(`**TOGLOBAL: ${pathSrc} => ${normalizedPath} (baseDir: ${normalizedBaseDir})`);
    return normalizedPath;
}
// Removes the entry at `path` from this peer's backing store.
// The visible implementations document the result as: true if the deletion
// succeeded, false if it was skipped or failed.
abstract delete(path: string): Promise<boolean>;
// Writes `data` to `path` in this peer's backing store.
// The visible implementations document the result as: true if written,
// false if skipped or failed.
abstract put(path: string, data: FileData): Promise<boolean>;
// Reads the entry at `path`; resolves to the file data or `false`.
// NOTE(review): presumably `false` means "not found / unavailable" — confirm against implementations.
abstract get(path: FilePathWithPrefix): Promise<false | FileData>;
// Lifecycle hooks implemented by each concrete peer.
// NOTE(review): presumably these begin and end watching for changes — confirm against implementations.
abstract start(): Promise<void>;
abstract stop(): Promise<void>;
// Maps a cache key (file path) to the hash of the content last seen for it;
// read and written by isRepeating().
// NOTE(review): the meaning of the constructor arguments (300, 10000000, true) is defined by LRUCache — confirm there.
cache = new LRUCache<string, string>(300, 10000000, true);

/**
 * Produces the canonical cache key for a file path.
 *
 * Paths reach the cache from several call sites (put(), dispatch(), ...)
 * and may be spelled differently for the same file, for example:
 * - "./vault/file.md" vs "file.md"
 * - "file/path.md" vs "file\path.md" (Windows)
 *
 * To make those spellings collide on one key, the path is first converted
 * to the global format via toGlobalPath() (which strips the baseDir prefix
 * and a leading underscore), and any backslashes are then rewritten as
 * forward slashes.
 *
 * When `config.useNormalizedCachePaths` is explicitly `false`, the path is
 * returned untouched for backward compatibility.
 *
 * @param path - The file path to turn into a cache key
 * @returns The cache key (forward slashes only, unless normalization is disabled)
 */
normalizeCacheKey(path: string): string {
    // Only an explicit `false` opts out; an unset option keeps normalization on.
    const normalizationEnabled = this.config.useNormalizedCachePaths !== false;
    if (!normalizationEnabled) {
        return path;
    }

    // Global format first, then force forward slashes for cross-platform consistency.
    return this.toGlobalPath(path).replace(/\\/g, '/');
}

/**
 * Checks if a file operation is a repeat (same content as last recorded for the path).
 *
 * This prevents the echo loop in the following scenario:
 * 1. Peer A detects a file change and dispatches it to the Hub
 * 2. The Hub forwards it to the other peers in the group (e.g. Peer B);
 *    the source peer itself is excluded
 * 3. Peer B's put()/delete() writes the content to its own storage
 * 4. Peer B's watcher detects that write as a "change"
 * 5. Without this check, Peer B would dispatch it back to the Hub and the
 *    cycle would repeat indefinitely
 *
 * The function:
 * - Computes a hash of the file data (or of a deletion marker)
 * - Looks up the hash last stored for the normalized path
 * - Returns true, leaving the cache untouched, when the hashes match
 * - Otherwise stores the new hash and returns false
 *
 * @param path - File path (normalized via normalizeCacheKey() for the cache lookup)
 * @param data - File data to check, or false for deletion
 * @returns true if this is a repeat operation (should skip), false if new
 */
async isRepeating(path: string, data: FileData | false) {
    // Compute hash of the file content (or special marker for deletions)
    const d = await computeHash(data === false ? ["\u0001Deleted"] : data.data);

    // Normalize the path so every call site hits the same cache entry
    const normalizedPath = this.normalizeCacheKey(path);

    // Check if we've recently processed this exact file content
    const cachedValue = this.cache.get(normalizedPath);
    if (this.cache.has(normalizedPath) && cachedValue === d) {
        this.normalLog(` Skipped (Repeat) ${path}: ${d?.substring(0, 6)} (cached: ${cachedValue?.substring(0, 6)}, normalized: ${normalizedPath})`);
        return true;
    }

    this.normalLog(`Cache miss for ${path}: ${d?.substring(0, 6)} (previous: ${cachedValue?.substring(0, 6)}, normalized: ${normalizedPath})`);

    // Remember the new hash under the normalized key only, so the next
    // lookup (which also normalizes) finds it.
    this.cache.set(normalizedPath, d);
    return false;
}
receiveLog(message: string, level?: LOG_LEVEL) {
Expand Down
56 changes: 56 additions & 0 deletions PeerCouchDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ export class PeerCouchDB extends Peer {
// Fetch remote since.
this.man.since = this.getSetting("since") || "now";
}
/**
* Deletes a document from CouchDB.
*
* Flow:
* 1. Convert path to local format
* 2. Check if this deletion is a repeat operation (avoids loops)
* 3. Delete the document from the database
*
* @param pathSrc - Source path (global format)
* @returns true if deletion succeeded, false if skipped or failed
*/
async delete(pathSrc: string): Promise<boolean> {
const path = this.toLocalPath(pathSrc);
if (await this.isRepeating(pathSrc, false)) {
Expand All @@ -30,6 +41,25 @@ export class PeerCouchDB extends Peer {
}
return r;
}
/**
* Writes a document to CouchDB (called when receiving changes from other peers).
*
* Flow:
* 1. Convert path to local format
* 2. Check if this is a repeat operation using isRepeating()
* - isRepeating() will update the cache with the new document hash
* - This prevents loops when the database watcher detects this write
* 3. Get existing document metadata (if any)
* 4. Compare timestamps and content to avoid unnecessary updates
* 5. Write document to CouchDB
*
* Note: After this write, the CouchDB watcher (beginWatch) will detect the change.
* However, dispatch() will find the same hash in the cache (from step 2) and skip re-dispatching.
*
* @param pathSrc - Source path (global format from Hub)
* @param data - File data including content and metadata
* @returns true if document was written, false if skipped or failed
*/
async put(pathSrc: string, data: FileData): Promise<boolean> {
const path = this.toLocalPath(pathSrc);
if (await this.isRepeating(pathSrc, data)) {
Expand Down Expand Up @@ -170,6 +200,24 @@ export class PeerCouchDB extends Peer {
return entry.path.startsWith(baseDir);
});
}
/**
* Handles document changes detected by the CouchDB watcher (beginWatch).
*
* This is called when the database watcher detects a change.
* It can be triggered by:
* - External changes (another client updating the database)
* - Internal changes (this peer's put() writing the document after receiving from Hub)
*
* Flow:
* 1. Extract relative path from the full document path
* 2. Check if this is a repeat using isRepeating()
* - If the document was just written by put(), the hash will match the cache
* - This prevents infinite loops: put() → write → watcher → dispatch → put() → ...
* 3. If not a repeat, dispatch to Hub so other peers can sync
*
* @param path - Relative path of the changed document
* @param data - Document data, or false if not available (false is ignored: nothing is dispatched)
*/
async dispatch(path: string, data: FileData | false) {
if (data === false) return;
if (!await this.isRepeating(path, data)) {
Expand All @@ -179,6 +227,14 @@ export class PeerCouchDB extends Peer {
// this.receiveLog(`${path} dispatch repeating`);
// }
}
/**
* Handles document deletions detected by the CouchDB watcher.
*
* Similar to dispatch() but for deletion events.
* Checks if the deletion is a repeat (to avoid loops) before notifying the Hub.
*
* @param path - Relative path of the deleted document
*/
async dispatchDeleted(path: string) {
if (!await this.isRepeating(path, false)) {
await this.dispatchToHub(this, this.toGlobalPath(path), false);
Expand Down
63 changes: 62 additions & 1 deletion PeerStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ export class PeerStorage extends Peer {
super(conf, dispatcher);
}

/**
* Deletes a file from the local storage.
*
* Flow:
* 1. Convert path to local format, then to storage-specific format
* 2. Check if this deletion is a repeat operation (avoids loops)
* 3. Delete the file from disk
* 4. Run any configured post-processing scripts
*
* @param pathSrc - Source path (global format)
* @returns true if deletion succeeded, false if skipped or failed
*/
async delete(pathSrc: string): Promise<boolean> {
const lp = this.toLocalPath(pathSrc);
const path = this.toStoragePath(lp);
Expand All @@ -37,6 +49,27 @@ export class PeerStorage extends Peer {
this.runScript(path, true);
return true;
}
/**
* Writes a file to local storage (called when receiving changes from other peers).
*
* Flow:
* 1. Convert path to local format, then to storage-specific format
* 2. Check if this is a repeat operation using isRepeating()
* - isRepeating() will update the cache with the new file hash
* - This prevents loops when our own watcher detects this write
* 3. Create necessary directories
* 4. Write file data to disk
* 5. Set file modification time
* 6. Update file stat cache (for change detection)
* 7. Run any configured post-processing scripts
*
* Note: After this write, the file watcher will detect the change and call dispatch().
* However, dispatch() will find the same hash in the cache (from step 2) and skip re-dispatching.
*
* @param pathSrc - Source path (global format from Hub)
* @param data - File data including content and metadata
* @returns true if file was written, false if skipped or failed
*/
async put(pathSrc: string, data: FileData): Promise<boolean> {
const lp = this.toLocalPath(pathSrc);
const path = this.toStoragePath(lp);
Expand Down Expand Up @@ -153,6 +186,26 @@ export class PeerStorage extends Peer {
}
watcher?: chokidar.FSWatcher;

/**
* Handles file changes detected by the file system watcher.
*
* This is called when the file watcher detects a change in the monitored directory.
* It can be triggered by:
* - External changes (user editing the file)
* - Internal changes (this peer's put() writing the file after receiving from Hub)
*
* Flow:
* 1. Convert the absolute storage path to a relative path
* 2. Read the current file data from disk
* 3. Schedule processing (debounced to handle rapid changes)
* 4. Update file stat cache
* 5. Check if this is a repeat using isRepeating()
* - If the file was just written by put(), the hash will match the cache
* - This prevents infinite loops: put() → write → watcher → dispatch → put() → ...
* 6. If not a repeat, dispatch to Hub so other peers can sync
*
* @param pathSrc - Absolute storage path from file watcher
*/
async dispatch(pathSrc: string) {
const lP = this.toStoragePath(this.toLocalPath("."));
const path = this.toPosixPath(relative(lP, pathSrc));
Expand All @@ -174,13 +227,21 @@ export class PeerStorage extends Peer {
// }
});
}
/**
* Handles file deletions detected by the file system watcher.
*
* Similar to dispatch() but for deletion events.
* Checks if the deletion is a repeat (to avoid loops) before notifying the Hub.
*
* @param pathSrc - Absolute storage path of deleted file
*/
async dispatchDeleted(pathSrc: string) {
const lP = this.toStoragePath(this.toLocalPath("."));
const path = this.toPosixPath(relative(lP, pathSrc));
await scheduleOnceIfDuplicated(pathSrc, async () => {
await delay(250);
if (!await this.isRepeating(path, false)) {
this.sendLog(`${path} delete detected`);
this.sendLog(`${path} deletion detected`);
await this.dispatchToHub(this, this.toGlobalPath(path), false);
}
});
Expand Down
12 changes: 8 additions & 4 deletions dat/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
"passphrase": "passphrase",
"obfuscatePassphrase": "passphrase",
"baseDir": "blog/",
"useRemoteTweaks": true
"useRemoteTweaks": true,
"useNormalizedCachePaths": true
},
{
"type": "couchdb",
Expand All @@ -25,7 +26,8 @@
"customChunkSize": 100,
"minimumChunkSize": 20,
"obfuscatePassphrase": "passphrase",
"baseDir": "syncLinux/"
"baseDir": "syncLinux/",
"useNormalizedCachePaths": true
},
{
"type": "storage",
Expand All @@ -36,7 +38,8 @@
"cmd": "cmd",
"args": ["/C", "script\\test.bat", "$filename", "$mode"]
},
"scanOfflineChanges": true
"scanOfflineChanges": true,
"useNormalizedCachePaths": true
},
{
"type": "storage",
Expand All @@ -48,7 +51,8 @@
"args": ["$filename", "$mode"]
},
"scanOfflineChanges": true,
"useChokidar": true
"useChokidar": true,
"useNormalizedCachePaths": true
}
]
}
4 changes: 3 additions & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"imports": {
"./lib/src/worker/bgWorker.ts": "./lib/src/worker/bgWorker.mock.ts",
"./lib/src/worker/bgWorker": "./lib/src/worker/bgWorker.mock.ts",
"@std/cli": "jsr:@std/cli@^1.0.23",
"fs/walk": "jsr:@std/fs@^1.0.17/",
"@std/async": "jsr:@std/async@^1.0.12",
"@std/path": "jsr:@std/path@^1.0.9",
Expand All @@ -27,7 +28,8 @@
"pouchdb-mapreduce": "npm:pouchdb-mapreduce@^9.0.0",
"transform-pouch": "npm:transform-pouch@^2.0.0",
"chokidar": "npm:chokidar@^3.5.1",
"trystero": "https://github.com/vrtmrz/vrtmrz/trystero#9e892a93ec14eeb57ce806d272fbb7c3935256d8"
// Changed 'trystero' import from GitHub URL to esm.sh for better compatibility with Deno's module resolution and caching.
"trystero": "https://esm.sh/gh/vrtmrz/trystero#9e892a93ec14eeb57ce806d272fbb7c3935256d8"
},
"lint": {
"rules": {
Expand Down
Loading