Implement the UtxoSource interface for REST/RPC clients #2248

Merged 5 commits on Aug 25, 2023.
57 changes: 57 additions & 0 deletions lightning-block-sync/src/convert.rs
@@ -13,8 +13,14 @@ use serde_json;
use std::convert::From;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::str::FromStr;
use bitcoin::hashes::Hash;

impl TryInto<serde_json::Value> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> Result<serde_json::Value, std::io::Error> { Ok(self.0) }
}

/// Conversion from `std::io::Error` into `BlockSourceError`.
impl From<std::io::Error> for BlockSourceError {
fn from(e: std::io::Error) -> BlockSourceError {
@@ -38,6 +44,17 @@ impl TryInto<Block> for BinaryResponse {
}
}

/// Parses binary data as a block hash.
impl TryInto<BlockHash> for BinaryResponse {
type Error = std::io::Error;

fn try_into(self) -> std::io::Result<BlockHash> {
BlockHash::from_slice(&self.0).map_err(|_|
std::io::Error::new(std::io::ErrorKind::InvalidData, "bad block hash length")
)
}
}

/// Converts a JSON value into block header data. The JSON value may be an object representing a
/// block header or an array of such objects. In the latter case, the first object is converted.
impl TryInto<BlockHeaderData> for JsonResponse {
@@ -226,6 +243,46 @@ impl TryInto<Transaction> for JsonResponse {
}
}

impl TryInto<BlockHash> for JsonResponse {
type Error = std::io::Error;

fn try_into(self) -> std::io::Result<BlockHash> {
match self.0.as_str() {
None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")),
Some(hex_data) if hex_data.len() != 64 =>
Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hash length")),
Some(hex_data) => BlockHash::from_str(hex_data)
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")),
}
}
}
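
As a test-style sketch (hypothetical, not part of this diff's test module), the conversion above accepts a 64-character hex string and rejects anything else; the hash below is the mainnet genesis block hash, used purely as an example value:

#[test]
fn json_block_hash_sketch() {
	// A 64-character hex string parses into a BlockHash...
	let json = JsonResponse(serde_json::json!(
		"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"));
	let hash: BlockHash = json.try_into().unwrap();
	assert_eq!(hash, BlockHash::from_str(
		"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f").unwrap());
	// ...while a string of the wrong length is rejected.
	let json = JsonResponse(serde_json::json!("deadbeef"));
	assert!(TryInto::<BlockHash>::try_into(json).is_err());
}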

/// The REST `getutxos` endpoint returns a whole pile of data we don't care about and one bit we do
/// - whether the `hit bitmap` field had any entries. Thus we condense the result down into only
/// that.
pub(crate) struct GetUtxosResponse {
pub(crate) hit_bitmap_nonempty: bool
}

impl TryInto<GetUtxosResponse> for JsonResponse {
type Error = std::io::Error;

fn try_into(self) -> std::io::Result<GetUtxosResponse> {
let bitmap_str =
self.0.as_object().ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected an object"))?
.get("bitmap").ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "missing bitmap field"))?
.as_str().ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "bitmap should be an str"))?;
let mut hit_bitmap_nonempty = false;
for c in bitmap_str.chars() {
if c < '0' || c > '9' {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid byte"));
}
if c > '0' { hit_bitmap_nonempty = true; }
}
Ok(GetUtxosResponse { hit_bitmap_nonempty })
}
}
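
Similarly, a hypothetical test-style sketch of the `GetUtxosResponse` conversion: only the `bitmap` field matters, and any nonzero digit marks the hit bitmap as nonempty. The other fields shown mirror Bitcoin Core's REST `getutxos` response shape but are illustrative and ignored by the parser above:

#[test]
fn getutxos_bitmap_sketch() {
	// One queried output, present in the UTXO set.
	let response = JsonResponse(serde_json::json!({
		"chainHeight": 800000,
		"bitmap": "1",
		"utxos": []
	}));
	let parsed: GetUtxosResponse = response.try_into().unwrap();
	assert!(parsed.hit_bitmap_nonempty);

	// Two queried outputs, neither found.
	let response = JsonResponse(serde_json::json!({ "bitmap": "00" }));
	let parsed: GetUtxosResponse = response.try_into().unwrap();
	assert!(!parsed.hit_bitmap_nonempty);
}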

#[cfg(test)]
pub(crate) mod tests {
use super::*;
319 changes: 319 additions & 0 deletions lightning-block-sync/src/gossip.rs
@@ -0,0 +1,319 @@
//! When fetching gossip from peers, lightning nodes need to validate that gossip against the
//! current UTXO set. This module defines an implementation of the LDK API required to do so
//! against a [`BlockSource`] which implements a few additional methods for accessing the UTXO set.

use crate::{AsyncBlockSourceResult, BlockData, BlockSource, BlockSourceError};

use bitcoin::blockdata::block::Block;
use bitcoin::blockdata::transaction::{TxOut, OutPoint};
use bitcoin::hash_types::BlockHash;

use lightning::sign::NodeSigner;

use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler};

use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};

use lightning::util::logger::Logger;

use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::task::Poll;

/// A trait which extends [`BlockSource`] and can be queried to fetch the block at a given height
/// as well as whether a given output is unspent (i.e. a member of the current UTXO set).
///
/// Note that while this is implementable for a [`BlockSource`] which returns filtered block data
/// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an
/// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced.
pub trait UtxoSource : BlockSource + 'static {
/// Fetches the block hash of the block at the given height.
///
/// This will, in turn, be passed to [`BlockSource::get_block`] to fetch the block needed
/// for gossip validation.
fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash>;

/// Returns true if the given output has *not* been spent, i.e. is a member of the current UTXO
/// set.
fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool>;
}
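
For orientation, a purely hypothetical sketch of what an implementation of these two methods might look like for some REST-backed source. `MyRestClient` and its `fetch_*` helpers are invented for illustration, the required `BlockSource` supertrait impl is elided, and this is not the actual client code this PR adds elsewhere:

struct MyRestClient; // hypothetical type; connection state elided

impl UtxoSource for MyRestClient {
	fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> {
		Box::pin(async move {
			// e.g. GET /rest/blockhashbyheight/<height>.bin, parsed via the
			// `TryInto<BlockHash> for BinaryResponse` conversion in convert.rs.
			self.fetch_block_hash_by_height(block_height).await // assumed helper
		})
	}

	fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> {
		Box::pin(async move {
			// e.g. GET /rest/getutxos/<txid>-<vout>.json, condensed into
			// `GetUtxosResponse { hit_bitmap_nonempty }` by convert.rs.
			let utxos = self.fetch_getutxos(outpoint).await?; // assumed helper
			Ok(utxos.hit_bitmap_nonempty)
		})
	}
}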

/// A generic trait which is able to spawn futures in the background.
///
/// If the `tokio` feature is enabled, this is implemented for the `TokioSpawner` struct, which
/// delegates to `tokio::spawn()`.
pub trait FutureSpawner : Send + Sync + 'static {
Contributor:

non-blocking nit: Technically this does not spawn futures but tasks. :P

One day we may be able to replace that with an std trait: rust-lang/wg-async#283

Collaborator (author):

> non-blocking nit: Technically this does not spawn futures but tasks. :P

bleh. If you think it's gonna confuse anyone I'll change it, but I doubt it otherwise.

> One day we may be able to replace that with an std trait: rust-lang/wg-async#283

Yea, Rust async is such an MVP... sadly it's gotten all hung up on deciding big hairy decisions, so it never really got past MVP :(

/// Spawns the given future as a background task.
///
/// This method MUST NOT block on the given future immediately.
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T);
}

#[cfg(feature = "tokio")]
/// A trivial [`FutureSpawner`] which delegates to `tokio::spawn`.
pub struct TokioSpawner;
#[cfg(feature = "tokio")]
impl FutureSpawner for TokioSpawner {
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
tokio::spawn(future);
}
}
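
Any other async runtime can be plugged in the same way. As a hypothetical sketch (assuming an `async-std` dependency, which this crate does not declare):

pub struct AsyncStdSpawner; // hypothetical alternative spawner, for illustration only
impl FutureSpawner for AsyncStdSpawner {
	fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
		// Spawn on async-std's executor; the returned JoinHandle is dropped rather than awaited.
		async_std::task::spawn(future);
	}
}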

/// A trivial future which joins two other futures and polls them at the same time, returning only
/// once both complete.
pub(crate) struct Joiner<
A: Future<Output=Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output=Result<BlockHash, BlockSourceError>> + Unpin,
> {
pub a: A,
pub b: B,
a_res: Option<(BlockHash, Option<u32>)>,
b_res: Option<BlockHash>,
}

impl<
A: Future<Output=Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output=Result<BlockHash, BlockSourceError>> + Unpin,
> Joiner<A, B> {
fn new(a: A, b: B) -> Self { Self { a, b, a_res: None, b_res: None } }
}

impl<
A: Future<Output=Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output=Result<BlockHash, BlockSourceError>> + Unpin,
> Future for Joiner<A, B> {
type Output = Result<((BlockHash, Option<u32>), BlockHash), BlockSourceError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
if self.a_res.is_none() {
match Pin::new(&mut self.a).poll(ctx) {
Poll::Ready(res) => {
if let Ok(ok) = res {
self.a_res = Some(ok);
} else {
return Poll::Ready(Err(res.unwrap_err()));
}
},
Poll::Pending => {},
}
}
if self.b_res.is_none() {
match Pin::new(&mut self.b).poll(ctx) {
Poll::Ready(res) => {
if let Ok(ok) = res {
self.b_res = Some(ok);
} else {
return Poll::Ready(Err(res.unwrap_err()));
}

},
Poll::Pending => {},
}
}
if let Some(b_res) = self.b_res {
if let Some(a_res) = self.a_res {
return Poll::Ready(Ok((a_res, b_res)))
}
}
Poll::Pending
}
}

/// A struct which wraps a [`UtxoSource`] and a few LDK objects and implements the LDK
/// [`UtxoLookup`] trait.
///
/// Note that if you're using this against a Bitcoin Core REST or RPC server, you likely wish to
/// increase the `rpcworkqueue` setting in Bitcoin Core as LDK attempts to parallelize requests (a
/// value of 1024 should more than suffice), and ensure you have sufficient file descriptors
/// available on both Bitcoin Core and your LDK application for each request to hold its own
/// connection.
pub struct GossipVerifier<S: FutureSpawner,
Blocks: Deref + Send + Sync + 'static + Clone,
L: Deref + Send + Sync + 'static,
Descriptor: SocketDescriptor + Send + Sync + 'static,
CM: Deref + Send + Sync + 'static,
OM: Deref + Send + Sync + 'static,
CMH: Deref + Send + Sync + 'static,
NS: Deref + Send + Sync + 'static,
> where
Blocks::Target: UtxoSource,
L::Target: Logger,
CM::Target: ChannelMessageHandler,
OM::Target: OnionMessageHandler,
CMH::Target: CustomMessageHandler,
NS::Target: NodeSigner,
{
source: Blocks,
peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>,
gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>,
spawn: S,
block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>,
}

const BLOCK_CACHE_SIZE: usize = 5;

impl<S: FutureSpawner,
Blocks: Deref + Send + Sync + Clone,
L: Deref + Send + Sync,
Descriptor: SocketDescriptor + Send + Sync,
CM: Deref + Send + Sync,
OM: Deref + Send + Sync,
CMH: Deref + Send + Sync,
NS: Deref + Send + Sync,
> GossipVerifier<S, Blocks, L, Descriptor, CM, OM, CMH, NS> where
Blocks::Target: UtxoSource,
L::Target: Logger,
CM::Target: ChannelMessageHandler,
OM::Target: OnionMessageHandler,
CMH::Target: CustomMessageHandler,
NS::Target: NodeSigner,
{
/// Constructs a new [`GossipVerifier`].
///
/// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for
/// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`].
pub fn new(source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>) -> Self {
Self {
source, spawn, gossiper, peer_manager,
block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
}
}

async fn retrieve_utxo(
source: Blocks, block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, short_channel_id: u64
) -> Result<TxOut, UtxoLookupError> {
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
let output_index = (short_channel_id & 0xffff) as u16;
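// For example (illustrative values): a channel funded by the transaction at index 1234 in
// block 700_000, output 5, has short_channel_id (700_000 << 40) | (1234 << 16) | 5, and the
// three lines above recover 700_000, 1234 and 5 respectively.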

let (outpoint, output);

'tx_found: loop { // Used as a simple goto
macro_rules! process_block {
($block: expr) => { {
if transaction_index as usize >= $block.txdata.len() {
return Err(UtxoLookupError::UnknownTx);
}
let transaction = &$block.txdata[transaction_index as usize];
if output_index as usize >= transaction.output.len() {
return Err(UtxoLookupError::UnknownTx);
}

outpoint = OutPoint::new(transaction.txid(), output_index.into());
output = transaction.output[output_index as usize].clone();
} }
}
{
let recent_blocks = block_cache.lock().unwrap();
for (height, block) in recent_blocks.iter() {
if *height == block_height {
process_block!(block);
break 'tx_found;
}
}
Comment on lines +211 to +217

Contributor:

Making a note that since we don't move the block to the end of the queue here, we may pop off a block even though it's been used recently - probably not a big deal, but just something that popped up.

Collaborator (author):

Yea, I think it's fine. Making it LRU is possible, but we'd have to shift the array, which isn't bad but is kinda annoying. Also, I don't think the behavior of "LRFirstUsed" is all that bad, either, really.

}

let ((_, tip_height_opt), block_hash) =
Joiner::new(source.get_best_block(), source.get_block_hash_by_height(block_height))
.await
.map_err(|_| UtxoLookupError::UnknownTx)?;
if let Some(tip_height) = tip_height_opt {
// If the block doesn't yet have five confirmations, error out.
//
// The BOLT spec requires nodes wait for six confirmations before announcing a
// channel, and we give them one block of headroom in case we're delayed seeing a
// block.
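// For example (illustrative heights): with tip_height = 800_005, a channel funded at height
// 800_000 has six confirmations and passes, while one funded at height 800_001 (five
// confirmations) is rejected.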
if block_height + 5 > tip_height {
return Err(UtxoLookupError::UnknownTx);
}
}
let block_data = source.get_block(&block_hash).await
Contributor:

We could end up fetching the block multiple times here since we're not holding the lock, but probably not a big deal since we're already ok with the bandwidth costs associated with P2P sync and validation.

Collaborator (author):

Yea, and we can't hold the lock across an await point. It does suck, I agree, but storing and poll'ing other futures sounded hard 🤷

.map_err(|_| UtxoLookupError::UnknownTx)?;
let block = match block_data {
BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx),
BlockData::FullBlock(block) => block,
};
process_block!(block);
{
let mut recent_blocks = block_cache.lock().unwrap();
let mut insert = true;
for (height, _) in recent_blocks.iter() {
if *height == block_height {
insert = false;
}
}
if insert {
if recent_blocks.len() >= BLOCK_CACHE_SIZE {
recent_blocks.pop_front();
}
recent_blocks.push_back((block_height, block));
}
}
break 'tx_found;
};
let outpoint_unspent =
source.is_output_unspent(outpoint).await.map_err(|_| UtxoLookupError::UnknownTx)?;
if outpoint_unspent {
Ok(output)
} else {
Err(UtxoLookupError::UnknownTx)
}
}
}

impl<S: FutureSpawner,
Blocks: Deref + Send + Sync + Clone,
L: Deref + Send + Sync,
Descriptor: SocketDescriptor + Send + Sync,
CM: Deref + Send + Sync,
OM: Deref + Send + Sync,
CMH: Deref + Send + Sync,
NS: Deref + Send + Sync,
> Deref for GossipVerifier<S, Blocks, L, Descriptor, CM, OM, CMH, NS> where
Blocks::Target: UtxoSource,
L::Target: Logger,
CM::Target: ChannelMessageHandler,
OM::Target: OnionMessageHandler,
CMH::Target: CustomMessageHandler,
NS::Target: NodeSigner,
{
type Target = Self;
fn deref(&self) -> &Self { self }
}


impl<S: FutureSpawner,
Blocks: Deref + Send + Sync + Clone,
L: Deref + Send + Sync,
Descriptor: SocketDescriptor + Send + Sync,
CM: Deref + Send + Sync,
OM: Deref + Send + Sync,
CMH: Deref + Send + Sync,
NS: Deref + Send + Sync,
> UtxoLookup for GossipVerifier<S, Blocks, L, Descriptor, CM, OM, CMH, NS> where
Blocks::Target: UtxoSource,
L::Target: Logger,
CM::Target: ChannelMessageHandler,
OM::Target: OnionMessageHandler,
CMH::Target: CustomMessageHandler,
NS::Target: NodeSigner,
{
fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
let res = UtxoFuture::new();
let fut = res.clone();
let source = self.source.clone();
let gossiper = Arc::clone(&self.gossiper);
let block_cache = Arc::clone(&self.block_cache);
let pm = Arc::clone(&self.peer_manager);
self.spawn.spawn(async move {
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
fut.resolve(gossiper.network_graph(), &*gossiper, res);
pm.process_events();
Comment on lines +311 to +315

Contributor:

Why is it necessary to call PeerManager::process_events here?

Collaborator (author):

process_events has to be called because we now have a pending outbound message waiting to be sent (or possibly we are ready to process more messages from the peer's socket, which we stopped reading from because we were busy). If we don't call it, the message will go out when we next hit a timer tick in the background processor (or some other regular event calls it for us), but this may be a while.

});
UtxoResult::Async(res)
}
}