Publish/Subscribe Namespace Support #157

Open

suhasHere wants to merge 21 commits into cloudflare:main from suhasHere:upstream-publish-subscribe
Conversation

@suhasHere

Note: This PR includes changes from #131 (draft-16
migration) plus the following additions:

  • Implement SUBSCRIBE_NAMESPACE/PUBLISH relay forwarding with
    subscriber registry
  • Preserve incoming subgroup header types when forwarding (fixes
    EndOfGroup handling)
  • Fix 1-second freeze on group transitions in SUBSCRIBE_NAMESPACE
    flow
  • Upgrade web-transport crates to v0.10 with subprotocol
    negotiation

…track

When a publisher sends PUBLISH with forward=0 (paused), the relay now:
1. Parses and stores the forward state from PUBLISH params
2. Waits for subscribers to arrive
3. Sends REQUEST_UPDATE with forward=1 to tell publisher to start sending

This fixes the issue where subscribers would never receive data because
the publisher was waiting for forward=1 but the relay never sent it.
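The three steps above can be modeled as a small state machine. Everything below (`ForwardState`, `TrackState`, `on_subscriber_joined`) is an illustrative sketch under assumed names, not the PR's actual code:

```rust
// Sketch of the relay's forward-state handling for a paused PUBLISH.
// All type and function names are illustrative placeholders.

#[derive(Debug, Clone, Copy, PartialEq)]
enum ForwardState {
    Paused,     // publisher sent PUBLISH with forward=0
    Forwarding, // relay has asked the publisher to start sending
}

struct TrackState {
    forward: ForwardState,
    subscriber_count: usize,
}

impl TrackState {
    /// Step 1: parse and store the forward flag from the PUBLISH params.
    fn from_publish(forward_param: u8) -> Self {
        let forward = if forward_param == 0 {
            ForwardState::Paused
        } else {
            ForwardState::Forwarding
        };
        TrackState { forward, subscriber_count: 0 }
    }

    /// Steps 2-3: when a subscriber arrives while paused, return true to
    /// signal that REQUEST_UPDATE with forward=1 must go to the publisher.
    fn on_subscriber_joined(&mut self) -> bool {
        self.subscriber_count += 1;
        if self.forward == ForwardState::Paused {
            self.forward = ForwardState::Forwarding;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut track = TrackState::from_publish(0);
    assert_eq!(track.forward, ForwardState::Paused);
    // First subscriber triggers REQUEST_UPDATE(forward=1).
    assert!(track.on_subscriber_joined());
    // Later subscribers do not re-trigger it.
    assert!(!track.on_subscriber_joined());
}
```

The key property is that REQUEST_UPDATE is sent exactly once, on the transition out of the paused state.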
suhasHere force-pushed the upstream-publish-subscribe branch from d0120e9 to c8cb923 on April 21, 2026 at 04:09
- Keep forwarded PublishNamespace handle alive in function scope to prevent
  a premature PublishNamespaceDone being sent when the task completes
- Detect and remove stale TrackInfo entries (Publishing state with no writer)
  to allow publishers to reconnect without 'already publishing' errors

When a publisher disconnects and reconnects, the entire namespace entry
(not just the TrackInfo) needs to be recreated, because the TracksWriter
is also closed/stale.

The handles were being dropped at the end of each loop iteration, causing
PublishNamespaceDone to be sent immediately after PublishNamespace.
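The handle-lifetime bug can be sketched in miniature. Here `PublishNamespaceHandle` is an illustrative stand-in for the real handle type, and dropping it stands in for PublishNamespaceDone being emitted:

```rust
// Sketch: why the forwarded handle must outlive the loop iteration.

struct PublishNamespaceHandle {
    name: String,
}

impl Drop for PublishNamespaceHandle {
    fn drop(&mut self) {
        // In the real relay, dropping the handle causes
        // PublishNamespaceDone to be emitted for `self.name`.
        println!("PublishNamespaceDone: {}", self.name);
    }
}

fn forward_namespaces(names: &[&str]) -> Vec<PublishNamespaceHandle> {
    let mut held = Vec::new();
    for name in names {
        let handle = PublishNamespaceHandle { name: name.to_string() };
        // Bug: letting `handle` drop at the end of this iteration would
        // announce Done immediately after the announce.
        // Fix: move it into function-scoped storage instead.
        held.push(handle);
    }
    held // handles stay alive as long as the caller keeps this Vec
}

fn main() {
    let handles = forward_namespaces(&["demo/video", "demo/audio"]);
    assert_eq!(handles.len(), 2);
    // Done messages fire only when `handles` is finally dropped, here.
}
```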
SUBSCRIBE_NAMESPACE is for namespace discovery: the relay should send
NAMESPACE messages to notify about available tracks, not send PUBLISH
and stream data. The subscriber explicitly SUBSCRIBEs to the tracks it wants.

Flow: relay sends PUBLISH -> client sends PUBLISH_OK -> relay streams data.
Changed from serve_immediately() to serve(), which waits for PUBLISH_OK.
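The handshake ordering can be modeled as a two-phase gate; `PublishPhase`, `Serve`, and `may_stream` are illustrative names, not the crate's actual API:

```rust
// Sketch: the relay must not stream objects until the client has
// answered with PUBLISH_OK.

#[derive(Debug, PartialEq)]
enum PublishPhase {
    Sent,     // relay sent PUBLISH
    Accepted, // client replied PUBLISH_OK; streaming may begin
}

struct Serve {
    phase: PublishPhase,
}

impl Serve {
    fn new() -> Self {
        Serve { phase: PublishPhase::Sent }
    }

    fn on_publish_ok(&mut self) {
        self.phase = PublishPhase::Accepted;
    }

    /// True only once PUBLISH_OK has arrived; a serve_immediately()-style
    /// path would skip this check and start streaming right away.
    fn may_stream(&self) -> bool {
        self.phase == PublishPhase::Accepted
    }
}

fn main() {
    let mut s = Serve::new();
    assert!(!s.may_stream()); // still waiting for PUBLISH_OK
    s.on_publish_ok();
    assert!(s.may_stream());
}
```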
When a client sends SUBSCRIBE_NAMESPACE and also PUBLISH on the same session,
the relay should not forward the client's own PUBLISH back to them.

Added session_id tracking to Consumer and Producer, used when registering
subscriptions and notifying of PUBLISH events. Notifications skip subscriptions
from the same session that originated the PUBLISH.
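The self-echo suppression amounts to a filter on the subscriber registry. The sketch below uses assumed names (`SessionId`, `Subscription`, `notify_publish`), not the PR's identifiers:

```rust
// Sketch: a PUBLISH is not forwarded back to subscriptions registered
// on the session that originated it.

type SessionId = u64;

struct Subscription {
    session_id: SessionId,
    namespace: String,
}

/// Sessions that should be notified of a PUBLISH for `namespace`
/// originated by `origin`, skipping the originating session itself.
fn notify_publish(subs: &[Subscription], namespace: &str, origin: SessionId) -> Vec<SessionId> {
    subs.iter()
        .filter(|s| s.namespace == namespace)
        .filter(|s| s.session_id != origin) // skip the origin session
        .map(|s| s.session_id)
        .collect()
}

fn main() {
    let subs = vec![
        Subscription { session_id: 1, namespace: "demo".into() },
        Subscription { session_id: 2, namespace: "demo".into() },
    ];
    // Session 1 both subscribed and published: only session 2 is notified.
    assert_eq!(notify_publish(&subs, "demo", 1), vec![2]);
}
```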
When handling SUBSCRIBE_NAMESPACE, in addition to PUBLISH_NAMESPACE for
existing namespaces, also send PUBLISH for existing tracks that are already
in Publishing state. This triggers the client's onMatch callback for track
discovery (the client expects PUBLISH, not just ANNOUNCE/PUBLISH_NAMESPACE).

The client's onMatch callback only fires on PUBLISH messages, not on
PUBLISH_NAMESPACE. Remove PUBLISH_NAMESPACE sending and only forward
PUBLISH for existing tracks in matching namespaces.
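Replaying PUBLISH for already-live tracks is a filter over the track registry. A minimal sketch, with `TrackPhase`, `TrackInfo`, and `tracks_to_announce` as assumed names:

```rust
// Sketch: on SUBSCRIBE_NAMESPACE, forward PUBLISH for every track that
// is already in Publishing state under a matching namespace prefix.

#[derive(PartialEq)]
enum TrackPhase {
    Publishing,
    Pending,
}

struct TrackInfo {
    namespace: String,
    name: String,
    phase: TrackPhase,
}

/// Tracks whose PUBLISH should be replayed to a new namespace subscriber.
fn tracks_to_announce<'a>(tracks: &'a [TrackInfo], prefix: &str) -> Vec<&'a str> {
    tracks
        .iter()
        .filter(|t| t.namespace.starts_with(prefix))
        .filter(|t| t.phase == TrackPhase::Publishing) // only live tracks
        .map(|t| t.name.as_str())
        .collect()
}

fn main() {
    let tracks = vec![
        TrackInfo { namespace: "demo/video".into(), name: "hd".into(), phase: TrackPhase::Publishing },
        TrackInfo { namespace: "demo/video".into(), name: "sd".into(), phase: TrackPhase::Pending },
        TrackInfo { namespace: "other".into(), name: "x".into(), phase: TrackPhase::Publishing },
    ];
    assert_eq!(tracks_to_announce(&tracks, "demo"), vec!["hd"]);
}
```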
Store track_extensions from incoming PUBLISH in TrackInfo and
forward them when relaying PUBLISH to subscribers. This preserves
end-to-end extension headers that clients require.
…sions

When objects have empty extension headers but the preserved header type
has extensions enabled (e.g., SubgroupIdExt), convert to the non-Ext
variant before writing the stream header. This prevents the subscriber
from expecting extension_length fields that aren't present.

Added StreamHeaderType::without_extensions() to convert Ext types to
their non-Ext equivalents.
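The conversion described above can be sketched as a small enum method. The variant names below are illustrative; the real `StreamHeaderType` lives in the PR:

```rust
// Sketch of StreamHeaderType::without_extensions(): map each Ext header
// type to its non-Ext equivalent so objects with empty extension headers
// do not advertise an extension_length field.

#[derive(Debug, Clone, Copy, PartialEq)]
enum StreamHeaderType {
    Subgroup,
    SubgroupExt,
    SubgroupId,
    SubgroupIdExt,
}

impl StreamHeaderType {
    fn has_extension_headers(self) -> bool {
        matches!(self, StreamHeaderType::SubgroupExt | StreamHeaderType::SubgroupIdExt)
    }

    /// Convert an Ext header type to its non-Ext equivalent;
    /// non-Ext types are returned unchanged.
    fn without_extensions(self) -> Self {
        match self {
            StreamHeaderType::SubgroupExt => StreamHeaderType::Subgroup,
            StreamHeaderType::SubgroupIdExt => StreamHeaderType::SubgroupId,
            other => other,
        }
    }
}

fn main() {
    let h = StreamHeaderType::SubgroupIdExt;
    assert!(h.has_extension_headers());
    assert_eq!(h.without_extensions(), StreamHeaderType::SubgroupId);
    assert!(!h.without_extensions().has_extension_headers());
}
```

Making the conversion idempotent (non-Ext types map to themselves) keeps the call site simple: it can be applied unconditionally before writing the stream header.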
suhasHere force-pushed the upstream-publish-subscribe branch from 94be232 to 68cc684 on April 21, 2026 at 07:56
The datagram serve layer was only keeping the latest datagram, causing
most datagrams to be dropped when they arrive faster than the reader
can consume them (e.g., 50/sec audio).

Changed to use tokio broadcast channel (1024 buffer) so datagrams are
queued and forwarded in order. Broadcast allows cloning the reader.
Logs warning if reader lags behind.

Also fixed is_closed() which was incorrectly returning true when the
buffer was empty (rx.len() == 0), potentially causing premature exit.
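The queueing semantics and the `is_closed()` fix can be modeled with the standard library alone. The real fix uses a `tokio::sync::broadcast` channel with a 1024-slot buffer; the std-only `DatagramQueue` below is an assumed name that models the same behavior (ordered delivery, lag counting, and "empty does not mean closed"):

```rust
use std::collections::VecDeque;

// Std-only model of a bounded, ordered datagram buffer with lag detection.

struct DatagramQueue {
    buf: VecDeque<Vec<u8>>,
    capacity: usize,
    lagged: u64, // datagrams lost because the reader fell behind
    closed: bool,
}

impl DatagramQueue {
    fn new(capacity: usize) -> Self {
        DatagramQueue { buf: VecDeque::new(), capacity, lagged: 0, closed: false }
    }

    fn push(&mut self, datagram: Vec<u8>) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // oldest is lost, like broadcast lag
            self.lagged += 1;     // the real code logs a warning here
        }
        self.buf.push_back(datagram);
    }

    fn pop(&mut self) -> Option<Vec<u8>> {
        self.buf.pop_front()
    }

    /// Fixed: an empty buffer does not mean the stream is closed.
    fn is_closed(&self) -> bool {
        self.closed
    }
}

fn main() {
    let mut q = DatagramQueue::new(2);
    q.push(vec![1]);
    q.push(vec![2]);
    q.push(vec![3]); // capacity reached: [1] is dropped and counted
    assert_eq!(q.lagged, 1);
    assert_eq!(q.pop(), Some(vec![2])); // order preserved
    assert!(!q.is_closed()); // buffer may drain, but the stream is open
}
```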
suhasHere force-pushed the upstream-publish-subscribe branch from 68cc684 to 0112f91 on April 21, 2026 at 08:04
Use fast-path immediate lookup for datagram alias resolution instead of
sequential 1-second timeout lookups. Only fall back to waiting on the
first datagram before the alias mapping is established.
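The fast path is just an immediate map lookup, with the slow (wait-with-timeout) path reserved for the case where the alias mapping has not arrived yet. `AliasTable` and `resolve_or_wait` are illustrative names:

```rust
use std::collections::HashMap;

// Sketch: resolve a datagram's track alias without a per-datagram timeout.

struct AliasTable {
    map: HashMap<u64, String>, // alias -> track name
}

impl AliasTable {
    /// Fast path: immediate lookup, no 1-second timeout per datagram.
    fn resolve(&self, alias: u64) -> Option<&String> {
        self.map.get(&alias)
    }
}

/// Fast path on hit; on miss the caller should fall back to waiting
/// once for the mapping (only before the first datagram's alias is known).
fn resolve_or_wait(table: &AliasTable, alias: u64) -> Result<String, &'static str> {
    match table.resolve(alias) {
        Some(name) => Ok(name.clone()),
        None => Err("unknown alias: wait once for the mapping"),
    }
}

fn main() {
    let mut map = HashMap::new();
    map.insert(7u64, "audio".to_string());
    let table = AliasTable { map };
    assert_eq!(resolve_or_wait(&table, 7).unwrap(), "audio");
    assert!(resolve_or_wait(&table, 8).is_err());
}
```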

subscribed.rs: Use conditional encoding based on header_type.has_extension_headers()
- SubgroupObjectExt for Ext header types (includes extension_length field)
- SubgroupObject for non-Ext header types (no extension_length field)

subscriber.rs: Use fast-path immediate lookup for datagram alias resolution
- Fixes datagram forwarding rate from 1/sec to 50/sec
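The conditional encoding choice reduces to one branch on the negotiated header type. `ObjectEncoding` and `pick_encoding` below are assumed names sketching the selection, not the crate's actual API:

```rust
// Sketch: pick the object wire format from the header type, so the
// extension_length field is written only for Ext header types.

#[derive(Debug, PartialEq)]
enum ObjectEncoding {
    SubgroupObjectExt, // includes extension_length field
    SubgroupObject,    // no extension_length field
}

fn pick_encoding(has_extension_headers: bool) -> ObjectEncoding {
    if has_extension_headers {
        ObjectEncoding::SubgroupObjectExt
    } else {
        ObjectEncoding::SubgroupObject
    }
}

fn main() {
    // Ext header type: object carries extension_length.
    assert_eq!(pick_encoding(true), ObjectEncoding::SubgroupObjectExt);
    // Non-Ext header type: no extension_length on the wire.
    assert_eq!(pick_encoding(false), ObjectEncoding::SubgroupObject);
}
```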