Skip to content

Commit f9de3ba

Browse files
Merge pull request #229 from EventStore/batch-append
Batch append
2 parents bc12b99 + 2249503 commit f9de3ba

File tree

13 files changed

+921
-100
lines changed

13 files changed

+921
-100
lines changed

src/Client/ServerFeatures.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ export class ServerFeatures {
6060
: this.#supported.has(path);
6161

6262
if (isSupported) {
63-
debug.connection("%s %s is Supported", path, feature);
63+
debug.connection("%s %s is Supported", path, feature ?? "");
6464
} else {
65-
debug.connection("%s %s is not Supported", path, feature);
65+
debug.connection("%s %s is not Supported", path, feature ?? "");
6666
}
6767

6868
return isSupported;

src/Client/index.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,22 @@ export class Client {
305305
<Client extends GRPCClient, T extends Stream>(
306306
Client: GRPCClientConstructor<Client>,
307307
debugName: string,
308-
creator: (client: Client) => T | Promise<T>
308+
creator: (client: Client) => T | Promise<T>,
309+
cache?: WeakMap<Client, T>
309310
) =>
310311
async (): Promise<T> => {
311312
const client = await this.getGRPCClient(Client, debugName);
313+
314+
if (cache && cache.has(client)) return cache.get(client)!;
315+
312316
const stream = await creator(client);
313-
return stream.on("error", (err) => this.handleError(client, err));
317+
318+
cache?.set(client, stream);
319+
320+
return stream.on("error", (err) => {
321+
cache?.delete(client);
322+
this.handleError(client, err);
323+
});
314324
};
315325

316326
// Internal handled execution

src/__test__/connection/reconnect.test.ts

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ describe("reconnect", () => {
2626
// make successful append to connect to node
2727
const firstAppend = await client.appendToStream(
2828
"my_stream",
29-
jsonEvent({ type: "first-append", data: { message: "test" } })
29+
jsonEvent({ type: "first-append", data: { message: "test" } }),
30+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
31+
{ credentials: { username: "admin", password: "changeit" } }
3032
);
3133
expect(firstAppend).toBeDefined();
3234

@@ -38,7 +40,9 @@ describe("reconnect", () => {
3840
try {
3941
const secondAppend = await client.appendToStream(
4042
"my_stream",
41-
jsonEvent({ type: "failed-append", data: { message: "test" } })
43+
jsonEvent({ type: "failed-append", data: { message: "test" } }),
44+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
45+
{ credentials: { username: "admin", password: "changeit" } }
4246
);
4347
expect(secondAppend).toBe("Unreachable");
4448
} catch (error) {
@@ -52,7 +56,9 @@ describe("reconnect", () => {
5256
try {
5357
const reconnectedAppend = await client.appendToStream(
5458
"my_stream",
55-
jsonEvent({ type: "reconnect-append", data: { message: "test" } })
59+
jsonEvent({ type: "reconnect-append", data: { message: "test" } }),
60+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
61+
{ credentials: { username: "admin", password: "changeit" } }
5662
);
5763
expect(reconnectedAppend).toBeDefined();
5864
break;
@@ -79,7 +85,9 @@ describe("reconnect", () => {
7985
// make successful append to follower node
8086
const firstAppend = await client.appendToStream(
8187
"my_stream",
82-
jsonEvent({ type: "first-append", data: { message: "test" } })
88+
jsonEvent({ type: "first-append", data: { message: "test" } }),
89+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
90+
{ credentials: { username: "admin", password: "changeit" } }
8391
);
8492
expect(firstAppend).toBeDefined();
8593

@@ -89,7 +97,11 @@ describe("reconnect", () => {
8997
const secondAppend = await client.appendToStream(
9098
"my_stream",
9199
jsonEvent({ type: "failed-append", data: { message: "test" } }),
92-
{ requiresLeader: true }
100+
{
101+
requiresLeader: true,
102+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
103+
credentials: { username: "admin", password: "changeit" },
104+
}
93105
);
94106
expect(secondAppend).toBe("Unreachable");
95107
} catch (error) {
@@ -104,7 +116,11 @@ describe("reconnect", () => {
104116
const reconnectedAppend = await client.appendToStream(
105117
"my_stream",
106118
jsonEvent({ type: "reconnect-append", data: { message: "test" } }),
107-
{ requiresLeader: true }
119+
{
120+
requiresLeader: true,
121+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
122+
credentials: { username: "admin", password: "changeit" },
123+
}
108124
);
109125
expect(reconnectedAppend).toBeDefined();
110126

@@ -129,7 +145,9 @@ describe("reconnect", () => {
129145
// make successful append of 2000 events to node
130146
const firstAppend = await client.appendToStream(
131147
"my_stream",
132-
jsonTestEvents(2000)
148+
jsonTestEvents(2000),
149+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
150+
{ credentials: { username: "admin", password: "changeit" } }
133151
);
134152
expect(firstAppend).toBeDefined();
135153

@@ -159,7 +177,8 @@ describe("reconnect", () => {
159177
try {
160178
const reconnectedAppend = await client.appendToStream(
161179
"my_stream",
162-
jsonEvent({ type: "reconnect-append", data: { message: "test" } })
180+
jsonEvent({ type: "reconnect-append", data: { message: "test" } }), // batch append triggers reconnect as soon as stream drops, so we need to force regular append
181+
{ credentials: { username: "admin", password: "changeit" } }
163182
);
164183
expect(reconnectedAppend).toBeDefined();
165184
break;
@@ -186,7 +205,9 @@ describe("reconnect", () => {
186205
// make successful append to connect to node
187206
const firstAppend = await client.appendToStream(
188207
"my_stream",
189-
jsonEvent({ type: "first-append", data: { message: "test" } })
208+
jsonEvent({ type: "first-append", data: { message: "test" } }),
209+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
210+
{ credentials: { username: "admin", password: "changeit" } }
190211
);
191212
expect(firstAppend).toBeDefined();
192213

@@ -199,7 +220,9 @@ describe("reconnect", () => {
199220
try {
200221
const secondAppend = await client.appendToStream(
201222
"my_stream",
202-
jsonEvent({ type: "failed-append", data: { message: "test" } })
223+
jsonEvent({ type: "failed-append", data: { message: "test" } }),
224+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
225+
{ credentials: { username: "admin", password: "changeit" } }
203226
);
204227
expect(secondAppend).toBe("Unreachable");
205228
} catch (error) {
@@ -210,7 +233,9 @@ describe("reconnect", () => {
210233
try {
211234
const secondAppend = await client.appendToStream(
212235
"my_stream",
213-
jsonEvent({ type: "failed-append", data: { message: "test" } })
236+
jsonEvent({ type: "failed-append", data: { message: "test" } }),
237+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
238+
{ credentials: { username: "admin", password: "changeit" } }
214239
);
215240
expect(secondAppend).toBe("Unreachable");
216241
} catch (error) {
@@ -229,7 +254,9 @@ describe("reconnect", () => {
229254
try {
230255
const reconnectedAppend = await client.appendToStream(
231256
"my_stream",
232-
jsonEvent({ type: "reconnect-append", data: { message: "test" } })
257+
jsonEvent({ type: "reconnect-append", data: { message: "test" } }),
258+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
259+
{ credentials: { username: "admin", password: "changeit" } }
233260
);
234261
expect(reconnectedAppend).toBeDefined();
235262
break;
@@ -257,7 +284,9 @@ describe("reconnect", () => {
257284
// make successful append to connect to node
258285
const firstAppend = await client.appendToStream(
259286
"my_stream",
260-
jsonEvent({ type: "first-append", data: { message: "test" } })
287+
jsonEvent({ type: "first-append", data: { message: "test" } }),
288+
// batch append triggers reconnect as soon as stream drops, so we need to force regular append
289+
{ credentials: { username: "admin", password: "changeit" } }
261290
);
262291
expect(firstAppend).toBeDefined();
263292

src/__test__/extra/http2-assertion-failure.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,7 @@ describe("http2 assertion failure", () => {
1414
beforeAll(async () => {
1515
await node.up();
1616

17-
client = new EventStoreDBClient(
18-
{ endpoint: node.uri },
19-
{ insecure: true },
20-
{ username: "admin", password: "changeit" }
21-
);
17+
client = new EventStoreDBClient({ endpoint: node.uri }, { insecure: true });
2218
});
2319

2420
afterAll(async () => {
@@ -33,6 +29,8 @@ describe("http2 assertion failure", () => {
3329

3430
const appendRes = await client.appendToStream(stream, priorEvents, {
3531
expectedRevision: NO_STREAM,
32+
// we want to test classic append
33+
credentials: { username: "admin", password: "changeit" },
3634
});
3735

3836
const received: ResolvedEvent[] = [];
@@ -52,6 +50,8 @@ describe("http2 assertion failure", () => {
5250
while (received.length < 3) await delay(10);
5351
await client.appendToStream(stream, postEvents, {
5452
expectedRevision: appendRes.nextExpectedRevision,
53+
// we want to test classic append
54+
credentials: { username: "admin", password: "changeit" },
5555
});
5656

5757
while (received.length < 10) await delay(10);
Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,96 @@
1-
import { createTestNode, delay, jsonTestEvents } from "@test-utils";
1+
/** @jest-environment ./src/__test__/utils/enableVersionCheck.ts */
2+
3+
import {
4+
createTestNode,
5+
delay,
6+
jsonTestEvents,
7+
matchServerVersion,
8+
optionalTest,
9+
} from "@test-utils";
210
import { EventStoreDBClient, UnavailableError } from "@eventstore/db-client";
311

4-
describe("write after end", () => {
5-
const node = createTestNode();
6-
let client!: EventStoreDBClient;
12+
// These tests can take time.
13+
jest.setTimeout(120_000);
714

8-
beforeAll(async () => {
15+
describe("write after end", () => {
16+
test("Should not write after end", async () => {
17+
const node = createTestNode();
918
await node.up();
10-
client = new EventStoreDBClient(
19+
20+
const client = new EventStoreDBClient(
1121
{ endpoint: node.uri },
12-
{ rootCertificate: node.rootCertificate },
13-
{ username: "admin", password: "changeit" }
22+
{ rootCertificate: node.rootCertificate }
1423
);
15-
});
1624

17-
afterAll(async () => {
18-
await node.down();
19-
});
20-
21-
test("Should not write after end", async () => {
2225
const STREAM_NAME = "json_stream_name";
23-
await client.appendToStream(STREAM_NAME, jsonTestEvents());
24-
25-
client
26-
.appendToStream(STREAM_NAME, jsonTestEvents(100_000))
27-
.then((result) => {
28-
expect(result).toBe("unreachable");
29-
})
30-
.catch((e) => {
31-
expect(e).toBeInstanceOf(UnavailableError);
26+
await client.appendToStream(STREAM_NAME, jsonTestEvents(), {
27+
// credentials enforces classic append
28+
credentials: { username: "admin", password: "changeit" },
29+
});
30+
31+
const writeUntilError = () =>
32+
new Promise((resolve) => {
33+
const writeOnLoop = (): Promise<never> =>
34+
client
35+
.appendToStream(STREAM_NAME, jsonTestEvents(30_000), {
36+
// credentials enforces classic append
37+
credentials: { username: "admin", password: "changeit" },
38+
})
39+
.then(writeOnLoop);
40+
41+
writeOnLoop().catch((e) => {
42+
resolve(e);
43+
});
3244
});
3345

34-
node.killNode(node.endpoints[0]);
46+
const errorPromise = writeUntilError();
47+
48+
await node.killNode(node.endpoints[0]);
49+
50+
const error = await errorPromise;
51+
expect(error).toBeInstanceOf(UnavailableError);
3552

3653
// wait for any unhandled rejections
3754
await delay(5_000);
55+
56+
await node.down();
3857
});
58+
59+
optionalTest(matchServerVersion`>=21.10`)(
60+
"Should not write after end (batch append)",
61+
async () => {
62+
const node = createTestNode();
63+
await node.up();
64+
65+
const client = new EventStoreDBClient(
66+
{ endpoint: node.uri },
67+
{ rootCertificate: node.rootCertificate }
68+
);
69+
70+
const STREAM_NAME = "json_stream_name";
71+
await client.appendToStream(STREAM_NAME, jsonTestEvents());
72+
73+
const writeUntilError = () =>
74+
new Promise((resolve) => {
75+
const writeOnLoop = (): Promise<never> =>
76+
client
77+
.appendToStream(STREAM_NAME, jsonTestEvents(30_000))
78+
.then(writeOnLoop);
79+
80+
writeOnLoop().catch((e) => {
81+
resolve(e);
82+
});
83+
});
84+
85+
const errorPromise = writeUntilError();
86+
87+
await node.killNode(node.endpoints[0]);
88+
89+
const error = await errorPromise;
90+
expect(error).toBeInstanceOf(UnavailableError);
91+
92+
// wait for any unhandled rejections
93+
await delay(5_000);
94+
}
95+
);
3996
});

0 commit comments

Comments
 (0)