Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Result<Option<>> rather than Option<Option<>> #9119

Merged
merged 2 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions frame/election-provider-multi-phase/src/unsigned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use sp_npos_elections::{
CompactSolution, ElectionResult, assignment_ratio_to_staked_normalized,
assignment_staked_to_ratio_normalized, is_score_better, seq_phragmen,
};
use sp_runtime::{offchain::storage::StorageValueRef, traits::TrailingZeroInput, SaturatedConversion};
use sp_runtime::{
offchain::storage::{MutateStorageError, StorageValueRef},
traits::TrailingZeroInput, SaturatedConversion
};
use sp_std::{cmp::Ordering, convert::TryFrom, vec::Vec};

/// Storage key used to store the last block number at which offchain worker ran.
Expand Down Expand Up @@ -98,9 +101,9 @@ fn save_solution<T: Config>(call: &Call<T>) -> Result<(), MinerError> {
log!(debug, "saving a call to the offchain storage.");
let storage = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL);
match storage.mutate::<_, (), _>(|_| Ok(call.clone())) {
Ok(Ok(_)) => Ok(()),
Ok(Err(_)) => Err(MinerError::FailedToStoreSolution),
Err(_) => {
Ok(_) => Ok(()),
Err(MutateStorageError::ConcurrentModification(_)) => Err(MinerError::FailedToStoreSolution),
Err(MutateStorageError::ValueFunctionFailed(_)) => {
// this branch should be unreachable according to the definition of
// `StorageValueRef::mutate`: that function should only ever `Err` if the closure we
// pass it returns an error. however, for safety in case the definition changes, we do
Expand All @@ -114,6 +117,7 @@ fn save_solution<T: Config>(call: &Call<T>) -> Result<(), MinerError> {
fn restore_solution<T: Config>() -> Result<Call<T>, MinerError> {
StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL)
.get()
.ok()
.flatten()
.ok_or(MinerError::NoStoredSolution)
}
Expand All @@ -135,12 +139,9 @@ fn clear_offchain_repeat_frequency() {
}

/// `true` when OCW storage contains a solution
///
/// More precise than `restore_solution::<T>().is_ok()`; that invocation will return `false`
/// if a solution exists but cannot be decoded, whereas this just checks whether an item is present.
#[cfg(test)]
fn ocw_solution_exists<T: Config>() -> bool {
StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL).get::<Call<T>>().is_some()
matches!(StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL).get::<Call<T>>(), Ok(Some(_)))
}

impl<T: Config> Pallet<T> {
Expand Down Expand Up @@ -584,13 +585,13 @@ impl<T: Config> Pallet<T> {
let last_block = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);

let mutate_stat = last_block.mutate::<_, &'static str, _>(
|maybe_head: Option<Option<T::BlockNumber>>| {
|maybe_head: Result<Option<T::BlockNumber>, _>| {
match maybe_head {
Some(Some(head)) if now < head => Err("fork."),
Some(Some(head)) if now >= head && now <= head + threshold => {
Ok(Some(head)) if now < head => Err("fork."),
Ok(Some(head)) if now >= head && now <= head + threshold => {
Err("recently executed.")
}
Some(Some(head)) if now > head + threshold => {
Ok(Some(head)) if now > head + threshold => {
// we can run again now. Write the new head.
Ok(now)
}
Expand All @@ -604,11 +605,12 @@ impl<T: Config> Pallet<T> {

match mutate_stat {
// all good
Ok(Ok(_)) => Ok(()),
Ok(_) => Ok(()),
// failed to write.
Ok(Err(_)) => Err(MinerError::Lock("failed to write to offchain db.")),
Err(MutateStorageError::ConcurrentModification(_)) =>
Err(MinerError::Lock("failed to write to offchain db (concurrent modification).")),
// fork etc.
Err(why) => Err(MinerError::Lock(why)),
Err(MutateStorageError::ValueFunctionFailed(why)) => Err(MinerError::Lock(why)),
}
}

Expand Down Expand Up @@ -1117,15 +1119,15 @@ mod tests {
assert!(MultiPhase::current_phase().is_unsigned());

// initially, the lock is not set.
assert!(guard.get::<bool>().is_none());
assert!(guard.get::<bool>().unwrap().is_none());

// a successful a-z execution.
MultiPhase::offchain_worker(25);
assert_eq!(pool.read().transactions.len(), 1);

// afterwards, the lock is not set either..
assert!(guard.get::<bool>().is_none());
assert_eq!(last_block.get::<BlockNumber>().unwrap().unwrap(), 25);
assert!(guard.get::<bool>().unwrap().is_none());
assert_eq!(last_block.get::<BlockNumber>().unwrap(), Some(25));
});
}

Expand Down Expand Up @@ -1280,7 +1282,7 @@ mod tests {
// this ensures that when the resubmit window rolls around, we're ready to regenerate
// from scratch if necessary
let mut call_cache = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL);
assert!(matches!(call_cache.get::<Call<Runtime>>(), Some(Some(_call))));
assert!(matches!(call_cache.get::<Call<Runtime>>(), Ok(Some(_call))));
call_cache.clear();

// attempts to resubmit the tx after the threshold has expired
Expand Down
16 changes: 6 additions & 10 deletions frame/example-offchain-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use frame_support::traits::Get;
use sp_core::crypto::KeyTypeId;
use sp_runtime::{
RuntimeDebug,
offchain::{http, Duration, storage::StorageValueRef},
offchain::{http, Duration, storage::{MutateStorageError, StorageRetrievalError, StorageValueRef}},
traits::Zero,
transaction_validity::{InvalidTransaction, ValidTransaction, TransactionValidity},
};
Expand Down Expand Up @@ -366,15 +366,11 @@ impl<T: Config> Pallet<T> {
// low-level method of local storage API, which means that only one worker
// will be able to "acquire a lock" and send a transaction if multiple workers
// happen to be executed concurrently.
let res = val.mutate(|last_send: Option<Option<T::BlockNumber>>| {
// We match on the value decoded from the storage. The first `Option`
// indicates if the value was present in the storage at all,
// the second (inner) `Option` indicates if the value was succesfuly
// decoded to expected type (`T::BlockNumber` in our case).
let res = val.mutate(|last_send: Result<Option<T::BlockNumber>, StorageRetrievalError>| {
match last_send {
// If we already have a value in storage and the block number is recent enough
// we avoid sending another transaction at this time.
Some(Some(block)) if block_number < block + T::GracePeriod::get() => {
Ok(Some(block)) if block_number < block + T::GracePeriod::get() => {
Err(RECENTLY_SENT)
},
// In every other case we attempt to acquire the lock and send a transaction.
Expand All @@ -390,7 +386,7 @@ impl<T: Config> Pallet<T> {
// written to in the meantime.
match res {
// The value has been set correctly, which means we can safely send a transaction now.
Ok(Ok(block_number)) => {
Ok(block_number) => {
// Depending if the block is even or odd we will send a `Signed` or `Unsigned`
// transaction.
// Note that this logic doesn't really guarantee that the transactions will be sent
Expand All @@ -406,13 +402,13 @@ impl<T: Config> Pallet<T> {
else { TransactionType::Raw }
},
// We are in the grace period, we should not send a transaction this time.
Err(RECENTLY_SENT) => TransactionType::None,
Err(MutateStorageError::ValueFunctionFailed(RECENTLY_SENT)) => TransactionType::None,
// We wanted to send a transaction, but failed to write the block number (acquire a
// lock). This indicates that another offchain worker that was running concurrently
// most likely executed the same logic and succeeded at writing to storage.
// Thus we don't really want to send the transaction, knowing that the other run
// already did.
Ok(Err(_)) => TransactionType::None,
Err(MutateStorageError::ConcurrentModification(_)) => TransactionType::None,
}
}

Expand Down
12 changes: 8 additions & 4 deletions frame/im-online/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use sp_core::offchain::OpaqueNetworkState;
use sp_std::prelude::*;
use sp_std::convert::TryInto;
use sp_runtime::{
offchain::storage::StorageValueRef,
offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
traits::{AtLeast32BitUnsigned, Convert, Saturating, TrailingZeroInput},
Perbill, Permill, PerThing, RuntimeDebug, SaturatedConversion,
};
Expand Down Expand Up @@ -719,14 +719,15 @@ impl<T: Config> Pallet<T> {
key
};
let storage = StorageValueRef::persistent(&key);
let res = storage.mutate(|status: Option<Option<HeartbeatStatus<T::BlockNumber>>>| {
let res = storage.mutate(
|status: Result<Option<HeartbeatStatus<T::BlockNumber>>, StorageRetrievalError>| {
// Check if there is already a lock for that particular block.
// This means that the heartbeat has already been sent, and we are just waiting
// for it to be included. However if it doesn't get included for INCLUDE_THRESHOLD
// we will re-send it.
match status {
// we are still waiting for inclusion.
Some(Some(status)) if status.is_recent(session_index, now) => {
Ok(Some(status)) if status.is_recent(session_index, now) => {
Err(OffchainErr::WaitingForInclusion(status.sent_at))
},
// attempt to set new status
Expand All @@ -735,7 +736,10 @@ impl<T: Config> Pallet<T> {
sent_at: now,
}),
}
})?;
});
if let Err(MutateStorageError::ValueFunctionFailed(err)) = res {
return Err(err);
}

let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?;

Expand Down
22 changes: 13 additions & 9 deletions frame/session/src/historical/offchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
//! This is used in conjunction with [`ProvingTrie`](super::ProvingTrie) and
//! the off-chain indexing API.

use sp_runtime::{offchain::storage::StorageValueRef, KeyTypeId};
use sp_runtime::{
offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
KeyTypeId
};
use sp_session::MembershipProof;

use super::super::{Pallet as SessionModule, SessionIndex};
Expand All @@ -49,6 +52,7 @@ impl<T: Config> ValidatorSet<T> {
let derived_key = shared::derive_key(shared::PREFIX, session_index);
StorageValueRef::persistent(derived_key.as_ref())
.get::<Vec<(T::ValidatorId, T::FullIdentification)>>()
.ok()
.flatten()
.map(|validator_set| Self { validator_set })
}
Expand Down Expand Up @@ -100,19 +104,19 @@ pub fn prove_session_membership<T: Config, D: AsRef<[u8]>>(
pub fn prune_older_than<T: Config>(first_to_keep: SessionIndex) {
let derived_key = shared::LAST_PRUNE.to_vec();
let entry = StorageValueRef::persistent(derived_key.as_ref());
match entry.mutate(|current: Option<Option<SessionIndex>>| -> Result<_, ()> {
match entry.mutate(|current: Result<Option<SessionIndex>, StorageRetrievalError>| -> Result<_, ()> {
match current {
Some(Some(current)) if current < first_to_keep => Ok(first_to_keep),
Ok(Some(current)) if current < first_to_keep => Ok(first_to_keep),
// do not move the cursor, if the new one would be behind ours
Some(Some(current)) => Ok(current),
None => Ok(first_to_keep),
Ok(Some(current)) => Ok(current),
Ok(None) => Ok(first_to_keep),
// if the storage contains undecodable data, overwrite with current anyways
// which might leak some entries being never purged, but that is acceptable
// in this context
Some(None) => Ok(first_to_keep),
Err(_) => Ok(first_to_keep),
}
}) {
Ok(Ok(new_value)) => {
Ok(new_value) => {
// on a re-org this is not necessarily true, with the above they might be equal
if new_value < first_to_keep {
for session_index in new_value..first_to_keep {
Expand All @@ -121,8 +125,8 @@ pub fn prune_older_than<T: Config>(first_to_keep: SessionIndex) {
}
}
}
Ok(Err(_)) => {} // failed to store the value calculated with the given closure
Err(_) => {} // failed to calculate the value to store with the given closure
Err(MutateStorageError::ConcurrentModification(_)) => {}
Err(MutateStorageError::ValueFunctionFailed(_)) => {}
}
}

Expand Down
80 changes: 54 additions & 26 deletions primitives/runtime/src/offchain/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@ pub struct StorageValueRef<'a> {
kind: StorageKind,
}

/// Reason for not being able to provide the stored value
#[derive(Debug, PartialEq, Eq)]
pub enum StorageRetrievalError {
/// Value found but undecodable
Undecodable,
}

/// Possible errors when mutating a storage value.
#[derive(Debug, PartialEq, Eq)]
pub enum MutateStorageError<T, E> {
/// The underlying db failed to update due to a concurrent modification.
/// Contains the new value that was not stored.
ConcurrentModification(T),
/// The function given to us to create the value to be stored failed.
/// May be used to signal that having looked at the existing value,
/// they don't want to mutate it.
ValueFunctionFailed(E)
}

impl<'a> StorageValueRef<'a> {
/// Create a new reference to a value in the persistent local storage.
pub fn persistent(key: &'a [u8]) -> Self {
Expand Down Expand Up @@ -58,30 +77,40 @@ impl<'a> StorageValueRef<'a> {
/// Retrieve & decode the value from storage.
///
/// Note that if you want to do some checks based on the value
/// and write changes after that you should rather be using `mutate`.
/// and write changes after that, you should rather be using `mutate`.
///
/// The function returns `None` if the value was not found in storage,
/// otherwise a decoding of the value to requested type.
pub fn get<T: codec::Decode>(&self) -> Option<Option<T>> {
/// Returns the value if stored.
/// Returns an error if the value could not be decoded.
pub fn get<T: codec::Decode>(&self) -> Result<Option<T>, StorageRetrievalError> {
sp_io::offchain::local_storage_get(self.kind, self.key)
.map(|val| T::decode(&mut &*val).ok())
.map(|val| T::decode(&mut &*val)
.map_err(|_| StorageRetrievalError::Undecodable))
.transpose()
}

/// Retrieve & decode the value and set it to a new one atomically.
/// Retrieve & decode the current value and set it to a new value atomically.
///
/// Function `mutate_val` takes as input the current value and should
/// return a new value that is attempted to be written to storage.
///
/// Function `f` should return a new value that we should attempt to write to storage.
/// This function returns:
/// 1. `Ok(Ok(T))` in case the value has been successfully set.
/// 2. `Ok(Err(T))` in case the value was calculated by the passed closure `f`,
/// but it could not be stored.
/// 3. `Err(_)` in case `f` returns an error.
pub fn mutate<T, E, F>(&self, f: F) -> Result<Result<T, T>, E> where
/// 1. `Ok(T)` in case the value has been successfully set.
/// 2. `Err(MutateStorageError::ConcurrentModification(T))` in case the value was calculated
/// by the passed closure `mutate_val`, but it could not be stored.
/// 3. `Err(MutateStorageError::ValueFunctionFailed(_))` in case `mutate_val` returns an error.
pub fn mutate<T, E, F>(&self, mutate_val: F) -> Result<T, MutateStorageError<T,E>> where
T: codec::Codec,
F: FnOnce(Option<Option<T>>) -> Result<T, E>
F: FnOnce(Result<Option<T>, StorageRetrievalError>) -> Result<T, E>
{
let value = sp_io::offchain::local_storage_get(self.kind, self.key);
let decoded = value.as_deref().map(|mut v| T::decode(&mut v).ok());
let val = f(decoded)?;
let decoded = value.as_deref()
.map(|mut bytes| {
T::decode(&mut bytes)
.map_err(|_| StorageRetrievalError::Undecodable)
}).transpose();

let val = mutate_val(decoded).map_err(|err| MutateStorageError::ValueFunctionFailed(err))?;

let set = val.using_encoded(|new_val| {
sp_io::offchain::local_storage_compare_and_set(
self.kind,
Expand All @@ -90,11 +119,10 @@ impl<'a> StorageValueRef<'a> {
new_val,
)
});

if set {
Ok(Ok(val))
Ok(val)
} else {
Ok(Err(val))
Err(MutateStorageError::ConcurrentModification(val))
}
}
}
Expand All @@ -117,12 +145,12 @@ mod tests {
t.execute_with(|| {
let val = StorageValue::persistent(b"testval");

assert_eq!(val.get::<u32>(), None);
assert_eq!(val.get::<u32>(), Ok(None));

val.set(&15_u32);

assert_eq!(val.get::<u32>(), Some(Some(15_u32)));
assert_eq!(val.get::<Vec<u8>>(), Some(None));
assert_eq!(val.get::<u32>(), Ok(Some(15_u32)));
assert_eq!(val.get::<Vec<u8>>(), Err(StorageRetrievalError::Undecodable));
assert_eq!(
state.read().persistent_storage.get(b"testval"),
Some(vec![15_u8, 0, 0, 0])
Expand All @@ -140,23 +168,23 @@ mod tests {
let val = StorageValue::persistent(b"testval");

let result = val.mutate::<u32, (), _>(|val| {
assert_eq!(val, None);
assert_eq!(val, Ok(None));

Ok(16_u32)
});
assert_eq!(result, Ok(Ok(16_u32)));
assert_eq!(val.get::<u32>(), Some(Some(16_u32)));
assert_eq!(result, Ok(16_u32));
assert_eq!(val.get::<u32>(), Ok(Some(16_u32)));
assert_eq!(
state.read().persistent_storage.get(b"testval"),
Some(vec![16_u8, 0, 0, 0])
);

// mutate again, but this time early-exit.
let res = val.mutate::<u32, (), _>(|val| {
assert_eq!(val, Some(Some(16_u32)));
assert_eq!(val, Ok(Some(16_u32)));
Err(())
});
assert_eq!(res, Err(()));
assert_eq!(res, Err(MutateStorageError::ValueFunctionFailed(())));
})
}
}
Loading