From 8d1e5ac294bdab2811d8d85271810ca321981b28 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Wed, 8 Dec 2021 14:09:34 -0600 Subject: [PATCH] refactor and test scan abort code (#21390) --- runtime/src/accounts.rs | 135 +++++++++++++++++++++++++++------- runtime/src/accounts_index.rs | 34 +++++++++ 2 files changed, 144 insertions(+), 25 deletions(-) diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index ee78fabbe2..514a97fefc 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -783,6 +783,42 @@ impl Accounts { ) } + fn calc_scan_result_size(account: &AccountSharedData) -> usize { + account.data().len() + + std::mem::size_of::() + + std::mem::size_of::() + } + + /// Accumulate size of (pubkey + account) into sum. + /// Return true iff sum > 'byte_limit_for_scan' + fn accumulate_and_check_scan_result_size( + sum: &AtomicUsize, + account: &AccountSharedData, + byte_limit_for_scan: &Option, + ) -> bool { + if let Some(byte_limit_for_scan) = byte_limit_for_scan.as_ref() { + let added = Self::calc_scan_result_size(account); + sum.fetch_add(added, Ordering::Relaxed) + .saturating_add(added) + > *byte_limit_for_scan + } else { + false + } + } + + fn maybe_abort_scan( + result: ScanResult>, + config: &ScanConfig, + ) -> ScanResult> { + if config.is_aborted() { + ScanResult::Err(ScanError::Aborted( + "The accumulated scan results exceeded the limit".to_string(), + )) + } else { + result + } + } + pub fn load_by_index_key_with_filter bool>( &self, ancestors: &Ancestors, @@ -793,10 +829,7 @@ impl Accounts { byte_limit_for_scan: Option, ) -> ScanResult> { let sum = AtomicUsize::default(); - let config = ScanConfig { - abort: Some(config.abort.as_ref().map(Arc::clone).unwrap_or_default()), - collect_all_unsorted: config.collect_all_unsorted, - }; + let config = config.recreate_with_abort(); let result = self .accounts_db .index_scan_accounts( @@ -806,20 +839,15 @@ impl Accounts { |collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| { Self::load_while_filtering(collector, some_account_tuple, |account| { let use_account = filter(account); - if use_account { - if let Some(byte_limit_for_scan) = byte_limit_for_scan.as_ref() { - let added = account.data().len() - + std::mem::size_of::() - + std::mem::size_of::(); - if sum - .fetch_add(added, Ordering::Relaxed) - .saturating_add(added) - > *byte_limit_for_scan - { - // total size of results exceeds size limit, so abort scan - config.abort(); - } - } + if use_account + && Self::accumulate_and_check_scan_result_size( + &sum, + account, + &byte_limit_for_scan, + ) + { + // total size of results exceeds size limit, so abort scan + config.abort(); } use_account }); @@ -827,13 +855,7 @@ impl Accounts { &config, ) .map(|result| result.0); - if config.is_aborted() { - ScanResult::Err(ScanError::Aborted( - "The accumulated scan results exceeded the limit".to_string(), - )) - } else { - result - } + Self::maybe_abort_scan(result, &config) } pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool { @@ -3329,4 +3351,67 @@ mod tests { vec![(pubkey1, 42), (pubkey2, 41)] ); } + + fn zero_len_account_size() -> usize { + std::mem::size_of::() + std::mem::size_of::() + } + + #[test] + fn test_calc_scan_result_size() { + for len in 0..3 { + assert_eq!( + Accounts::calc_scan_result_size(&AccountSharedData::new( + 0, + len, + &Pubkey::default() + )), + zero_len_account_size() + len + ); + } + } + + #[test] + fn test_maybe_abort_scan() { + assert!(Accounts::maybe_abort_scan(ScanResult::Ok(vec![]), &ScanConfig::default()).is_ok()); + let config = ScanConfig::default().recreate_with_abort(); + assert!(Accounts::maybe_abort_scan(ScanResult::Ok(vec![]), &config).is_ok()); + config.abort(); + assert!(Accounts::maybe_abort_scan(ScanResult::Ok(vec![]), &config).is_err()); + } + + #[test] + fn test_accumulate_and_check_scan_result_size() { + for (account, byte_limit_for_scan, result) in [ + (AccountSharedData::default(), zero_len_account_size(), false), + ( + AccountSharedData::new(0, 1, &Pubkey::default()), + zero_len_account_size(), + true, + ), + ( + AccountSharedData::new(0, 2, &Pubkey::default()), + zero_len_account_size() + 3, + false, + ), + ] { + let sum = AtomicUsize::default(); + assert_eq!( + result, + Accounts::accumulate_and_check_scan_result_size( + &sum, + &account, + &Some(byte_limit_for_scan) + ) + ); + // calling a second time should accumulate above the threshold + assert!(Accounts::accumulate_and_check_scan_result_size( + &sum, + &account, + &Some(byte_limit_for_scan) + )); + assert!(!Accounts::accumulate_and_check_scan_result_size( + &sum, &account, &None + )); + } + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 18b03c8c0a..c453e4a2e3 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -80,12 +80,21 @@ impl ScanConfig { } } + /// mark the scan as aborted pub fn abort(&self) { if let Some(abort) = self.abort.as_ref() { abort.store(true, Ordering::Relaxed) } } + /// use existing 'abort' if available, otherwise allocate one + pub fn recreate_with_abort(&self) -> Self { + ScanConfig { + abort: Some(self.abort.as_ref().map(Arc::clone).unwrap_or_default()), + collect_all_unsorted: self.collect_all_unsorted, + } + } + /// true if scan should abort pub fn is_aborted(&self) -> bool { if let Some(abort) = self.abort.as_ref() { @@ -4320,4 +4329,29 @@ pub mod tests { config.bins = Some(3); AccountsIndex::::new(Some(config)); } + + #[test] + fn test_scan_config() { + for collect_all_unsorted in [false, true] { + let config = ScanConfig::new(collect_all_unsorted); + assert_eq!(config.collect_all_unsorted, collect_all_unsorted); + assert!(config.abort.is_none()); // not allocated + assert!(!config.is_aborted()); + config.abort(); // has no effect + assert!(!config.is_aborted()); + } + + let config = ScanConfig::default(); + assert!(!config.collect_all_unsorted); + assert!(config.abort.is_none()); + + let config = config.recreate_with_abort(); + assert!(config.abort.is_some()); + assert!(!config.is_aborted()); + config.abort(); + assert!(config.is_aborted()); + + let config = config.recreate_with_abort(); + assert!(config.is_aborted()); + } }