Skip to content

Commit 4235b92

Browse files
l4mbymagne
andauthored
[bugfix] Reading of message annotations (#182)
* Fix on reading of message annotations * Remove unused test * Fix broken test --------- Co-authored-by: magne <[email protected]>
1 parent 55631e7 commit 4235b92

File tree

5 files changed

+86
-42
lines changed

5 files changed

+86
-42
lines changed

src/amqp10/decoder.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export const FormatCode = {
2323
Map32: 0xd1,
2424
Null: 0x40,
2525
ULong0: 0x44,
26+
Ubyte: 0x50,
2627
SmallUlong: 0x53,
2728
ULong: 0x80,
2829
Uint: 0x70,

src/amqp10/encoder.ts

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
import { inspect } from "node:util"
22
import { isDate } from "node:util/types"
3-
import { Message, MessageApplicationProperties, MessageProperties } from "../publisher"
3+
import {
4+
AmqpByte,
5+
Message,
6+
MessageAnnotations,
7+
MessageAnnotationsValue,
8+
MessageApplicationProperties,
9+
MessageProperties,
10+
} from "../publisher"
411
import { DataWriter } from "../requests/data_writer"
512

613
const FormatCodeType = {
@@ -23,6 +30,7 @@ const FormatCode = {
2330
Null: 0x40,
2431
SmallUlong: 0x53,
2532
Uint: 0x70,
33+
Ubyte: 0x50,
2634
Int: 0x71,
2735
Timestamp: 0x83,
2836
} as const
@@ -35,14 +43,14 @@ const PropertySizeDescription =
3543

3644
type MessageApplicationPropertiesList = { key: string; value: string | number }[]
3745

38-
type MessageAnnotationsList = { key: string; value: string | number }[]
46+
type MessageAnnotationsList = { key: string; value: MessageAnnotationsValue }[]
3947

4048
export function amqpEncode(
4149
writer: DataWriter,
4250
{ content, messageProperties, applicationProperties, messageAnnotations }: Message
4351
): void {
4452
writer.writeUInt32(messageSize({ content, messageProperties, applicationProperties, messageAnnotations }))
45-
writeMessageAnnotations(writer, toList(messageAnnotations))
53+
writeMessageAnnotations(writer, toAnnotationsList(messageAnnotations))
4654
writeProperties(writer, messageProperties)
4755
writeApplicationProperties(writer, toList(applicationProperties))
4856
writeContent(writer, content)
@@ -53,7 +61,7 @@ export function messageSize({ content, messageProperties, applicationProperties,
5361
lengthOfContent(content) +
5462
lengthOfProperties(messageProperties) +
5563
lengthOfApplicationProperties(toList(applicationProperties)) +
56-
lengthOfMessageAnnotations(toList(messageAnnotations))
64+
lengthOfMessageAnnotations(toAnnotationsList(messageAnnotations))
5765
)
5866
}
5967

@@ -121,7 +129,11 @@ function writeMessageAnnotations(writer: DataWriter, messageAnnotationsList: Mes
121129
.filter((elem) => elem.key)
122130
.forEach((elem) => {
123131
amqpWriteString(writer, elem.key)
124-
typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value)
132+
if (elem.value instanceof AmqpByte) {
133+
amqpWriteByte(writer, elem.value)
134+
} else {
135+
typeof elem.value === "string" ? amqpWriteString(writer, elem.value) : amqpWriteIntNumber(writer, elem.value)
136+
}
125137
})
126138
}
127139

@@ -188,8 +200,12 @@ function getPropertySize(properties: MessageProperties): number {
188200
)
189201
}
190202

191-
function getListSize(list: MessageApplicationPropertiesList | MessageAnnotationsList): number {
192-
return list.reduce((sum, elem) => sum + getSizeOf(elem.key) + getSizeOf(elem.value), 0)
203+
function getListSize(list: MessageAnnotationsList): number {
204+
return list.reduce(
205+
(sum: number, elem: { key: string; value: MessageAnnotationsValue }) =>
206+
sum + getSizeOf(elem.key) + getSizeOf(elem.value),
207+
0
208+
)
193209
}
194210

195211
function amqpWriteString(writer: DataWriter, data?: string): void {
@@ -242,6 +258,11 @@ function amqpWriteIntNumber(writer: DataWriter, data?: number): void {
242258
writer.writeInt32(data)
243259
}
244260

261+
function amqpWriteByte(writer: DataWriter, data: AmqpByte): void {
262+
writer.writeByte(FormatCode.Ubyte)
263+
writer.writeByte(data.byteValue)
264+
}
265+
245266
function amqpWriteBuffer(writer: DataWriter, data?: Buffer): void {
246267
if (!data || !data.length) {
247268
return amqpWriteNull(writer)
@@ -269,11 +290,15 @@ function amqpWriteDate(writer: DataWriter, date?: Date): void {
269290
writer.writeUInt64(BigInt(date.getTime()))
270291
}
271292

272-
function getSizeOf(value?: string | Date | number | Buffer): number {
293+
function getSizeOf(value?: string | Date | number | Buffer | AmqpByte): number {
273294
if (!value) {
274295
return 1
275296
}
276297

298+
if (value instanceof AmqpByte) {
299+
return 1 + 1
300+
}
301+
277302
if (typeof value === "string") {
278303
const count = Buffer.from(value).length
279304
return count <= 255 ? 1 + 1 + count : 1 + 4 + count
@@ -300,3 +325,10 @@ function toList(applicationProperties?: MessageApplicationProperties): MessageAp
300325
return { key: elem[0], value: elem[1] }
301326
})
302327
}
328+
329+
function toAnnotationsList(annotations?: MessageAnnotations): MessageAnnotationsList {
330+
if (!annotations) return []
331+
return Object.entries(annotations).map((elem) => {
332+
return { key: elem[0], value: elem[1] }
333+
})
334+
}

src/publisher.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,24 @@ import { PublishRequestV2 } from "./requests/publish_request_v2"
1515

1616
export type MessageApplicationProperties = Record<string, string | number>
1717

18-
export type MessageAnnotations = Record<string, string | number>
18+
export type MessageAnnotations = Record<string, MessageAnnotationsValue>
19+
20+
export type MessageAnnotationsValue = string | number | AmqpByte
21+
22+
export class AmqpByte {
23+
private value: number
24+
25+
constructor(value: number) {
26+
if (value > 255 || value < 0) {
27+
throw new Error("Invalid byte, value must be between 0 and 255")
28+
}
29+
this.value = value
30+
}
31+
32+
public get byteValue() {
33+
return this.value
34+
}
35+
}
1936

2037
export interface MessageProperties {
2138
contentType?: string
@@ -54,7 +71,7 @@ export interface Message {
5471
export interface MessageOptions {
5572
messageProperties?: MessageProperties
5673
applicationProperties?: Record<string, string | number>
57-
messageAnnotations?: Record<string, string | number>
74+
messageAnnotations?: Record<string, MessageAnnotationsValue>
5875
}
5976

6077
export interface Publisher {

src/response_decoder.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,9 @@ export function decodeFormatCode(dataResponse: DataReader, formatCode: number, s
411411
return dataResponse.readUInt32()
412412
case FormatCode.SmallUlong:
413413
return dataResponse.readInt8() // Read a SmallUlong
414+
case FormatCode.Ubyte:
415+
dataResponse.forward(1)
416+
return dataResponse.readUInt8()
414417
case FormatCode.ULong:
415418
return dataResponse.readUInt64() // Read an ULong
416419
case FormatCode.List0:

test/e2e/declare_consumer.test.ts

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import { expect } from "chai"
2+
import { randomUUID } from "crypto"
3+
import { readFileSync } from "fs"
4+
import path from "path"
25
import { Client, Publisher } from "../../src"
36
import {
7+
AmqpByte,
48
Message,
59
MessageAnnotations,
610
MessageApplicationProperties,
7-
MessageProperties,
811
MessageHeader,
12+
MessageProperties,
913
} from "../../src/publisher"
1014
import { Offset } from "../../src/requests/subscribe_request"
15+
import { BufferDataReader } from "../../src/response_decoder"
16+
import { getMaxSharedConnectionInstances, range } from "../../src/util"
1117
import {
1218
createClient,
1319
createConsumer,
@@ -16,20 +22,14 @@ import {
1622
createStreamName,
1723
} from "../support/fake_data"
1824
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
19-
import { getMaxSharedConnectionInstances, range } from "../../src/util"
20-
import { BufferDataReader } from "../../src/response_decoder"
2125
import {
26+
decodeMessageTesting,
2227
eventually,
2328
expectToThrowAsync,
24-
username,
25-
password,
26-
createClassicPublisher,
27-
decodeMessageTesting,
2829
getTestNodesFromEnv,
30+
password,
31+
username,
2932
} from "../support/util"
30-
import { readFileSync } from "fs"
31-
import path from "path"
32-
import { randomUUID } from "crypto"
3333

3434
describe("declare consumer", () => {
3535
let streamName: string
@@ -241,32 +241,23 @@ describe("declare consumer", () => {
241241
await eventually(async () => expect(messageAnnotations).eql([annotations]))
242242
}).timeout(10000)
243243

244-
it("messageAnnotations are ignored by a classic driver", async () => {
244+
it("messageAnnotations with bytes are read correctly", async () => {
245245
const messageAnnotations: MessageAnnotations[] = []
246-
const annotations = createAnnotations()
247-
const classicPublisher = await createClassicPublisher()
248-
await classicPublisher.ch.assertQueue("testQ", {
249-
exclusive: false,
250-
durable: true,
251-
autoDelete: false,
252-
arguments: {
253-
"x-queue-type": "stream", // Mandatory to define stream queue
254-
},
255-
})
256-
classicPublisher.ch.sendToQueue("testQ", Buffer.from("Hello"), {
257-
headers: {
258-
messageAnnotations: annotations,
259-
},
260-
})
246+
const annotations = { test: new AmqpByte(123) }
247+
await rabbit.createStream("testQ")
248+
await client.declareConsumer(
249+
{ stream: "testQ", offset: Offset.next(), consumerRef: "test" },
250+
(message: Message) => {
251+
messageAnnotations.push(message.messageAnnotations ?? {})
252+
}
253+
)
261254

262-
await client.declareConsumer({ stream: "testQ", offset: Offset.first() }, (message: Message) => {
263-
messageAnnotations.push(message.messageAnnotations || {})
264-
})
255+
const testP = await client.declarePublisher({ stream: "testQ" })
256+
await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations })
265257

266258
await eventually(async () => {
267-
expect(messageAnnotations).not.eql([annotations])
268-
await classicPublisher.ch.close()
269-
await classicPublisher.conn.close()
259+
const [messageAnnotation] = messageAnnotations
260+
expect(messageAnnotation).to.eql({ test: 123 })
270261
})
271262
}).timeout(10000)
272263

0 commit comments

Comments
 (0)