Skip to content

Commit 3f0d544

Browse files
committed
Add finalized tip for streaming to validator
1 parent fecc84f commit 3f0d544

5 files changed

Lines changed: 73 additions & 16 deletions

File tree

bin/validator/src/server/validator_service/block_subscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl grpc::server::validator_api::BlockSubscription for ValidatorService {
3939

4040
let from = BlockNumber::from(request.block_from);
4141
let source = BlockStoreSource { block_store: self.block_store.clone() };
42-
let stream = run_stream(from, self.committed_tip.subscribe(), source)
42+
let stream = run_stream(from, self.finalized_tip.subscribe(), source)
4343
.map(|event| event.map_err(subscription_error_to_status));
4444

4545
Ok(Box::pin(stream))

bin/validator/src/server/validator_service/mod.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,14 @@ pub(crate) struct ValidatorService {
6868
/// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
6969
/// ensuring consistent chain tip reads and preventing race conditions.
7070
sign_block_semaphore: Semaphore,
71-
/// In-memory chain tip, updated after each signed block. Block subscriptions follow this to
72-
/// stream live blocks as they are signed.
73-
committed_tip: watch::Sender<BlockNumber>,
71+
/// In-memory signed chain tip, updated after each signed block. The tip is provisional: it can
72+
/// be replaced by a different block at the same height until a child block is signed on top of
73+
/// it (see [`Self::finalized_tip`]).
74+
signed_tip: watch::Sender<BlockNumber>,
75+
/// In-memory finalized chain tip: the highest block that can no longer be replaced (the parent
76+
/// of [`Self::signed_tip`]). Block subscriptions follow this so that only finalized blocks are
77+
/// streamed downstream; the provisional tip is withheld until a child is signed on top of it.
78+
finalized_tip: watch::Sender<BlockNumber>,
7479
/// In-memory count of validated transactions, incremented after each new insert.
7580
validated_transactions_count: AtomicU64,
7681
/// In-memory count of signed blocks, incremented after each signed block.
@@ -102,12 +107,14 @@ impl ValidatorService {
102107
});
103108
}
104109

110+
let signed_tip = BlockNumber::from(initial_chain_tip);
105111
Ok(Self {
106112
signer,
107113
db: db.into(),
108114
block_store,
109115
sign_block_semaphore: Semaphore::new(1),
110-
committed_tip: watch::Sender::new(BlockNumber::from(initial_chain_tip)),
116+
signed_tip: watch::Sender::new(signed_tip),
117+
finalized_tip: watch::Sender::new(finalized_tip(signed_tip)),
111118
validated_transactions_count: AtomicU64::new(initial_tx_count),
112119
signed_blocks_count: AtomicU64::new(initial_block_count),
113120
})
@@ -216,3 +223,13 @@ impl ValidatorService {
216223
.map_err(|err| ValidatorError::BlockSigningFailed(err.to_string()))
217224
}
218225
}
226+
227+
/// Returns the finalized chain tip for a given signed chain tip.
228+
///
229+
/// The signed tip is provisional: it can be replaced by a different block at the same height until a
230+
/// child block is signed on top of it. The finalized tip is therefore the parent of the signed tip,
231+
/// since that parent can no longer be replaced. Genesis has no parent and is itself always final
232+
/// (it cannot be replaced), so it maps to itself.
233+
pub(super) fn finalized_tip(signed_tip: BlockNumber) -> BlockNumber {
234+
signed_tip.parent().unwrap_or(signed_tip)
235+
}

bin/validator/src/server/validator_service/sign_block.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use miden_protocol::block::{BlockNumber, ProposedBlock};
77
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::Signature;
88
use miden_tx::utils::serde::{Deserializable, Serializable};
99

10-
use super::ValidatorService;
10+
use super::{ValidatorService, finalized_tip};
1111
use crate::db::{load_chain_tip, upsert_block_header};
1212

1313
#[tonic::async_trait]
@@ -73,10 +73,14 @@ impl grpc::server::validator_api::SignBlock for ValidatorService {
7373
))
7474
})?;
7575

76-
// Update the in-memory counters after successful persistence. The block has already been
77-
// backed up to the block store by `validate_block`, so it is available to subscribers by
78-
// the time they observe this new tip.
79-
self.committed_tip.send_replace(BlockNumber::from(new_block_num));
76+
// Update the in-memory tips after successful persistence. The newly signed block becomes
77+
// the provisional tip, and its parent becomes finalized: the parent can no longer be
78+
// replaced, so it is now safe to stream downstream. On a replacement (same-height) block
79+
// the finalized tip is unchanged, so subscribers never observe a block that was later
80+
// replaced.
81+
let signed_tip = BlockNumber::from(new_block_num);
82+
self.signed_tip.send_replace(signed_tip);
83+
self.finalized_tip.send_replace(finalized_tip(signed_tip));
8084
self.signed_blocks_count.fetch_add(1, Ordering::Relaxed);
8185

8286
Ok((signature, block_commitment))

bin/validator/src/server/validator_service/status.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ impl grpc::server::validator_api::Status for ValidatorService {
1616
Ok(grpc::validator::ValidatorStatus {
1717
version: env!("CARGO_PKG_VERSION").to_string(),
1818
status: "OK".to_string(),
19-
chain_tip: self.committed_tip.borrow().as_u32(),
19+
chain_tip: self.signed_tip.borrow().as_u32(),
2020
validated_transactions_count: self.validated_transactions_count.load(Ordering::Relaxed),
2121
signed_blocks_count: self.signed_blocks_count.load(Ordering::Relaxed),
2222
})

bin/validator/src/server/validator_service/tests.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -499,8 +499,8 @@ async fn validate_block_number_mismatch() {
499499
);
500500
}
501501

502-
/// A block subscription replays the backed-up blocks from the requested height and then streams
503-
/// newly signed blocks as they arrive.
502+
/// A block subscription replays the finalized backed-up blocks from the requested height and then
503+
/// streams blocks live as they are finalized (i.e. once a child block is signed on top of them).
504504
#[tokio::test]
505505
async fn block_subscription_replays_then_follows() {
506506
use std::time::Duration;
@@ -511,11 +511,14 @@ async fn block_subscription_replays_then_follows() {
511511

512512
let mut tv = TestValidator::new().await;
513513

514-
// Sign blocks 1 and 2 so the validator backs them up to its block store.
514+
// Sign blocks 1, 2 and 3. The signed tip is 3, so blocks 1 and 2 are finalized while block 3
515+
// remains the provisional, replaceable tip and is withheld from subscribers.
516+
tv.apply_empty_block().await;
515517
tv.apply_empty_block().await;
516518
tv.apply_empty_block().await;
517519

518-
// Subscribe from the first signed block and confirm the backed-up blocks are replayed in order.
520+
// Subscribe from the first signed block and confirm only the finalized blocks (1 and 2) are
521+
// replayed; the provisional tip (block 3) is withheld.
519522
let mut stream = tv.call_block_subscription(1).await;
520523
for expected in 1..=2 {
521524
let response = tokio::time::timeout(Duration::from_secs(5), stream.next())
@@ -528,7 +531,7 @@ async fn block_subscription_replays_then_follows() {
528531
assert_eq!(response.committed_chain_tip, 2);
529532
}
530533

531-
// Sign a new block and confirm it is streamed live to the existing subscriber.
534+
// Sign a new block (4), finalizing block 3, and confirm block 3 is now streamed live.
532535
tv.apply_empty_block().await;
533536
let response = tokio::time::timeout(Duration::from_secs(5), stream.next())
534537
.await
@@ -539,3 +542,36 @@ async fn block_subscription_replays_then_follows() {
539542
assert_eq!(block.header().block_num().as_u32(), 3);
540543
assert_eq!(response.committed_chain_tip, 3);
541544
}
545+
546+
/// The provisional chain tip (the most recently signed block) must be withheld from subscribers
547+
/// until a child block is signed on top of it, since it can still be replaced.
548+
#[tokio::test]
549+
async fn provisional_tip_is_withheld_from_subscribers() {
550+
use std::time::Duration;
551+
552+
use miden_protocol::block::SignedBlock;
553+
use miden_tx::utils::serde::Deserializable;
554+
use tokio_stream::StreamExt;
555+
556+
let mut tv = TestValidator::new().await;
557+
558+
// Sign blocks 1 and 2. Block 1 is finalized (it has a child); block 2 is the provisional tip.
559+
tv.apply_empty_block().await;
560+
tv.apply_empty_block().await;
561+
562+
let mut stream = tv.call_block_subscription(1).await;
563+
564+
// Block 1 is finalized and streamed.
565+
let response = tokio::time::timeout(Duration::from_secs(5), stream.next())
566+
.await
567+
.expect("finalized block should arrive promptly")
568+
.expect("stream should not end")
569+
.expect("stream item should not be an error");
570+
let block = SignedBlock::read_from_bytes(&response.block).expect("valid signed block");
571+
assert_eq!(block.header().block_num().as_u32(), 1);
572+
assert_eq!(response.committed_chain_tip, 1);
573+
574+
// Block 2 is the provisional tip and must not be streamed while it remains replaceable.
575+
let next = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
576+
assert!(next.is_err(), "provisional tip (block 2) should be withheld from subscribers");
577+
}

0 commit comments

Comments
 (0)