1010
1111pub ( crate ) mod logging;
1212
13+ use std:: boxed:: Box ;
1314use std:: collections:: { HashMap , HashSet } ;
1415use std:: env;
16+ use std:: future:: Future ;
1517use std:: path:: PathBuf ;
18+ use std:: pin:: Pin ;
1619use std:: sync:: { Arc , RwLock } ;
1720use std:: time:: Duration ;
1821
@@ -31,9 +34,10 @@ use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus};
3134use ldk_node:: {
3235 Builder , CustomTlvRecord , Event , LightningBalance , Node , NodeError , PendingSweepBalance ,
3336} ;
37+ use lightning:: io;
3438use lightning:: ln:: msgs:: SocketAddress ;
3539use lightning:: routing:: gossip:: NodeAlias ;
36- use lightning:: util:: persist:: KVStoreSync ;
40+ use lightning:: util:: persist:: { KVStore , KVStoreSync } ;
3741use lightning:: util:: test_utils:: TestStore ;
3842use lightning_invoice:: { Bolt11InvoiceDescription , Description } ;
3943use lightning_persister:: fs_store:: FilesystemStore ;
@@ -1200,6 +1204,76 @@ impl TestSyncStore {
12001204 }
12011205}
12021206
1207+ impl KVStore for TestSyncStore {
1208+ fn read (
1209+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
1210+ ) -> Pin < Box < dyn Future < Output = Result < Vec < u8 > , io:: Error > > + Send > > {
1211+ let primary_namespace = primary_namespace. to_string ( ) ;
1212+ let secondary_namespace = secondary_namespace. to_string ( ) ;
1213+ let key = key. to_string ( ) ;
1214+ let inner = Arc :: clone ( & self . inner ) ;
1215+ let fut = tokio:: task:: spawn_blocking ( move || {
1216+ inner. read_internal ( & primary_namespace, & secondary_namespace, & key)
1217+ } ) ;
1218+ Box :: pin ( async move {
1219+ fut. await . unwrap_or_else ( |e| {
1220+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
1221+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
1222+ } )
1223+ } )
1224+ }
1225+ fn write (
1226+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
1227+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
1228+ let primary_namespace = primary_namespace. to_string ( ) ;
1229+ let secondary_namespace = secondary_namespace. to_string ( ) ;
1230+ let key = key. to_string ( ) ;
1231+ let inner = Arc :: clone ( & self . inner ) ;
1232+ let fut = tokio:: task:: spawn_blocking ( move || {
1233+ inner. write_internal ( & primary_namespace, & secondary_namespace, & key, buf)
1234+ } ) ;
1235+ Box :: pin ( async move {
1236+ fut. await . unwrap_or_else ( |e| {
1237+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
1238+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
1239+ } )
1240+ } )
1241+ }
1242+ fn remove (
1243+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
1244+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
1245+ let primary_namespace = primary_namespace. to_string ( ) ;
1246+ let secondary_namespace = secondary_namespace. to_string ( ) ;
1247+ let key = key. to_string ( ) ;
1248+ let inner = Arc :: clone ( & self . inner ) ;
1249+ let fut = tokio:: task:: spawn_blocking ( move || {
1250+ inner. remove_internal ( & primary_namespace, & secondary_namespace, & key, lazy)
1251+ } ) ;
1252+ Box :: pin ( async move {
1253+ fut. await . unwrap_or_else ( |e| {
1254+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
1255+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
1256+ } )
1257+ } )
1258+ }
1259+ fn list (
1260+ & self , primary_namespace : & str , secondary_namespace : & str ,
1261+ ) -> Pin < Box < dyn Future < Output = Result < Vec < String > , io:: Error > > + Send > > {
1262+ let primary_namespace = primary_namespace. to_string ( ) ;
1263+ let secondary_namespace = secondary_namespace. to_string ( ) ;
1264+ let inner = Arc :: clone ( & self . inner ) ;
1265+ let fut = tokio:: task:: spawn_blocking ( move || {
1266+ inner. list_internal ( & primary_namespace, & secondary_namespace)
1267+ } ) ;
1268+ Box :: pin ( async move {
1269+ fut. await . unwrap_or_else ( |e| {
1270+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
1271+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
1272+ } )
1273+ } )
1274+ }
1275+ }
1276+
12031277impl KVStoreSync for TestSyncStore {
12041278 fn read (
12051279 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
@@ -1254,9 +1328,10 @@ impl TestSyncStoreInner {
12541328 fn do_list (
12551329 & self , primary_namespace : & str , secondary_namespace : & str ,
12561330 ) -> lightning:: io:: Result < Vec < String > > {
1257- let fs_res = self . fs_store . list ( primary_namespace, secondary_namespace) ;
1258- let sqlite_res = self . sqlite_store . list ( primary_namespace, secondary_namespace) ;
1259- let test_res = self . test_store . list ( primary_namespace, secondary_namespace) ;
1331+ let fs_res = KVStoreSync :: list ( & self . fs_store , primary_namespace, secondary_namespace) ;
1332+ let sqlite_res =
1333+ KVStoreSync :: list ( & self . sqlite_store , primary_namespace, secondary_namespace) ;
1334+ let test_res = KVStoreSync :: list ( & self . test_store , primary_namespace, secondary_namespace) ;
12601335
12611336 match fs_res {
12621337 Ok ( mut list) => {
@@ -1285,9 +1360,11 @@ impl TestSyncStoreInner {
12851360 ) -> lightning:: io:: Result < Vec < u8 > > {
12861361 let _guard = self . serializer . read ( ) . unwrap ( ) ;
12871362
1288- let fs_res = self . fs_store . read ( primary_namespace, secondary_namespace, key) ;
1289- let sqlite_res = self . sqlite_store . read ( primary_namespace, secondary_namespace, key) ;
1290- let test_res = self . test_store . read ( primary_namespace, secondary_namespace, key) ;
1363+ let fs_res = KVStoreSync :: read ( & self . fs_store , primary_namespace, secondary_namespace, key) ;
1364+ let sqlite_res =
1365+ KVStoreSync :: read ( & self . sqlite_store , primary_namespace, secondary_namespace, key) ;
1366+ let test_res =
1367+ KVStoreSync :: read ( & self . test_store , primary_namespace, secondary_namespace, key) ;
12911368
12921369 match fs_res {
12931370 Ok ( read) => {
@@ -1309,11 +1386,27 @@ impl TestSyncStoreInner {
13091386 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
13101387 ) -> lightning:: io:: Result < ( ) > {
13111388 let _guard = self . serializer . write ( ) . unwrap ( ) ;
1312- let fs_res = self . fs_store . write ( primary_namespace, secondary_namespace, key, buf. clone ( ) ) ;
1313- let sqlite_res =
1314- self . sqlite_store . write ( primary_namespace, secondary_namespace, key, buf. clone ( ) ) ;
1315- let test_res =
1316- self . test_store . write ( primary_namespace, secondary_namespace, key, buf. clone ( ) ) ;
1389+ let fs_res = KVStoreSync :: write (
1390+ & self . fs_store ,
1391+ primary_namespace,
1392+ secondary_namespace,
1393+ key,
1394+ buf. clone ( ) ,
1395+ ) ;
1396+ let sqlite_res = KVStoreSync :: write (
1397+ & self . sqlite_store ,
1398+ primary_namespace,
1399+ secondary_namespace,
1400+ key,
1401+ buf. clone ( ) ,
1402+ ) ;
1403+ let test_res = KVStoreSync :: write (
1404+ & self . test_store ,
1405+ primary_namespace,
1406+ secondary_namespace,
1407+ key,
1408+ buf. clone ( ) ,
1409+ ) ;
13171410
13181411 assert ! ( self
13191412 . do_list( primary_namespace, secondary_namespace)
@@ -1338,10 +1431,22 @@ impl TestSyncStoreInner {
13381431 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
13391432 ) -> lightning:: io:: Result < ( ) > {
13401433 let _guard = self . serializer . write ( ) . unwrap ( ) ;
1341- let fs_res = self . fs_store . remove ( primary_namespace, secondary_namespace, key, lazy) ;
1342- let sqlite_res =
1343- self . sqlite_store . remove ( primary_namespace, secondary_namespace, key, lazy) ;
1344- let test_res = self . test_store . remove ( primary_namespace, secondary_namespace, key, lazy) ;
1434+ let fs_res =
1435+ KVStoreSync :: remove ( & self . fs_store , primary_namespace, secondary_namespace, key, lazy) ;
1436+ let sqlite_res = KVStoreSync :: remove (
1437+ & self . sqlite_store ,
1438+ primary_namespace,
1439+ secondary_namespace,
1440+ key,
1441+ lazy,
1442+ ) ;
1443+ let test_res = KVStoreSync :: remove (
1444+ & self . test_store ,
1445+ primary_namespace,
1446+ secondary_namespace,
1447+ key,
1448+ lazy,
1449+ ) ;
13451450
13461451 assert ! ( !self
13471452 . do_list( primary_namespace, secondary_namespace)
0 commit comments