56 changes: 45 additions & 11 deletions test-client/README.md
@@ -4,17 +4,29 @@ This is a minimal client demonstrating direct usage of the HTTP stream sync API.

For a full implementation, see our client SDKs.

## Setup
1. Install dependencies in the project root
```shell
# In project root directory
pnpm install
pnpm build:packages
```
2. Build the test-client in the `test-client` directory
```shell
# In the test-client directory
pnpm build
```

## Usage

### fetch-operations
The `fetch-operations` command downloads data for a single checkpoint, and outputs a normalized form: one CLEAR operation, followed by the latest PUT operation for each row. This normalized form is still split per bucket. The output is not affected by compacting, but can be affected by replication order.

To avoid normalizing the data, use the `--raw` option. This may include additional CLEAR, MOVE, REMOVE and duplicate PUT operations.

```sh
# If the endpoint is not present in the token's aud field, add the --endpoint argument
node dist/bin.js fetch-operations --token <token> --endpoint http://localhost:8080

# If the endpoint is present in the token's aud field, it can be omitted:
node dist/bin.js fetch-operations --token <token>
@@ -29,12 +41,34 @@ node dist/bin.js fetch-operations --config path/to/powersync.yaml
node dist/bin.js fetch-operations --config path/to/powersync.yaml --sub test-user
```
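
As a rough sketch of what the normalized output contains (field names follow the PowerSync oplog format; the exact values and framing here are illustrative, not exact CLI output):

```json
{"op_id": "0", "op": "CLEAR"}
{"op_id": "42", "op": "PUT", "object_type": "lists", "object_id": "abc-123", "data": "{\"id\":\"abc-123\",\"name\":\"Groceries\"}"}
```

With `--raw`, the same bucket may additionally contain MOVE and REMOVE operations, as well as multiple PUT operations for the same row.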

### generate-token

Generates a JWT based on your current PowerSync YAML config, without downloading any data:

```sh
node dist/bin.js generate-token --config path/to/powersync.yaml --sub test-user
```
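
The output is a signed JWT. Decoded, its payload looks roughly like the following (a sketch; the exact claims depend on your config, and the values shown are assumptions):

```json
{
  "sub": "test-user",
  "aud": "http://localhost:8080",
  "iat": 1700000000,
  "exp": 1700043200
}
```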

### concurrent-connections

Use this command to simulate concurrent connections to a PowerSync instance. This can be used for performance benchmarking
and other load-testing use cases. Two modes are available: `websocket` and `http`. By default, the command uses
`http` mode.

```shell
# Open two concurrent connections downloading sync operations, using -n to specify the number of connections
node ./dist/bin.js concurrent-connections -n 2 -t <token>

# The same, using websocket mode instead of http
node ./dist/bin.js concurrent-connections -n 2 -t <token> -m websocket
```

Once the sync has completed for a connection, the command prints the `op_id`, `ops`, `bytes`, `duration` and `data` for
that connection. By default, the `data` value is an array of the `id`s of the rows synced from the source database, but
you can pass an additional argument to print a different field from the synced data.

```shell
# For example, send two concurrent requests and print the name field of each synced row
node ./dist/bin.js concurrent-connections -n 2 -t <token> -p name
```
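
Based on the worker output in this PR, each connection logs a line like the following once its checkpoint completes (timestamp and values illustrative):

```
2025-01-01T10:00:00.000Z 0 checkpoint_complete op_id: 1234, ops: 500, bytes: 81920, duration: 350ms, data: [list-1,list-2]
```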

3 changes: 2 additions & 1 deletion test-client/src/bin.ts
@@ -40,10 +40,11 @@ program
.option('-u, --sub [sub]', 'sub field for auto-generated token')
.option('-n, --num-clients [num-clients]', 'number of clients to connect')
.option('-m, --mode [mode]', 'http or websocket')
.option('-p, --print [print]', 'print a field from the data being downloaded')
.action(async (options) => {
const credentials = await getCredentials(options);

await concurrentConnections(credentials, options['numClients'] ?? 10, options.mode ?? 'http', options.print ?? 'id');
});

await program.parseAsync();
25 changes: 19 additions & 6 deletions test-client/src/load-testing/http-worker.ts
@@ -6,7 +6,22 @@ if (parentPort == null) {
throw new Error(`Can only run this script in a worker_thread`);
}

const { i, url, token, print } = workerData;

let size = 0;
let numOperations = 0;
let lastCheckpointStart = 0;
let printData: string[] = [];

// Collect the selected field from each operation's row payload for reporting.
// MOVE operations (and any operation without a payload) carry no row data, so skip them.
const parseChunk = (bucketData: any) => {
  bucketData.data.forEach((op: any) => {
    if (op.op == 'MOVE' || op.data == null) {
      return;
    }
    const payload = JSON.parse(op.data);
    printData.push(payload[print]);
  });
};

const response = await fetch(url + '/sync/stream', {
method: 'POST',
@@ -19,24 +34,22 @@ const response = await fetch(url + '/sync/stream', {
include_checksums: true
})
});

if (!response.ok || response.body == null) {
throw new Error(response.statusText + '\n' + (await response.text()));
}

for await (let chunk of ndjsonStream<any>(response.body)) {
size += JSON.stringify(chunk).length;
if (chunk?.checkpoint_complete) {
const duration = performance.now() - lastCheckpointStart;
console.log(
new Date().toISOString(),
i,
`checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms, data: [${printData}]`
);
} else if (chunk?.data) {
parseChunk(chunk.data);
numOperations += chunk.data.data.length;
} else if (chunk?.checkpoint) {
lastCheckpointStart = performance.now();
20 changes: 10 additions & 10 deletions test-client/src/load-testing/load-test.ts
@@ -3,36 +3,36 @@ import { Credentials } from '../auth.js';

export type Mode = 'http' | 'websocket';

export async function stream(i: number, credentials: Credentials, mode: Mode, print: string) {
const worker =
mode == 'websocket'
? new Worker(new URL('./rsocket-worker.js', import.meta.url), {
workerData: { i, token: credentials.token, url: credentials.endpoint.replace(/^http/, 'ws'), print }
})
: new Worker(new URL('./http-worker.js', import.meta.url), {
workerData: { i, token: credentials.token, url: credentials.endpoint, print }
});
await new Promise((resolve, reject) => {
worker.on('message', (event) => resolve(event));
worker.on('error', (err) => reject(err));
});
worker.terminate();
}

export async function streamForever(i: number, credentials: Credentials, mode: Mode, print: string) {
while (true) {
try {
await stream(i, credentials, mode, print);
console.log(new Date().toISOString(), i, 'Stream ended');
} catch (e: any) {
console.error(new Date().toISOString(), i, e.message);
await new Promise((resolve) => setTimeout(resolve, 1000 + Math.random()));
}
}
}

export async function concurrentConnections(credentials: Credentials, numClients: number, mode: Mode, print: string) {
for (let i = 0; i < numClients; i++) {
streamForever(i, credentials, mode, print);
}
}
44 changes: 40 additions & 4 deletions test-client/src/load-testing/rsocket-worker.ts
@@ -9,12 +9,12 @@ if (parentPort == null) {
throw new Error(`Can only run this script in a worker_thread`);
}

const { i, url, token, print } = workerData;

const client = new RSocketConnector({
transport: new WebsocketClientTransport({
url,
wsCreator: (url: string) => {
return new WebSocket(url) as any;
}
}),
@@ -40,6 +40,9 @@ const SYNC_QUEUE_REQUEST_N = 2;

let pendingEventsCount = SYNC_QUEUE_REQUEST_N;
let size = 0;
let numOperations = 0;
let lastCheckpointStart = 0;
let printData: string[] = [];

const stream = rsocket.requestStream(
{
@@ -57,6 +60,7 @@ const stream = rsocket.requestStream(
},
onNext: (payload) => {
const { data } = payload;

// Fewer events are now pending
pendingEventsCount--;
if (!data) {
@@ -66,10 +70,32 @@ size += data.byteLength;
size += data.byteLength;

const chunk = deserialize(data);

if (chunk?.checkpoint_complete) {
const duration = performance.now() - lastCheckpointStart;
console.log(
new Date().toISOString(),
i,
`checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms, data: [${printData}]`
);
} else if (chunk?.data) {
parseChunk(chunk.data);
numOperations += chunk.data.data.length;
} else if (chunk?.checkpoint) {
lastCheckpointStart = performance.now();
console.log(new Date().toISOString(), i, `checkpoint buckets: ${chunk.checkpoint.buckets.length}`);
} else if (chunk?.checkpoint_diff) {
lastCheckpointStart = performance.now();
console.log(
new Date().toISOString(),
i,
`checkpoint_diff removed_buckets: ${chunk.checkpoint_diff.removed_buckets.length} updated_buckets: ${chunk.checkpoint_diff.updated_buckets.length}`
);
} else {
const key = Object.keys(chunk)[0];
if (key != 'token_expires_in' && key != 'data') {
console.log(new Date().toISOString(), i, key);
}
}

const required = SYNC_QUEUE_REQUEST_N - pendingEventsCount;
Expand All @@ -84,3 +110,13 @@ const stream = rsocket.requestStream(
onExtension: () => {}
}
);

// Invoked from the stream's onNext handler above.
// Collect the selected field from each operation's row payload for reporting.
// MOVE operations (and any operation without a payload) carry no row data, so skip them.
const parseChunk = (bucketData: any) => {
  bucketData.data.forEach((op: any) => {
    if (op.op == 'MOVE' || op.data == null) {
      return;
    }
    const payload = JSON.parse(op.data);
    printData.push(payload[print]);
  });
};