Skip to content

Commit f83e68d

Browse files
committed
Tweak default print behavior.
1 parent c5888ab commit f83e68d

File tree

5 files changed

+52
-35
lines changed

5 files changed

+52
-35
lines changed

test-client/README.md

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@ This is a minimal client demonstrating direct usage of the HTTP stream sync API.
55
For a full implementation, see our client SDKs.
66

77
## Setup
8+
89
1. Install dependencies on the project root
10+
911
```shell
1012
# In project root directory
1113
pnpm install
1214
pnpm build:packages
1315
```
16+
1417
2. Build the test-client in the `test-client` directory
18+
1519
```shell
1620
# In the test-client directory
1721
pnpm build
@@ -20,6 +24,7 @@ pnpm build
2024
## Usage
2125

2226
### fetch-operations
27+
2328
The `fetch-operations` command downloads data for a single checkpoint, and outputs a normalized form: one CLEAR operation, followed by the latest PUT operation for each row. This normalized form is still split per bucket. The output is not affected by compacting, but can be affected by replication order.
2429

2530
To avoid normalizing the data, use the `--raw` option. This may include additional CLEAR, MOVE, REMOVE and duplicate PUT operations.
@@ -57,18 +62,19 @@ and other load-testing use cases. There are two modes available, `websocket` or
5762

5863
```shell
5964
# Send two concurrent requests to request a download of sync operations using -n to specify the number of connections
60-
node ./dist/bin.js concurrent-connections -n 2 -t <token>
65+
node ./dist/bin.js concurrent-connections -n 2 -t <token>
6166

6267
# Send two concurrent requests to request a download of sync operations using websocket mode
6368
node ./dist/bin.js concurrent-connections -n 2 -t <token> -m websocket
6469
```
6570

66-
Once the sync has completed for a connection the command will print the `op_id`, `ops`, `bytes`, `duration` and `data` for
67-
each connection. By default, the `data` value will be an array id's of the rows synced from the source database, but you can
68-
specify an additional argument if you want to print a specific field from the data being synced.
71+
Once the sync has completed for a connection the command will print the `op_id`, `ops`, `bytes` and `duration`
72+
each connection.
73+
74+
To see what rows are being synced, specify `--print=id`, or another relevant field. This will be included
75+
in the output as a `data:` array with each checkpoint.
6976

7077
```shell
7178
# Send two concurrent requests and print the name field, as an example.
7279
node ./dist/bin.js concurrent-connections -n 2 -t <token> -p name
7380
```
74-

test-client/src/bin.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ program
4444
.action(async (options) => {
4545
const credentials = await getCredentials(options);
4646

47-
await concurrentConnections(credentials, options['numClients'] ?? 10, options.mode ?? 'http', options.print ?? "id");
47+
await concurrentConnections(credentials, options['numClients'] ?? 10, options.mode ?? 'http', options.print);
4848
});
4949

5050
await program.parseAsync();

test-client/src/load-testing/http-worker.ts

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@ let lastCheckpointStart = 0;
1414
let printData: string[] = [];
1515

1616
const parseChunk = (chunk: any) => {
17+
if (print == null) {
18+
return;
19+
}
1720
chunk.data.forEach((data: any) => {
18-
if(data.op == "MOVE") {
19-
return;
21+
if (data.op == 'PUT') {
22+
const payload = JSON.parse(data.data);
23+
printData.push(payload[print]);
2024
}
21-
const payload = JSON.parse(data.data);
22-
printData.push(payload[print]);
23-
})
24-
}
25+
});
26+
};
2527

2628
const response = await fetch(url + '/sync/stream', {
2729
method: 'POST',
@@ -43,11 +45,12 @@ for await (let chunk of ndjsonStream<any>(response.body)) {
4345
size += JSON.stringify(chunk).length;
4446
if (chunk?.checkpoint_complete) {
4547
const duration = performance.now() - lastCheckpointStart;
46-
console.log(
47-
new Date().toISOString(),
48-
i,
49-
`checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms, data: [${printData}]`
50-
);
48+
let message = `checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms`;
49+
if (print) {
50+
message += `, data: [${printData}]`;
51+
}
52+
printData = [];
53+
console.log(new Date().toISOString(), i, message);
5154
} else if (chunk?.data) {
5255
parseChunk(chunk.data);
5356
numOperations += chunk.data.data.length;

test-client/src/load-testing/load-test.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,23 @@ import { Credentials } from '../auth.js';
33

44
export type Mode = 'http' | 'websocket';
55

6-
export async function stream(i: number, credentials: Credentials, mode: Mode, print: string) {
6+
export async function stream(i: number, credentials: Credentials, mode: Mode, print: string | undefined) {
77
const worker =
88
mode == 'websocket'
99
? new Worker(new URL('./rsocket-worker.js', import.meta.url), {
10-
workerData: { i, token: credentials.token, url: credentials.endpoint.replace(/^http/, 'ws'), print }
11-
})
10+
workerData: { i, token: credentials.token, url: credentials.endpoint.replace(/^http/, 'ws'), print }
11+
})
1212
: new Worker(new URL('./http-worker.js', import.meta.url), {
13-
workerData: { i, token: credentials.token, url: credentials.endpoint, print }
14-
});
13+
workerData: { i, token: credentials.token, url: credentials.endpoint, print }
14+
});
1515
await new Promise((resolve, reject) => {
1616
worker.on('message', (event) => resolve(event));
1717
worker.on('error', (err) => reject(err));
1818
});
1919
worker.terminate();
2020
}
2121

22-
export async function streamForever(i: number, credentials: Credentials, mode: Mode, print: string) {
22+
export async function streamForever(i: number, credentials: Credentials, mode: Mode, print: string | undefined) {
2323
while (true) {
2424
try {
2525
await stream(i, credentials, mode, print);
@@ -31,7 +31,12 @@ export async function streamForever(i: number, credentials: Credentials, mode: M
3131
}
3232
}
3333

34-
export async function concurrentConnections(credentials: Credentials, numClients: number, mode: Mode, print: string) {
34+
export async function concurrentConnections(
35+
credentials: Credentials,
36+
numClients: number,
37+
mode: Mode,
38+
print: string | undefined
39+
) {
3540
for (let i = 0; i < numClients; i++) {
3641
streamForever(i, credentials, mode, print);
3742
}

test-client/src/load-testing/rsocket-worker.ts

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,12 @@ const stream = rsocket.requestStream(
7373

7474
if (chunk?.checkpoint_complete) {
7575
const duration = performance.now() - lastCheckpointStart;
76-
console.log(
77-
new Date().toISOString(),
78-
i,
79-
`checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms, data: [${printData}]`
80-
);
76+
let message = `checkpoint_complete op_id: ${chunk.checkpoint_complete.last_op_id}, ops: ${numOperations}, bytes: ${size}, duration: ${duration.toFixed(0)}ms`;
77+
if (print) {
78+
message += `, data: [${printData}]`;
79+
}
80+
console.log(new Date().toISOString(), i, message);
81+
printData = [];
8182
} else if (chunk?.data) {
8283
parseChunk(chunk.data);
8384
numOperations += chunk.data.data.length;
@@ -112,11 +113,13 @@ const stream = rsocket.requestStream(
112113
);
113114

114115
const parseChunk = (chunk: any) => {
116+
if (print == null) {
117+
return;
118+
}
115119
chunk.data.forEach((data: any) => {
116-
if(data.op == "MOVE") {
117-
return;
120+
if (data.op == 'PUT') {
121+
const payload = JSON.parse(data.data);
122+
printData.push(payload[print]);
118123
}
119-
const payload = JSON.parse(data.data);
120-
printData.push(payload[print]);
121-
})
122-
}
124+
});
125+
};

0 commit comments

Comments
 (0)