Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 147 additions & 17 deletions PeerCouchDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,21 @@ import { createBinaryBlob, createTextBlob, isDocContentSame, unique } from "./li
export class PeerCouchDB extends Peer {
man: DirectFileManipulator;
declare config: PeerCouchDBConf;
private _pollTimer: ReturnType<typeof setTimeout> | undefined;
private _polling = false;
private _pollIntervalMs: number;
private _pollTimeoutMs: number;
private _useShortPolling: boolean;

constructor(conf: PeerCouchDBConf, dispatcher: DispatchFun) {
super(conf, dispatcher);
this.man = new DirectFileManipulator(conf);
// Fetch remote since.
this.man.since = this.getSetting("since") || "now";
// Short-polling config (opt-in for Cloudflare Tunnel / reverse proxy environments)
this._useShortPolling = conf.useShortPolling ?? false;
this._pollIntervalMs = conf.pollIntervalMs ?? 5000;
this._pollTimeoutMs = conf.pollTimeoutMs ?? 50000;
}
async delete(pathSrc: string): Promise<boolean> {
const path = this.toLocalPath(pathSrc);
Expand Down Expand Up @@ -150,25 +160,140 @@ export class PeerCouchDB extends Peer {
} else {
this.normalLog(`Watch starting from ${this.man.since}`);
}
this.man.beginWatch(async (entry) => {
const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data));
let path = entry.path.substring(baseDir.length);
if (path.startsWith("/")) {
path = path.substring(1);

if (this._useShortPolling) {
this.normalLog(`Starting short-poll mode (interval=${this._pollIntervalMs}ms, timeout=${this._pollTimeoutMs}ms)`);
this._startPolling(baseDir);
} else {
this.man.beginWatch(async (entry) => {
const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data));
let path = entry.path.substring(baseDir.length);
if (path.startsWith("/")) {
path = path.substring(1);
}
if (entry.deleted || entry._deleted) {
this.sendLog(`${path} delete detected`);
await this.dispatchDeleted(path);
} else {
const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d };
this.sendLog(`${path} change detected`);
await this.dispatch(path, docData);
}
}, (entry) => {
this.setSetting("since", this.man.since);
if (entry.path.indexOf(":") !== -1) return false;
return entry.path.startsWith(baseDir);
});
}
}

private _startPolling(baseDir: string) {
this._polling = true;
const changesUrl = `${this.config.url}/${this.config.database}/_changes`;
const authHeader = "Basic " + btoa(`${this.config.username}:${this.config.password}`);

const poll = async () => {
if (!this._polling) return;
try {
const since = this.man.since || "0";
const params = new URLSearchParams({
since: since,
feed: "normal",
include_docs: "true",
filter: "_selector",
});
const body = JSON.stringify({ selector: { type: { "$ne": "leaf" } } });

const controller = new AbortController();
const fetchTimeout = setTimeout(() => controller.abort(), this._pollTimeoutMs + 10000);

const resp = await fetch(`${changesUrl}?${params}`, {
method: "POST",
headers: {
"Authorization": authHeader,
"Content-Type": "application/json",
},
body: body,
signal: controller.signal,
});
clearTimeout(fetchTimeout);

if (!resp.ok) {
const errBody = await resp.text().catch(() => "");
this.normalLog(`Poll HTTP ${resp.status}: ${errBody.substring(0, 200)}`);
if (this._polling) {
this._pollTimer = setTimeout(poll, this._pollIntervalMs);
}
return;
}

const data = await resp.json();
const results = data.results || [];

for (const change of results) {
if (!this._polling) break;
const doc = change.doc;
if (!doc) continue;
// Skip leaf chunks
if (doc.type === "leaf") continue;
// Skip non-note entries (no path field)
if (!doc.path) continue;
// Check if path is in our base dir
if (doc.path.indexOf(":") !== -1) continue;
const fullPath = doc.path as string;
if (baseDir && !fullPath.startsWith(baseDir)) continue;

let path = fullPath.substring(baseDir.length);
if (path.startsWith("/")) {
path = path.substring(1);
}

if (doc.deleted || doc._deleted || change.deleted) {
this.sendLog(`${path} delete detected`);
await this.dispatchDeleted(path);
} else {
// Fetch full doc content via DirectFileManipulator
try {
// Retry with delay — chunks may arrive after the doc metadata
let entry: ReadyEntry | false = false;
for (let attempt = 0; attempt < 3; attempt++) {
entry = await this.man.getByMeta(doc) as ReadyEntry;
if (entry && entry.size > 0) break;
if (attempt < 2) await new Promise(r => setTimeout(r, 2000));
}
if (entry) {
const d = entry.type == "plain" ? entry.data : new Uint8Array(decodeBinary(entry.data));
const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d };
this.sendLog(`${path} change detected`);
await this.dispatch(path, docData);
}
} catch (ex) {
this.normalLog(`Poll: failed to process ${path}: ${ex}`);
}
}
}

// Update since checkpoint
if (data.last_seq) {
this.man.since = data.last_seq;
this.setSetting("since", this.man.since);
}

} catch (err) {
if (err instanceof DOMException && err.name === "AbortError") {
this.normalLog(`Poll: request aborted (timeout) — will retry`);
} else {
this.normalLog(`Poll error: ${err}`);
}
}
if (entry.deleted || entry._deleted) {
this.sendLog(`${path} delete detected`);
await this.dispatchDeleted(path);
} else {
const docData = { ctime: entry.ctime, mtime: entry.mtime, size: entry.size, deleted: entry.deleted || entry._deleted, data: d };
this.sendLog(`${path} change detected`);
await this.dispatch(path, docData);

// Schedule next poll
if (this._polling) {
this._pollTimer = setTimeout(poll, this._pollIntervalMs);
}
}, (entry) => {
this.setSetting("since", this.man.since);
if (entry.path.indexOf(":") !== -1) return false;
return entry.path.startsWith(baseDir);
});
};

poll();
}
async dispatch(path: string, data: FileData | false) {
if (data === false) return;
Expand All @@ -185,6 +310,11 @@ export class PeerCouchDB extends Peer {
}
}
async stop(): Promise<void> {
this._polling = false;
if (this._pollTimer) {
clearTimeout(this._pollTimer);
this._pollTimer = undefined;
}
this.man.endWatch();
return await Promise.resolve();
}
Expand Down
5 changes: 4 additions & 1 deletion dat/config.sample.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
"passphrase": "passphrase",
"obfuscatePassphrase": "passphrase",
"baseDir": "blog/",
"useRemoteTweaks": true
"useRemoteTweaks": true,
"useShortPolling": false,
"pollIntervalMs": 5000,
"pollTimeoutMs": 50000
},
{
"type": "couchdb",
Expand Down
28 changes: 27 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ The configuration file consists of the following structure.
"passphrase": "passphrase", // E2EE passphrase, if you do not enabled, leave it blank.
"obfuscatePassphrase": "passphrase", // Path obfuscation passphrase, if you do not enabled, leave it blank. if enabled, set the same value of passphrase.
"baseDir": "blog/", // Sharing folder
"useRemoteTweaks":true // Overwrite customChunkSize or minimumChunkSize, and check configuration matches
"useRemoteTweaks":true, // Overwrite customChunkSize or minimumChunkSize, and check configuration matches
"useShortPolling": false, // Use short-polling instead of live changes feed (enable for Cloudflare Tunnel / reverse proxies)
"pollIntervalMs": 5000, // Interval between poll requests in ms (default: 5000)
"pollTimeoutMs": 50000 // HTTP timeout per poll request in ms (default: 50000)
},
{
"type": "couchdb",
Expand Down Expand Up @@ -109,6 +112,29 @@ The configuration file consists of the following structure.
}
```

## Cloudflare Tunnel / Reverse Proxy Support

If your CouchDB is accessed through Cloudflare Tunnel, nginx, or another reverse proxy that terminates long-lived HTTP connections, the default live changes feed (`PouchDB.changes({ live: true })`) may silently stall. The proxy kills the idle connection, but PouchDB does not detect the disconnect and stops receiving updates.

This is the same issue addressed by the "Use timeouts instead of heartbeats" setting in the Self-hosted LiveSync Obsidian plugin.

To enable short-polling mode for the bridge, add these options to your CouchDB peer configuration:

```jsonc
{
"type": "couchdb",
"name": "my-vault",
// ... other settings ...
"useShortPolling": true, // Enable short-polling (default: false)
"pollIntervalMs": 5000, // Poll every 5 seconds (default: 5000)
"pollTimeoutMs": 50000 // HTTP timeout per request (default: 50000)
}
```

When enabled, the bridge replaces the persistent `_changes` feed with periodic HTTP requests (`POST _changes?feed=normal`). Each request completes immediately with any pending changes, avoiding the proxy timeout issue entirely.

**When to use:** Enable `useShortPolling` if your CouchDB is behind Cloudflare Tunnel, a reverse proxy, or any infrastructure that imposes idle connection timeouts. If you connect directly to CouchDB (e.g. `localhost:5984`), the default live mode works fine and you don't need this.

## Realistic example

| name | database_uri / path | CouchDB username | CouchDB password | vault E2EE passphrase | baseDir |
Expand Down
8 changes: 8 additions & 0 deletions types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ export interface PeerCouchDBConf extends DirectFileManipulatorOptions {
passphrase: string;
obfuscatePassphrase: string;
baseDir: string;
/** Use short-polling instead of PouchDB's live changes feed.
* Enable this when CouchDB is accessed through Cloudflare Tunnel
* or a reverse proxy that kills long-lived HTTP connections. */
useShortPolling?: boolean;
/** Interval between poll requests in milliseconds (default: 5000). */
pollIntervalMs?: number;
/** HTTP timeout per poll request in milliseconds (default: 50000). */
pollTimeoutMs?: number;
}


Expand Down