diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index cf383fcf4e..279bc5870a 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,5 +1,6 @@ -import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { Callback } from "./protocols.js"; +import type { IDecodedMessage } from "./message.js"; +import { ContentTopic } from "./misc.js"; +import { IRoutingInfo } from "./sharding.js"; export type IFilter = { readonly multicodec: string; @@ -38,9 +39,10 @@ export type IFilter = { * console.error("Failed to subscribe"); * } */ - subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback + subscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): Promise; /** @@ -64,8 +66,9 @@ export type IFilter = { * console.error("Failed to unsubscribe"); * } */ - unsubscribe( - decoders: IDecoder | IDecoder[] + unsubscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo ): Promise; /** diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index caecb73aec..36b6bf67bc 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -105,6 +105,7 @@ export interface IEncoder { export interface IDecoder { contentTopic: string; pubsubTopic: PubsubTopic; + routingInfo: IRoutingInfo; fromWireToProtoObj: (bytes: Uint8Array) => Promise; fromProtoObj: ( pubsubTopic: string, diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 71914755f7..9ffe536d25 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -11,6 +11,7 @@ import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; +import { ContentTopic } from "./misc.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { ShardId } from "./sharding.js"; @@ -54,7 +55,12 @@ export interface IWakuEvents { [WakuEvent.Health]: CustomEvent; } +export interface IMessageEmitterEvents { + [contentTopic: string]: CustomEvent; +} + export type IWakuEventEmitter = TypedEventEmitter; +export type IMessageEmitter = TypedEventEmitter; export interface IWaku { libp2p: Libp2p; @@ -78,6 +84,20 @@ export interface IWaku { */ events: IWakuEventEmitter; + /** + * Emits messages on their content topic. Messages may be coming from subscriptions + * or store queries (TODO). The payload is directly emitted + * + * @example + * ```typescript + * waku.messageEmitter.addEventListener("/some/0/content-topic/proto", (event) => { + * const payload: UInt8Array = event.detail + * MyDecoder.decode(payload); + * }); + * ``` + */ + messageEmitter: IMessageEmitter; + /** * Returns a unique identifier for a node on the network. * @@ -251,6 +271,8 @@ export interface IWaku { */ createEncoder(params: CreateEncoderParams): IEncoder; + subscribe(contentTopics: ContentTopic[]): Promise; + /** * @returns {boolean} `true` if the node was started and `false` otherwise */ diff --git a/packages/message-encryption/src/crypto/utils.ts b/packages/message-encryption/src/crypto/utils.ts index fd57811bd4..15e9ed4b08 100644 --- a/packages/message-encryption/src/crypto/utils.ts +++ b/packages/message-encryption/src/crypto/utils.ts @@ -74,3 +74,32 @@ export async function sign( export function keccak256(input: Uint8Array): Uint8Array { return new Uint8Array(sha3.keccak256.arrayBuffer(input)); } + +/** + * Compare two public keys, can be used to verify that a given signature matches + * expectations. + * + * @param publicKeyA - The first public key to compare + * @param publicKeyB - The second public key to compare + * @returns true if the public keys are the same + */ +export function comparePublicKeys( + publicKeyA: Uint8Array | undefined, + publicKeyB: Uint8Array | undefined +): boolean { + if (!publicKeyA || !publicKeyB) { + return false; + } + + if (publicKeyA.length !== publicKeyB.length) { + return false; + } + + for (let i = 0; i < publicKeyA.length; i++) { + if (publicKeyA[i] !== publicKeyB[i]) { + return false; + } + } + + return true; +} diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index 79b6753833..fdace3c8dc 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -1,11 +1,17 @@ import { + comparePublicKeys, generatePrivateKey, generateSymmetricKey, getPublicKey } from "./crypto/index.js"; import { DecodedMessage } from "./decoded_message.js"; -export { generatePrivateKey, generateSymmetricKey, getPublicKey }; +export { + generatePrivateKey, + generateSymmetricKey, + getPublicKey, + comparePublicKeys +}; export type { DecodedMessage }; export * as ecies from "./ecies.js"; diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 87fcbbb8fc..2f44ef8606 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -206,3 +206,105 @@ export function createDecoder( ): Decoder { return new Decoder(contentTopic, routingInfo, symKey); } + +/** + * Result of decrypting a message with AES symmetric encryption. + */ +export interface SymmetricDecryptionResult { + /** The decrypted payload */ + payload: Uint8Array; + /** The signature if the message was signed */ + signature?: Uint8Array; + /** The recovered public key if the message was signed */ + signaturePublicKey?: Uint8Array; +} + +/** + * AES symmetric encryption. + * + * + * Follows [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/) encryption standard. + */ +export class SymmetricEncryption { + /** + * Creates an AES Symmetric encryption instance. + * + * @param symKey - The symmetric key for encryption (32 bytes recommended) + * @param sigPrivKey - Optional private key to sign messages before encryption + */ + public constructor( + private symKey: Uint8Array, + private sigPrivKey?: Uint8Array + ) {} + + /** + * Encrypts a byte array payload. + * + * The encryption process: + * 1. Optionally signs the payload with the private key + * 2. Adds padding to obscure payload size + * 3. Encrypts using AES-256-GCM + * + * @param payload - The data to encrypt + * @returns The encrypted payload + */ + public async encrypt(payload: Uint8Array): Promise { + const preparedPayload = await preCipher(payload, this.sigPrivKey); + return encryptSymmetric(preparedPayload, this.symKey); + } +} + +/** + * AES symmetric decryption. + * + * Follows [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/) encryption standard. + */ +export class SymmetricDecryption { + /** + * Creates an AES Symmetric decryption instance. + * + * @param symKey - The symmetric key for decryption (must match encryption key) + */ + public constructor(private symKey: Uint8Array) {} + + /** + * Decrypts an encrypted byte array payload. + * + * The decryption process: + * 1. Decrypts using AES-256-GCM + * 2. Removes padding + * 3. Verifies and recovers signature if present + * + * @param encryptedPayload - The encrypted data (from [[SymmetricEncryption.encrypt]]) + * @returns Object containing the decrypted payload and signature info, or undefined if decryption fails + */ + public async decrypt( + encryptedPayload: Uint8Array + ): Promise { + try { + const decryptedData = await decryptSymmetric( + encryptedPayload, + this.symKey + ); + + if (!decryptedData) { + return undefined; + } + + const result = postCipher(decryptedData); + + if (!result) { + return undefined; + } + + return { + payload: result.payload, + signature: result.sig?.signature, + signaturePublicKey: result.sig?.publicKey + }; + } catch (error) { + log.error("Failed to decrypt payload", error); + return undefined; + } + } +} diff --git a/packages/sdk/src/filter/filter.spec.ts b/packages/sdk/src/filter/filter.spec.ts index 4eabec6969..96577e0702 100644 --- a/packages/sdk/src/filter/filter.spec.ts +++ b/packages/sdk/src/filter/filter.spec.ts @@ -49,7 +49,11 @@ describe("Filter SDK", () => { const addStub = sinon.stub(Subscription.prototype, "add").resolves(true); const startStub = sinon.stub(Subscription.prototype, "start"); - const result = await filter.subscribe(decoder, callback); + const result = await filter.subscribe( + [testContentTopic], + testRoutingInfo, + callback + ); expect(result).to.be.true; expect(addStub.calledOnce).to.be.true; @@ -57,7 +61,10 @@ describe("Filter SDK", () => { }); it("should return false when unsubscribing from a non-existing subscription", async () => { - const result = await filter.unsubscribe(decoder); + const result = await filter.unsubscribe( + [testContentTopic], + testRoutingInfo + ); expect(result).to.be.false; }); diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 4d12f8d32d..f74361b7d9 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -1,10 +1,10 @@ import { FilterCore } from "@waku/core"; import type { - Callback, + ContentTopic, FilterProtocolOptions, IDecodedMessage, - IDecoder, - IFilter + IFilter, + IRoutingInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; @@ -61,31 +61,25 @@ export class Filter implements IFilter { this.subscriptions.clear(); } - public async subscribe( - decoder: IDecoder | IDecoder[], - callback: Callback + public async subscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - if (decoders.length === 0) { - throw Error("Cannot subscribe with 0 decoders."); + if (contentTopics.length === 0) { + throw Error("Cannot subscribe with 0 contentTopics."); } - const pubsubTopics = decoders.map((v) => v.pubsubTopic); - const singlePubsubTopic = pubsubTopics[0]; - - const contentTopics = decoders.map((v) => v.contentTopic); + const pubsubTopic = routingInfo.pubsubTopic; log.info( - `Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}` + `Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${pubsubTopic}` ); - this.throwIfTopicNotSame(pubsubTopics); - - let subscription = this.subscriptions.get(singlePubsubTopic); + let subscription = this.subscriptions.get(pubsubTopic); if (!subscription) { subscription = new Subscription({ - pubsubTopic: singlePubsubTopic, + pubsubTopic, protocol: this.protocol, config: this.config, peerManager: this.peerManager @@ -93,8 +87,8 @@ export class Filter implements IFilter { subscription.start(); } - const result = await subscription.add(decoders, callback); - this.subscriptions.set(singlePubsubTopic, subscription); + const result = await subscription.add(contentTopics, routingInfo, callback); + this.subscriptions.set(pubsubTopic, subscription); log.info( `Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}` @@ -103,38 +97,31 @@ export class Filter implements IFilter { return result; } - public async unsubscribe( - decoder: IDecoder | IDecoder[] + public async unsubscribe( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - if (decoders.length === 0) { - throw Error("Cannot unsubscribe with 0 decoders."); + if (contentTopics.length === 0) { + throw Error("Cannot unsubscribe with 0 contentTopics."); } - - const pubsubTopics = decoders.map((v) => v.pubsubTopic); - const singlePubsubTopic = pubsubTopics[0]; - - const contentTopics = decoders.map((v) => v.contentTopic); + const { pubsubTopic } = routingInfo; log.info( - `Unsubscribing from contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}` + `Unsubscribing from contentTopics: ${contentTopics}, pubsubTopic: ${pubsubTopic}` ); - this.throwIfTopicNotSame(pubsubTopics); - - const subscription = this.subscriptions.get(singlePubsubTopic); + const subscription = this.subscriptions.get(pubsubTopic); if (!subscription) { log.warn("No subscriptions associated with the decoder."); return false; } - const result = await subscription.remove(decoders); + const result = await subscription.remove(contentTopics); if (subscription.isEmpty()) { log.warn("Subscription has no decoders anymore, terminating it."); subscription.stop(); - this.subscriptions.delete(singlePubsubTopic); + this.subscriptions.delete(pubsubTopic); } log.info( @@ -162,16 +149,4 @@ export class Filter implements IFilter { subscription.invoke(message, peerId); } - - // Limiting to one pubsubTopic for simplicity reasons, we can enable subscription for more than one PubsubTopic at once later when requested - private throwIfTopicNotSame(pubsubTopics: string[]): void { - const first = pubsubTopics[0]; - const isSameTopic = pubsubTopics.every((t) => t === first); - - if (!isSameTopic) { - throw Error( - `Cannot subscribe to more than one pubsub topic at the same time, got pubsubTopics:${pubsubTopics}` - ); - } - } } diff --git a/packages/sdk/src/filter/subscription.spec.ts b/packages/sdk/src/filter/subscription.spec.ts index 37f3d48ed3..94eb56ce8f 100644 --- a/packages/sdk/src/filter/subscription.spec.ts +++ b/packages/sdk/src/filter/subscription.spec.ts @@ -19,7 +19,6 @@ describe("Filter Subscription", () => { let filterCore: FilterCore; let peerManager: PeerManager; let subscription: Subscription; - let decoder: IDecoder; let config: FilterProtocolOptions; beforeEach(() => { @@ -37,8 +36,6 @@ describe("Filter Subscription", () => { config, peerManager }); - - decoder = mockDecoder(); }); afterEach(() => { diff --git a/packages/sdk/src/filter/subscription.ts b/packages/sdk/src/filter/subscription.ts index 3ab7273eb1..793b53a382 100644 --- a/packages/sdk/src/filter/subscription.ts +++ b/packages/sdk/src/filter/subscription.ts @@ -5,11 +5,11 @@ import { } from "@libp2p/interface"; import { FilterCore, messageHashStr } from "@waku/core"; import type { - Callback, + ContentTopic, FilterProtocolOptions, IDecodedMessage, - IDecoder, IProtoMessage, + IRoutingInfo, PeerIdStr } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; @@ -51,8 +51,8 @@ export class Subscription { private readonly receivedMessages = new TTLSet(60_000); private callbacks = new Map< - IDecoder, - EventHandler> + ContentTopic, + EventHandler> >(); private messageEmitter = new TypedEventEmitter(); @@ -63,9 +63,7 @@ export class Subscription { private keepAliveIntervalId: number | null = null; private get contentTopics(): string[] { - const allTopics = Array.from(this.callbacks.keys()).map( - (k) => k.contentTopic - ); + const allTopics = Array.from(this.callbacks.keys()); const uniqueTopics = new Set(allTopics).values(); return Array.from(uniqueTopics); @@ -131,14 +129,13 @@ export class Subscription { return this.callbacks.size === 0; } - public async add( - decoder: IDecoder | IDecoder[], - callback: Callback + public async add( + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - for (const decoder of decoders) { - this.addSingle(decoder, callback); + for (const contentTopic of contentTopics) { + this.addSingle(contentTopic, routingInfo, callback); } return this.toSubscribeContentTopics.size > 0 @@ -146,13 +143,9 @@ export class Subscription { : true; // if content topic is not new - subscription, most likely exists } - public async remove( - decoder: IDecoder | IDecoder[] - ): Promise { - const decoders = Array.isArray(decoder) ? decoder : [decoder]; - - for (const decoder of decoders) { - this.removeSingle(decoder); + public async remove(contentTopics: ContentTopic[]): Promise { + for (const contentTopic of contentTopics) { + this.removeSingle(contentTopic); } return this.toUnsubscribeContentTopics.size > 0 @@ -177,76 +170,63 @@ export class Subscription { ); } - private addSingle( - decoder: IDecoder, - callback: Callback + private addSingle( + contentTopic: ContentTopic, + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ): void { - log.info(`Adding subscription for contentTopic: ${decoder.contentTopic}`); + log.info(`Adding subscription for contentTopic: ${contentTopic}`); - const isNewContentTopic = !this.contentTopics.includes( - decoder.contentTopic - ); + const isNewContentTopic = !this.contentTopics.includes(contentTopic); if (isNewContentTopic) { - this.toSubscribeContentTopics.add(decoder.contentTopic); + this.toSubscribeContentTopics.add(contentTopic); } - if (this.callbacks.has(decoder)) { + if (this.callbacks.has(contentTopic)) { log.warn( - `Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` + `Replacing callback associated associated with decoder with pubsubTopic:${routingInfo.pubsubTopic} and contentTopic:${contentTopic}` ); - const callback = this.callbacks.get(decoder); - this.callbacks.delete(decoder); - this.messageEmitter.removeEventListener(decoder.contentTopic, callback); + const callback = this.callbacks.get(contentTopic); + this.callbacks.delete(contentTopic); + this.messageEmitter.removeEventListener(contentTopic, callback); } - const eventHandler = (event: CustomEvent): void => { - void (async (): Promise => { - try { - const message = await decoder.fromProtoObj( - decoder.pubsubTopic, - event.detail as IProtoMessage - ); - void callback(message!); - } catch (err) { - log.error("Error decoding message", err); - } - })(); + const eventHandler = (event: CustomEvent): void => { + void callback(event.detail); }; - this.callbacks.set(decoder, eventHandler); - this.messageEmitter.addEventListener(decoder.contentTopic, eventHandler); + this.callbacks.set(contentTopic, eventHandler); + this.messageEmitter.addEventListener(contentTopic, eventHandler); log.info( - `Subscription added for contentTopic: ${decoder.contentTopic}, isNewContentTopic: ${isNewContentTopic}` + `Subscription added for contentTopic: ${contentTopic}, isNewContentTopic: ${isNewContentTopic}` ); } - private removeSingle(decoder: IDecoder): void { - log.info(`Removing subscription for contentTopic: ${decoder.contentTopic}`); + private removeSingle(contentTopic: ContentTopic): void { + log.info(`Removing subscription for contentTopic: ${contentTopic}`); - const callback = this.callbacks.get(decoder); + const callback = this.callbacks.get(contentTopic); if (!callback) { log.warn( - `No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` + `No callback associated with decoder with contentTopic: ${contentTopic}` ); } - this.callbacks.delete(decoder); - this.messageEmitter.removeEventListener(decoder.contentTopic, callback); + this.callbacks.delete(contentTopic); + this.messageEmitter.removeEventListener(contentTopic, callback); - const isCompletelyRemoved = !this.contentTopics.includes( - decoder.contentTopic - ); + const isCompletelyRemoved = !this.contentTopics.includes(contentTopic); if (isCompletelyRemoved) { - this.toUnsubscribeContentTopics.add(decoder.contentTopic); + this.toUnsubscribeContentTopics.add(contentTopic); } log.info( - `Subscription removed for contentTopic: ${decoder.contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}` + `Subscription removed for contentTopic: ${contentTopic}, isCompletelyRemoved: ${isCompletelyRemoved}` ); } @@ -383,8 +363,8 @@ export class Subscription { } private disposeHandlers(): void { - for (const [decoder, handler] of this.callbacks.entries()) { - this.messageEmitter.removeEventListener(decoder.contentTopic, handler); + for (const [contentTopic, handler] of this.callbacks.entries()) { + this.messageEmitter.removeEventListener(contentTopic, handler); } this.callbacks.clear(); } diff --git a/packages/sdk/src/filter/types.ts b/packages/sdk/src/filter/types.ts index 44326728d1..e748c9d93e 100644 --- a/packages/sdk/src/filter/types.ts +++ b/packages/sdk/src/filter/types.ts @@ -1,6 +1,9 @@ import type { FilterCore } from "@waku/core"; -import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces"; -import type { WakuMessage } from "@waku/proto"; +import type { + FilterProtocolOptions, + IDecodedMessage, + Libp2p +} from "@waku/interfaces"; import type { PeerManager } from "../peer_manager/index.js"; @@ -11,7 +14,7 @@ export type FilterConstructorParams = { }; export type SubscriptionEvents = { - [contentTopic: string]: CustomEvent; + [contentTopic: string]: CustomEvent; }; export type SubscriptionParams = { diff --git a/packages/sdk/src/reliable_channel/events.ts b/packages/sdk/src/reliable_channel/events.ts index c79c2c0c0f..2382c73103 100644 --- a/packages/sdk/src/reliable_channel/events.ts +++ b/packages/sdk/src/reliable_channel/events.ts @@ -1,4 +1,4 @@ -import { IDecodedMessage, ProtocolError } from "@waku/interfaces"; +import { ProtocolError } from "@waku/interfaces"; import type { HistoryEntry, MessageId } from "@waku/sds"; export const ReliableChannelEvent = { @@ -56,8 +56,7 @@ export interface ReliableChannelEvents { possibleAckCount: number; }>; "message-acknowledged": CustomEvent; - // TODO probably T extends IDecodedMessage? - "message-received": CustomEvent; + "message-received": CustomEvent; "irretrievable-message": CustomEvent; "sending-message-irrecoverable-error": CustomEvent<{ messageId: MessageId; diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 49b55aa495..8ea6b58a84 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -1,11 +1,12 @@ import { TypedEventEmitter } from "@libp2p/interface"; import { messageHash } from "@waku/core"; import { - type Callback, + type ContentTopic, type IDecodedMessage, type IDecoder, type IEncoder, type IMessage, + type IRoutingInfo, ISendOptions, type IWaku, LightPushError, @@ -132,8 +133,9 @@ export class ReliableChannel< ) => Promise; private readonly _subscribe: ( - decoders: IDecoder | IDecoder[], - callback: Callback + contentTopics: ContentTopic[], + routingInfo: IRoutingInfo, + callback: (msg: IDecodedMessage) => void | Promise ) => Promise; private readonly _retrieve?: ( @@ -324,10 +326,11 @@ export class ReliableChannel< const messageId = ReliableChannel.getMessageId(messagePayload); - // TODO: should the encoder give me the message hash? - // Encoding now to fail early, used later to get message hash - const protoMessage = await this.encoder.toProtoObj(wakuMessage); - if (!protoMessage) { + const retrievalHint = await computeRetrievalHint( + messagePayload, + this.encoder + ); + if (!retrievalHint) { this.safeSendEvent("sending-message-irrecoverable-error", { detail: { messageId: messageId, @@ -336,10 +339,6 @@ export class ReliableChannel< }); return { success: false }; } - const retrievalHint = messageHash( - this.encoder.pubsubTopic, - protoMessage - ); this.safeSendEvent("sending-message", { detail: messageId @@ -383,9 +382,13 @@ export class ReliableChannel< private async subscribe(): Promise { this.assertStarted(); - return this._subscribe(this.decoder, async (message: T) => { - await this.processIncomingMessage(message); - }); + return this._subscribe( + [this.decoder.contentTopic], + this.decoder.routingInfo, + async (message: IDecodedMessage) => { + await this.processIncomingMessage(message); + } + ); } /** @@ -393,9 +396,7 @@ export class ReliableChannel< * @param msg * @private */ - private async processIncomingMessage( - msg: T - ): Promise { + private async processIncomingMessage(msg: IDecodedMessage): Promise { // New message arrives, we need to unwrap it first const sdsMessage = SdsMessage.decode(msg.payload); @@ -422,25 +423,8 @@ export class ReliableChannel< if (sdsMessage.content && sdsMessage.content.length > 0) { // Now, process the message with callback - - // Overrides msg.payload with unwrapped payload - // TODO: can we do better? - const { payload: _p, ...allButPayload } = msg; - const unwrappedMessage = Object.assign(allButPayload, { - payload: sdsMessage.content, - hash: msg.hash, - hashStr: msg.hashStr, - version: msg.version, - contentTopic: msg.contentTopic, - pubsubTopic: msg.pubsubTopic, - timestamp: msg.timestamp, - rateLimitProof: msg.rateLimitProof, - ephemeral: msg.ephemeral, - meta: msg.meta - }); - this.safeSendEvent("message-received", { - detail: unwrappedMessage as unknown as T + detail: sdsMessage.content }); } @@ -689,3 +673,16 @@ export class ReliableChannel< } } } + +async function computeRetrievalHint( + payload: Uint8Array, + encoder: IEncoder +): Promise { + // TODO: should the encoder give me the message hash? + // Encoding now to fail early, used later to get message hash + const protoMessage = await encoder.toProtoObj({ payload }); + if (!protoMessage) { + return undefined; + } + return messageHash(encoder.pubsubTopic, protoMessage); +} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 7336b06df7..03366de5a2 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -6,7 +6,8 @@ import { } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; import { ConnectionManager, createDecoder, createEncoder } from "@waku/core"; -import type { +import { + ContentTopic, CreateDecoderParams, CreateEncoderParams, CreateNodeOptions, @@ -15,6 +16,7 @@ import type { IEncoder, IFilter, ILightPush, + IMessageEmitter, IRelay, IRoutingInfo, IStore, @@ -28,7 +30,7 @@ import { HealthStatus, Protocols } from "@waku/interfaces"; -import { createRoutingInfo, Logger } from "@waku/utils"; +import { createRoutingInfo, Logger, pushOrInitMapSet } from "@waku/utils"; import { Filter } from "../filter/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; @@ -54,6 +56,7 @@ export class WakuNode implements IWaku { public lightPush?: ILightPush; public readonly events: IWakuEventEmitter = new TypedEventEmitter(); + public readonly messageEmitter: IMessageEmitter = new TypedEventEmitter(); private readonly networkConfig: NetworkConfig; @@ -134,6 +137,37 @@ export class WakuNode implements IWaku { ); } + public async subscribe(contentTopics: ContentTopic[]): Promise { + // Group content topics via routing info in case they spread across several shards + const ctToRouting: Map> = new Map(); + for (const contentTopic of contentTopics) { + const routingInfo = this.createRoutingInfo(contentTopic); + pushOrInitMapSet(ctToRouting, routingInfo, contentTopic); + } + + const promises = []; + if (this.filter) { + for (const [routingInfo, contentTopics] of ctToRouting) { + // TODO: Returned bool from subscribe should be used + promises.push( + this.filter.subscribe( + Array.from(contentTopics), + routingInfo, + this.emitIncomingMessages.bind(this, Array.from(contentTopics)) + ) + ); + } + + await Promise.all(promises); + return; + } + + if (this.relay) { + throw "not implemented"; + } + throw "no subscribe protocol available"; + } + public get peerId(): PeerId { return this.libp2p.peerId; } @@ -288,4 +322,20 @@ export class WakuNode implements IWaku { ): IRoutingInfo { return createRoutingInfo(this.networkConfig, { contentTopic, shardId }); } + + private emitIncomingMessages( + contentTopics: ContentTopic[], + message: { + contentTopic: ContentTopic; + payload: Uint8Array; + } + ): void { + if (contentTopics.includes(message.contentTopic)) { + this.messageEmitter.dispatchEvent( + new CustomEvent(message.contentTopic, { + detail: message.payload + }) + ); + } + } } diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 85de368b23..742c09c092 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -124,14 +124,14 @@ export class ServiceNodesFleet { } class MultipleNodesMessageCollector { - public callback: (msg: IDecodedMessage) => void = () => {}; - protected messageList: Array = []; + public callback: (msg: Partial) => void = () => {}; + protected messageList: Array> = []; public constructor( private messageCollectors: MessageCollector[], private relayNodes?: ServiceNode[], private strictChecking: boolean = false ) { - this.callback = (msg: IDecodedMessage): void => { + this.callback = (msg: Partial): void => { log.info("Got a message"); this.messageList.push(msg); }; @@ -153,7 +153,9 @@ class MultipleNodesMessageCollector { } } - public getMessage(index: number): MessageRpcResponse | IDecodedMessage { + public getMessage( + index: number + ): MessageRpcResponse | Partial { return this.messageList[index]; } diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 9483bc8ba6..7565de86a6 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -7,10 +7,17 @@ import type { RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { generateSymmetricKey } from "@waku/message-encryption"; +import { + comparePublicKeys, + generatePrivateKey, + generateSymmetricKey, + getPublicKey +} from "@waku/message-encryption"; import { createDecoder, - createEncoder + createEncoder, + SymmetricDecryption, + SymmetricDecryptionResult } from "@waku/message-encryption/symmetric"; import { createRelayNode } from "@waku/relay"; import { @@ -29,8 +36,11 @@ import { makeLogFileName, NOISE_KEY_1, NOISE_KEY_2, + runMultipleNodes, ServiceNode, - tearDownNodes + ServiceNodesFleet, + tearDownNodes, + teardownNodesWithRedundancy } from "../src/index.js"; const TestContentTopic = "/test/1/waku/utf8"; @@ -291,3 +301,148 @@ describe("User Agent", function () { ); }); }); + +describe("Waku API", function () { + describe("WakuNode.subscribe (light node)", function () { + this.timeout(100000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + const messageText = "some message"; + const messagePayload = utf8ToBytes(messageText); + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestRoutingInfo, + undefined + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Subscribe and receive messages on 2 different content topics", async function () { + // Subscribe to the first content topic and send a message. + waku.messageEmitter.addEventListener(TestContentTopic, (event) => { + // TODO: fix the callback type + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: event.detail + }); + }); + await waku.subscribe([TestContentTopic]); + + await waku.lightPush.send(TestEncoder, { payload: messagePayload }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true, + "Waiting for the first message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + + // Modify subscription to include a new content topic and send a message. + const newMessageText = "Filtering still works!"; + const newContentTopic = "/test/2/waku-filter/default"; + const newRoutingInfo = createRoutingInfo(DefaultTestNetworkConfig, { + contentTopic: newContentTopic + }); + const newEncoder = createPlainEncoder({ + contentTopic: newContentTopic, + routingInfo: newRoutingInfo + }); + // subscribe to second content topic + waku.messageEmitter.addEventListener(newContentTopic, (event) => { + // TODO: fix the callback type + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: event.detail + }); + }); + await waku.subscribe([newContentTopic]); + + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(newMessageText) + }); + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true, + "Waiting for the second message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: newContentTopic, + expectedMessageText: newMessageText, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + + // Send another message on the initial content topic to verify it still works. + const thirdMessageText = "Filtering still works on first subscription!"; + const thirdMessagePayload = { payload: utf8ToBytes(thirdMessageText) }; + await waku.lightPush.send(TestEncoder, thirdMessagePayload); + expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq( + true, + "Waiting for the third message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(2, { + expectedMessageText: thirdMessageText, + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + }); + + it("Subscribe and receive messages encrypted with AES", async function () { + const symKey = generateSymmetricKey(); + const senderPrivKey = generatePrivateKey(); + // TODO: For now, still using encoder + const newEncoder = createEncoder({ + contentTopic: TestContentTopic, + routingInfo: TestRoutingInfo, + symKey, + sigPrivKey: senderPrivKey + }); + + // Setup payload decryption + const symDecryption = new SymmetricDecryption(symKey); + + // subscribe to second content topic + waku.messageEmitter.addEventListener(TestContentTopic, (event) => { + const encryptedPayload = event.detail; + void symDecryption + .decrypt(encryptedPayload) + .then((decryptionResult: SymmetricDecryptionResult | undefined) => { + if (!decryptionResult) return; + serviceNodes.messageCollector.callback({ + contentTopic: TestContentTopic, + payload: decryptionResult.payload + }); + + // TODO: probably best to adapt the message collector + expect(decryptionResult?.signature).to.not.be.undefined; + expect( + comparePublicKeys( + getPublicKey(senderPrivKey), + decryptionResult?.signaturePublicKey + ) + ); + // usually best to ignore decryption failure + }); + }); + await waku.subscribe([TestContentTopic]); + + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(messageText) + }); + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true, + "Waiting for the message" + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedContentTopic: TestContentTopic, + expectedMessageText: messageText, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + }); + }); +});