76 changes: 1 addition & 75 deletions lightning-background-processor/src/lib.rs
@@ -55,7 +55,7 @@ use lightning::sign::EntropySource;
use lightning::sign::OutputSpender;
use lightning::util::logger::Logger;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
KVStore, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
@@ -1187,80 +1187,6 @@ fn check_and_reset_sleeper<
}
}

/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
/// synchronous background persistence.
pub async fn process_events_async_with_kv_store_sync<
Collaborator:

Hmm, not quite sure I understand why we want to drop this? ldk-node might not use it, but I imagine some others might? It's the equivalent of our previous async BP loop, and keeping it makes upgrades easier for those who might not want to switch to a partial-async KVStore immediately.

Contributor Author:

IMO it's a very odd middle-ground API that was introduced when KVStoreSyncWrapper wasn't public. I fear users might find it confusing, and just using KVStoreSyncWrapper when needed seems way more consistent (as they'd already need to do that for some of the other types they'd hand into that method anyway).

Contributor:

No comment. It's so little code that I think it's fine either way.

Collaborator:

IMO we should prefer that users not use KVStoreSyncWrapper directly. The docs even explicitly say "It is not necessary to use this type directly." (and I feel like we should #[doc(hidden)] it?).

Contributor Author:

Hmm, but at least in LDK Node using KVStoreSyncWrapper was unavoidable. I can drop the drop commit if you insist, but IMO it's a pretty awkward, confusing API.

UL: 'static + Deref,
CF: 'static + Deref,
T: 'static + Deref,
F: 'static + Deref,
G: 'static + Deref<Target = NetworkGraph<L>>,
L: 'static + Deref + Send + Sync,
P: 'static + Deref,
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: 'static + Deref + Send,
M: 'static
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
+ Send
+ Sync,
CM: 'static + Deref + Send + Sync,
OM: 'static + Deref,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
PM: 'static + Deref,
LM: 'static + Deref,
D: 'static + Deref,
O: 'static + Deref,
K: 'static + Deref,
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture,
FetchTime: Fn() -> Option<Duration>,
>(
kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
T::Target: 'static + BroadcasterInterface,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: 'static + EntropySource,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
O::Target: 'static + OutputSpender,
D::Target: 'static + ChangeDestinationSource,
K::Target: 'static + KVStoreSync,
{
let kv_store = KVStoreSyncWrapper(kv_store);
process_events_async(
kv_store,
event_handler,
chain_monitor,
channel_manager,
onion_messenger,
gossip_sync,
peer_manager,
liquidity_manager,
sweeper,
logger,
scorer,
sleeper,
mobile_interruptable_platform,
fetch_time,
)
.await
}
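For callers that used this helper, the wrapping it performed is a one-liner, so they can keep using the fully-async entry point directly. Below is a minimal sketch that reuses the removed function's argument names (assumed to already be in scope at the call site); only the `KVStoreSyncWrapper` construction and the `process_events_async` call mirror the deleted body shown above.

```rust
// Sketch: replicating the removed convenience helper at the call site.
// `my_sync_kv_store` is a placeholder for a `Deref<Target: KVStoreSync>`; the
// remaining arguments stand in for whatever the caller already hands to the
// background processor.
use lightning::util::persist::KVStoreSyncWrapper;

let kv_store = KVStoreSyncWrapper(my_sync_kv_store);
process_events_async(
    kv_store,
    event_handler,
    chain_monitor,
    channel_manager,
    onion_messenger,
    gossip_sync,
    peer_manager,
    liquidity_manager,
    sweeper,
    logger,
    scorer,
    sleeper,
    mobile_interruptable_platform,
    fetch_time,
)
.await?;
```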

#[cfg(feature = "std")]
impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
62 changes: 29 additions & 33 deletions lightning/src/util/persist.rs
@@ -1257,14 +1257,13 @@ mod tests {

let monitor_name = mon.persistence_key();
assert_eq!(
persister_0
.kv_store
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string()
)
.unwrap()
.len() as u64,
KVStoreSync::list(
&*persister_0.kv_store,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string()
)
.unwrap()
.len() as u64,
mon.get_latest_update_id() % persister_0_max_pending_updates,
"Wrong number of updates stored in persister 0",
);
@@ -1276,14 +1275,13 @@
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = mon.persistence_key();
assert_eq!(
persister_1
.kv_store
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string()
)
.unwrap()
.len() as u64,
KVStoreSync::list(
&*persister_1.kv_store,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string()
)
.unwrap()
.len() as u64,
mon.get_latest_update_id() % persister_1_max_pending_updates,
"Wrong number of updates stored in persister 1",
);
@@ -1481,28 +1479,26 @@
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = monitor.persistence_key();
persister_0
.kv_store
.write(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str(),
vec![0u8; 1],
)
.unwrap();
KVStoreSync::write(
&*persister_0.kv_store,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str(),
vec![0u8; 1],
)
.unwrap();

// Do the stale update cleanup
persister_0.cleanup_stale_updates(false).unwrap();

// Confirm the stale update is unreadable/gone
assert!(persister_0
.kv_store
.read(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str()
)
.is_err());
assert!(KVStoreSync::read(
&*persister_0.kv_store,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str()
)
.is_err());
}
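A note on why these call sites switch to fully-qualified syntax: the test store now implements both the async `KVStore` and `KVStoreSync` (see the test_utils.rs diff below), and both traits expose `read`/`write`/`remove`/`list`, so a plain method call would be ambiguous. A self-contained illustration with stand-in traits (the names here are hypothetical, not LDK's):

```rust
// Two traits exposing the same method name, both implemented by one type.
trait AsyncList { fn list(&self) -> Vec<String>; }
trait SyncList  { fn list(&self) -> Vec<String>; }

struct Store;
impl AsyncList for Store { fn list(&self) -> Vec<String> { vec!["async".into()] } }
impl SyncList  for Store { fn list(&self) -> Vec<String> { vec!["sync".into()] } }

fn main() {
    let store = Store;
    // let keys = store.list(); // error: multiple applicable items in scope
    let keys = SyncList::list(&store); // fully-qualified call picks the sync impl
    assert_eq!(keys, vec!["sync".to_string()]);
}
```

With both traits in scope, `KVStoreSync::list(&*persister_0.kv_store, ...)` likewise selects the synchronous implementation explicitly.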

fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
100 changes: 93 additions & 7 deletions lightning/src/util/test_utils.rs
@@ -57,7 +57,7 @@ use crate::util::dyn_signer::{
use crate::util::logger::{Logger, Record};
#[cfg(feature = "std")]
use crate::util::mut_global::MutGlobal;
use crate::util::persist::{KVStoreSync, MonitorName};
use crate::util::persist::{KVStore, KVStoreSync, MonitorName};
use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner};

@@ -84,7 +84,10 @@ use crate::io;
use crate::prelude::*;
use crate::sign::{EntropySource, NodeSigner, RandomBytes, Recipient, SignerProvider};
use crate::sync::{Arc, Mutex};
use alloc::boxed::Box;
use core::future::Future;
use core::mem;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::time::Duration;

@@ -863,10 +866,8 @@ impl TestStore {
let persisted_bytes = Mutex::new(new_hash_map());
Self { persisted_bytes, read_only }
}
}

impl KVStoreSync for TestStore {
fn read(
fn read_internal(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> io::Result<Vec<u8>> {
let persisted_lock = self.persisted_bytes.lock().unwrap();
@@ -888,7 +889,7 @@
}
}

fn write(
fn write_internal(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> io::Result<()> {
if self.read_only {
@@ -911,7 +912,7 @@
Ok(())
}

fn remove(
fn remove_internal(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
) -> io::Result<()> {
if self.read_only {
@@ -935,7 +936,9 @@
Ok(())
}

fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
fn list_internal(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> io::Result<Vec<String>> {
let mut persisted_lock = self.persisted_bytes.lock().unwrap();

let prefixed = if secondary_namespace.is_empty() {
@@ -950,6 +953,89 @@
}
}

impl KVStore for TestStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> {
let res = self.read_internal(&primary_namespace, &secondary_namespace, &key);
Collaborator:

Hmmmmm, it would be nice to be able to race reads and writes rather than always immediately completing...

Contributor Author (tnull, Sep 12, 2025):

Right, I agree it would be nice to eventually extend this for improved test coverage. But see above: given it's not actually used anywhere, it's a bit out-of-scope here.

Box::pin(async move { TestStoreFuture::new(res).await })
}
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
let res = self.write_internal(&primary_namespace, &secondary_namespace, &key, buf);
Box::pin(async move { TestStoreFuture::new(res).await })
}
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
let res = self.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy);
Box::pin(async move { TestStoreFuture::new(res).await })
}
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> {
let res = self.list_internal(primary_namespace, secondary_namespace);
Box::pin(async move { TestStoreFuture::new(res).await })
}
}
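A hedged usage sketch (not part of the diff): the boxed futures returned above can be driven by hand with a no-op waker, which makes the two-poll behavior of `TestStoreFuture` visible. The namespace and key below are made up for illustration, and LDK's own tests may drive these futures differently.

```rust
use core::future::Future;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing, built from a static vtable; safe fns coerce to the
// unsafe fn pointers the vtable expects.
fn noop_raw_waker() -> RawWaker {
    fn no_op(_: *const ()) {}
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
    RawWaker::new(core::ptr::null(), &VTABLE)
}

// Poll the boxed read future until it yields its (already computed) result.
fn drive_read(store: &TestStore) -> Result<Vec<u8>, lightning::io::Error> {
    let mut fut = KVStore::read(store, "primary", "", "key");
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => continue, // result arrives on a later poll
        }
    }
}
```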

impl KVStoreSync for TestStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> io::Result<Vec<u8>> {
self.read_internal(primary_namespace, secondary_namespace, key)
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> io::Result<()> {
self.write_internal(primary_namespace, secondary_namespace, key, buf)
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> io::Result<()> {
self.remove_internal(primary_namespace, secondary_namespace, key, lazy)
}

fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
self.list_internal(primary_namespace, secondary_namespace)
}
}

// A `Future` that returns the result only on the second poll.
pub(crate) struct TestStoreFuture<R> {
inner: Mutex<(Option<core::task::Waker>, Option<io::Result<R>>)>,
}

impl<R> TestStoreFuture<R> {
fn new(res: io::Result<R>) -> Self {
let inner = Mutex::new((None, Some(res)));
Self { inner }
}
}

impl<R> Future for TestStoreFuture<R> {
type Output = Result<R, io::Error>;
fn poll(
self: Pin<&mut Self>, cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
let mut inner_lock = self.inner.lock().unwrap();
let first_poll = inner_lock.0.is_none();
if first_poll {
(*inner_lock).0 = Some(cx.waker().clone());
core::task::Poll::Pending
Collaborator:

Oh hmm, this is more complicated than I thought. Technically this is a malformed future - after we switch to Ready we're supposed to call wake on the context. In our use it doesn't matter, but... ugh. I do kinda wonder if we won't eventually want to be able to test having control over when async writes complete; it might well be useful even in testing of #4063. If it's too much here then we can just skip it.

Contributor Author (tnull, Sep 12, 2025):

> Oh hmm, this is more complicated than I thought. Technically this is a malformed future - after we switch to Ready we're supposed to call wake on the context.

Ah, good point. Now added a fixup that wakes after dropping the lock.

> I do kinda wonder if we won't eventually want to be able to test having control over when async writes complete, it might well be useful even in testing of #4063. If it's too much here then we can just skip it.

Yeah, tbh this PR is mostly to make lightningdevkit/ldk-node#633 compile, and not even there would we actually currently use the async TestStore; the implementation is just needed to fulfill the trait bounds. So while I agree it would be nice to upgrade TestStore and use it to write better async-KVStore tests, it's a bit out-of-scope for this PR right now. Happy to pick it up some time in a follow-up though!

} else {
let waker = inner_lock.0.take().expect("We should never poll more than twice");
let res = inner_lock.1.take().expect("We should never poll more than twice");
drop(inner_lock);
waker.wake();
core::task::Poll::Ready(res)
}
}
}
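For reference, the contract being discussed above: a future that returns `Pending` and wants to be polled again should arrange a wake-up, for example by self-waking before it yields. A generic "complete on the second poll" sketch that follows that contract (an illustration, not the code under review):

```rust
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

// Completes on the second poll, and wakes the task before yielding so any
// standard executor knows to poll it again.
struct YieldOnce { polled: bool }

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.polled {
            Poll::Ready(())
        } else {
            self.polled = true;
            cx.waker().wake_by_ref(); // schedule another poll before returning Pending
            Poll::Pending
        }
    }
}
```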

unsafe impl Sync for TestStore {}
unsafe impl Send for TestStore {}
