Skip to content

Commit 216de6a

Browse files
committed
Add async persistence logic in MonitorUpdatingPersister
In the next commit we'll add the ability to use an async `KVStore` as the backing for a `ChainMonitor`. Here we tee this up by adding an async API to `MonitorUpdatingPersisterAsync`. Its not intended for public use and is thus only `pub(crate)` but allows spawning all operations via a generic `FutureSpawner` trait, initiating the write via the `KVStore` before any `await`s (or async functions). Because we aren't going to make the `ChannelManager` (or `ChainMonitor`) fully async, we need a way to alert the `ChainMonitor` when a persistence completes, but we leave that for the next commit.
1 parent 5bc1417 commit 216de6a

File tree

1 file changed

+144
-37
lines changed

1 file changed

+144
-37
lines changed

lightning/src/util/persist.rs

Lines changed: 144 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use bitcoin::hashes::hex::FromHex;
1717
use bitcoin::{BlockHash, Txid};
1818

1919
use core::future::Future;
20+
use core::mem;
2021
use core::ops::Deref;
2122
use core::pin::Pin;
2223
use core::str::FromStr;
@@ -32,7 +33,8 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
3233
use crate::chain::transaction::OutPoint;
3334
use crate::ln::types::ChannelId;
3435
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
35-
use crate::util::async_poll::dummy_waker;
36+
use crate::sync::Mutex;
37+
use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync};
3638
use crate::util::logger::Logger;
3739
use crate::util::ser::{Readable, ReadableArgs, Writeable};
3840

@@ -409,6 +411,21 @@ where
409411
Ok(res)
410412
}
411413

414+
/// A generic trait which is able to spawn futures in the background.
415+
pub trait FutureSpawner: Send + Sync + 'static {
416+
/// Spawns the given future as a background task.
417+
///
418+
/// This method MUST NOT block on the given future immediately.
419+
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, future: T);
420+
}
421+
422+
struct PanicingSpawner;
423+
impl FutureSpawner for PanicingSpawner {
424+
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, _: T) {
425+
unreachable!();
426+
}
427+
}
428+
412429
fn poll_sync_future<F: Future>(future: F) -> F::Output {
413430
let mut waker = dummy_waker();
414431
let mut ctx = task::Context::from_waker(&mut waker);
@@ -507,7 +524,7 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
507524
/// would like to get rid of them, consider using the
508525
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
509526
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
510-
(MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, L, ES, SP, BI, FE>)
527+
(MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, PanicingSpawner, L, ES, SP, BI, FE>)
511528
where
512529
K::Target: KVStoreSync,
513530
L::Target: Logger,
@@ -552,6 +569,7 @@ where
552569
) -> Self {
553570
MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new(
554571
KVStoreSyncWrapper(kv_store),
572+
PanicingSpawner,
555573
logger,
556574
maximum_pending_updates,
557575
entropy_source,
@@ -664,7 +682,8 @@ where
664682
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
665683
monitor: &ChannelMonitor<ChannelSigner>,
666684
) -> chain::ChannelMonitorUpdateStatus {
667-
let res = poll_sync_future(self.0.0.update_persisted_channel(monitor_name, update, monitor));
685+
let inner = Arc::clone(&self.0.0);
686+
let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor));
668687
match res {
669688
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
670689
Err(e) => {
@@ -689,8 +708,11 @@ where
689708
/// async versions of the public accessors.
690709
///
691710
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
692-
pub struct MonitorUpdatingPersisterAsync<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
693-
(Arc<MonitorUpdatingPersisterAsyncInner<K, L, ES, SP, BI, FE>>)
711+
///
712+
/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
713+
/// directly by the [`ChainMonitor`].
714+
pub struct MonitorUpdatingPersisterAsync<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
715+
(Arc<MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>>)
694716
where
695717
K::Target: KVStore,
696718
L::Target: Logger,
@@ -699,7 +721,7 @@ where
699721
BI::Target: BroadcasterInterface,
700722
FE::Target: FeeEstimator;
701723

702-
struct MonitorUpdatingPersisterAsyncInner<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
724+
struct MonitorUpdatingPersisterAsyncInner<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
703725
where
704726
K::Target: KVStore,
705727
L::Target: Logger,
@@ -709,6 +731,7 @@ where
709731
FE::Target: FeeEstimator,
710732
{
711733
kv_store: K,
734+
future_spawner: S,
712735
logger: L,
713736
maximum_pending_updates: u64,
714737
entropy_source: ES,
@@ -717,8 +740,8 @@ where
717740
fee_estimator: FE,
718741
}
719742

720-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
721-
MonitorUpdatingPersisterAsync<K, L, ES, SP, BI, FE>
743+
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
744+
MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
722745
where
723746
K::Target: KVStore,
724747
L::Target: Logger,
@@ -731,11 +754,12 @@ where
731754
///
732755
/// See [`MonitorUpdatingPersister::new`] for more info.
733756
pub fn new(
734-
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
735-
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
757+
kv_store: K, future_spawner: S, logger: L, maximum_pending_updates: u64,
758+
entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE,
736759
) -> Self {
737760
MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
738761
kv_store,
762+
future_spawner,
739763
logger,
740764
maximum_pending_updates,
741765
entropy_source,
@@ -805,9 +829,70 @@ where
805829
}
806830
}
807831

832+
impl<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
833+
MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
834+
where
835+
K::Target: KVStore + MaybeSync,
836+
L::Target: Logger,
837+
ES::Target: EntropySource + Sized,
838+
SP::Target: SignerProvider + Sized,
839+
BI::Target: BroadcasterInterface,
840+
FE::Target: FeeEstimator,
841+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
842+
{
843+
pub(crate) fn spawn_async_persist_new_channel(
844+
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
845+
) {
846+
let inner = Arc::clone(&self.0);
847+
let future = inner.persist_new_channel(monitor_name, monitor);
848+
let channel_id = monitor.channel_id();
849+
self.0.future_spawner.spawn(async move {
850+
match future.await {
851+
Ok(()) => {}, // TODO: expose completions
852+
Err(e) => {
853+
log_error!(
854+
inner.logger,
855+
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
856+
);
857+
},
858+
}
859+
});
860+
}
808861

809-
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
810-
MonitorUpdatingPersisterAsyncInner<K, L, ES, SP, BI, FE>
862+
pub(crate) fn spawn_async_update_persisted_channel(
863+
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
864+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
865+
) {
866+
let inner = Arc::clone(&self.0);
867+
let future = inner.update_persisted_channel(monitor_name, update, monitor);
868+
let channel_id = monitor.channel_id();
869+
let inner = Arc::clone(&self.0);
870+
self.0.future_spawner.spawn(async move {
871+
match future.await {
872+
Ok(()) => {}, // TODO: expose completions
873+
Err(e) => {
874+
log_error!(
875+
inner.logger,
876+
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
877+
);
878+
},
879+
}
880+
});
881+
}
882+
883+
pub(crate) fn spawn_async_archive_persisted_channel(
884+
&self, monitor_name: MonitorName,
885+
) {
886+
let inner = Arc::clone(&self.0);
887+
self.0.future_spawner.spawn(async move {
888+
inner.archive_persisted_channel(monitor_name).await;
889+
});
890+
}
891+
}
892+
893+
894+
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
895+
MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>
811896
where
812897
K::Target: KVStore,
813898
L::Target: Logger,
@@ -931,7 +1016,7 @@ where
9311016
let monitor_name = MonitorName::from_str(&monitor_key)?;
9321017
let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?;
9331018
let latest_update_id = current_monitor.get_latest_update_id();
934-
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await;
1019+
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await?;
9351020
}
9361021
Ok(())
9371022
}
@@ -958,9 +1043,9 @@ where
9581043
Ok(())
9591044
}
9601045

961-
async fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
1046+
fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
9621047
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
963-
) -> Result<(), io::Error> {
1048+
) -> impl Future<Output = Result<(), io::Error>> {
9641049
// Determine the proper key for this monitor
9651050
let monitor_key = monitor_name.to_string();
9661051
// Serialize and write the new monitor
@@ -974,55 +1059,77 @@ where
9741059
monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
9751060
}
9761061
monitor.write(&mut monitor_bytes).unwrap();
1062+
// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1063+
// method, allowing it to do its queueing immediately, and then return a future for the
1064+
// completion of the write. This ensures monitor persistence ordering is preserved.
9771065
self.kv_store.write(
9781066
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
9791067
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
9801068
monitor_key.as_str(),
9811069
monitor_bytes,
982-
).await
1070+
)
9831071
}
9841072

985-
async fn update_persisted_channel<ChannelSigner: EcdsaChannelSigner>(
986-
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
1073+
fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>(
1074+
self: Arc<Self>, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
9871075
monitor: &ChannelMonitor<ChannelSigner>,
988-
) -> Result<(), io::Error> {
1076+
) -> impl Future<Output = Result<(), io::Error>> + 'a where Self: 'a {
9891077
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
1078+
let mut res_a = None;
1079+
let mut res_b = None;
1080+
let mut res_c = None;
9901081
if let Some(update) = update {
9911082
let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
9921083
&& self.maximum_pending_updates != 0
9931084
&& update.update_id % self.maximum_pending_updates != 0;
9941085
if persist_update {
9951086
let monitor_key = monitor_name.to_string();
9961087
let update_name = UpdateName::from(update.update_id);
997-
self.kv_store.write(
1088+
res_a = Some(self.kv_store.write(
9981089
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
9991090
monitor_key.as_str(),
10001091
update_name.as_str(),
10011092
update.encode(),
1002-
).await
1093+
));
10031094
} else {
10041095
// We could write this update, but it meets criteria of our design that calls for a full monitor write.
1005-
let write_status = self.persist_new_channel(monitor_name, monitor).await;
1006-
1007-
if let Ok(()) = write_status {
1008-
let channel_closed_legacy =
1009-
monitor.get_latest_update_id() == LEGACY_CLOSED_CHANNEL_UPDATE_ID;
1010-
let latest_update_id = monitor.get_latest_update_id();
1011-
if channel_closed_legacy {
1012-
let monitor_key = monitor_name.to_string();
1013-
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, true).await;
1014-
} else {
1015-
let end = latest_update_id;
1016-
let start = end.saturating_sub(self.maximum_pending_updates);
1017-
self.cleanup_in_range(monitor_name, start, end).await;
1096+
let write_fut = self.persist_new_channel(monitor_name, monitor);
1097+
let latest_update_id = monitor.get_latest_update_id();
1098+
1099+
res_b = Some(async move {
1100+
let write_status = write_fut.await;
1101+
if let Ok(()) = write_status {
1102+
if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
1103+
let monitor_key = monitor_name.to_string();
1104+
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, true).await?;
1105+
} else {
1106+
let end = latest_update_id;
1107+
let start = end.saturating_sub(self.maximum_pending_updates);
1108+
self.cleanup_in_range(monitor_name, start, end).await;
1109+
}
10181110
}
1019-
}
10201111

1021-
write_status
1112+
write_status
1113+
});
10221114
}
10231115
} else {
10241116
// There is no update given, so we must persist a new monitor.
1025-
self.persist_new_channel(monitor_name, monitor).await
1117+
// Note that this is NOT an async function, but rather calls the *sync* KVStore write
1118+
// method, allowing it to do its queueing immediately, and then return a future for the
1119+
// completion of the write. This ensures monitor persistence ordering is preserved.
1120+
res_c = Some(self.persist_new_channel(monitor_name, monitor));
1121+
}
1122+
async move {
1123+
if let Some(a) = res_a {
1124+
a.await?;
1125+
}
1126+
if let Some(b) = res_b {
1127+
b.await?;
1128+
}
1129+
if let Some(c) = res_c {
1130+
c.await?;
1131+
}
1132+
Ok(())
10261133
}
10271134
}
10281135

0 commit comments

Comments
 (0)