diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 39acd3418e5..3f97ba669cf 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -158,6 +158,9 @@ dependencies: '@rush-temp/cloud-branding': specifier: file:./projects/cloud-branding.tgz version: file:projects/cloud-branding.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) + '@rush-temp/cloud-collaborator': + specifier: file:./projects/cloud-collaborator.tgz + version: file:projects/cloud-collaborator.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) '@rush-temp/cloud-datalake': specifier: file:./projects/cloud-datalake.tgz version: file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) @@ -23205,6 +23208,48 @@ packages: - utf-8-validate dev: false + file:projects/cloud-collaborator.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4): + resolution: {integrity: sha512-fwUF4g20XILvd0nYY4zeEr/p5DhEJ8oh3Ssjd1YFAc0jdlqbhjbmo0OWrKhYDkxvSOwnt0VjFPvqisJ/jH0iQQ==, tarball: file:projects/cloud-collaborator.tgz} + id: file:projects/cloud-collaborator.tgz + name: '@rush-temp/cloud-collaborator' + version: 0.0.0 + dependencies: + '@cloudflare/workers-types': 4.20241022.0 + '@types/jest': 29.5.12 + '@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.6.2) + '@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.6.2) + eslint: 8.56.0 + eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0)(eslint-plugin-import@2.29.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.56.0)(typescript@5.6.2) + eslint-plugin-import: 2.29.1(eslint@8.56.0) + eslint-plugin-n: 15.7.0(eslint@8.56.0) + eslint-plugin-promise: 6.1.1(eslint@8.56.0) + itty-router: 5.0.18 + jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2) + lib0: 0.2.89 + prettier: 3.2.5 + ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.6.2) + typescript: 5.6.2 + wrangler: 3.97.0(@cloudflare/workers-types@4.20241022.0)(bufferutil@4.0.8)(utf-8-validate@6.0.4) + y-prosemirror: 1.2.12(y-protocols@1.0.6)(yjs@13.6.19) + y-protocols: 1.0.6(yjs@13.6.19) + yjs: 13.6.19 + transitivePeerDependencies: + - '@babel/core' + - '@jest/types' + - '@types/node' + - babel-jest + - babel-plugin-macros + - bufferutil + - esbuild + - node-notifier + - prosemirror-model + - prosemirror-state + - prosemirror-view + - supports-color + - ts-node + - utf-8-validate + dev: false + file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4): resolution: {integrity: sha512-pOjup2gTfDH4Qq9r4LTeVnf+nDQvlTSUOxLtT+bKnrHM0jQQ705v6wYtmkpWD7CtEwwCvS9CH+iPZwB5ZNpzqQ==, tarball: file:projects/cloud-datalake.tgz} id: file:projects/cloud-datalake.tgz diff --git a/plugins/text-editor-resources/src/provider/cloud.ts b/plugins/text-editor-resources/src/provider/cloud.ts index 76136f15908..9aa70b8df6e 100644 --- a/plugins/text-editor-resources/src/provider/cloud.ts +++ b/plugins/text-editor-resources/src/provider/cloud.ts @@ -24,14 +24,14 @@ export interface DatalakeCollabProviderParameters { token: string document: YDoc - content: Ref | null + source: Ref | null } export class CloudCollabProvider extends WebsocketProvider implements Provider { readonly loaded: Promise - constructor ({ document, url, name, content }: DatalakeCollabProviderParameters) { - const params = content != null ? { content } : undefined + constructor ({ document, url, name, source, token }: DatalakeCollabProviderParameters) { + const params = { token, source: source ?? '' } super(url, encodeURIComponent(name), document, { params }) diff --git a/plugins/text-editor-resources/src/provider/utils.ts b/plugins/text-editor-resources/src/provider/utils.ts index f075289587b..2fb20d350ab 100644 --- a/plugins/text-editor-resources/src/provider/utils.ts +++ b/plugins/text-editor-resources/src/provider/utils.ts @@ -48,7 +48,7 @@ export function createRemoteProvider (ydoc: Ydoc, doc: CollaborativeDoc, content url: collaboratorUrl, name: documentId, document: ydoc, - content, + source: content, token }) : new HocuspocusCollabProvider({ diff --git a/rush.json b/rush.json index e36f8d94032..04672db766b 100644 --- a/rush.json +++ b/rush.json @@ -2215,6 +2215,11 @@ "packageName": "@hcengineering/cloud-transactor", "projectFolder": "workers/transactor", "shouldPublish": false + }, + { + "packageName": "@hcengineering/cloud-collaborator", + "projectFolder": "workers/collaborator", + "shouldPublish": false } ] } diff --git a/workers/collaborator/.eslintrc.js b/workers/collaborator/.eslintrc.js new file mode 100644 index 00000000000..ce90fb9646f --- /dev/null +++ b/workers/collaborator/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + extends: ['./node_modules/@hcengineering/platform-rig/profiles/node/eslint.config.json'], + parserOptions: { + tsconfigRootDir: __dirname, + project: './tsconfig.json' + } +} diff --git a/workers/collaborator/jest.config.js b/workers/collaborator/jest.config.js new file mode 100644 index 00000000000..2cfd408b679 --- /dev/null +++ b/workers/collaborator/jest.config.js @@ -0,0 +1,7 @@ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'], + roots: ["./src"], + coverageReporters: ["text-summary", "html"] +} diff --git a/workers/collaborator/package.json b/workers/collaborator/package.json new file mode 100644 index 00000000000..2f8a7802401 --- /dev/null +++ b/workers/collaborator/package.json @@ -0,0 +1,46 @@ +{ + "name": "@hcengineering/cloud-collaborator", + "version": "0.6.0", + "main": "lib/index.js", + "types": "types/index.d.ts", + "template": "@hcengineering/cloud-package", + "scripts": { + "deploy": "wrangler deploy", + "dev": "wrangler dev --port 4021", + "start": "wrangler dev --port 4021", + "cf-typegen": "wrangler types", + "build": "compile", + "build:watch": "compile", + "test": "jest --passWithNoTests --silent --forceExit", + "format": "format src", + "_phase:build": "compile transpile src", + "_phase:test": "jest --passWithNoTests --silent --forceExit", + "_phase:format": "format src", + "_phase:validate": "compile validate" + }, + "devDependencies": { + "@hcengineering/platform-rig": "^0.6.0", + "@cloudflare/workers-types": "^4.20241022.0", + "typescript": "^5.3.3", + "wrangler": "^3.97.0", + "jest": "^29.7.0", + "prettier": "^3.1.0", + "ts-jest": "^29.1.1", + "@typescript-eslint/eslint-plugin": "^6.11.0", + "@typescript-eslint/parser": "^6.11.0", + "eslint-config-standard-with-typescript": "^40.0.0", + "eslint-plugin-import": "^2.26.0", + "eslint-plugin-n": "^15.4.0", + "eslint-plugin-promise": "^6.1.1", + "eslint": "^8.54.0", + "@types/jest": "^29.5.5", + "@hcengineering/cloud-datalake": "^0.6.0" + }, + "dependencies": { + "itty-router": "^5.0.18", + "lib0": "^0.2.88", + "yjs": "^13.6.19", + "y-protocols": "^1.0.6", + "y-prosemirror": "^1.2.12" + } +} diff --git a/workers/collaborator/src/collaborator.ts b/workers/collaborator/src/collaborator.ts new file mode 100644 index 00000000000..f51e4971e5c --- /dev/null +++ b/workers/collaborator/src/collaborator.ts @@ -0,0 +1,414 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { DurableObject } from 'cloudflare:workers' +import { type RouterType, type IRequest, Router, error } from 'itty-router' +import { applyUpdate, encodeStateAsUpdate } from 'yjs' + +import { Document } from './document' +import { type Env } from './env' +import { ConsoleLogger, type MetricsContext, withMetrics } from './metrics' +import type { + AwarenessUpdate, + RpcCreateContentRequest, + RpcGetContentRequest, + RpcRequest, + RpcUpdateContentRequest +} from './types' +import { decodeDocumentId, extractStrParam, jsonBlobId, ydocBlobId } from './utils' +import { jsonToYDoc, yDocToJSON } from './ydoc' + +export const PREFERRED_SAVE_SIZE = 500 +export const PREFERRED_SAVE_INTERVAL = 30 * 1000 + +/** + * YDoc state is stored as a series of updates in the Durable Object KV storage. + * + * Durable Object KV + * - documentId: string, document Id as provided by the client + * - versionId: string, latest document version Id + * - version-*: string, maps version Id to datalake blob Id + * - updates: Uint8Array[]: list of pending updates to be saved + */ +export class Collaborator extends DurableObject { + private readonly logger = new ConsoleLogger() + private readonly router: RouterType + private readonly doc: Document + private readonly updates: Uint8Array[] + private documentId: string = '' + private source: string = '' + private hydrated: boolean = false + + constructor (ctx: DurableObjectState, env: Env) { + super(ctx, env) + + this.doc = new Document(this.logger) + this.updates = [] + + this.router = Router() + .get('/:id', async (request) => { + return await withMetrics('connnect', (ctx) => { + return this.handleConnect(ctx, request) + }) + }) + .post('/rpc/:id', async (request, env) => { + return await withMetrics('rpc', (ctx) => { + return this.handleRpc(ctx, request) + }) + }) + } + + async fetch (request: Request): Promise { + return await this.router.fetch(request).catch(error) + } + + async handleConnect (ctx: MetricsContext, request: IRequest): Promise { + const documentId = decodeURIComponent(request.params.id) + const source = decodeURIComponent(extractStrParam(request.query.source) ?? '') + const headers = request.headers + + if (headers.get('Upgrade') !== 'websocket') { + return new Response('Expected header Upgrade: websocket', { status: 426 }) + } + + const { 0: client, 1: server } = new WebSocketPair() + this.ctx.acceptWebSocket(server) + + await ctx.with('session', async (ctx) => { + await this.handleSession(ctx, server, documentId, source) + }) + + return new Response(null, { status: 101, webSocket: client }) + } + + async handleRpc (ctx: MetricsContext, request: IRequest): Promise { + const documentId = decodeURIComponent(request.params.id) + const rpc = await request.json() + + return await ctx.with(rpc.method, async (ctx) => { + try { + switch (rpc.method) { + case 'getContent': + return this.handleRpcGetContent(ctx, documentId, rpc) + case 'createContent': + return await this.handleRpcCreateContent(ctx, documentId, rpc) + case 'updateContent': + return this.handleRpcUpdateContent(ctx, documentId, rpc) + default: + return Response.json({ error: 'Bad request' }, { status: 400 }) + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + ctx.error('failed to perform rpc request', { error: message }) + return error(500) + } + }) + } + + handleRpcGetContent (ctx: MetricsContext, id: string, request: RpcGetContentRequest): Response { + const content: Record = {} + + for (const field of this.doc.share.keys()) { + content[field] = JSON.stringify(yDocToJSON(this.doc, field)) + } + + return Response.json({ content }, { status: 200 }) + } + + async handleRpcCreateContent (ctx: MetricsContext, id: string, request: RpcCreateContentRequest): Promise { + const documentId = decodeDocumentId(id) + const content: Record = {} + + this.doc.transact(() => { + Object.entries(request.payload.content).forEach(([field, value]) => { + if (value !== undefined && value !== null && value !== '') { + try { + const json = JSON.parse(String(value)) + jsonToYDoc(json, this.doc, field) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('Failed to process JSON', { error }) + } + } + }) + }) + + for (const [field, value] of Object.entries(request.payload.content)) { + const blobId = jsonBlobId(documentId) + await ctx.with('datalake.putBlob', async () => { + await this.env.DATALAKE.putBlob(documentId.workspaceId, blobId, value, 'application/json') + }) + content[field] = blobId + } + + return Response.json({ content }, { status: 200 }) + } + + handleRpcUpdateContent (ctx: MetricsContext, id: string, request: RpcUpdateContentRequest): Response { + this.doc.transact(() => { + Object.entries(request.payload.content).forEach(([field, value]) => { + try { + const json = JSON.parse(String(value)) + jsonToYDoc(json, this.doc, field) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('Failed to process JSON', { error }) + } + }) + }) + + return Response.json({}, { status: 200 }) + } + + async webSocketMessage (ws: WebSocket, message: ArrayBuffer | string): Promise { + if (typeof message === 'string') { + this.logger.warn('unexpected message type', { message }) + return + } + + this.doc.handleMessage(new Uint8Array(message), ws) + } + + async webSocketClose (ws: WebSocket, code: number, reason: string, wasClean: boolean): Promise { + this.logger.log('WebSocket closed', { code, reason, wasClean }) + await this.handleClose(ws, 1000) + } + + async webSocketError (ws: WebSocket, error: unknown): Promise { + this.logger.error('WebSocket error', { error }) + await this.handleClose(ws, 1011, 'error') + } + + async alarm (): Promise { + await this.hydrate() + await this.writeDocument() + } + + async hydrate (): Promise { + if (this.hydrated) { + return + } + + await this.ctx.blockConcurrencyWhile(async () => { + const documentId = (await this.ctx.storage.get('documentId')) ?? '' + const source = (await this.ctx.storage.get('source')) ?? '' + + if (documentId === '') { + // name is not set, hydrate later + return + } + + this.documentId = documentId + this.source = source + + await withMetrics('hydrate', async (ctx) => { + await ctx.with('readDocument', async (ctx) => { + await this.readDocument(ctx) + }) + + const connections = this.ctx.getWebSockets() + connections.forEach((ws: WebSocket) => { + this.doc.addConnection(ws) + }) + + // enable update listeners only after the document is restored + // eslint-disable-next-line @typescript-eslint/no-misused-promises + this.doc.on('update', this.handleDocUpdate.bind(this)) + this.doc.awareness.on('update', this.handleAwarenessUpdate.bind(this)) + }) + + this.hydrated = true + }) + } + + async handleSession (ctx: MetricsContext, ws: WebSocket, documentId: string, source: string): Promise { + if (this.documentId !== documentId) { + this.documentId = documentId + await this.ctx.storage.put('documentId', documentId) + } + + if (this.source !== source) { + this.source = source + await this.ctx.storage.put('source', source) + } + + await ctx.with('hydrate', async () => { + await this.hydrate() + }) + + this.doc.addConnection(ws) + } + + async handleAwarenessUpdate ({ added, updated, removed }: AwarenessUpdate, origin: any): Promise { + // persist awareness state + const state = this.doc.awareness.getLocalState() + await this.ctx.storage.put('awareness', state) + } + + async handleDocUpdate (update: Uint8Array, origin: any): Promise { + // save update + this.updates.push(update) + await this.ctx.storage.put('updates', [...this.updates]) + + await this.ctx.storage.setAlarm(Date.now() + PREFERRED_SAVE_INTERVAL) + if (this.updates.length > PREFERRED_SAVE_SIZE) { + void this.writeDocument() + } + } + + async handleClose (ws: WebSocket, code: number, reason?: string): Promise { + const clients = this.ctx.getWebSockets().length + + try { + ws.close(code, reason) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + this.logger.error('failed to close WebSocket', { error }) + } + + this.doc.removeConnection(ws) + + // last client disconnected, write document + if (clients === 1) { + await this.writeDocument() + } + } + + async readDocument (ctx: MetricsContext): Promise { + // restore document state from storage or datalake + const source = this.source + const documentId = decodeDocumentId(this.documentId) + const workspaceId = documentId.workspaceId + + const blobId = ydocBlobId(documentId) + + let loaded = false + + try { + ctx.log('loading from datalake', { workspaceId, documentId, blobId }) + + await ctx.with('fromYdoc', async (ctx) => { + const buffer = await ctx.with('datalake.getBlob', () => { + return this.env.DATALAKE.getBlob(workspaceId, blobId) + }) + + if (buffer !== undefined) { + applyUpdate(this.doc, new Uint8Array(buffer)) + + loaded = true + ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) + } + }) + } catch (err) { + // the blob might be missing, ignore errors + const error = err instanceof Error ? err.message : String(err) + ctx.error('loading from datalake error', { workspaceId, documentId, blobId, error }) + } + + if (!loaded && source !== '') { + try { + ctx.log('loading from datalake', { workspaceId, documentId, source }) + + await ctx.with('fromJson', async (ctx) => { + const buffer = await ctx.with('datalake.getBlob', () => { + return this.env.DATALAKE.getBlob(workspaceId, source) + }) + + if (buffer !== undefined) { + const json = JSON.parse(String(buffer)) + jsonToYDoc(json, this.doc, documentId.objectAttr) + + loaded = true + ctx.log('loaded from datalake', { workspaceId, documentId, blobId }) + } + }) + } catch (err) { + // the blob might be missing, ignore errors + const error = err instanceof Error ? err.message : String(err) + ctx.error('loading from datalake error', { workspaceId, documentId, source, error }) + } + } + + // restore cached updates + await ctx.with('restore updates', async () => { + try { + const updates = await this.ctx.storage.get>('updates') + if (updates !== undefined && updates.length > 0) { + this.doc.transact(() => { + updates.forEach((update) => { + applyUpdate(this.doc, update) + this.updates.push(update) + }) + }) + } + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('failed to restore updates', { workspaceId, documentId, error }) + } + }) + + // restore awareness state + await ctx.with('restore awareness', async () => { + try { + const awareness = await this.ctx.storage.get>('awareness') + if (awareness !== undefined) { + this.doc.awareness.setLocalState(awareness) + } + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('failed to restore awareness', { workspaceId, documentId, error }) + } + }) + } + + async writeDocument (): Promise { + await this.ctx.storage.deleteAlarm() + + const updates = this.updates.splice(0) + if (updates.length === 0) { + return + } + + await withMetrics('write document', async (ctx) => { + try { + const documentId = decodeDocumentId(this.documentId) + const workspaceId = documentId.workspaceId + + // save ydoc content + const update = encodeStateAsUpdate(this.doc) + await ctx.with('datalake.putBlob', async () => { + const blobId = ydocBlobId(documentId) + await this.env.DATALAKE.putBlob(workspaceId, blobId, new Uint8Array(update), 'application/ydoc') + ctx.log('saved ydoc content to datalake', { documentId, blobId }) + }) + + void this.ctx.storage.put('updates', []) + + // save json snapshot + const blobId = jsonBlobId(documentId) + const markup = JSON.stringify(yDocToJSON(this.doc, documentId.objectAttr)) + await ctx.with('datalake.putBlob', async () => { + await this.env.DATALAKE.putBlob(workspaceId, blobId, markup, 'application/json') + ctx.log('saved json content to datalake', { documentId, blobId }) + }) + } catch (err) { + // save failed, restore updates + const error = err instanceof Error ? err.message : String(err) + ctx.error('failed to save document', { documentId: this.documentId, error }) + this.updates.unshift(...updates) + } + }) + } +} diff --git a/workers/collaborator/src/document.ts b/workers/collaborator/src/document.ts new file mode 100644 index 00000000000..75bf1a4e449 --- /dev/null +++ b/workers/collaborator/src/document.ts @@ -0,0 +1,137 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { Doc as YDoc } from 'yjs' +import { Awareness, removeAwarenessStates } from 'y-protocols/awareness' +import * as encoding from 'lib0/encoding' +import * as protocol from './protocol' +import { type Logger } from './metrics' +import { type AwarenessUpdate } from './types' + +export class Document extends YDoc { + awareness: Awareness + connections: Map> + + constructor (readonly logger: Logger) { + super({ gc: false }) + + this.connections = new Map() + + this.awareness = new Awareness(this) + this.awareness.setLocalState(null) + + this.on('update', this.handleUpdate.bind(this)) + this.awareness.on('update', this.handleAwarenessUpdate.bind(this)) + } + + addConnection (ws: WebSocket): void { + const state = ws.deserializeAttachment() ?? new Set() + this.connections.set(ws, state) + + const awareness = this.awareness + + // Force sync document state + const encoder = protocol.forceSyncMessage(this) + ws.send(encoding.toUint8Array(encoder)) + + // Force sync awareness state + if (awareness.states.size > 0) { + const clients = Array.from(awareness.states.keys()) + if (clients.length > 0) { + const encoder = protocol.awarenessMessage(this, clients) + ws.send(encoding.toUint8Array(encoder)) + } + } + } + + removeConnection (ws: WebSocket): void { + closeConnection(this, ws) + } + + handleMessage (message: Uint8Array, origin: WebSocket): void { + try { + const encoder = protocol.handleMessage(this, new Uint8Array(message), origin) + if (encoding.length(encoder) > 1) { + origin.send(encoding.toUint8Array(encoder)) + } + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + this.logger.error('WebSocket message error', { error }) + } + } + + private handleUpdate (update: Uint8Array, origin: any): void { + const encoder = protocol.updateMessage(update, origin) + broadcast(this, encoding.toUint8Array(encoder), [origin]) + } + + private handleAwarenessUpdate ({ added, updated, removed }: AwarenessUpdate, origin: any): void { + const changed = [...added, ...updated, ...removed] + const encoder = protocol.awarenessMessage(this, changed) + broadcast(this, encoding.toUint8Array(encoder)) + + if (origin == null || !(origin instanceof WebSocket)) return + + if (added.length > 0 || removed.length > 0) { + const connIDs = this.connections.get(origin) + if (connIDs !== undefined) { + added.forEach((client) => connIDs.add(client)) + removed.forEach((client) => connIDs.delete(client)) + + origin.serializeAttachment(connIDs) + } + } + } +} + +function closeConnection (doc: Document, ws: WebSocket): void { + if (doc.connections.has(ws)) { + const connIDs = doc.connections.get(ws) + doc.connections.delete(ws) + + if (connIDs !== undefined && connIDs.size > 0) { + removeAwarenessStates(doc.awareness, Array.from(connIDs), null) + } + } + + try { + ws.close() + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + doc.logger.error('failed to close WebSocket', { error }) + } +} + +function broadcast (doc: Document, message: Uint8Array, exclude: any[] = []): void { + doc.connections.forEach((_, ws) => { + if (!exclude.includes(ws)) { + send(doc, ws, message) + } + }) +} + +function send (doc: Document, ws: WebSocket, message: Uint8Array): void { + if (ws.readyState !== undefined && ws.readyState !== WebSocket.CONNECTING && ws.readyState !== WebSocket.OPEN) { + closeConnection(doc, ws) + } + + try { + ws.send(message) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + doc.logger.error('failed to send message', { error }) + closeConnection(doc, ws) + } +} diff --git a/workers/collaborator/src/env.ts b/workers/collaborator/src/env.ts new file mode 100644 index 00000000000..4d81e0f7ed5 --- /dev/null +++ b/workers/collaborator/src/env.ts @@ -0,0 +1,22 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import type DatalakeWorker from '@hcengineering/cloud-datalake' +import { type Collaborator } from './collaborator' + +export interface Env { + COLLABORATOR: DurableObjectNamespace + DATALAKE: Service +} diff --git a/workers/collaborator/src/index.ts b/workers/collaborator/src/index.ts new file mode 100644 index 00000000000..c4484dad83b --- /dev/null +++ b/workers/collaborator/src/index.ts @@ -0,0 +1,82 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { type IRequest, type IRequestStrict, type RequestHandler, Router, cors, error, html } from 'itty-router' +import { type Env } from './env' + +export { Collaborator } from './collaborator' + +const { preflight, corsify } = cors({ maxAge: 86400 }) + +const withToken: RequestHandler = (request: IRequest) => { + let token = request.query.token + + if (token === undefined || token === '') { + const authorization = request.headers.get('Authorization') + if (authorization != null && authorization.startsWith('Bearer ')) { + token = authorization.substring(7) + } + } + + if (token === undefined || token === '') { + return error(401, 'Unauthorized') + } + + request.token = token +} + +const router = Router() + .options('*', preflight) + .get('/:id', withToken, async (request, env) => { + const { headers } = request + const documentId = decodeURIComponent(request.params.id) + + if (headers.get('Upgrade') !== 'websocket') { + return new Response('Expected header Upgrade: websocket', { status: 426 }) + } + + const id = env.COLLABORATOR.idFromName(documentId) + const stub = env.COLLABORATOR.get(id) + + return stub.fetch(request) + }) + .post('/rpc/:id', withToken, async (request, env) => { + const documentId = decodeURIComponent(request.params.id) + + const id = env.COLLABORATOR.idFromName(documentId) + const stub = env.COLLABORATOR.get(id) + + return stub.fetch(request) + }) + .all('/', () => + html( + `Huly® Collaborator™ https://huly.io + © 2024 Huly Labs` + ) + ) + .all('*', () => new Response('Not found', { status: 404 })) + +export default { + async fetch (request: Request, env: Env): Promise { + return await router + .fetch(request, env) + .catch(error) + .then((response) => { + // workaround for "Can't modify immutable headers" error + // see https://github.com/kwhitley/itty-router/issues/242 + return corsify(new Response(response.body, response)) + }) + } +} satisfies ExportedHandler diff --git a/workers/collaborator/src/metrics.ts b/workers/collaborator/src/metrics.ts index 5c800b879b8..64ddf17ec44 100644 --- a/workers/collaborator/src/metrics.ts +++ b/workers/collaborator/src/metrics.ts @@ -13,60 +13,97 @@ // limitations under the License. // -import { type Env } from './env' +export interface Logger { + log: (message: string, data?: Record) => void + warn: (message: string, data?: Record) => void + error: (message: string, data?: Record) => void + debug: (message: string, data?: Record) => void +} + +export class ConsoleLogger implements Logger { + log (message: string, data?: Record): void { + console.log({ message, ...data }) + } + + warn (message: string, data?: Record): void { + console.warn({ message, ...data }) + } + + error (message: string, data?: Record): void { + console.error({ message, ...data }) + } + + debug (message: string, data?: Record): void { + console.debug({ message, ...data }) + } +} -export async function withMetrics (name: string, fn: (ctx: MetricsContext) => Promise): Promise { - const ctx = new MetricsContext() +export async function withMetrics ( + name: string, + fn: (ctx: MetricsContext) => Promise, + logger?: Logger +): Promise { + logger ??= new ConsoleLogger() + const ctx = new MetricsContext(logger) const start = performance.now() try { return await fn(ctx) + } catch (err: any) { + logger.error(err instanceof Error ? err.message : String(err)) + throw err } finally { const total = performance.now() - start const ops = ctx.metrics const message = `${name} total=${total} ` + ctx.toString() - console.log({ message, total, ops }) + logger.log(message, { total, ops }) } } -export interface MetricsData { +interface MetricsData { op: string time: number + error?: string + children?: MetricsData[] } export class MetricsContext { - metrics: Array = [] + readonly metrics: Array = [] + + constructor (private readonly logger: Logger) {} - debug (...data: any[]): void { - console.debug(...data) + debug (message: string, data?: Record): void { + this.logger.debug(message, data) } - log (...data: any[]): void { - console.log(...data) + log (message: string, data?: Record): void { + this.logger.log(message, data) } - error (...data: any[]): void { - console.error(...data) + warn (message: string, data?: Record): void { + this.logger.warn(message, data) } - async with(op: string, fn: () => Promise): Promise { - const start = performance.now() - try { - return await fn() - } finally { - const time = performance.now() - start - this.metrics.push({ op, time }) - } + error (message: string, data?: Record): void { + this.logger.error(message, data) } - withSync(op: string, fn: () => T): T { + async with(op: string, fn: (ctx: MetricsContext) => Promise): Promise { + const ctx = new MetricsContext(this.logger) const start = performance.now() + + let error: string | undefined + try { - return fn() + return await fn(ctx) + } catch (err: any) { + error = err instanceof Error ? err.message : String(err) + throw err } finally { const time = performance.now() - start - this.metrics.push({ op, time }) + const children = ctx.metrics + this.metrics.push(error !== undefined ? { op, time, error, children } : { op, time, children }) } } @@ -74,22 +111,3 @@ export class MetricsContext { return this.metrics.map((p) => `${p.op}=${p.time}`).join(' ') } } - -export class LoggedDatalake { - constructor ( - private readonly datalake: Env['DATALAKE'], - private readonly ctx: MetricsContext - ) {} - - async getBlob (workspace: string, name: string): Promise { - return await this.ctx.with('datalake.getBlob', () => { - return this.datalake.getBlob(workspace, name) - }) - } - - async putBlob (workspace: string, name: string, data: ArrayBuffer | Blob | string, type: string): Promise { - await this.ctx.with('datalake.putBlob', () => { - return this.datalake.putBlob(workspace, name, data, type) - }) - } -} diff --git a/workers/collaborator/src/protocol.ts b/workers/collaborator/src/protocol.ts new file mode 100644 index 00000000000..91ab33c3539 --- /dev/null +++ b/workers/collaborator/src/protocol.ts @@ -0,0 +1,73 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import * as decoding from 'lib0/decoding' +import * as encoding from 'lib0/encoding' +import * as awarenessProtocol from 'y-protocols/awareness' +import * as syncProtocol from 'y-protocols/sync' +import { type Document } from './document' + +export enum MessageType { + Sync = 0, + Awareness = 1 +} + +export function forceSyncMessage (doc: Document): encoding.Encoder { + const encoder = encoding.createEncoder() + + encoding.writeVarUint(encoder, MessageType.Sync) + syncProtocol.writeSyncStep1(encoder, doc) + + return encoder +} + +export function updateMessage (update: Uint8Array, origin: any): encoding.Encoder { + const encoder = encoding.createEncoder() + + encoding.writeVarUint(encoder, MessageType.Sync) + syncProtocol.writeUpdate(encoder, update) + + return encoder +} + +export function awarenessMessage (doc: Document, clients: Array): encoding.Encoder { + const encoder = encoding.createEncoder() + + encoding.writeVarUint(encoder, MessageType.Awareness) + encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(doc.awareness, clients)) + + return encoder +} + +export function handleMessage (doc: Document, message: Uint8Array, origin: any): encoding.Encoder { + const encoder = encoding.createEncoder() + const decoder = decoding.createDecoder(message) + const messageType = decoding.readVarUint(decoder) + + switch (messageType) { + case MessageType.Sync: + encoding.writeVarUint(encoder, MessageType.Sync) + syncProtocol.readSyncMessage(decoder, encoder, doc, origin) + break + + case MessageType.Awareness: + awarenessProtocol.applyAwarenessUpdate(doc.awareness, decoding.readVarUint8Array(decoder), origin) + break + + default: + throw new Error('Unknown message type') + } + return encoder +} diff --git a/workers/collaborator/src/types.ts b/workers/collaborator/src/types.ts new file mode 100644 index 00000000000..ac102013f5a --- /dev/null +++ b/workers/collaborator/src/types.ts @@ -0,0 +1,50 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// https://github.com/yjs/y-protocols/blob/master/awareness.js#L134 +export interface AwarenessUpdate { + added: Array + updated: Array + removed: Array +} + +export type RpcRequest = RpcGetContentRequest | RpcCreateContentRequest | RpcUpdateContentRequest + +export interface RpcGetContentRequest { + method: 'getContent' + payload: RpcGetContentPayload +} + +export interface RpcCreateContentRequest { + method: 'createContent' + payload: RpcCreateContentPayload +} + +export interface RpcUpdateContentRequest { + method: 'updateContent' + payload: RpcUpdateContentPayload +} + +export interface RpcGetContentPayload { + source?: string +} + +export interface RpcCreateContentPayload { + content: Record +} + +export interface RpcUpdateContentPayload { + content: Record +} diff --git a/workers/collaborator/src/utils.ts b/workers/collaborator/src/utils.ts new file mode 100644 index 00000000000..a9e5b94b7fb --- /dev/null +++ b/workers/collaborator/src/utils.ts @@ -0,0 +1,43 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +export interface DocumentId { + workspaceId: string + objectClass: string + objectId: string + objectAttr: string +} + +export function decodeDocumentId (name: string): DocumentId { + const [workspaceId, objectClass, objectId, objectAttr] = name.split('|') + if (workspaceId == null || objectClass == null || objectId == null || objectAttr == null) { + throw new Error('Malformed document id') + } + return { workspaceId, objectClass, objectId, objectAttr } +} + +export function ydocBlobId ({ objectId, objectAttr }: DocumentId): string { + // generate ydoc blob id compatible with platform collaborator + return `${objectId}%${objectAttr}` +} + +export function jsonBlobId ({ objectId, objectAttr }: DocumentId): string { + // generate ydoc json id compatible with platform collaborator + return [objectId, objectAttr, Date.now()].join('-') +} + +export function extractStrParam (value: string | string[] | undefined): string | undefined { + return Array.isArray(value) ? value[0] : value +} diff --git a/workers/collaborator/src/ydoc.ts b/workers/collaborator/src/ydoc.ts new file mode 100644 index 00000000000..c9c210fe8cb --- /dev/null +++ b/workers/collaborator/src/ydoc.ts @@ -0,0 +1,89 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { type Doc as YDoc, XmlElement as YXmlElement, XmlText as YXmlText } from 'yjs' +import { yDocToProsemirrorJSON } from 'y-prosemirror' + +/** ProseMirror Mark JSON representation */ +export interface JMark { + type: string + attrs?: Record +} + +/** ProseMirror Node JSON representation */ +interface JNode { + type: string + content?: JNode[] + marks?: JMark[] + attrs?: Record + text?: string +} + +/** + * Converts YDoc to ProseMirror JSON object + * @param ydoc YDoc + * @param field YDoc field name + * */ +export function yDocToJSON (ydoc: YDoc, field: string): Record { + return yDocToProsemirrorJSON(ydoc, field) +} + +/** + * Converts ProseMirror JSON object to YDoc without ProseMirror schema + * @param json ProseMirror JSON object + * @param field YDoc field name + * */ +export function jsonToYDoc (json: Record, ydoc: YDoc, field: string): void { + const nodes = json.type === 'doc' ? json.content ?? [] : [json] + const content = nodes.map(nodeToYXmlElement) + + const fragment = ydoc.getXmlFragment(field) + fragment.delete(0, fragment.length) + fragment.push(content) +} + +/** Convert ProseMirror JSON Node representation to YXmlElement */ +function nodeToYXmlElement (node: JNode): YXmlElement | YXmlText { + const elem = node.type === 'text' ? new YXmlText() : new YXmlElement(node.type) + + if (elem instanceof YXmlElement) { + if (node.content !== undefined && node.content.length > 0) { + const content = node.content.map(nodeToYXmlElement) + elem.push(content) + } + } else { + // https://github.com/yjs/y-prosemirror/blob/master/src/plugins/sync-plugin.js#L777 + const attributes: Record = {} + if (node.marks !== undefined) { + node.marks.forEach((mark) => { + attributes[mark.type] = mark.attrs ?? {} + }) + } + elem.applyDelta([ + { + insert: node.text ?? '', + attributes + } + ]) + } + + if (node.attrs !== undefined) { + Object.entries(node.attrs).forEach(([key, value]) => { + elem.setAttribute(key, value) + }) + } + + return elem +} diff --git a/workers/collaborator/tsconfig copy.json b/workers/collaborator/tsconfig copy.json new file mode 100644 index 00000000000..da8672e6cbc --- /dev/null +++ b/workers/collaborator/tsconfig copy.json @@ -0,0 +1,12 @@ +{ + "extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json", + + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib", + "declarationDir": "./types", + "tsBuildInfoFile": ".build/build.tsbuildinfo", + "types": ["@cloudflare/workers-types", "jest"], + "lib": ["esnext"] + } +} \ No newline at end of file diff --git a/workers/collaborator/tsconfig.json b/workers/collaborator/tsconfig.json new file mode 100644 index 00000000000..da8672e6cbc --- /dev/null +++ b/workers/collaborator/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json", + + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib", + "declarationDir": "./types", + "tsBuildInfoFile": ".build/build.tsbuildinfo", + "types": ["@cloudflare/workers-types", "jest"], + "lib": ["esnext"] + } +} \ No newline at end of file diff --git a/workers/collaborator/worker-configuration.d.ts b/workers/collaborator/worker-configuration.d.ts new file mode 100644 index 00000000000..b004d064f6a --- /dev/null +++ b/workers/collaborator/worker-configuration.d.ts @@ -0,0 +1,6 @@ +// Generated by Wrangler by running `wrangler types` + +interface Env { + COLLABORATOR: DurableObjectNamespace; + DATALAKE: Fetcher; +} diff --git a/workers/collaborator/wrangler.toml b/workers/collaborator/wrangler.toml new file mode 100644 index 00000000000..1624911561a --- /dev/null +++ b/workers/collaborator/wrangler.toml @@ -0,0 +1,60 @@ +#:schema node_modules/wrangler/config-schema.json +name = "collaborator-worker" +main = "src/index.ts" +compatibility_date = "2024-11-11" +compatibility_flags = ["nodejs_compat"] +keep_vars = true + +[[services]] +binding = "DATALAKE" +service = "datalake-worker" + +[[durable_objects.bindings]] +name = "COLLABORATOR" +class_name = "Collaborator" + +[[migrations]] +tag = "v1" +new_classes = ["Collaborator"] + +[observability] +enabled = true +head_sampling_rate = 1 + +[env.staging] +name = "collaborator-worker-staging" + +services = [ + { binding = "DATALAKE", service = "datalake-worker-staging" } +] + +[[env.staging.durable_objects.bindings]] +name = "COLLABORATOR" +class_name = "Collaborator" + +[[env.staging.migrations]] +tag = "v1" +new_classes = ["Collaborator"] + +[env.staging.observability] +enabled = true +head_sampling_rate = 1 + +[env.dev] +name = "collaborator-worker-dev" + +services = [ + { binding = "DATALAKE", service = "datalake-worker-dev" } +] + +[[env.dev.durable_objects.bindings]] +name = "COLLABORATOR" +class_name = "Collaborator" + +[[env.dev.migrations]] +tag = "v1" +new_classes = ["Collaborator"] + +[env.dev.observability] +enabled = true +head_sampling_rate = 1 diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts index fd4782cb6e7..fa0751ab991 100644 --- a/workers/datalake/src/blob.ts +++ b/workers/datalake/src/blob.ts @@ -195,7 +195,7 @@ export async function handleUploadFormData ( const { name, type, lastModified } = file try { const metadata = await withPostgres(env, ctx, metrics, (db) => { - return saveBlob(env, db, file.stream(), file.size, type, workspace, name, lastModified) + return saveBlob(env, db, file.stream(), workspace, name, { type, size: file.size, lastModified }) }) // TODO this probably should happen via queue, let it be here for now @@ -220,14 +220,13 @@ export async function saveBlob ( env: Env, db: BlobDB, stream: ReadableStream, - size: number, - type: string, workspace: string, name: string, - lastModified: number + metadata: Omit ): Promise { const { location, bucket } = selectStorage(env, workspace) + const { size, type, lastModified } = metadata const httpMetadata = { contentType: type, cacheControl, lastModified } const filename = getUniqueFilename() diff --git a/workers/datalake/src/db.ts b/workers/datalake/src/db.ts index be5a03f99e9..c38feffe8e9 100644 --- a/workers/datalake/src/db.ts +++ b/workers/datalake/src/db.ts @@ -53,7 +53,8 @@ export async function withPostgres ( fn: (db: BlobDB) => Promise ): Promise { const sql = metrics.withSync('db.connect', () => { - return postgres(env.HYPERDRIVE.connectionString, { + const url = env.DB_URL !== '' && env.DB_URL !== undefined ? env.DB_URL : env.HYPERDRIVE.connectionString + return postgres(url, { connection: { application_name: 'datalake' }, diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts index 012b547795d..c29eab2ca54 100644 --- a/workers/datalake/src/index.ts +++ b/workers/datalake/src/index.ts @@ -14,7 +14,7 @@ // import { WorkerEntrypoint } from 'cloudflare:workers' -import { type IRequest, type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router' +import { type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router' import { handleBlobDelete, handleBlobGet, handleBlobHead, handleBlobList, handleUploadFormData } from './blob' import { cors } from './cors' @@ -91,7 +91,7 @@ router .all('*', () => error(404)) export default class DatalakeWorker extends WorkerEntrypoint { - async fetch (request: IRequest): Promise { + async fetch (request: Request): Promise { const start = performance.now() const context = new MetricsContext() @@ -116,9 +116,13 @@ export default class DatalakeWorker extends WorkerEntrypoint { } } - async getBlob (workspace: string, name: string): Promise { - const request = new Request(`https://datalake/blob/${workspace}/${name}`) - const response = await router.fetch(request) + async getBlob (workspace: string, name: string): Promise { + const request = new Request(`https://datalake/blob/${workspace}/${encodeURIComponent(name)}`) + const response = await this.fetch(request) + + if (response.status === 404) { + return undefined + } if (!response.ok) { console.error({ error: 'datalake error: ' + response.statusText, workspace, name }) @@ -129,13 +133,13 @@ export default class DatalakeWorker extends WorkerEntrypoint { } async putBlob (workspace: string, name: string, data: ArrayBuffer | Blob | string, type: string): Promise { - const request = new Request(`https://datalake/upload/form-data/${workspace}`) - const body = new FormData() const blob = new Blob([data], { type }) body.set('file', blob, name) - const response = await router.fetch(request, { method: 'POST', body }) + const request = new Request(`https://datalake/upload/form-data/${workspace}`, { method: 'POST', body }) + + const response = await this.fetch(request) if (!response.ok) { console.error({ error: 'datalake error: ' + response.statusText, workspace, name }) diff --git a/workers/datalake/src/s3.ts b/workers/datalake/src/s3.ts index 2f1e1d1f47e..4c59c8773f2 100644 --- a/workers/datalake/src/s3.ts +++ b/workers/datalake/src/s3.ts @@ -64,14 +64,14 @@ export async function handleS3Blob ( return error(400) } - const contentType = object.headers.get('content-type') ?? 'application/octet-stream' + const type = object.headers.get('content-type') ?? 'application/octet-stream' const contentLengthHeader = object.headers.get('content-length') ?? '0' const lastModifiedHeader = object.headers.get('last-modified') - const contentLength = Number.parseInt(contentLengthHeader) + const size = Number.parseInt(contentLengthHeader) const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now() - const result = await saveBlob(env, db, object.body, contentLength, contentType, workspace, name, lastModified) - return json(result) + const metadata = await saveBlob(env, db, object.body, workspace, name, { lastModified, size, type }) + return json(metadata) }) }