diff --git a/PeerCouchDB.ts b/PeerCouchDB.ts index d58d0e1..97c8627 100644 --- a/PeerCouchDB.ts +++ b/PeerCouchDB.ts @@ -11,11 +11,21 @@ import { createBinaryBlob, createTextBlob, isDocContentSame, unique } from "./li export class PeerCouchDB extends Peer { man: DirectFileManipulator; declare config: PeerCouchDBConf; + private _pollTimer: ReturnType | undefined; + private _polling = false; + private _pollIntervalMs: number; + private _pollTimeoutMs: number; + private _useShortPolling: boolean; + constructor(conf: PeerCouchDBConf, dispatcher: DispatchFun) { super(conf, dispatcher); this.man = new DirectFileManipulator(conf); // Fetch remote since. this.man.since = this.getSetting("since") || "now"; + // Short-polling config (opt-in for Cloudflare Tunnel / reverse proxy environments) + this._useShortPolling = conf.useShortPolling ?? false; + this._pollIntervalMs = conf.pollIntervalMs ?? 5000; + this._pollTimeoutMs = conf.pollTimeoutMs ?? 50000; } async delete(pathSrc: string): Promise { const path = this.toLocalPath(pathSrc); @@ -150,25 +160,140 @@ export class PeerCouchDB extends Peer { } else { this.normalLog(`Watch starting from ${this.man.since}`); } - this.man.beginWatch(async (entry) => { - const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data)); - let path = entry.path.substring(baseDir.length); - if (path.startsWith("/")) { - path = path.substring(1); + + if (this._useShortPolling) { + this.normalLog(`Starting short-poll mode (interval=${this._pollIntervalMs}ms, timeout=${this._pollTimeoutMs}ms)`); + this._startPolling(baseDir); + } else { + this.man.beginWatch(async (entry) => { + const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data)); + let path = entry.path.substring(baseDir.length); + if (path.startsWith("/")) { + path = path.substring(1); + } + if (entry.deleted || entry._deleted) { + this.sendLog(`${path} delete detected`); + await this.dispatchDeleted(path); + } else { + const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d }; + this.sendLog(`${path} change detected`); + await this.dispatch(path, docData); + } + }, (entry) => { + this.setSetting("since", this.man.since); + if (entry.path.indexOf(":") !== -1) return false; + return entry.path.startsWith(baseDir); + }); + } + } + + private _startPolling(baseDir: string) { + this._polling = true; + const changesUrl = `${this.config.url}/${this.config.database}/_changes`; + const authHeader = "Basic " + btoa(`${this.config.username}:${this.config.password}`); + + const poll = async () => { + if (!this._polling) return; + try { + const since = this.man.since || "0"; + const params = new URLSearchParams({ + since: since, + feed: "normal", + include_docs: "true", + filter: "_selector", + }); + const body = JSON.stringify({ selector: { type: { "$ne": "leaf" } } }); + + const controller = new AbortController(); + const fetchTimeout = setTimeout(() => controller.abort(), this._pollTimeoutMs + 10000); + + const resp = await fetch(`${changesUrl}?${params}`, { + method: "POST", + headers: { + "Authorization": authHeader, + "Content-Type": "application/json", + }, + body: body, + signal: controller.signal, + }); + clearTimeout(fetchTimeout); + + if (!resp.ok) { + const errBody = await resp.text().catch(() => ""); + this.normalLog(`Poll HTTP ${resp.status}: ${errBody.substring(0, 200)}`); + if (this._polling) { + this._pollTimer = setTimeout(poll, this._pollIntervalMs); + } + return; + } + + const data = await resp.json(); + const results = data.results || []; + + for (const change of results) { + if (!this._polling) break; + const doc = change.doc; + if (!doc) continue; + // Skip leaf chunks + if (doc.type === "leaf") continue; + // Skip non-note entries (no path field) + if (!doc.path) continue; + // Check if path is in our base dir + if (doc.path.indexOf(":") !== -1) continue; + const fullPath = doc.path as string; + if (baseDir && !fullPath.startsWith(baseDir)) continue; + + let path = fullPath.substring(baseDir.length); + if (path.startsWith("/")) { + path = path.substring(1); + } + + if (doc.deleted || doc._deleted || change.deleted) { + this.sendLog(`${path} delete detected`); + await this.dispatchDeleted(path); + } else { + // Fetch full doc content via DirectFileManipulator + try { + // Retry with delay — chunks may arrive after the doc metadata + let entry: ReadyEntry | false = false; + for (let attempt = 0; attempt < 3; attempt++) { + entry = await this.man.getByMeta(doc) as ReadyEntry; + if (entry && entry.size > 0) break; + if (attempt < 2) await new Promise(r => setTimeout(r, 2000)); + } + if (entry) { + const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data)); + const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d }; + this.sendLog(`${path} change detected`); + await this.dispatch(path, docData); + } + } catch (ex) { + this.normalLog(`Poll: failed to process ${path}: ${ex}`); + } + } + } + + // Update since checkpoint + if (data.last_seq) { + this.man.since = data.last_seq; + this.setSetting("since", this.man.since); + } + + } catch (err) { + if (err instanceof DOMException && err.name === "AbortError") { + this.normalLog(`Poll: request aborted (timeout) — will retry`); + } else { + this.normalLog(`Poll error: ${err}`); + } } - if (entry.deleted || entry._deleted) { - this.sendLog(`${path} delete detected`); - await this.dispatchDeleted(path); - } else { - const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d }; - this.sendLog(`${path} change detected`); - await this.dispatch(path, docData); + + // Schedule next poll + if (this._polling) { + this._pollTimer = setTimeout(poll, this._pollIntervalMs); } - }, (entry) => { - this.setSetting("since", this.man.since); - if (entry.path.indexOf(":") !== -1) return false; - return entry.path.startsWith(baseDir); - }); + }; + + poll(); } async dispatch(path: string, data: FileData | false) { if (data === false) return; @@ -185,6 +310,11 @@ export class PeerCouchDB extends Peer { } } async stop(): Promise { + this._polling = false; + if (this._pollTimer) { + clearTimeout(this._pollTimer); + this._pollTimer = undefined; + } this.man.endWatch(); return await Promise.resolve(); } diff --git a/dat/config.sample.json b/dat/config.sample.json index 6627c6e..72dd9f5 100644 --- a/dat/config.sample.json +++ b/dat/config.sample.json @@ -11,7 +11,10 @@ "passphrase": "passphrase", "obfuscatePassphrase": "passphrase", "baseDir": "blog/", - "useRemoteTweaks": true + "useRemoteTweaks": true, + "useShortPolling": false, + "pollIntervalMs": 5000, + "pollTimeoutMs": 50000 }, { "type": "couchdb", diff --git a/readme.md b/readme.md index 2cba8e1..f2a6e4c 100644 --- a/readme.md +++ b/readme.md @@ -75,7 +75,10 @@ The configuration file consists of the following structure. "passphrase": "passphrase", // E2EE passphrase, if you do not enabled, leave it blank. "obfuscatePassphrase": "passphrase", // Path obfuscation passphrase, if you do not enabled, leave it blank. if enabled, set the same value of passphrase. "baseDir": "blog/", // Sharing folder - "useRemoteTweaks":true // Overwrite customChunkSize or minimumChunkSize, and check configuration matches + "useRemoteTweaks":true, // Overwrite customChunkSize or minimumChunkSize, and check configuration matches + "useShortPolling": false, // Use short-polling instead of live changes feed (enable for Cloudflare Tunnel / reverse proxies) + "pollIntervalMs": 5000, // Interval between poll requests in ms (default: 5000) + "pollTimeoutMs": 50000 // HTTP timeout per poll request in ms (default: 50000) }, { "type": "couchdb", @@ -109,6 +112,29 @@ The configuration file consists of the following structure. } ``` +## Cloudflare Tunnel / Reverse Proxy Support + +If your CouchDB is accessed through Cloudflare Tunnel, nginx, or another reverse proxy that terminates long-lived HTTP connections, the default live changes feed (`PouchDB.changes({ live: true })`) may silently stall. The proxy kills the idle connection, but PouchDB does not detect the disconnect and stops receiving updates. + +This is the same issue addressed by the "Use timeouts instead of heartbeats" setting in the Self-hosted LiveSync Obsidian plugin. + +To enable short-polling mode for the bridge, add these options to your CouchDB peer configuration: + +```jsonc +{ + "type": "couchdb", + "name": "my-vault", + // ... other settings ... + "useShortPolling": true, // Enable short-polling (default: false) + "pollIntervalMs": 5000, // Poll every 5 seconds (default: 5000) + "pollTimeoutMs": 50000 // HTTP timeout per request (default: 50000) +} +``` + +When enabled, the bridge replaces the persistent `_changes` feed with periodic HTTP requests (`POST _changes?feed=normal`). Each request completes immediately with any pending changes, avoiding the proxy timeout issue entirely. + +**When to use:** Enable `useShortPolling` if your CouchDB is behind Cloudflare Tunnel, a reverse proxy, or any infrastructure that imposes idle connection timeouts. If you connect directly to CouchDB (e.g. `localhost:5984`), the default live mode works fine and you don't need this. + ## Realistic example | name | database_uri / path | CouchDB username | CouchDB password | vault E2EE passphrase | baseDir | diff --git a/types.ts b/types.ts index 2e19e0a..dabd341 100644 --- a/types.ts +++ b/types.ts @@ -31,6 +31,14 @@ export interface PeerCouchDBConf extends DirectFileManipulatorOptions { passphrase: string; obfuscatePassphrase: string; baseDir: string; + /** Use short-polling instead of PouchDB's live changes feed. + * Enable this when CouchDB is accessed through Cloudflare Tunnel + * or a reverse proxy that kills long-lived HTTP connections. */ + useShortPolling?: boolean; + /** Interval between poll requests in milliseconds (default: 5000). */ + pollIntervalMs?: number; + /** HTTP timeout per poll request in milliseconds (default: 50000). */ + pollTimeoutMs?: number; }