@@ -470,8 +470,7 @@ where
470
470
return Ok ( ( ) ) ;
471
471
}
472
472
473
- let persist_fut = {
474
- let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
473
+ self . update_state ( |state_lock| -> Result < ( ( ) , bool ) , ( ) > {
475
474
for descriptor in relevant_descriptors {
476
475
let output_info = TrackedSpendableOutput {
477
476
descriptor,
@@ -491,17 +490,12 @@ where
491
490
}
492
491
493
492
state_lock. outputs . push ( output_info) ;
493
+ state_lock. dirty = true ;
494
494
}
495
495
496
- state_lock. dirty = false ;
497
- self . persist_state ( & state_lock)
498
- } ;
499
-
500
- persist_fut. await . map_err ( |e| {
501
- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
502
-
503
- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
496
+ Ok ( ( ( ) , true ) )
504
497
} )
498
+ . await
505
499
}
506
500
507
501
/// Returns a list of the currently tracked spendable outputs.
@@ -557,29 +551,18 @@ where
557
551
} ;
558
552
559
553
// See if there is anything to sweep before requesting a change address.
560
- let ( persist_fut, has_respends) = {
561
- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
562
-
563
- let cur_height = sweeper_state. best_block . height ;
564
- let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
565
- // If there is nothing to sweep, we still persist the state if it is dirty.
566
- let fut_opt = if !has_respends && sweeper_state. dirty {
567
- sweeper_state. dirty = false ;
568
- Some ( self . persist_state ( & sweeper_state) )
569
- } else {
570
- None
571
- } ;
572
-
573
- ( fut_opt, has_respends)
574
- } ;
554
+ let has_respends = self
555
+ . update_state ( |sweeper_state| -> Result < ( bool , bool ) , ( ) > {
556
+ let cur_height = sweeper_state. best_block . height ;
557
+ let has_respends = sweeper_state. outputs . iter ( ) . any ( |o| filter_fn ( o, cur_height) ) ;
575
558
576
- if let Some ( persist_fut ) = persist_fut {
577
- persist_fut . await . map_err ( |e| {
578
- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
559
+ // If there are respends, we don't need to persist a dirty state already now and can postpone it until
560
+ // after the sweep.
561
+ let persist_if_dirty = !has_respends ;
579
562
580
- log_error ! ( self . logger , "Error persisting OutputSweeper: {:?}" , e ) ;
581
- } ) ? ;
582
- } ;
563
+ Ok ( ( has_respends , persist_if_dirty ) )
564
+ } )
565
+ . await ? ;
583
566
584
567
if !has_respends {
585
568
return Ok ( ( ) ) ;
@@ -590,67 +573,56 @@ where
590
573
self . change_destination_source . get_change_destination_script ( ) . await ?;
591
574
592
575
// Sweep the outputs.
593
- let spending_tx;
594
- let persist_fut = {
595
- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
596
-
597
- let cur_height = sweeper_state. best_block . height ;
598
- let cur_hash = sweeper_state. best_block . block_hash ;
576
+ let spending_tx = self
577
+ . update_state ( |sweeper_state| -> Result < ( Option < Transaction > , bool ) , ( ) > {
578
+ let cur_height = sweeper_state. best_block . height ;
579
+ let cur_hash = sweeper_state. best_block . block_hash ;
599
580
600
- let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
601
- . outputs
602
- . iter ( )
603
- . filter ( |o| filter_fn ( * o, cur_height) )
604
- . map ( |o| & o. descriptor )
605
- . collect ( ) ;
606
-
607
- // Exit if there is nothing to spend anymore and there also is no need to persist the state.
608
- if respend_descriptors. is_empty ( ) && !sweeper_state. dirty {
609
- return Ok ( ( ) ) ;
610
- }
611
-
612
- // Generate the spending transaction and broadcast it.
613
- spending_tx = if !respend_descriptors. is_empty ( ) {
614
- let spending_tx = self
615
- . spend_outputs ( & sweeper_state, & respend_descriptors, change_destination_script)
616
- . map_err ( |e| {
617
- log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
618
- } ) ?;
619
-
620
- log_debug ! (
621
- self . logger,
622
- "Generating and broadcasting sweeping transaction {}" ,
623
- spending_tx. compute_txid( )
624
- ) ;
581
+ let respend_descriptors: Vec < & SpendableOutputDescriptor > = sweeper_state
582
+ . outputs
583
+ . iter ( )
584
+ . filter ( |o| filter_fn ( * o, cur_height) )
585
+ . map ( |o| & o. descriptor )
586
+ . collect ( ) ;
587
+
588
+ // Generate the spending transaction and broadcast it.
589
+ if !respend_descriptors. is_empty ( ) {
590
+ let spending_tx = self
591
+ . spend_outputs (
592
+ & sweeper_state,
593
+ & respend_descriptors,
594
+ change_destination_script,
595
+ )
596
+ . map_err ( |e| {
597
+ log_error ! ( self . logger, "Error spending outputs: {:?}" , e) ;
598
+ } ) ?;
599
+
600
+ log_debug ! (
601
+ self . logger,
602
+ "Generating and broadcasting sweeping transaction {}" ,
603
+ spending_tx. compute_txid( )
604
+ ) ;
625
605
626
- // As we didn't modify the state so far, the same filter_fn yields the same elements as
627
- // above.
628
- let respend_outputs =
629
- sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
630
- for output_info in respend_outputs {
631
- if let Some ( filter) = self . chain_data_source . as_ref ( ) {
632
- let watched_output = output_info. to_watched_output ( cur_hash) ;
633
- filter. register_output ( watched_output) ;
606
+ // As we didn't modify the state so far, the same filter_fn yields the same elements as
607
+ // above.
608
+ let respend_outputs =
609
+ sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o, cur_height) ) ;
610
+ for output_info in respend_outputs {
611
+ if let Some ( filter) = self . chain_data_source . as_ref ( ) {
612
+ let watched_output = output_info. to_watched_output ( cur_hash) ;
613
+ filter. register_output ( watched_output) ;
614
+ }
615
+
616
+ output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
617
+ sweeper_state. dirty = true ;
634
618
}
635
619
636
- output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
620
+ Ok ( ( Some ( spending_tx) , true ) )
621
+ } else {
622
+ Ok ( ( None , true ) )
637
623
}
638
-
639
- Some ( spending_tx)
640
- } else {
641
- None
642
- } ;
643
-
644
- // Either the state was already dirty or we modified it above, so we persist it.
645
- sweeper_state. dirty = false ;
646
- self . persist_state ( & sweeper_state)
647
- } ;
648
-
649
- persist_fut. await . map_err ( |e| {
650
- self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
651
-
652
- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
653
- } ) ?;
624
+ } )
625
+ . await ?;
654
626
655
627
// Persistence completely successfully. If we have a spending transaction, we broadcast it.
656
628
if let Some ( spending_tx) = spending_tx {
@@ -695,6 +667,37 @@ where
695
667
)
696
668
}
697
669
670
+ /// Updates the sweeper state by executing the given callback. Persists the state afterwards if it is marked dirty,
671
+ /// but only if persist_if_dirty is also true. Returning false for persist_if_dirty allows the callback to postpone
672
+ /// persisting a potentially dirty state.
673
+ async fn update_state < X > (
674
+ & self , callback : impl FnOnce ( & mut SweeperState ) -> Result < ( X , bool ) , ( ) > ,
675
+ ) -> Result < X , ( ) > {
676
+ let ( fut, res) ;
677
+ {
678
+ let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
679
+
680
+ let persist_if_dirty;
681
+ ( res, persist_if_dirty) = callback ( & mut state_lock) ?;
682
+
683
+ if !state_lock. dirty || !persist_if_dirty {
684
+ return Ok ( res) ;
685
+ }
686
+
687
+ state_lock. dirty = false ;
688
+
689
+ fut = self . persist_state ( & state_lock) ;
690
+ } ;
691
+
692
+ fut. await . map_err ( |e| {
693
+ self . sweeper_state . lock ( ) . unwrap ( ) . dirty = true ;
694
+
695
+ log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
696
+ } ) ?;
697
+
698
+ Ok ( res)
699
+ }
700
+
698
701
fn spend_outputs (
699
702
& self , sweeper_state : & SweeperState , descriptors : & [ & SpendableOutputDescriptor ] ,
700
703
change_destination_script : ScriptBuf ,
0 commit comments