Skip to content

Commit c5c70bc

Browse files
committed
Add support for native async KVStore persist to ChainMonitor
This finally adds support for full native Rust `async` persistence to `ChainMonitor`. Way back when, before we had any other persistence, we added the `Persist` trait to persist `ChannelMonitor`s. It eventualy grew homegrown async persistence support via a simple immediate return and callback upon completion. We later added a persistence trait in `lightning-background-processor` to persist the few fields that it needed to drive writes for. Over time, we found more places where persistence was useful, and we eventually added a generic `KVStore` trait. In dc75436 we removed the `lightning-background-processor` `Persister` in favor of simply using the native `KVStore` directly. Here we continue that trend, building native `async` `ChannelMonitor` persistence on top of our native `KVStore` rather than hacking support for it into the `chain::Persist` trait. Because `MonitorUpdatingPersister` already exists as a common way to wrap a `KVStore` into a `ChannelMonitor` persister, we build exclusively on that (though note that the "monitor updating" part is now optional), utilizing its new async option as our native async driver. Thus, we end up with a `ChainMonitor::new_async_beta` which takes a `MonitorUpdatingPersisterAsync` rather than a classic `chain::Persist` and then operates the same as a normal `ChainMonitor`. While the requirement that users now use a `MonitorUpdatingPersister` to wrap their `KVStore` before providing it to `ChainMonitor` is somewhat awkward, as we move towards a `KVStore`-only world it seems like `MonitorUpdatingPersister` should eventually merge into `ChainMonitor`.
1 parent 216de6a commit c5c70bc

File tree

2 files changed

+148
-5
lines changed

2 files changed

+148
-5
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol
4646
use crate::ln::types::ChannelId;
4747
use crate::prelude::*;
4848
use crate::sign::ecdsa::EcdsaChannelSigner;
49-
use crate::sign::{EntropySource, PeerStorageKey};
49+
use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
5050
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
5151
use crate::types::features::{InitFeatures, NodeFeatures};
52+
use crate::util::async_poll::{MaybeSend, MaybeSync};
5253
use crate::util::errors::APIError;
5354
use crate::util::logger::{Logger, WithContext};
54-
use crate::util::persist::MonitorName;
55+
use crate::util::persist::{FutureSpawner, MonitorName, MonitorUpdatingPersisterAsync, KVStore};
5556
#[cfg(peer_storage)]
5657
use crate::util::ser::{VecWriter, Writeable};
5758
use crate::util::wakers::{Future, Notifier};
@@ -192,6 +193,15 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
192193
/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
193194
/// the monitor already exists in the archive.
194195
fn archive_persisted_channel(&self, monitor_name: MonitorName);
196+
197+
/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
198+
/// [`Self::update_persisted_channel`], which have completed.
199+
///
200+
/// Returning an update here is equivalent to calling
201+
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
202+
/// hidden in the docs.
203+
#[doc(hidden)]
204+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { Vec::new() }
195205
}
196206

197207
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -235,6 +245,73 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
235245
}
236246
}
237247

248+
249+
/// An unconstructable [`Persist`]er which is used under the hood when you call
250+
/// [`ChainMonitor::new_async_beta`].
251+
pub struct AsyncPersister<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
252+
where
253+
K::Target: KVStore + MaybeSync,
254+
L::Target: Logger,
255+
ES::Target: EntropySource + Sized,
256+
SP::Target: SignerProvider + Sized,
257+
BI::Target: BroadcasterInterface,
258+
FE::Target: FeeEstimator
259+
{
260+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
261+
}
262+
263+
impl<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
264+
Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
265+
where
266+
K::Target: KVStore + MaybeSync,
267+
L::Target: Logger,
268+
ES::Target: EntropySource + Sized,
269+
SP::Target: SignerProvider + Sized,
270+
BI::Target: BroadcasterInterface,
271+
FE::Target: FeeEstimator
272+
{
273+
type Target = Self;
274+
fn deref(&self) -> &Self {
275+
self
276+
}
277+
}
278+
279+
impl<K: Deref + MaybeSend + MaybeSync + 'static, S: FutureSpawner, L: Deref + MaybeSend + MaybeSync + 'static, ES: Deref + MaybeSend + MaybeSync + 'static, SP: Deref + MaybeSend + MaybeSync + 'static, BI: Deref + MaybeSend + MaybeSync + 'static, FE: Deref + MaybeSend + MaybeSync + 'static>
280+
Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
281+
where
282+
K::Target: KVStore + MaybeSync,
283+
L::Target: Logger,
284+
ES::Target: EntropySource + Sized,
285+
SP::Target: SignerProvider + Sized,
286+
BI::Target: BroadcasterInterface,
287+
FE::Target: FeeEstimator,
288+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
289+
{
290+
fn persist_new_channel(
291+
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
292+
) -> ChannelMonitorUpdateStatus {
293+
self.persister.spawn_async_persist_new_channel(monitor_name, monitor);
294+
ChannelMonitorUpdateStatus::InProgress
295+
}
296+
297+
fn update_persisted_channel(
298+
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
299+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
300+
) -> ChannelMonitorUpdateStatus {
301+
self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor);
302+
ChannelMonitorUpdateStatus::InProgress
303+
}
304+
305+
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
306+
self.persister.spawn_async_archive_persisted_channel(monitor_name);
307+
}
308+
309+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
310+
self.persister.get_and_clear_completed_updates()
311+
}
312+
}
313+
314+
238315
/// An implementation of [`chain::Watch`] for monitoring channels.
239316
///
240317
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@@ -291,6 +368,55 @@ pub struct ChainMonitor<
291368
our_peerstorage_encryption_key: PeerStorageKey,
292369
}
293370

371+
impl<
372+
K: Deref + MaybeSend + MaybeSync + 'static,
373+
S: FutureSpawner,
374+
SP: Deref + MaybeSend + MaybeSync + 'static,
375+
C: Deref,
376+
T: Deref + MaybeSend + MaybeSync + 'static,
377+
F: Deref + MaybeSend + MaybeSync + 'static,
378+
L: Deref + MaybeSend + MaybeSync + 'static,
379+
ES: Deref + MaybeSend + MaybeSync + 'static,
380+
> ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, C, T, F, L, AsyncPersister<K, S, L, ES, SP, T, F>, ES>
381+
where
382+
K::Target: KVStore + MaybeSync,
383+
SP::Target: SignerProvider + Sized,
384+
C::Target: chain::Filter,
385+
T::Target: BroadcasterInterface,
386+
F::Target: FeeEstimator,
387+
L::Target: Logger,
388+
ES::Target: EntropySource + Sized,
389+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
390+
{
391+
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
392+
///
393+
/// This behaves the same as [`ChainMonitor::new`] except that it relies on
394+
/// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
395+
///
396+
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
397+
pub fn new_async_beta(
398+
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
399+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
400+
_our_peerstorage_encryption_key: PeerStorageKey,
401+
) -> Self {
402+
Self {
403+
monitors: RwLock::new(new_hash_map()),
404+
chain_source,
405+
broadcaster,
406+
logger,
407+
fee_estimator: feeest,
408+
persister: AsyncPersister { persister },
409+
_entropy_source,
410+
pending_monitor_events: Mutex::new(Vec::new()),
411+
highest_chain_height: AtomicUsize::new(0),
412+
event_notifier: Notifier::new(),
413+
pending_send_only_events: Mutex::new(Vec::new()),
414+
#[cfg(peer_storage)]
415+
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
416+
}
417+
}
418+
}
419+
294420
impl<
295421
ChannelSigner: EcdsaChannelSigner,
296422
C: Deref,
@@ -1357,6 +1483,9 @@ where
13571483
fn release_pending_monitor_events(
13581484
&self,
13591485
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1486+
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1487+
self.channel_monitor_updated(channel_id, update_id);
1488+
}
13601489
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
13611490
for monitor_state in self.monitors.read().unwrap().values() {
13621491
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();

lightning/src/util/persist.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@ where
710710
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
711711
///
712712
/// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
713-
/// directly by the [`ChainMonitor`].
713+
/// directly by the [`ChainMonitor`] via [`ChainMonitor::new_async_beta`].
714714
pub struct MonitorUpdatingPersisterAsync<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
715715
(Arc<MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>>)
716716
where
@@ -731,6 +731,7 @@ where
731731
FE::Target: FeeEstimator,
732732
{
733733
kv_store: K,
734+
async_completed_updates: Mutex<Vec<(ChannelId, u64)>>,
734735
future_spawner: S,
735736
logger: L,
736737
maximum_pending_updates: u64,
@@ -759,6 +760,7 @@ where
759760
) -> Self {
760761
MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
761762
kv_store,
763+
async_completed_updates: Mutex::new(Vec::new()),
762764
future_spawner,
763765
logger,
764766
maximum_pending_updates,
@@ -846,9 +848,10 @@ where
846848
let inner = Arc::clone(&self.0);
847849
let future = inner.persist_new_channel(monitor_name, monitor);
848850
let channel_id = monitor.channel_id();
851+
let completion = (monitor.channel_id(), monitor.get_latest_update_id());
849852
self.0.future_spawner.spawn(async move {
850853
match future.await {
851-
Ok(()) => {}, // TODO: expose completions
854+
Ok(()) => inner.async_completed_updates.lock().unwrap().push(completion),
852855
Err(e) => {
853856
log_error!(
854857
inner.logger,
@@ -866,10 +869,17 @@ where
866869
let inner = Arc::clone(&self.0);
867870
let future = inner.update_persisted_channel(monitor_name, update, monitor);
868871
let channel_id = monitor.channel_id();
872+
let completion = if let Some(update) = update {
873+
Some((monitor.channel_id(), update.update_id))
874+
} else {
875+
None
876+
};
869877
let inner = Arc::clone(&self.0);
870878
self.0.future_spawner.spawn(async move {
871879
match future.await {
872-
Ok(()) => {}, // TODO: expose completions
880+
Ok(()) => if let Some(completion) = completion {
881+
inner.async_completed_updates.lock().unwrap().push(completion);
882+
},
873883
Err(e) => {
874884
log_error!(
875885
inner.logger,
@@ -888,6 +898,10 @@ where
888898
inner.archive_persisted_channel(monitor_name).await;
889899
});
890900
}
901+
902+
pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
903+
mem::take(&mut *self.0.async_completed_updates.lock().unwrap())
904+
}
891905
}
892906

893907

0 commit comments

Comments
 (0)