Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ dat/*
vault*/*
!.gitempty
*.log
node_modules
node_modules
lib
12 changes: 8 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ FROM denoland/deno:2.3.1
WORKDIR /app

VOLUME /app/dat
VOLUME /app/data

COPY . .
RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*

RUN rm -rf lib && git clone https://github.com/aotsukiqx/livesync-commonlib lib && \
cd lib && git checkout 798a313

RUN deno install -A
RUN mkdir -p /app/vault /app/vault-linux

COPY . .

CMD [ "deno", "task", "run" ]
CMD ["deno", "run", "--unstable-kv", "-A", "main.ts"]

21 changes: 20 additions & 1 deletion Hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ export class Hub {
this.peers = [];
for (const peer of this.conf.peers) {
if (peer.type == "couchdb") {
const since = Deno.env.get("LSB_SINCE");
if (since) {
(peer as PeerCouchDBConf).since = since;
}
const p = new PeerCouchDB(peer, this.dispatch.bind(this));
this.peers.push(p);
} else if (peer.type == "storage") {
Expand All @@ -27,10 +31,25 @@ export class Hub {
}
}
for (const p of this.peers) {
p.start();
this.safeStartPeer(p);
}
}

private safeStartPeer(p: Peer) {
setTimeout(() => {
try {
const result = p.start();
if (result && typeof result.then === 'function') {
result.then(() => {}).catch((ex) => {
console.error(`Peer ${p.config.name} stopped: ${ex}`);
});
}
} catch (ex) {
console.error(`Failed to start peer ${p.config.name}: ${ex}`);
}
}, 100);
}

async dispatch(source: Peer, path: string, data: FileData | false) {
for (const peer of this.peers) {
if (peer !== source && (source.config.group ?? "") === (peer.config.group ?? "")) {
Expand Down
23 changes: 18 additions & 5 deletions Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ import { computeHash } from "./util.ts";

export type DispatchFun = (source: Peer, path: string, data: FileData | false) => Promise<void>;

// Deno KV store for persistent settings (replaces localStorage)
let kv: Deno.KvStore | null = null;

async function getKv(): Promise<Deno.KvStore> {
if (!kv) {
kv = await Deno.openKv();
}
return kv;
}

export abstract class Peer {
config: PeerConf;
// hub: Hub;
Expand All @@ -25,7 +35,7 @@ export abstract class Peer {
return ret;
}
toGlobalPath(pathSrc: string) {
let path = pathSrc.startsWith("_") ? pathSrc.substring(1) : pathSrc;
let path = pathSrc;
if (path.startsWith(this.config.baseDir)) {
path = path.substring(this.config.baseDir.length);
}
Expand Down Expand Up @@ -62,11 +72,14 @@ export abstract class Peer {
_getKey(key: string) {
return `${this.config.name}-${this.config.type}-${this.config.baseDir}-${key}`;
}
setSetting(key: string, value: string) {
return localStorage.setItem(this._getKey(key), value);
async setSetting(key: string, value: string) {
const store = await getKv();
await store.set([this._getKey(key)], value);
}
getSetting(key: string) {
return localStorage.getItem(this._getKey(key));
async getSetting(key: string): Promise<string | null> {
const store = await getKv();
const result = await store.get<string>([this._getKey(key)]);
return result.value ?? null;
}
compareDate(a: FileInfo, b: FileInfo) {
const aMTime = ~~(a?.mtime ?? 0 / 1000);
Expand Down
20 changes: 12 additions & 8 deletions PeerCouchDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { DirectFileManipulator, FileInfo, MetaEntry, ReadyEntry } from "./lib/sr
import { FilePathWithPrefix, LOG_LEVEL_NOTICE, MILESTONE_DOCID, TweakValues } from "./lib/src/common/types.ts";
import { PeerCouchDBConf, FileData } from "./types.ts";
import { decodeBinary } from "./lib/src/string_and_binary/convert.ts";
import { isPlainText } from "./lib/src/string_and_binary/path.ts";
import { isPlainText, stripAllPrefixes } from "./lib/src/string_and_binary/path.ts";
import { DispatchFun, Peer } from "./Peer.ts";
import { createBinaryBlob, createTextBlob, isDocContentSame, unique } from "./lib/src/common/utils.ts";

Expand All @@ -14,8 +14,6 @@ export class PeerCouchDB extends Peer {
constructor(conf: PeerCouchDBConf, dispatcher: DispatchFun) {
super(conf, dispatcher);
this.man = new DirectFileManipulator(conf);
// Fetch remote since.
this.man.since = this.getSetting("since") || "now";
}
async delete(pathSrc: string): Promise<boolean> {
const path = this.toLocalPath(pathSrc);
Expand Down Expand Up @@ -139,15 +137,20 @@ export class PeerCouchDB extends Peer {
}
if (!w) {
this.normalLog(`Remote database looks like empty. fetch from the first.`);
this.setSetting("remote-created", "0");
await this.setSetting("remote-created", "0");
return;
}
const created = w.created;
if (this.getSetting("remote-created") !== `${created}`) {
if (await this.getSetting("remote-created") !== `${created}`) {
this.man.since = "";
this.normalLog(`Remote database looks like rebuilt. fetch from the first again.`);
this.setSetting("remote-created", `${created}`);
await this.setSetting("remote-created", `${created}`);
} else {
if (this.config.since !== undefined) {
this.man.since = this.config.since;
} else {
this.man.since = await this.getSetting("since") || "now";
}
this.normalLog(`Watch starting from ${this.man.since}`);
}
this.man.beginWatch(async (entry) => {
Expand All @@ -156,6 +159,7 @@ export class PeerCouchDB extends Peer {
if (path.startsWith("/")) {
path = path.substring(1);
}
path = stripAllPrefixes(path as FilePathWithPrefix) as string;
if (entry.deleted || entry._deleted) {
this.sendLog(`${path} delete detected`);
await this.dispatchDeleted(path);
Expand All @@ -164,8 +168,8 @@ export class PeerCouchDB extends Peer {
this.sendLog(`${path} change detected`);
await this.dispatch(path, docData);
}
}, (entry) => {
this.setSetting("since", this.man.since);
}, async (entry) => {
await this.setSetting("since", this.man.since);
if (entry.path.indexOf(":") !== -1) return false;
return entry.path.startsWith(baseDir);
});
Expand Down
56 changes: 5 additions & 51 deletions PeerStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import { parse, format, relative, dirname, resolve } from "@std/path";
import { format as posixFormat, parse as posixParse } from "@std/path/posix"
import { scheduleOnceIfDuplicated } from "octagonal-wheels/concurrency/lock";
import { DispatchFun, Peer } from "./Peer.ts";
import chokidar from "chokidar";
import { walk } from 'fs/walk';
import { walk } from "@std/fs/walk";

import { scheduleTask } from "octagonal-wheels/concurrency/task";

Expand Down Expand Up @@ -50,7 +49,7 @@ export class PeerStorage extends Peer {
await Deno.mkdir(dirName, { recursive: true });
} catch (ex) {
// While recursive is true, mkdir will not raise the `AlreadyExist`.
console.log(ex);
this.receiveLog(`mkdir failed: ${ex}`, LOG_LEVEL_NOTICE);
}
const fp = await Deno.open(path, { read: true, write: true, create: true });
if (data.data instanceof Uint8Array) {
Expand Down Expand Up @@ -151,7 +150,6 @@ export class PeerStorage extends Peer {
}
return ret;
}
watcher?: chokidar.FSWatcher;

async dispatch(pathSrc: string) {
const lP = this.toStoragePath(this.toLocalPath("."));
Expand Down Expand Up @@ -207,13 +205,13 @@ export class PeerStorage extends Peer {
return false;
}
const fileStat = `${stat.mtime?.getTime() ?? 0}-${stat.size}`;
this.setSetting(key, fileStat);
await this.setSetting(key, fileStat);
}

async isChanged(pathSrc: string) {
const lp = this.toLocalPath(pathSrc);
const key = `file-stat-${lp}`;
const last = this.getSetting(key);
const last = await this.getSetting(key);
// console.log(`R:${key}`);
// console.log(`RV:${last}`);

Expand Down Expand Up @@ -278,54 +276,10 @@ export class PeerStorage extends Peer {

}
async start() {
// For addressing Deno's and chokidar's compatibility issues (especially on Windows), we use Deno's fs watcher as the primary watcher.
if (!this.config.useChokidar) {
await this.startDenoFsWatch();
return;
}

if (this.watcher) {
this.watcher.close();
this.watcher = undefined;
}
const lP = this.toStoragePath(this.toLocalPath("."));
this.normalLog(`Scan offline changes: ${this.config.scanOfflineChanges ? "Enabled, now starting..." : "Disabled"}`);
this.watcher = chokidar.watch(lP,
{
ignoreInitial: !this.config.scanOfflineChanges,
awaitWriteFinish: {
stabilityThreshold: 500,
},
});

this.watcher.on("change", async (path) => {
const ePath = this.toPosixPath(relative(this.toLocalPath("."), path));
if (!await this.isChanged(ePath)) {
// this.debugLog(`Not changed: ${ePath}`);
} else {
this.debugLog(`Changes detected: ${ePath}`);
await this.dispatch(path);
}
})
this.watcher.on("add", async (path) => {
const ePath = this.toPosixPath(relative(this.toLocalPath("."), path));
if (!await this.isChanged(ePath)) {
// this.debugLog(`Not changed: ${ePath}`);
} else {
this.debugLog(`New detected: ${ePath}`);
await this.dispatch(path);
}
})
this.watcher.on("unlink", async (path) => {
const ePath = this.toPosixPath(relative(this.toLocalPath("."), path));
this.debugLog(`Unlink detected: ${ePath}`);
await this.dispatchDeleted(path)
})
await this.startDenoFsWatch();
}
async stop() {
this.watcher?.close();
this.watcherDeno?.close();
this.watcherDeno = undefined;
return await Promise.resolve();
}
}
5 changes: 4 additions & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
"run": "deno run -A main.ts"
},
"imports": {
"@/": "./lib/src/",
"@lib/": "./lib/src/",
"./lib/src/worker/bgWorker.ts": "./lib/src/worker/bgWorker.mock.ts",
"./lib/src/worker/bgWorker": "./lib/src/worker/bgWorker.mock.ts",
"fs/walk": "jsr:@std/fs@^1.0.17/",
"@std/async": "jsr:@std/async@^1.0.12",
"@std/fs": "jsr:@std/fs@^1.0.17",
"@std/path": "jsr:@std/path@^1.0.9",
"@types/diff-match-patch": "npm:@types/diff-match-patch@^1.0.36",
"@types/pouchdb": "npm:@types/pouchdb@^6.4.2",
Expand All @@ -34,6 +37,6 @@
"exclude": ["no-explicit-any", "ban-ts-comment"]
}
},
"nodeModulesDir": "manual",
"nodeModulesDir": "auto",
"unstable": ["bare-node-builtins", "sloppy-imports", "byonm"]
}
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ services:
- ./data:/app/data
- ./dat:/app/dat
restart: unless-stopped
environment:
- LSB_CONFIG=/app/dat/config.json
58 changes: 54 additions & 4 deletions main.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,79 @@
import { defaultLoggerEnv } from "./lib/src/common/logger.ts";
import { LOG_LEVEL_DEBUG } from "./lib/src/common/logger.ts";
import { Hub } from "./Hub.ts";
import { Config } from "./types.ts";
import { Config, isCouchDBPeer, isStoragePeer } from "./types.ts";
import { parseArgs } from "jsr:@std/cli";

const KEY = "LSB_"
defaultLoggerEnv.minLogLevel = LOG_LEVEL_DEBUG;
const configFile = Deno.env.get(`${KEY}CONFIG`) || "./dat/config.json";

console.log("LiveSync Bridge is now starting...");

globalThis.addEventListener("unhandledrejection", (event) => {
event.preventDefault();
console.error(`Unhandled rejection: ${event.reason}`);
});
let config: Config = { peers: [] };
const flags = parseArgs(Deno.args, {
boolean: ["reset"],
// string: ["version"],
default: { reset: false },
});

if (flags.reset) {
localStorage.clear();
const kv = await Deno.openKv();
for await (const entry of kv.list()) {
kv.delete(entry.key);
}
console.log("Storage cleared.");
}

function validateConfig(cfg: unknown): cfg is Config {
if (!cfg || typeof cfg !== "object") return false;
const c = cfg as Record<string, unknown>;
if (!Array.isArray(c.peers)) {
console.error("Config error: 'peers' must be an array");
return false;
}
for (let i = 0; i < c.peers.length; i++) {
const peer = c.peers[i] as Record<string, unknown>;
if (!peer.type || !["storage", "couchdb"].includes(peer.type as string)) {
console.error(`Config error: peers[${i}].type must be "storage" or "couchdb"`);
return false;
}
if (!peer.name) {
console.error(`Config error: peers[${i}].name is required`);
return false;
}
if (peer.type === "storage" && !peer.baseDir) {
console.error(`Config error: peers[${i}].baseDir is required for storage peer`);
return false;
}
if (peer.type === "couchdb") {
if (!peer.database) {
console.error(`Config error: peers[${i}].database is required for couchdb peer`);
return false;
}
if (!peer.url) {
console.error(`Config error: peers[${i}].url is required for couchdb peer`);
return false;
}
}
}
return true;
}

try {
const confText = await Deno.readTextFile(configFile);
config = JSON.parse(confText);
const parsed = JSON.parse(confText);
if (!validateConfig(parsed)) {
Deno.exit(1);
}
config = parsed;
} catch (ex) {
console.error("Could not parse configuration!");
console.error(ex);
Deno.exit(1);
}
console.log("LiveSync Bridge is now started!");
const hub = new Hub(config);
Expand Down
1 change: 1 addition & 0 deletions types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface PeerCouchDBConf extends DirectFileManipulatorOptions {
passphrase: string;
obfuscatePassphrase: string;
baseDir: string;
since?: string;
}


Expand Down