9 changes: 9 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

60 changes: 50 additions & 10 deletions test-client/README.md
@@ -4,17 +4,34 @@ This is a minimal client demonstrating direct usage of the HTTP stream sync API.

For a full implementation, see our client SDKs.

## Setup

1. Install dependencies in the project root:

```shell
# In the project root directory
pnpm install
pnpm build:packages
```

2. Build the test client in the `test-client` directory:

```shell
# In the test-client directory
pnpm build
```

## Usage

### fetch-operations

The `fetch-operations` command downloads data for a single checkpoint and outputs a normalized form: one CLEAR operation, followed by the latest PUT operation for each row. This normalized form is still split per bucket. The output is not affected by compacting, but can be affected by replication order. A sketch of this reduction follows the examples below.

To avoid normalizing the data, use the `--raw` option. The raw output may include additional CLEAR, MOVE, REMOVE and duplicate PUT operations.

```sh
# If the endpoint is not present in the token's aud field, add the --endpoint argument
node dist/bin.js fetch-operations --token <token> --endpoint http://localhost:8080

# If the endpoint is present in the token's aud field, it can be omitted from the arguments:
node dist/bin.js fetch-operations --token <token>
@@ -29,12 +46,35 @@ node dist/bin.js fetch-operations --config path/to/powersync.yaml
node dist/bin.js fetch-operations --config path/to/powersync.yaml --sub test-user
```
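
The normalization above amounts to a fold over each bucket's oplog. A rough sketch of that reduction, with simplified, hypothetical field names (real oplog entries also carry additional fields such as checksums); this is illustrative, not the service's actual implementation:

```typescript
// Sketch of the normalization described above: within one bucket, keep a
// single CLEAR followed by the latest PUT per row. Hypothetical types.
interface OplogEntry {
  op: 'PUT' | 'REMOVE' | 'MOVE' | 'CLEAR';
  op_id: string;
  object_type?: string;
  object_id?: string;
  data?: string;
}

function normalizeBucket(ops: OplogEntry[]): OplogEntry[] {
  const latestPut = new Map<string, OplogEntry>();
  for (const op of ops) {
    const key = `${op.object_type}/${op.object_id}`;
    if (op.op === 'CLEAR') {
      latestPut.clear(); // a CLEAR discards everything before it
    } else if (op.op === 'PUT') {
      latestPut.set(key, op); // later PUTs replace earlier ones for the same row
    } else if (op.op === 'REMOVE') {
      latestPut.delete(key);
    }
    // MOVE ops carry no row data and are dropped entirely
  }
  return [{ op: 'CLEAR', op_id: '0' }, ...latestPut.values()];
}
```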

### generate-token

Generates a JWT based on your current PowerSync YAML config, without downloading any data.

```sh
node dist/bin.js generate-token --config path/to/powersync.yaml --sub test-user
```
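
For reference, `jose` (already a dependency in `package.json`) can produce such a token. A minimal sketch assuming an HS256 dev key; the key id, audience and expiry are assumptions, not necessarily what `generate-token` emits:

```typescript
// Illustrative HS256 JWT generation with jose; kid, audience and expiry
// are assumptions, not the actual generate-token implementation.
import { SignJWT } from 'jose';

async function signDevToken(secret: Uint8Array, sub: string, audience: string) {
  return new SignJWT({})
    .setProtectedHeader({ alg: 'HS256', kid: 'powersync-dev' })
    .setSubject(sub)
    .setAudience(audience)
    .setIssuedAt()
    .setExpirationTime('1h')
    .sign(secret);
}
```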

### concurrent-connections

Use this command to simulate concurrent connections to a PowerSync instance, for performance benchmarking
and other load-testing use cases. Two modes are available: `websocket` and `http`. By default, the command uses
`http` mode.

```shell
# Open two concurrent connections to download sync operations, using -n to specify the number of connections
node ./dist/bin.js concurrent-connections -n 2 -t <token>

# Open two concurrent connections using websocket mode
node ./dist/bin.js concurrent-connections -n 2 -t <token> -m websocket
```

Once the sync has completed for a connection, the command prints the `op_id`, `ops`, `bytes` and `duration`
for that connection.

To see which rows are being synced, specify `--print=id` (or another relevant field). The values are included
in the output as a `data:` array with each checkpoint.

```shell
# Send two concurrent requests and print the name field, as an example.
node ./dist/bin.js concurrent-connections -n 2 -t <token> -p name
```
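
Per-connection output then resembles the following; the timestamps and values are illustrative, but the format matches the worker's log lines:

```
2025-01-01T00:00:00.000Z 0 checkpoint buckets: 1
2025-01-01T00:00:01.234Z 0 checkpoint_complete op_id: 100, ops: 42, bytes: 15360, duration: 1234ms, data: [alice,bob]
```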
3 changes: 2 additions & 1 deletion test-client/package.json
@@ -18,6 +18,7 @@
"@powersync/service-core": "workspace:*",
"commander": "^12.0.0",
"jose": "^4.15.1",
"undici": "^7.15.0",
"ws": "^8.18.0",
"yaml": "^2.5.0"
},
@@ -26,4 +27,4 @@
"@types/ws": "~8.2.0",
"typescript": "^5.7.3"
}
}
}
12 changes: 11 additions & 1 deletion test-client/src/bin.ts
@@ -40,10 +40,20 @@ program
.option('-u, --sub [sub]', 'sub field for auto-generated token')
.option('-n, --num-clients [num-clients]', 'number of clients to connect')
.option('-m, --mode [mode]', 'http or websocket')
.option('-p, --print [print]', 'print a field from the data being downloaded')
.option('--once', 'stop after the first checkpoint')
.action(async (options) => {
const credentials = await getCredentials(options);

await concurrentConnections(credentials, options['numClients'] ?? 10, options.mode ?? 'http');
await concurrentConnections(
{
...credentials,
once: options.once ?? false,
mode: options.mode
},
options['numClients'] ?? 10,
options.print
);
});

await program.parseAsync();
2 changes: 1 addition & 1 deletion test-client/src/client.ts
@@ -29,7 +29,7 @@ export async function getCheckpointData(options: GetCheckpointOptions) {
let data: types.StreamingSyncData[] = [];
let checkpoint: types.StreamingSyncCheckpoint;

for await (let chunk of ndjsonStream<types.StreamingSyncLine>(response.body!)) {
for await (let { chunk } of ndjsonStream<types.StreamingSyncLine>(response.body!)) {
if (isStreamingSyncData(chunk)) {
// Collect data
data.push(chunk);
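
The destructuring change above reflects a new `ndjsonStream` contract: it now yields each parsed line together with its encoded size, which feeds the `decodedBytes` stats used in `httpStream.ts` below. The actual `ndjson.ts` is not part of this diff, so the following generator is only a sketch of the assumed shape:

```typescript
// Sketch of an ndjsonStream yielding { chunk, size } pairs, matching the
// destructuring above. Assumed shape; not the repository's actual ndjson.ts.
import { Buffer } from 'node:buffer';

export async function* ndjsonStream<T>(
  stream: ReadableStream<Uint8Array>
): AsyncGenerator<{ chunk: T; size: number }> {
  const decoder = new TextDecoder();
  let buffer = '';
  // Node's web ReadableStream is async iterable; the cast satisfies DOM typings.
  for await (const bytes of stream as unknown as AsyncIterable<Uint8Array>) {
    buffer += decoder.decode(bytes, { stream: true });
    let newline: number;
    while ((newline = buffer.indexOf('\n')) >= 0) {
      const line = buffer.slice(0, newline);
      buffer = buffer.slice(newline + 1);
      if (line.trim().length > 0) {
        // size counts the encoded bytes of the line plus its newline
        yield { chunk: JSON.parse(line) as T, size: Buffer.byteLength(line) + 1 };
      }
    }
  }
}
```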
37 changes: 37 additions & 0 deletions test-client/src/httpStream.ts
@@ -0,0 +1,37 @@
import type { StreamingSyncLine, StreamingSyncRequest } from '@powersync/service-core';
import { Readable } from 'node:stream';
import { request } from 'undici';
import { ndjsonStream } from './ndjson.js';
import { StreamEvent, SyncOptions } from './stream.js';

export async function* openHttpStream(options: SyncOptions): AsyncGenerator<StreamEvent> {
const streamRequest: StreamingSyncRequest = {
raw_data: true,
client_id: options.clientId,
buckets: [...(options.bucketPositions ?? new Map()).entries()].map(([bucket, after]) => ({
name: bucket,
after: after
}))
};
const response = await request(options.endpoint + '/sync/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Token ${options.token}`
},
body: JSON.stringify(streamRequest),
signal: options.signal
});

if (response.statusCode != 200) {
throw new Error(`Request failed with code: ${response.statusCode}\n${await response.body.text()}`);
}

const stream = Readable.toWeb(response.body) as ReadableStream<Uint8Array>;

for await (let { chunk, size } of ndjsonStream<StreamingSyncLine>(stream)) {
yield { stats: { decodedBytes: size } };
yield chunk;
}
// If we reach this, the connection was closed without error by the server
}
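
A minimal consumption sketch for `openHttpStream`, with a placeholder endpoint and token (any `SyncOptions` fields beyond those used above are assumptions):

```typescript
// Hypothetical usage of openHttpStream; endpoint, token and clientId are placeholders.
import { openHttpStream } from './httpStream.js';

const controller = new AbortController();
for await (const event of openHttpStream({
  endpoint: 'http://localhost:8080',
  token: '<token>',
  clientId: 'example-client',
  signal: controller.signal
})) {
  if ('stats' in event) {
    console.log(`received ${event.stats.decodedBytes} bytes`);
  } else if ('checkpoint_complete' in event) {
    controller.abort(); // cancel the underlying request
    break; // stop after the first complete checkpoint
  }
}
```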
59 changes: 0 additions & 59 deletions test-client/src/load-testing/http-worker.ts

This file was deleted.

65 changes: 65 additions & 0 deletions test-client/src/load-testing/load-test-worker.ts
@@ -0,0 +1,65 @@
import { parentPort, workerData } from 'worker_threads';
import { openStream, SyncOptions } from '../stream.js';

if (parentPort == null) {
throw new Error(`Can only run this script in a worker_thread`);
}

const { i, print } = workerData;
const request: SyncOptions = workerData.request;

let size = 0;
let numOperations = 0;
let lastCheckpointStart = 0;
let printData: string[] = [];

const parseChunk = (chunk: any) => {
if (print == null) {
return;
}
chunk.data.forEach((data: any) => {
if (data.op == 'PUT') {
const payload = JSON.parse(data.data);
printData.push(payload[print]);
}
});
};

for await (let chunk of openStream(request)) {
if ('error' in chunk) {
// Retried automatically
console.error(new Date().toISOString(), i, `Error in stream: ${chunk.error}`);
} else if ('checkpoint_complete' in chunk) {
const duration = performance.now() - lastCheckpointStart;
let message = `checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms`;
if (print) {
message += `, data: [${printData}]`;
}
printData = [];
console.log(new Date().toISOString(), i, message);

if (request.once) {
break;
}
} else if ('data' in chunk) {
parseChunk(chunk.data);
numOperations += chunk.data.data.length;
} else if ('checkpoint' in chunk) {
lastCheckpointStart = performance.now();
console.log(new Date().toISOString(), i, `checkpoint buckets: ${chunk.checkpoint.buckets.length}`);
} else if ('checkpoint_diff' in chunk) {
lastCheckpointStart = performance.now();
console.log(
new Date().toISOString(),
i,
`checkpoint_diff removed_buckets: ${chunk.checkpoint_diff.removed_buckets.length} updated_buckets: ${chunk.checkpoint_diff.updated_buckets.length}`
);
} else if ('stats' in chunk) {
size += chunk.stats.decodedBytes;
} else {
const key = Object.keys(chunk)[0];
if (key != 'token_expires_in' && key != 'data') {
console.log(new Date().toISOString(), i, key);
}
}
}
34 changes: 10 additions & 24 deletions test-client/src/load-testing/load-test.ts
@@ -1,38 +1,24 @@
import { Worker } from 'worker_threads';
import { Credentials } from '../auth.js';
import { SyncOptions } from '../stream.js';

export type Mode = 'http' | 'websocket';

export async function stream(i: number, credentials: Credentials, mode: Mode) {
const worker =
mode == 'websocket'
? new Worker(new URL('./rsocket-worker.js', import.meta.url), {
workerData: { i, token: credentials.token, url: credentials.endpoint.replace(/^http/, 'ws') }
})
: new Worker(new URL('./http-worker.js', import.meta.url), {
workerData: { i, token: credentials.token, url: credentials.endpoint }
});
export async function stream(i: number, request: SyncOptions, print: string | undefined) {
const worker = new Worker(new URL('./load-test-worker.js', import.meta.url), {
workerData: { i, request, print }
});
await new Promise((resolve, reject) => {
worker.on('message', (event) => resolve(event));
worker.on('error', (err) => reject(err));
worker.on('exit', (__code) => {
resolve(null);
});
});
worker.terminate();
}

export async function streamForever(i: number, credentials: Credentials, mode: Mode) {
while (true) {
try {
await stream(i, credentials, mode);
console.log(new Date().toISOString(), i, 'Stream ended');
} catch (e) {
console.error(new Date().toISOString(), i, e.message);
await new Promise((resolve) => setTimeout(resolve, 1000 + Math.random()));
}
}
}

export async function concurrentConnections(credentials: Credentials, numClients: number, mode: Mode) {
export async function concurrentConnections(options: SyncOptions, numClients: number, print: string | undefined) {
for (let i = 0; i < numClients; i++) {
streamForever(i, credentials, mode);
stream(i, { ...options, clientId: options.clientId ?? `load-test-${i}` }, print);
}
}