Skip to content
17 changes: 10 additions & 7 deletions linera-core/src/chain_worker/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use linera_storage::Storage;
use tokio::sync::{OwnedRwLockReadGuard, RwLock};

use super::{config::ChainWorkerConfig, state::ChainWorkerState};
use crate::worker::WorkerError;

/// A write guard that automatically rolls back uncommitted chain state changes on drop.
///
Expand Down Expand Up @@ -150,10 +151,11 @@ pub(crate) fn create_chain_worker<S: Storage + Clone + 'static>(
/// Acquires a read lock, updating the last-access timestamp.
pub(crate) async fn read_lock<S: Storage + Clone + 'static>(
state: &Arc<RwLock<ChainWorkerState<S>>>,
) -> OwnedRwLockReadGuard<ChainWorkerState<S>> {
) -> Result<OwnedRwLockReadGuard<ChainWorkerState<S>>, WorkerError> {
let guard = state.clone().read_owned().await;
guard.check_not_poisoned()?;
guard.touch();
guard
Ok(guard)
}

/// Acquires a read lock, initializing the chain if needed.
Expand All @@ -165,16 +167,16 @@ pub(crate) async fn read_lock_initialized<S: Storage + Clone + 'static>(
state: &Arc<RwLock<ChainWorkerState<S>>>,
) -> Result<OwnedRwLockReadGuard<ChainWorkerState<S>>, crate::worker::WorkerError> {
{
let guard = read_lock(state).await;
let guard = read_lock(state).await?;
if guard.knows_chain_is_active() {
return Ok(guard);
}
}
{
let mut guard = write_lock(state).await;
let mut guard = write_lock(state).await?;
guard.initialize_and_save_if_needed().await?;
}
Ok(read_lock(state).await)
read_lock(state).await
}

/// Acquires a write lock, updating the last-access timestamp.
Expand All @@ -183,10 +185,11 @@ pub(crate) async fn read_lock_initialized<S: Storage + Clone + 'static>(
/// when dropped, ensuring cancellation safety.
pub(crate) async fn write_lock<S: Storage + Clone + 'static>(
state: &Arc<RwLock<ChainWorkerState<S>>>,
) -> RollbackGuard<S> {
) -> Result<RollbackGuard<S>, WorkerError> {
let guard = RollbackGuard(state.clone().write_owned().await);
guard.check_not_poisoned()?;
guard.touch();
guard
Ok(guard)
}

/// Spawns a background task that keeps the chain state alive for at least `ttl`
Expand Down
137 changes: 119 additions & 18 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{
chain_worker::{handle::AtomicTimestamp, ChainWorkerConfig, DeliveryNotifier},
client::ListeningMode,
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, CrossChainRequest},
worker::{NetworkActions, Notification, Reason, WorkerError},
worker::{BatchRequest, NetworkActions, Notification, Reason, WorkerError},
};

/// Type alias for event subscriptions result.
Expand Down Expand Up @@ -116,7 +116,7 @@ where

/// The result of processing a cross-chain update.
pub(crate) enum CrossChainUpdateResult {
/// The update was applied and the chain was saved up to the given height.
/// The update was applied up to the given height. The caller must save.
Updated(BlockHeight),
/// All bundles were already received; nothing to do.
NothingToDo,
Expand Down Expand Up @@ -1147,42 +1147,135 @@ where
);
return Ok(CrossChainUpdateResult::NothingToDo);
}
// Save the chain.
self.save().await?;
Ok(CrossChainUpdateResult::Updated(last_updated_height))
}

/// Handles the cross-chain request confirming that the recipient was updated.
#[instrument(skip_all, fields(
chain_id = %self.chain_id(),
recipient = %recipient,
latest_height = %latest_height
%recipient,
%latest_height
))]
pub(crate) async fn confirm_updated_recipient(
&mut self,
recipient: ChainId,
latest_height: BlockHeight,
) -> Result<NetworkActions, WorkerError> {
let fully_delivered = self
) -> Result<bool, WorkerError> {
Ok(self
.chain
.mark_messages_as_received(&recipient, latest_height)
.await?
&& self
.all_messages_to_tracked_chains_delivered_up_to(latest_height)
.await?;
.await?)
}

// Send the next chunk of cross-chain messages for this recipient, if any.
let actions = self
.create_cross_chain_actions_for_recipient(recipient)
.await?;
/// Notifies delivery waiters that all messages up to `height` have been delivered.
pub(crate) fn notify_delivery(&self, height: BlockHeight) {
self.delivery_notifier.notify(height);
}

self.save().await?;
/// Processes a batch of cross-chain requests, performing at most one `save()`.
///
/// Both update and confirmation requests are handled together so that a
/// single write-lock acquisition covers all pending work for the chain.
pub(crate) async fn process_batch(&mut self, requests: Vec<BatchRequest>) {
let mut update_results = Vec::new();
let mut confirm_results = Vec::new();
let mut need_save = false;
let mut need_rollback = false;
let mut max_delivered_height: Option<BlockHeight> = None;

for request in requests {
match request {
BatchRequest::Update {
origin,
bundles,
previous_height,
result_tx,
} => {
if need_rollback {
send_result(result_tx, Err(WorkerError::BatchRolledBack));
continue;
}
let result = self
.process_cross_chain_update(origin, bundles, previous_height)
.await;
let update_result = match result {
Ok(update_result) => update_result,
Err(error) => {
need_rollback = true;
send_result(result_tx, Err(error));
continue;
}
};
match &update_result {
CrossChainUpdateResult::Updated(_) => need_save = true,
CrossChainUpdateResult::GapDetected { .. }
| CrossChainUpdateResult::NothingToDo => {}
}
update_results.push((result_tx, update_result));
}
BatchRequest::Confirm {
recipient,
latest_height,
result_tx,
} => {
if need_rollback {
send_result(result_tx, Err(WorkerError::BatchRolledBack));
continue;
}
match self
.confirm_updated_recipient(recipient, latest_height)
.await
{
Ok(fully_delivered) => {
need_save = true;
if fully_delivered {
max_delivered_height = Some(
max_delivered_height
.map_or(latest_height, |h| h.max(latest_height)),
);
}
confirm_results.push((result_tx, recipient));
}
Err(error) => {
need_rollback = true;
send_result(result_tx, Err(error));
}
}
}
}
}
if !need_rollback && need_save {
if let Err(error) = self.save().await {
tracing::error!(%error, "failed to save batch; rolling back");
need_rollback = true;
}
}
if need_rollback {
for (result_tx, _) in update_results {
send_result(result_tx, Err(WorkerError::BatchRolledBack));
}
for (result_tx, _) in confirm_results {
send_result(result_tx, Err(WorkerError::BatchRolledBack));
}
return;
}

if fully_delivered {
self.delivery_notifier.notify(latest_height);
if let Some(height) = max_delivered_height {
self.notify_delivery(height);
}

Ok(actions)
for (result_tx, update_result) in update_results {
send_result(result_tx, Ok(update_result));
}
for (result_tx, recipient) in confirm_results {
let result = self
.create_cross_chain_actions_for_recipient(recipient)
.await;
send_result(result_tx, result);
}
}

/// Handles a `RevertConfirm` request: walks backward through
Expand Down Expand Up @@ -2144,7 +2237,7 @@ where
#[instrument(skip_all, fields(
chain_id = %self.chain_id()
))]
async fn save(&mut self) -> Result<(), WorkerError> {
pub(crate) async fn save(&mut self) -> Result<(), WorkerError> {
if let Err(error) = self.chain.save().await {
if error.must_reload_view() {
tracing::error!(
Expand All @@ -2160,6 +2253,14 @@ where
}
}

/// Sends a result through a oneshot channel, logging at `debug` level if the
/// receiver has been dropped.
pub(crate) fn send_result<T>(sender: oneshot::Sender<T>, value: T) {
if sender.send(value).is_err() {
tracing::debug!("cannot send cross-chain result; receiver dropped");
}
}

/// Returns the missing indices and corresponding blob_ids.
fn missing_indices_blob_ids(maybe_blobs: &[(BlobId, Option<Blob>)]) -> (Vec<usize>, Vec<BlobId>) {
let mut missing_indices = Vec::new();
Expand Down
Loading
Loading