Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d29994d
add draft
weboko Aug 20, 2025
97f0fb2
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Aug 22, 2025
01288ff
add draft api
weboko Sep 4, 2025
e54645a
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Sep 4, 2025
3de906a
implement basic entites and structure, decouple into separate files
weboko Sep 24, 2025
4fe8bfd
implement main ack manager, improve message store, implement Sender e…
weboko Sep 24, 2025
37ee490
implement background send, requestID
weboko Sep 25, 2025
998c2a1
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Sep 29, 2025
0c852e4
add utils, fix typings
weboko Sep 29, 2025
9e7719e
add ICodec
weboko Sep 29, 2025
ad675d8
implement send on waku
weboko Sep 29, 2025
7c67abe
implement and fix queuing mechanics of message store
weboko Sep 30, 2025
4385104
implement acks and message hash saving for sent messages
weboko Oct 2, 2025
7f98bb1
move from encoder/decoder/codec to simple message parameter
weboko Oct 2, 2025
da11478
implement working version of sendind
weboko Oct 6, 2025
62b43f0
merge with master
weboko Oct 6, 2025
80cef4b
remove Codec, update types
weboko Oct 6, 2025
82be279
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Oct 7, 2025
969330d
add unit tests
weboko Oct 7, 2025
de70791
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Oct 8, 2025
e39c6c9
address review
weboko Oct 8, 2025
974fb21
fix start
weboko Oct 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
export { createEncoder, createDecoder } from "./lib/message/version_0.js";
export { createCodec } from "./lib/message/index.js";
export type {
Encoder,
Decoder,
DecodedMessage
} from "./lib/message/version_0.js";
export type { Codec } from "./lib/message/index.js";
export * as message from "./lib/message/index.js";

export * as waku_filter from "./lib/filter/index.js";
Expand Down
76 changes: 76 additions & 0 deletions packages/core/src/lib/message/codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import type {
ICodec,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
IRoutingInfo,
PubsubTopic
} from "@waku/interfaces";

import { Decoder, Encoder } from "./version_0.js";

export class Codec implements ICodec<IDecodedMessage> {
private encoder: IEncoder;
private decoder: IDecoder<IDecodedMessage>;

public constructor(
public contentTopic: string,
public ephemeral: boolean = false,
public routingInfo: IRoutingInfo,
public metaSetter?: IMetaSetter
) {
this.encoder = new Encoder(
contentTopic,
ephemeral,
routingInfo,
metaSetter
);
this.decoder = new Decoder(contentTopic, routingInfo);
}

public get pubsubTopic(): PubsubTopic {
return this.routingInfo.pubsubTopic;
}

public async toWire(message: IMessage): Promise<Uint8Array | undefined> {
return this.encoder.toWire(message);
}

public async toProtoObj(
message: IMessage
): Promise<IProtoMessage | undefined> {
return this.encoder.toProtoObj(message);
}

public fromWireToProtoObj(
bytes: Uint8Array
): Promise<IProtoMessage | undefined> {
return this.decoder.fromWireToProtoObj(bytes);
}

public async fromProtoObj(
pubsubTopic: string,
proto: IProtoMessage
): Promise<IDecodedMessage | undefined> {
return this.decoder.fromProtoObj(pubsubTopic, proto);
}
}

type CodecParams = {
contentTopic: string;
ephemeral: boolean;
routingInfo: IRoutingInfo;
metaSetter?: IMetaSetter;
};

export function createCodec(params: CodecParams): Codec {
return new Codec(
params.contentTopic,
params.ephemeral,
params.routingInfo,
params.metaSetter
);
}
2 changes: 2 additions & 0 deletions packages/core/src/lib/message/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const OneMillion = BigInt(1_000_000);
export const Version = 0;
2 changes: 2 additions & 0 deletions packages/core/src/lib/message/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
export * as version_0 from "./version_0.js";
export { Codec, createCodec } from "./codec.js";
export { OneMillion, Version } from "./constants.js";
4 changes: 2 additions & 2 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import { bytesToHex } from "@waku/utils/bytes";

import { messageHash } from "../message_hash/index.js";

import { OneMillion, Version } from "./constants.js";

const log = new Logger("message:version-0");
const OneMillion = BigInt(1_000_000);

export const Version = 0;
export { proto };

export class DecodedMessage implements IDecodedMessage {
Expand Down
2 changes: 2 additions & 0 deletions packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,5 @@ export interface IDecoder<T extends IDecodedMessage> {
proto: IProtoMessage
) => Promise<T | undefined>;
}

export type ICodec<T extends IDecodedMessage> = IEncoder & IDecoder<T>;
66 changes: 65 additions & 1 deletion packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import type { IFilter } from "./filter.js";
import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
import {
ICodec,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage
} from "./message.js";
import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js";
Expand All @@ -25,6 +31,8 @@ export type CreateEncoderParams = CreateDecoderParams & {
ephemeral?: boolean;
};

export type CreateCodecParams = CreateDecoderParams & CreateEncoderParams;

export enum WakuEvent {
Connection = "waku:connection",
Health = "waku:health"
Expand Down Expand Up @@ -58,9 +66,22 @@ export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>;

export interface IWaku {
libp2p: Libp2p;

/**
* @deprecated should not be accessed directly, use {@link IWaku.send} and {@link IWaku.subscribe} instead
*/
relay?: IRelay;

store?: IStore;

/**
* @deprecated should not be accessed directly, use {@link IWaku.subscribe} instead
*/
filter?: IFilter;

/**
* @deprecated should not be accessed directly, use {@link IWaku.send} instead
*/
lightPush?: ILightPush;

/**
Expand Down Expand Up @@ -193,6 +214,8 @@ export interface IWaku {
waitForPeers(protocols?: Protocols[], timeoutMs?: number): Promise<void>;

/**
* @deprecated Use {@link createCodec} instead
*
* Creates a decoder for Waku messages on a specific content topic.
*
* A decoder is used to decode messages from the Waku network format.
Expand Down Expand Up @@ -222,6 +245,8 @@ export interface IWaku {
createDecoder(params: CreateDecoderParams): IDecoder<IDecodedMessage>;

/**
* @deprecated Use {@link createCodec} instead
*
* Creates an encoder for Waku messages on a specific content topic.
*
* An encoder is used to encode messages into the Waku network format.
Expand Down Expand Up @@ -251,6 +276,45 @@ export interface IWaku {
*/
createEncoder(params: CreateEncoderParams): IEncoder;

/**
* Creates a codec for Waku messages on a specific content topic.
*
* A codec is used to encode and decode messages from the Waku network format.
* The codec automatically handles shard configuration based on the Waku node's network settings.
*
* @param {CreateCodecParams} params - Configuration for the codec including content topic and optionally shard information and ephemeral flag
* @returns {ICodec<IDecodedMessage>} A codec instance configured for the specified content topic
* @throws {Error} If the shard configuration is incompatible with the node's network settings
*
* @example
* ```typescript
* // Create a codec with default network shard settings
* const codec = waku.createCodec({
* contentTopic: "/my-app/1/chat/proto"
* });
*
* // Create a codec with custom shard settings
* const customCodec = waku.createCodec({
* contentTopic: "/my-app/1/chat/proto",
* ephemeral: true,
* shardInfo: {
* clusterId: 1,
* shard: 5
* }
* });
* ```
*/
createCodec(params: CreateCodecParams): ICodec<IDecodedMessage>;

/**
* Sends a message to the Waku network.
*
* @param {ICodec<IDecodedMessage>} codec - The codec to use for encoding the message
* @param {IMessage} message - The message to send
* @returns {Promise<string>} A promise that resolves to the request ID
*/
send(codec: ICodec<IDecodedMessage>, message: IMessage): Promise<string>;

/**
* @returns {boolean} `true` if the node was started and `false` otherwise
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/rln/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export function toRLNSignal(contentTopic: string, msg: IMessage): Uint8Array {

export class RlnMessage<T extends IDecodedMessage> implements IRlnMessage {
public pubsubTopic = "";
public version = message.version_0.Version;
public version = message.Version;

public constructor(
private rlnInstance: RLNInstance,
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
"@waku/sds": "^0.0.7",
"@waku/utils": "0.0.27",
"libp2p": "2.8.11",
"lodash.debounce": "^4.0.8"
"lodash.debounce": "^4.0.8",
"uuid": "^10.0.0"
},
"devDependencies": {
"@libp2p/interface": "2.10.4",
Expand Down
138 changes: 138 additions & 0 deletions packages/sdk/src/messaging/ack_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { ICodec, IDecodedMessage, IFilter, IStore } from "@waku/interfaces";

import { MessageStore } from "./message_store.js";
import { IAckManager } from "./utils.js";

type AckManagerConstructorParams = {
messageStore: MessageStore;
filter: IFilter;
store: IStore;
};

export class AckManager implements IAckManager {
private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager;
private readonly storeAckManager: StoreAckManager;

public constructor(params: AckManagerConstructorParams) {
this.messageStore = params.messageStore;

this.filterAckManager = new FilterAckManager(
this.messageStore,
params.filter
);

this.storeAckManager = new StoreAckManager(this.messageStore, params.store);
}

public start(): void {
this.filterAckManager.start();
this.storeAckManager.start();
}

public async stop(): Promise<void> {
await this.filterAckManager.stop();
this.storeAckManager.stop();
}

public async subscribe(codec: ICodec<IDecodedMessage>): Promise<boolean> {
return (
(await this.filterAckManager.subscribe(codec)) ||
(await this.storeAckManager.subscribe(codec))
);
}
}

class FilterAckManager implements IAckManager {
private codecs: Set<ICodec<IDecodedMessage>> = new Set();

public constructor(
private messageStore: MessageStore,
private filter: IFilter
) {}

public start(): void {
return;
}

public async stop(): Promise<void> {
const promises = Array.from(this.codecs.entries()).map((codec) =>
this.filter.unsubscribe(codec)
);
await Promise.all(promises);
this.codecs.clear();
}

public async subscribe(codec: ICodec<IDecodedMessage>): Promise<boolean> {
const success = await this.filter.subscribe(
codec,
this.onMessage.bind(this)
);
if (success) {
this.codecs.add(codec);
}
return success;
}

private async onMessage(message: IDecodedMessage): Promise<void> {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message, { filterAck: true });
}

this.messageStore.markFilterAck(message.hashStr);
}
}

class StoreAckManager implements IAckManager {
private interval: ReturnType<typeof setInterval> | null = null;

private codecs: Set<ICodec<IDecodedMessage>> = new Set();

public constructor(
private messageStore: MessageStore,
private store: IStore
) {}

public start(): void {
if (this.interval) {
return;
}

this.interval = setInterval(() => {
void this.query();
}, 1000);
}

public stop(): void {
if (!this.interval) {
return;
}

clearInterval(this.interval);
this.interval = null;
}

public async subscribe(codec: ICodec<IDecodedMessage>): Promise<boolean> {
this.codecs.add(codec);
return true;
}

private async query(): Promise<void> {
for (const codec of this.codecs) {
await this.store.queryWithOrderedCallback(
[codec],
(message) => {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message, { storeAck: true });
}

this.messageStore.markStoreAck(message.hashStr);
},
{
timeStart: new Date(Date.now() - 60 * 60 * 1000),
timeEnd: new Date()
}
);
}
}
}
2 changes: 2 additions & 0 deletions packages/sdk/src/messaging/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { Messaging } from "./messaging.js";
export type { RequestId } from "./utils.js";
Loading
Loading