Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ export interface ISendMessage {
*/
export type RequestId = string;

/**
* Listener for subscribe messages.
*/
export type SubscribeListener = (message: IDecodedMessage) => void;

export interface IMetaSetter {
(message: IProtoMessage & { meta: undefined }): Uint8Array;
}
Expand Down
78 changes: 68 additions & 10 deletions packages/sdk/src/messaging/ack_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import {
IDecoder,
IFilter,
IStore,
NetworkConfig
NetworkConfig,
SubscribeListener
} from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";

import { MessageStore } from "./message_store.js";
import { IAckManager } from "./utils.js";

type AckManagerConstructorParams = {
messageStore: MessageStore;
Expand All @@ -18,6 +18,14 @@ type AckManagerConstructorParams = {
networkConfig: NetworkConfig;
};

export interface IAckManager {
start(): void;
stop(): void;
observe(contentTopic: string): Promise<boolean>;
subscribe(contentTopic: string, cb: SubscribeListener): Promise<boolean>;
unsubscribe(contentTopic: string): Promise<void>;
}

export class AckManager implements IAckManager {
private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager;
Expand Down Expand Up @@ -49,7 +57,7 @@ export class AckManager implements IAckManager {
this.subscribedContentTopics.clear();
}

public async subscribe(contentTopic: string): Promise<boolean> {
public async observe(contentTopic: string): Promise<boolean> {
if (this.subscribedContentTopics.has(contentTopic)) {
return true;
}
Expand All @@ -69,6 +77,24 @@ export class AckManager implements IAckManager {
])
).some((success) => success);
}

public async subscribe(
contentTopic: string,
cb: SubscribeListener
): Promise<boolean> {
const decoder = createDecoder(
contentTopic,
createRoutingInfo(this.networkConfig, {
contentTopic
})
);

return this.filterAckManager.subscribe(decoder, cb);
}

public async unsubscribe(contentTopic: string): Promise<void> {
return this.filterAckManager.unsubscribe(contentTopic);
}
}

class FilterAckManager {
Expand All @@ -77,7 +103,9 @@ class FilterAckManager {
public constructor(
private messageStore: MessageStore,
private filter: IFilter
) {}
) {
this.onMessage = this.onMessage.bind(this);
}

public start(): void {
return;
Expand All @@ -91,18 +119,48 @@ class FilterAckManager {
this.decoders.clear();
}

public async subscribe(decoder: IDecoder<IDecodedMessage>): Promise<boolean> {
const success = await this.filter.subscribe(
decoder,
this.onMessage.bind(this)
);
public async subscribe(
decoder: IDecoder<IDecodedMessage>,
cb?: SubscribeListener
): Promise<boolean> {
const success = await this.filter.subscribe(decoder, (message) => {
try {
cb?.(message);
} catch (error) {
// ignore
}

try {
this.onMessage(message);
} catch (error) {
// ignore
}
});

if (success) {
this.decoders.add(decoder);
}

return success;
}

private async onMessage(message: IDecodedMessage): Promise<void> {
public async unsubscribe(contentTopic: string): Promise<void> {
const decoders = Array.from(this.decoders).filter(
(decoder) => decoder.contentTopic === contentTopic
);

const promises = decoders.map((decoder) =>
this.filter.unsubscribe(decoder)
);

await Promise.all(promises);

for (const decoder of decoders) {
this.decoders.delete(decoder);
}
}

private onMessage(message: IDecodedMessage): void {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message, { filterAck: true });
}
Expand Down
14 changes: 13 additions & 1 deletion packages/sdk/src/messaging/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import {
ISendMessage,
IStore,
NetworkConfig,
RequestId
RequestId,
SubscribeListener
} from "@waku/interfaces";

import { AckManager } from "./ack_manager.js";
Expand Down Expand Up @@ -58,4 +59,15 @@ export class Messaging implements IMessaging {
public send(wakuLikeMessage: ISendMessage): Promise<RequestId> {
return this.sender.send(wakuLikeMessage);
}

public subscribe(
contentTopic: string,
cb: SubscribeListener
): Promise<boolean> {
return this.ackManager.subscribe(contentTopic, cb);
}

public unsubscribe(contentTopic: string): Promise<void> {
return this.ackManager.unsubscribe(contentTopic);
}
}
2 changes: 1 addition & 1 deletion packages/sdk/src/messaging/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class Sender {
public async send(message: ISendMessage): Promise<RequestId> {
const requestId = await this.messageStore.queue(message);

await this.ackManager.subscribe(message.contentTopic);
await this.ackManager.observe(message.contentTopic);
await this.sendMessage(requestId, message);

return requestId;
Expand Down
5 changes: 0 additions & 5 deletions packages/sdk/src/messaging/utils.ts

This file was deleted.

22 changes: 21 additions & 1 deletion packages/sdk/src/waku/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import type {
IWaku,
IWakuEventEmitter,
Libp2p,
NetworkConfig
NetworkConfig,
SubscribeListener
} from "@waku/interfaces";
import {
DefaultNetworkConfig,
Expand Down Expand Up @@ -305,6 +306,25 @@ export class WakuNode implements IWaku {
return this.messaging.send(message);
}

public subscribe(
contentTopic: string,
cb: SubscribeListener
): Promise<boolean> {
if (!this.messaging) {
throw new Error("Messaging not initialized");
}

return this.messaging.subscribe(contentTopic, cb);
}

public unsubscribe(contentTopic: string): Promise<void> {
if (!this.messaging) {
throw new Error("Messaging not initialized");
}

return this.messaging.unsubscribe(contentTopic);
}

private createRoutingInfo(
contentTopic?: string,
shardId?: number
Expand Down
Loading