Skip to content

Commit afd0b3a

Browse files
l4mbymagne
andauthored
feat: expose query offset in the client (#207)
* feat: expose query offset in the client * fix: refactor of test * fix: remove only --------- Co-authored-by: magne <[email protected]>
1 parent e113f41 commit afd0b3a

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

src/client.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ export class Client {
265265
})
266266
}
267267

268+
public queryOffset(params: QueryOffsetParams) {
269+
return this.connection.queryOffset(params)
270+
}
271+
268272
private async closeAllConsumers(manuallyClose: boolean) {
269273
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(manuallyClose)))
270274
this.consumers = new Map<string, ConsumerMappedValue>()

test/e2e/offset.test.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -204,23 +204,40 @@ describe("offset", () => {
204204
})
205205

206206
describe("query", () => {
207-
it("the consumer is able to track the offset of the stream through queryOffset method", async () => {
208-
let offset: bigint = 0n
207+
it("the consumer is able to track the offset and start from the stored offset", async () => {
208+
const consumerOneMessages: Message[] = []
209+
const consumerTwoMessages: Message[] = []
210+
const publisher = await client.declarePublisher({ stream: testStreamName })
211+
await publisher.send(Buffer.from("hello"))
212+
await publisher.send(Buffer.from("marker"))
213+
await publisher.send(Buffer.from("hello"))
214+
await publisher.send(Buffer.from("world"))
215+
209216
const consumer = await client.declareConsumer(
210-
{ stream: testStreamName, offset: Offset.next(), consumerRef: "my_consumer" },
217+
{ stream: testStreamName, offset: Offset.first(), consumerRef: "my_consumer" },
211218
async (message: Message) => {
212-
await consumer.storeOffset(message.offset!)
213-
offset = message.offset!
219+
consumerOneMessages.push(message)
220+
if (message.content.toString() === "marker") {
221+
await consumer.storeOffset(message.offset!)
222+
}
223+
}
224+
)
225+
await eventually(async () => {
226+
const [foundMarker] = consumerOneMessages.filter((msg) => msg.content.toString() === "marker")
227+
expect(foundMarker).to.not.be.undefined
228+
const offset = await client.queryOffset({ stream: testStreamName, reference: "my_consumer" })
229+
expect(offset).to.not.be.undefined
230+
}, 3000)
231+
const storedOffset = await client.queryOffset({ stream: testStreamName, reference: "my_consumer" })
232+
await client.declareConsumer(
233+
{ stream: testStreamName, offset: Offset.offset(storedOffset), consumerRef: "my_consumer" },
234+
async (message: Message) => {
235+
consumerTwoMessages.push(message)
214236
}
215237
)
216-
const publisher = await client.declarePublisher({ stream: testStreamName })
217-
218-
await publisher.send(Buffer.from("hello"))
219-
await publisher.send(Buffer.from("world"))
220238

221239
await eventually(async () => {
222-
const result = await consumer.queryOffset()
223-
expect(result).eql(offset)
240+
expect(consumerTwoMessages.length).eql(3)
224241
})
225242
}).timeout(10000)
226243

0 commit comments

Comments
 (0)