diff --git a/backend/instance.ts b/backend/instance.ts index d751d86..1a40722 100644 --- a/backend/instance.ts +++ b/backend/instance.ts @@ -21,6 +21,11 @@ type SendUpdateMessage = { update: ReceivedStatusUpdate; }; +type SendRealtimeMessage = { + type: "sendRealtime"; + data: Uint8Array; +}; + type SetUpdateListenerMessage = { type: "setUpdateListener"; serial: number; @@ -125,6 +130,12 @@ export class Instances { // XXX should validate parsed if (isSendUpdateMessage(parsed)) { instance.webXdc.sendUpdate(parsed.update, ""); + } else if (isSendRealtimeMessage(parsed)) { + instance.webXdc.sendRealtimeData(parsed.data); + } else if (isSetRealtimeListenerMessage(parsed)) { + instance.webXdc.connectRealtime((data) => { + return broadcast(wss, JSON.stringify({ type: "realtime", data })); + }); } else if (isSetUpdateListenerMessage(parsed)) { instance.webXdc.connect( (updates) => { @@ -216,6 +227,16 @@ function isSendUpdateMessage(value: any): value is SendUpdateMessage { return value.type === "sendUpdate"; } +function isSendRealtimeMessage(value: any): value is SendRealtimeMessage { + return value.type === "sendRealtime"; +} + +function isSetRealtimeListenerMessage( + value: any, +): value is { type: "setRealtimeListener" } { + return value.type === "setRealtimeListener"; +} + function isSetUpdateListenerMessage( value: any, ): value is SetUpdateListenerMessage { diff --git a/backend/message.test.ts b/backend/message.test.ts index 3f462eb..9ecde67 100644 --- a/backend/message.test.ts +++ b/backend/message.test.ts @@ -53,6 +53,33 @@ test("distribute to self", () => { ]); }); +test("Send realtime", () => { + const [getMessages, onMessage] = track(); + const processor = createProcessor(onMessage); + const client0 = processor.createClient("3001"); + const client1 = processor.createClient("3002"); + + const client0Heard: string[] = []; + const client1Heard: string[] = []; + + const decoder = new TextDecoder(); + client0.connectRealtime((data) => { + client0Heard.push(decoder.decode(data)); + return true; + }); + client1.connectRealtime((data) => { + client1Heard.push(decoder.decode(data)); + return true; + }); + + const encoder = new TextEncoder(); + + client0.sendRealtimeData(new Uint8Array(encoder.encode("hi"))); + + expect(client1Heard).toMatchObject(["hi"]); + expect(client0Heard).toMatchObject([]); +}); + test("distribute to self and other", () => { const [getMessages, onMessage] = track(); const processor = createProcessor(onMessage); diff --git a/backend/message.ts b/backend/message.ts index cab5fc2..8c4b83d 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -1,4 +1,5 @@ -import type { +import { + RealtimeListener as WebxdcRealtimeListener, ReceivedStatusUpdate, SendingStatusUpdate, Webxdc, @@ -10,6 +11,7 @@ type UpdateListenerMulti = (updates: ReceivedStatusUpdate[]) => boolean; type ClearListener = () => boolean; type DeleteListener = () => boolean; +type RTListener = (data: Uint8Array) => boolean; type Connect = ( updateListener: UpdateListenerMulti, @@ -20,7 +22,9 @@ type Connect = ( export type WebXdcMulti = { connect: Connect; + connectRealtime: (listener: RTListener) => void; sendUpdate: Webxdc["sendUpdate"]; + sendRealtimeData: (data: Uint8Array) => void; }; export type OnMessage = (message: Message) => void; @@ -33,6 +37,7 @@ export interface IProcessor { class Client implements WebXdcMulti { updateListener: UpdateListenerMulti | null = null; + realtimeListener: RTListener | null = null; clearListener: ClearListener | null = null; updateSerial: number | null = null; deleteListener: DeleteListener | null = null; @@ -46,6 +51,35 @@ class Client implements WebXdcMulti { this.processor.distribute(this.id, update); } + sendRealtimeData(data: Uint8Array) { + this.processor.distributeRealtime(this.id, data); + } + + connectRealtime(listener: RTListener) { + this.processor.onMessage({ + type: "connect-realtime", + instanceId: this.id, + instanceColor: getColorForId(this.id), + timestamp: Date.now(), + }); + + const realtimeListener = (data: Uint8Array) => { + const hasReceived = listener(data); + if (hasReceived) { + this.processor.onMessage({ + type: "realtime-received", + data, + instanceId: this.id, + instanceColor: getColorForId(this.id), + timestamp: Date.now(), + }); + } + return hasReceived; + }; + + this.realtimeListener = realtimeListener; + } + connect( listener: UpdateListenerMulti, serial: number, @@ -108,6 +142,13 @@ class Client implements WebXdcMulti { this.updateListener([update]); } + receiveRealtime(data: Uint8Array) { + if (this.realtimeListener == null) { + return; + } + this.realtimeListener(data); + } + clear() { if ( this.clearListener == null || @@ -148,6 +189,21 @@ class Processor implements IProcessor { this.clients.splice(client_index, 1); } + distributeRealtime(instanceId: string, data: Uint8Array) { + this.onMessage({ + type: "realtime-sent", + instanceId: instanceId, + instanceColor: getColorForId(instanceId), + data, + timestamp: Date.now(), + }); + for (const client of this.clients) { + if (client.id != instanceId) { + client.receiveRealtime(data); + } + } + } + distribute(instanceId: string, update: SendingStatusUpdate) { this.currentSerial++; const receivedUpdate: ReceivedStatusUpdate = { diff --git a/sim/create.ts b/sim/create.ts index aa60079..d7a6307 100644 --- a/sim/create.ts +++ b/sim/create.ts @@ -1,10 +1,18 @@ -import { Webxdc, ReceivedStatusUpdate } from "@webxdc/types"; - +import { + Webxdc, + ReceivedStatusUpdate, + RealtimeListener as WebxdcRealtimeListener, +} from "@webxdc/types"; type UpdatesMessage = { type: "updates"; updates: ReceivedStatusUpdate[]; }; +type RealtimeMessage = { + type: "realtime"; + data: Uint8Array; +}; + type ClearMessage = { type: "clear"; }; @@ -23,7 +31,12 @@ type DeleteMessage = { type: "delete"; }; -type Message = UpdatesMessage | ClearMessage | InfoMessage | DeleteMessage; +type Message = + | UpdatesMessage + | ClearMessage + | InfoMessage + | DeleteMessage + | RealtimeMessage; export type TransportMessageCallback = (message: Message) => void; @@ -32,7 +45,8 @@ export type TransportConnectCallback = () => void; export type Transport = { send(data: any): void; onMessage(callback: TransportMessageCallback): void; - onConnect(callback: TransportConnectCallback): void; + hasMessageListener(): boolean; + onConnect(callback: TransportConnectCallback): void; // Socket connection cb clear(): void; address(): string; name(): string; @@ -42,21 +56,63 @@ export type Transport = { type Log = (...args: any[]) => void; +export class RealtimeListener implements WebxdcRealtimeListener { + private trashed = false; + private listener: (data: Uint8Array) => void = () => {}; + + constructor( + public sendHook: (data: Uint8Array) => void = () => {}, + public setListenerHook: () => void = () => {}, + private leaveHook: () => void = () => {}, + ) {} + + is_trashed(): boolean { + return this.trashed; + } + + receive(data: Uint8Array) { + if (this.trashed) { + throw new Error("realtime listener is trashed and can no longer be used"); + } + if (this.listener) { + this.listener(data); + } + } + + setListener(listener: (data: Uint8Array) => void) { + this.setListenerHook(); + this.listener = listener; + } + + send(data: Uint8Array) { + if (!(data instanceof Uint8Array)) { + throw new Error("realtime listener data must be a Uint8Array"); + } + this.sendHook(data); + } + + leave() { + this.leaveHook(); + this.trashed = true; + } +} + export function createWebXdc( transport: Transport, log: Log = () => {}, ): Webxdc { let resolveUpdateListenerPromise: (() => void) | null = null; + let realtime: RealtimeListener | null = null; const webXdc: Webxdc = { - sendUpdate: (update: any) => { + sendUpdate: (update) => { transport.send({ type: "sendUpdate", update }); - log("send", { update }); + log("send update", { update }); }, setUpdateListener: (listener, serial = 0): Promise => { transport.onMessage((message) => { if (isUpdatesMessage(message)) { - log("recv", message.updates); + log("recv update", message.updates); for (const update of message.updates) { listener(update); } @@ -64,6 +120,13 @@ export function createWebXdc( resolveUpdateListenerPromise(); resolveUpdateListenerPromise = null; } + } else if (isRealtimeMessage(message)) { + if (realtime === null) { + return; + } + // Conversion to any because the actual data is a dict representation of Uint8Array + // This is due to JSON.stringify conversion. + realtime!.receive(new Uint8Array(Object.values(message.data as any))); } else if (isClearMessage(message)) { log("clear"); transport.clear(); @@ -73,6 +136,8 @@ export function createWebXdc( } else if (isDeleteMessage(message)) { log("delete"); window.top?.close(); + } else { + log("error", `Unhandled message ${message}`); } }); transport.onConnect(() => { @@ -92,17 +157,14 @@ export function createWebXdc( ); } - /** @type {(file: Blob) => Promise} */ - const blob_to_base64 = (file) => { + const blob_to_base64 = (file: Blob) => { const data_start = ";base64,"; return new Promise((resolve, reject) => { const reader = new FileReader(); reader.readAsDataURL(file); reader.onload = () => { - /** @type {string} */ - //@ts-ignore - let data = reader.result; - resolve(data.slice(data.indexOf(data_start) + data_start.length)); + let data: string = reader.result as string; + resolve(data!.slice(data!.indexOf(data_start) + data_start.length)); }; reader.onerror = () => reject(reader.error); }); @@ -191,6 +253,60 @@ export function createWebXdc( console.log(element); return promise; }, + + joinRealtimeChannel: () => { + if (!transport.hasMessageListener()) { + // we can only have one message listener with the current implementation, + // so we need to set it here to receive realtime data. When `setUpdateListener` + // is called, the callback is overwritten but the new value also looks for + // realtime data. + transport.onMessage((message) => { + if (isRealtimeMessage(message)) { + if (realtime === null) { + return; + } + realtime!.receive( + new Uint8Array(Object.values(message.data as any)), + ); + } + }); + } + let should_create = false; + realtime = new RealtimeListener( + () => {}, + () => { + should_create = true; + }, + () => { + should_create = false; + realtime = null; + }, + ); + transport.onConnect(() => { + if (!realtime) { + return; + } + + if (should_create) { + transport.send({ type: "setRealtimeListener" }); + } + + realtime.sendHook = (data) => { + transport.send({ type: "sendRealtime", data }); + log("send realtime", { data }); + }; + realtime.setListenerHook = () => { + transport.send({ type: "setRealtimeListener" }); + }; + }); + return realtime; + }, + getAllUpdates: () => { + console.log("[Webxdc] WARNING: getAllUpdates() is deprecated."); + return Promise.resolve([]); + }, + sendUpdateInterval: 1000, + sendUpdateMaxSize: 999999, selfAddr: transport.address(), selfName: transport.name(), }; @@ -201,6 +317,10 @@ function isUpdatesMessage(data: Message): data is UpdatesMessage { return data.type === "updates"; } +function isRealtimeMessage(data: Message): data is RealtimeMessage { + return data.type === "realtime"; +} + function isClearMessage(data: Message): data is ClearMessage { return data.type === "clear"; } diff --git a/sim/webxdc.test.ts b/sim/webxdc.test.ts index 8847cd9..107b52a 100644 --- a/sim/webxdc.test.ts +++ b/sim/webxdc.test.ts @@ -29,6 +29,18 @@ class FakeTransport implements Transport { if (data.type === "sendUpdate") { const { update } = data; this.client.sendUpdate(update, ""); + } else if (data.type === "sendRealtime") { + this.client.sendRealtimeData(data.data); + } else if (data.type === "setRealtimeListener") { + this.client.connectRealtime((data) => { + if (this.messageCallback != null) { + this.messageCallback({ + type: "realtime", + data, + }); + } + return true; + }); } else if (data.type === "setUpdateListener") { this.client.connect( (updates) => { diff --git a/sim/webxdc.ts b/sim/webxdc.ts index 7d0130b..a8d8b02 100644 --- a/sim/webxdc.ts +++ b/sim/webxdc.ts @@ -20,7 +20,7 @@ export class DevServerTransport implements Transport { constructor(url: string) { this.socket = new WebSocket(url); - this.promise = new Promise((resolve, reject) => { + this.promise = new Promise((resolve) => { this.resolveInfo = resolve; }); } @@ -41,6 +41,10 @@ export class DevServerTransport implements Transport { this.socket.addEventListener("message", listener); } + hasMessageListener() { + return this.messageListener !== null; + } + onConnect(callback: TransportConnectCallback): void { const readyState = this.socket.readyState; if (readyState === 0) { @@ -71,7 +75,7 @@ export class DevServerTransport implements Transport { return new Promise((resolve, reject) => { const name = result?.name; console.log(`Deleting indexedDB database: ${name}`); - const request = window.indexedDB.deleteDatabase(name); + const request = window.indexedDB.deleteDatabase(name!); request.onsuccess = (ev) => resolve(ev); request.onerror = (ev) => reject(ev); }); diff --git a/types/message.ts b/types/message.ts index 123c228..06f53d3 100644 --- a/types/message.ts +++ b/types/message.ts @@ -17,4 +17,7 @@ export type UpdateMessage = export type Message = | UpdateMessage | ({ type: "clear" } & InstanceMessage) - | ({ type: "connect" } & InstanceMessage); + | ({ type: "connect" } & InstanceMessage) + | ({ type: "connect-realtime" } & InstanceMessage) + | ({ type: "realtime-sent"; data: Uint8Array } & InstanceMessage) + | ({ type: "realtime-received"; data: Uint8Array } & InstanceMessage);