Skip to content

Commit c73a1ad

Browse files
committed
Split into two functions
1 parent c6fb581 commit c73a1ad

File tree

1 file changed

+150
-94
lines changed
  • crates/breez-sdk/core/src

1 file changed

+150
-94
lines changed

crates/breez-sdk/core/src/sdk.rs

Lines changed: 150 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ use breez_sdk_common::{
1818
};
1919
use spark_wallet::{
2020
ExitSpeed, InvoiceDescription, ListTokenTransactionsRequest, SparkAddress, SparkWallet,
21-
TokenInputs, TransferTokenOutput, WalletEvent, WalletTransfer,
21+
SspUserRequest, TokenInputs, TransferTokenOutput, WalletEvent, WalletTransfer,
2222
};
23-
use std::{str::FromStr, sync::Arc};
23+
use std::{collections::HashMap, str::FromStr, sync::Arc};
2424
use tracing::{error, info, trace};
2525
use web_time::{Duration, SystemTime};
2626

@@ -68,6 +68,11 @@ enum SyncType {
6868
PaymentsOnly,
6969
}
7070

71+
struct AddressTransactionsWithSspUserRequests {
72+
address_transactions: Vec<sparkscan::types::AddressTransaction>,
73+
ssp_user_requests: HashMap<String, SspUserRequest>,
74+
}
75+
7176
/// `BreezSDK` is a wrapper around `SparkSDK` that provides a more structured API
7277
/// with request/response objects and comprehensive error handling.
7378
#[derive(Clone)]
@@ -337,7 +342,7 @@ impl BreezSdk {
337342

338343
self.sync_balances_to_storage(&object_repository).await?;
339344
self.sync_pending_payments().await?;
340-
self.sync_payments_to_storage(&object_repository, true, None)
345+
self.sync_payments_head_to_storage(&object_repository)
341346
.await?;
342347

343348
Ok(())
@@ -364,10 +369,40 @@ impl BreezSdk {
364369
Ok(())
365370
}
366371

367-
async fn sync_payments_tail_to_storage(&self) -> Result<(), SdkError> {
368-
let object_repository = ObjectCacheRepository::new(self.storage.clone());
369-
self.sync_payments_to_storage(&object_repository, false, Some(PAYMENT_SYNC_TAIL_MAX_PAGES))
370-
.await
372+
async fn fetch_address_transactions_with_ssp_user_requests(
373+
&self,
374+
legacy_spark_address: &str,
375+
offset: u64,
376+
) -> Result<AddressTransactionsWithSspUserRequests, SdkError> {
377+
let response = sparkscan::Client::new(&self.config.sparkscan_api_url)
378+
.get_address_transactions_v1_address_address_transactions_get()
379+
.network(sparkscan::types::Network::from(self.config.network))
380+
.address(legacy_spark_address)
381+
.offset(offset)
382+
.limit(PAYMENT_SYNC_BATCH_SIZE)
383+
.send()
384+
.await?;
385+
let address_transactions = response.data.clone();
386+
let ssp_transfer_types = [
387+
sparkscan::types::AddressTransactionType::BitcoinDeposit,
388+
sparkscan::types::AddressTransactionType::BitcoinWithdrawal,
389+
sparkscan::types::AddressTransactionType::LightningPayment,
390+
];
391+
let ssp_user_requests = self
392+
.spark_wallet
393+
.query_ssp_user_requests(
394+
address_transactions
395+
.iter()
396+
.filter(|tx| ssp_transfer_types.contains(&tx.type_))
397+
.map(|tx| tx.id.clone())
398+
.collect(),
399+
)
400+
.await?;
401+
402+
Ok(AddressTransactionsWithSspUserRequests {
403+
address_transactions,
404+
ssp_user_requests,
405+
})
371406
}
372407

373408
/// Synchronizes payments from the Spark network to persistent storage using the Sparkscan API.
@@ -376,100 +411,57 @@ impl BreezSdk {
376411
/// `last_synced_payment_id`. If `max_pages` is reached or we get an error response from the Sparkscan
377412
/// API, it will update the `next_head_offset` for the next sync. This allows us to gradually sync the
378413
/// head up to the `last_synced_payment_id` in multiple sync cycles.
379-
///
380-
/// When syncing from tail, it will start from the count of completed payments in storage and page for
381-
/// a maximum of `PAYMENT_SYNC_TAIL_MAX_PAGES` for one sync cycle. Once it reaches the end page,
382-
/// it will update `tail_synced` in the sync info and the tail will no longer be synced in future cycles.
383-
#[allow(clippy::too_many_lines)]
384-
async fn sync_payments_to_storage(
414+
async fn sync_payments_head_to_storage(
385415
&self,
386416
object_repository: &ObjectCacheRepository,
387-
from_head: bool,
388-
max_pages: Option<u64>,
389417
) -> Result<(), SdkError> {
390-
info!("Syncing payments to storage, from_head = {from_head}, max_pages = {max_pages:?}");
418+
info!("Syncing payments head to storage");
391419
let cached_sync_info = object_repository
392420
.fetch_sync_info()
393421
.await?
394422
.unwrap_or_default();
395-
if cached_sync_info.tail_synced && !from_head {
396-
info!("Payments tail already synced, skipping");
397-
return Ok(());
398-
}
399-
let last_synced_id = cached_sync_info.last_synced_payment_id;
400-
let mut max_pages = max_pages.unwrap_or(u64::MAX);
401-
let mut found_last_synced = false;
402-
let mut more_to_sync = true;
403-
404-
if last_synced_id.is_none() && from_head {
405-
// There is no cached last synced payment id, limit the number of pages when syncing
406-
// from head. Then let the rest of the payments be synced by the tail sync.
407-
// Set `found_last_synced` to true to store the first payment as the last synced payment id.
408-
info!("No last synced payment id found syncing from head, setting max_pages to 1");
409-
max_pages = 1;
410-
found_last_synced = true;
411-
}
412-
413423
// TODO: use new spark address format once sparkscan supports it
414424
let legacy_spark_address = self
415425
.spark_wallet
416426
.get_spark_address()?
417427
.to_string_with_hrp_legacy();
418-
419-
let mut next_offset = if from_head {
420-
// Sync from the next head offset in case we didn't finish syncing the head last time
421-
cached_sync_info.next_head_offset
428+
let last_synced_id = cached_sync_info.last_synced_payment_id;
429+
let (max_pages, mut head_synced) = if last_synced_id.is_some() {
430+
(u64::MAX, false)
422431
} else {
423-
// Count the completed payments and start the sync from there
424-
self.storage
425-
.list_payments(None, None, Some(PaymentStatus::Completed))
426-
.await?
427-
.len() as u64
432+
info!("No last synced payment id found syncing from head, setting max_pages to 1");
433+
// There is no cached last synced payment id, limit the number of pages when syncing
434+
// from head. Then let the rest of the payments be synced by the tail sync.
435+
// Set `head_synced` to true to store the first payment as the last synced payment id.
436+
(1, true)
428437
};
429-
438+
// Sync from the next head offset in case we didn't finish syncing the head last time
439+
let mut next_offset = cached_sync_info.next_head_offset;
430440
let mut payments_to_sync = Vec::new();
431441
'page_loop: for page in 1..=max_pages {
432442
info!(
433443
"Fetching address transactions, offset = {next_offset}, page = {page}/{max_pages}"
434444
);
435-
let response = sparkscan::Client::new(&self.config.sparkscan_api_url)
436-
.get_address_transactions_v1_address_address_transactions_get()
437-
.network(sparkscan::types::Network::from(self.config.network))
438-
.address(legacy_spark_address.to_string())
439-
.offset(next_offset)
440-
.limit(PAYMENT_SYNC_BATCH_SIZE)
441-
.send()
442-
.await;
443-
let address_transactions = match &response {
444-
Ok(response) => &response.data,
445-
Err(e) => {
446-
error!("Failed to fetch address transactions: {e}");
447-
break 'page_loop;
448-
}
449-
};
450-
451-
let ssp_transfer_types = [
452-
sparkscan::types::AddressTransactionType::BitcoinDeposit,
453-
sparkscan::types::AddressTransactionType::BitcoinWithdrawal,
454-
sparkscan::types::AddressTransactionType::LightningPayment,
455-
];
456-
let ssp_user_requests = self
457-
.spark_wallet
458-
.query_ssp_user_requests(
459-
address_transactions
460-
.iter()
461-
.filter(|tx| ssp_transfer_types.contains(&tx.type_))
462-
.map(|tx| tx.id.clone())
463-
.collect(),
445+
let Ok(AddressTransactionsWithSspUserRequests {
446+
address_transactions,
447+
ssp_user_requests,
448+
}) = self
449+
.fetch_address_transactions_with_ssp_user_requests(
450+
&legacy_spark_address,
451+
next_offset,
464452
)
465-
.await?;
453+
.await
454+
else {
455+
error!("Failed to fetch address transactions, stopping sync");
456+
break 'page_loop;
457+
};
466458

467459
info!(
468460
"Processing address transactions, offset = {next_offset}, transactions = {}",
469461
address_transactions.len()
470462
);
471463
// Process transactions in this batch
472-
for transaction in address_transactions {
464+
for transaction in &address_transactions {
473465
// Create payment records
474466
let payments = payments_from_address_transaction_and_ssp_request(
475467
transaction,
@@ -483,7 +475,7 @@ impl BreezSdk {
483475
"Last synced payment id found ({last_synced_id:?}), stopping sync and proceeding to insert {} payments",
484476
payments_to_sync.len()
485477
);
486-
found_last_synced = true;
478+
head_synced = true;
487479
break 'page_loop;
488480
}
489481
payments_to_sync.push(payment);
@@ -493,39 +485,103 @@ impl BreezSdk {
493485
// Check if we have more transfers to fetch
494486
next_offset = next_offset.saturating_add(u64::try_from(address_transactions.len())?);
495487
if (address_transactions.len() as u64) < PAYMENT_SYNC_BATCH_SIZE {
496-
more_to_sync = false;
488+
head_synced = true;
497489
break 'page_loop;
498490
}
499491
}
500492

501-
let (next_head_offset, tail_synced) = match (from_head, found_last_synced) {
502-
// If syncing from head and found the last synced payment, reset next head offset
503-
(true, true) => (Some(0), None),
504-
// If syncing from head and did not find the last synced payment, set the next head offset
505-
(true, false) => (Some(next_offset), None),
506-
// If syncing from tail and there is no more to sync, mark tail as synced
507-
_ => (None, Some(!more_to_sync)),
508-
};
509493
// Insert what synced payments we have into storage from oldest to newest
510494
for payment in payments_to_sync.iter().rev() {
511495
self.storage.insert_payment(payment.clone()).await?;
512496
info!("Inserted payment: {payment:?}");
513-
let last_synced_payment_id = if from_head && found_last_synced {
514-
Some(payment.id.clone())
497+
let (last_synced_payment_id, next_head_offset) = if head_synced {
498+
(Some(payment.id.clone()), 0)
515499
} else {
516-
None
500+
(None, next_offset)
517501
};
518502
object_repository
519-
.merge_sync_info(last_synced_payment_id, next_head_offset, tail_synced)
503+
.merge_sync_info(last_synced_payment_id, Some(next_head_offset), None)
520504
.await?;
521505
}
522-
if !from_head && payments_to_sync.is_empty() {
523-
// If syncing from tail and no new payments were found, set tail synced
524-
object_repository
525-
.merge_sync_info(None, None, tail_synced)
526-
.await?;
506+
507+
Ok(())
508+
}
509+
510+
/// Synchronizes payments from the Spark network to persistent storage using the Sparkscan API.
511+
///
512+
/// When syncing from tail, it will start from the count of completed payments in storage and page for
513+
/// a maximum of `PAYMENT_SYNC_TAIL_MAX_PAGES` for one sync cycle. Once it reaches the end page,
514+
/// it will update `tail_synced` in the sync info and the tail will no longer be synced in future cycles.
515+
async fn sync_payments_tail_to_storage(&self) -> Result<(), SdkError> {
516+
info!("Syncing payments tail to storage");
517+
let object_repository = ObjectCacheRepository::new(self.storage.clone());
518+
let cached_sync_info = object_repository
519+
.fetch_sync_info()
520+
.await?
521+
.unwrap_or_default();
522+
if cached_sync_info.tail_synced {
523+
info!("Payments tail already synced, skipping");
524+
return Ok(());
525+
}
526+
// TODO: use new spark address format once sparkscan supports it
527+
let legacy_spark_address = self
528+
.spark_wallet
529+
.get_spark_address()?
530+
.to_string_with_hrp_legacy();
531+
let mut next_offset = self
532+
.storage
533+
.list_payments(None, None, Some(PaymentStatus::Completed))
534+
.await?
535+
.len() as u64;
536+
let mut tail_synced = false;
537+
538+
for page in 1..=PAYMENT_SYNC_TAIL_MAX_PAGES {
539+
info!(
540+
"Fetching address transactions, offset = {next_offset}, page = {page}/{PAYMENT_SYNC_TAIL_MAX_PAGES}"
541+
);
542+
let Ok(AddressTransactionsWithSspUserRequests {
543+
address_transactions,
544+
ssp_user_requests,
545+
}) = self
546+
.fetch_address_transactions_with_ssp_user_requests(
547+
&legacy_spark_address,
548+
next_offset,
549+
)
550+
.await
551+
else {
552+
error!("Failed to fetch address transactions, stopping sync");
553+
break;
554+
};
555+
556+
info!(
557+
"Processing address transactions, offset = {next_offset}, transactions = {}",
558+
address_transactions.len()
559+
);
560+
// Process transactions in this batch
561+
for transaction in &address_transactions {
562+
let payments = payments_from_address_transaction_and_ssp_request(
563+
transaction,
564+
ssp_user_requests.get(&transaction.id),
565+
&legacy_spark_address,
566+
)?;
567+
// Insert payments
568+
for payment in payments {
569+
self.storage.insert_payment(payment).await?;
570+
}
571+
}
572+
573+
// Check if we have more transfers to fetch
574+
next_offset = next_offset.saturating_add(u64::try_from(address_transactions.len())?);
575+
if (address_transactions.len() as u64) < PAYMENT_SYNC_BATCH_SIZE {
576+
tail_synced = true;
577+
break;
578+
}
527579
}
528580

581+
object_repository
582+
.merge_sync_info(None, None, Some(tail_synced))
583+
.await?;
584+
529585
Ok(())
530586
}
531587

0 commit comments

Comments
 (0)