Skip to content

Commit a176d75

Browse files
authored
Mono publisher and consumer removal (#112)
* Removing sqs mon publisher/consumer * core preparation for multi publishers only * SQS Abstract consumer * SQS publisher rename * SQS test fixes * Unused code removed * Fix lint * sqs Index fix + lint fix * Removing SNS mono consumer and publisher * Minor fix on sqs * SNS Fixing types * SNS renaming * SNS test fixes * amqp service types fix * amqp single publisher * amqp single consumer * removing unused tests * AMQP tests fixes * amqp index fix * Minor changes * Solving TODO * Minor change to not allow changing internal properties * SQS improving coverage * Adding tests * Lint fix * Removing unused test class * SQS making props protected * SNS marking some props as protected * Minor changes * AMQP improving tests coverage * Improving AMQP test coverage * Fix test * Trying to fix test * readme updated * Major version * Minor change * upgrading md * Lint fixes + solving todo + build fix * CR comments
1 parent a47c67c commit a176d75

File tree

74 files changed

+1756
-3618
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+1756
-3618
lines changed

README.md

Lines changed: 17 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,7 @@ It consists of the following submodules:
2222
### Publishers
2323

2424
`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocol.
25-
26-
#### Mono-schema publishers
27-
28-
Mono-schema publishers only support a single message type and are simpler to implement. They expose the following public methods:
29-
30-
* `constructor()`, which accepts the following parameters:
31-
* `dependencies` – a set of dependencies depending on the protocol;
32-
* `options`, composed by
33-
* `messageSchema` – the `zod` schema for the message;
34-
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema;
35-
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
36-
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
37-
* `init()`, prepare publisher for use (e. g. establish all necessary connections), it will be called automatically by `publish()` if not called before explicitly (lazy loading).
38-
* `close()`, stop publisher use (e. g. disconnect);
39-
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
40-
* `message` – a message following a `zod` schema;
41-
* `options` – a protocol-dependent set of message parameters. For more information please check documentation for options for each protocol: [AMQP](https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue), [SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/sendmessagecommandinput.html) and [SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sns/interfaces/publishcommandinput.html).
42-
43-
> **_NOTE:_** See [SqsPermissionPublisherMonoSchema.ts](./packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts) for a practical example.
44-
45-
> **_NOTE:_** Lazy loading is not supported for AMQP publishers.
46-
47-
#### Multi-schema publishers
48-
49-
Multi-schema publishers support multiple messages types. They implement the following public methods:
25+
They implement the following public methods:
5026

5127
* `constructor()`, which accepts the following parameters:
5228
* `dependencies` – a set of dependencies depending on the protocol;
@@ -61,35 +37,14 @@ Multi-schema publishers support multiple messages types. They implement the foll
6137
* `message` – a message following one of the `zod` schemas, supported by the publisher;
6238
* `options` – a protocol-dependent set of message parameters. For more information please check documentation for options for each protocol: [AMQP](https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue), [SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/sendmessagecommandinput.html) and [SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sns/interfaces/publishcommandinput.html).
6339

40+
> **_NOTE:_** See [SqsPermissionPublisher.ts](./packages/sqs/test/publishers/SqsPermissionPublisher.ts) for a practical example.
41+
> **_NOTE:_** Lazy loading is not supported for AMQP publishers.
42+
6443

6544
### Consumers
6645

6746
`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol.
68-
69-
#### Mono-schema consumers
70-
71-
Mono-schema consumers only support a single message type and are simpler to implement. They expose the following public methods:
72-
73-
* `constructor()`, which accepts the following parameters:
74-
* `dependencies` – a set of dependencies depending on the protocol;
75-
* `options`, composed by
76-
* `messageSchema` – the `zod` schema for the message;
77-
* `messageTypeField` - which field in the message is used for resolving the message type (for observability purposes);
78-
* `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name)
79-
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
80-
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
81-
* `subscriptionConfig` - SNS SQS consumer only - configuration for SNS -> SQS subscription to create, if one doesn't exist.
82-
* `consumerOverrides` – available only for SQS consumers;
83-
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
84-
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
85-
* `close()`, stop listening for messages and disconnect;
86-
* `processMessage()`, which accepts as parameter a `message` following a `zod` schema and should be overridden with logic on what to do with the message;
87-
* `start()`, which invokes `init()` and `processMessage()` and handles errors.
88-
* `preHandlerBarrier`, which accepts as a parameter a `message` following a `zod` schema and can be overridden to enable the barrier pattern (see [Barrier pattern](#barrier-pattern))
89-
90-
> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts) for a practical example.
91-
92-
#### Multi-schema consumers
47+
They expose the following public methods:
9348

9449
Multi-schema consumers support multiple message types via handler configs. They expose the following public methods:
9550

@@ -106,17 +61,18 @@ Multi-schema consumers support multiple message types via handler configs. They
10661
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
10762
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
10863
* `close()`, stop listening for messages and disconnect;
64+
* `start()`, which invokes `init()`.
10965

110-
* `processMessage()`, which accepts as parameter a `message` following a `zod` schema and should be overridden with logic on what to do with the message;
111-
* `start()`, which invokes `init()` and `processMessage()` and handles errors.
66+
> **_NOTE:_** See [SqsPermissionConsumer.ts](./packages/sqs/test/consumers/SqsPermissionConsumer.ts) for a practical example.
11267
113-
##### Multi-schema handler definition
68+
69+
##### How to define a handler
11470

11571
You can define handlers for each of the supported messages in a type-safe way using the MessageHandlerConfigBuilder.
11672

11773
Here is an example:
11874

119-
```ts
75+
```typescript
12076
type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE
12177
type ExecutionContext = {
12278
userService: UserService
@@ -176,7 +132,7 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
176132

177133
#### Error Handling
178134

179-
When implementing message handler in consumer (by overriding the `processMessage()` method), you are expected to return an instance of `Either`, containing either an error `retryLater`, or result `success`. In case of `retryLater`, the abstract consumer is instructed to requeue the message. Otherwise, in case of success, the message is finally removed from the queue. If an error is thrown while processing the message, the abstract consumer will also requeue the message. When overriding the `processMessage()` method, you should leverage the possible types to process the message as you need.
135+
When implementing a handler, you are expected to return an instance of `Either`, containing either an error `retryLater`, or result `success`. In case of `retryLater`, the abstract consumer is instructed to requeue the message. Otherwise, in case of success, the message is finally removed from the queue. If an error is thrown while processing the message, the abstract consumer will also requeue the message.
180136

181137
#### Schema Validation and Deserialization
182138

@@ -189,23 +145,22 @@ If
189145

190146
Then the message is automatically nacked without requeueing by the abstract consumer and processing fails.
191147

192-
> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumerMonoSchema.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts) for a practical example.
148+
> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumer.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumer.spec.ts) for a practical example.
193149
194150
### Barrier pattern
195151
The barrier pattern facilitates the out-of-order message handling by retrying the message later if the system is not yet in the proper state to be able to process that message (e. g. some prerequisite messages have not yet arrived).
196152

197-
To enable this pattern you should implement `preHandlerBarrier` in order to define the conditions for starting to process the message.
153+
To enable this pattern you should define `preHandlerBarrier` on your message handler in order to define the conditions for starting to process the message.
198154
If the barrier method returns `false`, message will be returned into the queue for the later processing. If the barrier method returns `true`, message will be processed.
199155

200-
> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMonoSchema.ts) for a practical example on mono consumers.
201-
> **_NOTE:_** See [SqsPermissionConsumerMultiSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMultiSchema.ts) for a practical example on multi consumers.
156+
> **_NOTE:_** See [SqsPermissionConsumer.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumer.ts) for a practical example.
202157
203158

204159
## Fan-out to Multiple Consumers
205160

206161
SQS queues are built in a way that every message is only consumed once, and then deleted. If you want to do fan-out to multiple consumers, you need SNS topic in the middle, which is then propagated to all the SQS queues that have subscribed.
207162

208-
> **_NOTE:_** See [SnsPermissionPublisher.ts](./packages/sns/test/publishers/SnsPermissionPublisherMonoSchema.ts) and [SnsSqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMonoSchema.ts) for a practical example.
163+
> **_NOTE:_** See [SnsPermissionPublisher.ts](./packages/sns/test/publishers/SnsPermissionPublisher.ts) and [SnsSqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumer.ts) for a practical example.
209164
210165
## Automatic Queue and Topic Creation
211166

@@ -220,7 +175,7 @@ In certain cases you want to await until certain publisher publishes a message,
220175
In order to enable this functionality, configure spyHandler on the publisher or consumer:
221176

222177
```ts
223-
export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
178+
export class TestConsumerMultiSchema extends AbstractSqsConsumer<
224179
SupportedMessages,
225180
ExecutionContext
226181
> {
@@ -236,7 +191,7 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
236191
bufferSize: 100, // how many processed messages should be retained in memory for spy lookup. Default is 100
237192
messageIdField: 'id', // which field within a message payload uniquely identifies it. Default is `id`
238193
},
239-
}
194+
})
240195
}
241196
}
242197
```

UPGRADING.md

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# Upgrading Guide
2+
3+
We have introduced the following breaking changes on version `12.0.0`, please follow the steps below to update your code
4+
from the previous version to the new one.
5+
6+
## Breaking Changes
7+
8+
### Description of Breaking Change
9+
Multi consumers and publishers can accomplish the same tasks as mono ones, but they add extra layer of complexity by
10+
requiring features to be implemented in both.
11+
As a result, we have decided to remove the mono ones to enhance maintainability.
12+
13+
### Migration Steps
14+
#### Multi consumers and publishers
15+
If you are using the multi consumer or consumer, you will only need to rename the class you are extending, and it should
16+
work as before.
17+
- `AbstractSqsMultiConsumer` -> `AbstractSqsConsumer`
18+
- `AbstractSqsMultiPublisher` -> `AbstractSqsPublisher`
19+
20+
#### Mono consumers and publishers
21+
If you are using the mono consumer or publisher, they no longer exist, so you will need to adjust your code to use
22+
the old named multi consumer or publisher (now called just consumer or publisher). Please check the guide below.
23+
24+
##### Publisher
25+
1. Rename the class you are extending from `AbstractSqsPublisherMonoSchema` to `AbstractSqsPublisherSchema`.
26+
2. replace the `messageSchema` property with `messageSchemas`, it is an array of `zod` schemas.
27+
```typescript
28+
// Old code
29+
export class MyPublisher extends AbstractSqsPublisherMonoSchema<MyType> {
30+
public static QUEUE_NAME = 'my-queue-name'
31+
32+
constructor(dependencies: SQSDependencies) {
33+
super(dependencies, {
34+
creationConfig: {
35+
queue: {
36+
QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME,
37+
},
38+
},
39+
handlerSpy: true,
40+
deletionConfig: {
41+
deleteIfExists: false,
42+
},
43+
logMessages: true,
44+
messageSchema: MY_MESSAGE_SCHEMA,
45+
messageTypeField: 'messageType',
46+
})
47+
}
48+
}
49+
50+
// Updated code
51+
export class MyPublisher extends AbstractSqsPublisher<MyType> {
52+
public static QUEUE_NAME = 'my-queue-name'
53+
54+
constructor(dependencies: SQSDependencies) {
55+
super(dependencies, {
56+
creationConfig: {
57+
queue: {
58+
QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME,
59+
},
60+
},
61+
handlerSpy: true,
62+
deletionConfig: {
63+
deleteIfExists: false,
64+
},
65+
logMessages: true,
66+
messageSchemas: [MY_MESSAGE_SCHEMA],
67+
messageTypeField: 'messageType',
68+
})
69+
}
70+
}
71+
```
72+
73+
##### Consumer
74+
1. Rename the class you are extending from `AbstractSqsConsumerMonoSchema` to `AbstractSqsConsumer`.
75+
2. Remove the `messageSchema` property.
76+
3. Define a handler (`handlers` property) for your message, specifying the `zod` schema (old `messageSchema`) and the
77+
method to handle the message (old `processMessage` method)
78+
```typescript
79+
// Old code
80+
export class MyConsumer extends AbstractAmqpConsumerMonoSchema<MyType> {
81+
public static QUEUE_NAME = 'my-queue-name'
82+
83+
constructor(dependencies: AMQPConsumerDependencies) {
84+
super(dependencies, {
85+
creationConfig: {
86+
queueName: AmqpPermissionConsumer.QUEUE_NAME,
87+
queueOptions: {
88+
durable: true,
89+
autoDelete: false,
90+
},
91+
},
92+
deletionConfig: {
93+
deleteIfExists: true,
94+
},
95+
messageSchema: MY_MESSAGE_SCHEMA,
96+
messageTypeField: 'messageType',
97+
})
98+
}
99+
100+
override async processMessage(
101+
message: MyType,
102+
): Promise<Either<'retryLater', 'success'>> {
103+
// Your handling code
104+
return { result: 'success' }
105+
}
106+
}
107+
108+
// Updated code
109+
export class MyConsumer extends AbstractAmqpConsumer<MyType, undefined> {
110+
public static QUEUE_NAME = 'my-queue-name'
111+
112+
constructor(dependencies: AMQPConsumerDependencies) {
113+
super(
114+
dependencies,
115+
{
116+
creationConfig: {
117+
queueName: AmqpPermissionConsumer.QUEUE_NAME,
118+
queueOptions: {
119+
durable: true,
120+
autoDelete: false,
121+
},
122+
},
123+
deletionConfig: {
124+
deleteIfExists: true,
125+
},
126+
messageTypeField: 'messageType',
127+
handlers: new MessageHandlerConfigBuilder<SupportedEvents, ExecutionContext>()
128+
.addConfig(
129+
MY_MESSAGE_SCHEMA,
130+
async (message) => {
131+
// Your handling code
132+
return {
133+
result: 'success',
134+
}
135+
},
136+
)
137+
.build(),
138+
},
139+
undefined
140+
)
141+
}
142+
}
143+
```
144+
> **_NOTE:_** on this example code we are omitting the barrier pattern (`preHandlerBarrier`) and pre handlers (`preHandlers`)
145+
to simplify the example. If you are using them, please check [SqsPermissionConsumer.ts](./packages/sqs/test/consumers/SqsPermissionConsumer.ts)
146+
to see how to update your code.

packages/amqp/index.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
1-
export type { CommonMessage } from './lib/types/MessageTypes'
2-
31
export type { AMQPQueueConfig } from './lib/AbstractAmqpService'
42

5-
export { AbstractAmqpConsumerMonoSchema } from './lib/AbstractAmqpConsumerMonoSchema'
6-
export { AbstractAmqpConsumerMultiSchema } from './lib/AbstractAmqpConsumerMultiSchema'
3+
export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer'
74
export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'
85

9-
export { AbstractAmqpPublisherMonoSchema } from './lib/AbstractAmqpPublisherMonoSchema'
10-
export { AbstractAmqpPublisherMultiSchema } from './lib/AbstractAmqpPublisherMultiSchema'
6+
export { AbstractAmqpPublisher, AMQPPublisherOptions } from './lib/AbstractAmqpPublisher'
117

128
export type { AmqpConfig } from './lib/amqpConnectionResolver'
139

packages/amqp/lib/AbstractAmqpBasePublisher.ts

Lines changed: 0 additions & 53 deletions
This file was deleted.

0 commit comments

Comments
 (0)