Skip to content

Commit 33d7917

Browse files
gpadtarzacodes
andauthored
200 fix maxlistenersexceededwarning warning functional (#202)
* fix: use a single listener to close publishers and consumers when receiving a metadata update containing the relevant stream * refactor: rewrote a part of the code in order to avoid side effects when filtering an array * More functional approach --------- Co-authored-by: Luca <[email protected]>
1 parent 75ba0c7 commit 33d7917

File tree

2 files changed

+59
-11
lines changed

2 files changed

+59
-11
lines changed

src/client.ts

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,9 @@ export class Client {
157157
logger: this.logger,
158158
}
159159
const publisher = new StreamPublisher(streamPublisherParams, filter)
160-
connection.on("metadata_update", async (metadata) => {
161-
if (metadata.metadataInfo.stream === publisher.streamName) {
162-
await publisher.close(false)
163-
this.publishers.delete(publisher.extendedId)
164-
}
160+
connection.onPublisherClosed(publisher.extendedId, params.stream, async () => {
161+
await publisher.close(false)
162+
this.publishers.delete(publisher.extendedId)
165163
})
166164
this.publishers.set(publisher.extendedId, { publisher, connection, params, filter })
167165
this.logger.info(
@@ -206,13 +204,11 @@ export class Client {
206204
},
207205
params.filter
208206
)
209-
connection.on("metadata_update", async (metadata) => {
210-
if (metadata.metadataInfo.stream === consumer.streamName) {
211-
if (params.connectionClosedListener) {
212-
params.connectionClosedListener(false)
213-
}
214-
await this.closeConsumer(consumer.extendedId)
207+
connection.onConsumerClosed(consumer.extendedId, params.stream, async () => {
208+
if (params.connectionClosedListener) {
209+
params.connectionClosedListener(false)
215210
}
211+
await this.closeConsumer(consumer.extendedId)
216212
})
217213
this.consumers.set(consumer.extendedId, { connection, consumer, params })
218214
await this.declareConsumerOnConnection(params, consumerId, connection)

src/connection.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ import { StoreOffsetRequest } from "./requests/store_offset_request"
4040
import { QueryOffsetResponse } from "./responses/query_offset_response"
4141
import { QueryOffsetRequest } from "./requests/query_offset_request"
4242
import { coerce, lt } from "semver"
43+
import EventEmitter from "events"
44+
import { MetadataUpdateResponse } from "./responses/metadata_update_response"
45+
import { MetadataInfo } from "./responses/raw_response"
4346

4447
export type ConnectionClosedListener = (hadError: boolean) => void
4548

@@ -68,6 +71,11 @@ function extractHeartbeatInterval(heartbeatInterval: number, tuneResponse: TuneR
6871
return heartbeatInterval === 0 ? tuneResponse.heartbeat : Math.min(heartbeatInterval, tuneResponse.heartbeat)
6972
}
7073

74+
type ListenerEntry = {
75+
extendedId: string
76+
stream: string
77+
}
78+
7179
export class Connection {
7280
public readonly hostname: string
7381
public readonly leader: boolean
@@ -92,6 +100,9 @@ export class Connection {
92100
private setupCompleted: boolean = false
93101
publisherId = 0
94102
consumerId = 0
103+
private consumerListeners: ListenerEntry[] = []
104+
private publisherListeners: ListenerEntry[] = []
105+
private closeEventsEmitter = new EventEmitter()
95106

96107
constructor(
97108
private readonly params: ConnectionParams,
@@ -249,7 +260,22 @@ export class Connection {
249260
)
250261
}
251262

263+
public onPublisherClosed(publisherExtendedId: string, streamName: string, callback: () => void | Promise<void>) {
264+
this.publisherListeners.push({ extendedId: publisherExtendedId, stream: streamName })
265+
this.closeEventsEmitter.once(`close_publisher_${publisherExtendedId}`, callback)
266+
}
267+
268+
public onConsumerClosed(consumerExtendedId: string, streamName: string, callback: () => void | Promise<void>) {
269+
this.consumerListeners.push({ extendedId: consumerExtendedId, stream: streamName })
270+
this.closeEventsEmitter.once(`close_consumer_${consumerExtendedId}`, callback)
271+
}
272+
252273
private registerListeners(listeners?: ConnectionListenersParams) {
274+
this.decoder.on("metadata_update", (metadata) => {
275+
this.publisherListeners = notifyOnceClose(this.publisherListeners, metadata, this.closeEventsEmitter, "publisher")
276+
this.consumerListeners = notifyOnceClose(this.consumerListeners, metadata, this.closeEventsEmitter, "consumer")
277+
})
278+
253279
if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
254280
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
255281
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
@@ -548,3 +574,29 @@ export function connect(logger: Logger, params: ConnectionParams) {
548574
export function create(logger: Logger, params: ConnectionParams) {
549575
return Connection.create(params, logger)
550576
}
577+
578+
function notifyOnceClose(
579+
listeners: ListenerEntry[],
580+
metadata: MetadataUpdateResponse,
581+
closeEventsEmitter: EventEmitter,
582+
eventName: "publisher" | "consumer"
583+
): ListenerEntry[] {
584+
const [toNotify, toKeep] = partition(listeners, isSameStream(metadata))
585+
toNotify.forEach((l) => closeEventsEmitter.emit(`close_${eventName}_${l.extendedId}`))
586+
return toKeep
587+
}
588+
589+
export function partition<T>(arr: T[], predicate: (t: T) => boolean): [T[], T[]] {
590+
const [truthy, falsy] = arr.reduce(
591+
(acc, t) => {
592+
acc[predicate(t) ? 0 : 1].push(t)
593+
return acc
594+
},
595+
[[], []] as [T[], T[]]
596+
)
597+
return [truthy, falsy]
598+
}
599+
600+
function isSameStream({ metadataInfo }: { metadataInfo: MetadataInfo }): (e: ListenerEntry) => boolean {
601+
return (e) => e.stream === metadataInfo.stream
602+
}

0 commit comments

Comments
 (0)