diff --git a/ethexe/rpc/src/apis/injected/relay.rs b/ethexe/rpc/src/apis/injected/relay.rs index 4469d334ab3..6f667c1e45f 100644 --- a/ethexe/rpc/src/apis/injected/relay.rs +++ b/ethexe/rpc/src/apis/injected/relay.rs @@ -99,12 +99,12 @@ mod utils { tx: &mut AddressedInjectedTransaction, ) -> RpcResult<()> { let now = now_since_unix_epoch().map_err(|err| { - tracing::error!("system clock error: {err}"); + error!("system clock error: {err}"); crate::errors::internal() })?; let next_producer = calculate_next_producer(db, now).map_err(|err| { - tracing::error!("calculate next producer error: {err}"); + error!("calculate next producer error: {err}"); crate::errors::internal() })?; tx.recipient = next_producer; diff --git a/ethexe/rpc/src/apis/injected/server.rs b/ethexe/rpc/src/apis/injected/server.rs index ee27cb0121f..8dfb5fd6470 100644 --- a/ethexe/rpc/src/apis/injected/server.rs +++ b/ethexe/rpc/src/apis/injected/server.rs @@ -107,8 +107,6 @@ impl InjectedApi { &self, transaction: AddressedInjectedTransaction, ) -> RpcResult { - self.metrics.send_injected_tx_calls.increment(1); - self.relayer.relay(transaction).await } @@ -118,8 +116,6 @@ impl InjectedApi { pending: PendingSubscriptionSink, transaction: AddressedInjectedTransaction, ) -> SubscriptionResult { - self.metrics.send_and_watch_injected_tx_calls.increment(1); - let tx_hash = transaction.tx.data().to_hash(); let pending_subscriber = match self.manager.try_register_subscriber(tx_hash) { @@ -144,9 +140,11 @@ impl InjectedApi { } }; - let manager = self.manager.clone(); + self.metrics.injected_tx_active_subscriptions.increment(1); + let (manager, metrics) = (self.manager.clone(), self.metrics.clone()); spawner::spawn_pending_subscriber(sink, pending_subscriber, move |tx_hash| { manager.cancel_registration(tx_hash); + metrics.injected_tx_active_subscriptions.decrement(1); }); Ok(()) } diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 38c50678a70..60375278857 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -66,8 +66,9 @@ use futures::{Stream, stream::FusedStream}; use hyper::header::HeaderValue; use jsonrpsee::{ RpcModule as JsonrpcModule, - server::{PingConfig, Server, ServerHandle}, + server::{PingConfig, RpcServiceBuilder, Server, ServerHandle}, }; +use metrics::RpcMetricsLayer; use std::{ net::SocketAddr, pin::Pin, @@ -129,14 +130,8 @@ impl RpcServer { let cors_layer = self.cors_layer()?; let http_middleware = tower::ServiceBuilder::new().layer(cors_layer); - - let server = Server::builder() - .set_http_middleware(http_middleware) - // Setup WebSocket pings to detect dead connections. - // Now it is set to default: ping_interval = 30s, inactive_limit = 40s - .enable_ws_ping(PingConfig::default()) - .build(self.config.listen_addr) - .await?; + // Setup the default RPC metrics layer. + let rpc_middleware = RpcServiceBuilder::new().layer(RpcMetricsLayer); let processor = Processor::with_config( ProcessorConfig { @@ -158,9 +153,17 @@ impl RpcServer { }; let injected_api = server_apis.injected.clone(); - let handle = server.start(server_apis.into_methods()); + let server_handle = Server::builder() + .set_http_middleware(http_middleware) + .set_rpc_middleware(rpc_middleware) + // Setup WebSocket pings to detect dead connections. + // Now it is set to default: ping_interval = 30s, inactive_limit = 40s + .enable_ws_ping(PingConfig::default()) + .build(self.config.listen_addr) + .await? + .start(server_apis.into_module()); - Ok((handle, RpcService::new(rpc_receiver, injected_api))) + Ok((server_handle, RpcService::new(rpc_receiver, injected_api))) } fn cors_layer(&self) -> Result { @@ -224,7 +227,7 @@ struct RpcServerApis { } impl RpcServerApis { - pub fn into_methods(self) -> jsonrpsee::server::RpcModule<()> { + pub fn into_module(self) -> jsonrpsee::server::RpcModule<()> { let mut module = JsonrpcModule::new(()); module diff --git a/ethexe/rpc/src/metrics.rs b/ethexe/rpc/src/metrics.rs index 4c05d353e6c..0050ad06c0b 100644 --- a/ethexe/rpc/src/metrics.rs +++ b/ethexe/rpc/src/metrics.rs @@ -1,6 +1,6 @@ // This file is part of Gear. // -// Copyright (C) 2025 Gear Technologies Inc. +// Copyright (C) 2025-2026 Gear Technologies Inc. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // // This program is free software: you can redistribute it and/or modify @@ -18,20 +18,133 @@ //! Metrics for the RPC server. -use metrics::{Counter, Gauge}; +use futures::future::BoxFuture; +use jsonrpsee::{ + server::{MethodResponse, middleware::rpc::RpcServiceT}, + types::Request, +}; +use metrics::{Counter, Gauge, Histogram}; +use std::{collections::HashMap, sync::LazyLock, time::Instant}; +use tower::Layer; -// TODO kuzmindev: add metrics for all RPC apis, e.g number of calls, latency, errors, etc. +/// Default methods names tracked by [super::RpcMetricsLayer]. +pub const TRACKED_METHODS: &[&str] = &[ + "injected_sendTransaction", + "injected_sendTransactionAndWatch", + "program_calculateReplyForHandle", +]; -/// Metrics for the Injected RPC API. +static METHODS_MAP: LazyLock> = LazyLock::new(|| { + TRACKED_METHODS + .iter() + .copied() + .map(|method_name| { + ( + method_name, + MethodMetrics::new_with_labels(&[("method", method_name)]), + ) + }) + .collect() +}); + +/// Unified bundle of metrics for RPC method. +/// [metrics_derive::Metrics] macro will register all metrics under the `ethexe_rpc_*` scope. +/// +/// ## Must use +/// This object must be created using [MethodMetrics::new_with_labels] method. +/// This method will construct all metrics with provided unique label. +#[derive(Clone, metrics_derive::Metrics)] +#[metrics(scope = "ethexe_rpc")] +pub struct MethodMetrics { + #[metric( + rename = "calls_started_total", + describe = "Number of started RPC calls for the method" + )] + pub calls_started: Counter, + #[metric( + rename = "calls_finished_total", + labels = [("status", "ok")], + describe = "Number of successfully finished RPC calls for the method" + )] + pub calls_finished_ok: Counter, + #[metric( + rename = "calls_finished_total", + labels = [("status", "error")], + describe = "Number of failed RPC calls for the method" + )] + pub calls_finished_err: Counter, + #[metric( + rename = "call_duration_seconds", + describe = "Latency of RPC calls for the method in seconds" + )] + pub calls_latency_seconds: Histogram, + #[metric( + rename = "calls_in_flight", + describe = "Number of in-flight RPC calls for the method" + )] + pub calls_in_flight: Gauge, +} + +/// The metrics for internal state of [crate::apis::InjectedApi]. #[derive(Clone, metrics_derive::Metrics)] #[metrics(scope = "ethexe_rpc_injected_api")] pub struct InjectedApiMetrics { - /// The number of calls to `injected_sendTransaction`. - pub send_injected_tx_calls: Counter, - /// The number of calls to `injected_subscribeTransactionPromise`. - pub send_and_watch_injected_tx_calls: Counter, - /// The number of active injected transaction promises subscriptions. + #[metric( + rename = "active_promise_subscriptions", + describe = "Number of active subscriptions for injected transaction's promise" + )] pub injected_tx_active_subscriptions: Gauge, - /// The total number of injected transaction promises given to subscribers. - pub injected_tx_promises_given: Counter, +} + +/// Metrics layer for [jsonrpsee::server::RpcServiceBuilder]. +/// Uses [RpcMetricsService] to wrap each request to metrics collection logic. +#[derive(Clone, Default)] +pub struct RpcMetricsLayer; + +impl Layer for RpcMetricsLayer { + type Service = RpcMetricsService; + + fn layer(&self, service: S) -> Self::Service { + RpcMetricsService { service } + } +} + +#[derive(Clone)] +pub struct RpcMetricsService { + service: S, +} + +impl<'a, S> RpcServiceT<'a> for RpcMetricsService +where + S: RpcServiceT<'a> + Send + Sync, + S::Future: Send + 'a, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, request: Request<'a>) -> Self::Future { + let Some(metrics) = METHODS_MAP.get(request.method_name()) else { + return Box::pin(self.service.call(request)); + }; + + let future = self.service.call(request); + Box::pin(async move { + metrics.calls_started.increment(1); + metrics.calls_in_flight.increment(1); + let _metrics_guard = scopeguard::guard((), |_| metrics.calls_in_flight.decrement(1)); + + let started_at = Instant::now(); + + let response = future.await; + + metrics + .calls_latency_seconds + .record(started_at.elapsed().as_secs_f64()); + match response.is_success() { + true => metrics.calls_finished_ok.increment(1), + false => metrics.calls_finished_err.increment(1), + } + + response + }) + } }