Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
7e242f7
feat(drive-abci): preliminary support for event subscriptions in Plat…
lklimek Oct 2, 2025
187fd80
fix: Dockerfile
lklimek Oct 2, 2025
c3b1c1a
chore: cargo fmt
lklimek Oct 2, 2025
e3e5715
feat: rs-sdk add subscriptions feature
lklimek Oct 2, 2025
aa44c32
rs-dash-event-bus edition 2024
lklimek Oct 2, 2025
2c68c4c
chore: cargo.lock
lklimek Oct 2, 2025
953664c
dash-event-bus fixes
lklimek Oct 8, 2025
15664fb
fmt
lklimek Oct 8, 2025
bcc8a07
cargo clippy + cargo machete
lklimek Oct 8, 2025
8dcd14a
Merge remote-tracking branch 'origin/v2.1-dev' into feat/drive-event-bus
lklimek Oct 21, 2025
b8ae8cb
chore: fix build
lklimek Oct 21, 2025
9b8bc40
refactor: rewrite subscriptions to not use bi-directional streams
lklimek Oct 21, 2025
1b82c1b
chore: fix build
lklimek Oct 21, 2025
c1b30bd
chore: fmt
lklimek Oct 21, 2025
7771e32
Merge remote-tracking branch 'origin/v2.1-dev' into feat/drive-event-bus
lklimek Oct 21, 2025
1d7ca2e
chore: fix sdk
lklimek Oct 21, 2025
1bc5fb7
Merge remote-tracking branch 'origin/v2.1-dev' into feat/drive-event-bus
lklimek Oct 23, 2025
2d66a4c
chore: rabbit review and other small stuff
lklimek Oct 23, 2025
9122a58
chore: update grpc clients
lklimek Oct 23, 2025
a326b8a
chore: improve platform_events example
lklimek Oct 23, 2025
3e25658
fix(dashmate): dashmate restart --platform should restart rs-dapi and…
lklimek Oct 23, 2025
66a756d
feat(rs-dapi): keepalives for event stream
lklimek Oct 24, 2025
7c81629
chore(dapi-grpc): update protobuf clients
lklimek Oct 24, 2025
fe39bb9
Merge branch 'v2.2-dev' into feat/drive-event-bus
lklimek Oct 27, 2025
cb04779
test: exclude SubscribePlatformEvents from coverage checks
lklimek Oct 29, 2025
068dcd7
Merge remote-tracking branch 'origin/v2.2-dev' into feat/drive-event-bus
lklimek Oct 29, 2025
0f2d368
build: bump version to 2.2.0-dev.1 to make migrations run
lklimek Oct 30, 2025
ee86a19
feat(dapi): configurable timeouts
lklimek Oct 31, 2025
0c436ac
chore: apply rabbit's feedback and linter
lklimek Nov 3, 2025
5ccbd08
test: fix platform events test
lklimek Nov 3, 2025
c55eb12
chore: rebuild dapi clients
lklimek Nov 3, 2025
51fb24e
Merge remote-tracking branch 'origin/v2.2-dev' into feat/drive-event-bus
lklimek Nov 3, 2025
241b30b
chore: saturating sub the keepalive
lklimek Nov 3, 2025
d7d8f83
chore: recreate grpc clients
lklimek Nov 3, 2025
5ff52de
chore(drive-abci): lint and machete
lklimek Nov 3, 2025
7895cb6
fix failing migratons
lklimek Nov 3, 2025
d6f52ea
chore: typo
lklimek Nov 3, 2025
2c8d6be
chore: subscription id as int
lklimek Nov 3, 2025
fa042cc
refactor(rs-dapi): simplify timeout handling
lklimek Nov 3, 2025
1742c74
chore: docs typo
lklimek Nov 3, 2025
3148d08
chore: clippy
lklimek Nov 3, 2025
2293471
fix: envoy config broken
lklimek Nov 3, 2025
2b7e09f
chore: streaming grace period 5s in envoy
lklimek Nov 3, 2025
5d129f9
feat: stream idle timeout normalized to 300s
lklimek Nov 3, 2025
97b1b7e
chore: cleanup after rabbit feedback
lklimek Nov 3, 2025
ddb66d7
chore: rabbit feedback
lklimek Nov 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 61 additions & 0 deletions packages/dapi-grpc/protos/platform/v0/platform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,63 @@ package org.dash.platform.dapi.v0;

import "google/protobuf/timestamp.proto";

// Platform events subscription (v0)
message PlatformSubscriptionRequest {
message PlatformSubscriptionRequestV0 {
string client_subscription_id = 1;
PlatformFilterV0 filter = 2;
}
oneof version { PlatformSubscriptionRequestV0 v0 = 1; }
}

message PlatformSubscriptionResponse {
message PlatformSubscriptionResponseV0 {
string client_subscription_id = 1;
PlatformEventV0 event = 2;
}
oneof version { PlatformSubscriptionResponseV0 v0 = 1; }
}

// Initial placeholder filter and event to be refined during integration
// Filter for StateTransitionResult events
message StateTransitionResultFilter {
// When set, only match StateTransitionResult events for this tx hash.
optional bytes tx_hash = 1;
}

message PlatformFilterV0 {
oneof kind {
bool all = 1; // subscribe to all platform events
bool block_committed = 2; // subscribe to BlockCommitted events only
StateTransitionResultFilter state_transition_result =
3; // subscribe to StateTransitionResult events (optionally filtered by
// tx_hash)
}
}

message PlatformEventV0 {
message BlockMetadata {
uint64 height = 1 [ jstype = JS_STRING ];
uint64 time_ms = 2 [ jstype = JS_STRING ];
bytes block_id_hash = 3;
}

message BlockCommitted {
BlockMetadata meta = 1;
uint32 tx_count = 2;
}

message StateTransitionFinalized {
BlockMetadata meta = 1;
bytes tx_hash = 2;
}

oneof event {
BlockCommitted block_committed = 1;
StateTransitionFinalized state_transition_finalized = 2;
}
}

service Platform {
rpc broadcastStateTransition(BroadcastStateTransitionRequest)
returns (BroadcastStateTransitionResponse);
Expand Down Expand Up @@ -104,6 +161,10 @@ service Platform {
rpc getGroupActions(GetGroupActionsRequest) returns (GetGroupActionsResponse);
rpc getGroupActionSigners(GetGroupActionSignersRequest)
returns (GetGroupActionSignersResponse);

// Bi-directional stream for multiplexed platform events subscriptions
rpc SubscribePlatformEvents(PlatformSubscriptionRequest)
returns (stream PlatformSubscriptionResponse);
}

// Proof message includes cryptographic proofs for validating responses
Expand Down
9 changes: 4 additions & 5 deletions packages/rs-dapi/doc/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ Operational notes:
- Access logging: HTTP/JSON-RPC and gRPC traffic share the same access logging layer when configured, so all protocols emit uniform access entries

- Platform event streaming is handled via a direct upstream proxy:
- `subscribePlatformEvents` simply forwards every inbound command stream to a single Drive connection and relays responses back without multiplexing
- `subscribePlatformEvents` forwards each inbound subscription request to Drive and relays the resulting event stream without additional multiplexing

#### Key Features
- **Modular Organization**: Complex methods separated into dedicated modules for maintainability
Expand All @@ -283,12 +283,11 @@ Operational notes:
rs-dapi exposes `subscribePlatformEvents` as a server-streaming endpoint and currently performs a straightforward pass-through to rs-drive-abci.

- Public interface:
- Bi-directional gRPC stream: `subscribePlatformEvents(request stream PlatformEventsCommand) -> (response stream PlatformEventsResponse)`.
- Commands (`Add`, `Remove`, `Ping`) and responses (`Event`, `Ack`, `Error`) stay in their protobuf `V0` envelopes end-to-end.
- Server-streaming gRPC method: `subscribePlatformEvents(PlatformSubscriptionRequest) -> (stream PlatformSubscriptionResponse)`.
- Requests carry versioned `V0` envelopes with the client subscription id and filter; responses return the same id alongside the event payload.

- Upstream behavior:
- Each client stream obtains its own upstream Drive connection; tokio channels forward commands upstream and pipe responses back downstream without pooling.
- The `EventMux` from `rs-dash-event-bus` is retained for future multiplexing work but does not alter traffic today.
- Each client stream obtains its own upstream Drive connection; responses are forwarded downstream as they arrive without intermediate pooling or command channels.

- Observability:
- Standard `tracing` logging wraps the forwarders, and the proxy participates in the existing `/metrics` exporter via shared counters.
Expand Down
7 changes: 3 additions & 4 deletions packages/rs-dapi/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Cli {
"rs-dapi server initializing",
);

let mut server_future = run_server(config, access_logger);
let server_future = run_server(config, access_logger);
tokio::pin!(server_future);

let outcome = tokio::select! {
Expand All @@ -130,8 +130,8 @@ impl Cli {
}
};

if let Some(result) = outcome {
if let Err(e) = result {
if let Some(result) = outcome
&& let Err(e) = result {
error!("Server error: {}", e);

// Check if this is a connection-related error and set appropriate exit code
Expand Down Expand Up @@ -161,7 +161,6 @@ impl Cli {
}
}
}
}
Ok(())
}
Commands::Config => dump_config(&config),
Expand Down
37 changes: 0 additions & 37 deletions packages/rs-dapi/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,8 @@ pub enum Metric {
RequestDuration,
/// Platform events: active sessions gauge
PlatformEventsActiveSessions,
/// Platform events: commands processed, labels [op]
PlatformEventsCommands,
/// Platform events: forwarded events counter
PlatformEventsForwardedEvents,
/// Platform events: forwarded acks counter
PlatformEventsForwardedAcks,
/// Platform events: forwarded errors counter
PlatformEventsForwardedErrors,
/// Platform events: upstream streams started counter
Expand All @@ -91,11 +87,9 @@ impl Metric {
Metric::RequestCount => "rsdapi_requests_total",
Metric::RequestDuration => "rsdapi_request_duration_seconds",
Metric::PlatformEventsActiveSessions => "rsdapi_platform_events_active_sessions",
Metric::PlatformEventsCommands => "rsdapi_platform_events_commands_total",
Metric::PlatformEventsForwardedEvents => {
"rsdapi_platform_events_forwarded_events_total"
}
Metric::PlatformEventsForwardedAcks => "rsdapi_platform_events_forwarded_acks_total",
Metric::PlatformEventsForwardedErrors => {
"rsdapi_platform_events_forwarded_errors_total"
}
Expand All @@ -120,9 +114,7 @@ impl Metric {
Metric::PlatformEventsActiveSessions => {
"Current number of active Platform events sessions"
}
Metric::PlatformEventsCommands => "Platform events commands processed by operation",
Metric::PlatformEventsForwardedEvents => "Platform events forwarded to clients",
Metric::PlatformEventsForwardedAcks => "Platform acks forwarded to clients",
Metric::PlatformEventsForwardedErrors => "Platform errors forwarded to clients",
Metric::PlatformEventsUpstreamStreams => {
"Upstream subscribePlatformEvents streams started"
Expand Down Expand Up @@ -159,7 +151,6 @@ pub enum Label {
// TODO: ensure we have a limited set of endpoints, so that cardinality is controlled and we don't overload Prometheus
Endpoint,
Status,
Op,
}

impl Label {
Expand All @@ -172,7 +163,6 @@ impl Label {
Label::Protocol => "protocol",
Label::Endpoint => "endpoint",
Label::Status => "status",
Label::Op => "op",
}
}
}
Expand Down Expand Up @@ -251,15 +241,6 @@ pub static REQUEST_DURATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
.expect("create histogram vec")
});

pub static PLATFORM_EVENTS_COMMANDS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Metric::PlatformEventsCommands.name(),
Metric::PlatformEventsCommands.help(),
&[Label::Op.name()]
)
.expect("create counter vec")
});

pub static PLATFORM_EVENTS_FORWARDED_EVENTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
Metric::PlatformEventsForwardedEvents.name(),
Expand All @@ -268,14 +249,6 @@ pub static PLATFORM_EVENTS_FORWARDED_EVENTS: Lazy<IntCounter> = Lazy::new(|| {
.expect("create counter")
});

pub static PLATFORM_EVENTS_FORWARDED_ACKS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
Metric::PlatformEventsForwardedAcks.name(),
Metric::PlatformEventsForwardedAcks.help()
)
.expect("create counter")
});

pub static PLATFORM_EVENTS_FORWARDED_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
Metric::PlatformEventsForwardedErrors.name(),
Expand Down Expand Up @@ -533,21 +506,11 @@ pub fn platform_events_active_sessions_dec() {
PLATFORM_EVENTS_ACTIVE_SESSIONS.dec();
}

#[inline]
pub fn platform_events_command(op: &str) {
PLATFORM_EVENTS_COMMANDS.with_label_values(&[op]).inc();
}

#[inline]
pub fn platform_events_forwarded_event() {
PLATFORM_EVENTS_FORWARDED_EVENTS.inc();
}

#[inline]
pub fn platform_events_forwarded_ack() {
PLATFORM_EVENTS_FORWARDED_ACKS.inc();
}

#[inline]
pub fn platform_events_forwarded_error() {
PLATFORM_EVENTS_FORWARDED_ERRORS.inc();
Expand Down
15 changes: 15 additions & 0 deletions packages/rs-dapi/src/services/platform_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod broadcast_state_transition;
mod error_mapping;
mod get_status;
mod subscribe_platform_events;
mod wait_for_state_transition_result;

use dapi_grpc::platform::v0::platform_server::Platform;
Expand Down Expand Up @@ -167,6 +168,20 @@ impl PlatformServiceImpl {
impl Platform for PlatformServiceImpl {
// Manually implemented methods

// Streaming: multiplexed platform events
type SubscribePlatformEventsStream = tokio_stream::wrappers::ReceiverStream<
Result<dapi_grpc::platform::v0::PlatformSubscriptionResponse, dapi_grpc::tonic::Status>,
>;

async fn subscribe_platform_events(
&self,
request: dapi_grpc::tonic::Request<dapi_grpc::platform::v0::PlatformSubscriptionRequest>,
) -> Result<
dapi_grpc::tonic::Response<Self::SubscribePlatformEventsStream>,
dapi_grpc::tonic::Status,
> {
self.subscribe_platform_events_impl(request).await
}
/// Get the status of the whole system
///
/// This method retrieves the current status of Drive, Tenderdash, and other components.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use crate::{DapiError, metrics};
use dapi_grpc::platform::v0::{PlatformSubscriptionRequest, PlatformSubscriptionResponse};
use dapi_grpc::tonic::{Request, Response, Status};
use futures::StreamExt;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use super::PlatformServiceImpl;

const PLATFORM_EVENTS_STREAM_BUFFER: usize = 512;

/// Tracks an active platform events session until all clones drop.
struct ActiveSessionGuard;

impl ActiveSessionGuard {
fn new() -> Arc<Self> {
metrics::platform_events_active_sessions_inc();
Arc::new(Self)
}
}

impl Drop for ActiveSessionGuard {
fn drop(&mut self) {
metrics::platform_events_active_sessions_dec();
}
}

impl PlatformServiceImpl {
/// Proxy implementation of Platform::subscribePlatformEvents.
///
/// Forwards a subscription request upstream to Drive and streams responses back to the caller.
pub async fn subscribe_platform_events_impl(
&self,
request: Request<PlatformSubscriptionRequest>,
) -> Result<Response<ReceiverStream<Result<PlatformSubscriptionResponse, Status>>>, Status>
{
let active_session = ActiveSessionGuard::new();

let mut client = self.drive_client.get_client();
let uplink_resp = client.subscribe_platform_events(request).await?;
metrics::platform_events_upstream_stream_started();
let mut uplink_resp_rx = uplink_resp.into_inner();

// Channel to forward responses back to caller (downlink)
let (downlink_resp_tx, downlink_resp_rx) = mpsc::channel::<
Result<PlatformSubscriptionResponse, Status>,
>(PLATFORM_EVENTS_STREAM_BUFFER);

// Spawn a task to forward uplink responses -> downlink
{
let session_handle = active_session;
self.workers.spawn(async move {
let _session_guard = session_handle;
while let Some(msg) = uplink_resp_rx.next().await {
match msg {
Ok(response) => {
metrics::platform_events_forwarded_event();
if downlink_resp_tx.send(Ok(response)).await.is_err() {
tracing::debug!(
"Platform events downlink response channel closed; stopping forward"
);
break;
}
}
Err(status) => {
metrics::platform_events_forwarded_error();
if downlink_resp_tx.send(Err(status)).await.is_err() {
tracing::debug!(
"Platform events downlink response channel closed while forwarding error"
);
break;
}
}
}
}
tracing::debug!("Platform events uplink response stream closed");
Err::<(), DapiError>(DapiError::ConnectionClosed)
});
}

Ok(Response::new(ReceiverStream::new(downlink_resp_rx)))
}
}
Loading
Loading