Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions fuzz/src/lsps_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use lightning::util::test_utils::{
};

use lightning_liquidity::lsps0::ser::LSPS_MESSAGE_TYPE_ID;
use lightning_liquidity::LiquidityManager;
use lightning_liquidity::LiquidityManagerSync;

use core::time::Duration;

Expand Down Expand Up @@ -77,15 +77,16 @@ pub fn do_test(data: &[u8]) {
genesis_block.header.time,
));

let liquidity_manager = Arc::new(LiquidityManager::new(
let liquidity_manager = Arc::new(LiquidityManagerSync::new(
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&manager),
None::<Arc<dyn Filter + Send + Sync>>,
None,
kv_store,
None,
None,
));
).unwrap());
let mut reader = data;
if let Ok(Some(msg)) = liquidity_manager.read(LSPS_MESSAGE_TYPE_ID, &mut reader) {
let secp = Secp256k1::signing_only();
Expand Down
1 change: 1 addition & 0 deletions lightning-background-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ possiblyrandom = { version = "0.2", path = "../possiblyrandom", default-features
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
lightning = { version = "0.2.0", path = "../lightning", features = ["_test_utils"] }
lightning-invoice = { version = "0.34.0", path = "../lightning-invoice" }
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false, features = ["_test_utils"] }
lightning-persister = { version = "0.2.0", path = "../lightning-persister" }

[lints]
Expand Down
39 changes: 23 additions & 16 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;

use lightning_liquidity::ALiquidityManager;
#[cfg(feature = "std")]
use lightning_liquidity::ALiquidityManagerSync;

use core::ops::Deref;
use core::time::Duration;
Expand Down Expand Up @@ -631,7 +633,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<DefaultTimeProvider>>;
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<Store>, Arc<DefaultTimeProvider>>;
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
Expand Down Expand Up @@ -1350,7 +1352,7 @@ impl BackgroundProcessor {
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
LM::Target: ALiquidityManagerSync,
D::Target: ChangeDestinationSourceSync,
O::Target: 'static + OutputSpender,
K::Target: 'static + KVStoreSync,
Expand Down Expand Up @@ -1693,7 +1695,7 @@ mod tests {
use lightning::util::test_utils;
use lightning::{get_event, get_event_msg};
use lightning_liquidity::utils::time::DefaultTimeProvider;
use lightning_liquidity::LiquidityManager;
use lightning_liquidity::{ALiquidityManagerSync, LiquidityManagerSync};
use lightning_persister::fs_store::FilesystemStore;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::collections::VecDeque;
Expand Down Expand Up @@ -1790,11 +1792,12 @@ mod tests {
IgnoringMessageHandler,
>;

type LM = LiquidityManager<
type LM = LiquidityManagerSync<
Arc<KeysManager>,
Arc<KeysManager>,
Arc<ChannelManager>,
Arc<dyn Filter + Sync + Send>,
Arc<Persister>,
Arc<DefaultTimeProvider>,
>;

Expand Down Expand Up @@ -2242,15 +2245,19 @@ mod tests {
Arc::clone(&logger),
Arc::clone(&keys_manager),
));
let liquidity_manager = Arc::new(LiquidityManager::new(
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&manager),
None,
None,
None,
None,
));
let liquidity_manager = Arc::new(
LiquidityManagerSync::new(
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&manager),
None,
None,
Arc::clone(&kv_store),
None,
None,
)
.unwrap(),
);
let node = Node {
node: manager,
p2p_gossip_sync,
Expand Down Expand Up @@ -2627,7 +2634,7 @@ mod tests {
Some(Arc::clone(&nodes[0].messenger)),
nodes[0].rapid_gossip_sync(),
Arc::clone(&nodes[0].peer_manager),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].liquidity_manager.get_lm_async()),
Some(nodes[0].sweeper.sweeper_async()),
Arc::clone(&nodes[0].logger),
Some(Arc::clone(&nodes[0].scorer)),
Expand Down Expand Up @@ -3136,7 +3143,7 @@ mod tests {
Some(Arc::clone(&nodes[0].messenger)),
nodes[0].rapid_gossip_sync(),
Arc::clone(&nodes[0].peer_manager),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].liquidity_manager.get_lm_async()),
Some(nodes[0].sweeper.sweeper_async()),
Arc::clone(&nodes[0].logger),
Some(Arc::clone(&nodes[0].scorer)),
Expand Down Expand Up @@ -3351,7 +3358,7 @@ mod tests {
Some(Arc::clone(&nodes[0].messenger)),
nodes[0].no_gossip_sync(),
Arc::clone(&nodes[0].peer_manager),
Some(Arc::clone(&nodes[0].liquidity_manager)),
Some(nodes[0].liquidity_manager.get_lm_async()),
Some(nodes[0].sweeper.sweeper_async()),
Arc::clone(&nodes[0].logger),
Some(Arc::clone(&nodes[0].scorer)),
Expand Down
2 changes: 2 additions & 0 deletions lightning-liquidity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ default = ["std", "time"]
std = ["lightning/std"]
time = ["std"]
backtrace = ["dep:backtrace"]
_test_utils = []

[dependencies]
lightning = { version = "0.2.0", path = "../lightning", default-features = false }
lightning-types = { version = "0.3.0", path = "../lightning-types", default-features = false }
lightning-invoice = { version = "0.34.0", path = "../lightning-invoice", default-features = false, features = ["serde"] }
lightning-macros = { version = "0.2", path = "../lightning-macros" }

bitcoin = { version = "0.32.2", default-features = false, features = ["serde"] }

Expand Down
94 changes: 84 additions & 10 deletions lightning-liquidity/src/events/event_queue.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,51 @@
use super::LiquidityEvent;

use crate::persist::{
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
};
use crate::sync::{Arc, Mutex};

use alloc::boxed::Box;
use alloc::collections::VecDeque;
use alloc::vec::Vec;

use core::future::Future;
use core::ops::Deref;
use core::pin::Pin;
use core::task::{Poll, Waker};

use lightning::util::persist::KVStore;
use lightning::util::ser::{CollectionLength, MaybeReadable, Readable, Writeable, Writer};

/// The maximum queue size we allow before starting to drop events.
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;

pub(crate) struct EventQueue {
pub(crate) struct EventQueue<K: Deref + Clone>
where
K::Target: KVStore,
{
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
waker: Arc<Mutex<Option<Waker>>>,
#[cfg(feature = "std")]
condvar: Arc<crate::sync::Condvar>,
kv_store: K,
}

impl EventQueue {
pub fn new() -> Self {
let queue = Arc::new(Mutex::new(VecDeque::new()));
impl<K: Deref + Clone> EventQueue<K>
where
K::Target: KVStore,
{
pub fn new(queue: VecDeque<LiquidityEvent>, kv_store: K) -> Self {
let queue = Arc::new(Mutex::new(queue));
let waker = Arc::new(Mutex::new(None));
Self {
queue,
waker,
#[cfg(feature = "std")]
condvar: Arc::new(crate::sync::Condvar::new()),
kv_store,
}
}

Expand Down Expand Up @@ -67,16 +87,35 @@ impl EventQueue {
}

// Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
pub fn notifier(&self) -> EventQueueNotifierGuard<'_> {
pub fn notifier(&self) -> EventQueueNotifierGuard<'_, K> {
EventQueueNotifierGuard(self)
}

pub fn persist(
&self,
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
let queue = self.queue.lock().unwrap();
let encoded = EventQueueSerWrapper(&queue).encode();

self.kv_store.write(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
encoded,
)
}
}

// A guard type that will notify about new events when dropped.
#[must_use]
pub(crate) struct EventQueueNotifierGuard<'a>(&'a EventQueue);

impl<'a> EventQueueNotifierGuard<'a> {
pub(crate) struct EventQueueNotifierGuard<'a, K: Deref + Clone>(&'a EventQueue<K>)
where
K::Target: KVStore;

impl<'a, K: Deref + Clone> EventQueueNotifierGuard<'a, K>
where
K::Target: KVStore,
{
pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
let mut queue = self.0.queue.lock().unwrap();
if queue.len() < MAX_EVENT_QUEUE_SIZE {
Expand All @@ -87,7 +126,10 @@ impl<'a> EventQueueNotifierGuard<'a> {
}
}

impl<'a> Drop for EventQueueNotifierGuard<'a> {
impl<'a, K: Deref + Clone> Drop for EventQueueNotifierGuard<'a, K>
where
K::Target: KVStore,
{
fn drop(&mut self) {
let should_notify = !self.0.queue.lock().unwrap().is_empty();

Expand Down Expand Up @@ -122,6 +164,35 @@ impl Future for EventFuture {
}
}

pub(crate) struct EventQueueDeserWrapper(pub VecDeque<LiquidityEvent>);

impl Readable for EventQueueDeserWrapper {
fn read<R: lightning::io::Read>(
reader: &mut R,
) -> Result<Self, lightning::ln::msgs::DecodeError> {
let len: CollectionLength = Readable::read(reader)?;
let mut queue = VecDeque::with_capacity(len.0 as usize);
for _ in 0..len.0 {
if let Some(event) = MaybeReadable::read(reader)? {
queue.push_back(event);
}
}
Ok(Self(queue))
}
}

struct EventQueueSerWrapper<'a>(&'a VecDeque<LiquidityEvent>);

impl Writeable for EventQueueSerWrapper<'_> {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
CollectionLength(self.0.len() as u64).write(writer)?;
for e in self.0.iter() {
e.write(writer)?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
#[tokio::test]
Expand All @@ -131,10 +202,13 @@ mod tests {
use crate::lsps0::event::LSPS0ClientEvent;
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
use core::sync::atomic::{AtomicU16, Ordering};
use lightning::util::persist::KVStoreSyncWrapper;
use lightning::util::test_utils::TestStore;
use std::sync::Arc;
use std::time::Duration;

let event_queue = Arc::new(EventQueue::new());
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
assert_eq!(event_queue.next_event(), None);

let secp_ctx = Secp256k1::new();
Expand Down
58 changes: 57 additions & 1 deletion lightning-liquidity/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@

mod event_queue;

pub(crate) use event_queue::EventQueue;
pub use event_queue::MAX_EVENT_QUEUE_SIZE;
pub(crate) use event_queue::{EventQueue, EventQueueDeserWrapper};

use crate::lsps0;
use crate::lsps1;
use crate::lsps2;
use crate::lsps5;

use lightning::io;
use lightning::ln::msgs::DecodeError;
use lightning::util::ser::{
BigSize, FixedLengthReader, MaybeReadable, Readable, Writeable, Writer,
};

/// An event which you should probably take some action in response to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LiquidityEvent {
Expand Down Expand Up @@ -87,3 +93,53 @@ impl From<lsps5::event::LSPS5ServiceEvent> for LiquidityEvent {
Self::LSPS5Service(event)
}
}

impl Writeable for LiquidityEvent {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
match self {
Self::LSPS0Client(_) => {},
Self::LSPS1Client(_) => {},
#[cfg(lsps1_service)]
Self::LSPS1Service(_) => {},
Self::LSPS2Client(_) => {},
Self::LSPS2Service(event) => {
0u8.write(writer)?;
event.write(writer)?;
},
Self::LSPS5Client(_) => {},
Self::LSPS5Service(event) => {
2u8.write(writer)?;
event.write(writer)?;
},
}
Ok(())
}
}

impl MaybeReadable for LiquidityEvent {
fn read<R: io::Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
match Readable::read(reader)? {
0u8 => {
let event = Readable::read(reader)?;
Ok(Some(LiquidityEvent::LSPS2Service(event)))
},
2u8 => {
let event = Readable::read(reader)?;
Ok(Some(LiquidityEvent::LSPS5Service(event)))
},
x if x % 2 == 1 => {
// If the event is of unknown type, assume it was written with `write_tlv_fields`,
// which prefixes the whole thing with a length BigSize. Because the event is
// odd-type unknown, we should treat it as `Ok(None)` even if it has some TLV
// fields that are even. Thus, we avoid using `read_tlv_fields` and simply read
// exactly the number of bytes specified, ignoring them entirely.
let tlv_len: BigSize = Readable::read(reader)?;
FixedLengthReader::new(reader, tlv_len.0)
.eat_remaining()
.map_err(|_| DecodeError::ShortRead)?;
Ok(None)
},
_ => Err(DecodeError::InvalidValue),
}
}
}
Loading
Loading