Skip to content

Commit f6fb1c4

Browse files
committed
Rewrite concurrent-connections clients - now resumeable.
1 parent f83e68d commit f6fb1c4

File tree

11 files changed

+268
-188
lines changed

11 files changed

+268
-188
lines changed

pnpm-lock.yaml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test-client/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"@powersync/service-core": "workspace:*",
1919
"commander": "^12.0.0",
2020
"jose": "^4.15.1",
21+
"undici": "^7.15.0",
2122
"ws": "^8.18.0",
2223
"yaml": "^2.5.0"
2324
},
@@ -26,4 +27,4 @@
2627
"@types/ws": "~8.2.0",
2728
"typescript": "^5.7.3"
2829
}
29-
}
30+
}

test-client/src/bin.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,19 @@ program
4141
.option('-n, --num-clients [num-clients]', 'number of clients to connect')
4242
.option('-m, --mode [mode]', 'http or websocket')
4343
.option('-p, --print [print]', 'print a field from the data being downloaded')
44+
.option('--once', 'stop after the first checkpoint')
4445
.action(async (options) => {
4546
const credentials = await getCredentials(options);
4647

47-
await concurrentConnections(credentials, options['numClients'] ?? 10, options.mode ?? 'http', options.print);
48+
await concurrentConnections(
49+
{
50+
...credentials,
51+
once: options.once ?? false,
52+
mode: options.mode
53+
},
54+
options['numClients'] ?? 10,
55+
options.print
56+
);
4857
});
4958

5059
await program.parseAsync();

test-client/src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export async function getCheckpointData(options: GetCheckpointOptions) {
2929
let data: types.StreamingSyncData[] = [];
3030
let checkpoint: types.StreamingSyncCheckpoint;
3131

32-
for await (let chunk of ndjsonStream<types.StreamingSyncLine>(response.body!)) {
32+
for await (let { chunk } of ndjsonStream<types.StreamingSyncLine>(response.body!)) {
3333
if (isStreamingSyncData(chunk)) {
3434
// Collect data
3535
data.push(chunk);

test-client/src/httpStream.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import type { StreamingSyncLine, StreamingSyncRequest } from '@powersync/service-core';
2+
import { Readable } from 'node:stream';
3+
import { request } from 'undici';
4+
import { ndjsonStream } from './ndjson.js';
5+
import { StreamEvent, SyncOptions } from './stream.js';
6+
7+
export async function* openHttpStream(options: SyncOptions): AsyncGenerator<StreamEvent> {
8+
const streamRequest: StreamingSyncRequest = {
9+
raw_data: true,
10+
client_id: options.clientId,
11+
buckets: [...(options.bucketPositions ?? new Map()).entries()].map(([bucket, after]) => ({
12+
name: bucket,
13+
after: after
14+
}))
15+
};
16+
const response = await request(options.endpoint + '/sync/stream', {
17+
method: 'POST',
18+
headers: {
19+
'Content-Type': 'application/json',
20+
Authorization: `Token ${options.token}`
21+
},
22+
body: JSON.stringify(streamRequest),
23+
signal: options.signal
24+
});
25+
26+
if (response.statusCode != 200) {
27+
throw new Error(`Request failed with code: ${response.statusCode}\n${await response.body.text()}`);
28+
}
29+
30+
const stream = Readable.toWeb(response.body) as ReadableStream<Uint8Array>;
31+
32+
for await (let { chunk, size } of ndjsonStream<StreamingSyncLine>(stream)) {
33+
yield { stats: { decodedBytes: size } };
34+
yield chunk;
35+
}
36+
// If we reach this, the connection was closed without error by the server
37+
}
Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { ndjsonStream } from '../ndjson.js';
2-
31
import { parentPort, workerData } from 'worker_threads';
2+
import { openStream, SyncOptions } from '../stream.js';
43

54
if (parentPort == null) {
65
throw new Error(`Can only run this script in a worker_thread`);
76
}
87

9-
const { i, url, token, print } = workerData;
8+
const { i, print } = workerData;
9+
const request: SyncOptions = workerData.request;
1010

1111
let size = 0;
1212
let numOperations = 0;
@@ -25,51 +25,41 @@ const parseChunk = (chunk: any) => {
2525
});
2626
};
2727

28-
const response = await fetch(url + '/sync/stream', {
29-
method: 'POST',
30-
headers: {
31-
'Content-Type': 'application/json',
32-
Authorization: `Token ${token}`
33-
},
34-
body: JSON.stringify({
35-
raw_data: true,
36-
include_checksums: true
37-
})
38-
});
39-
40-
if (!response.ok || response.body == null) {
41-
throw new Error(response.statusText + '\n' + (await response.text()));
42-
}
43-
44-
for await (let chunk of ndjsonStream<any>(response.body)) {
45-
size += JSON.stringify(chunk).length;
46-
if (chunk?.checkpoint_complete) {
28+
for await (let chunk of openStream(request)) {
29+
if ('error' in chunk) {
30+
// Retried automatically
31+
console.error(new Date().toISOString(), i, `Error in stream: ${chunk.error}`);
32+
} else if ('checkpoint_complete' in chunk) {
4733
const duration = performance.now() - lastCheckpointStart;
4834
let message = `checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms`;
4935
if (print) {
5036
message += `, data: [${printData}]`;
5137
}
5238
printData = [];
5339
console.log(new Date().toISOString(), i, message);
54-
} else if (chunk?.data) {
40+
41+
if (request.once) {
42+
break;
43+
}
44+
} else if ('data' in chunk) {
5545
parseChunk(chunk.data);
5646
numOperations += chunk.data.data.length;
57-
} else if (chunk?.checkpoint) {
47+
} else if ('checkpoint' in chunk) {
5848
lastCheckpointStart = performance.now();
5949
console.log(new Date().toISOString(), i, `checkpoint buckets: ${chunk.checkpoint.buckets.length}`);
60-
} else if (chunk?.checkpoint_diff) {
50+
} else if ('checkpoint_diff' in chunk) {
6151
lastCheckpointStart = performance.now();
6252
console.log(
6353
new Date().toISOString(),
6454
i,
6555
`checkpoint_diff removed_buckets: ${chunk.checkpoint_diff.removed_buckets.length} updated_buckets: ${chunk.checkpoint_diff.updated_buckets.length}`
6656
);
57+
} else if ('stats' in chunk) {
58+
size += chunk.stats.decodedBytes;
6759
} else {
6860
const key = Object.keys(chunk)[0];
6961
if (key != 'token_expires_in' && key != 'data') {
7062
console.log(new Date().toISOString(), i, key);
7163
}
7264
}
7365
}
74-
75-
parentPort.postMessage({ done: true });
Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,24 @@
11
import { Worker } from 'worker_threads';
2-
import { Credentials } from '../auth.js';
2+
import { SyncOptions } from '../stream.js';
33

44
export type Mode = 'http' | 'websocket';
55

6-
export async function stream(i: number, credentials: Credentials, mode: Mode, print: string | undefined) {
7-
const worker =
8-
mode == 'websocket'
9-
? new Worker(new URL('./rsocket-worker.js', import.meta.url), {
10-
workerData: { i, token: credentials.token, url: credentials.endpoint.replace(/^http/, 'ws'), print }
11-
})
12-
: new Worker(new URL('./http-worker.js', import.meta.url), {
13-
workerData: { i, token: credentials.token, url: credentials.endpoint, print }
14-
});
6+
export async function stream(i: number, request: SyncOptions, print: string | undefined) {
7+
const worker = new Worker(new URL('./load-test-worker.js', import.meta.url), {
8+
workerData: { i, request, print }
9+
});
1510
await new Promise((resolve, reject) => {
1611
worker.on('message', (event) => resolve(event));
1712
worker.on('error', (err) => reject(err));
13+
worker.on('exit', (__code) => {
14+
resolve(null);
15+
});
1816
});
1917
worker.terminate();
2018
}
2119

22-
export async function streamForever(i: number, credentials: Credentials, mode: Mode, print: string | undefined) {
23-
while (true) {
24-
try {
25-
await stream(i, credentials, mode, print);
26-
console.log(new Date().toISOString(), i, 'Stream ended');
27-
} catch (e: any) {
28-
console.error(new Date().toISOString(), i, e.message);
29-
await new Promise((resolve) => setTimeout(resolve, 1000 + Math.random()));
30-
}
31-
}
32-
}
33-
34-
export async function concurrentConnections(
35-
credentials: Credentials,
36-
numClients: number,
37-
mode: Mode,
38-
print: string | undefined
39-
) {
20+
export async function concurrentConnections(options: SyncOptions, numClients: number, print: string | undefined) {
4021
for (let i = 0; i < numClients; i++) {
41-
streamForever(i, credentials, mode, print);
22+
stream(i, { ...options, clientId: options.clientId ?? `load-test-${i}` }, print);
4223
}
4324
}

test-client/src/load-testing/rsocket-worker.ts

Lines changed: 0 additions & 125 deletions
This file was deleted.

0 commit comments

Comments
 (0)