
Commit 986093a

joostjager and claude committed
Add incremental ChannelManager persistence support
This implements incremental persistence for ChannelManager, enabling more efficient persistence for nodes with many channels by only writing peer states that have changed since the last persist.

Key changes:

- Add a `write_update` method to ChannelManager that writes only dirty peer states while always including global state (forward_htlcs, claimable_payments, pending_events, etc.)
- Track the latest update_id via an AtomicU64, serialized in TLV field 23
- Use byte comparison against `last_persisted_peer_bytes` to detect which peers have changed
- Add an `apply_update` method to ChannelManagerData for merging incremental updates during recovery (see the recovery sketch below)
- Extract `channel_manager_from_data` as a crate-public function for use by the persistence utilities
- Update the background processor to persist incremental updates instead of the full ChannelManager, with periodic consolidation (every 100 updates in production, 5 in tests)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 50f3726 commit 986093a
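To make the recovery side of this scheme concrete, here is a minimal sketch, not part of this commit: it assumes a synchronous KVStoreSync-style store whose read/list methods match the calls used in the diff below, and it only shows how the raw blobs would be gathered and ordered. The actual reconstruction would deserialize the snapshot into ChannelManagerData, call its apply_update method for each remaining update, and finish with channel_manager_from_data; those two names come from the commit message, and their exact signatures are not shown here, so everything below is an illustrative assumption.

// Hypothetical recovery sketch; the function name, signatures and error handling are
// assumptions for illustration, not APIs introduced by this commit.
use lightning::util::persist::{
	KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
	CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
	CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
};

fn read_snapshot_and_updates<S: KVStoreSync>(
	store: &S,
) -> Result<(Vec<u8>, Vec<(u64, Vec<u8>)>), lightning::io::Error> {
	// The last full ChannelManager snapshot, written at consolidation time.
	let snapshot = store.read(
		CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
		CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
		CHANNEL_MANAGER_PERSISTENCE_KEY,
	)?;
	// Incremental updates live under their own namespace, keyed by the decimal update_id.
	let mut updates = Vec::new();
	for key in store.list(CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, "")? {
		if let Ok(id) = key.parse::<u64>() {
			let blob = store.read(CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, "", &key)?;
			updates.push((id, blob));
		}
	}
	// Replay in ascending update_id order so later peer states overwrite earlier ones.
	updates.sort_by_key(|(id, _)| *id);
	Ok((snapshot, updates))
}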

File tree

4 files changed, +754 -194 lines changed


lightning-background-processor/src/lib.rs

Lines changed: 122 additions & 25 deletions
@@ -59,9 +59,10 @@ use lightning::util::logger::Logger;
 use lightning::util::persist::{
 	KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
 	CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-	NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
-	NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
-	SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+	CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
+	NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
+	SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+	SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
 };
 use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
 use lightning::util::wakers::Future;
@@ -182,6 +183,13 @@ const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(60 * 10);
 #[cfg(test)]
 const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(1);
 
+/// After this many incremental ChannelManager updates, consolidate by writing a full
+/// ChannelManager and cleaning up old updates.
+#[cfg(not(test))]
+const CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD: u64 = 100;
+#[cfg(test)]
+const CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD: u64 = 5;
+
 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
 const fn min_duration(a: Duration, b: Duration) -> Duration {
 	if a.as_nanos() < b.as_nanos() {
@@ -1150,18 +1158,32 @@ where
 				None => {},
 			}
 
-			let mut futures = Joiner::new();
+			// Prepare ChannelManager update data outside the async block so it lives long enough.
+			// This must be declared before `futures` so it outlives the future references.
+			let cm_update_data = if channel_manager.get_cm().get_and_clear_needs_persistence() {
+				log_trace!(logger, "Persisting ChannelManager update...");
 
-			if channel_manager.get_cm().get_and_clear_needs_persistence() {
-				log_trace!(logger, "Persisting ChannelManager...");
+				let mut buf = Vec::new();
+				let update_id = channel_manager
+					.get_cm()
+					.write_update(&mut buf)
+					.map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?;
+				Some((update_id, update_id.to_string(), buf))
+			} else {
+				None
+			};
+
+			let mut futures = Joiner::new();
 
+			if let Some((_, ref update_key, ref buf)) = cm_update_data {
+				let buf = buf.clone(); // Clone for the async block
 				let fut = async {
 					kv_store
 						.write(
-							CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-							CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-							CHANNEL_MANAGER_PERSISTENCE_KEY,
-							channel_manager.get_cm().encode(),
+							CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+							"",
+							update_key,
+							buf,
 						)
 						.await
 				};
@@ -1349,6 +1371,40 @@ where
 			res?;
 		}
 
+		// Consolidate if we've accumulated enough updates
+		if let Some((update_id, _, _)) = cm_update_data {
+			if update_id % CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD == 0 {
+				log_trace!(logger, "Consolidating ChannelManager updates...");
+				// Write full ChannelManager
+				kv_store
+					.write(
+						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+						CHANNEL_MANAGER_PERSISTENCE_KEY,
+						channel_manager.get_cm().encode(),
+					)
+					.await?;
+				// Clean up old updates
+				let update_keys =
+					kv_store.list(CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, "").await?;
+				for key in update_keys {
+					if let Ok(id) = key.parse::<u64>() {
+						if id <= update_id {
+							let _ = kv_store
+								.remove(
+									CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+									"",
+									&key,
+									true,
+								)
+								.await;
+						}
+					}
+				}
+				log_trace!(logger, "Done consolidating ChannelManager updates.");
+			}
+		}
+
 		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
 			sleeper(ONION_MESSAGE_HANDLER_TIMER)
 		}) {
@@ -1405,12 +1461,17 @@ where
 		// After we exit, ensure we persist the ChannelManager one final time - this avoids
 		// some races where users quit while channel updates were in-flight, with
 		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+		let mut buf = Vec::new();
+		let update_id = channel_manager
+			.get_cm()
+			.write_update(&mut buf)
+			.map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?;
 		kv_store
 			.write(
-				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-				CHANNEL_MANAGER_PERSISTENCE_KEY,
-				channel_manager.get_cm().encode(),
+				CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+				"",
+				&update_id.to_string(),
+				buf,
 			)
 			.await?;
 		if let Some(ref scorer) = scorer {
@@ -1723,14 +1784,47 @@ impl BackgroundProcessor {
 					last_freshness_call = Instant::now();
 				}
 				if channel_manager.get_cm().get_and_clear_needs_persistence() {
-					log_trace!(logger, "Persisting ChannelManager...");
+					log_trace!(logger, "Persisting ChannelManager update...");
+					let mut buf = Vec::new();
+					let update_id = channel_manager
+						.get_cm()
+						.write_update(&mut buf)
+						.map_err(std::io::Error::other)?;
 					(kv_store.write(
-						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-						CHANNEL_MANAGER_PERSISTENCE_KEY,
-						channel_manager.get_cm().encode(),
+						CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+						"",
+						&update_id.to_string(),
+						buf,
 					))?;
-					log_trace!(logger, "Done persisting ChannelManager.");
+					log_trace!(logger, "Done persisting ChannelManager update {}.", update_id);
+
+					// Consolidate if we've accumulated enough updates
+					if update_id % CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD == 0 {
+						log_trace!(logger, "Consolidating ChannelManager updates...");
+						// Write full ChannelManager
+						kv_store.write(
+							CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+							CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+							CHANNEL_MANAGER_PERSISTENCE_KEY,
+							channel_manager.get_cm().encode(),
+						)?;
+						// Clean up old updates
+						let update_keys = kv_store
+							.list(CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, "")?;
+						for key in update_keys {
+							if let Ok(id) = key.parse::<u64>() {
+								if id <= update_id {
+									let _ = kv_store.remove(
+										CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+										"",
+										&key,
+										true,
+									);
+								}
+							}
+						}
+						log_trace!(logger, "Done consolidating ChannelManager updates.");
+					}
 				}
 
 				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
@@ -1847,11 +1941,14 @@ impl BackgroundProcessor {
 		// After we exit, ensure we persist the ChannelManager one final time - this avoids
 		// some races where users quit while channel updates were in-flight, with
 		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+		let mut buf = Vec::new();
+		let update_id =
+			channel_manager.get_cm().write_update(&mut buf).map_err(std::io::Error::other)?;
 		kv_store.write(
-			CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-			CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-			CHANNEL_MANAGER_PERSISTENCE_KEY,
-			channel_manager.get_cm().encode(),
+			CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+			"",
+			&update_id.to_string(),
+			buf,
 		)?;
 		if let Some(ref scorer) = scorer {
 			kv_store.write(

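A small aside on the cleanup loops above: update keys are the decimal string form of update_id, and lexicographic string order does not match numeric order, which is why consolidation parses each key with key.parse::<u64>() before comparing it to the consolidated id. A standalone illustration, not from this commit:

// Decimal-string keys sort differently as strings than as numbers, so the cleanup
// loop must parse them back to u64 before comparing against the consolidated update_id.
fn main() {
	let keys = vec!["9".to_string(), "10".to_string(), "100".to_string()];

	let mut as_strings = keys.clone();
	as_strings.sort();
	assert_eq!(as_strings, vec!["10", "100", "9"]); // lexicographic order

	let mut as_numbers: Vec<u64> = keys.iter().filter_map(|k| k.parse().ok()).collect();
	as_numbers.sort();
	assert_eq!(as_numbers, vec![9, 10, 100]); // the order consolidation actually relies on
}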