diff --git a/multinode_integration_tests/tests/verify_bill_payment.rs b/multinode_integration_tests/tests/verify_bill_payment.rs index e5fddc67f..714856eb8 100644 --- a/multinode_integration_tests/tests/verify_bill_payment.rs +++ b/multinode_integration_tests/tests/verify_bill_payment.rs @@ -465,7 +465,7 @@ fn verify_pending_payables() { } MASQNodeUtils::assert_node_wrote_log_containing( real_consuming_node.name(), - "Found 3 pending payables and 0 suspected failures to process", + "Found 3 pending payables and 0 supposed failures to process", Duration::from_secs(5), ); MASQNodeUtils::assert_node_wrote_log_containing( diff --git a/node/src/accountant/db_access_objects/failed_payable_dao.rs b/node/src/accountant/db_access_objects/failed_payable_dao.rs index 2b83fdfab..f2e48827d 100644 --- a/node/src/accountant/db_access_objects/failed_payable_dao.rs +++ b/node/src/accountant/db_access_objects/failed_payable_dao.rs @@ -748,7 +748,7 @@ mod tests { assert_eq!( FailureRetrieveCondition::ByReceiverAddresses(BTreeSet::from([make_address(1), make_address(2)])) .to_string(), - "WHERE receiver_address IN ('0x0000000000000000000003000000000003000000', '0x0000000000000000000006000000000006000000')" + "WHERE receiver_address IN ('0x0000000000000000000001000000001000000001', '0x0000000000000000000002000000002000000002')" ) } diff --git a/node/src/accountant/logging_utils/accounting_msgs_debug.rs b/node/src/accountant/logging_utils/accounting_msgs_debug.rs new file mode 100644 index 000000000..81f547b6d --- /dev/null +++ b/node/src/accountant/logging_utils/accounting_msgs_debug.rs @@ -0,0 +1,467 @@ +// Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +use itertools::Itertools; +use masq_lib::logger::Logger; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; +use std::iter::once; +use web3::types::Address; + +// An attempt to provide somewhat useful debug stats for the accounting messages after we have +// decreased the log level for lots of them, and it drastically reduced the observability +// of the Accountant. + +#[derive(Default)] +pub struct AccountingMsgsDebugStats { + report_routing_service_provided_processed: AccountingMsgStats, + report_exit_service_provided_processed: AccountingMsgStats, + report_services_consumed_processed: AccountingMsgStats, +} + +impl AccountingMsgsDebugStats { + pub fn manage_debug_log( + &mut self, + logger: &Logger, + msg_type: AccountingMsgType, + log_window_size: u16, + new_postings: Vec, + ) { + if logger.debug_enabled() { + if let Some(loggable_stats) = self.manage_log(msg_type, new_postings, log_window_size) { + debug!(logger, "{}", loggable_stats); + } + } + } + + fn manage_log( + &mut self, + msg_type: AccountingMsgType, + new_postings: Vec, + log_window_size: u16, + ) -> Option { + self.record(new_postings, msg_type); + self.request_log_instruction(log_window_size, msg_type) + } + + fn record(&mut self, new_postings: Vec, msg_type: AccountingMsgType) { + match msg_type { + AccountingMsgType::RoutingServiceProvided => { + self.report_routing_service_provided_processed + .handle_new_postings(new_postings); + } + AccountingMsgType::ExitServiceProvided => { + self.report_exit_service_provided_processed + .handle_new_postings(new_postings); + } + AccountingMsgType::ServicesConsumed => { + self.report_services_consumed_processed + .handle_new_postings(new_postings); + } + } + } + + fn request_log_instruction( + &mut self, + gap_size: u16, + msg_type: AccountingMsgType, + ) -> Option { + match msg_type { + AccountingMsgType::RoutingServiceProvided => self + .report_routing_service_provided_processed + .loggable_stats(gap_size), + AccountingMsgType::ExitServiceProvided => self + .report_exit_service_provided_processed + .loggable_stats(gap_size), + AccountingMsgType::ServicesConsumed => self + .report_services_consumed_processed + .loggable_stats(gap_size), + } + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub struct NewPosting { + address: Address, + amount_wei: u128, +} + +impl NewPosting { + pub fn new(address: Address, amount_wei: u128) -> Self { + Self { + address, + amount_wei, + } + } +} + +#[derive(Debug, PartialEq, Eq)] +pub struct LoggableStats { + msg_type: AccountingMsgType, + accounting_msg_stats: HashMap, + log_window_in_pcs_of_msgs: u16, +} + +impl Display for LoggableStats { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let label = format!( + "Account debits in last {} {:?} messages (wei):", + self.log_window_in_pcs_of_msgs, self.msg_type + ); + let stats = self + .accounting_msg_stats + .iter() + .sorted() + .map(|(address, sum)| format!("{:?}: {}", address, sum)) + .collect_vec(); + once(label).chain(stats).join("\n").fmt(f) + } +} + +#[derive(Default)] +struct AccountingMsgStats { + stats: HashMap, + msg_count_since_last_logged: usize, +} + +impl AccountingMsgStats { + fn loggable_stats(&mut self, log_window_size: u16) -> Option { + if self.msg_count_since_last_logged == log_window_size as usize { + self.msg_count_since_last_logged = 0; + + Some(LoggableStats { + msg_type: AccountingMsgType::RoutingServiceProvided, + accounting_msg_stats: self.stats.drain().collect(), + log_window_in_pcs_of_msgs: log_window_size, + }) + } else { + None + } + } + + fn handle_new_postings(&mut self, new_postings: Vec) { + for new_posting in &new_postings { + *self.stats.entry(new_posting.address).or_default() += new_posting.amount_wei; + } + self.msg_count_since_last_logged += 1; + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum AccountingMsgType { + RoutingServiceProvided, + ExitServiceProvided, + ServicesConsumed, +} + +pub struct NewPostingsDebugContainer { + debug_enabled: bool, + vec: Vec, +} + +impl NewPostingsDebugContainer { + pub fn new(logger: &Logger) -> Self { + Self { + debug_enabled: logger.debug_enabled(), + vec: vec![], + } + } + + pub fn add(mut self, address: Address, sum_wei: u128) -> Self { + if self.debug_enabled { + self.vec.push(NewPosting::new(address, sum_wei)); + } + self + } +} + +impl From for Vec { + fn from(postings: NewPostingsDebugContainer) -> Self { + postings.vec + } +} + +#[cfg(test)] +mod tests { + use super::{ + AccountingMsgType, AccountingMsgsDebugStats, LoggableStats, NewPosting, + NewPostingsDebugContainer, + }; + use crate::blockchain::test_utils::make_address; + use itertools::Itertools; + use log::Level; + use masq_lib::logger::Logger; + use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler}; + use std::collections::HashMap; + use web3::types::Address; + + #[test] + fn test_loggable_count_works_for_routing_service_provided() { + test_manage_debug_log( + AccountingMsgType::RoutingServiceProvided, + 6, + |subject| { + subject + .report_routing_service_provided_processed + .stats + .clone() + }, + |subject| { + subject + .report_routing_service_provided_processed + .msg_count_since_last_logged + }, + ); + } + + #[test] + fn test_loggable_count_works_for_exit_service_provided() { + test_manage_debug_log( + AccountingMsgType::ExitServiceProvided, + 3, + |subject| subject.report_exit_service_provided_processed.stats.clone(), + |subject| { + subject + .report_exit_service_provided_processed + .msg_count_since_last_logged + }, + ); + } + + #[test] + fn test_loggable_count_works_for_services_consumed() { + test_manage_debug_log( + AccountingMsgType::ServicesConsumed, + 8, + |subject| subject.report_services_consumed_processed.stats.clone(), + |subject| { + subject + .report_services_consumed_processed + .msg_count_since_last_logged + }, + ); + } + + fn test_manage_debug_log( + msg_type: AccountingMsgType, + gap_size: u16, + fetch_stats: fn(&AccountingMsgsDebugStats) -> HashMap, + fetch_msg_count_processed: fn(&AccountingMsgsDebugStats) -> usize, + ) { + // We begin the test by recording N - 1 msgs. Then we add one more and match the gap_size + // condition which should release the debug stats. After that happens, the stats are cleared + // and the process can start again. + let new_posting_feeds_per_msg = generate_posting_feeds_representing_msgs(gap_size); + let mut subject = AccountingMsgsDebugStats::default(); + + let initial_state_total_count = fetch_stats(&subject); + let initial_msg_count_processed = fetch_msg_count_processed(&subject); + assert_eq!(initial_state_total_count, hashmap!()); + assert_eq!(initial_msg_count_processed, 0); + + let first_feed_remembered = new_posting_feeds_per_msg.first().unwrap().clone(); + let last_feed_remembered = new_posting_feeds_per_msg.last().unwrap().clone(); + + let first_expected_stats = + compute_expected_stats_from_new_posting_feeds(&new_posting_feeds_per_msg); + let first_loggable_stats_opt = new_posting_feeds_per_msg + .into_iter() + .fold(None, |_, new_postings| { + subject.manage_log(msg_type, new_postings, gap_size) + }); + let first_actual_stats = fetch_stats(&subject); + let first_msg_count_processed = fetch_msg_count_processed(&subject); + assert_eq!(first_loggable_stats_opt, None); + assert_eq!( + first_actual_stats.into_iter().sorted().collect_vec(), + first_expected_stats + ); + assert_eq!(first_msg_count_processed, gap_size as usize - 1); + + let posting_fulfilling_the_msg_count_requirement = first_feed_remembered; + let second_loggable_stats_opt = subject.manage_log( + msg_type, + posting_fulfilling_the_msg_count_requirement.clone(), + gap_size, + ); + let second_actual_stats = fetch_stats(&subject); + let second_msg_count_processed = fetch_msg_count_processed(&subject); + let second_expected_stats = record_new_posting_feed_in( + first_expected_stats, + posting_fulfilling_the_msg_count_requirement, + ); + let loggable_stats = second_loggable_stats_opt.unwrap(); + assert_eq!( + loggable_stats + .accounting_msg_stats + .into_iter() + .sorted() + .collect_vec(), + second_expected_stats, + ); + assert_eq!(loggable_stats.log_window_in_pcs_of_msgs, gap_size,); + assert_eq!(second_actual_stats, hashmap!()); + assert_eq!(second_msg_count_processed, 0); + + let new_posting_after_stats_dumping = last_feed_remembered; + let third_loggable_stats_opt = + subject.manage_log(msg_type, new_posting_after_stats_dumping.clone(), gap_size); + let third_actual_stats = fetch_stats(&subject); + let third_msg_count_processed = fetch_msg_count_processed(&subject); + assert_eq!(third_loggable_stats_opt, None); + assert_eq!( + third_actual_stats.into_iter().sorted().collect_vec(), + new_posting_after_stats_dumping + .into_iter() + .map(|posting| (posting.address, posting.amount_wei)) + .sorted() + .collect_vec(), + ); + assert_eq!(third_msg_count_processed, 1); + } + + fn record_new_posting_feed_in( + first_expected_stats: Vec<(Address, u128)>, + second_new_posting: Vec, + ) -> Vec<(Address, u128)> { + let second_expected_stats = first_expected_stats + .into_iter() + .map(|(address, sum)| { + let updated_sum = second_new_posting.iter().fold(sum, |acc, posting| { + if posting.address == address { + acc + posting.amount_wei + } else { + acc + } + }); + (address, updated_sum) + }) + .collect_vec(); + second_expected_stats + } + + fn generate_posting_feeds_representing_msgs(gap_size: u16) -> Vec> { + let new_postings_feeds = (0..gap_size - 1) + .map(|outer_idx| { + (0..outer_idx + 1) + .map(|inner_idx| { + NewPosting::new( + make_address(inner_idx as u32), + (inner_idx as u128 + 1) * 1234567, + ) + }) + .collect_vec() + }) + .collect_vec(); + new_postings_feeds + } + + fn compute_expected_stats_from_new_posting_feeds( + new_postings_feeds: &Vec>, + ) -> Vec<(Address, u128)> { + let first_expected_stats = { + let all_postings_flattened = new_postings_feeds.iter().flatten().collect_vec(); + let all_unique_addresses = new_postings_feeds.last().unwrap(); + all_unique_addresses + .iter() + .map(|unique_account_posting| { + let sum = all_postings_flattened.iter().fold(0, |acc, posting| { + if posting.address == unique_account_posting.address { + acc + posting.amount_wei + } else { + acc + } + }); + (unique_account_posting.address, sum) + }) + .collect_vec() + }; + first_expected_stats + } + + #[test] + fn new_posting_debug_container_for_debug_enabled() { + let mut logger = Logger::new("test"); + logger.set_level_for_test(Level::Debug); + let container = NewPostingsDebugContainer::new(&logger); + let new_posting_1 = NewPosting::new(make_address(1), 1234567); + let new_posting_2 = NewPosting::new(make_address(2), 7654321); + + let container = container.add(new_posting_1.address, new_posting_1.amount_wei); + let container = container.add(new_posting_1.address, new_posting_1.amount_wei); + let container = container.add(new_posting_2.address, new_posting_2.amount_wei); + + let stats: Vec = container.into(); + assert_eq!(stats, vec![new_posting_1, new_posting_1, new_posting_2]); + } + + #[test] + fn new_posting_debug_container_for_debug_not_enabled() { + let mut logger = Logger::new("test"); + logger.set_level_for_test(Level::Info); + let container = NewPostingsDebugContainer::new(&logger); + let new_posting_1 = NewPosting::new(make_address(1), 1234567); + let new_posting_2 = NewPosting::new(make_address(2), 7654321); + + let container = container.add(new_posting_1.address, new_posting_1.amount_wei); + let container = container.add(new_posting_1.address, new_posting_1.amount_wei); + let container = container.add(new_posting_2.address, new_posting_2.amount_wei); + + let stats: Vec = container.into(); + assert_eq!(stats, vec![]); + } + + #[test] + fn accounts_stats_are_logged_only_if_debug_enabled() { + init_test_logging(); + let test_name = "accounts_stats_are_logged_only_if_debug_enabled"; + let mut logger = Logger::new(test_name); + logger.set_level_for_test(Level::Debug); + let mut subject = AccountingMsgsDebugStats::default(); + let new_posting_1 = NewPosting::new(make_address(1), 1234567); + let new_posting_2 = NewPosting::new(make_address(2), 7654321); + + subject.manage_debug_log( + &logger, + AccountingMsgType::ServicesConsumed, + 1, + vec![new_posting_1, new_posting_2], + ); + + TestLogHandler::new() + .exists_log_containing(&format!("DEBUG: {test_name}: Account debits in last")); + } + + #[test] + fn accounts_stats_are_not_logged_if_debug_is_not_enabled() { + init_test_logging(); + let test_name = "accounts_stats_are_not_logged_if_debug_is_not_enabled"; + let mut logger = Logger::new("test"); + logger.set_level_for_test(Level::Info); + let mut subject = AccountingMsgsDebugStats::default(); + let new_posting_1 = NewPosting::new(make_address(1), 1234567); + let new_posting_2 = NewPosting::new(make_address(2), 7654321); + + subject.manage_debug_log( + &logger, + AccountingMsgType::ServicesConsumed, + 1, + vec![new_posting_1, new_posting_2], + ); + + TestLogHandler::new().exists_no_log_containing(&format!("DEBUG: {test_name}:")); + } + + #[test] + fn display_loggable_stats() { + let loggable_stats = LoggableStats { + msg_type: AccountingMsgType::RoutingServiceProvided, + accounting_msg_stats: hashmap!(make_address(1) => 1234567, make_address(2) => 7654321), + log_window_in_pcs_of_msgs: 15, + }; + let expected_display = "\ + Account debits in last 15 RoutingServiceProvided messages (wei):\n\ + 0x0000000000000000000001000000001000000001: 1234567\n\ + 0x0000000000000000000002000000002000000002: 7654321"; + assert_eq!(format!("{}", loggable_stats), expected_display); + } +} diff --git a/node/src/accountant/logging_utils/mod.rs b/node/src/accountant/logging_utils/mod.rs new file mode 100644 index 000000000..36b240cc4 --- /dev/null +++ b/node/src/accountant/logging_utils/mod.rs @@ -0,0 +1,23 @@ +// Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +pub mod accounting_msgs_debug; +pub mod msg_id_generator; + +use crate::accountant::logging_utils::accounting_msgs_debug::AccountingMsgsDebugStats; +use crate::accountant::logging_utils::msg_id_generator::{ + MessageIdGenerator, MessageIdGeneratorReal, +}; + +pub struct LoggingUtils { + pub accounting_msgs_stats: AccountingMsgsDebugStats, + pub msg_id_generator: Box, +} + +impl Default for LoggingUtils { + fn default() -> Self { + Self { + accounting_msgs_stats: AccountingMsgsDebugStats::default(), + msg_id_generator: Box::new(MessageIdGeneratorReal::default()), + } + } +} diff --git a/node/src/accountant/logging_utils/msg_id_generator.rs b/node/src/accountant/logging_utils/msg_id_generator.rs new file mode 100644 index 000000000..e121a37b8 --- /dev/null +++ b/node/src/accountant/logging_utils/msg_id_generator.rs @@ -0,0 +1,56 @@ +// Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +use crate::sub_lib::accountant::MSG_ID_INCREMENTER; +use std::sync::atomic::Ordering; + +pub trait MessageIdGenerator { + fn id(&self) -> u32; + as_any_ref_in_trait!(); +} + +#[derive(Default)] +pub struct MessageIdGeneratorReal {} + +impl MessageIdGenerator for MessageIdGeneratorReal { + fn id(&self) -> u32 { + MSG_ID_INCREMENTER.fetch_add(1, Ordering::Relaxed) + } + as_any_ref_in_trait_impl!(); +} + +#[cfg(test)] +mod tests { + use crate::accountant::logging_utils::msg_id_generator::{ + MessageIdGenerator, MessageIdGeneratorReal, + }; + use crate::sub_lib::accountant::MSG_ID_INCREMENTER; + use std::sync::atomic::Ordering; + use std::sync::Mutex; + + static MSG_ID_GENERATOR_TEST_GUARD: Mutex<()> = Mutex::new(()); + + #[test] + fn msg_id_generator_increments_by_one_with_every_call() { + let _guard = MSG_ID_GENERATOR_TEST_GUARD.lock().unwrap(); + let subject = MessageIdGeneratorReal::default(); + + let id1 = subject.id(); + let id2 = subject.id(); + let id3 = subject.id(); + + assert_eq!(id2, id1 + 1); + assert_eq!(id3, id2 + 1) + } + + #[test] + fn msg_id_generator_wraps_around_max_value() { + let _guard = MSG_ID_GENERATOR_TEST_GUARD.lock().unwrap(); + MSG_ID_INCREMENTER.store(u32::MAX, Ordering::Relaxed); + let subject = MessageIdGeneratorReal::default(); + subject.id(); // This returns the previous value, not the newly incremented + + let id = subject.id(); + + assert_eq!(id, 0) + } +} diff --git a/node/src/accountant/mod.rs b/node/src/accountant/mod.rs index 31f3033b5..931e610ed 100644 --- a/node/src/accountant/mod.rs +++ b/node/src/accountant/mod.rs @@ -3,6 +3,7 @@ pub mod db_access_objects; pub mod db_big_integer; pub mod financials; +mod logging_utils; pub mod payment_adjuster; pub mod scanners; @@ -22,6 +23,10 @@ use crate::accountant::db_access_objects::utils::{ use crate::accountant::financials::visibility_restricted_module::{ check_query_is_within_tech_limits, financials_entry_check, }; +use crate::accountant::logging_utils::accounting_msgs_debug::{ + AccountingMsgType, NewPosting, NewPostingsDebugContainer, +}; +use crate::accountant::logging_utils::LoggingUtils; use crate::accountant::scanners::payable_scanner::msgs::{ InitialTemplatesMessage, PricedTemplatesMessage, }; @@ -48,7 +53,6 @@ use crate::sub_lib::accountant::ReportExitServiceProvidedMessage; use crate::sub_lib::accountant::ReportRoutingServiceProvidedMessage; use crate::sub_lib::accountant::ReportServicesConsumedMessage; use crate::sub_lib::accountant::{AccountantSubs, DetailedScanType}; -use crate::sub_lib::accountant::{MessageIdGenerator, MessageIdGeneratorReal}; use crate::sub_lib::blockchain_bridge::OutboundPaymentsInstructions; use crate::sub_lib::neighborhood::{ConfigChange, ConfigChangeMsg}; use crate::sub_lib::peer_actors::{BindMessage, StartMessage}; @@ -86,6 +90,7 @@ use std::time::SystemTime; pub const CRASH_KEY: &str = "ACCOUNTANT"; pub const DEFAULT_PENDING_TOO_LONG_SEC: u64 = 21_600; //6 hours +const DEFAULT_ACCOUNTING_MSG_LOG_WINDOW: u16 = 50; pub struct Accountant { consuming_wallet_opt: Option, @@ -104,7 +109,7 @@ pub struct Accountant { report_inbound_payments_sub_opt: Option>, report_sent_payables_sub_opt: Option>, ui_message_sub_opt: Option>, - message_id_generator: Box, + logging_utils: LoggingUtils, logger: Logger, } @@ -348,8 +353,8 @@ impl Handler for Accountant { if let Some(node_to_ui_msg) = ui_msg_opt { info!( self.logger, - "Re-running the pending payable scan is recommended, as some parts \ - did not finish last time." + "Re-running the pending payable scan is recommended, as some parts were \ + prevented from completing during the previous attempt." ); self.ui_message_sub_opt .as_ref() @@ -560,6 +565,7 @@ impl Accountant { pub fn new(config: BootstrapperConfig, dao_factories: DaoFactories) -> Accountant { let payment_thresholds = config.payment_thresholds_opt.expectv("Payment thresholds"); let scan_intervals = config.scan_intervals_opt.expectv("Scan Intervals"); + let consuming_wallet_opt = config.consuming_wallet_opt.clone(); let earning_wallet = config.earning_wallet.clone(); let financial_statistics = Rc::new(RefCell::new(FinancialStatistics::default())); let payable_dao = dao_factories.payable_dao_factory.make(); @@ -573,7 +579,7 @@ impl Accountant { ); Accountant { - consuming_wallet_opt: config.consuming_wallet_opt.clone(), + consuming_wallet_opt, earning_wallet, payable_dao, receivable_dao, @@ -589,7 +595,7 @@ impl Accountant { report_inbound_payments_sub_opt: None, request_transaction_receipts_sub_opt: None, ui_message_sub_opt: None, - message_id_generator: Box::new(MessageIdGeneratorReal::default()), + logging_utils: LoggingUtils::default(), logger: Logger::new("Accountant"), } } @@ -623,7 +629,7 @@ impl Accountant { timestamp: SystemTime, payload_size: usize, wallet: &Wallet, - ) { + ) -> u128 { let byte_charge = byte_rate as u128 * (payload_size as u128); let total_charge = service_rate as u128 + byte_charge; if !self.our_wallet(wallet) { @@ -641,12 +647,14 @@ impl Accountant { ), Err(e) => panic!("Was recording services provided for {} but hit a fatal database error: {:?}", wallet, e) }; + total_charge } else { warning!( self.logger, "Declining to record a receivable against our wallet {} for services we provided", wallet ); + 0 } } @@ -657,7 +665,7 @@ impl Accountant { timestamp: SystemTime, payload_size: usize, wallet: &Wallet, - ) { + ) -> u128 { let byte_charge = byte_rate as u128 * (payload_size as u128); let total_charge = service_rate as u128 + byte_charge; if !self.our_wallet(wallet) { @@ -676,12 +684,14 @@ impl Accountant { ), Err(e) => panic!("Recording services consumed from {} but has hit fatal database error: {:?}", wallet, e) }; + total_charge } else { warning!( self.logger, "Declining to record a payable against our wallet {} for service we provided", wallet ); + 0 } } @@ -739,45 +749,65 @@ impl Accountant { &mut self, msg: ReportRoutingServiceProvidedMessage, ) { + let msg_id = self.msg_id(); trace!( self.logger, - "Charging routing of {} bytes to wallet {}", + "Msg {}: Charging routing of {} bytes to wallet {}", + msg_id, msg.payload_size, msg.paying_wallet ); - self.record_service_provided( + + let sum = self.record_service_provided( msg.service_rate, msg.byte_rate, msg.timestamp, msg.payload_size, &msg.paying_wallet, ); + + self.logging_utils.accounting_msgs_stats.manage_debug_log( + &self.logger, + AccountingMsgType::RoutingServiceProvided, + DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, + vec![NewPosting::new(msg.paying_wallet.address(), sum)], + ) } fn handle_report_exit_service_provided_message( &mut self, msg: ReportExitServiceProvidedMessage, ) { - trace!( - self.logger, - "Charging exit service for {} bytes to wallet {} at {} per service and {} per byte", - msg.payload_size, - msg.paying_wallet, - msg.service_rate, - msg.byte_rate - ); - self.record_service_provided( + self.logger.trace(||{ + let msg_id = self.msg_id(); + format!( + "Msg {}: Charging exit service for {} bytes to wallet {} at {} per service and {} per byte", + msg_id, + msg.payload_size, + msg.paying_wallet, + msg.service_rate, + msg.byte_rate + ) + }); + let sum = self.record_service_provided( msg.service_rate, msg.byte_rate, msg.timestamp, msg.payload_size, &msg.paying_wallet, ); + + self.logging_utils.accounting_msgs_stats.manage_debug_log( + &self.logger, + AccountingMsgType::ExitServiceProvided, + DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, + vec![NewPosting::new(msg.paying_wallet.address(), sum)], + ) } fn msg_id(&self) -> u32 { if self.logger.debug_enabled() { - self.message_id_generator.id() + self.logging_utils.msg_id_generator.id() } else { 0 } @@ -787,34 +817,59 @@ impl Accountant { let msg_id = self.msg_id(); trace!( self.logger, - "MsgId {}: Accruing debt to {} for consuming {} exited bytes", + "Msg {}: Accruing debt to {} for consuming {} exited bytes", msg_id, msg.exit.earning_wallet, msg.exit.payload_size ); - self.record_service_consumed( + let exit_sum = self.record_service_consumed( msg.exit.service_rate, msg.exit.byte_rate, msg.timestamp, msg.exit.payload_size, &msg.exit.earning_wallet, ); - msg.routing.iter().for_each(|routing_service| { - trace!( - self.logger, - "MsgId {}: Accruing debt to {} for consuming {} routed bytes", - msg_id, - routing_service.earning_wallet, - msg.routing_payload_size - ); - self.record_service_consumed( - routing_service.service_rate, - routing_service.byte_rate, - msg.timestamp, - msg.routing_payload_size, - &routing_service.earning_wallet, - ); - }) + + let new_postings = NewPostingsDebugContainer::new(&self.logger) + .add(msg.exit.earning_wallet.address(), exit_sum); + + let new_postings = self.handle_routing_services_consumed(msg, msg_id, new_postings); + + self.logging_utils.accounting_msgs_stats.manage_debug_log( + &self.logger, + AccountingMsgType::ServicesConsumed, + DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, + new_postings.into(), + ) + } + + fn handle_routing_services_consumed( + &mut self, + msg: ReportServicesConsumedMessage, + msg_id: u32, + new_postings: NewPostingsDebugContainer, + ) -> NewPostingsDebugContainer { + msg.routing + .iter() + .fold(new_postings, |new_postings, routing_service| { + trace!( + self.logger, + "Msg {}: Accruing debt to {} for consuming {} routed bytes", + msg_id, + routing_service.earning_wallet, + msg.routing_payload_size + ); + + let sum = self.record_service_consumed( + routing_service.service_rate, + routing_service.byte_rate, + msg.timestamp, + msg.routing_payload_size, + &routing_service.earning_wallet, + ); + + new_postings.add(routing_service.earning_wallet.address(), sum) + }) } fn handle_payable_payment_setup(&mut self, msg: PricedTemplatesMessage) { @@ -1308,6 +1363,7 @@ mod tests { use crate::accountant::db_access_objects::utils::{ from_unix_timestamp, to_unix_timestamp, CustomQuery, }; + use crate::accountant::logging_utils::msg_id_generator::MessageIdGeneratorReal; use crate::accountant::payment_adjuster::Adjustment; use crate::accountant::scanners::payable_scanner::test_utils::PayableScannerBuilder; use crate::accountant::scanners::payable_scanner::tx_templates::initial::new::NewTxTemplates; @@ -1415,6 +1471,7 @@ mod tests { fn constants_have_correct_values() { assert_eq!(CRASH_KEY, "ACCOUNTANT"); assert_eq!(DEFAULT_PENDING_TOO_LONG_SEC, 21_600); + assert_eq!(DEFAULT_ACCOUNTING_MSG_LOG_WINDOW, 50); } #[test] @@ -1551,7 +1608,8 @@ mod tests { assert_eq!(result.consuming_wallet_opt, None); assert_eq!(result.earning_wallet, *DEFAULT_EARNING_WALLET); result - .message_id_generator + .logging_utils + .msg_id_generator .as_any() .downcast_ref::() .unwrap(); @@ -4741,7 +4799,8 @@ mod tests { .bootstrapper_config(config) .payable_daos(vec![ForAccountantBody(payable_dao_mock)]) .build(); - subject.message_id_generator = Box::new(MessageIdGeneratorMock::default().id_result(123)); + subject.logging_utils.msg_id_generator = + Box::new(MessageIdGeneratorMock::default().id_result(123)); let system = System::new("report_services_consumed_message_is_received"); let subject_addr: Addr = subject.start(); subject_addr @@ -5431,7 +5490,7 @@ mod tests { assert_using_the_same_logger(&logger, test_name, None); TestLogHandler::new().exists_log_containing(&format!( "INFO: {test_name}: Re-running the pending payable scan is recommended, as some parts \ - did not finish last time." + were prevented from completing during the previous attempt." )); } @@ -6802,8 +6861,8 @@ mod tests { let mut logger1 = Logger::new("msg_id_generator_off"); logger1.set_level_for_test(Level::Info); let mut subject = AccountantBuilder::default().build(); - let msg_id_generator = MessageIdGeneratorMock::default().id_result(789); //we prepared a result just for one call - subject.message_id_generator = Box::new(msg_id_generator); + let msg_id_generator = MessageIdGeneratorMock::default().id_result(789); // We prepared a result just for one call + subject.logging_utils.msg_id_generator = Box::new(msg_id_generator); subject.logger = logger1; let id1 = subject.msg_id(); diff --git a/node/src/accountant/scanners/mod.rs b/node/src/accountant/scanners/mod.rs index cbc337a8d..e8980d6cd 100644 --- a/node/src/accountant/scanners/mod.rs +++ b/node/src/accountant/scanners/mod.rs @@ -142,9 +142,9 @@ impl Scanners { ) -> Result { if let Some(started_at) = self.payable.scan_started_at() { unreachable!( - "Guards should ensure that no payable scanner can run if the pending payable \ - repetitive sequence is still ongoing. However, some other payable scan intruded \ - at {} and is still running at {}", + "Guards are applied to ensure that none of the payable scanners may run \ + if the pending payable has not finished the monitoring of pending txs. \ + Still, another payable scan intruded at {} and is still running at {}", StartScanError::timestamp_as_string(started_at), StartScanError::timestamp_as_string(SystemTime::now()) ) @@ -795,11 +795,11 @@ mod tests { false ); let dumped_records = pending_payable_scanner - .suspected_failed_payables + .supposed_failed_payables .dump_cache(); assert!( dumped_records.is_empty(), - "There should be no suspected failures but found {:?}.", + "There should be no supposed failures but found {:?}.", dumped_records ); assert_eq!( @@ -1027,18 +1027,19 @@ mod tests { let after = SystemTime::now(); let panic_msg = caught_panic.downcast_ref::().unwrap(); let expected_needle_1 = "internal error: entered unreachable code: \ - Guards should ensure that no payable scanner can run if the pending payable \ - repetitive sequence is still ongoing. However, some other payable scan intruded at"; + Guards are applied to ensure that none of the payable scanners may run if the pending \ + payable has not finished the monitoring of pending txs. Still, another payable scan \ + intruded at"; assert!( panic_msg.contains(expected_needle_1), - "We looked for {} but the actual string doesn't contain it: {}", + "We looked for \"{}\" but the actual string doesn't contain it: {}", expected_needle_1, panic_msg ); let expected_needle_2 = "and is still running at "; assert!( panic_msg.contains(expected_needle_2), - "We looked for {} but the actual string doesn't contain it: {}", + "We looked for \"{}\" but the actual string doesn't contain it: {}", expected_needle_2, panic_msg ); @@ -1201,7 +1202,8 @@ mod tests { TestLogHandler::new().assert_logs_match_in_order(vec![ &format!("INFO: {test_name}: Scanning for pending payable"), &format!( - "DEBUG: {test_name}: Found 1 pending payables and 1 suspected failures to process" + "DEBUG: {test_name}: Collected 1 pending payables and 1 supposed failures \ + for the receipt check" ), ]) } diff --git a/node/src/accountant/scanners/pending_payable_scanner/finish_scan.rs b/node/src/accountant/scanners/pending_payable_scanner/finish_scan.rs new file mode 100644 index 000000000..7df4e5f9f --- /dev/null +++ b/node/src/accountant/scanners/pending_payable_scanner/finish_scan.rs @@ -0,0 +1,38 @@ +// Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +use crate::accountant::scanners::pending_payable_scanner::utils::PendingPayableScanResult; +use crate::accountant::scanners::pending_payable_scanner::PendingPayableScanner; +use crate::accountant::scanners::Scanner; +use crate::accountant::TxReceiptsMessage; +use crate::time_marking_methods; +use masq_lib::logger::Logger; +use masq_lib::messages::ScanType; +use std::time::SystemTime; + +impl Scanner for PendingPayableScanner { + fn finish_scan( + &mut self, + message: TxReceiptsMessage, + logger: &Logger, + ) -> PendingPayableScanResult { + let response_skeleton_opt = message.response_skeleton_opt; + + let scan_report = self.interpret_tx_receipts(message, logger); + + let retry_opt = scan_report.requires_payments_retry(); + + debug!(logger, "Payment retry requirement: {:?}", retry_opt); + + self.process_txs_by_state(scan_report, logger); + + self.mark_as_ended(logger); + + Self::compose_scan_result(retry_opt, response_skeleton_opt) + } + + time_marking_methods!(PendingPayables); + + as_any_ref_in_trait_impl!(); + + as_any_mut_in_trait_impl!(); +} diff --git a/node/src/accountant/scanners/pending_payable_scanner/mod.rs b/node/src/accountant/scanners/pending_payable_scanner/mod.rs index 4fe12add1..a0ecfdda9 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/mod.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/mod.rs @@ -1,5 +1,7 @@ // Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. +mod finish_scan; +mod start_scan; mod tx_receipt_interpreter; pub mod utils; @@ -19,20 +21,16 @@ use crate::accountant::scanners::pending_payable_scanner::utils::{ ReceiptScanReport, RecheckRequiringFailures, Retry, TxByTable, TxCaseToBeInterpreted, TxHashByTable, UpdatableValidationStatus, }; -use crate::accountant::scanners::{ - PrivateScanner, Scanner, ScannerCommon, StartScanError, StartableScanner, -}; +use crate::accountant::scanners::{PrivateScanner, ScannerCommon, StartScanError}; use crate::accountant::{ join_with_commas, RequestTransactionReceipts, ResponseSkeleton, ScanForPendingPayables, TxReceiptResult, TxReceiptsMessage, }; use crate::blockchain::blockchain_interface::data_structures::TxBlock; use crate::sub_lib::accountant::{FinancialStatistics, PaymentThresholds}; -use crate::sub_lib::wallet::Wallet; -use crate::time_marking_methods; use itertools::{Either, Itertools}; use masq_lib::logger::Logger; -use masq_lib::messages::{ScanType, ToMessageBody, UiScanResponse}; +use masq_lib::messages::{ToMessageBody, UiScanResponse}; use masq_lib::simple_clock::{SimpleClock, SimpleClockReal}; use masq_lib::ui_gateway::{MessageTarget, NodeToUiMessage}; use std::cell::RefCell; @@ -40,7 +38,6 @@ use std::collections::{BTreeSet, HashMap}; use std::fmt::Display; use std::rc::Rc; use std::str::FromStr; -use std::time::SystemTime; use thousands::Separable; use web3::types::H256; @@ -65,7 +62,7 @@ pub struct PendingPayableScanner { pub failed_payable_dao: Box, pub financial_statistics: Rc>, pub current_sent_payables: Box>, - pub suspected_failed_payables: Box>, + pub supposed_failed_payables: Box>, pub clock: Box, } @@ -81,64 +78,10 @@ impl { } -impl StartableScanner - for PendingPayableScanner -{ - fn start_scan( - &mut self, - _wallet: &Wallet, - timestamp: SystemTime, - response_skeleton_opt: Option, - logger: &Logger, - ) -> Result { - self.mark_as_started(timestamp); - - info!(logger, "Scanning for pending payable"); - - let tx_hashes = self.harvest_tables(logger).map_err(|e| { - self.mark_as_ended(logger); - e - })?; - - Ok(RequestTransactionReceipts { - tx_hashes, - response_skeleton_opt, - }) - } -} - -impl Scanner for PendingPayableScanner { - fn finish_scan( - &mut self, - message: TxReceiptsMessage, - logger: &Logger, - ) -> PendingPayableScanResult { - let response_skeleton_opt = message.response_skeleton_opt; - - let scan_report = self.interpret_tx_receipts(message, logger); - - let retry_opt = scan_report.requires_payments_retry(); - - debug!(logger, "Payment retry requirement: {:?}", retry_opt); - - self.process_txs_by_state(scan_report, logger); - - self.mark_as_ended(logger); - - Self::compose_scan_result(retry_opt, response_skeleton_opt) - } - - time_marking_methods!(PendingPayables); - - as_any_ref_in_trait_impl!(); - - as_any_mut_in_trait_impl!(); -} - impl CachesEmptiableScanner for PendingPayableScanner { fn empty_caches(&mut self, logger: &Logger) { self.current_sent_payables.ensure_empty_cache(logger); - self.suspected_failed_payables.ensure_empty_cache(logger); + self.supposed_failed_payables.ensure_empty_cache(logger); } } @@ -157,7 +100,7 @@ impl PendingPayableScanner { failed_payable_dao, financial_statistics, current_sent_payables: Box::new(CurrentPendingPayables::default()), - suspected_failed_payables: Box::new(RecheckRequiringFailures::default()), + supposed_failed_payables: Box::new(RecheckRequiringFailures::default()), clock: Box::new(SimpleClockReal::default()), } } @@ -165,29 +108,22 @@ impl PendingPayableScanner { fn harvest_tables(&mut self, logger: &Logger) -> Result, StartScanError> { debug!(logger, "Harvesting sent_payable and failed_payable tables"); - let pending_tx_hashes_opt = self.harvest_pending_payables(); - let failure_hashes_opt = self.harvest_suspected_failures(); + let pending_tx_hashes = self.harvest_pending_payables(); + let failure_hashes = self.harvest_supposed_failures(); - if Self::is_there_nothing_to_process( - pending_tx_hashes_opt.as_ref(), - failure_hashes_opt.as_ref(), - ) { + if Self::is_there_nothing_to_process(&pending_tx_hashes, &failure_hashes) { return Err(StartScanError::NothingToProcess); } - Self::log_records_for_receipt_check( - pending_tx_hashes_opt.as_ref(), - failure_hashes_opt.as_ref(), - logger, - ); + Self::log_records_for_receipt_check(&pending_tx_hashes, &failure_hashes, logger); - Ok(Self::merge_hashes( - pending_tx_hashes_opt, - failure_hashes_opt, + Ok(Self::merge_hashes_as_single_vec( + pending_tx_hashes, + failure_hashes, )) } - fn harvest_pending_payables(&mut self) -> Option> { + fn harvest_pending_payables(&mut self) -> Vec { let pending_txs = self .sent_payable_dao .retrieve_txs(Some(RetrieveCondition::IsPending)) @@ -195,15 +131,15 @@ impl PendingPayableScanner { .collect_vec(); if pending_txs.is_empty() { - return None; + return vec![]; } let pending_tx_hashes = Self::wrap_hashes(&pending_txs, TxHashByTable::SentPayable); self.current_sent_payables.load_cache(pending_txs); - Some(pending_tx_hashes) + pending_tx_hashes } - fn harvest_suspected_failures(&mut self) -> Option> { + fn harvest_supposed_failures(&mut self) -> Vec { let failures = self .failed_payable_dao .retrieve_txs(Some(FailureRetrieveCondition::EveryRecheckRequiredRecord)) @@ -211,30 +147,28 @@ impl PendingPayableScanner { .collect_vec(); if failures.is_empty() { - return None; + return vec![]; } let failure_hashes = Self::wrap_hashes(&failures, TxHashByTable::FailedPayable); - self.suspected_failed_payables.load_cache(failures); - Some(failure_hashes) + self.supposed_failed_payables.load_cache(failures); + failure_hashes } fn is_there_nothing_to_process( - pending_tx_hashes_opt: Option<&Vec>, - failure_hashes_opt: Option<&Vec>, + pending_tx_hashes: &[TxHashByTable], + failure_hashes: &[TxHashByTable], ) -> bool { - pending_tx_hashes_opt.is_none() && failure_hashes_opt.is_none() + pending_tx_hashes.is_empty() && failure_hashes.is_empty() } - fn merge_hashes( - pending_tx_hashes_opt: Option>, - failure_hashes_opt: Option>, + fn merge_hashes_as_single_vec( + pending_tx_hashes: Vec, + failure_hashes: Vec, ) -> Vec { - let failures = failure_hashes_opt.unwrap_or_default(); - pending_tx_hashes_opt - .unwrap_or_default() + pending_tx_hashes .into_iter() - .chain(failures) + .chain(failure_hashes) .collect() } @@ -329,7 +263,7 @@ impl PendingPayableScanner { }; self.current_sent_payables.ensure_empty_cache(logger); - self.suspected_failed_payables.ensure_empty_cache(logger); + self.supposed_failed_payables.ensure_empty_cache(logger); cases } @@ -354,7 +288,7 @@ impl PendingPayableScanner { } } TxHashByTable::FailedPayable(tx_hash) => { - match self.suspected_failed_payables.get_record_by_hash(tx_hash) { + match self.supposed_failed_payables.get_record_by_hash(tx_hash) { Some(failed_tx) => { cases.push(TxCaseToBeInterpreted::new( TxByTable::FailedPayable(failed_tx), @@ -379,10 +313,10 @@ impl PendingPayableScanner { panic!( "Looking up '{:?}' in the cache, the record could not be found. Dumping \ - the remaining values. Pending payables: {:?}. Suspected failures: {:?}.", + the remaining values. Pending payables: {:?}. Supposed failures: {:?}.", missing_entry, rearrange(self.current_sent_payables.dump_cache()), - rearrange(self.suspected_failed_payables.dump_cache()), + rearrange(self.supposed_failed_payables.dump_cache()), ) } @@ -629,7 +563,7 @@ impl PendingPayableScanner { }); self.add_new_failures(grouped_failures.new_failures, logger); - self.finalize_suspected_failures(grouped_failures.rechecks_completed, logger); + self.finalize_supposed_failures(grouped_failures.rechecks_completed, logger); } fn add_new_failures(&self, new_failures: Vec, logger: &Logger) { @@ -681,7 +615,7 @@ impl PendingPayableScanner { } } - fn finalize_suspected_failures(&self, rechecks_completed: Vec, logger: &Logger) { + fn finalize_supposed_failures(&self, rechecks_completed: Vec, logger: &Logger) { fn prepare_hashmap(rechecks_completed: &[TxHash]) -> HashMap { rechecks_completed .iter() @@ -844,19 +778,15 @@ impl PendingPayableScanner { } fn log_records_for_receipt_check( - pending_tx_hashes_opt: Option<&Vec>, - failure_hashes_opt: Option<&Vec>, + pending_tx_hashes: &[TxHashByTable], + failure_hashes: &[TxHashByTable], logger: &Logger, ) { - fn resolve_optional_vec(vec_opt: Option<&Vec>) -> usize { - vec_opt.map(|hashes| hashes.len()).unwrap_or_default() - } - debug!( logger, - "Found {} pending payables and {} suspected failures to process", - resolve_optional_vec(pending_tx_hashes_opt), - resolve_optional_vec(failure_hashes_opt) + "Collected {} pending payables and {} supposed failures for the receipt check", + pending_tx_hashes.len(), + failure_hashes.len() ); } } @@ -929,7 +859,7 @@ mod tests { .build(); let logger = Logger::new("start_scan_fills_in_caches_and_returns_msg"); let pending_payable_cache_before = subject.current_sent_payables.dump_cache(); - let failed_payable_cache_before = subject.suspected_failed_payables.dump_cache(); + let failed_payable_cache_before = subject.supposed_failed_payables.dump_cache(); let result = subject.start_scan(&make_wallet("blah"), SystemTime::now(), None, &logger); @@ -956,7 +886,7 @@ mod tests { failed_payable_cache_before ); let pending_payable_cache_after = subject.current_sent_payables.dump_cache(); - let failed_payable_cache_after = subject.suspected_failed_payables.dump_cache(); + let failed_payable_cache_after = subject.supposed_failed_payables.dump_cache(); assert_eq!( pending_payable_cache_after, hashmap!(sent_tx_hash_1 => sent_tx_1, sent_tx_hash_2 => sent_tx_2) @@ -1064,7 +994,7 @@ mod tests { failed_payable_cache.load_cache(vec![failed_tx_1, failed_tx_2]); let mut subject = PendingPayableScannerBuilder::new().build(); subject.current_sent_payables = Box::new(pending_payable_cache); - subject.suspected_failed_payables = Box::new(failed_payable_cache); + subject.supposed_failed_payables = Box::new(failed_payable_cache); let logger = Logger::new("test"); let msg = TxReceiptsMessage { results: btreemap![TxHashByTable::SentPayable(sent_tx_hash_1) => Ok( @@ -1083,9 +1013,9 @@ mod tests { let expected = "Looking up 'SentPayable(0x00000000000000000000000000000000000000000000\ 00000000000000000123)' in the cache, the record could not be found. Dumping the remaining \ values. Pending payables: [SentTx { hash: 0x0000000000000000000000000000000000000000000000\ - 000000000000000890, receiver_address: 0x0000000000000000000558000000000558000000, \ + 000000000000000890, receiver_address: 0x00000000000000000001c80000001c80000001c8, \ amount_minor: 43237380096, timestamp: 29942784, gas_price_minor: 94818816, nonce: 456, \ - status: Pending(Waiting) }]. Suspected failures: []."; + status: Pending(Waiting) }]. Supposed failures: []."; assert_eq!(panic_msg, expected); } @@ -1104,7 +1034,7 @@ mod tests { failed_payable_cache.load_cache(vec![failed_tx_1]); let mut subject = PendingPayableScannerBuilder::new().build(); subject.current_sent_payables = Box::new(pending_payable_cache); - subject.suspected_failed_payables = Box::new(failed_payable_cache); + subject.supposed_failed_payables = Box::new(failed_payable_cache); let logger = Logger::new("test"); let msg = TxReceiptsMessage { results: btreemap![TxHashByTable::SentPayable(sent_tx_hash_1) => Ok(StatusReadFromReceiptCheck::Pending), @@ -1122,12 +1052,12 @@ mod tests { let expected = "Looking up 'FailedPayable(0x000000000000000000000000000000000000000000\ 00000000000000000003db)' in the cache, the record could not be found. Dumping the remaining \ values. Pending payables: [SentTx { hash: 0x000000000000000000000000000000000000000000000000\ - 00000000000001c8, receiver_address: 0x0000000000000000000558000000000558000000, amount_minor: \ + 00000000000001c8, receiver_address: 0x00000000000000000001c80000001c80000001c8, amount_minor: \ 43237380096, timestamp: 29942784, gas_price_minor: 94818816, nonce: 456, status: \ Pending(Waiting) }, SentTx { hash: 0x0000000000000000000000000000000000000000000000000000000\ - 000000315, receiver_address: 0x000000000000000000093f00000000093f000000, amount_minor: \ + 000000315, receiver_address: 0x0000000000000000000315000000315000000315, amount_minor: \ 387532395441, timestamp: 89643024, gas_price_minor: 491169069, nonce: 789, status: \ - Pending(Waiting) }]. Suspected failures: []."; + Pending(Waiting) }]. Supposed failures: []."; assert_eq!(panic_msg, expected); } @@ -2079,7 +2009,7 @@ mod tests { #[test] #[should_panic( expected = "Unable to complete the tx confirmation by the adjustment of the payable accounts \ - 0x0000000000000000000558000000000558000000 due to: \ + 0x00000000000000000001c80000001c80000001c8 due to: \ RusqliteError(\"record change not successful\")" )] fn handle_confirmed_transactions_panics_on_unchecking_payable_table() { diff --git a/node/src/accountant/scanners/pending_payable_scanner/start_scan.rs b/node/src/accountant/scanners/pending_payable_scanner/start_scan.rs new file mode 100644 index 000000000..816cbe900 --- /dev/null +++ b/node/src/accountant/scanners/pending_payable_scanner/start_scan.rs @@ -0,0 +1,34 @@ +// Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +use crate::accountant::scanners::pending_payable_scanner::PendingPayableScanner; +use crate::accountant::scanners::{Scanner, StartScanError, StartableScanner}; +use crate::accountant::{RequestTransactionReceipts, ResponseSkeleton, ScanForPendingPayables}; +use crate::sub_lib::wallet::Wallet; +use masq_lib::logger::Logger; +use std::time::SystemTime; + +impl StartableScanner + for PendingPayableScanner +{ + fn start_scan( + &mut self, + _wallet: &Wallet, + timestamp: SystemTime, + response_skeleton_opt: Option, + logger: &Logger, + ) -> Result { + self.mark_as_started(timestamp); + + info!(logger, "Scanning for pending payable"); + + let tx_hashes = self.harvest_tables(logger).map_err(|e| { + self.mark_as_ended(logger); + e + })?; + + Ok(RequestTransactionReceipts { + tx_hashes, + response_skeleton_opt, + }) + } +} diff --git a/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs b/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs index d04d458b4..ad1cbaf29 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs @@ -186,7 +186,7 @@ impl TxReceiptInterpreter { failed_tx.reason, ); - scan_report.register_finalization_of_suspected_failure(failed_tx.hash); + scan_report.register_finalization_of_supposed_failure(failed_tx.hash); } } scan_report diff --git a/node/src/accountant/scanners/pending_payable_scanner/utils.rs b/node/src/accountant/scanners/pending_payable_scanner/utils.rs index 2b8052496..e012d4579 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/utils.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/utils.rs @@ -51,7 +51,7 @@ impl ReceiptScanReport { .push(PresortedTxFailure::NewEntry(failed_tx)); } - pub(super) fn register_finalization_of_suspected_failure(&mut self, tx_hash: TxHash) { + pub(super) fn register_finalization_of_supposed_failure(&mut self, tx_hash: TxHash) { self.failures .tx_failures .push(PresortedTxFailure::RecheckCompleted(tx_hash)); @@ -761,7 +761,7 @@ mod tests { Cache misuse - some pending payables left unprocessed: \ {{0x0000000000000000000000000000000000000000000000000000000000000567: SentTx {{ hash: \ 0x0000000000000000000000000000000000000000000000000000000000000567, receiver_address: \ - 0x0000000000000000001035000000001035000000, amount_minor: 3658379210721, timestamp: \ + 0x0000000000000000000567000000567000000567, amount_minor: 3658379210721, timestamp: \ 275427216, gas_price_minor: 2645248887, nonce: 1383, status: Pending(Waiting) }}}}. \ Dumping." )); @@ -904,7 +904,7 @@ mod tests { Cache misuse - some tx failures left unprocessed: \ {{0x0000000000000000000000000000000000000000000000000000000000000567: FailedTx {{ hash: \ 0x0000000000000000000000000000000000000000000000000000000000000567, receiver_address: \ - 0x00000000000000000003cc0000000003cc000000, amount_minor: 3658379210721, timestamp: \ + 0x0000000000000000000144000000144000000144, amount_minor: 3658379210721, timestamp: \ 275427216, gas_price_minor: 2645248887, nonce: 1383, reason: PendingTooLong, status: \ RetryRequired }}}}. Dumping." )); diff --git a/node/src/accountant/test_utils.rs b/node/src/accountant/test_utils.rs index 9e3e06575..dfb2f03ed 100644 --- a/node/src/accountant/test_utils.rs +++ b/node/src/accountant/test_utils.rs @@ -21,6 +21,7 @@ use crate::accountant::db_access_objects::sent_payable_dao::{ use crate::accountant::db_access_objects::utils::{ from_unix_timestamp, to_unix_timestamp, CustomQuery, TxHash, TxIdentifiers, }; +use crate::accountant::logging_utils::msg_id_generator::MessageIdGenerator; use crate::accountant::payment_adjuster::{Adjustment, AnalysisError, PaymentAdjuster}; use crate::accountant::scanners::payable_scanner::msgs::PricedTemplatesMessage; use crate::accountant::scanners::payable_scanner::payment_adjuster_integration::PreparedAdjustment; @@ -36,8 +37,8 @@ use crate::bootstrapper::BootstrapperConfig; use crate::database::rusqlite_wrappers::TransactionSafeWrapper; use crate::db_config::config_dao::{ConfigDao, ConfigDaoFactory}; use crate::db_config::mocks::ConfigDaoMock; +use crate::sub_lib::accountant::PaymentThresholds; use crate::sub_lib::accountant::{DaoFactories, FinancialStatistics}; -use crate::sub_lib::accountant::{MessageIdGenerator, PaymentThresholds}; use crate::sub_lib::blockchain_bridge::OutboundPaymentsInstructions; use crate::sub_lib::wallet::Wallet; use crate::test_utils::make_wallet; @@ -1281,7 +1282,7 @@ pub struct PendingPayableScannerBuilder { payment_thresholds: PaymentThresholds, financial_statistics: FinancialStatistics, current_sent_payables: Box>, - suspected_failed_payables: Box>, + supposed_failed_payables: Box>, clock: Box, } @@ -1294,7 +1295,7 @@ impl PendingPayableScannerBuilder { payment_thresholds: PaymentThresholds::default(), financial_statistics: FinancialStatistics::default(), current_sent_payables: Box::new(PendingPayableCacheMock::default()), - suspected_failed_payables: Box::new(PendingPayableCacheMock::default()), + supposed_failed_payables: Box::new(PendingPayableCacheMock::default()), clock: Box::new(SimpleClockMock::default()), } } @@ -1323,7 +1324,7 @@ impl PendingPayableScannerBuilder { mut self, failures: Box>, ) -> Self { - self.suspected_failed_payables = failures; + self.supposed_failed_payables = failures; self } @@ -1341,7 +1342,7 @@ impl PendingPayableScannerBuilder { Rc::new(RefCell::new(self.financial_statistics)), ); scanner.current_sent_payables = self.current_sent_payables; - scanner.suspected_failed_payables = self.suspected_failed_payables; + scanner.supposed_failed_payables = self.supposed_failed_payables; scanner.clock = self.clock; scanner } diff --git a/node/src/blockchain/blockchain_interface/data_structures/errors.rs b/node/src/blockchain/blockchain_interface/data_structures/errors.rs index 03899343e..89275c242 100644 --- a/node/src/blockchain/blockchain_interface/data_structures/errors.rs +++ b/node/src/blockchain/blockchain_interface/data_structures/errors.rs @@ -186,12 +186,14 @@ mod tests { actual_error_msgs, slice_of_strs_to_vec_of_strings(&[ "Missing consuming wallet to pay payable from", - "Unsuccessful gas price query: \"Blockchain error: Query failed: Gas halves shut, no drop left\"", + "Unsuccessful gas price query: \"Blockchain error: Query failed: Gas halves shut, \ + no drop left\"", "Transaction id fetching failed: Blockchain error: Invalid response", - "Sending error: \"Terrible error!!\". Signed and hashed transactions: \"FailedTx { hash: 0x00000000000000\ - 000000000000000000000000000000000000000000000001c8, receiver_address: 0x00000000000\ - 00000002556000000002556000000, amount_minor: 43237380096, timestamp: 29942784, gas_\ - price_minor: 94818816, nonce: 456, reason: PendingTooLong, status: RetryRequired }\"", + "Sending error: \"Terrible error!!\". Signed and hashed transactions: \"FailedTx { \ + hash: 0x00000000000000000000000000000000000000000000000000000000000001c8, \ + receiver_address: 0x0000000000000000000c72000000c72000000c72, amount_minor: \ + 43237380096, timestamp: 29942784, gas_price_minor: 94818816, nonce: 456, reason: \ + PendingTooLong, status: RetryRequired }\"", BLOCKCHAIN_SERVICE_URL_NOT_SPECIFIED ]) ) diff --git a/node/src/blockchain/test_utils.rs b/node/src/blockchain/test_utils.rs index 238703d98..63a4eb013 100644 --- a/node/src/blockchain/test_utils.rs +++ b/node/src/blockchain/test_utils.rs @@ -199,10 +199,10 @@ pub fn make_block_hash(base: u32) -> H256 { pub fn make_address(base: u32) -> Address { let base = base % 0xfff; - let value = U256::from(base * 3); - let shifted = value << 72; - let value = U256::from(value) << 24; - let value = value | shifted; + let original_value = U256::from(base); + let long_shifted = original_value << 72; + let short_shifted = U256::from(original_value) << 36; + let value = short_shifted | long_shifted | original_value; let mut full_bytes = [0u8; 32]; value.to_big_endian(&mut full_bytes); let mut bytes = [0u8; 20]; diff --git a/node/src/sub_lib/accountant.rs b/node/src/sub_lib/accountant.rs index 039b1fe4f..8f9e80dac 100644 --- a/node/src/sub_lib/accountant.rs +++ b/node/src/sub_lib/accountant.rs @@ -21,7 +21,7 @@ use masq_lib::blockchains::chains::Chain; use masq_lib::ui_gateway::NodeFromUiMessage; use std::fmt::{Debug, Formatter}; use std::str::FromStr; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::AtomicU32; use std::time::{Duration, SystemTime}; lazy_static! { @@ -179,21 +179,6 @@ pub enum SignConversionError { I128(String), } -pub trait MessageIdGenerator { - fn id(&self) -> u32; - as_any_ref_in_trait!(); -} - -#[derive(Default)] -pub struct MessageIdGeneratorReal {} - -impl MessageIdGenerator for MessageIdGeneratorReal { - fn id(&self) -> u32 { - MSG_ID_INCREMENTER.fetch_add(1, Ordering::Relaxed) - } - as_any_ref_in_trait_impl!(); -} - #[derive(Debug, Clone, PartialEq, Eq, Copy)] pub enum DetailedScanType { NewPayables, @@ -204,12 +189,12 @@ pub enum DetailedScanType { #[cfg(test)] mod tests { + use crate::accountant::test_utils::AccountantBuilder; use crate::accountant::{checked_conversion, Accountant}; use crate::sub_lib::accountant::{ - AccountantSubsFactoryReal, DetailedScanType, MessageIdGenerator, MessageIdGeneratorReal, - PaymentThresholds, ScanIntervals, SubsFactory, DEFAULT_EARNING_WALLET, - DEFAULT_PAYMENT_THRESHOLDS, MSG_ID_INCREMENTER, TEMPORARY_CONSUMING_WALLET, + AccountantSubsFactoryReal, DetailedScanType, PaymentThresholds, ScanIntervals, SubsFactory, + DEFAULT_EARNING_WALLET, DEFAULT_PAYMENT_THRESHOLDS, TEMPORARY_CONSUMING_WALLET, }; use crate::sub_lib::wallet::Wallet; use crate::test_utils::recorder::{make_accountant_subs_from_recorder, Recorder}; @@ -217,8 +202,6 @@ mod tests { use masq_lib::blockchains::chains::Chain; use masq_lib::messages::ScanType; use std::str::FromStr; - use std::sync::atomic::Ordering; - use std::sync::Mutex; use std::time::Duration; impl From for ScanType { @@ -232,8 +215,6 @@ mod tests { } } - static MSG_ID_GENERATOR_TEST_GUARD: Mutex<()> = Mutex::new(()); - impl PaymentThresholds { pub fn sugg_thru_decreasing(&self, now: i64) -> i64 { self.sugg_and_grace(now) - checked_conversion::(self.threshold_interval_sec) @@ -282,31 +263,6 @@ mod tests { assert_eq!(subs, Accountant::make_subs_from(&addr)) } - #[test] - fn msg_id_generator_increments_by_one_with_every_call() { - let _guard = MSG_ID_GENERATOR_TEST_GUARD.lock().unwrap(); - let subject = MessageIdGeneratorReal::default(); - - let id1 = subject.id(); - let id2 = subject.id(); - let id3 = subject.id(); - - assert_eq!(id2, id1 + 1); - assert_eq!(id3, id2 + 1) - } - - #[test] - fn msg_id_generator_wraps_around_max_value() { - let _guard = MSG_ID_GENERATOR_TEST_GUARD.lock().unwrap(); - MSG_ID_INCREMENTER.store(u32::MAX, Ordering::Relaxed); - let subject = MessageIdGeneratorReal::default(); - subject.id(); //this returns the previous, not the newly incremented - - let id = subject.id(); - - assert_eq!(id, 0) - } - #[test] fn default_for_scan_intervals_can_be_computed() { let chain_a = Chain::BaseMainnet;