Skip to content

Commit 9afb72c

Browse files
committed
Add async KVStore
1 parent 1908f09 commit 9afb72c

File tree

4 files changed

+439
-168
lines changed

4 files changed

+439
-168
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 138 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ mod fwd_batch;
3030

3131
use fwd_batch::BatchDelay;
3232

33-
use crate::lightning::util::ser::Writeable;
3433
use lightning::chain;
3534
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3635
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
@@ -40,6 +39,7 @@ use lightning::events::EventHandler;
4039
use lightning::events::EventsProvider;
4140
use lightning::events::ReplayEvent;
4241
use lightning::events::{Event, PathFailure};
42+
use lightning::util::ser::Writeable;
4343

4444
use lightning::ln::channelmanager::AChannelManager;
4545
use lightning::ln::msgs::OnionMessageHandler;
@@ -55,11 +55,11 @@ use lightning::sign::EntropySource;
5555
use lightning::sign::OutputSpender;
5656
use lightning::util::logger::Logger;
5757
use lightning::util::persist::{
58-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
59-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
60-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
61-
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
62-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
58+
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
59+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
60+
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
61+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
62+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
6363
};
6464
use lightning::util::sweep::OutputSweeper;
6565
#[cfg(feature = "std")]
@@ -331,6 +331,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
331331
true
332332
}
333333

334+
macro_rules! maybe_await {
335+
(true, $e:expr) => {
336+
$e.await
337+
};
338+
(false, $e:expr) => {
339+
$e
340+
};
341+
}
342+
334343
macro_rules! define_run_body {
335344
(
336345
$kv_store: ident,
@@ -340,7 +349,7 @@ macro_rules! define_run_body {
340349
$peer_manager: ident, $gossip_sync: ident,
341350
$process_sweeper: expr,
342351
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
343-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr,
352+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr, $async_persist: tt,
344353
) => { {
345354
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
346355
$channel_manager.get_cm().timer_tick_occurred();
@@ -412,12 +421,12 @@ macro_rules! define_run_body {
412421

413422
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
414423
log_trace!($logger, "Persisting ChannelManager...");
415-
$kv_store.write(
424+
maybe_await!($async_persist, $kv_store.write(
416425
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
417426
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
418427
CHANNEL_MANAGER_PERSISTENCE_KEY,
419428
&$channel_manager.get_cm().encode(),
420-
)?;
429+
))?;
421430
log_trace!($logger, "Done persisting ChannelManager.");
422431
}
423432
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -478,12 +487,12 @@ macro_rules! define_run_body {
478487
log_trace!($logger, "Persisting network graph.");
479488
}
480489

481-
if let Err(e) = $kv_store.write(
490+
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
482491
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
483492
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
484493
NETWORK_GRAPH_PERSISTENCE_KEY,
485494
&network_graph.encode(),
486-
) {
495+
)) {
487496
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
488497
}
489498

@@ -511,12 +520,12 @@ macro_rules! define_run_body {
511520
} else {
512521
log_trace!($logger, "Persisting scorer");
513522
}
514-
if let Err(e) = $kv_store.write(
523+
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
515524
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
516525
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
517526
SCORER_PERSISTENCE_KEY,
518527
&scorer.encode(),
519-
) {
528+
)) {
520529
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
521530
}
522531
}
@@ -539,31 +548,31 @@ macro_rules! define_run_body {
539548
// After we exit, ensure we persist the ChannelManager one final time - this avoids
540549
// some races where users quit while channel updates were in-flight, with
541550
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
542-
$kv_store.write(
551+
maybe_await!($async_persist, $kv_store.write(
543552
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
544553
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
545554
CHANNEL_MANAGER_PERSISTENCE_KEY,
546555
&$channel_manager.get_cm().encode(),
547-
)?;
556+
))?;
548557

549558
// Persist Scorer on exit
550559
if let Some(ref scorer) = $scorer {
551-
$kv_store.write(
560+
maybe_await!($async_persist, $kv_store.write(
552561
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
553562
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
554563
SCORER_PERSISTENCE_KEY,
555564
&scorer.encode(),
556-
)?;
565+
))?;
557566
}
558567

559568
// Persist NetworkGraph on exit
560569
if let Some(network_graph) = $gossip_sync.network_graph() {
561-
$kv_store.write(
570+
maybe_await!($async_persist, $kv_store.write(
562571
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
563572
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
564573
NETWORK_GRAPH_PERSISTENCE_KEY,
565574
&network_graph.encode(),
566-
)?;
575+
))?;
567576
}
568577

569578
Ok(())
@@ -720,11 +729,12 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
720729
/// ```
721730
/// # use lightning::io;
722731
/// # use lightning::events::ReplayEvent;
723-
/// # use lightning::util::sweep::OutputSweeper;
724732
/// # use std::sync::{Arc, RwLock};
725733
/// # use std::sync::atomic::{AtomicBool, Ordering};
726734
/// # use std::time::SystemTime;
727735
/// # use lightning_background_processor::{process_events_async, GossipSync};
736+
/// # use core::future::Future;
737+
/// # use core::pin::Pin;
728738
/// # struct Logger {}
729739
/// # impl lightning::util::logger::Logger for Logger {
730740
/// # fn log(&self, _record: lightning::util::logger::Record) {}
@@ -736,6 +746,13 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
736746
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
737747
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
738748
/// # }
749+
/// # struct Store {}
750+
/// # impl lightning::util::persist::KVStore for Store {
751+
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
752+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
753+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
754+
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
755+
/// # }
739756
/// # struct EventHandler {}
740757
/// # impl EventHandler {
741758
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -754,7 +771,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
754771
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
755772
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
756773
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
757-
/// #
774+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
775+
///
758776
/// # struct Node<
759777
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
760778
/// # F: lightning::chain::Filter + Send + Sync + 'static,
@@ -770,10 +788,10 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
770788
/// # liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
771789
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
772790
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
773-
/// # persister: Arc<StoreSync>,
791+
/// # persister: Arc<Store>,
774792
/// # logger: Arc<Logger>,
775793
/// # scorer: Arc<Scorer>,
776-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>>,
794+
/// # sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
777795
/// # }
778796
/// #
779797
/// # async fn setup_background_processing<
@@ -895,7 +913,7 @@ where
895913
LM::Target: ALiquidityManager,
896914
O::Target: 'static + OutputSpender,
897915
D::Target: 'static + ChangeDestinationSource,
898-
K::Target: 'static + KVStoreSync,
916+
K::Target: 'static + KVStore,
899917
{
900918
let mut should_break = false;
901919
let async_event_handler = |event| {
@@ -914,12 +932,15 @@ where
914932
if let Some(duration_since_epoch) = fetch_time() {
915933
if update_scorer(scorer, &event, duration_since_epoch) {
916934
log_trace!(logger, "Persisting scorer after update");
917-
if let Err(e) = kv_store.write(
918-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
919-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
920-
SCORER_PERSISTENCE_KEY,
921-
&scorer.encode(),
922-
) {
935+
if let Err(e) = kv_store
936+
.write(
937+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
938+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
939+
SCORER_PERSISTENCE_KEY,
940+
&scorer.encode(),
941+
)
942+
.await
943+
{
923944
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
924945
// We opt not to abort early on persistence failure here as persisting
925946
// the scorer is non-critical and we still hope that it will have
@@ -1007,7 +1028,82 @@ where
10071028
mobile_interruptable_platform,
10081029
fetch_time,
10091030
batch_delay,
1031+
true,
1032+
)
1033+
}
1034+
1035+
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
1036+
/// synchronous background persistence.
1037+
pub async fn process_events_async_with_kv_store_sync<
1038+
UL: 'static + Deref,
1039+
CF: 'static + Deref,
1040+
T: 'static + Deref,
1041+
F: 'static + Deref,
1042+
G: 'static + Deref<Target = NetworkGraph<L>>,
1043+
L: 'static + Deref + Send + Sync,
1044+
P: 'static + Deref,
1045+
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1046+
EventHandler: Fn(Event) -> EventHandlerFuture,
1047+
ES: 'static + Deref + Send,
1048+
M: 'static
1049+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
1050+
+ Send
1051+
+ Sync,
1052+
CM: 'static + Deref + Send + Sync,
1053+
OM: 'static + Deref,
1054+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
1055+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
1056+
PM: 'static + Deref,
1057+
LM: 'static + Deref,
1058+
D: 'static + Deref,
1059+
O: 'static + Deref,
1060+
K: 'static + Deref,
1061+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
1062+
S: 'static + Deref<Target = SC> + Send + Sync,
1063+
SC: for<'b> WriteableScore<'b>,
1064+
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1065+
Sleeper: Fn(Duration) -> SleepFuture,
1066+
FetchTime: Fn() -> Option<Duration>,
1067+
>(
1068+
kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1069+
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1070+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1071+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1072+
) -> Result<(), lightning::io::Error>
1073+
where
1074+
UL::Target: 'static + UtxoLookup,
1075+
CF::Target: 'static + chain::Filter,
1076+
T::Target: 'static + BroadcasterInterface,
1077+
F::Target: 'static + FeeEstimator,
1078+
L::Target: 'static + Logger,
1079+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1080+
ES::Target: 'static + EntropySource,
1081+
CM::Target: AChannelManager,
1082+
OM::Target: AOnionMessenger,
1083+
PM::Target: APeerManager,
1084+
LM::Target: ALiquidityManager,
1085+
O::Target: 'static + OutputSpender,
1086+
D::Target: 'static + ChangeDestinationSource,
1087+
K::Target: 'static + KVStoreSync,
1088+
{
1089+
let kv_store = KVStoreSyncWrapper(kv_store);
1090+
process_events_async(
1091+
kv_store,
1092+
event_handler,
1093+
chain_monitor,
1094+
channel_manager,
1095+
onion_messenger,
1096+
gossip_sync,
1097+
peer_manager,
1098+
liquidity_manager,
1099+
sweeper,
1100+
logger,
1101+
scorer,
1102+
sleeper,
1103+
mobile_interruptable_platform,
1104+
fetch_time,
10101105
)
1106+
.await
10111107
}
10121108

10131109
#[cfg(feature = "std")]
@@ -1195,6 +1291,7 @@ impl BackgroundProcessor {
11951291
)
11961292
},
11971293
batch_delay,
1294+
false,
11981295
)
11991296
});
12001297
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1283,7 +1380,7 @@ mod tests {
12831380
use lightning::types::payment::PaymentHash;
12841381
use lightning::util::config::UserConfig;
12851382
use lightning::util::persist::{
1286-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
1383+
KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
12871384
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
12881385
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
12891386
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -2209,12 +2306,13 @@ mod tests {
22092306
open_channel!(nodes[0], nodes[1], 100000);
22102307

22112308
let data_dir = nodes[0].kv_store.get_data_dir();
2212-
let persister = Arc::new(
2309+
let kv_store_sync = Arc::new(
22132310
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
22142311
);
2312+
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
22152313

22162314
let bp_future = super::process_events_async(
2217-
persister,
2315+
kv_store,
22182316
|_: _| async { Ok(()) },
22192317
Arc::clone(&nodes[0].chain_monitor),
22202318
Arc::clone(&nodes[0].node),
@@ -2717,11 +2815,13 @@ mod tests {
27172815
let (_, nodes) =
27182816
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
27192817
let data_dir = nodes[0].kv_store.get_data_dir();
2720-
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
2818+
let kv_store_sync =
2819+
Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
2820+
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
27212821

27222822
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
27232823
let bp_future = super::process_events_async(
2724-
persister,
2824+
kv_store,
27252825
|_: _| async { Ok(()) },
27262826
Arc::clone(&nodes[0].chain_monitor),
27272827
Arc::clone(&nodes[0].node),
@@ -2930,12 +3030,13 @@ mod tests {
29303030

29313031
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
29323032
let data_dir = nodes[0].kv_store.get_data_dir();
2933-
let persister = Arc::new(Persister::new(data_dir));
3033+
let kv_store_sync = Arc::new(Persister::new(data_dir));
3034+
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
29343035

29353036
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
29363037

29373038
let bp_future = super::process_events_async(
2938-
persister,
3039+
kv_store,
29393040
event_handler,
29403041
Arc::clone(&nodes[0].chain_monitor),
29413042
Arc::clone(&nodes[0].node),

0 commit comments

Comments
 (0)