Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d29994d
add draft
weboko Aug 20, 2025
97f0fb2
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Aug 22, 2025
01288ff
add draft api
weboko Sep 4, 2025
e54645a
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Sep 4, 2025
3de906a
implement basic entites and structure, decouple into separate files
weboko Sep 24, 2025
4fe8bfd
implement main ack manager, improve message store, implement Sender e…
weboko Sep 24, 2025
37ee490
implement background send, requestID
weboko Sep 25, 2025
998c2a1
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Sep 29, 2025
0c852e4
add utils, fix typings
weboko Sep 29, 2025
9e7719e
add ICodec
weboko Sep 29, 2025
ad675d8
implement send on waku
weboko Sep 29, 2025
7c67abe
implement and fix queuing mechanics of message store
weboko Sep 30, 2025
4385104
implement acks and message hash saving for sent messages
weboko Oct 2, 2025
7f98bb1
move from encoder/decoder/codec to simple message parameter
weboko Oct 2, 2025
da11478
implement working version of sendind
weboko Oct 6, 2025
62b43f0
merge with master
weboko Oct 6, 2025
80cef4b
remove Codec, update types
weboko Oct 6, 2025
82be279
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Oct 7, 2025
969330d
add unit tests
weboko Oct 7, 2025
de70791
Merge branch 'master' of github.com:waku-org/js-waku into weboko/send…
weboko Oct 8, 2025
e39c6c9
address review
weboko Oct 8, 2025
974fb21
fix start
weboko Oct 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 20 additions & 6 deletions packages/core/src/lib/light_push/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ export class LightPushCore {
};
}

const { rpc, error: prepError } = await ProtocolHandler.preparePushMessage(
encoder,
message,
protocol
);
const {
rpc,
error: prepError,
message: protoMessage
} = await ProtocolHandler.preparePushMessage(encoder, message, protocol);

if (prepError) {
return {
Expand Down Expand Up @@ -117,7 +117,21 @@ export class LightPushCore {
};
}

return ProtocolHandler.handleResponse(bytes, protocol, peerId);
const processedResponse = ProtocolHandler.handleResponse(
bytes,
protocol,
peerId
);

if (processedResponse.success) {
return {
success: processedResponse.success,
failure: null,
message: protoMessage
};
}

return processedResponse;
}

private async getProtocol(
Expand Down
15 changes: 11 additions & 4 deletions packages/core/src/lib/light_push/protocol_handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import type { PeerId } from "@libp2p/interface";
import type { IEncoder, IMessage, LightPushCoreResult } from "@waku/interfaces";
import type {
IEncoder,
IMessage,
IProtoMessage,
LightPushCoreResult
} from "@waku/interfaces";
import { LightPushError, LightPushStatusCode } from "@waku/interfaces";
import { PushResponse, WakuMessage } from "@waku/proto";
import { isMessageSizeUnderCap, Logger } from "@waku/utils";
Expand All @@ -15,8 +20,8 @@ type VersionedPushRpc =
| ({ version: "v3" } & PushRpc);

type PreparePushMessageResult =
| { rpc: VersionedPushRpc; error: null }
| { rpc: null; error: LightPushError };
| { rpc: VersionedPushRpc; error: null; message?: IProtoMessage }
| { rpc: null; error: LightPushError; message?: IProtoMessage };

const log = new Logger("light-push:protocol-handler");

Expand Down Expand Up @@ -47,13 +52,15 @@ export class ProtocolHandler {
log.info("Creating v3 RPC message");
return {
rpc: ProtocolHandler.createV3Rpc(protoMessage, encoder.pubsubTopic),
error: null
error: null,
message: protoMessage
};
}

log.info("Creating v2 RPC message");
return {
rpc: ProtocolHandler.createV2Rpc(protoMessage, encoder.pubsubTopic),
message: protoMessage,
error: null
};
} catch (err) {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/lib/message/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const OneMillion = BigInt(1_000_000);
export const Version = 0;
1 change: 1 addition & 0 deletions packages/core/src/lib/message/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * as version_0 from "./version_0.js";
export { OneMillion, Version } from "./constants.js";
4 changes: 2 additions & 2 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import { bytesToHex } from "@waku/utils/bytes";

import { messageHash } from "../message_hash/index.js";

import { OneMillion, Version } from "./constants.js";

const log = new Logger("message:version-0");
const OneMillion = BigInt(1_000_000);

export const Version = 0;
export { proto };

export class DecodedMessage implements IDecodedMessage {
Expand Down
15 changes: 15 additions & 0 deletions packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ export interface IMessage {
rateLimitProof?: IRateLimitProof;
}

/**
* Send message data structure used in {@link IWaku.send}.
*/
export interface ISendMessage {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not convinced we need a type here, it just makes it harder to understand the API.

Just do

send: (
  contentTopic: string;
  payload: Uint8Array;
  ephemeral?: boolean;
  ) => Promise<Uint8Array | undefined>;

contentTopic: string;
payload: Uint8Array;
ephemeral?: boolean;
rateLimitProof?: boolean;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the API consumer should be exposed to the rate limit proof, this sit behind the Waku API. see createNode spec for details.

}

/**
* Request ID of attempt to send a message.
*/
export type RequestId = string;

export interface IMetaSetter {
(message: IProtoMessage & { meta: undefined }): Uint8Array;
}
Expand Down
18 changes: 15 additions & 3 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { DiscoveryOptions, PeerCache } from "./discovery.js";
import type { FilterProtocolOptions } from "./filter.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { LightPushProtocolOptions } from "./light_push.js";
import type { IDecodedMessage } from "./message.js";
import type { IDecodedMessage, IProtoMessage } from "./message.js";
import type { ThisAndThat, ThisOrThat } from "./misc.js";
import { NetworkConfig } from "./sharding.js";
import type { StoreProtocolOptions } from "./store.js";
Expand Down Expand Up @@ -195,7 +195,13 @@ export type LightPushCoreResult = ThisOrThat<
PeerId,
"failure",
LightPushFailure
>;
> & {
/**
* The proto object of the message.
* Present only if the message was successfully pushed to the network.
*/
message?: IProtoMessage;
};

export type FilterCoreResult = ThisOrThat<
"success",
Expand All @@ -209,7 +215,13 @@ export type LightPushSDKResult = ThisAndThat<
PeerId[],
"failures",
LightPushFailure[]
>;
> & {
/**
* The proto objects of the messages.
* Present only if the messages were successfully pushed to the network.
*/
messages?: IProtoMessage[];
};

export type FilterSDKResult = ThisAndThat<
"successes",
Expand Down
6 changes: 6 additions & 0 deletions packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ export type ISendOptions = {
* @default false
*/
useLegacy?: boolean;

/**
* Amount of peers to send message to.
* Overrides `numPeersToUse` in {@link @waku/interfaces!CreateNodeOptions}.
*/
numPeersToUse?: number;
};

export interface ISender {
Expand Down
29 changes: 28 additions & 1 deletion packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import type { IFilter } from "./filter.js";
import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
import {
IDecodedMessage,
IDecoder,
IEncoder,
ISendMessage,
RequestId
} from "./message.js";
import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js";
Expand Down Expand Up @@ -58,9 +64,22 @@ export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>;

export interface IWaku {
libp2p: Libp2p;

/**
* @deprecated should not be accessed directly, use {@link IWaku.send} and {@link IWaku.subscribe} instead
*/
relay?: IRelay;

store?: IStore;

/**
* @deprecated should not be accessed directly, use {@link IWaku.subscribe} instead
*/
filter?: IFilter;

/**
* @deprecated should not be accessed directly, use {@link IWaku.send} instead
*/
lightPush?: ILightPush;

/**
Expand Down Expand Up @@ -251,6 +270,14 @@ export interface IWaku {
*/
createEncoder(params: CreateEncoderParams): IEncoder;

/**
* Sends a message to the Waku network.
*
* @param {ISendMessage} message - The message to send.
* @returns {Promise<RequestId>} A promise that resolves to the request ID
*/
send(message: ISendMessage): Promise<RequestId>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the Waku message Id must be exposed in the API to enable SDS's retrieval hints.

I strongly recommend for it to be returned by send to avoid issues with timestamp and pubsubtopic (both being used to calculate message id)


/**
* @returns {boolean} `true` if the node was started and `false` otherwise
*/
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
"@waku/sds": "^0.0.7",
"@waku/utils": "0.0.27",
"libp2p": "2.8.11",
"lodash.debounce": "^4.0.8"
"lodash.debounce": "^4.0.8",
"uuid": "^10.0.0"
},
"devDependencies": {
"@libp2p/interface": "2.10.4",
Expand Down
24 changes: 16 additions & 8 deletions packages/sdk/src/light_push/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
type IEncoder,
ILightPush,
type IMessage,
IProtoMessage,
type ISendOptions,
type Libp2p,
LightPushCoreResult,
Expand Down Expand Up @@ -82,23 +83,27 @@ export class LightPush implements ILightPush {

log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);

const peerIds = await this.peerManager.getPeers({
let peerIds = await this.peerManager.getPeers({
protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush,
pubsubTopic: encoder.pubsubTopic
});
peerIds = peerIds.slice(0, options.numPeersToUse);

const coreResults =
peerIds?.length > 0
? await Promise.all(
peerIds.map((peerId) =>
this.protocol
.send(encoder, message, peerId, options.useLegacy)
.catch((_e) => ({
success: null,
failure: {
error: LightPushError.GENERIC_FAIL
}
}))
.catch(
(_e) =>
({
success: null,
failure: {
error: LightPushError.GENERIC_FAIL
}
}) as LightPushCoreResult
)
)
)
: [];
Expand All @@ -110,7 +115,10 @@ export class LightPush implements ILightPush {
.map((v) => v.success) as PeerId[],
failures: coreResults
.filter((v) => v.failure)
.map((v) => v.failure) as LightPushFailure[]
.map((v) => v.failure) as LightPushFailure[],
messages: coreResults
.filter((v) => v.message)
.map((v) => v.message) as IProtoMessage[]
}
: {
successes: [],
Expand Down
Loading
Loading