diff --git a/.changeset/heavy-pianos-grin.md b/.changeset/heavy-pianos-grin.md new file mode 100644 index 000000000..897f7c639 --- /dev/null +++ b/.changeset/heavy-pianos-grin.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core': patch +'@powersync/service-types': patch +--- + +Added SDK connection reporting. diff --git a/.changeset/honest-sloths-smoke.md b/.changeset/honest-sloths-smoke.md new file mode 100644 index 000000000..4e50441a1 --- /dev/null +++ b/.changeset/honest-sloths-smoke.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core': patch +'@powersync/service-types': patch +--- + +Added SDK reporting to storage. diff --git a/.changeset/smart-mugs-share.md b/.changeset/smart-mugs-share.md new file mode 100644 index 000000000..f8cfda8ea --- /dev/null +++ b/.changeset/smart-mugs-share.md @@ -0,0 +1,13 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-module-postgres': patch +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-module-mysql': patch +'@powersync/service-module-core': patch +'@powersync/lib-services-framework': patch +'@powersync/service-types': patch +--- + +Added MongoDB report storage to the storage engine. diff --git a/.github/workflows/development_packages_release.yaml b/.github/workflows/development_packages_release.yaml index 0081557dc..2ae4fb7a7 100644 --- a/.github/workflows/development_packages_release.yaml +++ b/.github/workflows/development_packages_release.yaml @@ -6,7 +6,7 @@ on: workflow_dispatch jobs: publish_packages: - name: Publish Devevelopment Packages + name: Publish Development Packages runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/libs/lib-services/src/container.ts b/libs/lib-services/src/container.ts index f591f24d9..c7c4102cb 100644 --- a/libs/lib-services/src/container.ts +++ b/libs/lib-services/src/container.ts @@ -4,10 +4,10 @@ import { ErrorReporter } from './alerts/definitions.js'; import { NoOpReporter } from './alerts/no-op-reporter.js'; import { MigrationManager } from './migrations/MigrationManager.js'; import { - ProbeModule, - TerminationHandler, createInMemoryProbe, - createTerminationHandler + createTerminationHandler, + ProbeModule, + TerminationHandler } from './signals/signals-index.js'; export enum ContainerImplementation { @@ -47,7 +47,6 @@ export type Newable<T> = new (...args: never[]) => T; * Identifier used to get and register implementations */ export type ServiceIdentifier<T> = string | symbol | Newable<T> | Abstract<T> | ContainerImplementation; - const DEFAULT_GENERATORS: ContainerImplementationDefaultGenerators = { [ContainerImplementation.REPORTER]: () => NoOpReporter, [ContainerImplementation.PROBES]: () => createInMemoryProbe(), diff --git a/modules/module-mongodb-storage/package.json b/modules/module-mongodb-storage/package.json index e3ef6c1c2..a2a7c3f2b 100644 --- a/modules/module-mongodb-storage/package.json +++ b/modules/module-mongodb-storage/package.json @@ -31,9 +31,9 @@ "@powersync/lib-service-mongodb": "workspace:*", "@powersync/lib-services-framework": "workspace:*", "@powersync/service-core": "workspace:*", + "@powersync/service-types": "workspace:*", "@powersync/service-jsonbig": "workspace:*", "@powersync/service-sync-rules": "workspace:*", - "@powersync/service-types": "workspace:*", "bson": "^6.10.3", "ix": "^5.0.0", "lru-cache": 
"^10.2.2", diff --git a/modules/module-mongodb-storage/src/index.ts b/modules/module-mongodb-storage/src/index.ts index 54ac0f55c..91aa8161b 100644 --- a/modules/module-mongodb-storage/src/index.ts +++ b/modules/module-mongodb-storage/src/index.ts @@ -5,3 +5,4 @@ export * as storage from './storage/storage-index.js'; export * from './types/types.js'; export * as types from './types/types.js'; +export * as utils from './utils/utils-index.js'; diff --git a/modules/module-mongodb-storage/src/migrations/db/migrations/1752661449910-connection-reporting.ts b/modules/module-mongodb-storage/src/migrations/db/migrations/1752661449910-connection-reporting.ts new file mode 100644 index 000000000..a74c35291 --- /dev/null +++ b/modules/module-mongodb-storage/src/migrations/db/migrations/1752661449910-connection-reporting.ts @@ -0,0 +1,58 @@ +import { migrations } from '@powersync/service-core'; +import * as storage from '../../../storage/storage-index.js'; +import { MongoStorageConfig } from '../../../types/types.js'; + +export const up: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + await db.createConnectionReportingCollection(); + + await db.connection_report_events.createIndex( + { + connected_at: 1, + jwt_exp: 1, + disconnected_at: 1 + }, + { name: 'connection_list_index' } + ); + + await db.connection_report_events.createIndex( + { + user_id: 1 + }, + { name: 'connection_user_id_index' } + ); + await db.connection_report_events.createIndex( + { + client_id: 1 + }, + { name: 'connection_client_id_index' } + ); + await db.connection_report_events.createIndex( + { + sdk: 1 + }, + { name: 'connection_index' } + ); + } finally { + await db.client.close(); + } +}; + +export const down: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + + const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig); + + try { + await db.db.dropCollection('connection_report_events'); + } finally { + await db.client.close(); + } +}; diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 50459b84a..5c318e368 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -12,7 +12,7 @@ import { PowerSyncMongo } from './implementation/db.js'; import { SyncRuleDocument } from './implementation/models.js'; import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js'; import { MongoSyncBucketStorage, MongoSyncBucketStorageOptions } from './implementation/MongoSyncBucketStorage.js'; -import { generateSlotName } from './implementation/util.js'; +import { generateSlotName } from '../utils/util.js'; export class MongoBucketStorage extends BaseObserver diff --git a/modules/module-mongodb-storage/src/storage/MongoReportStorage.ts b/modules/module-mongodb-storage/src/storage/MongoReportStorage.ts new file mode 100644 index 000000000..4b9c332bf --- /dev/null +++ b/modules/module-mongodb-storage/src/storage/MongoReportStorage.ts @@ -0,0 +1,174 @@ +import { storage } from '@powersync/service-core'; +import { event_types } from '@powersync/service-types'; +import { PowerSyncMongo } from './implementation/db.js'; +import { logger } from 
'@powersync/lib-services-framework'; + +export class MongoReportStorage implements storage.ReportStorage { + public readonly db: PowerSyncMongo; + + constructor(db: PowerSyncMongo) { + this.db = db; + } + async deleteOldConnectionData(data: event_types.DeleteOldConnectionData): Promise<void> { + const { date } = data; + const result = await this.db.connection_report_events.deleteMany({ + connected_at: { $lt: date }, + $or: [ + { disconnected_at: { $exists: true } }, + { jwt_exp: { $lt: new Date() }, disconnected_at: { $exists: false } } + ] + }); + if (result.deletedCount > 0) { + logger.info( + `TTL from ${date.toISOString()}: ${result.deletedCount} MongoDB documents have been removed from connection_report_events.` + ); + } + } + + async getClientConnectionReports( + data: event_types.ClientConnectionReportRequest + ): Promise<event_types.ClientConnectionReportResponse> { + const { start, end } = data; + const result = await this.db.connection_report_events + .aggregate([ + { + $match: { + connected_at: { $lte: end, $gte: start } + } + }, + this.connectionsFacetPipeline(), + this.connectionsProjectPipeline() + ]) + .toArray(); + return result[0]; + } + + async reportClientConnection(data: event_types.ClientConnectionBucketData): Promise<void> { + const updateFilter = this.updateDocFilter(data.user_id, data.client_id!); + await this.db.connection_report_events.findOneAndUpdate( + updateFilter, + { + $set: data, + $unset: { + disconnected_at: '' + } + }, + { + upsert: true + } + ); + } + async reportClientDisconnection(data: event_types.ClientDisconnectionEventData): Promise<void> { + const { connected_at, user_id, client_id } = data; + await this.db.connection_report_events.findOneAndUpdate( + { + client_id, + user_id, + connected_at + }, + { + $set: { + disconnected_at: data.disconnected_at + }, + $unset: { + jwt_exp: '' + } + } + ); + } + async getConnectedClients(): Promise<event_types.ClientConnectionReportResponse> { + const result = await this.db.connection_report_events + .aggregate([ + { + $match: { + disconnected_at: { $exists: false }, + jwt_exp: { $gt: new Date() } + } + }, + this.connectionsFacetPipeline(), + this.connectionsProjectPipeline() + ]) + .toArray(); + return result[0]; + } + + async [Symbol.asyncDispose]() { + // No-op + } + + private parseJsDate(date: Date) { + const year = date.getUTCFullYear(); + const month = date.getUTCMonth(); + const today = date.getUTCDate(); + const day = date.getUTCDay(); + return { + year, + month, + today, + day, + parsedDate: date + }; + } + + private connectionsFacetPipeline() { + return { + $facet: { + unique_users: [ + { + $group: { + _id: '$user_id' + } + }, + { + $count: 'count' + } + ], + sdk_versions_array: [ + { + $group: { + _id: '$sdk', + total: { $sum: 1 }, + client_ids: { $addToSet: '$client_id' }, + user_ids: { $addToSet: '$user_id' } + } + }, + { + $project: { + _id: 0, + sdk: '$_id', + users: { $size: '$user_ids' }, + clients: { $size: '$client_ids' } + } + }, + { + $sort: { + sdk: 1 + } + } + ] + } + }; + } + + private connectionsProjectPipeline() { + return { + $project: { + users: { $ifNull: [{ $arrayElemAt: ['$unique_users.count', 0] }, 0] }, + sdks: '$sdk_versions_array' + } + }; + } + + private updateDocFilter(userId: string, clientId: string) { + const { year, month, today } = this.parseJsDate(new Date()); + const nextDay = today + 1; + return { + user_id: userId, + client_id: clientId, + connected_at: { + $gte: new Date(Date.UTC(year, month, today)), + $lt: new Date(Date.UTC(year, month, nextDay)) + } + }; + } +} diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts 
b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 003f50134..eecf747e2 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -28,7 +28,7 @@ import { MongoIdSequence } from './MongoIdSequence.js'; import { batchCreateCustomWriteCheckpoints } from './MongoWriteCheckpointAPI.js'; import { cacheKey, OperationBatch, RecordOperation } from './OperationBatch.js'; import { PersistedBatch } from './PersistedBatch.js'; -import { idPrefixFilter } from './util.js'; +import { idPrefixFilter } from '../../utils/util.js'; /** * 15MB diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts index 75333edb1..4ae2055b7 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoStorageProvider.ts @@ -4,8 +4,9 @@ import { POWERSYNC_VERSION, storage } from '@powersync/service-core'; import { MongoStorageConfig } from '../../types/types.js'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { PowerSyncMongo } from './db.js'; +import { MongoReportStorage } from '../MongoReportStorage.js'; -export class MongoStorageProvider implements storage.BucketStorageProvider { +export class MongoStorageProvider implements storage.StorageProvider { get type() { return lib_mongo.MONGO_CONNECTION_TYPE; } @@ -37,15 +38,19 @@ export class MongoStorageProvider implements storage.BucketStorageProvider { await client.connect(); const database = new PowerSyncMongo(client, { database: resolvedConfig.storage.database }); - const factory = new MongoBucketStorage(database, { + const syncStorageFactory = new MongoBucketStorage(database, { // TODO currently need the entire resolved config due to this slot_name_prefix: resolvedConfig.slot_name_prefix }); + + // Storage factory for reports + const reportStorageFactory = new MongoReportStorage(database); return { - storage: factory, + storage: syncStorageFactory, + reportStorage: reportStorageFactory, shutDown: async () => { shuttingDown = true; - await factory[Symbol.asyncDispose](); + await syncStorageFactory[Symbol.asyncDispose](); await client.close(); }, tearDown: () => { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index bc199bdf0..80b6385f8 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -35,7 +35,8 @@ import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoParameterCompactor } from './MongoParameterCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; -import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js'; +import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js'; + export interface MongoSyncBucketStorageOptions { checksumOptions?: MongoChecksumOptions; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts 
b/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts deleted file mode 100644 index 4aa149bfb..000000000 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { TestStorageOptions } from '@powersync/service-core'; -import { MongoBucketStorage } from '../MongoBucketStorage.js'; -import { connectMongoForTests } from './util.js'; -import { MongoSyncBucketStorageOptions } from './MongoSyncBucketStorage.js'; - -export type MongoTestStorageOptions = { - url: string; - isCI: boolean; - internalOptions?: MongoSyncBucketStorageOptions; -}; - -export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorageOptions) => { - return async (options?: TestStorageOptions) => { - const db = connectMongoForTests(factoryOptions.url, factoryOptions.isCI); - - // None of the tests insert data into this collection, so it was never created - if (!(await db.db.listCollections({ name: db.bucket_parameters.collectionName }).hasNext())) { - await db.db.createCollection('bucket_parameters'); - } - - if (!options?.doNotClear) { - await db.clear(); - } - - // Full migrations are not currently run for tests, so we manually create the important ones - await db.createCheckpointEventsCollection(); - await db.createBucketStateIndex(); - - return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions); - }; -}; diff --git a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts index 5f33d0988..dab269f34 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts @@ -16,7 +16,7 @@ import { CurrentDataDocument, SourceKey } from './models.js'; -import { replicaIdToSubkey } from './util.js'; +import { replicaIdToSubkey } from '../../utils/util.js'; /** * Maximum size of operations we write in a single transaction. diff --git a/modules/module-mongodb-storage/src/storage/implementation/db.ts b/modules/module-mongodb-storage/src/storage/implementation/db.ts index e6b08352f..519ca2075 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/db.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/db.ts @@ -8,6 +8,7 @@ import { BucketParameterDocument, BucketStateDocument, CheckpointEventDocument, + ClientConnectionDocument, CurrentDataDocument, CustomWriteCheckpointDocument, IdSequenceDocument, @@ -37,6 +38,7 @@ export class PowerSyncMongo { readonly locks: mongo.Collection; readonly bucket_state: mongo.Collection; readonly checkpoint_events: mongo.Collection; + readonly connection_report_events: mongo.Collection; readonly client: mongo.MongoClient; readonly db: mongo.Db; @@ -61,6 +63,7 @@ export class PowerSyncMongo { this.locks = this.db.collection('locks'); this.bucket_state = this.db.collection('bucket_state'); this.checkpoint_events = this.db.collection('checkpoint_events'); + this.connection_report_events = this.db.collection('connection_report_events'); } /** @@ -128,6 +131,20 @@ export class PowerSyncMongo { }); } + /** + * Only use in migrations and tests. 
+ */ + async createConnectionReportingCollection() { + const existingCollections = await this.db + .listCollections({ name: 'connection_report_events' }, { nameOnly: false }) + .toArray(); + const collection = existingCollections[0]; + if (collection != null) { + return; + } + await this.db.createCollection('connection_report_events'); + } + /** * Only use in migrations and tests. */ diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 0d2b92426..6e00d6f2d 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -1,6 +1,7 @@ import { InternalOpId, storage } from '@powersync/service-core'; import { SqliteJsonValue } from '@powersync/service-sync-rules'; import * as bson from 'bson'; +import { event_types } from '@powersync/service-types'; /** * Replica id uniquely identifying a row on the source database. @@ -238,3 +239,5 @@ export interface InstanceDocument { // The instance UUID _id: string; } + +export interface ClientConnectionDocument extends event_types.ClientConnection {} diff --git a/modules/module-mongodb-storage/src/storage/storage-index.ts b/modules/module-mongodb-storage/src/storage/storage-index.ts index d4d3373b9..cfb1d4ad0 100644 --- a/modules/module-mongodb-storage/src/storage/storage-index.ts +++ b/modules/module-mongodb-storage/src/storage/storage-index.ts @@ -7,8 +7,9 @@ export * from './implementation/MongoPersistedSyncRulesContent.js'; export * from './implementation/MongoStorageProvider.js'; export * from './implementation/MongoSyncBucketStorage.js'; export * from './implementation/MongoSyncRulesLock.js'; -export * from './implementation/MongoTestStorageFactoryGenerator.js'; export * from './implementation/OperationBatch.js'; export * from './implementation/PersistedBatch.js'; -export * from './implementation/util.js'; +export * from '../utils/util.js'; export * from './MongoBucketStorage.js'; +export * from './MongoReportStorage.js'; +export * as test_utils from '../utils/test-utils.js'; diff --git a/modules/module-mongodb-storage/src/utils/test-utils.ts b/modules/module-mongodb-storage/src/utils/test-utils.ts new file mode 100644 index 000000000..eece317f4 --- /dev/null +++ b/modules/module-mongodb-storage/src/utils/test-utils.ts @@ -0,0 +1,57 @@ +import { mongo } from '@powersync/lib-service-mongodb'; +import { PowerSyncMongo } from '../storage/implementation/db.js'; +import { TestStorageOptions } from '@powersync/service-core'; +import { MongoReportStorage } from '../storage/MongoReportStorage.js'; +import { MongoBucketStorage } from '../storage/MongoBucketStorage.js'; +import { MongoSyncBucketStorageOptions } from '../storage/implementation/MongoSyncBucketStorage.js'; + +export type MongoTestStorageOptions = { + url: string; + isCI: boolean; + internalOptions?: MongoSyncBucketStorageOptions; +}; + +export function mongoTestStorageFactoryGenerator(factoryOptions: MongoTestStorageOptions) { + return async (options?: TestStorageOptions) => { + const db = connectMongoForTests(factoryOptions.url, factoryOptions.isCI); + + // None of the tests insert data into this collection, so it was never created + if (!(await db.db.listCollections({ name: db.bucket_parameters.collectionName }).hasNext())) { + await db.db.createCollection('bucket_parameters'); + } + + // Full migrations are not currently run for tests, so we manually create this + await 
db.createCheckpointEventsCollection(); + + if (!options?.doNotClear) { + await db.clear(); + } + + return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions); + }; +} + +export function mongoTestReportStorageFactoryGenerator(factoryOptions: MongoTestStorageOptions) { + return async (options?: TestStorageOptions) => { + const db = connectMongoForTests(factoryOptions.url, factoryOptions.isCI); + + await db.createConnectionReportingCollection(); + + if (!options?.doNotClear) { + await db.clear(); + } + + return new MongoReportStorage(db); + }; +} + +export const connectMongoForTests = (url: string, isCI: boolean) => { + // Short timeout for tests, to fail fast when the server is not available. + // Slightly longer timeouts for CI, to avoid arbitrary test failures + const client = new mongo.MongoClient(url, { + connectTimeoutMS: isCI ? 15_000 : 5_000, + socketTimeoutMS: isCI ? 15_000 : 5_000, + serverSelectionTimeoutMS: isCI ? 15_000 : 2_500 + }); + return new PowerSyncMongo(client); +}; diff --git a/modules/module-mongodb-storage/src/storage/implementation/util.ts b/modules/module-mongodb-storage/src/utils/util.ts similarity index 84% rename from modules/module-mongodb-storage/src/storage/implementation/util.ts rename to modules/module-mongodb-storage/src/utils/util.ts index cae1a5b9f..eded213d6 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/util.ts +++ b/modules/module-mongodb-storage/src/utils/util.ts @@ -3,11 +3,9 @@ import * as crypto from 'crypto'; import * as uuid from 'uuid'; import { mongo } from '@powersync/lib-service-mongodb'; -import { BucketChecksum, PartialChecksum, PartialOrFullChecksum, storage, utils } from '@powersync/service-core'; - -import { PowerSyncMongo } from './db.js'; -import { BucketDataDocument } from './models.js'; +import { storage, utils } from '@powersync/service-core'; import { ServiceAssertionError } from '@powersync/lib-services-framework'; +import { BucketDataDocument } from '../storage/implementation/models.js'; export function idPrefixFilter(prefix: Partial, rest: (keyof T)[]): mongo.Condition { let filter = { @@ -105,20 +103,6 @@ export function replicaIdToSubkey(table: bson.ObjectId, id: storage.ReplicaId): } } -/** - * Helper for unit tests - */ -export const connectMongoForTests = (url: string, isCI: boolean) => { - // Short timeout for tests, to fail fast when the server is not available. - // Slightly longer timeouts for CI, to avoid arbitrary test failures - const client = new mongo.MongoClient(url, { - connectTimeoutMS: isCI ? 15_000 : 5_000, - socketTimeoutMS: isCI ? 15_000 : 5_000, - serverSelectionTimeoutMS: isCI ? 15_000 : 2_500 - }); - return new PowerSyncMongo(client); -}; - export function setSessionSnapshotTime(session: mongo.ClientSession, time: bson.Timestamp) { // This is a workaround for the lack of direct support for snapshot reads in the MongoDB driver. 
if (!session.snapshotEnabled) { diff --git a/modules/module-mongodb-storage/src/utils/utils-index.ts b/modules/module-mongodb-storage/src/utils/utils-index.ts new file mode 100644 index 000000000..a42a9024d --- /dev/null +++ b/modules/module-mongodb-storage/src/utils/utils-index.ts @@ -0,0 +1,2 @@ +export * as test_utils from './test-utils.js'; +export * from './util.js'; diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/connection-report-storage.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/connection-report-storage.test.ts.snap new file mode 100644 index 000000000..3d70e5582 --- /dev/null +++ b/modules/module-mongodb-storage/test/src/__snapshots__/connection-report-storage.test.ts.snap @@ -0,0 +1,215 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`Connection reporting storage > Should create a connection report if its after a day 1`] = ` +[ + { + "client_id": "client_week", + "sdk": "powersync-js/1.24.5", + "user_agent": "powersync-js/1.21.0 powersync-web Firefox/141 linux", + "user_id": "user_week", + }, + { + "client_id": "client_week", + "sdk": "powersync-js/1.24.5", + "user_agent": "powersync-js/1.21.0 powersync-web Firefox/141 linux", + "user_id": "user_week", + }, +] +`; + +exports[`Connection reporting storage > Should delete rows older than specified range 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.2", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.24.5", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 5, +} +`; + +exports[`Connection reporting storage > Should update a connected connection report and make it disconnected 1`] = ` +[ + { + "client_id": "client_three", + "sdk": "powersync-js/1.21.2", + "user_agent": "powersync-js/1.21.0 powersync-web Firefox/141 linux", + "user_id": "user_three", + }, +] +`; + +exports[`Connection reporting storage > Should update a connection report if its within a day 1`] = ` +[ + { + "client_id": "client_one", + "sdk": "powersync-dart/1.6.4", + "user_agent": "powersync-dart/1.6.4 Dart (flutter-web) Chrome/128 android", + "user_id": "user_one", + }, +] +`; + +exports[`Report storage tests > Should show connection report data for user over the past day 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 3, +} +`; + +exports[`Report storage tests > Should show connection report data for user over the past month 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.2", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.23.6", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.23.7", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.24.5", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 7, +} +`; + +exports[`Report 
storage tests > Should show connection report data for user over the past week 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.2", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.24.5", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 5, +} +`; + +exports[`Report storage tests > Should show currently connected users 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 2, +} +`; diff --git a/modules/module-mongodb-storage/test/src/connection-report-storage.test.ts b/modules/module-mongodb-storage/test/src/connection-report-storage.test.ts new file mode 100644 index 000000000..fd1487cf1 --- /dev/null +++ b/modules/module-mongodb-storage/test/src/connection-report-storage.test.ts @@ -0,0 +1,133 @@ +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { INITIALIZED_MONGO_REPORT_STORAGE_FACTORY } from './util.js'; +import { register, ReportUserData } from '@powersync/service-core-tests'; +import { event_types } from '@powersync/service-types'; +import { MongoReportStorage } from '@module/storage/MongoReportStorage.js'; + +const userData = register.REPORT_TEST_USERS; +const dates = register.REPORT_TEST_DATES; +const factory = await INITIALIZED_MONGO_REPORT_STORAGE_FACTORY(); + +function removeVolatileFields( + connections: event_types.ClientConnection[] +): Partial[] { + return connections.map((sdk: Partial) => { + const { _id, disconnected_at, connected_at, jwt_exp, ...rest } = sdk; + return { + ...rest + }; + }); +} + +async function loadData(data: ReportUserData, factory: MongoReportStorage) { + await factory.db.connection_report_events.insertMany(Object.values(data)); +} + +async function deleteData(factory: MongoReportStorage) { + await factory.db.connection_report_events.deleteMany(); +} + +beforeAll(async () => { + await loadData(userData, factory); +}); +afterAll(async () => { + await deleteData(factory); +}); + +describe('Report storage tests', async () => { + await register.registerReportTests(factory); +}); + +describe('Connection reporting storage', async () => { + it('Should create a connection report if its after a day', async () => { + const newConnectAt = new Date( + dates.now.getFullYear(), + dates.now.getMonth(), + dates.now.getDate() + 1, + dates.now.getHours() + ); + const jwtExp = new Date(newConnectAt.getFullYear(), newConnectAt.getMonth(), newConnectAt.getDate() + 1); + + await factory.reportClientConnection({ + sdk: userData.user_week.sdk, + connected_at: newConnectAt, + jwt_exp: jwtExp, + client_id: userData.user_week.client_id, + user_id: userData.user_week.user_id, + user_agent: userData.user_week.user_agent + }); + + const connection = await factory.db.connection_report_events.find({ user_id: userData.user_week.user_id }).toArray(); + expect(connection).toHaveLength(2); + const cleaned = removeVolatileFields(connection); + expect(cleaned).toMatchSnapshot(); + }); + + it('Should update a connection report if its within a day', async () => { + const newConnectAt = new Date( + dates.now.getFullYear(), + dates.now.getMonth(), + dates.now.getDate(), + dates.now.getHours(), 
+ dates.now.getMinutes() + 20 + ); + const jwtExp = new Date(newConnectAt.getFullYear(), newConnectAt.getMonth(), newConnectAt.getDate() + 1); + await factory.reportClientConnection({ + sdk: userData.user_one.sdk, + connected_at: newConnectAt, + jwt_exp: jwtExp, + client_id: userData.user_one.client_id, + user_id: userData.user_one.user_id, + user_agent: userData.user_one.user_agent + }); + + const connection = await factory.db.connection_report_events + .find({ user_id: userData.user_one.user_id, client_id: userData.user_one.client_id }) + .toArray(); + expect(connection).toHaveLength(1); + expect(new Date(connection[0].connected_at)).toEqual(newConnectAt); + expect(new Date(connection[0].jwt_exp!)).toEqual(jwtExp); + expect(connection[0].disconnected_at).toBeUndefined(); + const cleaned = removeVolatileFields(connection); + expect(cleaned).toMatchSnapshot(); + }); + + it('Should update a connected connection report and make it disconnected', async () => { + const disconnectAt = new Date( + dates.now.getFullYear(), + dates.now.getMonth(), + dates.now.getDate(), + dates.now.getHours(), + dates.now.getMinutes() + 20 + ); + const jwtExp = new Date(disconnectAt.getFullYear(), disconnectAt.getMonth(), disconnectAt.getDate() + 1); + + await factory.reportClientDisconnection({ + disconnected_at: disconnectAt, + jwt_exp: jwtExp, + client_id: userData.user_three.client_id, + user_id: userData.user_three.user_id, + user_agent: userData.user_three.user_agent, + connected_at: userData.user_three.connected_at + }); + + const connection = await factory.db.connection_report_events.find({ user_id: userData.user_three.user_id }).toArray(); + expect(connection).toHaveLength(1); + expect(new Date(connection[0].disconnected_at!)).toEqual(disconnectAt); + const cleaned = removeVolatileFields(connection); + expect(cleaned).toMatchSnapshot(); + }); + + it('Should delete rows older than specified range', async () => { + await deleteData(factory); + await loadData(userData, factory); + await factory.deleteOldConnectionData({ + date: dates.weekAgo + }); + const connection = await factory.getClientConnectionReports({ + start: dates.monthAgo, + end: dates.now + }); + expect(connection).toMatchSnapshot(); + }); +}); diff --git a/modules/module-mongodb-storage/test/src/storage.test.ts b/modules/module-mongodb-storage/test/src/storage.test.ts index b238487c2..847d75b05 100644 --- a/modules/module-mongodb-storage/test/src/storage.test.ts +++ b/modules/module-mongodb-storage/test/src/storage.test.ts @@ -2,8 +2,7 @@ import { register } from '@powersync/service-core-tests'; import { describe } from 'vitest'; import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; import { env } from './env.js'; -import { MongoTestStorageFactoryGenerator } from '@module/storage/implementation/MongoTestStorageFactoryGenerator.js'; -import { MongoChecksumOptions } from '@module/storage/implementation/MongoChecksums.js'; +import { mongoTestStorageFactoryGenerator } from '@module/utils/test-utils.js'; describe('Mongo Sync Bucket Storage - Parameters', () => register.registerDataStorageParameterTests(INITIALIZED_MONGO_STORAGE_FACTORY)); @@ -18,7 +17,7 @@ describe('Sync Bucket Validation', register.registerBucketValidationTests); describe('Mongo Sync Bucket Storage - split operations', () => register.registerDataStorageDataTests( - MongoTestStorageFactoryGenerator({ + mongoTestStorageFactoryGenerator({ url: env.MONGO_TEST_URL, isCI: env.CI, internalOptions: { @@ -32,7 +31,7 @@ describe('Mongo Sync Bucket Storage - split operations', () => 
describe('Mongo Sync Bucket Storage - split buckets', () => register.registerDataStorageDataTests( - MongoTestStorageFactoryGenerator({ + mongoTestStorageFactoryGenerator({ url: env.MONGO_TEST_URL, isCI: env.CI, internalOptions: { @@ -43,50 +42,3 @@ describe('Mongo Sync Bucket Storage - split buckets', () => } }) )); - -describe('Mongo Sync Bucket Storage - checksum calculations', () => { - // This test tests 4 buckets x 4 operations in each. - // We specifically use operationBatchLimit that does not have factors in common with 4, - // as well some that do. - const params: MongoChecksumOptions[] = [ - { - bucketBatchLimit: 100, - operationBatchLimit: 3 - }, - - { - bucketBatchLimit: 10, - operationBatchLimit: 7 - }, - - { - bucketBatchLimit: 3, - operationBatchLimit: 1 - }, - { - bucketBatchLimit: 1, - operationBatchLimit: 3 - }, - { - bucketBatchLimit: 2, - operationBatchLimit: 4 - }, - { - bucketBatchLimit: 4, - operationBatchLimit: 12 - } - ]; - for (let options of params) { - describe(`${options.bucketBatchLimit}|${options.operationBatchLimit}`, () => { - register.testChecksumBatching( - MongoTestStorageFactoryGenerator({ - url: env.MONGO_TEST_URL, - isCI: env.CI, - internalOptions: { - checksumOptions: options - } - }) - ); - }); - } -}); diff --git a/modules/module-mongodb-storage/test/src/util.ts b/modules/module-mongodb-storage/test/src/util.ts index db3b2dd18..4a7174056 100644 --- a/modules/module-mongodb-storage/test/src/util.ts +++ b/modules/module-mongodb-storage/test/src/util.ts @@ -1,8 +1,12 @@ import { env } from './env.js'; +import { mongoTestReportStorageFactoryGenerator, mongoTestStorageFactoryGenerator } from '@module/utils/test-utils.js'; -import { MongoTestStorageFactoryGenerator } from '@module/storage/implementation/MongoTestStorageFactoryGenerator.js'; +export const INITIALIZED_MONGO_STORAGE_FACTORY = mongoTestStorageFactoryGenerator({ + url: env.MONGO_TEST_URL, + isCI: env.CI +}); -export const INITIALIZED_MONGO_STORAGE_FACTORY = MongoTestStorageFactoryGenerator({ +export const INITIALIZED_MONGO_REPORT_STORAGE_FACTORY = mongoTestReportStorageFactoryGenerator({ url: env.MONGO_TEST_URL, isCI: env.CI }); diff --git a/modules/module-mongodb/test/src/util.ts b/modules/module-mongodb/test/src/util.ts index db02a5158..cda52142e 100644 --- a/modules/module-mongodb/test/src/util.ts +++ b/modules/module-mongodb/test/src/util.ts @@ -14,12 +14,12 @@ export const TEST_CONNECTION_OPTIONS = types.normalizeConnectionConfig({ uri: TEST_URI }); -export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.MongoTestStorageFactoryGenerator({ +export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.test_utils.mongoTestStorageFactoryGenerator({ url: env.MONGO_TEST_URL, isCI: env.CI }); -export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.PostgresTestStorageFactoryGenerator({ +export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.test_utils.postgresTestStorageFactoryGenerator({ url: env.PG_STORAGE_TEST_URL }); diff --git a/modules/module-mysql/test/src/util.ts b/modules/module-mysql/test/src/util.ts index cb72b12b4..4f18cdc53 100644 --- a/modules/module-mysql/test/src/util.ts +++ b/modules/module-mysql/test/src/util.ts @@ -19,12 +19,12 @@ export const TEST_CONNECTION_OPTIONS = types.normalizeConnectionConfig({ uri: TEST_URI }); -export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.MongoTestStorageFactoryGenerator({ +export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.test_utils.mongoTestStorageFactoryGenerator({ url: 
env.MONGO_TEST_URL, isCI: env.CI }); -export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.PostgresTestStorageFactoryGenerator({ +export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.test_utils.postgresTestStorageFactoryGenerator({ url: env.PG_STORAGE_TEST_URL }); diff --git a/modules/module-postgres-storage/package.json b/modules/module-postgres-storage/package.json index 2d4839d62..e7b87ed2d 100644 --- a/modules/module-postgres-storage/package.json +++ b/modules/module-postgres-storage/package.json @@ -33,10 +33,10 @@ "@powersync/lib-service-postgres": "workspace:*", "@powersync/lib-services-framework": "workspace:*", "@powersync/service-core": "workspace:*", + "@powersync/service-types": "workspace:*", "@powersync/service-jpgwire": "workspace:*", "@powersync/service-jsonbig": "workspace:*", "@powersync/service-sync-rules": "workspace:*", - "@powersync/service-types": "workspace:*", "ix": "^5.0.0", "lru-cache": "^10.2.2", "p-defer": "^4.0.1", diff --git a/modules/module-postgres-storage/src/migrations/scripts/1756282360128-connection-reporting.ts b/modules/module-postgres-storage/src/migrations/scripts/1756282360128-connection-reporting.ts new file mode 100644 index 000000000..d2620a35a --- /dev/null +++ b/modules/module-postgres-storage/src/migrations/scripts/1756282360128-connection-reporting.ts @@ -0,0 +1,41 @@ +import { migrations } from '@powersync/service-core'; +import { openMigrationDB } from '../migration-utils.js'; + +export const up: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + await using client = openMigrationDB(configuration.storage); + await client.transaction(async (db) => { + await db.sql` + CREATE TABLE IF NOT EXISTS connection_report_events ( + id TEXT PRIMARY KEY, + user_agent TEXT NOT NULL, + client_id TEXT NOT NULL, + user_id TEXT NOT NULL, + sdk TEXT NOT NULL, + jwt_exp TIMESTAMP WITH TIME ZONE, + connected_at TIMESTAMP WITH TIME ZONE NOT NULL, + disconnected_at TIMESTAMP WITH TIME ZONE + ) + `.execute(); + + await db.sql` + CREATE INDEX IF NOT EXISTS sdk_list_index ON connection_report_events (connected_at, jwt_exp, disconnected_at) + `.execute(); + + await db.sql`CREATE INDEX IF NOT EXISTS sdk_user_id_index ON connection_report_events (user_id)`.execute(); + + await db.sql`CREATE INDEX IF NOT EXISTS sdk_client_id_index ON connection_report_events (client_id)`.execute(); + + await db.sql`CREATE INDEX IF NOT EXISTS sdk_index ON connection_report_events (sdk)`.execute(); + }); +}; + +export const down: migrations.PowerSyncMigrationFunction = async (context) => { + const { + service_context: { configuration } + } = context; + await using client = openMigrationDB(configuration.storage); + await client.sql`DROP TABLE IF EXISTS connection_report_events`.execute(); +}; diff --git a/modules/module-postgres-storage/src/storage/PostgresReportStorage.ts b/modules/module-postgres-storage/src/storage/PostgresReportStorage.ts new file mode 100644 index 000000000..22187760c --- /dev/null +++ b/modules/module-postgres-storage/src/storage/PostgresReportStorage.ts @@ -0,0 +1,258 @@ +import { storage } from '@powersync/service-core'; +import * as pg_wire from '@powersync/service-jpgwire'; +import { event_types } from '@powersync/service-types'; +import { v4 } from 'uuid'; +import * as lib_postgres from '@powersync/lib-service-postgres'; +import { NormalizedPostgresStorageConfig } from '../types/types.js'; +import { SdkReporting, SdkReportingDecoded } from 
'../types/models/SdkReporting.js'; +import { toInteger } from 'ix/util/tointeger.js'; +import { logger } from '@powersync/lib-services-framework'; +import { getStorageApplicationName } from '../utils/application-name.js'; +import { STORAGE_SCHEMA_NAME } from '../utils/db.js'; + +export type PostgresReportStorageOptions = { + config: NormalizedPostgresStorageConfig; +}; + +export class PostgresReportStorage implements storage.ReportStorage { + readonly db: lib_postgres.DatabaseClient; + constructor(protected options: PostgresReportStorageOptions) { + this.db = new lib_postgres.DatabaseClient({ + config: options.config, + schema: STORAGE_SCHEMA_NAME, + applicationName: getStorageApplicationName() + }); + + this.db.registerListener({ + connectionCreated: async (connection) => this.prepareStatements(connection) + }); + } + + private parseJsDate(date: Date) { + const year = date.getFullYear(); + const month = date.getMonth(); + const today = date.getDate(); + const day = date.getDay(); + return { + year, + month, + today, + day, + parsedDate: date + }; + } + + private mapListCurrentConnectionsResponse( + result: SdkReportingDecoded | null + ): event_types.ClientConnectionReportResponse { + if (!result) { + return { + users: 0, + sdks: [] + }; + } + return { + users: Number(result.users), + sdks: result.sdks?.data || [] + }; + } + private async listConnectionsQuery() { + return await this.db.sql` + WITH + filtered AS ( + SELECT + * + FROM + connection_report_events + WHERE + disconnected_at IS NULL + AND jwt_exp > NOW() + ), + unique_users AS ( + SELECT + COUNT(DISTINCT user_id) AS count + FROM + filtered + ), + sdk_versions_array AS ( + SELECT + sdk, + COUNT(DISTINCT client_id) AS clients, + COUNT(DISTINCT user_id) AS users + FROM + filtered + GROUP BY + sdk + ) + SELECT + ( + SELECT + COALESCE(count, 0) + FROM + unique_users + ) AS users, + ( + SELECT + JSON_AGG(ROW_TO_JSON(s)) + FROM + sdk_versions_array s + ) AS sdks; + ` + .decoded(SdkReporting) + .first(); + } + + private updateTableFilter() { + const { year, month, today } = this.parseJsDate(new Date()); + const nextDay = today + 1; + return { + gte: new Date(year, month, today).toISOString(), + lt: new Date(year, month, nextDay).toISOString() + }; + } + + async reportClientConnection(data: event_types.ClientConnectionBucketData): Promise { + const { sdk, connected_at, user_id, user_agent, jwt_exp, client_id } = data; + const connectIsoString = connected_at.toISOString(); + const jwtExpIsoString = jwt_exp.toISOString(); + const { gte, lt } = this.updateTableFilter(); + const uuid = v4(); + const result = await this.db.sql` + UPDATE connection_report_events + SET + connected_at = ${{ type: 1184, value: connectIsoString }}, + sdk = ${{ type: 'varchar', value: sdk }}, + user_agent = ${{ type: 'varchar', value: user_agent }}, + jwt_exp = ${{ type: 1184, value: jwtExpIsoString }}, + disconnected_at = NULL + WHERE + user_id = ${{ type: 'varchar', value: user_id }} + AND client_id = ${{ type: 'varchar', value: client_id }} + AND connected_at >= ${{ type: 1184, value: gte }} + AND connected_at < ${{ type: 1184, value: lt }}; + `.execute(); + if (result.results[1].status === 'UPDATE 0') { + await this.db.sql` + INSERT INTO + connection_report_events ( + user_id, + client_id, + connected_at, + sdk, + user_agent, + jwt_exp, + id + ) + VALUES + ( + ${{ type: 'varchar', value: user_id }}, + ${{ type: 'varchar', value: client_id }}, + ${{ type: 1184, value: connectIsoString }}, + ${{ type: 'varchar', value: sdk }}, + ${{ type: 'varchar', value: 
user_agent }}, + ${{ type: 1184, value: jwtExpIsoString }}, + ${{ type: 'varchar', value: uuid }} + ) + `.execute(); + } + } + async reportClientDisconnection(data: event_types.ClientDisconnectionEventData): Promise<void> { + const { user_id, client_id, disconnected_at, connected_at } = data; + const disconnectIsoString = disconnected_at.toISOString(); + const connectIsoString = connected_at.toISOString(); + await this.db.sql` + UPDATE connection_report_events + SET + disconnected_at = ${{ type: 1184, value: disconnectIsoString }}, + jwt_exp = NULL + WHERE + user_id = ${{ type: 'varchar', value: user_id }} + AND client_id = ${{ type: 'varchar', value: client_id }} + AND connected_at = ${{ type: 1184, value: connectIsoString }} + `.execute(); + } + async getConnectedClients(): Promise<event_types.ClientConnectionReportResponse> { + const result = await this.listConnectionsQuery(); + return this.mapListCurrentConnectionsResponse(result); + } + + async getClientConnectionReports( + data: event_types.ClientConnectionReportRequest + ): Promise<event_types.ClientConnectionReportResponse> { + const { start, end } = data; + const result = await this.db.sql` + WITH + filtered AS ( + SELECT + * + FROM + connection_report_events + WHERE + connected_at >= ${{ type: 1184, value: start.toISOString() }} + AND connected_at <= ${{ type: 1184, value: end.toISOString() }} + ), + unique_users AS ( + SELECT + COUNT(DISTINCT user_id) AS count + FROM + filtered + ), + sdk_versions_array AS ( + SELECT + sdk, + COUNT(DISTINCT client_id) AS clients, + COUNT(DISTINCT user_id) AS users + FROM + filtered + GROUP BY + sdk + ) + SELECT + ( + SELECT + COALESCE(count, 0) + FROM + unique_users + ) AS users, + ( + SELECT + JSON_AGG(ROW_TO_JSON(s)) + FROM + sdk_versions_array s + ) AS sdks; + ` + .decoded(SdkReporting) + .first(); + return this.mapListCurrentConnectionsResponse(result); + } + async deleteOldConnectionData(data: event_types.DeleteOldConnectionData): Promise<void> { + const { date } = data; + const result = await this.db.sql` + DELETE FROM connection_report_events + WHERE + connected_at < ${{ type: 1184, value: date.toISOString() }} + AND ( + disconnected_at IS NOT NULL + OR ( + jwt_exp < NOW() + AND disconnected_at IS NULL + ) + ); + `.execute(); + const deletedRows = toInteger(result.results[1].status.split(' ')[1] || '0'); + if (deletedRows > 0) { + logger.info( + `TTL from ${date.toISOString()}: ${deletedRows} PostgreSQL rows have been removed from connection_report_events.` + ); + } + } + + async [Symbol.asyncDispose]() { + await this.db[Symbol.asyncDispose](); + } + + async prepareStatements(connection: pg_wire.PgConnection) { + // It should be possible to prepare statements for some common operations here. + // This has not been implemented yet. 
+  } +} diff --git a/modules/module-postgres-storage/src/storage/PostgresStorageProvider.ts b/modules/module-postgres-storage/src/storage/PostgresStorageProvider.ts index 7125ec6a8..3122aace3 100644 --- a/modules/module-postgres-storage/src/storage/PostgresStorageProvider.ts +++ b/modules/module-postgres-storage/src/storage/PostgresStorageProvider.ts @@ -5,8 +5,9 @@ import { storage } from '@powersync/service-core'; import { isPostgresStorageConfig, normalizePostgresStorageConfig, PostgresStorageConfig } from '../types/types.js'; import { dropTables } from '../utils/db.js'; import { PostgresBucketStorageFactory } from './PostgresBucketStorageFactory.js'; +import { PostgresReportStorage } from './PostgresReportStorage.js'; -export class PostgresStorageProvider implements storage.BucketStorageProvider { +export class PostgresStorageProvider implements storage.StorageProvider { get type() { return lib_postgres.POSTGRES_CONNECTION_TYPE; } @@ -28,13 +29,23 @@ export class PostgresStorageProvider implements storage.BucketStorageProvider { config: normalizedConfig, slot_name_prefix: options.resolvedConfig.slot_name_prefix }); + + const reportStorageFactory = new PostgresReportStorage({ + config: normalizedConfig + }); + return { + reportStorage: reportStorageFactory, storage: storageFactory, - shutDown: async () => storageFactory.db[Symbol.asyncDispose](), + shutDown: async () => { + await storageFactory.db[Symbol.asyncDispose](); + await reportStorageFactory.db[Symbol.asyncDispose](); + }, tearDown: async () => { logger.info(`Tearing down Postgres storage: ${normalizedConfig.database}...`); await dropTables(storageFactory.db); await storageFactory.db[Symbol.asyncDispose](); + await reportStorageFactory.db[Symbol.asyncDispose](); return true; } } satisfies storage.ActiveStorage; diff --git a/modules/module-postgres-storage/src/storage/storage-index.ts b/modules/module-postgres-storage/src/storage/storage-index.ts index b97b6a966..4f8558818 100644 --- a/modules/module-postgres-storage/src/storage/storage-index.ts +++ b/modules/module-postgres-storage/src/storage/storage-index.ts @@ -2,4 +2,3 @@ export * from './PostgresBucketStorageFactory.js'; export * from './PostgresCompactor.js'; export * from './PostgresStorageProvider.js'; export * from './PostgresSyncRulesStorage.js'; -export * from './PostgresTestStorageFactoryGenerator.js'; diff --git a/modules/module-postgres-storage/src/types/models/SdkReporting.ts b/modules/module-postgres-storage/src/types/models/SdkReporting.ts new file mode 100644 index 000000000..f44248bd3 --- /dev/null +++ b/modules/module-postgres-storage/src/types/models/SdkReporting.ts @@ -0,0 +1,23 @@ +import * as t from 'ts-codec'; +import { bigint, jsonb } from '../codecs.js'; + +export const Sdks = t.object({ + sdk: t.string, + clients: t.number, + users: t.number +}); + +export type Sdks = t.Encoded<typeof Sdks>; + +export const SdkReporting = t.object({ + users: bigint, + sdks: t + .object({ + data: jsonb(t.array(Sdks)) + }) + .optional() + .or(t.Null) +}); + +export type SdkReporting = t.Encoded<typeof SdkReporting>; +export type SdkReportingDecoded = t.Decoded<typeof SdkReporting>; diff --git a/modules/module-postgres-storage/src/types/models/models-index.ts b/modules/module-postgres-storage/src/types/models/models-index.ts index fb5574608..81a528d33 100644 --- a/modules/module-postgres-storage/src/types/models/models-index.ts +++ b/modules/module-postgres-storage/src/types/models/models-index.ts @@ -8,3 +8,4 @@ export * from './Migration.js'; export * from './SourceTable.js'; export * from './SyncRules.js'; export * from 
'./WriteCheckpoint.js'; +export * from './SdkReporting.js'; diff --git a/modules/module-postgres-storage/src/storage/PostgresTestStorageFactoryGenerator.ts b/modules/module-postgres-storage/src/utils/test-utils.ts similarity index 74% rename from modules/module-postgres-storage/src/storage/PostgresTestStorageFactoryGenerator.ts rename to modules/module-postgres-storage/src/utils/test-utils.ts index 1e6015cd6..c0ba7c2b0 100644 --- a/modules/module-postgres-storage/src/storage/PostgresTestStorageFactoryGenerator.ts +++ b/modules/module-postgres-storage/src/utils/test-utils.ts @@ -1,7 +1,8 @@ import { framework, PowerSyncMigrationManager, ServiceContext, TestStorageOptions } from '@powersync/service-core'; import { PostgresMigrationAgent } from '../migrations/PostgresMigrationAgent.js'; import { normalizePostgresStorageConfig, PostgresStorageConfigDecoded } from '../types/types.js'; -import { PostgresBucketStorageFactory } from './PostgresBucketStorageFactory.js'; +import { PostgresReportStorage } from '../storage/PostgresReportStorage.js'; +import { PostgresBucketStorageFactory } from '../storage/PostgresBucketStorageFactory.js'; export type PostgresTestStorageOptions = { url: string; @@ -12,7 +13,7 @@ export type PostgresTestStorageOptions = { migrationAgent?: (config: PostgresStorageConfigDecoded) => PostgresMigrationAgent; }; -export const postgresTestSetup = (factoryOptions: PostgresTestStorageOptions) => { +export function postgresTestSetup(factoryOptions: PostgresTestStorageOptions) { const BASE_CONFIG = { type: 'postgresql' as const, uri: factoryOptions.url, @@ -48,6 +49,21 @@ export const postgresTestSetup = (factoryOptions: PostgresTestStorageOptions) => }; return { + reportFactory: async (options?: TestStorageOptions) => { + try { + if (!options?.doNotClear) { + await migrate(framework.migrations.Direction.Up); + } + + return new PostgresReportStorage({ + config: TEST_CONNECTION_OPTIONS + }); + } catch (ex) { + // Vitest does not display these errors nicely when using the `await using` syntx + console.error(ex, ex.cause); + throw ex; + } + }, factory: async (options?: TestStorageOptions) => { try { if (!options?.doNotClear) { @@ -66,8 +82,8 @@ export const postgresTestSetup = (factoryOptions: PostgresTestStorageOptions) => }, migrate }; -}; +} -export const PostgresTestStorageFactoryGenerator = (factoryOptions: PostgresTestStorageOptions) => { +export function postgresTestStorageFactoryGenerator(factoryOptions: PostgresTestStorageOptions) { return postgresTestSetup(factoryOptions).factory; -}; +} diff --git a/modules/module-postgres-storage/src/utils/utils-index.ts b/modules/module-postgres-storage/src/utils/utils-index.ts index 65f808ff7..40370a4a7 100644 --- a/modules/module-postgres-storage/src/utils/utils-index.ts +++ b/modules/module-postgres-storage/src/utils/utils-index.ts @@ -2,3 +2,4 @@ export * from './bson.js'; export * from './bucket-data.js'; export * from './db.js'; export * from './ts-codec.js'; +export * as test_utils from './test-utils.js'; diff --git a/modules/module-postgres-storage/test/src/__snapshots__/connection-report-storage.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/connection-report-storage.test.ts.snap new file mode 100644 index 000000000..9c2d0b20e --- /dev/null +++ b/modules/module-postgres-storage/test/src/__snapshots__/connection-report-storage.test.ts.snap @@ -0,0 +1,215 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`Connection report storage > Should create a connection event if its after a 
day 1`] = ` +[ + { + "client_id": "client_week", + "sdk": "powersync-js/1.24.5", + "user_agent": "powersync-js/1.21.0 powersync-web Firefox/141 linux", + "user_id": "user_week", + }, + { + "client_id": "client_week", + "sdk": "powersync-js/1.24.5", + "user_agent": "powersync-js/1.21.0 powersync-web Firefox/141 linux", + "user_id": "user_week", + }, +] +`; + +exports[`Connection report storage > Should delete rows older than specified range 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.2", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.24.5", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 5, +} +`; + +exports[`Connection report storage > Should update a connection event and make it disconnected 1`] = ` +[ + { + "client_id": "client_three", + "sdk": "powersync-js/1.21.2", + "user_agent": "powersync-js/1.21.0 powersync-web Firefox/141 linux", + "user_id": "user_three", + }, +] +`; + +exports[`Connection report storage > Should update a connection event if its within a day 1`] = ` +[ + { + "client_id": "client_one", + "sdk": "powersync-dart/1.6.4", + "user_agent": "powersync-dart/1.6.4 Dart (flutter-web) Chrome/128 android", + "user_id": "user_one", + }, +] +`; + +exports[`Report storage tests > Should show connection report data for user over the past day 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 3, +} +`; + +exports[`Report storage tests > Should show connection report data for user over the past month 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.2", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.23.6", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.23.7", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.24.5", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 7, +} +`; + +exports[`Report storage tests > Should show connection report data for user over the past week 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.2", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.24.5", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 5, +} +`; + +exports[`Report storage tests > Should show currently connected users 1`] = ` +{ + "sdks": [ + { + "clients": 1, + "sdk": "powersync-dart/1.6.4", + "users": 1, + }, + { + "clients": 1, + "sdk": "powersync-js/1.21.1", + "users": 1, + }, + { + "clients": 1, + "sdk": "unknown", + "users": 1, + }, + ], + "users": 2, +} +`; diff --git 
a/modules/module-postgres-storage/test/src/connection-report-storage.test.ts b/modules/module-postgres-storage/test/src/connection-report-storage.test.ts new file mode 100644 index 000000000..3a0f1df74 --- /dev/null +++ b/modules/module-postgres-storage/test/src/connection-report-storage.test.ts @@ -0,0 +1,233 @@ +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { POSTGRES_REPORT_STORAGE_FACTORY } from './util.js'; +import { event_types } from '@powersync/service-types'; +import { register, ReportUserData } from '@powersync/service-core-tests'; +import { PostgresReportStorage } from '../../src/storage/PostgresReportStorage.js'; +import { DateTimeValue } from '@powersync/service-sync-rules'; + +const factory = await POSTGRES_REPORT_STORAGE_FACTORY(); +const userData = register.REPORT_TEST_USERS; +const dates = register.REPORT_TEST_DATES; + +function removeVolatileFields(sdks: event_types.ClientConnection[]): Partial[] { + return sdks.map((sdk) => { + const { id, disconnected_at, connected_at, jwt_exp, ...rest } = sdk; + return { + ...rest + }; + }); +} + +async function loadData(userData: ReportUserData, factory: PostgresReportStorage) { + await factory.db.sql` + INSERT INTO + connection_report_events ( + user_id, + client_id, + connected_at, + sdk, + user_agent, + jwt_exp, + id, + disconnected_at + ) + VALUES + ( + ${{ type: 'varchar', value: userData.user_one.user_id }}, + ${{ type: 'varchar', value: userData.user_one.client_id }}, + ${{ type: 1184, value: userData.user_one.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_one.sdk }}, + ${{ type: 'varchar', value: userData.user_one.user_agent }}, + ${{ type: 1184, value: userData.user_one.jwt_exp.toISOString() }}, + ${{ type: 'varchar', value: '1' }}, + NULL + ), + ( + ${{ type: 'varchar', value: userData.user_two.user_id }}, + ${{ type: 'varchar', value: userData.user_two.client_id }}, + ${{ type: 1184, value: userData.user_two.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_two.sdk }}, + ${{ type: 'varchar', value: userData.user_two.user_agent }}, + ${{ type: 1184, value: userData.user_two.jwt_exp.toISOString() }}, + ${{ type: 'varchar', value: '2' }}, + NULL + ), + ( + ${{ type: 'varchar', value: userData.user_four.user_id }}, + ${{ type: 'varchar', value: userData.user_four.client_id }}, + ${{ type: 1184, value: userData.user_four.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_four.sdk }}, + ${{ type: 'varchar', value: userData.user_four.user_agent }}, + ${{ type: 1184, value: userData.user_four.jwt_exp.toISOString() }}, + ${{ type: 'varchar', value: '4' }}, + NULL + ), + ( + ${{ type: 'varchar', value: userData.user_old.user_id }}, + ${{ type: 'varchar', value: userData.user_old.client_id }}, + ${{ type: 1184, value: userData.user_old.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_old.sdk }}, + ${{ type: 'varchar', value: userData.user_old.user_agent }}, + ${{ type: 1184, value: userData.user_old.jwt_exp.toISOString() }}, + ${{ type: 'varchar', value: '5' }}, + NULL + ), + ( + ${{ type: 'varchar', value: userData.user_three.user_id }}, + ${{ type: 'varchar', value: userData.user_three.client_id }}, + ${{ type: 1184, value: userData.user_three.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_three.sdk }}, + ${{ type: 'varchar', value: userData.user_three.user_agent }}, + NULL, + ${{ type: 'varchar', value: '3' }}, + ${{ type: 1184, value: 
userData.user_three.disconnected_at.toISOString() }} + ), + ( + ${{ type: 'varchar', value: userData.user_week.user_id }}, + ${{ type: 'varchar', value: userData.user_week.client_id }}, + ${{ type: 1184, value: userData.user_week.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_week.sdk }}, + ${{ type: 'varchar', value: userData.user_week.user_agent }}, + NULL, + ${{ type: 'varchar', value: 'week' }}, + ${{ type: 1184, value: userData.user_week.disconnected_at.toISOString() }} + ), + ( + ${{ type: 'varchar', value: userData.user_month.user_id }}, + ${{ type: 'varchar', value: userData.user_month.client_id }}, + ${{ type: 1184, value: userData.user_month.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_month.sdk }}, + ${{ type: 'varchar', value: userData.user_month.user_agent }}, + NULL, + ${{ type: 'varchar', value: 'month' }}, + ${{ type: 1184, value: userData.user_month.disconnected_at.toISOString() }} + ), + ( + ${{ type: 'varchar', value: userData.user_expired.user_id }}, + ${{ type: 'varchar', value: userData.user_expired.client_id }}, + ${{ type: 1184, value: userData.user_expired.connected_at.toISOString() }}, + ${{ type: 'varchar', value: userData.user_expired.sdk }}, + ${{ type: 'varchar', value: userData.user_expired.user_agent }}, + ${{ type: 1184, value: userData.user_expired.jwt_exp.toISOString() }}, + ${{ type: 'varchar', value: 'expired' }}, + NULL + ) + `.execute(); +} + +async function deleteData(factory: PostgresReportStorage) { + await factory.db.sql`TRUNCATE TABLE connection_report_events`.execute(); +} + +beforeAll(async () => { + await loadData(userData, factory); +}); +afterAll(async () => { + await deleteData(factory); +}); + +describe('Report storage tests', async () => { + await register.registerReportTests(factory); +}); + +describe('Connection report storage', async () => { + it('Should update a connection event if its within a day', async () => { + const newConnectAt = new Date( + dates.now.getFullYear(), + dates.now.getMonth(), + dates.now.getDate(), + dates.now.getHours(), + dates.now.getMinutes() + 20 + ); + const jwtExp = new Date(newConnectAt.getFullYear(), newConnectAt.getMonth(), newConnectAt.getDate() + 1); + await factory.reportClientConnection({ + sdk: userData.user_one.sdk, + connected_at: newConnectAt, + jwt_exp: jwtExp, + client_id: userData.user_one.client_id, + user_id: userData.user_one.user_id, + user_agent: userData.user_one.user_agent + }); + + const sdk = await factory.db + .sql`SELECT * FROM connection_report_events WHERE user_id = ${{ type: 'varchar', value: userData.user_one.user_id }} AND client_id = ${{ type: 'varchar', value: userData.user_one.client_id }}`.rows(); + expect(sdk).toHaveLength(1); + expect(new Date((sdk[0].connected_at as unknown as DateTimeValue).iso8601Representation).toISOString()).toEqual( + newConnectAt.toISOString() + ); + expect(new Date((sdk[0].jwt_exp! 
as unknown as DateTimeValue).iso8601Representation).toISOString()).toEqual( + jwtExp.toISOString() + ); + expect(sdk[0].disconnected_at).toBeNull(); + const cleaned = removeVolatileFields(sdk); + expect(cleaned).toMatchSnapshot(); + }); + + it('Should update a connection event and make it disconnected', async () => { + const disconnectAt = new Date( + dates.now.getFullYear(), + dates.now.getMonth(), + dates.now.getDate(), + dates.now.getHours(), + dates.now.getMinutes() + 20 + ); + const jwtExp = new Date(disconnectAt.getFullYear(), disconnectAt.getMonth(), disconnectAt.getDate() + 1); + + await factory.reportClientDisconnection({ + disconnected_at: disconnectAt, + jwt_exp: jwtExp, + client_id: userData.user_three.client_id, + user_id: userData.user_three.user_id, + user_agent: userData.user_three.user_agent, + connected_at: dates.yesterday + }); + + const sdk = await factory.db + .sql`SELECT * FROM connection_report_events WHERE user_id = ${{ type: 'varchar', value: userData.user_three.user_id }}`.rows(); + expect(sdk).toHaveLength(1); + console.log(sdk[0]); + expect(new Date((sdk[0].disconnected_at! as unknown as DateTimeValue).iso8601Representation).toISOString()).toEqual( + disconnectAt.toISOString() + ); + const cleaned = removeVolatileFields(sdk); + expect(cleaned).toMatchSnapshot(); + }); + + it('Should create a connection event if its after a day', async () => { + const newConnectAt = new Date( + dates.now.getFullYear(), + dates.now.getMonth(), + dates.now.getDate() + 1, + dates.now.getHours() + ); + const jwtExp = new Date(newConnectAt.getFullYear(), newConnectAt.getMonth(), newConnectAt.getDate() + 1); + + await factory.reportClientConnection({ + sdk: userData.user_week.sdk, + connected_at: newConnectAt, + jwt_exp: jwtExp, + client_id: userData.user_week.client_id, + user_id: userData.user_week.user_id, + user_agent: userData.user_week.user_agent + }); + + const sdk = await factory.db + .sql`SELECT * FROM connection_report_events WHERE user_id = ${{ type: 'varchar', value: userData.user_week.user_id }}`.rows(); + expect(sdk).toHaveLength(2); + const cleaned = removeVolatileFields(sdk); + expect(cleaned).toMatchSnapshot(); + }); + + it('Should delete rows older than specified range', async () => { + await deleteData(factory); + await loadData(userData, factory); + await factory.deleteOldConnectionData({ + date: dates.weekAgo + }); + const sdk = await factory.getClientConnectionReports({ + start: dates.monthAgo, + end: dates.now + }); + expect(sdk).toMatchSnapshot(); + }); +}); diff --git a/modules/module-postgres-storage/test/src/util.ts b/modules/module-postgres-storage/test/src/util.ts index c9e0e2555..d055dc343 100644 --- a/modules/module-postgres-storage/test/src/util.ts +++ b/modules/module-postgres-storage/test/src/util.ts @@ -1,12 +1,8 @@ import path from 'path'; import { fileURLToPath } from 'url'; -import { normalizePostgresStorageConfig } from '../../src//types/types.js'; -import { PostgresMigrationAgent } from '../../src/migrations/PostgresMigrationAgent.js'; -import { - postgresTestSetup, - PostgresTestStorageFactoryGenerator -} from '../../src/storage/PostgresTestStorageFactoryGenerator.js'; +import { normalizePostgresStorageConfig, PostgresMigrationAgent } from '../../src/index.js'; import { env } from './env.js'; +import { postgresTestSetup } from '../../src/utils/test-utils.js'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); @@ -37,3 +33,4 @@ export const POSTGRES_STORAGE_SETUP = postgresTestSetup({ }); export const 
POSTGRES_STORAGE_FACTORY = POSTGRES_STORAGE_SETUP.factory; +export const POSTGRES_REPORT_STORAGE_FACTORY = POSTGRES_STORAGE_SETUP.reportFactory; diff --git a/modules/module-postgres/src/types/registry.ts b/modules/module-postgres/src/types/registry.ts index 5ab6b9231..362350cd6 100644 --- a/modules/module-postgres/src/types/registry.ts +++ b/modules/module-postgres/src/types/registry.ts @@ -59,7 +59,7 @@ interface RangeType extends BaseType { innerId: number; } -type KnownType = BuiltinType | ArrayType | DomainType | DomainType | CompositeType | RangeType; +type KnownType = BuiltinType | ArrayType | DomainType | CompositeType | RangeType; interface UnknownType extends BaseType { type: 'unknown'; diff --git a/modules/module-postgres/test/src/storage_combination.test.ts b/modules/module-postgres/test/src/storage_combination.test.ts index 1183d6806..89ec74de9 100644 --- a/modules/module-postgres/test/src/storage_combination.test.ts +++ b/modules/module-postgres/test/src/storage_combination.test.ts @@ -7,7 +7,7 @@ describe.skipIf(!env.TEST_POSTGRES_STORAGE)('replication storage combination - p test('should allow the same Postgres cluster to be used for data and storage', async () => { // Use the same cluster for the storage as the data source await using context = await WalStreamTestContext.open( - postgres_storage.PostgresTestStorageFactoryGenerator({ + postgres_storage.test_utils.postgresTestStorageFactoryGenerator({ url: env.PG_TEST_URL }), { doNotClear: false } diff --git a/modules/module-postgres/test/src/util.ts b/modules/module-postgres/test/src/util.ts index 410dd50e2..f0516f6b8 100644 --- a/modules/module-postgres/test/src/util.ts +++ b/modules/module-postgres/test/src/util.ts @@ -2,7 +2,7 @@ import { PostgresRouteAPIAdapter } from '@module/api/PostgresRouteAPIAdapter.js' import * as types from '@module/types/types.js'; import * as lib_postgres from '@powersync/lib-service-postgres'; import { logger } from '@powersync/lib-services-framework'; -import { BucketStorageFactory, InternalOpId, TestStorageFactory, TestStorageOptions } from '@powersync/service-core'; +import { BucketStorageFactory, InternalOpId, TestStorageFactory } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; import * as mongo_storage from '@powersync/service-module-mongodb-storage'; import * as postgres_storage from '@powersync/service-module-postgres-storage'; @@ -11,12 +11,12 @@ import { describe, TestOptions } from 'vitest'; export const TEST_URI = env.PG_TEST_URL; -export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.MongoTestStorageFactoryGenerator({ +export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.test_utils.mongoTestStorageFactoryGenerator({ url: env.MONGO_TEST_URL, isCI: env.CI }); -export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.PostgresTestStorageFactoryGenerator({ +export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.test_utils.postgresTestStorageFactoryGenerator({ url: env.PG_STORAGE_TEST_URL }); diff --git a/package.json b/package.json index baa667346..9d366c918 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "watch:ts": "pnpm build:ts -w --preserveWatchOutput", "watch:service": "concurrently --passthrough-arguments \"pnpm watch:ts\" \" pnpm start:service {@}\" -- ", "start:service": "pnpm --filter @powersync/service-image watch", + "clean:modules": "rm -rf node_modules && pnpm -r exec rm -rf node_modules", "clean": "pnpm run -r clean", "release": "pnpm build:production && pnpm changeset 
publish", "test": "pnpm run -r test", diff --git a/packages/jpgwire/src/pgwire_types.ts b/packages/jpgwire/src/pgwire_types.ts index 555de6e3a..97fe442ba 100644 --- a/packages/jpgwire/src/pgwire_types.ts +++ b/packages/jpgwire/src/pgwire_types.ts @@ -1,7 +1,7 @@ // Adapted from https://github.com/kagis/pgwire/blob/0dc927f9f8990a903f238737326e53ba1c8d094f/mod.js#L2218 import { JsonContainer } from '@powersync/service-jsonbig'; -import { CustomSqliteValue, TimeValue, type DatabaseInputValue } from '@powersync/service-sync-rules'; +import { type DatabaseInputValue, TimeValue } from '@powersync/service-sync-rules'; import { dateToSqlite, lsnMakeComparable, timestampToSqlite, timestamptzToSqlite } from './util.js'; import { StructureParser } from './structure_parser.js'; diff --git a/packages/service-core-tests/src/tests/register-report-tests.ts b/packages/service-core-tests/src/tests/register-report-tests.ts new file mode 100644 index 000000000..0e95e3143 --- /dev/null +++ b/packages/service-core-tests/src/tests/register-report-tests.ts @@ -0,0 +1,136 @@ +import { storage } from '@powersync/service-core'; +import { expect, it } from 'vitest'; + +const now = new Date(); +const nowAdd5minutes = new Date(now.getFullYear(), now.getMonth(), now.getDate(), now.getHours(), now.getMinutes() + 5); +const nowLess5minutes = new Date( + now.getFullYear(), + now.getMonth(), + now.getDate(), + now.getHours(), + now.getMinutes() - 5 +); +const dayAgo = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 1, now.getHours()); +const yesterday = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 1); +const weekAgo = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7); +const monthAgo = new Date(now.getFullYear(), now.getMonth() - 1, now.getDate()); + +export const REPORT_TEST_DATES = { + now, + nowAdd5minutes, + nowLess5minutes, + dayAgo, + yesterday, + weekAgo, + monthAgo +}; + +const user_one = { + user_id: 'user_one', + client_id: 'client_one', + connected_at: now, + sdk: 'powersync-dart/1.6.4', + user_agent: 'powersync-dart/1.6.4 Dart (flutter-web) Chrome/128 android', + jwt_exp: nowAdd5minutes +}; +const user_two = { + user_id: 'user_two', + client_id: 'client_two', + connected_at: nowLess5minutes, + sdk: 'powersync-js/1.21.1', + user_agent: 'powersync-js/1.21.0 powersync-web Chromium/138 linux', + jwt_exp: nowAdd5minutes +}; +const user_three = { + user_id: 'user_three', + client_id: 'client_three', + connected_at: yesterday, + sdk: 'powersync-js/1.21.2', + user_agent: 'powersync-js/1.21.0 powersync-web Firefox/141 linux', + disconnected_at: yesterday +}; + +const user_four = { + user_id: 'user_four', + client_id: 'client_four', + connected_at: now, + sdk: 'powersync-js/1.21.4', + user_agent: 'powersync-js/1.21.0 powersync-web Firefox/141 linux', + jwt_exp: nowLess5minutes +}; + +const user_old = { + user_id: 'user_one', + client_id: '', + connected_at: now, + sdk: 'unknown', + user_agent: 'Dart (flutter-web) Chrome/128 android', + jwt_exp: nowAdd5minutes +}; + +const user_week = { + user_id: 'user_week', + client_id: 'client_week', + connected_at: weekAgo, + sdk: 'powersync-js/1.24.5', + user_agent: 'powersync-js/1.21.0 powersync-web Firefox/141 linux', + disconnected_at: weekAgo +}; + +const user_month = { + user_id: 'user_month', + client_id: 'client_month', + connected_at: monthAgo, + sdk: 'powersync-js/1.23.6', + user_agent: 'powersync-js/1.23.0 powersync-web Firefox/141 linux', + disconnected_at: monthAgo +}; + +const user_expired = { + user_id: 'user_expired', + 
client_id: 'client_expired', + connected_at: monthAgo, + sdk: 'powersync-js/1.23.7', + user_agent: 'powersync-js/1.23.0 powersync-web Firefox/141 linux', + jwt_exp: monthAgo +}; +export const REPORT_TEST_USERS = { + user_one, + user_two, + user_three, + user_four, + user_old, + user_week, + user_month, + user_expired +}; +export type ReportUserData = typeof REPORT_TEST_USERS; + +export async function registerReportTests(factory: storage.ReportStorage) { + it('Should show currently connected users', async () => { + const current = await factory.getConnectedClients(); + expect(current).toMatchSnapshot(); + }); + + it('Should show connection report data for user over the past month', async () => { + const sdk = await factory.getClientConnectionReports({ + start: monthAgo, + end: now + }); + expect(sdk).toMatchSnapshot(); + }); + it('Should show connection report data for user over the past week', async () => { + const sdk = await factory.getClientConnectionReports({ + start: weekAgo, + end: now + }); + expect(sdk).toMatchSnapshot(); + }); + it('Should show connection report data for user over the past day', async () => { + const sdk = await factory.getClientConnectionReports({ + start: dayAgo, + end: now + }); + expect(sdk).toMatchSnapshot(); + }); +} diff --git a/packages/service-core-tests/src/tests/tests-index.ts b/packages/service-core-tests/src/tests/tests-index.ts index 6d35089e1..3145bb725 100644 --- a/packages/service-core-tests/src/tests/tests-index.ts +++ b/packages/service-core-tests/src/tests/tests-index.ts @@ -6,4 +6,5 @@ export * from './register-data-storage-data-tests.js'; export * from './register-data-storage-checkpoint-tests.js'; export * from './register-migration-tests.js'; export * from './register-sync-tests.js'; +export * from './register-report-tests.js'; export * from './util.js'; diff --git a/packages/service-core/src/events/EventsEngine.ts b/packages/service-core/src/events/EventsEngine.ts new file mode 100644 index 000000000..61cea555e --- /dev/null +++ b/packages/service-core/src/events/EventsEngine.ts @@ -0,0 +1,38 @@ +import EventEmitter from 'node:events'; +import { logger } from '@powersync/lib-services-framework'; +import { event_types } from '@powersync/service-types'; + +export class EventsEngine { + private emitter: EventEmitter; + private events: Set = new Set(); + constructor() { + this.emitter = new EventEmitter({ captureRejections: true }); + this.emitter.on('error', (error: Error) => { + logger.error(error.message); + }); + } + + /** + * All new events added need to be subscribed to be used. 
+ * @example engine.subscribe(new MyNewEvent(storageEngine)); + */ + subscribe(event: event_types.EmitterEvent): void { + if (!this.events.has(event.event)) { + this.events.add(event.event); + } + this.emitter.on(event.event, event.handler.bind(event)); + } + + get listEvents(): event_types.EventsEngineEventType[] { + return Array.from(this.events.values()); + } + + emit(event: K, data: event_types.SubscribeEvents[K]): void { + this.emitter.emit(event, data); + } + + shutDown(): void { + logger.info(`Shutting down EmitterEngine and removing all listeners for ${this.listEvents.join(', ')}.`); + this.emitter.removeAllListeners(); + } +} diff --git a/packages/service-core/src/routes/configure-fastify.ts b/packages/service-core/src/routes/configure-fastify.ts index ba8906d15..45c5eacc2 100644 --- a/packages/service-core/src/routes/configure-fastify.ts +++ b/packages/service-core/src/routes/configure-fastify.ts @@ -1,5 +1,4 @@ import type fastify from 'fastify'; -import * as uuid from 'uuid'; import { registerFastifyNotFoundHandler, registerFastifyRoutes } from './route-register.js'; diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 2f20382f5..35e51e027 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -1,12 +1,11 @@ import { ErrorCode, errors, schema } from '@powersync/lib-services-framework'; -import { RequestParameters } from '@powersync/service-sync-rules'; import * as sync from '../../sync/sync-index.js'; import * as util from '../../util/util-index.js'; import { SocketRouteGenerator } from '../router-socket.js'; import { SyncRoutes } from './sync-stream.js'; -import { APIMetric } from '@powersync/service-types'; +import { APIMetric, event_types } from '@powersync/service-types'; export const syncStreamReactive: SocketRouteGenerator = (router) => router.reactiveStream(SyncRoutes.STREAM, { @@ -14,6 +13,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal, connection }) => { const { service_context, logger } = context; const { routerEngine, metricsEngine, syncContext } = service_context; + const streamStart = Date.now(); logger.defaultMeta = { ...logger.defaultMeta, @@ -21,7 +21,15 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => client_id: params.client_id, user_agent: context.user_agent }; - const streamStart = Date.now(); + + const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { + client_id: params.client_id ?? '', + user_id: context.user_id!, + user_agent: context.user_agent, + // At this point the token_payload is guaranteed to be present + jwt_exp: new Date(context.token_payload!.exp * 1000), + connected_at: new Date(streamStart) + }; // Best effort guess on why the stream was closed. 
// We use the `??=` operator everywhere, so that we catch the first relevant @@ -83,6 +91,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => }); metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1); + service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_CONNECT_EVENT, sdkData); const tracker = new sync.RequestTracker(metricsEngine); if (connection.tracker.encoding) { // Must be set before we start the stream @@ -174,6 +183,10 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => close_reason: closeReason ?? 'unknown' }); metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(-1); + service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_DISCONNECT_EVENT, { + ...sdkData, + disconnected_at: new Date() + }); } } }); diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index e2181a6f2..d9cb3e406 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -7,8 +7,8 @@ import * as util from '../../util/util-index.js'; import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; +import { APIMetric, event_types } from '@powersync/service-types'; -import { APIMetric } from '@powersync/service-types'; import { maybeCompressResponseStream } from '../compression.js'; export enum SyncRoutes { @@ -25,7 +25,7 @@ export const syncStreamed = routeDefinition({ authorize: authUser, validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), handler: async (payload) => { - const { service_context, logger } = payload.context; + const { service_context, logger, token_payload } = payload.context; const { routerEngine, storageEngine, metricsEngine, syncContext } = service_context; const headers = payload.request.headers; const userAgent = headers['x-user-agent'] ?? headers['user-agent']; @@ -44,6 +44,14 @@ export const syncStreamed = routeDefinition({ user_id: payload.context.user_id, bson: useBson }; + const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { + client_id: clientId ?? 
'', + user_id: payload.context.user_id!, + user_agent: userAgent as string, + // At this point the token_payload is guaranteed to be present + jwt_exp: new Date(token_payload!.exp * 1000), + connected_at: new Date(streamStart) + }; if (routerEngine.closed) { throw new errors.ServiceError({ @@ -69,6 +77,7 @@ export const syncStreamed = routeDefinition({ const tracker = new sync.RequestTracker(metricsEngine); try { metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1); + service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_CONNECT_EVENT, sdkData); const syncLines = sync.streamResponse({ syncContext: syncContext, bucketStorage, @@ -134,6 +143,10 @@ export const syncStreamed = routeDefinition({ } controller.abort(); metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(-1); + service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_DISCONNECT_EVENT, { + ...sdkData, + disconnected_at: new Date() + }); logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), stream_ms: Date.now() - streamStart, @@ -144,6 +157,10 @@ export const syncStreamed = routeDefinition({ } catch (ex) { controller.abort(); metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(-1); + service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_DISCONNECT_EVENT, { + ...sdkData, + disconnected_at: new Date() + }); } } }); diff --git a/packages/service-core/src/routes/router.ts b/packages/service-core/src/routes/router.ts index 3298c6e6b..9ee9d075b 100644 --- a/packages/service-core/src/routes/router.ts +++ b/packages/service-core/src/routes/router.ts @@ -1,4 +1,4 @@ -import { router, ServiceError, Logger } from '@powersync/lib-services-framework'; +import { Logger, router } from '@powersync/lib-services-framework'; import type { JwtPayload } from '../auth/auth-index.js'; import { ServiceContext } from '../system/ServiceContext.js'; import { RouterEngine } from './RouterEngine.js'; @@ -31,11 +31,11 @@ export type BasicRouterRequest = { hostname: string; }; -export type ConextProviderOptions = { +export type ContextProviderOptions = { logger: Logger; }; -export type ContextProvider = (request: BasicRouterRequest, options: ConextProviderOptions) => Promise; +export type ContextProvider = (request: BasicRouterRequest, options: ContextProviderOptions) => Promise; export type RequestEndpoint< I, diff --git a/packages/service-core/src/storage/BucketStorageFactory.ts b/packages/service-core/src/storage/BucketStorageFactory.ts index faf02e4b7..355efec84 100644 --- a/packages/service-core/src/storage/BucketStorageFactory.ts +++ b/packages/service-core/src/storage/BucketStorageFactory.ts @@ -3,6 +3,7 @@ import { ParseSyncRulesOptions, PersistedSyncRules, PersistedSyncRulesContent } import { ReplicationEventPayload } from './ReplicationEventPayload.js'; import { ReplicationLock } from './ReplicationLock.js'; import { SyncRulesBucketStorage } from './SyncRulesBucketStorage.js'; +import { ReportStorage } from './ReportStorage.js'; /** * Represents a configured storage provider. 
@@ -164,3 +165,4 @@ doNotClear?: boolean; } export type TestStorageFactory = (options?: TestStorageOptions) => Promise; +export type TestReportStorageFactory = (options?: TestStorageOptions) => Promise; diff --git a/packages/service-core/src/storage/ReportStorage.ts b/packages/service-core/src/storage/ReportStorage.ts new file mode 100644 index 000000000..b75c67763 --- /dev/null +++ b/packages/service-core/src/storage/ReportStorage.ts @@ -0,0 +1,39 @@ +import { event_types } from '@powersync/service-types'; + +/** + * Represents a configured report storage. + * + * Report storage is used for storing local data for the instance. + * Data can then be used for reporting purposes. + * + */ +export interface ReportStorage extends AsyncDisposable { + /** + * Report a client connection. + */ + reportClientConnection(data: event_types.ClientConnectionBucketData): Promise; + /** + * Report a client disconnection. + */ + reportClientDisconnection(data: event_types.ClientDisconnectionEventData): Promise; + /** + * Get currently connected clients. + * This will return any short- or long-term connected clients. + * Clients that have no disconnected_at timestamp and that have a valid jwt_exp timestamp are considered connected. + */ + getConnectedClients(): Promise; + /** + * Get a report of client connections over a day, week or month. + * This is used internally to generate reports, so it always returns the previous day, week or month. + * Usually this is called at the start of the new day, week or month. It will return all unique completed connections + * as well as unique currently connected clients. + */ + getClientConnectionReports( + data: event_types.ClientConnectionReportRequest + ): Promise; + /** + * Delete old connection data based on a specific date. + * This is used to clean up old connection data that is no longer needed. + */ + deleteOldConnectionData(data: event_types.DeleteOldConnectionData): Promise; +} diff --git a/packages/service-core/src/storage/StorageEngine.ts b/packages/service-core/src/storage/StorageEngine.ts index 62fcf11f1..a53819ccb 100644 --- a/packages/service-core/src/storage/StorageEngine.ts +++ b/packages/service-core/src/storage/StorageEngine.ts @@ -1,7 +1,7 @@ import { BaseObserver, logger, ServiceError } from '@powersync/lib-services-framework'; import { ResolvedPowerSyncConfig } from '../util/util-index.js'; import { BucketStorageFactory } from './BucketStorageFactory.js'; -import { ActiveStorage, BucketStorageProvider } from './StorageProvider.js'; +import { ActiveStorage, StorageProvider } from './StorageProvider.js'; export type StorageEngineOptions = { configuration: ResolvedPowerSyncConfig; @@ -14,7 +14,7 @@ export interface StorageEngineListener { export class StorageEngine extends BaseObserver { // TODO: This will need to revisited when we actually support multiple storage providers.
- private storageProviders: Map = new Map(); + private storageProviders: Map = new Map(); private currentActiveStorage: ActiveStorage | null = null; constructor(private options: StorageEngineOptions) { @@ -37,7 +37,7 @@ export class StorageEngine extends BaseObserver { * Register a provider which generates a {@link BucketStorageFactory} * given the matching config specified in the loaded {@link ResolvedPowerSyncConfig} */ - registerProvider(provider: BucketStorageProvider) { + registerProvider(provider: StorageProvider) { this.storageProviders.set(provider.type, provider); } diff --git a/packages/service-core/src/storage/StorageProvider.ts b/packages/service-core/src/storage/StorageProvider.ts index 4cffbb1ec..f2404bdf0 100644 --- a/packages/service-core/src/storage/StorageProvider.ts +++ b/packages/service-core/src/storage/StorageProvider.ts @@ -1,9 +1,11 @@ import { ServiceError } from '@powersync/lib-services-framework'; import * as util from '../util/util-index.js'; import { BucketStorageFactory } from './BucketStorageFactory.js'; +import { ReportStorage } from './ReportStorage.js'; export interface ActiveStorage { storage: BucketStorageFactory; + reportStorage: ReportStorage; shutDown(): Promise; /** @@ -22,7 +24,7 @@ export interface GetStorageOptions { /** * Represents a provider that can create a storage instance for a specific storage type from configuration. */ -export interface BucketStorageProvider { +export interface StorageProvider { /** * The storage type that this provider provides. * The type should match the `type` field in the config. diff --git a/packages/service-core/src/storage/storage-index.ts b/packages/service-core/src/storage/storage-index.ts index 9485f85d5..b83a2fb2f 100644 --- a/packages/service-core/src/storage/storage-index.ts +++ b/packages/service-core/src/storage/storage-index.ts @@ -13,3 +13,4 @@ export * from './BucketStorageBatch.js'; export * from './SyncRulesBucketStorage.js'; export * from './PersistedSyncRulesContent.js'; export * from './ReplicationLock.js'; +export * from './ReportStorage.js'; diff --git a/packages/service-core/src/system/ServiceContext.ts b/packages/service-core/src/system/ServiceContext.ts index fa1d68f7d..763fcbca0 100644 --- a/packages/service-core/src/system/ServiceContext.ts +++ b/packages/service-core/src/system/ServiceContext.ts @@ -1,4 +1,4 @@ -import { LifeCycledSystem, MigrationManager, ServiceIdentifier, container } from '@powersync/lib-services-framework'; +import { container, LifeCycledSystem, MigrationManager, ServiceIdentifier } from '@powersync/lib-services-framework'; import { framework } from '../index.js'; import * as metrics from '../metrics/MetricsEngine.js'; @@ -8,6 +8,7 @@ import * as routes from '../routes/routes-index.js'; import * as storage from '../storage/storage-index.js'; import { SyncContext } from '../sync/SyncContext.js'; import * as utils from '../util/util-index.js'; +import { EventsEngine } from '../events/EventsEngine.js'; export interface ServiceContext { configuration: utils.ResolvedPowerSyncConfig; @@ -19,6 +20,7 @@ export interface ServiceContext { migrations: PowerSyncMigrationManager; syncContext: SyncContext; serviceMode: ServiceContextMode; + eventsEngine: EventsEngine; } export enum ServiceContextMode { @@ -45,6 +47,7 @@ export class ServiceContextContainer implements ServiceContext { configuration: utils.ResolvedPowerSyncConfig; lifeCycleEngine: LifeCycledSystem; storageEngine: storage.StorageEngine; + eventsEngine: EventsEngine; syncContext: SyncContext; routerEngine: 
routes.RouterEngine; serviceMode: ServiceContextMode; @@ -66,6 +69,11 @@ export class ServiceContextContainer implements ServiceContext { } }); + this.eventsEngine = new EventsEngine(); + this.lifeCycleEngine.withLifecycle(this.eventsEngine, { + stop: (emitterEngine) => emitterEngine.shutDown() + }); + this.lifeCycleEngine.withLifecycle(this.storageEngine, { start: (storageEngine) => storageEngine.start(), stop: (storageEngine) => storageEngine.shutDown() @@ -89,6 +97,10 @@ export class ServiceContextContainer implements ServiceContext { // Migrations should be executed before the system starts start: () => migrationManager[Symbol.asyncDispose]() }); + + this.lifeCycleEngine.withLifecycle(this.eventsEngine, { + stop: (emitterEngine) => emitterEngine.shutDown() + }); } get replicationEngine(): replication.ReplicationEngine | null { diff --git a/packages/service-core/test/src/routes/mocks.ts b/packages/service-core/test/src/routes/mocks.ts index f6205bf29..2768c094d 100644 --- a/packages/service-core/test/src/routes/mocks.ts +++ b/packages/service-core/test/src/routes/mocks.ts @@ -11,6 +11,7 @@ import { SyncRulesBucketStorage } from '@/index.js'; import { MeterProvider } from '@opentelemetry/sdk-metrics'; +import { EventsEngine } from '@/events/EventsEngine.js'; export function mockServiceContext(storage: Partial | null) { // This is very incomplete - just enough to get the current tests passing. @@ -34,6 +35,7 @@ export function mockServiceContext(storage: Partial | nu createCoreAPIMetrics(metricsEngine); const service_context: Partial = { syncContext: new SyncContext({ maxBuckets: 1, maxDataFetchConcurrency: 1, maxParameterQueryResults: 1 }), + eventsEngine: new EventsEngine(), routerEngine: { getAPI() { return { diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index 52a3e4e9c..724437b11 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -3,7 +3,7 @@ import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-fr import { SqlSyncRules } from '@powersync/service-sync-rules'; import { Readable, Writable } from 'stream'; import { pipeline } from 'stream/promises'; -import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { describe, expect, it } from 'vitest'; import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js'; import { mockServiceContext } from './mocks.js'; @@ -12,7 +12,12 @@ describe('Stream Route', () => { it('handles missing sync rules', async () => { const context: Context = { logger: logger, - service_context: mockServiceContext(null) + service_context: mockServiceContext(null), + token_payload: { + sub: '', + exp: 0, + iat: 0 + } }; const request: BasicRouterRequest = { @@ -21,10 +26,13 @@ describe('Stream Route', () => { protocol: 'http' }; - const error = (await (syncStreamed.handler({ context, params: {}, request }) as Promise).catch( - (e) => e - )) as ServiceError; - + const error = (await ( + syncStreamed.handler({ + context, + params: {}, + request + }) as Promise + ).catch((e) => e)) as ServiceError; expect(error.errorData.status).toEqual(500); expect(error.errorData.code).toEqual('PSYNC_S2302'); }); diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index dca03e082..fb72c079f 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -4,3 +4,4 @@ export * from './definitions.js'; export * as internal_routes from './routes.js'; export * 
from './metrics.js'; export * as metric_types from './metrics.js'; +export * as event_types from './reports.js'; diff --git a/packages/types/src/reports.ts b/packages/types/src/reports.ts new file mode 100644 index 000000000..266b5942a --- /dev/null +++ b/packages/types/src/reports.ts @@ -0,0 +1,85 @@ +export enum EventsEngineEventType { + SDK_CONNECT_EVENT = 'sdk-connect-event', + SDK_DISCONNECT_EVENT = 'sdk-disconnect-event', + SDK_DELETE_OLD = 'sdk-delete-old' +} + +/** + * Events engine event types. + * Any new events will need to be added here with the data structure they expect. + */ +export type SubscribeEvents = { + [EventsEngineEventType.SDK_CONNECT_EVENT]: ClientConnectionEventData; + [EventsEngineEventType.SDK_DISCONNECT_EVENT]: ClientDisconnectionEventData; + [EventsEngineEventType.SDK_DELETE_OLD]: DeleteOldConnectionData; +}; + +/** + * Event handler functions + */ +export type EventHandlerFunc = (data: SubscribeEvents[K]) => Promise | void; + +/** + * Emitter event interface. + * Create a class implementing EmitterEvent and implement the handler function. + */ +export interface EmitterEvent { + event: K; + handler: EventHandlerFunc; +} + +export type ConnectedUserData = { + client_id: string; + user_id: string; + user_agent?: string; + jwt_exp: Date; +}; + +export type DeleteOldConnectionData = { + /** + * Date before which all connection data should be deleted. + * This is used to clean up old connection data that is no longer needed. + */ + date: Date; +}; + +export type ClientConnectionEventData = { + connected_at: Date; +} & ConnectedUserData; + +export type ClientConnectionBucketData = { + connected_at: Date; + /** Parsed SDK version from the user agent. */ + sdk: string; +} & ConnectedUserData; + +export type ClientDisconnectionEventData = { + disconnected_at: Date; + connected_at: Date; +} & ConnectedUserData; + +/** Client connection schema stored locally. */ +export type ClientConnection = { + id?: string; + sdk: string; + user_agent: string; + client_id: string; + user_id: string; + jwt_exp?: Date; + connected_at: Date; + disconnected_at?: Date; +}; + +export type ClientConnectionReportResponse = { + users: number; + sdks: { + sdk: string; + users: number; + clients: number; + }[]; +}; + +export type ClientConnectionReportRequest = { + start: Date; + end: Date; +};
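
The reporting flow introduced in this changeset is event-driven: the sync routes emit SDK_CONNECT_EVENT and SDK_DISCONNECT_EVENT on the EventsEngine, and a subscriber is expected to persist those events through a ReportStorage implementation. Below is a minimal sketch of such a subscriber, assuming only the { event, handler } EmitterEvent shape from reports.ts above; the parseSdk helper and the createSdkConnectReporter name are illustrative and not part of this changeset, and the real user-agent parsing in the service may differ.

import { storage } from '@powersync/service-core';
import { event_types } from '@powersync/service-types';

// Illustrative only: pull a "powersync-js/1.24.5"-style prefix out of the user agent,
// falling back to 'unknown' when no SDK identifier is present.
function parseSdk(userAgent?: string): string {
  const match = userAgent?.match(/^(powersync-[\w-]+\/[\d.]+)/);
  return match?.[1] ?? 'unknown';
}

// Minimal connect-event subscriber: forwards SDK_CONNECT_EVENT data to a ReportStorage.
function createSdkConnectReporter(reportStorage: storage.ReportStorage) {
  return {
    event: event_types.EventsEngineEventType.SDK_CONNECT_EVENT,
    handler: async (data: event_types.ClientConnectionEventData) => {
      await reportStorage.reportClientConnection({
        ...data,
        sdk: parseSdk(data.user_agent)
      });
    }
  };
}

// Hypothetical wiring, e.g. during service startup:
//   serviceContext.eventsEngine.subscribe(createSdkConnectReporter(activeStorage.reportStorage));

A disconnect subscriber would follow the same pattern, forwarding SDK_DISCONNECT_EVENT data to reportClientDisconnection.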