diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index a2874ce400..e7c59cd173 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -529,6 +529,7 @@ impl StarknetVersion { // Poseidon hash, even when `class_commitment` is zero. pub const V_0_14_0: Self = Self::new(0, 14, 0, 0); pub const V_0_14_1: Self = Self::new(0, 14, 1, 0); + pub const V_0_14_3: Self = Self::new(0, 14, 3, 0); } impl FromStr for StarknetVersion { diff --git a/crates/gateway-client/src/lib.rs b/crates/gateway-client/src/lib.rs index 2eabe78151..85ea1bb828 100644 --- a/crates/gateway-client/src/lib.rs +++ b/crates/gateway-client/src/lib.rs @@ -61,6 +61,8 @@ pub trait GatewayApi: Sync { async fn preconfirmed_block( &self, block: BlockId, + round: u64, + transaction_count: u64, ) -> Result { unimplemented!(); } @@ -161,8 +163,12 @@ impl GatewayApi for Arc { async fn preconfirmed_block( &self, block: BlockId, + round: u64, + transaction_count: u64, ) -> Result { - self.as_ref().preconfirmed_block(block).await + self.as_ref() + .preconfirmed_block(block, round, transaction_count) + .await } async fn block_header( @@ -512,6 +518,8 @@ impl GatewayApi for Client { async fn preconfirmed_block( &self, block: BlockId, + round: u64, + transaction_count: u64, ) -> Result { // Note that we don't do retries here. // The pre-confirmed block is polled continuously by the sync logic, @@ -520,6 +528,8 @@ impl GatewayApi for Client { .feeder_gateway_request() .get_preconfirmed_block() .block(block) + .param("round", &round.to_string()) + .param("transactionCount", &transaction_count.to_string()) .retry(false) .get() .await?; diff --git a/crates/gateway-types/src/reply.rs b/crates/gateway-types/src/reply.rs index 48998b6cef..58bc4248eb 100644 --- a/crates/gateway-types/src/reply.rs +++ b/crates/gateway-types/src/reply.rs @@ -117,6 +117,14 @@ pub struct PreConfirmedBlock { >, pub transaction_state_diffs: Vec>, + + /// Current consensus round at this height (TODO: Pending spec confirmation) + /// + /// From Starknet 0.14.3 onward, the pre-confirmed block endpoint returns + /// transactions added after the caller-supplied `transaction_count`. + /// Will default to `None` for earlier versions. + #[serde(default)] + pub round: Option, } #[derive(Copy, Clone, Debug, Default, Deserialize, PartialEq, Eq, serde::Serialize)] @@ -2565,5 +2573,45 @@ mod tests { let _pre_confirmed_block: PreConfirmedBlock = serde_json::from_str(json).unwrap(); } + + mod round_field { + use super::super::super::PreConfirmedBlock; + + fn pre_confirmed_block_json(round: Option) -> serde_json::Value { + let mut body = serde_json::json!({ + "l1_gas_price": {"price_in_wei": "0x0", "price_in_fri": "0x0"}, + "l1_data_gas_price":{"price_in_wei": "0x0", "price_in_fri": "0x0"}, + "l2_gas_price": {"price_in_wei": "0x0", "price_in_fri": "0x0"}, + "sequencer_address": "0x0", + "status": "PRE_CONFIRMED", + "timestamp": 0, + "starknet_version": "0.14.3", + "l1_da_mode": "BLOB", + "transactions": [], + "transaction_receipts": [], + "transaction_state_diffs": [], + }); + if let Some(r) = round { + body.as_object_mut() + .unwrap() + .insert("round".into(), r.into()); + } + body + } + + #[test] + fn round_present_parses_to_some() { + let body = pre_confirmed_block_json(Some(5)); + let block: PreConfirmedBlock = serde_json::from_value(body).unwrap(); + assert_eq!(block.round, Some(5)); + } + + #[test] + fn round_absent_parses_to_none() { + let body = pre_confirmed_block_json(None); + let block: PreConfirmedBlock = serde_json::from_value(body).unwrap(); + assert_eq!(block.round, None); + } + } } } diff --git a/crates/pathfinder/src/state/sync/pending.rs b/crates/pathfinder/src/state/sync/pending.rs index f441c4ff63..49a56b5663 100644 --- a/crates/pathfinder/src/state/sync/pending.rs +++ b/crates/pathfinder/src/state/sync/pending.rs @@ -1,57 +1,157 @@ use anyhow::Context; -use pathfinder_common::{BlockHash, BlockNumber}; +use pathfinder_common::{BlockHash, BlockNumber, StarknetVersion}; use starknet_gateway_client::GatewayApi; +use starknet_gateway_types::reply::PreConfirmedBlock; use tokio::sync::watch; use tokio::time::Instant; use crate::state::sync::SyncEvent; -/// Emits new pending data events while the current block is close to the latest -/// block. -#[allow(clippy::too_many_arguments)] -pub async fn poll_pre_confirmed( - tx_event: tokio::sync::mpsc::Sender, - sequencer: S, - poll_interval: std::time::Duration, - latest: watch::Receiver<(BlockNumber, BlockHash)>, - current: watch::Receiver<(BlockNumber, BlockHash)>, -) { - const IN_SYNC_THRESHOLD: u64 = 6; +/// Maximum gap, in blocks, between `current` and `latest` for pre-confirmed +/// polling. Beyond this we skip as the node is still catching up. +const IN_SYNC_THRESHOLD: u64 = 6; + +#[derive(Debug)] +struct State { + /// Height we're currently polling. + block_number: BlockNumber, + /// `None` until we've seen a 0.14.3+ response for this height. Once + /// set, we track the round we're currently following and treat + /// any response with a higher round as a full reset. + round: Option, + /// Running merged view of the preconfirmed block at `block_number`. + /// `None` until we've received our first response for this height. + accumulated: Option, + /// Whether the pre-latest block was present at the last poll. + pre_latest_data_present: bool, +} - #[derive(Debug, Default)] - struct State { - block_number: BlockNumber, - tx_count: usize, - pre_latest_data_present: bool, +impl Default for State { + fn default() -> Self { + Self { + block_number: BlockNumber::GENESIS, + round: None, + accumulated: None, + pre_latest_data_present: false, + } } +} - impl State { - /// Returns `true` if the state was updated, `false` otherwise. - fn update(&mut self, new_state: Self) -> bool { - use std::cmp::Ordering; - - let should_update = match new_state.block_number.get().cmp(&self.block_number.get()) { - Ordering::Less => false, // Stale pre-confirmed data (older block). - Ordering::Greater => true, // New pre-confirmed block. - Ordering::Equal => match new_state.tx_count.cmp(&self.tx_count) { - Ordering::Less => false, // Stale pre-confirmed data (fewer txs). - Ordering::Greater => true, // New transactions available. - Ordering::Equal => { - // Check if pre-latest data got cleared (because it has been finalized), - // which is a valid update if both block number and transaction count are - // same. - self.pre_latest_data_present && !new_state.pre_latest_data_present - } - }, +impl State { + fn tx_count(&self) -> u64 { + self.accumulated + .as_ref() + .map(|b| b.transactions.len() as u64) + .unwrap_or(0) + } + + /// Apply a fresh poll response, given the pre-latest presence + /// observed for this poll. Returns `true` if `accumulated` was + /// updated and the caller should emit the new view. + fn apply(&mut self, response: PreConfirmedBlock, new_pre_latest: bool) -> bool { + use std::cmp::Ordering; + + let prev_pre_latest = self.pre_latest_data_present; + self.pre_latest_data_present = new_pre_latest; + + // Starknet 0.14.3 introduced the incremental API: responses carry a + // `round` field and contain only the transactions added since the + // count we sent. + let is_delta_capable = response.starknet_version >= StarknetVersion::V_0_14_3; + + if !is_delta_capable { + // Pre-0.14.3: response is always the full current view, but the + // gateway may re-serve the same view across polls. Suppress + // emissions that carry no new information. + let prev_tx_count = self.tx_count(); + let new_tx_count = response.transactions.len() as u64; + + let should_update = match new_tx_count.cmp(&prev_tx_count) { + Ordering::Greater => true, + Ordering::Less => false, // stale: same height, fewer txs + Ordering::Equal => prev_pre_latest && !new_pre_latest, }; if should_update { - *self = new_state; + self.round = None; + self.accumulated = Some(response); } - should_update + return should_update; + } + + match (self.round, response.round) { + (_, None) => { + // Server claims ≥ 0.14.3 but omitted `round`. Treat as a + // malformed payload: full-replace, but don't advance our + // tracked round so we keep retrying without committing to + // a phantom round number. + tracing::warn!( + "pre-confirmed response claims version {} but omits round; falling back to \ + full replace", + response.starknet_version + ); + self.round = None; + self.accumulated = Some(response); + true + } + (None, Some(r)) => { + // First 0.14.3 response for this height. Adopt verbatim. + self.round = Some(r); + self.accumulated = Some(response); + true + } + (Some(ours), Some(r)) => match r.cmp(&ours) { + Ordering::Greater => { + // Round bumped: the sequencer abandoned the prior + // proposal at this height, so any txs/header we'd + // accumulated are no longer valid. + self.round = Some(r); + self.accumulated = Some(response); + true + } + Ordering::Equal => { + if response.transactions.is_empty() { + false + } else { + // Delta at the same round — append. + let acc = self + .accumulated + .as_mut() + .expect("accumulated block present whenever round is Some"); + acc.transactions.extend(response.transactions); + acc.transaction_receipts + .extend(response.transaction_receipts); + acc.transaction_state_diffs + .extend(response.transaction_state_diffs); + true + } + } + Ordering::Less => { + // Server is on an older round than we are; this should + // never happen in normal operation. Hold our state + // and wait for a fresh response. + tracing::warn!( + our_round = ours, + server_round = r, + "pre-confirmed response carries an older round than ours; ignoring" + ); + false + } + }, } } +} +/// Emits new pending data events while the current block is close to the latest +/// block. +#[allow(clippy::too_many_arguments)] +pub async fn poll_pre_confirmed( + tx_event: tokio::sync::mpsc::Sender, + sequencer: S, + poll_interval: std::time::Duration, + latest: watch::Receiver<(BlockNumber, BlockHash)>, + current: watch::Receiver<(BlockNumber, BlockHash)>, +) { let mut state = State::default(); loop { @@ -69,6 +169,9 @@ pub async fn poll_pre_confirmed( continue; } + // Fetch the pre-latest block. + // Its presence determines the pre-confirmed block number we poll below, + // and it is later forwarded downstream in the emitted event. let pre_latest_data = match fetch_pre_latest(&sequencer, latest_number, latest_hash).await { Ok(r) => r.map(Box::new), Err(e) => { @@ -85,8 +188,34 @@ pub async fn poll_pre_confirmed( } else { latest_number + 1 }; - let pre_confirmed_block = match sequencer - .preconfirmed_block(pre_confirmed_block_number.into()) + + // A transient gateway inconsistency (e.g. the pre-latest block briefly + // disappears) can cause this poll's pre-confirmed block number to drop + // below the height we're already tracking. Just skip the poll, the state + // we've accumulated for the higher height is still valid for later. + if pre_confirmed_block_number < state.block_number { + tracing::debug!( + pre_confirmed_block_number = %pre_confirmed_block_number, + current = %state.block_number, + "Pre-confirmed block number stepped backwards; skipping poll" + ); + tokio::time::sleep_until(t_fetch + poll_interval).await; + continue; + } + + // New height. Invalidate any state from prior height. + if pre_confirmed_block_number > state.block_number { + state = State { + block_number: pre_confirmed_block_number, + ..State::default() + }; + } + + let req_round = state.round.unwrap_or(0); + let req_count = state.tx_count(); + + let response = match sequencer + .preconfirmed_block(pre_confirmed_block_number.into(), req_round, req_count) .await { Ok(r) => r, @@ -97,17 +226,16 @@ pub async fn poll_pre_confirmed( } }; - let new_state = State { - block_number: pre_confirmed_block_number, - tx_count: pre_confirmed_block.transactions.len(), - pre_latest_data_present: pre_latest_data.is_some(), - }; - if state.update(new_state) { + if state.apply(response, pre_latest_data.is_some()) { + let accumulated = state + .accumulated + .as_ref() + .expect("accumulated block present after a successful update"); tracing::trace!("Emitting a pre-confirmed update"); if let Err(e) = tx_event .send(SyncEvent::PreConfirmed { number: pre_confirmed_block_number, - block: pre_confirmed_block.into(), + block: accumulated.clone().into(), pre_latest_data, }) .await @@ -342,6 +470,7 @@ mod tests { }), None, ], + round: None, }); /// Arbitrary timeout for receiving emits on the tokio channel. Otherwise @@ -358,7 +487,7 @@ mod tests { .returning(|| Ok((PRE_LATEST_BLOCK.clone(), PENDING_UPDATE.clone()))); sequencer .expect_preconfirmed_block() - .returning(move |_| Ok(PRE_CONFIRMED_BLOCK.clone())); + .returning(move |_, _, _| Ok(PRE_CONFIRMED_BLOCK.clone())); let latest_hash = PRE_LATEST_BLOCK.parent_hash; let latest_block_number = BlockNumber::new_or_panic(1); @@ -432,18 +561,20 @@ mod tests { sequencer .expect_pending_block() .returning(move || Ok((PRE_LATEST_BLOCK.clone(), PENDING_UPDATE.clone()))); - sequencer.expect_preconfirmed_block().returning(move |_| { - let mut count = COUNT.lock().unwrap(); - *count += 1; + sequencer + .expect_preconfirmed_block() + .returning(move |_, _, _| { + let mut count = COUNT.lock().unwrap(); + *count += 1; - let block = match *count { - 1 => b0_copy.clone(), - 2 => PRE_CONFIRMED_BLOCK.clone(), - _ => b1_copy.clone(), - }; + let block = match *count { + 1 => b0_copy.clone(), + 2 => PRE_CONFIRMED_BLOCK.clone(), + _ => b1_copy.clone(), + }; - Ok(block) - }); + Ok(block) + }); let sequencer = Arc::new(sequencer); let latest_hash = PRE_LATEST_BLOCK.parent_hash; @@ -570,28 +701,30 @@ mod tests { sequencer .expect_pending_block() .returning(move || Ok((PRE_LATEST_BLOCK.clone(), PENDING_UPDATE.clone()))); - sequencer.expect_preconfirmed_block().returning(move |_| { - let mut count = COUNT.lock().unwrap(); - let block = match *count { - 0 => { - *count += 1; - // Polling task has default state at the start, so this should produce an - // event. - PRE_CONFIRMED_BLOCK.clone() - } - 1 => { - *count += 1; - // Same transaction count as before, should be ignored. - PRE_CONFIRMED_BLOCK.clone() - } - _ => { - // Lower transaction count than before, should be ignored. - stale_pre_confirmed.clone() - } - }; + sequencer + .expect_preconfirmed_block() + .returning(move |_, _, _| { + let mut count = COUNT.lock().unwrap(); + let block = match *count { + 0 => { + *count += 1; + // Polling task has default state at the start, so this should produce an + // event. + PRE_CONFIRMED_BLOCK.clone() + } + 1 => { + *count += 1; + // Same transaction count as before, should be ignored. + PRE_CONFIRMED_BLOCK.clone() + } + _ => { + // Lower transaction count than before, should be ignored. + stale_pre_confirmed.clone() + } + }; - Ok(block) - }); + Ok(block) + }); let latest_block_number = BlockNumber::new_or_panic(10); @@ -678,7 +811,7 @@ mod tests { }); sequencer .expect_preconfirmed_block() - .returning(move |_| Ok(PRE_CONFIRMED_BLOCK.clone())); + .returning(move |_, _, _| Ok(PRE_CONFIRMED_BLOCK.clone())); let latest_block_number = BlockNumber::new_or_panic(10); diff --git a/crates/rpc/src/method/trace_block_transactions.rs b/crates/rpc/src/method/trace_block_transactions.rs index de4dc3d4ac..bb1e4eba6b 100644 --- a/crates/rpc/src/method/trace_block_transactions.rs +++ b/crates/rpc/src/method/trace_block_transactions.rs @@ -1278,6 +1278,7 @@ pub(crate) mod tests { starknet_version: last_block_header.starknet_version, l1_da_mode: L1DataAvailabilityMode::Blob, transaction_state_diffs: vec![], + ..Default::default() }; tx.commit()?; @@ -1408,6 +1409,7 @@ pub(crate) mod tests { starknet_version: last_block_header.starknet_version, l1_da_mode: L1DataAvailabilityMode::Blob, transaction_state_diffs: vec![], + ..Default::default() }; tx.commit()?;