Skip to content
Draft
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 97 additions & 71 deletions lightning/src/util/persist.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No blocker, but I'd still prefer to do the MonitorUpdatingPersisterSync renaming for consistency, instead of adding MonitorUpdatingPersisterAsync. Or, if we want to go with the latter, rename all the other structs we did so far.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed this in the commit message somewhat, but basically because we are still calling the async persist logic "Beta" it seems to make more sense to leave "the one we expect folks to use" not having a suffix.

Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ where
/// less frequent "waves."
/// - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
///
/// Note that you can disable the update-writing entirely by setting `maximum_pending_updates`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the Persist impl for KvStore (non-updating) be removed now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think the next step in cleaning things up would be to consolidate ChainMonitor and MonitorUpdatingPersister and then we'd remove that blanket impl.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could already remove it now and let users use MonitorUpdatingPersister with zero updates? Or you want to keep it to avoid a two-step api change?

/// to zero, causing this [`Persist`] implementation to behave like the blanket [`Persist`]
/// implementation for all [`KVStoreSync`]s.
pub fn new(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
Expand Down Expand Up @@ -757,7 +761,12 @@ where
let mut monitor_bytes = Vec::with_capacity(
MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
);
monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
// If `maximum_pending_updates` is zero, we aren't actually writing monitor updates at all.
// Thus, there's no need to add the sentinel prefix as the monitor can be read directly
// from disk without issue.
if self.maximum_pending_updates != 0 {
monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
}
monitor.write(&mut monitor_bytes).unwrap();
match self.kv_store.write(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
Expand Down Expand Up @@ -796,6 +805,7 @@ where
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
if let Some(update) = update {
let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
&& self.maximum_pending_updates != 0
&& update.update_id % self.maximum_pending_updates != 0;
if persist_update {
let monitor_key = monitor_name.to_string();
Expand Down Expand Up @@ -1111,7 +1121,7 @@ mod tests {
use crate::ln::msgs::BaseMessageHandler;
use crate::sync::Arc;
use crate::util::test_channel_signer::TestChannelSigner;
use crate::util::test_utils::{self, TestLogger, TestStore};
use crate::util::test_utils::{self, TestStore};
use crate::{check_added_monitors, check_closed_broadcast};
use bitcoin::hashes::hex::FromHex;

Expand Down Expand Up @@ -1188,31 +1198,30 @@ mod tests {
}

// Exercise the `MonitorUpdatingPersister` with real channels and payments.
#[test]
fn persister_with_real_monitors() {
// This value is used later to limit how many iterations we perform.
let persister_0_max_pending_updates = 7;
// Intentionally set this to a smaller value to test a different alignment.
let persister_1_max_pending_updates = 3;
fn do_persister_with_real_monitors(persisters_max_pending_updates: (u64, u64)) {
let persister_0_max_pending_updates = persisters_max_pending_updates.0;
let persister_1_max_pending_updates = persisters_max_pending_updates.1;
let chanmon_cfgs = create_chanmon_cfgs(4);
let persister_0 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: persister_0_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
fee_estimator: &chanmon_cfgs[0].fee_estimator,
};
let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: persister_1_max_pending_updates,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
fee_estimator: &chanmon_cfgs[1].fee_estimator,
};
let kv_store_0 = TestStore::new(false);
let persister_0 = MonitorUpdatingPersister::new(
&kv_store_0,
&chanmon_cfgs[0].logger,
persister_0_max_pending_updates,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
);
let kv_store_1 = TestStore::new(false);
let persister_1 = MonitorUpdatingPersister::new(
&kv_store_1,
&chanmon_cfgs[1].logger,
persister_1_max_pending_updates,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].fee_estimator,
);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
Expand Down Expand Up @@ -1256,16 +1265,20 @@ mod tests {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);

let monitor_name = mon.persistence_key();
let expected_updates = if persister_0_max_pending_updates == 0 {
0
} else {
mon.get_latest_update_id() % persister_0_max_pending_updates
};
assert_eq!(
persister_0
.kv_store
kv_store_0
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string()
)
.unwrap()
.len() as u64,
mon.get_latest_update_id() % persister_0_max_pending_updates,
expected_updates,
"Wrong number of updates stored in persister 0",
);
}
Expand All @@ -1275,16 +1288,20 @@ mod tests {
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = mon.persistence_key();
let expected_updates = if persister_1_max_pending_updates == 0 {
0
} else {
mon.get_latest_update_id() % persister_1_max_pending_updates
};
assert_eq!(
persister_1
.kv_store
kv_store_1
.list(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string()
)
.unwrap()
.len() as u64,
mon.get_latest_update_id() % persister_1_max_pending_updates,
expected_updates,
"Wrong number of updates stored in persister 1",
);
}
Expand Down Expand Up @@ -1348,10 +1365,18 @@ mod tests {

// Make sure everything is persisted as expected after close.
check_persisted_data!(
persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1
cmp::max(2, persister_0_max_pending_updates * 2) * EXPECTED_UPDATES_PER_PAYMENT + 1
);
}

#[test]
fn persister_with_real_monitors() {
// Test various alignments
do_persister_with_real_monitors((7, 3));
do_persister_with_real_monitors((0, 1));
do_persister_with_real_monitors((4, 2));
}

// Test that if the `MonitorUpdatingPersister`'s can't actually write, trying to persist a
// monitor or update with it results in the persister returning an UnrecoverableError status.
#[test]
Expand Down Expand Up @@ -1379,15 +1404,16 @@ mod tests {
let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];

let ro_persister = MonitorUpdatingPersister {
kv_store: &TestStore::new(true),
logger: &TestLogger::new(),
maximum_pending_updates: 11,
entropy_source: node_cfgs[0].keys_manager,
signer_provider: node_cfgs[0].keys_manager,
broadcaster: node_cfgs[0].tx_broadcaster,
fee_estimator: node_cfgs[0].fee_estimator,
};
let store = TestStore::new(true);
let ro_persister = MonitorUpdatingPersister::new(
&store,
node_cfgs[0].logger,
11,
node_cfgs[0].keys_manager,
node_cfgs[0].keys_manager,
node_cfgs[0].tx_broadcaster,
node_cfgs[0].fee_estimator,
);
let monitor_name = added_monitors[0].1.persistence_key();
match ro_persister.persist_new_channel(monitor_name, &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => {
Expand Down Expand Up @@ -1425,24 +1451,26 @@ mod tests {
fn clean_stale_updates_works() {
let test_max_pending_updates = 7;
let chanmon_cfgs = create_chanmon_cfgs(3);
let persister_0 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
fee_estimator: &chanmon_cfgs[0].fee_estimator,
};
let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
fee_estimator: &chanmon_cfgs[1].fee_estimator,
};
let kv_store_0 = TestStore::new(false);
let persister_0 = MonitorUpdatingPersister::new(
&kv_store_0,
&chanmon_cfgs[0].logger,
test_max_pending_updates,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
);
let kv_store_1 = TestStore::new(false);
let persister_1 = MonitorUpdatingPersister::new(
&kv_store_1,
&chanmon_cfgs[1].logger,
test_max_pending_updates,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].fee_estimator,
);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
Expand Down Expand Up @@ -1481,22 +1509,20 @@ mod tests {
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = monitor.persistence_key();
persister_0
.kv_store
.write(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str(),
vec![0u8; 1],
)
.unwrap();
KVStoreSync::write(
&kv_store_0,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str(),
vec![0u8; 1],
)
.unwrap();

// Do the stale update cleanup
persister_0.cleanup_stale_updates(false).unwrap();

// Confirm the stale update is unreadable/gone
assert!(persister_0
.kv_store
assert!(kv_store_0
.read(
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
Expand Down