Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/application/Code51Exception.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"use strict"

import { ResponseCode } from "../util"

export type TResponseCode = (typeof ResponseCode)[keyof typeof ResponseCode]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this type is necessary, we can just use a type number for the code

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@l4mby I see.

export default class RMQProtocolResponseError extends Error {
  readonly #code: TResponseCode

  constructor(message: string, rmqStreamResponseCode: TResponseCode) {
    super(message)

The retionale to narrow type of rmqStreamResponseCode is to not allow to provide any number whereas we actually expect only ResponseCode.

The type alias TResponseCode is for better clarity and readability - semantically and to not use that long typecast. So I believe we should keep it. Let me know your thought on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @WhereJuly the problem with this type TResponseCode is that force you to use the as like in the code below

const code = res.code as TResponseCode

And if you receive a number that is not present in ResponseCode you are forcing it. This is the reason about using a number and left the user to verify if it's a known code present in ResponseCode.

About the name, it could be useful to have an error we use instead of "generic error" but it not be too generic because not always can be used so I suggest: class StreamResponseError.

So this code could became:

export class StreamResponseError extends Error {
  constructor(message: string, readonly errorCode: number) {
    super(message)
  }
}

If you want to use the #errorCode instead the typescript version, it's fine I suggest to name the property errorCode or responseCode.
As side note we prefer don't use default export.


/**
* Provides distinct domain exception for the package. Contains the optional
* RabbitMQ Stream protocol response code for more convenient processing.
*
* @param message A custom error message.
* @param rmqStreamResponseCode The above mentioned response code.
*
* @see https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#response-codes
*
* @example Selectively manage the exception type and react differently.
*
* ```typescript
* let result: any;
*
* const isRethrowable = (error_: Error) => {
* const isGenericError = error_ instanceof Code51Exception;
* const isNonManagedResponseCode = (error_ as Code51Exception).code !== ResponseCode.NoOffset;
*
* return isGenericError && isNonManagedResponseCode;
* };
*
* try {
* result = consumer.queryOffset();
* // ... process result
* } catch (error_) {
* if (isRethrowable(error_)) { throw error_; }
*
* const error = error_ as Code51Exception;
* if (error.code === ResponseCode.NoOffset) { return null; }
*
* return result;
* }
* ```
*
Comment on lines +7 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to keep the description related to the field of class.
We could provide a guard function in this file to verify if the error is StreamResponseError something like:

function isStreamResponseError(error: unknown): error is StreamResponseError {
return ... // shoudl return a boolean value
}

*/
export default class Code51Exception extends Error {
readonly #code?: TResponseCode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a more appropriate name for the error, specific to the queryoffset, for example QueryOffsetError could be ok. Also can you rename the property in code and make it mandatory?

Copy link
Author

@WhereJuly WhereJuly Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@l4mby Thank you. Understood. Wanted to discuss the following points.

code: make it mandatory

Will do.

more appropriate name for the error, specific to the queryoffset, for example QueryOffsetError

I planned this error to be a generic error for the entire package. Now I see this seems too wide. On the other hand maybe QueryOffsetError is too specific?

Could it be better RMQProtocolResponseError or something like that to cater for all potential use cases with the protocol response code? To not end up with a number of several too specific error classes?

Also can you rename the property in code

I could but this is not name convention, # is the JS property visiblity level. It actually makes the JS (not TS) property private at runtime.

The reason to have it is to actually make it read-only at run time, accessibe only via a getter. Making it code we expose it at runtime. Do you still think we should not use JS private properties?


constructor(message: string, rmqStreamResponseCode?: TResponseCode) {
super(message)

Object.setPrototypeOf(this, new.target.prototype)

this.name = this.constructor.name
this.#code = rmqStreamResponseCode ?? undefined

// Maintains proper stack trace for where our error was thrown (only available on V8)
if (Error.captureStackTrace) {
Error.captureStackTrace(this, this.constructor)
}
}

public get code(): TResponseCode | undefined {
return this.#code
}
}
63 changes: 63 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,51 @@ export interface ConsumerFilter {
matchUnfiltered: boolean
}

/**
* The parameters to declare a stream consumer.
*
* @param stream A stream name from which the consumer will be consuming messages.
*
* @param consumerRef A named consumer's name (identifier) for server-side offset tracking and single
* active consumer behavior.
*
* @see https://github.com/coders51/rabbitmq-stream-js-client/blob/main/example/src/offset_tracking_example.js
* @see https://github.com/coders51/rabbitmq-stream-js-client/blob/main/example/src/single_active_consumer_update_example.js
*
* @see {@link QueryOffsetParams.reference}
* @see {@link StoreOffsetParams.reference}
*
* @param offset The value object {@link Offset} representing the possible values for the
* RabbitMQ stream offset. Tells the client from where in the stream you want to start consuming.
* Could be "first", "last", "next", a specific numeric offset, or timestamp depending on the Offset type.
*
* @see https://github.com/WhereJuly/rabbitmq-stream-js-client?tab=readme-ov-file#basic-consuming
* @see https://www.rabbitmq.com/blog/2021/09/13/rabbitmq-streams-offset-tracking#the-different-offset-specifications-in-a-stream
*
* @param connectionClosedListener A callback {@link ConnectionClosedListener} invoked if the connection
* is closed (with parameter indicating error or not) to let you react to the consumer’s connection
* being closed.
*
* @param consumerUpdateListener {@link ConsumerUpdateListener}
*
* @param singleActive A flag to indicate "single active consumer" mode.
* Single active consumer provides exclusive consumption and consumption continuity on a stream.
* @see https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
* @see https://github.com/coders51/rabbitmq-stream-js-client?tab=readme-ov-file#single-active-consumer
*
* @param filter A filter object {@link ConsumerFilter} specifying consumer filtering criteria.
* @see https://www.rabbitmq.com/docs/stream-filtering#filter-stages-overview
* @see https://github.com/coders51/rabbitmq-stream-js-client?tab=readme-ov-file#filtering
*
* @param creditPolicy {@link ConsumerCreditPolicy} determines if the consumer requests more message chunks from the broker
* while still processing the current chunk. By default only one chunk is processed ensuring
* the messages will be processed in order.
*
* @see https://github.com/coders51/rabbitmq-stream-js-client?tab=readme-ov-file#custom-policy
*
* @param consumerTag A simpler alias/identifier if you need to tag the consumer on the broker side
* for monitoring or naming.
*/
export interface DeclareConsumerParams {
stream: string
consumerRef?: string
Expand All @@ -829,12 +874,30 @@ export interface SubscribeParams {
offset: Offset
}

/**
* The parameters to store the concrete named consumer offset at the given stream.
*
* @member reference See {@link QueryOffsetParams.reference}
* @member stream See {@link QueryOffsetParams.stream}
* @member offsetValue The BigInt offset value.
*
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/BigInt
*/
export interface StoreOffsetParams {
reference: string
stream: string
offsetValue: bigint
}

/**
* Query offset parameters.
*
* @see https://www.rabbitmq.com/tutorials/tutorial-two-javascript-stream#server-side-offset-tracking
*
* @member reference The named consumer's name. Identifies the concrete stable consumer to
* persistently track the dedicated offset. Also named as `consumerRef` elsewhere.
* @member stream A stream name.
*/
export interface QueryOffsetParams {
reference: string
stream: string
Expand Down
58 changes: 57 additions & 1 deletion src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import { coerce, lt } from "semver"
import EventEmitter from "events"
import { MetadataUpdateResponse } from "./responses/metadata_update_response"
import { MetadataInfo } from "./responses/raw_response"
import Code51Exception, { TResponseCode } from "./application/Code51Exception"

export type ConnectionClosedListener = (hadError: boolean) => void

Expand Down Expand Up @@ -561,15 +562,70 @@ export class Connection {
return res.sequence
}

/**
* Store the provided offset at the RabbitMQ server in the given stream for the given consumer.
*
* @param StoreOffsetParams The stream name, consumer name and the offset value.
*
* The offset is stored on the given stream as the additional service message not visible to
* a stream consumers but counted for in the RabbitMQ UI.
*
* For streams with millions of messages per second it is recommended to store the offset
* once per a number of messages to not litter the stream and potentially worsen the performance
* when very high throughput is required.
*
* @see https://www.rabbitmq.com/blog/2021/09/13/rabbitmq-streams-offset-tracking#the-dark-side-of-server-side-offset-tracking
* @see https://www.rabbitmq.com/docs/streams#offset-tracking
*/
public storeOffset(params: StoreOffsetParams): Promise<void> {
return this.send(new StoreOffsetRequest(params))
}

/**
* Return the server-side saved offset or throws {@link Code51Exception} with the
* RabbitMQ response code.
*
* @see https://www.rabbitmq.com/tutorials/tutorial-two-javascript-stream
* @see https://www.rabbitmq.com/blog/2021/09/13/rabbitmq-streams-offset-tracking
*
* @param QueryOffsetParams The stream name and the named consumer identifier object.
*
* @see {@link Connection.connect}
* @see {@link Connection.storeOffset}
*
* @example Consumer reads previously saved server-tracked offset to start consuming
* from the desired offset.
*
* On consumer side create the client, detect the server-side saved offset to detect
* the desired offset to consume from. Then declare a consumer with the desired offset
* and the consumed message handler.
*
* When consuming, the consumer message handler may save the offset server-side
* for the `client.queryOffset()` to be able to further read it.
*
* ```typescript
* // ... create the RabbitMQ client here
* // Detect the decider starting offset for the next stream operation.
* const offset = await client.queryOffset({ reference: 'consumer-x', stream: 'stream-a' })
* const startWithOffset = offset ? rmqLibrary.Offset.offset(offset + 1n) :
* rmqLibrary.Offset.<whatever-enum-offset-is-desired>();
* const consumer = await client.declareConsumer({stream: 'stream-b', offset: startWithOffset},
* async (this: StreamConsumer, message: Message) => { await this.storeOffset(message.offset); });
* // Note the offset is saved by the message handler on the server.
* ```
*
* @throws {@link Code51Exception} if the server-side offset cannot be retrieved. The exception
* contains the `code` field that equals the RabbitMQ stream protocol response code value.
*
* @see https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#response-codes
*/
public async queryOffset(params: QueryOffsetParams): Promise<bigint> {
this.logger.debug(`Query Offset...`)
const res = await this.sendAndWait<QueryOffsetResponse>(new QueryOffsetRequest(params))
if (!res.ok) {
throw new Error(`Query offset command returned error with code ${res.code}`)
const code = res.code as TResponseCode

throw new Code51Exception(`Query offset command returned error with code ${res.code}`, code)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here since we will use the code as a number we can remove the as and throw directly the code in the constructor of QueryOffsetError

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@l4mby For this case I made an explanation here #285 (comment).

Shortly, my logic here is we better use the narrower type ResponseCode than wider number as the error class is going to accept only ResponseCode, so no reason to relax the expected type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @WhereJuly as written above we try to avoid as because it's very dangerous.

}
this.logger.debug(`Query Offset response: ${res.ok} with params: '${inspect(params)}'`)
return res.offsetValue
Expand Down
14 changes: 14 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,23 @@ export const wait = async (ms: number) => {
})
}

/**
* The RabbitMQ Stream protocol response codes.
*
* Only codes actually used by this package are defined here however any official code
* can be returned by the actually connected RabbitMQ instance.
*
* @see https://github.com/rabbitmq/rabbitmq-server/blob/v3.9.x/deps/rabbitmq_stream/docs/PROTOCOL.adoc#response-codes
*
* @see {@link connection.ts/Connection.queryOffset}
* @see {@link application/Code51Exception}
*/
export const ResponseCode = {
StreamDoesNotExist: 2,
SubscriptionIdDoesNotExist: 4,

// Used in src/connection.ts/Connection.queryOffset method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment could became stale easely please remove it.

NoOffset: 19,
} as const

export const isString = (value: unknown): boolean => typeof value === "string"
23 changes: 22 additions & 1 deletion test/e2e/offset.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
username,
wait,
} from "../support/util"
import Code51Exception from "../../src/application/Code51Exception"
import { ResponseCode } from "../../src/util"

describe("offset", () => {
const rabbit = new Rabbit(username, password)
Expand Down Expand Up @@ -267,7 +269,7 @@ describe("offset", () => {
})
}).timeout(10000)

it("declaring a consumer without consumerRef and querying for the offset should rise an error", async () => {
it("declaring a consumer without consumerRef and querying for the offset should raise an error", async () => {
const consumer = await client.declareConsumer(
{ stream: testStreamName, offset: Offset.first() },
(_message: Message) => {
Expand All @@ -292,5 +294,24 @@ describe("offset", () => {
await wait(200)
await expectToThrowAsync(() => consumer.queryOffset(), Error, `This socket has been ended by the other party`)
})

it("query offset is able to raise Code51Exception with ResponseCode.NoOffset code value set if there is no offset", async () => {
const params = { stream: testStreamName, consumerRef: "my_consumer", offset: Offset.first() }
const handler = (_message: Message) => { return } // prettier-ignore
const consumer = await client.declareConsumer(params, handler)

try {
await consumer.queryOffset()

throw new Error("Expected Code51Exception to be thrown")
} catch (error) {
const actual = error as Code51Exception

expect(actual).instanceOf(Error)
expect(actual).instanceOf(Code51Exception)
expect(actual.code).equals(ResponseCode.NoOffset)
expect(actual.message).contain("error with code 19")
}
})
})
})
38 changes: 38 additions & 0 deletions test/unit/application/Code51Exception.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { expect } from "chai"

import Code51Exception from "../../../src/application/Code51Exception"
import { ResponseCode } from "../../../src/util"

describe("[unit] Code51Exception Test", () => {
it("+constructor() #1: Should create Code51Exception expected default object", () => {
const expected = "A message"
const actual = new Code51Exception(expected)

expect(actual).instanceOf(Code51Exception)
expect(actual.message).eql(expected)
expect(actual.code).eql(undefined)
})

it("+constructor() #2: Should create Code51Exception expected object with expected response code", () => {
const expected = ResponseCode.NoOffset
const actual = new Code51Exception("a message", expected)

expect(actual.code).eql(expected)
})

it("Should throw expected Code51Exception exception", () => {
const expected = { message: "A message", code: ResponseCode.SubscriptionIdDoesNotExist }

try {
throw new Code51Exception(expected.message, expected.code)
} catch (error_) {
const actual = error_ as Code51Exception

expect(actual).instanceOf(Code51Exception)
expect(actual.message).eql(expected.message)
expect(actual.code).eql(expected.code)
}
})
})

// Assert: proper stack trace