diff --git a/multinode_integration_tests/tests/verify_bill_payment.rs b/multinode_integration_tests/tests/verify_bill_payment.rs index 714856eb8..e5fddc67f 100644 --- a/multinode_integration_tests/tests/verify_bill_payment.rs +++ b/multinode_integration_tests/tests/verify_bill_payment.rs @@ -465,7 +465,7 @@ fn verify_pending_payables() { } MASQNodeUtils::assert_node_wrote_log_containing( real_consuming_node.name(), - "Found 3 pending payables and 0 supposed failures to process", + "Found 3 pending payables and 0 suspected failures to process", Duration::from_secs(5), ); MASQNodeUtils::assert_node_wrote_log_containing( diff --git a/node/src/accountant/logging_utils/accounting_msgs_debug.rs b/node/src/accountant/logging_utils/accounting_msgs_debug.rs index 81f547b6d..e6cb8eb52 100644 --- a/node/src/accountant/logging_utils/accounting_msgs_debug.rs +++ b/node/src/accountant/logging_utils/accounting_msgs_debug.rs @@ -12,80 +12,68 @@ use web3::types::Address; // of the Accountant. #[derive(Default)] -pub struct AccountingMsgsDebugStats { - report_routing_service_provided_processed: AccountingMsgStats, - report_exit_service_provided_processed: AccountingMsgStats, - report_services_consumed_processed: AccountingMsgStats, +pub struct AccountingMessageTracker { + routing_provided_stats: AccountingMsgStats, + exit_provided_stats: AccountingMsgStats, + consumed_stats: AccountingMsgStats, } -impl AccountingMsgsDebugStats { - pub fn manage_debug_log( +impl AccountingMessageTracker { + pub fn process_debug_stats( &mut self, - logger: &Logger, msg_type: AccountingMsgType, + new_charges: Vec, log_window_size: u16, - new_postings: Vec, - ) { - if logger.debug_enabled() { - if let Some(loggable_stats) = self.manage_log(msg_type, new_postings, log_window_size) { - debug!(logger, "{}", loggable_stats); - } - } + ) -> Option { + self.record_new_charges_by_msg_type(new_charges, msg_type); + + self.maybe_dump_stats_by_msg_type(log_window_size, msg_type) } - fn manage_log( + fn record_new_charges_by_msg_type( &mut self, + new_charges: Vec, msg_type: AccountingMsgType, - new_postings: Vec, - log_window_size: u16, - ) -> Option { - self.record(new_postings, msg_type); - self.request_log_instruction(log_window_size, msg_type) - } - - fn record(&mut self, new_postings: Vec, msg_type: AccountingMsgType) { + ) { match msg_type { AccountingMsgType::RoutingServiceProvided => { - self.report_routing_service_provided_processed - .handle_new_postings(new_postings); + self.routing_provided_stats.record_new_charges(new_charges); } AccountingMsgType::ExitServiceProvided => { - self.report_exit_service_provided_processed - .handle_new_postings(new_postings); + self.exit_provided_stats.record_new_charges(new_charges); } AccountingMsgType::ServicesConsumed => { - self.report_services_consumed_processed - .handle_new_postings(new_postings); + self.consumed_stats.record_new_charges(new_charges); } } } - fn request_log_instruction( + fn maybe_dump_stats_by_msg_type( &mut self, - gap_size: u16, + log_window_size: u16, msg_type: AccountingMsgType, ) -> Option { match msg_type { AccountingMsgType::RoutingServiceProvided => self - .report_routing_service_provided_processed - .loggable_stats(gap_size), + .routing_provided_stats + .maybe_dump_stats(log_window_size, msg_type), AccountingMsgType::ExitServiceProvided => self - .report_exit_service_provided_processed - .loggable_stats(gap_size), + .exit_provided_stats + .maybe_dump_stats(log_window_size, msg_type), AccountingMsgType::ServicesConsumed => self - 
.report_services_consumed_processed - .loggable_stats(gap_size), + .consumed_stats + .maybe_dump_stats(log_window_size, msg_type), } } } #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub struct NewPosting { - address: Address, - amount_wei: u128, +pub struct NewCharge { + pub address: Address, + pub amount_wei: u128, } -impl NewPosting { +impl NewCharge { pub fn new(address: Address, amount_wei: u128) -> Self { Self { address, @@ -100,11 +88,10 @@ pub struct LoggableStats { accounting_msg_stats: HashMap, log_window_in_pcs_of_msgs: u16, } - impl Display for LoggableStats { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let label = format!( - "Account debits in last {} {:?} messages (wei):", + "Total debits across last {} {:?} messages (wei):", self.log_window_in_pcs_of_msgs, self.msg_type ); let stats = self @@ -124,12 +111,16 @@ struct AccountingMsgStats { } impl AccountingMsgStats { - fn loggable_stats(&mut self, log_window_size: u16) -> Option { - if self.msg_count_since_last_logged == log_window_size as usize { + fn maybe_dump_stats( + &mut self, + log_window_size: u16, + msg_type: AccountingMsgType, + ) -> Option { + if self.should_log_stats(log_window_size) { self.msg_count_since_last_logged = 0; Some(LoggableStats { - msg_type: AccountingMsgType::RoutingServiceProvided, + msg_type, accounting_msg_stats: self.stats.drain().collect(), log_window_in_pcs_of_msgs: log_window_size, }) @@ -138,10 +129,14 @@ impl AccountingMsgStats { } } - fn handle_new_postings(&mut self, new_postings: Vec) { - for new_posting in &new_postings { - *self.stats.entry(new_posting.address).or_default() += new_posting.amount_wei; - } + fn should_log_stats(&self, log_window_size: u16) -> bool { + self.msg_count_since_last_logged >= log_window_size as usize + } + + fn record_new_charges(&mut self, new_charges_vec: Vec) { + new_charges_vec.iter().for_each(|new_charges| { + *self.stats.entry(new_charges.address).or_default() += new_charges.amount_wei; + }); self.msg_count_since_last_logged += 1; } } @@ -153,12 +148,12 @@ pub enum AccountingMsgType { ServicesConsumed, } -pub struct NewPostingsDebugContainer { +pub struct NewChargessDebugContainer { debug_enabled: bool, - vec: Vec, + vec: Vec, } -impl NewPostingsDebugContainer { +impl NewChargessDebugContainer { pub fn new(logger: &Logger) -> Self { Self { debug_enabled: logger.debug_enabled(), @@ -166,16 +161,18 @@ impl NewPostingsDebugContainer { } } - pub fn add(mut self, address: Address, sum_wei: u128) -> Self { + pub fn add_new_charge(mut self, new_charge_opt: Option) -> Self { if self.debug_enabled { - self.vec.push(NewPosting::new(address, sum_wei)); + if let Some(new_charge) = new_charge_opt { + self.vec.push(new_charge); + } } self } } -impl From for Vec { - fn from(postings: NewPostingsDebugContainer) -> Self { +impl From for Vec { + fn from(postings: NewChargessDebugContainer) -> Self { postings.vec } } @@ -183,274 +180,231 @@ impl From for Vec { #[cfg(test)] mod tests { use super::{ - AccountingMsgType, AccountingMsgsDebugStats, LoggableStats, NewPosting, - NewPostingsDebugContainer, + AccountingMessageTracker, AccountingMsgType, LoggableStats, NewCharge, + NewChargessDebugContainer, }; use crate::blockchain::test_utils::make_address; use itertools::Itertools; use log::Level; use masq_lib::logger::Logger; - use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler}; use std::collections::HashMap; use web3::types::Address; #[test] fn test_loggable_count_works_for_routing_service_provided() { - test_manage_debug_log( - 
AccountingMsgType::RoutingServiceProvided, - 6, - |subject| { - subject - .report_routing_service_provided_processed - .stats - .clone() - }, - |subject| { - subject - .report_routing_service_provided_processed - .msg_count_since_last_logged - }, - ); + test_process_debug_stats(AccountingMsgType::RoutingServiceProvided, 6); } #[test] fn test_loggable_count_works_for_exit_service_provided() { - test_manage_debug_log( - AccountingMsgType::ExitServiceProvided, - 3, - |subject| subject.report_exit_service_provided_processed.stats.clone(), - |subject| { - subject - .report_exit_service_provided_processed - .msg_count_since_last_logged - }, - ); + test_process_debug_stats(AccountingMsgType::ExitServiceProvided, 3); } #[test] fn test_loggable_count_works_for_services_consumed() { - test_manage_debug_log( - AccountingMsgType::ServicesConsumed, - 8, - |subject| subject.report_services_consumed_processed.stats.clone(), - |subject| { - subject - .report_services_consumed_processed - .msg_count_since_last_logged - }, + test_process_debug_stats(AccountingMsgType::ServicesConsumed, 8); + } + + fn test_process_debug_stats(msg_type: AccountingMsgType, log_window_size: u16) { + let mut new_charge_feeds_per_msg = + generate_new_charge_feeds_representing_msgs(log_window_size); + let expected_sorted_stats = + construct_expected_sorted_stats_from_generated_new_charges(&new_charge_feeds_per_msg); + let mut subject = AccountingMessageTracker::default(); + assert_empty_stats(&subject); + let charge_msg_matching_the_window_size = + new_charge_feeds_per_msg.remove(log_window_size as usize - 1 - 1); + let initial_charge_msgs = new_charge_feeds_per_msg; + + test_msgs_of_count_window_size_minus_one( + &mut subject, + msg_type, + log_window_size, + initial_charge_msgs, ); + + let result = subject + .process_debug_stats( + msg_type, + charge_msg_matching_the_window_size, + log_window_size, + ) + .expect("first try: expected stats dump"); + + assert_provided_loggable_stats(result, msg_type, log_window_size, expected_sorted_stats); + assert_empty_stats(&subject); + + retest_after_emptied(&mut subject, msg_type); } - fn test_manage_debug_log( + fn test_msgs_of_count_window_size_minus_one( + subject: &mut AccountingMessageTracker, msg_type: AccountingMsgType, - gap_size: u16, - fetch_stats: fn(&AccountingMsgsDebugStats) -> HashMap, - fetch_msg_count_processed: fn(&AccountingMsgsDebugStats) -> usize, + log_window_size: u16, + initial_charge_msgs: Vec>, ) { - // We begin the test by recording N - 1 msgs. Then we add one more and match the gap_size - // condition which should release the debug stats. After that happens, the stats are cleared - // and the process can start again. 
- let new_posting_feeds_per_msg = generate_posting_feeds_representing_msgs(gap_size); - let mut subject = AccountingMsgsDebugStats::default(); - - let initial_state_total_count = fetch_stats(&subject); - let initial_msg_count_processed = fetch_msg_count_processed(&subject); - assert_eq!(initial_state_total_count, hashmap!()); - assert_eq!(initial_msg_count_processed, 0); - - let first_feed_remembered = new_posting_feeds_per_msg.first().unwrap().clone(); - let last_feed_remembered = new_posting_feeds_per_msg.last().unwrap().clone(); - - let first_expected_stats = - compute_expected_stats_from_new_posting_feeds(&new_posting_feeds_per_msg); - let first_loggable_stats_opt = new_posting_feeds_per_msg + initial_charge_msgs .into_iter() - .fold(None, |_, new_postings| { - subject.manage_log(msg_type, new_postings, gap_size) + .enumerate() + .for_each(|(idx, new_charges)| { + let result = subject.process_debug_stats(msg_type, new_charges, log_window_size); + + assert_eq!( + result, + None, + "We expected the first {} msgs to be just recorded and not to stimulate stats \ + as happened with msg {}", + log_window_size - 1, + idx + 1 + ) }); - let first_actual_stats = fetch_stats(&subject); - let first_msg_count_processed = fetch_msg_count_processed(&subject); - assert_eq!(first_loggable_stats_opt, None); + } + + fn assert_empty_stats(subject: &AccountingMessageTracker) { + assert!(subject.consumed_stats.stats.is_empty()); + assert_eq!(subject.consumed_stats.msg_count_since_last_logged, 0); + assert!(subject.exit_provided_stats.stats.is_empty()); + assert_eq!(subject.exit_provided_stats.msg_count_since_last_logged, 0); + assert!(subject.routing_provided_stats.stats.is_empty()); assert_eq!( - first_actual_stats.into_iter().sorted().collect_vec(), - first_expected_stats - ); - assert_eq!(first_msg_count_processed, gap_size as usize - 1); + subject.routing_provided_stats.msg_count_since_last_logged, + 0 + ) + } - let posting_fulfilling_the_msg_count_requirement = first_feed_remembered; - let second_loggable_stats_opt = subject.manage_log( - msg_type, - posting_fulfilling_the_msg_count_requirement.clone(), - gap_size, - ); - let second_actual_stats = fetch_stats(&subject); - let second_msg_count_processed = fetch_msg_count_processed(&subject); - let second_expected_stats = record_new_posting_feed_in( - first_expected_stats, - posting_fulfilling_the_msg_count_requirement, - ); - let loggable_stats = second_loggable_stats_opt.unwrap(); + fn assert_provided_loggable_stats( + actual_loggable_stats: LoggableStats, + msg_type: AccountingMsgType, + log_window_size: u16, + expected_sorted_stats: Vec<(Address, u128)>, + ) { + assert_eq!(actual_loggable_stats.msg_type, msg_type); assert_eq!( - loggable_stats + actual_loggable_stats .accounting_msg_stats .into_iter() .sorted() .collect_vec(), - second_expected_stats, + expected_sorted_stats ); - assert_eq!(loggable_stats.log_window_in_pcs_of_msgs, gap_size,); - assert_eq!(second_actual_stats, hashmap!()); - assert_eq!(second_msg_count_processed, 0); - - let new_posting_after_stats_dumping = last_feed_remembered; - let third_loggable_stats_opt = - subject.manage_log(msg_type, new_posting_after_stats_dumping.clone(), gap_size); - let third_actual_stats = fetch_stats(&subject); - let third_msg_count_processed = fetch_msg_count_processed(&subject); - assert_eq!(third_loggable_stats_opt, None); assert_eq!( - third_actual_stats.into_iter().sorted().collect_vec(), - new_posting_after_stats_dumping - .into_iter() - .map(|posting| (posting.address, posting.amount_wei)) - 
.sorted() - .collect_vec(), + actual_loggable_stats.log_window_in_pcs_of_msgs, + log_window_size ); - assert_eq!(third_msg_count_processed, 1); } - fn record_new_posting_feed_in( - first_expected_stats: Vec<(Address, u128)>, - second_new_posting: Vec, - ) -> Vec<(Address, u128)> { - let second_expected_stats = first_expected_stats - .into_iter() - .map(|(address, sum)| { - let updated_sum = second_new_posting.iter().fold(sum, |acc, posting| { - if posting.address == address { - acc + posting.amount_wei - } else { - acc - } - }); - (address, updated_sum) - }) - .collect_vec(); - second_expected_stats + fn retest_after_emptied(subject: &mut AccountingMessageTracker, msg_type: AccountingMsgType) { + const QUICK_RETEST_WINDOW_SIZE: u16 = 2; + let mut new_charges_feeds_per_msg = + generate_new_charge_feeds_representing_msgs(QUICK_RETEST_WINDOW_SIZE); + let expected_sorted_stats = + construct_expected_sorted_stats_from_generated_new_charges(&new_charges_feeds_per_msg); + + let result = subject.process_debug_stats( + msg_type, + new_charges_feeds_per_msg.remove(0), + QUICK_RETEST_WINDOW_SIZE, + ); + + assert_eq!(result, None); + + let result = subject + .process_debug_stats( + msg_type, + new_charges_feeds_per_msg.remove(0), + QUICK_RETEST_WINDOW_SIZE, + ) + .expect("second try: expected stats dump"); + + assert_provided_loggable_stats( + result, + msg_type, + QUICK_RETEST_WINDOW_SIZE, + expected_sorted_stats, + ); } - fn generate_posting_feeds_representing_msgs(gap_size: u16) -> Vec> { - let new_postings_feeds = (0..gap_size - 1) - .map(|outer_idx| { - (0..outer_idx + 1) - .map(|inner_idx| { - NewPosting::new( - make_address(inner_idx as u32), - (inner_idx as u128 + 1) * 1234567, - ) + fn generate_new_charge_feeds_representing_msgs(log_window_size: u16) -> Vec> { + (0..log_window_size) + .map(|msg_number| { + (0..msg_number) + .map(|new_charge_idx| { + let address = make_address(new_charge_idx as u32); + let charge = (new_charge_idx as u128 + 1) * 1234567; + NewCharge::new(address, charge) }) .collect_vec() }) - .collect_vec(); - new_postings_feeds + .collect_vec() } - fn compute_expected_stats_from_new_posting_feeds( - new_postings_feeds: &Vec>, + fn construct_expected_sorted_stats_from_generated_new_charges( + msg_batches: &[Vec], ) -> Vec<(Address, u128)> { - let first_expected_stats = { - let all_postings_flattened = new_postings_feeds.iter().flatten().collect_vec(); - let all_unique_addresses = new_postings_feeds.last().unwrap(); - all_unique_addresses - .iter() - .map(|unique_account_posting| { - let sum = all_postings_flattened.iter().fold(0, |acc, posting| { - if posting.address == unique_account_posting.address { - acc + posting.amount_wei - } else { - acc - } - }); - (unique_account_posting.address, sum) - }) - .collect_vec() - }; - first_expected_stats + msg_batches + .iter() + .flatten() + .fold(HashMap::new(), |mut totals, posting| { + *totals.entry(posting.address).or_default() += posting.amount_wei; + totals + }) + .into_iter() + .sorted() + .collect() } #[test] fn new_posting_debug_container_for_debug_enabled() { let mut logger = Logger::new("test"); logger.set_level_for_test(Level::Debug); - let container = NewPostingsDebugContainer::new(&logger); - let new_posting_1 = NewPosting::new(make_address(1), 1234567); - let new_posting_2 = NewPosting::new(make_address(2), 7654321); - - let container = container.add(new_posting_1.address, new_posting_1.amount_wei); - let container = container.add(new_posting_1.address, new_posting_1.amount_wei); - let container = 
container.add(new_posting_2.address, new_posting_2.amount_wei); - - let stats: Vec = container.into(); - assert_eq!(stats, vec![new_posting_1, new_posting_1, new_posting_2]); + let container = NewChargessDebugContainer::new(&logger); + let new_charge_1 = NewCharge::new(make_address(1), 1234567); + let new_charge_2 = NewCharge::new(make_address(2), 7654321); + + let container = container.add_new_charge(Some(NewCharge::new( + new_charge_1.address, + new_charge_1.amount_wei, + ))); + let container = container.add_new_charge(Some(NewCharge::new( + new_charge_1.address, + new_charge_1.amount_wei, + ))); + let container = container.add_new_charge(None); + let container = container.add_new_charge(Some(NewCharge::new( + new_charge_2.address, + new_charge_2.amount_wei, + ))); + + let stats: Vec = container.into(); + assert_eq!(stats, vec![new_charge_1, new_charge_1, new_charge_2]); } #[test] fn new_posting_debug_container_for_debug_not_enabled() { let mut logger = Logger::new("test"); logger.set_level_for_test(Level::Info); - let container = NewPostingsDebugContainer::new(&logger); - let new_posting_1 = NewPosting::new(make_address(1), 1234567); - let new_posting_2 = NewPosting::new(make_address(2), 7654321); - - let container = container.add(new_posting_1.address, new_posting_1.amount_wei); - let container = container.add(new_posting_1.address, new_posting_1.amount_wei); - let container = container.add(new_posting_2.address, new_posting_2.amount_wei); - - let stats: Vec = container.into(); + let container = NewChargessDebugContainer::new(&logger); + let new_charge_1 = NewCharge::new(make_address(1), 1234567); + let new_charge_2 = NewCharge::new(make_address(2), 7654321); + + let container = container.add_new_charge(Some(NewCharge::new( + new_charge_1.address, + new_charge_1.amount_wei, + ))); + let container = container.add_new_charge(Some(NewCharge::new( + new_charge_1.address, + new_charge_1.amount_wei, + ))); + let container = container.add_new_charge(None); + let container = container.add_new_charge(Some(NewCharge::new( + new_charge_2.address, + new_charge_2.amount_wei, + ))); + + let stats: Vec = container.into(); assert_eq!(stats, vec![]); } - #[test] - fn accounts_stats_are_logged_only_if_debug_enabled() { - init_test_logging(); - let test_name = "accounts_stats_are_logged_only_if_debug_enabled"; - let mut logger = Logger::new(test_name); - logger.set_level_for_test(Level::Debug); - let mut subject = AccountingMsgsDebugStats::default(); - let new_posting_1 = NewPosting::new(make_address(1), 1234567); - let new_posting_2 = NewPosting::new(make_address(2), 7654321); - - subject.manage_debug_log( - &logger, - AccountingMsgType::ServicesConsumed, - 1, - vec![new_posting_1, new_posting_2], - ); - - TestLogHandler::new() - .exists_log_containing(&format!("DEBUG: {test_name}: Account debits in last")); - } - - #[test] - fn accounts_stats_are_not_logged_if_debug_is_not_enabled() { - init_test_logging(); - let test_name = "accounts_stats_are_not_logged_if_debug_is_not_enabled"; - let mut logger = Logger::new("test"); - logger.set_level_for_test(Level::Info); - let mut subject = AccountingMsgsDebugStats::default(); - let new_posting_1 = NewPosting::new(make_address(1), 1234567); - let new_posting_2 = NewPosting::new(make_address(2), 7654321); - - subject.manage_debug_log( - &logger, - AccountingMsgType::ServicesConsumed, - 1, - vec![new_posting_1, new_posting_2], - ); - - TestLogHandler::new().exists_no_log_containing(&format!("DEBUG: {test_name}:")); - } - #[test] fn display_loggable_stats() { let 
loggable_stats = LoggableStats { @@ -459,7 +413,7 @@ mod tests { log_window_in_pcs_of_msgs: 15, }; let expected_display = "\ - Account debits in last 15 RoutingServiceProvided messages (wei):\n\ + Total debits across last 15 RoutingServiceProvided messages (wei):\n\ 0x0000000000000000000001000000001000000001: 1234567\n\ 0x0000000000000000000002000000002000000002: 7654321"; assert_eq!(format!("{}", loggable_stats), expected_display); diff --git a/node/src/accountant/logging_utils/mod.rs b/node/src/accountant/logging_utils/mod.rs index 36b240cc4..a64df97d7 100644 --- a/node/src/accountant/logging_utils/mod.rs +++ b/node/src/accountant/logging_utils/mod.rs @@ -3,21 +3,42 @@ pub mod accounting_msgs_debug; pub mod msg_id_generator; -use crate::accountant::logging_utils::accounting_msgs_debug::AccountingMsgsDebugStats; +use crate::accountant::logging_utils::accounting_msgs_debug::AccountingMessageTracker; use crate::accountant::logging_utils::msg_id_generator::{ MessageIdGenerator, MessageIdGeneratorReal, }; +const ACCOUNTING_MSG_LOG_WINDOW: u16 = 50; + pub struct LoggingUtils { - pub accounting_msgs_stats: AccountingMsgsDebugStats, + pub debug_stats: AccountingMessageTracker, + pub accounting_msg_log_window: u16, pub msg_id_generator: Box, } impl Default for LoggingUtils { fn default() -> Self { Self { - accounting_msgs_stats: AccountingMsgsDebugStats::default(), + debug_stats: AccountingMessageTracker::default(), + accounting_msg_log_window: ACCOUNTING_MSG_LOG_WINDOW, msg_id_generator: Box::new(MessageIdGeneratorReal::default()), } } } + +#[cfg(test)] +mod tests { + use crate::accountant::logging_utils::{LoggingUtils, ACCOUNTING_MSG_LOG_WINDOW}; + + #[test] + fn constants_have_right_values() { + assert_eq!(ACCOUNTING_MSG_LOG_WINDOW, 50); + } + + #[test] + fn default_log_window() { + let subject = LoggingUtils::default(); + + assert_eq!(subject.accounting_msg_log_window, ACCOUNTING_MSG_LOG_WINDOW) + } +} diff --git a/node/src/accountant/logging_utils/msg_id_generator.rs b/node/src/accountant/logging_utils/msg_id_generator.rs index e121a37b8..03069b353 100644 --- a/node/src/accountant/logging_utils/msg_id_generator.rs +++ b/node/src/accountant/logging_utils/msg_id_generator.rs @@ -4,7 +4,8 @@ use crate::sub_lib::accountant::MSG_ID_INCREMENTER; use std::sync::atomic::Ordering; pub trait MessageIdGenerator { - fn id(&self) -> u32; + fn new_id(&self) -> u32; + fn last_used_id(&self) -> u32; as_any_ref_in_trait!(); } @@ -12,12 +13,20 @@ pub trait MessageIdGenerator { pub struct MessageIdGeneratorReal {} impl MessageIdGenerator for MessageIdGeneratorReal { - fn id(&self) -> u32 { + fn new_id(&self) -> u32 { MSG_ID_INCREMENTER.fetch_add(1, Ordering::Relaxed) } + fn last_used_id(&self) -> u32 { + MSG_ID_INCREMENTER.load(Ordering::Relaxed) - 1 + } as_any_ref_in_trait_impl!(); } +pub enum MsgIdRequested { + New, + LastUsed, +} + #[cfg(test)] mod tests { use crate::accountant::logging_utils::msg_id_generator::{ @@ -34,9 +43,9 @@ mod tests { let _guard = MSG_ID_GENERATOR_TEST_GUARD.lock().unwrap(); let subject = MessageIdGeneratorReal::default(); - let id1 = subject.id(); - let id2 = subject.id(); - let id3 = subject.id(); + let id1 = subject.new_id(); + let id2 = subject.new_id(); + let id3 = subject.new_id(); assert_eq!(id2, id1 + 1); assert_eq!(id3, id2 + 1) @@ -47,10 +56,26 @@ mod tests { let _guard = MSG_ID_GENERATOR_TEST_GUARD.lock().unwrap(); MSG_ID_INCREMENTER.store(u32::MAX, Ordering::Relaxed); let subject = MessageIdGeneratorReal::default(); - subject.id(); // This returns the previous value, not 
the newly incremented + // First call: gets u32::MAX; then increments the global counter to 0 (wraparound) + subject.new_id(); - let id = subject.id(); + let id = subject.new_id(); assert_eq!(id, 0) } + + #[test] + fn msg_id_generator_last_used_id() { + let _guard = MSG_ID_GENERATOR_TEST_GUARD.lock().unwrap(); + let subject = MessageIdGeneratorReal::default(); + let new_id = subject.new_id(); + + let same_id_1 = subject.last_used_id(); + let same_id_2 = subject.last_used_id(); + let new_id_2 = subject.new_id(); + + assert_eq!(new_id, same_id_1); + assert_eq!(new_id, same_id_2); + assert_eq!(new_id_2, same_id_2 + 1); + } } diff --git a/node/src/accountant/mod.rs b/node/src/accountant/mod.rs index 931e610ed..9c2a8c849 100644 --- a/node/src/accountant/mod.rs +++ b/node/src/accountant/mod.rs @@ -3,7 +3,7 @@ pub mod db_access_objects; pub mod db_big_integer; pub mod financials; -mod logging_utils; +pub mod logging_utils; pub mod payment_adjuster; pub mod scanners; @@ -24,8 +24,9 @@ use crate::accountant::financials::visibility_restricted_module::{ check_query_is_within_tech_limits, financials_entry_check, }; use crate::accountant::logging_utils::accounting_msgs_debug::{ - AccountingMsgType, NewPosting, NewPostingsDebugContainer, + AccountingMsgType, NewCharge, NewChargessDebugContainer, }; +use crate::accountant::logging_utils::msg_id_generator::MsgIdRequested; use crate::accountant::logging_utils::LoggingUtils; use crate::accountant::scanners::payable_scanner::msgs::{ InitialTemplatesMessage, PricedTemplatesMessage, @@ -47,11 +48,14 @@ use crate::blockchain::blockchain_interface::data_structures::{ use crate::blockchain::errors::rpc_errors::AppRpcError; use crate::bootstrapper::BootstrapperConfig; use crate::database::db_initializer::DbInitializationConfig; -use crate::sub_lib::accountant::DaoFactories; use crate::sub_lib::accountant::FinancialStatistics; use crate::sub_lib::accountant::ReportExitServiceProvidedMessage; use crate::sub_lib::accountant::ReportRoutingServiceProvidedMessage; use crate::sub_lib::accountant::ReportServicesConsumedMessage; +use crate::sub_lib::accountant::{ + AccountableServiceWithTraceLog, DaoFactories, MessageWithServicesProvided, + RoutingServiceConsumedTraceLogWrapper, ServiceProvided, +}; use crate::sub_lib::accountant::{AccountantSubs, DetailedScanType}; use crate::sub_lib::blockchain_bridge::OutboundPaymentsInstructions; use crate::sub_lib::neighborhood::{ConfigChange, ConfigChangeMsg}; @@ -90,7 +94,6 @@ use std::time::SystemTime; pub const CRASH_KEY: &str = "ACCOUNTANT"; pub const DEFAULT_PENDING_TOO_LONG_SEC: u64 = 21_600; //6 hours -const DEFAULT_ACCOUNTING_MSG_LOG_WINDOW: u16 = 50; pub struct Accountant { consuming_wallet_opt: Option, @@ -484,7 +487,7 @@ impl Handler for Accountant { msg: ReportRoutingServiceProvidedMessage, _ctx: &mut Self::Context, ) -> Self::Result { - self.handle_report_routing_service_provided_message(msg); + self.handle_report_service_provided_message(msg); } } @@ -496,7 +499,7 @@ impl Handler for Accountant { msg: ReportExitServiceProvidedMessage, _ctx: &mut Self::Context, ) -> Self::Result { - self.handle_report_exit_service_provided_message(msg); + self.handle_report_service_provided_message(msg); } } @@ -622,39 +625,33 @@ impl Accountant { DaoFactoryReal::new(data_directory, DbInitializationConfig::panic_on_migration()) } - fn record_service_provided( - &self, - service_rate: u64, - byte_rate: u64, - timestamp: SystemTime, - payload_size: usize, - wallet: &Wallet, - ) -> u128 { - let byte_charge = byte_rate as u128 * 
(payload_size as u128); - let total_charge = service_rate as u128 + byte_charge; + fn record_service_provided(&self, service: &ServiceProvided) -> Option { + let byte_charge = service.byte_rate as u128 * (service.payload_size as u128); + let total_charge = service.service_rate as u128 + byte_charge; + let wallet = &service.paying_wallet; if !self.our_wallet(wallet) { match self.receivable_dao .as_ref() - .more_money_receivable(timestamp, wallet, total_charge) { + .more_money_receivable(service.timestamp, wallet, total_charge) { Ok(_) => (), Err(ReceivableDaoError::SignConversion(_)) => error!( self.logger, "Overflow error recording service provided for {}: service rate {}, byte rate {}, payload size {}. Skipping", wallet, - service_rate, - byte_rate, - payload_size + service.service_rate, + service.byte_rate, + service.payload_size ), Err(e) => panic!("Was recording services provided for {} but hit a fatal database error: {:?}", wallet, e) - }; - total_charge + } + Some(NewCharge::new(wallet.address(), total_charge)) } else { warning!( self.logger, "Declining to record a receivable against our wallet {} for services we provided", wallet ); - 0 + None } } @@ -665,7 +662,7 @@ impl Accountant { timestamp: SystemTime, payload_size: usize, wallet: &Wallet, - ) -> u128 { + ) -> Option { let byte_charge = byte_rate as u128 * (payload_size as u128); let total_charge = service_rate as u128 + byte_charge; if !self.our_wallet(wallet) { @@ -684,14 +681,14 @@ impl Accountant { ), Err(e) => panic!("Recording services consumed from {} but has hit fatal database error: {:?}", wallet, e) }; - total_charge + Some(NewCharge::new(wallet.address(), total_charge)) } else { warning!( self.logger, "Declining to record a payable against our wallet {} for service we provided", wallet ); - 0 + None } } @@ -745,84 +742,25 @@ impl Accountant { } } - fn handle_report_routing_service_provided_message( - &mut self, - msg: ReportRoutingServiceProvidedMessage, - ) { - let msg_id = self.msg_id(); - trace!( - self.logger, - "Msg {}: Charging routing of {} bytes to wallet {}", - msg_id, - msg.payload_size, - msg.paying_wallet - ); - - let sum = self.record_service_provided( - msg.service_rate, - msg.byte_rate, - msg.timestamp, - msg.payload_size, - &msg.paying_wallet, - ); - - self.logging_utils.accounting_msgs_stats.manage_debug_log( - &self.logger, - AccountingMsgType::RoutingServiceProvided, - DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, - vec![NewPosting::new(msg.paying_wallet.address(), sum)], - ) - } + fn handle_report_service_provided_message(&mut self, msg: AccountingMessage) + where + AccountingMessage: MessageWithServicesProvided, + { + let services_provided = &msg.services_provided(); + let accounting_msg_type = msg.msg_type(); + let trace_log_wrapper = msg.trace_log_wrapper(); - fn handle_report_exit_service_provided_message( - &mut self, - msg: ReportExitServiceProvidedMessage, - ) { - self.logger.trace(||{ - let msg_id = self.msg_id(); - format!( - "Msg {}: Charging exit service for {} bytes to wallet {} at {} per service and {} per byte", - msg_id, - msg.payload_size, - msg.paying_wallet, - msg.service_rate, - msg.byte_rate - ) - }); - let sum = self.record_service_provided( - msg.service_rate, - msg.byte_rate, - msg.timestamp, - msg.payload_size, - &msg.paying_wallet, - ); + trace_log_wrapper.log_trace(self.logging_kit(MsgIdRequested::New)); - self.logging_utils.accounting_msgs_stats.manage_debug_log( - &self.logger, - AccountingMsgType::ExitServiceProvided, - DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, - 
vec![NewPosting::new(msg.paying_wallet.address(), sum)], - ) - } + let charges = self.record_service_provided(services_provided); - fn msg_id(&self) -> u32 { - if self.logger.debug_enabled() { - self.logging_utils.msg_id_generator.id() - } else { - 0 - } + self.consider_logging_debug_stats(accounting_msg_type, charges.into_iter().collect_vec()); } fn handle_report_services_consumed_message(&mut self, msg: ReportServicesConsumedMessage) { - let msg_id = self.msg_id(); - trace!( - self.logger, - "Msg {}: Accruing debt to {} for consuming {} exited bytes", - msg_id, - msg.exit.earning_wallet, - msg.exit.payload_size - ); - let exit_sum = self.record_service_consumed( + msg.exit.log_trace(self.logging_kit(MsgIdRequested::New)); + + let exit_charge_opt = self.record_service_consumed( msg.exit.service_rate, msg.exit.byte_rate, msg.timestamp, @@ -830,48 +768,77 @@ impl Accountant { &msg.exit.earning_wallet, ); - let new_postings = NewPostingsDebugContainer::new(&self.logger) - .add(msg.exit.earning_wallet.address(), exit_sum); + let new_charges_container = + NewChargessDebugContainer::new(&self.logger).add_new_charge(exit_charge_opt); - let new_postings = self.handle_routing_services_consumed(msg, msg_id, new_postings); + let new_charges_container = + self.handle_routing_services_consumed(&msg, new_charges_container); - self.logging_utils.accounting_msgs_stats.manage_debug_log( - &self.logger, + self.consider_logging_debug_stats( AccountingMsgType::ServicesConsumed, - DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, - new_postings.into(), - ) + new_charges_container.into(), + ); } fn handle_routing_services_consumed( &mut self, - msg: ReportServicesConsumedMessage, - msg_id: u32, - new_postings: NewPostingsDebugContainer, - ) -> NewPostingsDebugContainer { + msg: &ReportServicesConsumedMessage, + new_charges: NewChargessDebugContainer, + ) -> NewChargessDebugContainer { msg.routing + .services .iter() - .fold(new_postings, |new_postings, routing_service| { - trace!( - self.logger, - "Msg {}: Accruing debt to {} for consuming {} routed bytes", - msg_id, - routing_service.earning_wallet, - msg.routing_payload_size - ); + .fold(new_charges, |new_charges, routing_service| { + RoutingServiceConsumedTraceLogWrapper { + service: routing_service, + routing_payload_size: msg.routing.routing_payload_size, + } + .log_trace(self.logging_kit(MsgIdRequested::LastUsed)); - let sum = self.record_service_consumed( + let new_charge = self.record_service_consumed( routing_service.service_rate, routing_service.byte_rate, msg.timestamp, - msg.routing_payload_size, + msg.routing.routing_payload_size, &routing_service.earning_wallet, ); - new_postings.add(routing_service.earning_wallet.address(), sum) + new_charges.add_new_charge(new_charge) }) } + fn logging_kit(&self, msg_id_requested: MsgIdRequested) -> (&Logger, u32) { + ( + &self.logger, + match msg_id_requested { + MsgIdRequested::New => self.logging_utils.msg_id_generator.new_id(), + MsgIdRequested::LastUsed => self.logging_utils.msg_id_generator.last_used_id(), + }, + ) + } + + fn consider_logging_debug_stats( + &mut self, + msg_type: AccountingMsgType, + charges: Vec, + ) { + if charges.is_empty() { + return; + } + + let log_window_size = self.logging_utils.accounting_msg_log_window; + + if self.logger.debug_enabled() { + if let Some(loggable_stats) = self.logging_utils.debug_stats.process_debug_stats( + msg_type, + charges, + log_window_size, + ) { + debug!(self.logger, "{}", loggable_stats); + } + } + } + fn handle_payable_payment_setup(&mut self, msg: 
PricedTemplatesMessage) { let blockchain_bridge_instructions = match self .scanners @@ -1388,9 +1355,9 @@ mod tests { bc_from_earning_wallet, bc_from_wallets, make_payable_account, make_qualified_and_unqualified_payables, make_transaction_block, BannedDaoFactoryMock, ConfigDaoFactoryMock, DaoWithDestination, FailedPayableDaoFactoryMock, - FailedPayableDaoMock, MessageIdGeneratorMock, PayableDaoFactoryMock, PayableDaoMock, - PaymentAdjusterMock, PendingPayableScannerBuilder, ReceivableDaoFactoryMock, - ReceivableDaoMock, SentPayableDaoFactoryMock, SentPayableDaoMock, + FailedPayableDaoMock, PayableDaoFactoryMock, PayableDaoMock, PaymentAdjusterMock, + PendingPayableScannerBuilder, ReceivableDaoFactoryMock, ReceivableDaoMock, + SentPayableDaoFactoryMock, SentPayableDaoMock, }; use crate::accountant::test_utils::{AccountantBuilder, BannedDaoMock}; use crate::accountant::Accountant; @@ -1400,14 +1367,14 @@ mod tests { }; use crate::blockchain::errors::rpc_errors::RemoteError; use crate::blockchain::errors::validation_status::ValidationStatus; - use crate::blockchain::test_utils::make_tx_hash; + use crate::blockchain::test_utils::{make_address, make_tx_hash}; use crate::database::rusqlite_wrappers::TransactionSafeWrapper; use crate::database::test_utils::transaction_wrapper_mock::TransactionInnerWrapperMockBuilder; use crate::db_config::config_dao::ConfigDaoRecord; use crate::db_config::mocks::ConfigDaoMock; use crate::sub_lib::accountant::{ - ExitServiceConsumed, PaymentThresholds, RoutingServiceConsumed, ScanIntervals, - DEFAULT_EARNING_WALLET, DEFAULT_PAYMENT_THRESHOLDS, + ExitServiceConsumed, PaymentThresholds, RoutingServiceConsumed, RoutingServicesConsumed, + ScanIntervals, DEFAULT_EARNING_WALLET, DEFAULT_PAYMENT_THRESHOLDS, }; use crate::sub_lib::blockchain_bridge::OutboundPaymentsInstructions; use crate::sub_lib::neighborhood::ConfigChange; @@ -1471,7 +1438,6 @@ mod tests { fn constants_have_correct_values() { assert_eq!(CRASH_KEY, "ACCOUNTANT"); assert_eq!(DEFAULT_PENDING_TOO_LONG_SEC, 21_600); - assert_eq!(DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, 50); } #[test] @@ -4522,6 +4488,7 @@ mod tests { #[test] fn report_routing_service_provided_message_is_received() { init_test_logging(); + let test_name = "report_routing_service_provided_message_is_received"; let now = SystemTime::now(); let bootstrapper_config = bc_from_earning_wallet(make_wallet("hi")); let more_money_receivable_parameters_arc = Arc::new(Mutex::new(vec![])); @@ -4529,11 +4496,13 @@ mod tests { let receivable_dao_mock = ReceivableDaoMock::new() .more_money_receivable_parameters(&more_money_receivable_parameters_arc) .more_money_receivable_result(Ok(())); - let subject = AccountantBuilder::default() + let mut subject = AccountantBuilder::default() .bootstrapper_config(bootstrapper_config) + .logger(Logger::new(test_name)) .payable_daos(vec![ForAccountantBody(payable_dao_mock)]) .receivable_daos(vec![ForAccountantBody(receivable_dao_mock)]) .build(); + subject.logging_utils.accounting_msg_log_window = 1; let system = System::new("report_routing_service_message_is_received"); let subject_addr: Addr = subject.start(); subject_addr @@ -4545,11 +4514,13 @@ mod tests { let paying_wallet = make_wallet("booga"); subject_addr .try_send(ReportRoutingServiceProvidedMessage { - timestamp: now, - paying_wallet: paying_wallet.clone(), - payload_size: 1234, - service_rate: 42, - byte_rate: 24, + service: ServiceProvided { + timestamp: now, + paying_wallet: paying_wallet.clone(), + payload_size: 1234, + service_rate: 42, + byte_rate: 24, + }, }) 
.unwrap(); @@ -4560,6 +4531,8 @@ mod tests { more_money_receivable_parameters[0], (now, make_wallet("booga"), (1 * 42) + (1234 * 24)) ); + TestLogHandler::new() + .exists_log_containing(&format!("DEBUG: {test_name}: Total debits across last ")); } #[test] @@ -4586,11 +4559,13 @@ mod tests { subject_addr .try_send(ReportRoutingServiceProvidedMessage { - timestamp: SystemTime::now(), - paying_wallet: consuming_wallet.clone(), - payload_size: 1234, - service_rate: 42, - byte_rate: 24, + service: ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: consuming_wallet.clone(), + payload_size: 1234, + service_rate: 42, + byte_rate: 24, + }, }) .unwrap(); @@ -4631,11 +4606,13 @@ mod tests { subject_addr .try_send(ReportRoutingServiceProvidedMessage { - timestamp: SystemTime::now(), - paying_wallet: earning_wallet.clone(), - payload_size: 1234, - service_rate: 42, - byte_rate: 24, + service: ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: earning_wallet.clone(), + payload_size: 1234, + service_rate: 42, + byte_rate: 24, + }, }) .unwrap(); @@ -4655,6 +4632,7 @@ mod tests { #[test] fn report_exit_service_provided_message_is_received() { init_test_logging(); + let test_name = "report_exit_service_provided_message_is_received"; let now = SystemTime::now(); let config = bc_from_earning_wallet(make_wallet("hi")); let more_money_receivable_parameters_arc = Arc::new(Mutex::new(vec![])); @@ -4662,11 +4640,13 @@ mod tests { let receivable_dao_mock = ReceivableDaoMock::new() .more_money_receivable_parameters(&more_money_receivable_parameters_arc) .more_money_receivable_result(Ok(())); - let subject = AccountantBuilder::default() + let mut subject = AccountantBuilder::default() .bootstrapper_config(config) + .logger(Logger::new(test_name)) .payable_daos(vec![ForAccountantBody(payable_dao_mock)]) .receivable_daos(vec![ForAccountantBody(receivable_dao_mock)]) .build(); + subject.logging_utils.accounting_msg_log_window = 1; let system = System::new("report_exit_service_provided_message_is_received"); let subject_addr: Addr = subject.start(); subject_addr @@ -4678,11 +4658,13 @@ mod tests { let paying_wallet = make_wallet("booga"); subject_addr .try_send(ReportExitServiceProvidedMessage { - timestamp: now, - paying_wallet: paying_wallet.clone(), - payload_size: 1234, - service_rate: 42, - byte_rate: 24, + service: ServiceProvided { + timestamp: now, + paying_wallet: paying_wallet.clone(), + payload_size: 1234, + service_rate: 42, + byte_rate: 24, + }, }) .unwrap(); @@ -4693,6 +4675,8 @@ mod tests { more_money_receivable_parameters[0], (now, make_wallet("booga"), (1 * 42) + (1234 * 24)) ); + TestLogHandler::new() + .exists_log_containing(&format!("DEBUG: {test_name}: Total debits across last ")); } #[test] @@ -4719,11 +4703,13 @@ mod tests { subject_addr .try_send(ReportExitServiceProvidedMessage { - timestamp: SystemTime::now(), - paying_wallet: consuming_wallet.clone(), - payload_size: 1234, - service_rate: 42, - byte_rate: 24, + service: ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: consuming_wallet.clone(), + payload_size: 1234, + service_rate: 42, + byte_rate: 24, + }, }) .unwrap(); @@ -4764,11 +4750,13 @@ mod tests { subject_addr .try_send(ReportExitServiceProvidedMessage { - timestamp: SystemTime::now(), - paying_wallet: earning_wallet.clone(), - payload_size: 1234, - service_rate: 42, - byte_rate: 24, + service: ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: earning_wallet.clone(), + payload_size: 1234, + service_rate: 42, + byte_rate: 
24, + }, }) .unwrap(); @@ -4788,6 +4776,7 @@ mod tests { #[test] fn report_services_consumed_message_is_received() { init_test_logging(); + let test_name = "report_services_consumed_message_is_received"; let config = make_bc_with_defaults(TEST_DEFAULT_CHAIN); let more_money_payable_params_arc = Arc::new(Mutex::new(vec![])); let payable_dao_mock = PayableDaoMock::new() @@ -4797,10 +4786,10 @@ mod tests { .more_money_payable_result(Ok(())); let mut subject = AccountantBuilder::default() .bootstrapper_config(config) + .logger(Logger::new(test_name)) .payable_daos(vec![ForAccountantBody(payable_dao_mock)]) .build(); - subject.logging_utils.msg_id_generator = - Box::new(MessageIdGeneratorMock::default().id_result(123)); + subject.logging_utils.accounting_msg_log_window = 1; let system = System::new("report_services_consumed_message_is_received"); let subject_addr: Addr = subject.start(); subject_addr @@ -4822,19 +4811,21 @@ mod tests { service_rate: 120, byte_rate: 30, }, - routing_payload_size: 3456, - routing: vec![ - RoutingServiceConsumed { - earning_wallet: earning_wallet_routing_1.clone(), - service_rate: 42, - byte_rate: 24, - }, - RoutingServiceConsumed { - earning_wallet: earning_wallet_routing_2.clone(), - service_rate: 52, - byte_rate: 33, - }, - ], + routing: RoutingServicesConsumed { + routing_payload_size: 3456, + services: vec![ + RoutingServiceConsumed { + earning_wallet: earning_wallet_routing_1.clone(), + service_rate: 42, + byte_rate: 24, + }, + RoutingServiceConsumed { + earning_wallet: earning_wallet_routing_2.clone(), + service_rate: 52, + byte_rate: 33, + }, + ], + }, }) .unwrap(); @@ -4860,6 +4851,8 @@ mod tests { ) ] ); + TestLogHandler::new() + .exists_log_containing(&format!("DEBUG: {test_name}: Total debits across last ")); } fn assert_that_we_do_not_charge_our_own_wallet_for_consumed_services( @@ -4905,12 +4898,14 @@ mod tests { service_rate: 45, byte_rate: 10, }, - routing_payload_size: 3333, - routing: vec![RoutingServiceConsumed { - earning_wallet: consuming_wallet.clone(), - service_rate: 42, - byte_rate: 6, - }], + routing: RoutingServicesConsumed { + routing_payload_size: 3333, + services: vec![RoutingServiceConsumed { + earning_wallet: consuming_wallet.clone(), + service_rate: 42, + byte_rate: 6, + }], + }, }; let more_money_payable_params_arc = @@ -4947,12 +4942,14 @@ mod tests { service_rate: 45, byte_rate: 10, }, - routing_payload_size: 3333, - routing: vec![RoutingServiceConsumed { - earning_wallet: earning_wallet.clone(), - service_rate: 42, - byte_rate: 6, - }], + routing: RoutingServicesConsumed { + routing_payload_size: 3333, + services: vec![RoutingServiceConsumed { + earning_wallet: earning_wallet.clone(), + service_rate: 42, + byte_rate: 6, + }], + }, }; let more_money_payable_params_arc = @@ -4987,8 +4984,10 @@ mod tests { service_rate: 42, byte_rate: 24, }, - routing_payload_size: 3333, - routing: vec![], + routing: RoutingServicesConsumed { + routing_payload_size: 3333, + services: vec![], + }, }; let more_money_payable_params_arc = @@ -5017,8 +5016,10 @@ mod tests { service_rate: 42, byte_rate: 24, }, - routing_payload_size: 3333, - routing: vec![], + routing: RoutingServicesConsumed { + routing_payload_size: 333, + services: vec![], + }, }; let more_money_payable_params_arc = @@ -5050,8 +5051,15 @@ mod tests { let subject = AccountantBuilder::default() .receivable_daos(vec![ForAccountantBody(receivable_dao)]) .build(); + let services_provided = ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: wallet, + payload_size: 123456, + 
service_rate: 2, + byte_rate: 1, + }; - let _ = subject.record_service_provided(i64::MAX as u64, 1, SystemTime::now(), 2, &wallet); + let _ = subject.record_service_provided(&services_provided); } #[test] @@ -5063,8 +5071,15 @@ mod tests { let subject = AccountantBuilder::default() .receivable_daos(vec![ForAccountantBody(receivable_dao)]) .build(); + let service_provided = ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: wallet.clone(), + payload_size: 2, + service_rate: i64::MAX as u64, + byte_rate: 1, + }; - subject.record_service_provided(i64::MAX as u64, 1, SystemTime::now(), 2, &wallet); + subject.record_service_provided(&service_provided); TestLogHandler::new().exists_log_containing(&format!( "ERROR: Accountant: Overflow error recording service provided for {}: service rate {}, byte rate 1, payload size 2. Skipping", @@ -6855,28 +6870,6 @@ mod tests { subject.compute_financials(&request, context_id_expected); } - #[test] - #[cfg(not(feature = "no_test_share"))] - fn msg_id_generates_numbers_only_if_debug_log_enabled() { - let mut logger1 = Logger::new("msg_id_generator_off"); - logger1.set_level_for_test(Level::Info); - let mut subject = AccountantBuilder::default().build(); - let msg_id_generator = MessageIdGeneratorMock::default().id_result(789); // We prepared a result just for one call - subject.logging_utils.msg_id_generator = Box::new(msg_id_generator); - subject.logger = logger1; - - let id1 = subject.msg_id(); - - let mut logger2 = Logger::new("msg_id_generator_on"); - logger2.set_level_for_test(Level::Debug); - subject.logger = logger2; - - let id2 = subject.msg_id(); - - assert_eq!(id1, 0); - assert_eq!(id2, 789); - } - #[test] fn unsigned_to_signed_handles_zero() { let result = sign_conversion::(0); @@ -7034,6 +7027,107 @@ mod tests { assert_on_initialization_with_panic_on_migration(&data_dir, &act); } + #[test] + fn accounts_stats_are_logged_only_if_debug_enabled() { + init_test_logging(); + let test_name = "accounts_stats_are_logged_only_if_debug_enabled"; + let mut logger = Logger::new(test_name); + logger.set_level_for_test(Level::Debug); + let mut subject = AccountantBuilder::default().logger(logger).build(); + subject.logging_utils.accounting_msg_log_window = 1; + let new_charge_1 = NewCharge::new(make_address(1), 1234567); + let new_charge_2 = NewCharge::new(make_address(2), 7654321); + + subject.consider_logging_debug_stats( + AccountingMsgType::ServicesConsumed, + vec![new_charge_1, new_charge_2], + ); + + TestLogHandler::new() + .exists_log_containing(&format!("DEBUG: {test_name}: Total debits across last")); + } + + #[test] + fn accounts_stats_are_not_logged_if_debug_is_not_enabled() { + init_test_logging(); + let test_name = "accounts_stats_are_not_logged_if_debug_is_not_enabled"; + let mut logger = Logger::new("test"); + logger.set_level_for_test(Level::Info); + let mut subject = AccountantBuilder::default().logger(logger).build(); + subject.logging_utils.accounting_msg_log_window = 1; + let new_charge_1 = NewCharge::new(make_address(1), 1234567); + let new_charge_2 = NewCharge::new(make_address(2), 7654321); + + subject.consider_logging_debug_stats( + AccountingMsgType::ServicesConsumed, + vec![new_charge_1, new_charge_2], + ); + + TestLogHandler::new().exists_no_log_containing(&format!("DEBUG: {test_name}:")); + } + + #[test] + fn accounts_stats_are_not_produced_for_empty_charges() { + init_test_logging(); + let test_name = "accounts_stats_are_not_produced_for_empty_charges"; + let mut logger = Logger::new("test"); + 
logger.set_level_for_test(Level::Debug); + let mut subject = AccountantBuilder::default().logger(logger).build(); + subject.logging_utils.accounting_msg_log_window = 1; + + subject.consider_logging_debug_stats(AccountingMsgType::ServicesConsumed, vec![]); + + TestLogHandler::new().exists_no_log_containing(&format!("DEBUG: {test_name}:")); + } + + #[test] + fn logging_kit_works() { + let test_name = "logging_kit_works"; + let subject = AccountantBuilder::default() + .logger(Logger::new(test_name)) + .build(); + + let (logger_1, id_1) = subject.logging_kit(MsgIdRequested::New); + let (logger_2, id_2) = subject.logging_kit(MsgIdRequested::New); + let (logger_3, id_3) = subject.logging_kit(MsgIdRequested::LastUsed); + let (logger_4, id_4) = subject.logging_kit(MsgIdRequested::New); + + assert_ne!(id_1, id_2); + assert_eq!(id_2, id_3); + assert_ne!(id_3, id_4); + let test_log_handler = TestLogHandler::new(); + vec![logger_1, logger_2, logger_3, logger_4] + .into_iter() + .enumerate() + .for_each(|(idx, logger)| { + debug!(logger, "idx: {idx}"); + test_log_handler.exists_log_containing(&format!("DEBUG: {test_name}: idx: {idx}")); + }); + } + + #[test] + fn join_with_separator_works() { + // With a Vec + let vec = vec![1, 2, 3]; + let result_vec = join_with_separator(vec, |&num| num.to_string(), ", "); + assert_eq!(result_vec, "1, 2, 3".to_string()); + + // With a HashSet + let set = BTreeSet::from([1, 2, 3]); + let result_set = join_with_separator(set, |&num| num.to_string(), ", "); + assert_eq!(result_set, "1, 2, 3".to_string()); + + // With a slice + let slice = &[1, 2, 3]; + let result_slice = join_with_separator(slice.to_vec(), |&num| num.to_string(), ", "); + assert_eq!(result_slice, "1, 2, 3".to_string()); + + // With an array + let array = [1, 2, 3]; + let result_array = join_with_separator(array.to_vec(), |&num| num.to_string(), ", "); + assert_eq!(result_array, "1, 2, 3".to_string()); + } + fn bind_ui_gateway_unasserted(accountant: &mut Accountant) { accountant.ui_message_sub_opt = Some(make_recorder().0.start().recipient()); } @@ -7057,7 +7151,6 @@ pub mod exportable_test_parts { check_if_source_code_is_attached, ensure_node_home_directory_exists, ShouldWeRunTheTest, }; use regex::Regex; - use std::collections::BTreeSet; use std::env::current_dir; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -7234,27 +7327,4 @@ pub mod exportable_test_parts { // We didn't blow up, it recognized the functions. 
// This is an example of the error: "no such function: slope_drop_high_bytes" } - - #[test] - fn join_with_separator_works() { - // With a Vec - let vec = vec![1, 2, 3]; - let result_vec = join_with_separator(vec, |&num| num.to_string(), ", "); - assert_eq!(result_vec, "1, 2, 3".to_string()); - - // With a HashSet - let set = BTreeSet::from([1, 2, 3]); - let result_set = join_with_separator(set, |&num| num.to_string(), ", "); - assert_eq!(result_set, "1, 2, 3".to_string()); - - // With a slice - let slice = &[1, 2, 3]; - let result_slice = join_with_separator(slice.to_vec(), |&num| num.to_string(), ", "); - assert_eq!(result_slice, "1, 2, 3".to_string()); - - // With an array - let array = [1, 2, 3]; - let result_array = join_with_separator(array.to_vec(), |&num| num.to_string(), ", "); - assert_eq!(result_array, "1, 2, 3".to_string()); - } } diff --git a/node/src/accountant/scanners/mod.rs b/node/src/accountant/scanners/mod.rs index e8980d6cd..4c1747e01 100644 --- a/node/src/accountant/scanners/mod.rs +++ b/node/src/accountant/scanners/mod.rs @@ -142,9 +142,9 @@ impl Scanners { ) -> Result { if let Some(started_at) = self.payable.scan_started_at() { unreachable!( - "Guards are applied to ensure that none of the payable scanners may run \ - if the pending payable has not finished the monitoring of pending txs. \ - Still, another payable scan intruded at {} and is still running at {}", + "Guards are implemented to ensure that none of the payable scanners can operate \ + if the pending payable has not completed monitoring the pending transactions. \ + However, another payable scan interrupted at {} and is still running at {}.", StartScanError::timestamp_as_string(started_at), StartScanError::timestamp_as_string(SystemTime::now()) ) @@ -795,11 +795,11 @@ mod tests { false ); let dumped_records = pending_payable_scanner - .supposed_failed_payables + .suspected_failed_payables .dump_cache(); assert!( dumped_records.is_empty(), - "There should be no supposed failures but found {:?}.", + "There should be no suspected failures but found {:?}.", dumped_records ); assert_eq!( @@ -1026,10 +1026,10 @@ mod tests { let after = SystemTime::now(); let panic_msg = caught_panic.downcast_ref::().unwrap(); - let expected_needle_1 = "internal error: entered unreachable code: \ - Guards are applied to ensure that none of the payable scanners may run if the pending \ - payable has not finished the monitoring of pending txs. Still, another payable scan \ - intruded at"; + let expected_needle_1 = "internal error: entered unreachable code: Guards \ + are implemented to ensure that none of the payable scanners can operate if the pending \ + payable has not completed monitoring the pending transactions. 
However, another payable \ + scan interrupted at"; assert!( panic_msg.contains(expected_needle_1), "We looked for \"{}\" but the actual string doesn't contain it: {}", @@ -1202,7 +1202,7 @@ mod tests { TestLogHandler::new().assert_logs_match_in_order(vec![ &format!("INFO: {test_name}: Scanning for pending payable"), &format!( - "DEBUG: {test_name}: Collected 1 pending payables and 1 supposed failures \ + "DEBUG: {test_name}: Collected 1 pending payables and 1 suspected failures \ for the receipt check" ), ]) diff --git a/node/src/accountant/scanners/pending_payable_scanner/mod.rs b/node/src/accountant/scanners/pending_payable_scanner/mod.rs index a0ecfdda9..78fffbe8d 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/mod.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/mod.rs @@ -62,7 +62,7 @@ pub struct PendingPayableScanner { pub failed_payable_dao: Box, pub financial_statistics: Rc>, pub current_sent_payables: Box>, - pub supposed_failed_payables: Box>, + pub suspected_failed_payables: Box>, pub clock: Box, } @@ -81,7 +81,7 @@ impl impl CachesEmptiableScanner for PendingPayableScanner { fn empty_caches(&mut self, logger: &Logger) { self.current_sent_payables.ensure_empty_cache(logger); - self.supposed_failed_payables.ensure_empty_cache(logger); + self.suspected_failed_payables.ensure_empty_cache(logger); } } @@ -100,7 +100,7 @@ impl PendingPayableScanner { failed_payable_dao, financial_statistics, current_sent_payables: Box::new(CurrentPendingPayables::default()), - supposed_failed_payables: Box::new(RecheckRequiringFailures::default()), + suspected_failed_payables: Box::new(RecheckRequiringFailures::default()), clock: Box::new(SimpleClockReal::default()), } } @@ -109,7 +109,7 @@ impl PendingPayableScanner { debug!(logger, "Harvesting sent_payable and failed_payable tables"); let pending_tx_hashes = self.harvest_pending_payables(); - let failure_hashes = self.harvest_supposed_failures(); + let failure_hashes = self.harvest_suspected_failures(); if Self::is_there_nothing_to_process(&pending_tx_hashes, &failure_hashes) { return Err(StartScanError::NothingToProcess); @@ -139,7 +139,7 @@ impl PendingPayableScanner { pending_tx_hashes } - fn harvest_supposed_failures(&mut self) -> Vec { + fn harvest_suspected_failures(&mut self) -> Vec { let failures = self .failed_payable_dao .retrieve_txs(Some(FailureRetrieveCondition::EveryRecheckRequiredRecord)) @@ -151,7 +151,7 @@ impl PendingPayableScanner { } let failure_hashes = Self::wrap_hashes(&failures, TxHashByTable::FailedPayable); - self.supposed_failed_payables.load_cache(failures); + self.suspected_failed_payables.load_cache(failures); failure_hashes } @@ -263,7 +263,7 @@ impl PendingPayableScanner { }; self.current_sent_payables.ensure_empty_cache(logger); - self.supposed_failed_payables.ensure_empty_cache(logger); + self.suspected_failed_payables.ensure_empty_cache(logger); cases } @@ -288,7 +288,7 @@ impl PendingPayableScanner { } } TxHashByTable::FailedPayable(tx_hash) => { - match self.supposed_failed_payables.get_record_by_hash(tx_hash) { + match self.suspected_failed_payables.get_record_by_hash(tx_hash) { Some(failed_tx) => { cases.push(TxCaseToBeInterpreted::new( TxByTable::FailedPayable(failed_tx), @@ -313,10 +313,10 @@ impl PendingPayableScanner { panic!( "Looking up '{:?}' in the cache, the record could not be found. Dumping \ - the remaining values. Pending payables: {:?}. Supposed failures: {:?}.", + the remaining values. Pending payables: {:?}. 
Suspected failures: {:?}.", missing_entry, rearrange(self.current_sent_payables.dump_cache()), - rearrange(self.supposed_failed_payables.dump_cache()), + rearrange(self.suspected_failed_payables.dump_cache()), ) } @@ -563,7 +563,7 @@ impl PendingPayableScanner { }); self.add_new_failures(grouped_failures.new_failures, logger); - self.finalize_supposed_failures(grouped_failures.rechecks_completed, logger); + self.finalize_suspected_failures(grouped_failures.rechecks_completed, logger); } fn add_new_failures(&self, new_failures: Vec, logger: &Logger) { @@ -615,7 +615,7 @@ impl PendingPayableScanner { } } - fn finalize_supposed_failures(&self, rechecks_completed: Vec, logger: &Logger) { + fn finalize_suspected_failures(&self, rechecks_completed: Vec, logger: &Logger) { fn prepare_hashmap(rechecks_completed: &[TxHash]) -> HashMap { rechecks_completed .iter() @@ -784,7 +784,7 @@ impl PendingPayableScanner { ) { debug!( logger, - "Collected {} pending payables and {} supposed failures for the receipt check", + "Collected {} pending payables and {} suspected failures for the receipt check", pending_tx_hashes.len(), failure_hashes.len() ); @@ -859,7 +859,7 @@ mod tests { .build(); let logger = Logger::new("start_scan_fills_in_caches_and_returns_msg"); let pending_payable_cache_before = subject.current_sent_payables.dump_cache(); - let failed_payable_cache_before = subject.supposed_failed_payables.dump_cache(); + let failed_payable_cache_before = subject.suspected_failed_payables.dump_cache(); let result = subject.start_scan(&make_wallet("blah"), SystemTime::now(), None, &logger); @@ -886,7 +886,7 @@ mod tests { failed_payable_cache_before ); let pending_payable_cache_after = subject.current_sent_payables.dump_cache(); - let failed_payable_cache_after = subject.supposed_failed_payables.dump_cache(); + let failed_payable_cache_after = subject.suspected_failed_payables.dump_cache(); assert_eq!( pending_payable_cache_after, hashmap!(sent_tx_hash_1 => sent_tx_1, sent_tx_hash_2 => sent_tx_2) @@ -994,7 +994,7 @@ mod tests { failed_payable_cache.load_cache(vec![failed_tx_1, failed_tx_2]); let mut subject = PendingPayableScannerBuilder::new().build(); subject.current_sent_payables = Box::new(pending_payable_cache); - subject.supposed_failed_payables = Box::new(failed_payable_cache); + subject.suspected_failed_payables = Box::new(failed_payable_cache); let logger = Logger::new("test"); let msg = TxReceiptsMessage { results: btreemap![TxHashByTable::SentPayable(sent_tx_hash_1) => Ok( @@ -1015,7 +1015,7 @@ mod tests { values. Pending payables: [SentTx { hash: 0x0000000000000000000000000000000000000000000000\ 000000000000000890, receiver_address: 0x00000000000000000001c80000001c80000001c8, \ amount_minor: 43237380096, timestamp: 29942784, gas_price_minor: 94818816, nonce: 456, \ - status: Pending(Waiting) }]. Supposed failures: []."; + status: Pending(Waiting) }]. 
Suspected failures: []."; assert_eq!(panic_msg, expected); } @@ -1034,7 +1034,7 @@ mod tests { failed_payable_cache.load_cache(vec![failed_tx_1]); let mut subject = PendingPayableScannerBuilder::new().build(); subject.current_sent_payables = Box::new(pending_payable_cache); - subject.supposed_failed_payables = Box::new(failed_payable_cache); + subject.suspected_failed_payables = Box::new(failed_payable_cache); let logger = Logger::new("test"); let msg = TxReceiptsMessage { results: btreemap![TxHashByTable::SentPayable(sent_tx_hash_1) => Ok(StatusReadFromReceiptCheck::Pending), @@ -1057,7 +1057,7 @@ mod tests { Pending(Waiting) }, SentTx { hash: 0x0000000000000000000000000000000000000000000000000000000\ 000000315, receiver_address: 0x0000000000000000000315000000315000000315, amount_minor: \ 387532395441, timestamp: 89643024, gas_price_minor: 491169069, nonce: 789, status: \ - Pending(Waiting) }]. Supposed failures: []."; + Pending(Waiting) }]. Suspected failures: []."; assert_eq!(panic_msg, expected); } diff --git a/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs b/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs index ad1cbaf29..cb99f977e 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs @@ -186,7 +186,7 @@ impl TxReceiptInterpreter { failed_tx.reason, ); - scan_report.register_finalization_of_supposed_failure(failed_tx.hash); + scan_report.register_finalization_of_suspected_failure(failed_tx.hash); } } scan_report @@ -473,10 +473,10 @@ mod tests { } #[test] - fn interprets_tx_receipt_for_supposedly_failed_tx_if_the_tx_keeps_pending() { + fn interprets_tx_receipt_for_suspectedly_failed_tx_if_the_tx_keeps_pending() { init_test_logging(); let retrieve_txs_params_arc = Arc::new(Mutex::new(vec![])); - let test_name = "interprets_tx_receipt_for_supposedly_failed_tx_if_the_tx_keeps_pending"; + let test_name = "interprets_tx_receipt_for_suspectedly_failed_tx_if_the_tx_keeps_pending"; let mut newer_sent_tx_for_older_failed_tx = make_sent_tx(2244); newer_sent_tx_for_older_failed_tx.hash = make_tx_hash(0x7c6); let sent_payable_dao = SentPayableDaoMock::new() diff --git a/node/src/accountant/scanners/pending_payable_scanner/utils.rs b/node/src/accountant/scanners/pending_payable_scanner/utils.rs index e012d4579..972d8944c 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/utils.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/utils.rs @@ -51,7 +51,7 @@ impl ReceiptScanReport { .push(PresortedTxFailure::NewEntry(failed_tx)); } - pub(super) fn register_finalization_of_supposed_failure(&mut self, tx_hash: TxHash) { + pub(super) fn register_finalization_of_suspected_failure(&mut self, tx_hash: TxHash) { self.failures .tx_failures .push(PresortedTxFailure::RecheckCompleted(tx_hash)); diff --git a/node/src/accountant/test_utils.rs b/node/src/accountant/test_utils.rs index dfb2f03ed..8b9611910 100644 --- a/node/src/accountant/test_utils.rs +++ b/node/src/accountant/test_utils.rs @@ -1282,7 +1282,7 @@ pub struct PendingPayableScannerBuilder { payment_thresholds: PaymentThresholds, financial_statistics: FinancialStatistics, current_sent_payables: Box>, - supposed_failed_payables: Box>, + suspected_failed_payables: Box>, clock: Box, } @@ -1295,7 +1295,7 @@ impl PendingPayableScannerBuilder { payment_thresholds: PaymentThresholds::default(), financial_statistics: FinancialStatistics::default(), 
current_sent_payables: Box::new(PendingPayableCacheMock::default()), - supposed_failed_payables: Box::new(PendingPayableCacheMock::default()), + suspected_failed_payables: Box::new(PendingPayableCacheMock::default()), clock: Box::new(SimpleClockMock::default()), } } @@ -1324,7 +1324,7 @@ impl PendingPayableScannerBuilder { mut self, failures: Box>, ) -> Self { - self.supposed_failed_payables = failures; + self.suspected_failed_payables = failures; self } @@ -1342,7 +1342,7 @@ impl PendingPayableScannerBuilder { Rc::new(RefCell::new(self.financial_statistics)), ); scanner.current_sent_payables = self.current_sent_payables; - scanner.supposed_failed_payables = self.supposed_failed_payables; + scanner.suspected_failed_payables = self.suspected_failed_payables; scanner.clock = self.clock; scanner } @@ -1475,9 +1475,13 @@ pub struct MessageIdGeneratorMock { } impl MessageIdGenerator for MessageIdGeneratorMock { - fn id(&self) -> u32 { + fn new_id(&self) -> u32 { self.ids.borrow_mut().remove(0) } + + fn last_used_id(&self) -> u32 { + todo!() + } } impl MessageIdGeneratorMock { diff --git a/node/src/hopper/routing_service.rs b/node/src/hopper/routing_service.rs index 1ad3ba578..b61158f76 100644 --- a/node/src/hopper/routing_service.rs +++ b/node/src/hopper/routing_service.rs @@ -3,7 +3,7 @@ use super::live_cores_package::LiveCoresPackage; use crate::blockchain::payer::Payer; use crate::bootstrapper::CryptDEPair; use crate::neighborhood::gossip::Gossip_0v1; -use crate::sub_lib::accountant::ReportRoutingServiceProvidedMessage; +use crate::sub_lib::accountant::{ReportRoutingServiceProvidedMessage, ServiceProvided}; use crate::sub_lib::cryptde::{decodex, encodex, CryptData, CryptdecError}; use crate::sub_lib::dispatcher::{Component, Endpoint, InboundClientData}; use crate::sub_lib::hop::LiveHop; @@ -420,11 +420,13 @@ impl RoutingService { } match self.routing_service_subs.to_accountant_routing.try_send( ReportRoutingServiceProvidedMessage { - timestamp: SystemTime::now(), - paying_wallet: payer.wallet, - payload_size, - service_rate: self.per_routing_service, - byte_rate: self.per_routing_byte, + service: ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: payer.wallet, + payload_size, + service_rate: self.per_routing_service, + byte_rate: self.per_routing_byte, + }, }, ) { Ok(_) => (), @@ -1178,16 +1180,18 @@ mod tests { ); let accountant_recording = accountant_recording_arc.lock().unwrap(); let message = accountant_recording.get_record::(0); - check_timestamp(before, message.timestamp, after); - assert!(message.paying_wallet.congruent(&paying_wallet)); + check_timestamp(before, message.service.timestamp, after); + assert!(message.service.paying_wallet.congruent(&paying_wallet)); assert_eq!( *message, ReportRoutingServiceProvidedMessage { - timestamp: message.timestamp, - paying_wallet: address_paying_wallet, - payload_size: lcp.payload.len(), - service_rate: rate_pack_routing(103), - byte_rate: rate_pack_routing_byte(103), + service: ServiceProvided { + timestamp: message.service.timestamp, + paying_wallet: address_paying_wallet, + payload_size: lcp.payload.len(), + service_rate: rate_pack_routing(103), + byte_rate: rate_pack_routing_byte(103), + } } ) } diff --git a/node/src/proxy_client/mod.rs b/node/src/proxy_client/mod.rs index aa0f33010..b1d7b9776 100644 --- a/node/src/proxy_client/mod.rs +++ b/node/src/proxy_client/mod.rs @@ -14,7 +14,7 @@ use crate::proxy_client::resolver_wrapper::ResolverWrapperFactoryReal; use crate::proxy_client::stream_handler_pool::StreamHandlerPool; use 
crate::proxy_client::stream_handler_pool::StreamHandlerPoolFactory; use crate::proxy_client::stream_handler_pool::StreamHandlerPoolFactoryReal; -use crate::sub_lib::accountant::ReportExitServiceProvidedMessage; +use crate::sub_lib::accountant::{ReportExitServiceProvidedMessage, ServiceProvided}; use crate::sub_lib::cryptde::PublicKey; use crate::sub_lib::hopper::MessageType; use crate::sub_lib::hopper::{ExpiredCoresPackage, IncipientCoresPackage}; @@ -302,11 +302,13 @@ impl ProxyClient { ) { if let Some(paying_wallet) = stream_context.paying_wallet.clone() { let exit_report = ReportExitServiceProvidedMessage { - timestamp: SystemTime::now(), - paying_wallet, - payload_size: msg_data_len, - service_rate: self.exit_service_rate, - byte_rate: self.exit_byte_rate, + service: ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet, + payload_size: msg_data_len, + service_rate: self.exit_service_rate, + byte_rate: self.exit_byte_rate, + }, }; self.to_accountant .as_ref() @@ -1047,28 +1049,32 @@ mod tests { let accountant_recording = accountant_recording_arc.lock().unwrap(); let accountant_record = accountant_recording.get_record::(0); - check_timestamp(before, accountant_record.timestamp, after); + check_timestamp(before, accountant_record.service.timestamp, after); assert_eq!( accountant_record, &ReportExitServiceProvidedMessage { - timestamp: accountant_record.timestamp, - paying_wallet: make_wallet("paying"), - payload_size: data.len(), - service_rate: 100, - byte_rate: 200, + service: ServiceProvided { + timestamp: accountant_record.service.timestamp, + paying_wallet: make_wallet("paying"), + payload_size: data.len(), + service_rate: 100, + byte_rate: 200, + } } ); let accountant_record = accountant_recording.get_record::(1); - check_timestamp(before, accountant_record.timestamp, after); + check_timestamp(before, accountant_record.service.timestamp, after); assert_eq!( accountant_record, &ReportExitServiceProvidedMessage { - timestamp: accountant_record.timestamp, - paying_wallet: make_wallet("paying"), - payload_size: data.len(), - service_rate: 100, - byte_rate: 200, + service: ServiceProvided { + timestamp: accountant_record.service.timestamp, + paying_wallet: make_wallet("paying"), + payload_size: data.len(), + service_rate: 100, + byte_rate: 200, + } } ); assert_eq!(accountant_recording.len(), 2); @@ -1285,15 +1291,17 @@ mod tests { let accountant_recording = accountant_recording_arc.lock().unwrap(); let accountant_record = accountant_recording.get_record::(0); - check_timestamp(before, accountant_record.timestamp, after); + check_timestamp(before, accountant_record.service.timestamp, after); assert_eq!( accountant_record, &ReportExitServiceProvidedMessage { - timestamp: accountant_record.timestamp, - paying_wallet: make_wallet("gnimusnoc"), - payload_size: data.len(), - service_rate: 100, - byte_rate: 200, + service: ServiceProvided { + timestamp: accountant_record.service.timestamp, + paying_wallet: make_wallet("gnimusnoc"), + payload_size: data.len(), + service_rate: 100, + byte_rate: 200, + } } ) } diff --git a/node/src/proxy_client/stream_handler_pool.rs b/node/src/proxy_client/stream_handler_pool.rs index 7fc623617..bdaf8b269 100644 --- a/node/src/proxy_client/stream_handler_pool.rs +++ b/node/src/proxy_client/stream_handler_pool.rs @@ -4,7 +4,7 @@ use crate::proxy_client::resolver_wrapper::ResolverWrapper; use crate::proxy_client::stream_establisher::StreamEstablisherFactoryReal; use crate::proxy_client::stream_establisher::{StreamEstablisher, StreamEstablisherFactory}; 
-use crate::sub_lib::accountant::ReportExitServiceProvidedMessage; +use crate::sub_lib::accountant::{ReportExitServiceProvidedMessage, ServiceProvided}; use crate::sub_lib::channel_wrappers::SenderWrapper; use crate::sub_lib::cryptde::CryptDE; use crate::sub_lib::proxy_client::{error_socket_addr, ProxyClientSubs}; @@ -260,11 +260,13 @@ impl StreamHandlerPoolReal { Some(wallet) => inner .accountant_sub .try_send(ReportExitServiceProvidedMessage { - timestamp: SystemTime::now(), - paying_wallet: wallet, - payload_size, - service_rate: inner.exit_service_rate, - byte_rate: inner.exit_byte_rate, + service: ServiceProvided{ + timestamp: SystemTime::now(), + paying_wallet: wallet, + payload_size, + service_rate: inner.exit_service_rate, + byte_rate: inner.exit_byte_rate, + } }) .expect("Accountant is dead"), // This log is here mostly for testing, to prove that no Accountant message is sent in the no-wallet case @@ -1399,7 +1401,7 @@ mod tests { ); let accountant_recording = accountant_recording_arc.lock().unwrap(); let resp_msg = accountant_recording.get_record::(0); - check_timestamp(before, resp_msg.timestamp, after); + check_timestamp(before, resp_msg.service.timestamp, after); } #[test] diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index ec9166bc5..8bc9486f7 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -16,8 +16,8 @@ use crate::proxy_server::protocol_pack::{from_ibcd, from_protocol, ProtocolPack} use crate::proxy_server::ExitServiceSearch::{Definite, ZeroHop}; use crate::stream_messages::NonClandestineAttributes; use crate::stream_messages::RemovedStreamType; -use crate::sub_lib::accountant::RoutingServiceConsumed; use crate::sub_lib::accountant::{ExitServiceConsumed, ReportServicesConsumedMessage}; +use crate::sub_lib::accountant::{RoutingServiceConsumed, RoutingServicesConsumed}; use crate::sub_lib::bidi_hashmap::BidiHashMap; use crate::sub_lib::cryptde::CryptDE; use crate::sub_lib::cryptde::PublicKey; @@ -856,8 +856,10 @@ impl ProxyServer { .try_send(ReportServicesConsumedMessage { timestamp: args.timestamp, exit, - routing_payload_size: pkg.payload.len(), - routing, + routing: RoutingServicesConsumed { + routing_payload_size: pkg.payload.len(), + services: routing, + }, }) .expect("Accountant is dead"); } @@ -997,8 +999,10 @@ impl ProxyServer { let report_message = ReportServicesConsumedMessage { timestamp: SystemTime::now(), exit: exit_service_report, - routing_payload_size: routing_size, - routing: routing_service_reports, + routing: RoutingServicesConsumed { + routing_payload_size: routing_size, + services: routing_service_reports, + }, }; self.subs .as_ref() @@ -2902,19 +2906,21 @@ mod tests { service_rate: exit_node_rate_pack.exit_service_rate, byte_rate: exit_node_rate_pack.exit_byte_rate }, - routing_payload_size: payload_enc_length, - routing: vec![ - RoutingServiceConsumed { - earning_wallet: route_1_earning_wallet, - service_rate: routing_node_1_rate_pack.routing_service_rate, - byte_rate: routing_node_1_rate_pack.routing_byte_rate, - }, - RoutingServiceConsumed { - earning_wallet: route_2_earning_wallet, - service_rate: routing_node_2_rate_pack.routing_service_rate, - byte_rate: routing_node_2_rate_pack.routing_byte_rate, - } - ] + routing: RoutingServicesConsumed { + routing_payload_size: payload_enc_length, + services: vec![ + RoutingServiceConsumed { + earning_wallet: route_1_earning_wallet, + service_rate: routing_node_1_rate_pack.routing_service_rate, + byte_rate: routing_node_1_rate_pack.routing_byte_rate, + 
}, + RoutingServiceConsumed { + earning_wallet: route_2_earning_wallet, + service_rate: routing_node_2_rate_pack.routing_service_rate, + byte_rate: routing_node_2_rate_pack.routing_byte_rate, + } + ] + } } ); let recording = proxy_server_recording_arc.lock().unwrap(); @@ -4101,7 +4107,7 @@ mod tests { byte_rate: exit_rates.exit_byte_rate, } ); - assert_eq!(msg.routing_payload_size, 5432); + assert_eq!(msg.routing.routing_payload_size, 5432); let dispatcher_recording = dispatcher_recording_arc.lock().unwrap(); let len = dispatcher_recording.len(); assert_eq!(len, 0); @@ -4270,19 +4276,21 @@ mod tests { service_rate: rate_pack_d.exit_service_rate, byte_rate: rate_pack_d.exit_byte_rate }, - routing_payload_size: routing_size, - routing: vec![ - RoutingServiceConsumed { - earning_wallet: incoming_route_e_wallet, - service_rate: rate_pack_e.routing_service_rate, - byte_rate: rate_pack_e.routing_byte_rate - }, - RoutingServiceConsumed { - earning_wallet: incoming_route_f_wallet, - service_rate: rate_pack_f.routing_service_rate, - byte_rate: rate_pack_f.routing_byte_rate - } - ] + routing: RoutingServicesConsumed { + routing_payload_size: routing_size, + services: vec![ + RoutingServiceConsumed { + earning_wallet: incoming_route_e_wallet, + service_rate: rate_pack_e.routing_service_rate, + byte_rate: rate_pack_e.routing_byte_rate + }, + RoutingServiceConsumed { + earning_wallet: incoming_route_f_wallet, + service_rate: rate_pack_f.routing_service_rate, + byte_rate: rate_pack_f.routing_byte_rate + } + ] + } } ); assert!(before <= first_report_timestamp && first_report_timestamp <= after); @@ -4299,19 +4307,21 @@ mod tests { service_rate: rate_pack_g.exit_service_rate, byte_rate: rate_pack_g.exit_byte_rate }, - routing_payload_size: routing_size, - routing: vec![ - RoutingServiceConsumed { - earning_wallet: incoming_route_h_wallet, - service_rate: rate_pack_h.routing_service_rate, - byte_rate: rate_pack_h.routing_byte_rate - }, - RoutingServiceConsumed { - earning_wallet: incoming_route_i_wallet, - service_rate: rate_pack_i.routing_service_rate, - byte_rate: rate_pack_i.routing_byte_rate - } - ] + routing: RoutingServicesConsumed { + routing_payload_size: routing_size, + services: vec![ + RoutingServiceConsumed { + earning_wallet: incoming_route_h_wallet, + service_rate: rate_pack_h.routing_service_rate, + byte_rate: rate_pack_h.routing_byte_rate + }, + RoutingServiceConsumed { + earning_wallet: incoming_route_i_wallet, + service_rate: rate_pack_i.routing_service_rate, + byte_rate: rate_pack_i.routing_byte_rate + } + ] + } } ); assert!(before <= second_report_timestamp && second_report_timestamp <= after); @@ -4507,12 +4517,14 @@ mod tests { service_rate: rate_pack_d.exit_service_rate, byte_rate: rate_pack_d.exit_byte_rate }, - routing_payload_size: routing_size, - routing: vec![RoutingServiceConsumed { - earning_wallet: incoming_route_e_wallet, - service_rate: rate_pack_e.routing_service_rate, - byte_rate: rate_pack_e.routing_byte_rate - }] + routing: RoutingServicesConsumed { + routing_payload_size: routing_size, + services: vec![RoutingServiceConsumed { + earning_wallet: incoming_route_e_wallet, + service_rate: rate_pack_e.routing_service_rate, + byte_rate: rate_pack_e.routing_byte_rate + }] + } } ); assert!(before <= returned_timestamp && returned_timestamp <= after); @@ -4689,19 +4701,21 @@ mod tests { service_rate: rate_pack_d.exit_service_rate, byte_rate: rate_pack_d.exit_byte_rate }, - routing_payload_size: routing_size, - routing: vec![ - RoutingServiceConsumed { - earning_wallet: 
incoming_route_e_wallet, - service_rate: rate_pack_e.routing_service_rate, - byte_rate: rate_pack_e.routing_byte_rate - }, - RoutingServiceConsumed { - earning_wallet: incoming_route_f_wallet, - service_rate: rate_pack_f.routing_service_rate, - byte_rate: rate_pack_f.routing_byte_rate - } - ] + routing: RoutingServicesConsumed { + routing_payload_size: routing_size, + services: vec![ + RoutingServiceConsumed { + earning_wallet: incoming_route_e_wallet, + service_rate: rate_pack_e.routing_service_rate, + byte_rate: rate_pack_e.routing_byte_rate + }, + RoutingServiceConsumed { + earning_wallet: incoming_route_f_wallet, + service_rate: rate_pack_f.routing_service_rate, + byte_rate: rate_pack_f.routing_byte_rate + } + ] + } } ); assert!(before <= returned_timestamp && returned_timestamp <= after); diff --git a/node/src/sub_lib/accountant.rs b/node/src/sub_lib/accountant.rs index 8f9e80dac..4d23faaf8 100644 --- a/node/src/sub_lib/accountant.rs +++ b/node/src/sub_lib/accountant.rs @@ -4,6 +4,7 @@ use crate::accountant::db_access_objects::failed_payable_dao::FailedPayableDaoFa use crate::accountant::db_access_objects::payable_dao::PayableDaoFactory; use crate::accountant::db_access_objects::receivable_dao::ReceivableDaoFactory; use crate::accountant::db_access_objects::sent_payable_dao::SentPayableDaoFactory; +use crate::accountant::logging_utils::accounting_msgs_debug::AccountingMsgType; use crate::accountant::scanners::payable_scanner::msgs::PricedTemplatesMessage; use crate::accountant::{ checked_conversion, Accountant, ReceivedPayments, ScanError, SentPayables, TxReceiptsMessage, @@ -18,6 +19,7 @@ use actix::Recipient; use actix::{Addr, Message}; use lazy_static::lazy_static; use masq_lib::blockchains::chains::Chain; +use masq_lib::logger::Logger; use masq_lib::ui_gateway::NodeFromUiMessage; use std::fmt::{Debug, Formatter}; use std::str::FromStr; @@ -124,18 +126,82 @@ impl SubsFactory for AccountantSubsFactoryReal { } } -// TODO: These four structures all consist of exactly the same five fields. They could be factored out. 
-#[derive(Clone, PartialEq, Eq, Debug, Message)] -pub struct ReportRoutingServiceProvidedMessage { - pub timestamp: SystemTime, - pub paying_wallet: Wallet, - pub payload_size: usize, - pub service_rate: u64, - pub byte_rate: u64, +pub trait AccountableServiceWithTraceLog { + fn log_trace(&self, (logger, msg_id): (&Logger, u32)) { + logger.trace(|| self.trace_log_msg(msg_id)) + } + fn trace_log_msg(&self, msg_id: u32) -> String; +} + +pub trait MessageWithServicesProvided { + fn services_provided(&self) -> &ServiceProvided; + fn msg_type(&self) -> AccountingMsgType; + fn trace_log_wrapper(&self) -> ServiceProvidedTraceLogWrapper<'_>; } #[derive(Clone, PartialEq, Eq, Debug, Message)] pub struct ReportExitServiceProvidedMessage { + pub service: ServiceProvided, +} + +impl MessageWithServicesProvided for ReportExitServiceProvidedMessage { + fn services_provided(&self) -> &ServiceProvided { + &self.service + } + + fn msg_type(&self) -> AccountingMsgType { + AccountingMsgType::ExitServiceProvided + } + + fn trace_log_wrapper(&self) -> ServiceProvidedTraceLogWrapper<'_> { + ServiceProvidedTraceLogWrapper::ExitServiceProvided(&self.service) + } +} + +#[derive(Clone, PartialEq, Eq, Debug, Message)] +pub struct ReportRoutingServiceProvidedMessage { + pub service: ServiceProvided, +} + +impl MessageWithServicesProvided for ReportRoutingServiceProvidedMessage { + fn services_provided(&self) -> &ServiceProvided { + &self.service + } + + fn msg_type(&self) -> AccountingMsgType { + AccountingMsgType::RoutingServiceProvided + } + + fn trace_log_wrapper(&self) -> ServiceProvidedTraceLogWrapper<'_> { + ServiceProvidedTraceLogWrapper::RoutingServiceProvided(&self.service) + } +} + +impl AccountableServiceWithTraceLog for ServiceProvidedTraceLogWrapper<'_> { + fn trace_log_msg(&self, msg_id: u32) -> String { + match self { + ServiceProvidedTraceLogWrapper::ExitServiceProvided(service) => { + format!( + "Msg {}: Charging exit service for {} bytes to wallet {} at {} per service and {} per byte", + msg_id, + service.payload_size, + service.paying_wallet, + service.service_rate, + service.byte_rate + ) + } + ServiceProvidedTraceLogWrapper::RoutingServiceProvided(service) => { + format!( + "Msg {}: Charging routing of {} bytes to wallet {}", + msg_id, service.payload_size, service.paying_wallet + ) + } + } + } +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct ServiceProvided { pub timestamp: SystemTime, pub paying_wallet: Wallet, pub payload_size: usize, @@ -147,25 +213,58 @@ pub struct ReportExitServiceProvidedMessage { pub struct ReportServicesConsumedMessage { pub timestamp: SystemTime, pub exit: ExitServiceConsumed, - pub routing_payload_size: usize, - pub routing: Vec, + pub routing: RoutingServicesConsumed, +} + +impl AccountableServiceWithTraceLog for ExitServiceConsumed { + fn trace_log_msg(&self, msg_id: u32) -> String { + format!( + "Msg {}: Accruing debt to {} for consuming {} exited bytes", + msg_id, self.earning_wallet, self.payload_size + ) + } +} + +impl<'a> AccountableServiceWithTraceLog for RoutingServiceConsumedTraceLogWrapper<'a> { + fn trace_log_msg(&self, msg_id: u32) -> String { + format!( + "Msg {}: Accruing debt to {} for consuming {} routed bytes", + msg_id, self.service.earning_wallet, self.routing_payload_size + ) + } } #[derive(Clone, PartialEq, Eq, Debug)] -pub struct RoutingServiceConsumed { +pub struct ExitServiceConsumed { pub earning_wallet: Wallet, + pub payload_size: usize, pub service_rate: u64, pub byte_rate: u64, } #[derive(Clone, PartialEq, Eq, Debug)] -pub struct 
ExitServiceConsumed { +pub struct RoutingServicesConsumed { + pub routing_payload_size: usize, + pub services: Vec, +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct RoutingServiceConsumed { pub earning_wallet: Wallet, - pub payload_size: usize, pub service_rate: u64, pub byte_rate: u64, } +pub enum ServiceProvidedTraceLogWrapper<'a> { + ExitServiceProvided(&'a ServiceProvided), + RoutingServiceProvided(&'a ServiceProvided), +} + +pub struct RoutingServiceConsumedTraceLogWrapper<'a> { + pub service: &'a RoutingServiceConsumed, + pub routing_payload_size: usize, +} + #[derive(Clone, PartialEq, Eq, Debug, Default)] pub struct FinancialStatistics { pub total_paid_payable_wei: u128, @@ -190,19 +289,23 @@ pub enum DetailedScanType { #[cfg(test)] mod tests { + use crate::accountant::logging_utils::accounting_msgs_debug::AccountingMsgType; use crate::accountant::test_utils::AccountantBuilder; use crate::accountant::{checked_conversion, Accountant}; use crate::sub_lib::accountant::{ - AccountantSubsFactoryReal, DetailedScanType, PaymentThresholds, ScanIntervals, SubsFactory, + AccountableServiceWithTraceLog, AccountantSubsFactoryReal, DetailedScanType, + MessageWithServicesProvided, PaymentThresholds, ReportExitServiceProvidedMessage, + ReportRoutingServiceProvidedMessage, ScanIntervals, ServiceProvided, SubsFactory, DEFAULT_EARNING_WALLET, DEFAULT_PAYMENT_THRESHOLDS, TEMPORARY_CONSUMING_WALLET, }; use crate::sub_lib::wallet::Wallet; + use crate::test_utils::make_wallet; use crate::test_utils::recorder::{make_accountant_subs_from_recorder, Recorder}; use actix::Actor; use masq_lib::blockchains::chains::Chain; use masq_lib::messages::ScanType; use std::str::FromStr; - use std::time::Duration; + use std::time::{Duration, SystemTime}; impl From for ScanType { fn from(scan_type: DetailedScanType) -> Self { @@ -240,7 +343,7 @@ mod tests { assert_eq!( *TEMPORARY_CONSUMING_WALLET, temporary_consuming_wallet_expected - ) + ); } #[test] @@ -292,4 +395,39 @@ mod tests { } ); } + + #[test] + fn messages_with_services_provided_are_correctly_implemented() { + let service = ServiceProvided { + timestamp: SystemTime::now(), + paying_wallet: make_wallet("abc"), + payload_size: 0, + service_rate: 0, + byte_rate: 0, + }; + let exit_service_msg = ReportExitServiceProvidedMessage { + service: service.clone(), + }; + let routing_service_msg = ReportRoutingServiceProvidedMessage { service }; + + let exit_service_accounting_msg_type = exit_service_msg.msg_type(); + let exit_service_provided_trace_log_wrapper = exit_service_msg.trace_log_wrapper(); + let routing_service_accounting_msg_type = routing_service_msg.msg_type(); + let routing_service_provided_trace_log_wrapper = routing_service_msg.trace_log_wrapper(); + + assert_eq!( + exit_service_accounting_msg_type, + AccountingMsgType::ExitServiceProvided + ); + assert_eq!( + routing_service_accounting_msg_type, + AccountingMsgType::RoutingServiceProvided + ); + assert!(exit_service_provided_trace_log_wrapper + .trace_log_msg(1) + .contains("Msg 1: Charging exit")); + assert!(routing_service_provided_trace_log_wrapper + .trace_log_msg(2) + .contains("Msg 2: Charging routing")); + } }
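The final hunks in node/src/sub_lib/accountant.rs replace the duplicated five-field report messages with a shared ServiceProvided struct and trait-based trace logging (the factoring the removed TODO comment asked for). The standalone sketch below illustrates that pattern in isolation; it is an approximation for review purposes, not the code added by this diff: it uses stand-in types (a String wallet and println! in place of the project's Wallet, Logger, and actix message types) and collapses MessageWithServicesProvided and AccountableServiceWithTraceLog into a single trait for brevity.

// Minimal, self-contained sketch of the refactor applied in sub_lib/accountant.rs:
// the five fields shared by the "service provided" report messages are factored into
// one ServiceProvided struct, and a trait hands out the shared accessor plus the
// per-message trace-log text. String wallet, println! logging, and the single merged
// trait are simplifications (assumptions), not the actual MASQ API.
use std::time::SystemTime;

#[derive(Clone, Debug, PartialEq)]
struct ServiceProvided {
    timestamp: SystemTime,
    paying_wallet: String, // the real code uses the Wallet type
    payload_size: usize,
    service_rate: u64,
    byte_rate: u64,
}

struct ReportExitServiceProvidedMessage {
    service: ServiceProvided,
}

struct ReportRoutingServiceProvidedMessage {
    service: ServiceProvided,
}

// One trait lets the bookkeeping code treat both message kinds uniformly.
trait ProvidedServiceReport {
    fn service_provided(&self) -> &ServiceProvided;
    fn trace_log_msg(&self, msg_id: u32) -> String;
}

impl ProvidedServiceReport for ReportExitServiceProvidedMessage {
    fn service_provided(&self) -> &ServiceProvided {
        &self.service
    }
    fn trace_log_msg(&self, msg_id: u32) -> String {
        let s = &self.service;
        format!(
            "Msg {}: Charging exit service for {} bytes to wallet {} at {} per service and {} per byte",
            msg_id, s.payload_size, s.paying_wallet, s.service_rate, s.byte_rate
        )
    }
}

impl ProvidedServiceReport for ReportRoutingServiceProvidedMessage {
    fn service_provided(&self) -> &ServiceProvided {
        &self.service
    }
    fn trace_log_msg(&self, msg_id: u32) -> String {
        let s = &self.service;
        format!(
            "Msg {}: Charging routing of {} bytes to wallet {}",
            msg_id, s.payload_size, s.paying_wallet
        )
    }
}

fn main() {
    let service = ServiceProvided {
        timestamp: SystemTime::now(),
        paying_wallet: "0x00000000000000000000000000000000abcd".to_string(),
        payload_size: 1024,
        service_rate: 100,
        byte_rate: 200,
    };
    let exit_msg = ReportExitServiceProvidedMessage { service: service.clone() };
    let routing_msg = ReportRoutingServiceProvidedMessage { service };

    // Both messages now expose the same shape to the logging/charging code.
    println!("{}", exit_msg.trace_log_msg(1));
    println!("{}", routing_msg.trace_log_msg(2));
    println!("{:?}", routing_msg.service_provided());
}

Running the sketch prints one trace line per message built from the shared fields, which is roughly how the new trace_log_wrapper() values appear intended to be formatted per AccountingMsgType; the exact wording and wiring are those in the diff above, not in this simplified version.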