From 9bb39f1a7fd91ce54aadbefc88499860e5a68dcb Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 30 Jan 2025 14:28:19 +0100 Subject: [PATCH] chore: shrink cache queues (#14105) --- crates/rpc/rpc-eth-types/src/cache/mod.rs | 18 ++++++++++++++++-- .../rpc-eth-types/src/cache/multi_consumer.rs | 6 ++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index b1cf57aec7d7..a20ec09231bc 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -16,7 +16,7 @@ use std::{ future::Future, pin::Pin, sync::Arc, - task::{ready, Context, Poll}, + task::{Context, Poll}, }; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedSender}, @@ -324,6 +324,14 @@ where } } + /// Shrinks the queues but leaves some space for the next requests + fn shrink_queues(&mut self) { + let min_capacity = 2; + self.full_block_cache.shrink_to(min_capacity); + self.receipts_cache.shrink_to(min_capacity); + self.headers_cache.shrink_to(min_capacity); + } + fn update_cached_metrics(&self) { self.full_block_cache.update_cached_metrics(); self.receipts_cache.update_cached_metrics(); @@ -342,7 +350,13 @@ where let this = self.get_mut(); loop { - match ready!(this.action_rx.poll_next_unpin(cx)) { + let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else { + // shrink queues if we don't have any work to do + this.shrink_queues(); + return Poll::Pending; + }; + + match action { None => { unreachable!("can't close") } diff --git a/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs b/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs index d4ab4e07c265..2f5a46125a13 100644 --- a/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs +++ b/crates/rpc/rpc-eth-types/src/cache/multi_consumer.rs @@ -107,6 +107,12 @@ where } } + /// Shrinks the capacity of the queue with a lower limit. + #[inline] + pub fn shrink_to(&mut self, min_capacity: usize) { + self.queued.shrink_to(min_capacity); + } + /// Update metrics for the inner cache. #[inline] pub fn update_cached_metrics(&self) {