diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 11093705e6..586cb6a28d 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -4335,6 +4335,58 @@ impl AccountsDb { self.update_accounts_hash_with_index_option(true, true, slot, ancestors, None) } + fn scan_multiple_account_storages_one_slot( + storages: &[Arc], + scan_func: &F, + slot: Slot, + retval: &mut B, + ) where + F: Fn(LoadedAccount, &mut B, Slot) + Send + Sync, + B: Send + Default, + { + // we have to call the scan_func in order of write_version within a slot if there are multiple storages per slot + let mut len = storages.len(); + let mut progress = Vec::with_capacity(len); + let mut current = Vec::with_capacity(len); + for storage in storages { + let accounts = storage.accounts.accounts(0); + let mut iterator: std::vec::IntoIter> = accounts.into_iter(); + if let Some(item) = iterator + .next() + .map(|stored_account| (stored_account.meta.write_version, Some(stored_account))) + { + current.push(item); + progress.push(iterator); + } + } + while !progress.is_empty() { + let mut min = current[0].0; + let mut min_index = 0; + for (i, (item, _)) in current.iter().enumerate().take(len).skip(1) { + if item < &min { + min_index = i; + min = *item; + } + } + let mut account = (0, None); + std::mem::swap(&mut account, &mut current[min_index]); + scan_func(LoadedAccount::Stored(account.1.unwrap()), retval, slot); + let next = progress[min_index] + .next() + .map(|stored_account| (stored_account.meta.write_version, Some(stored_account))); + match next { + Some(item) => { + current[min_index] = item; + } + None => { + current.remove(min_index); + progress.remove(min_index); + len -= 1; + } + } + } + } + /// Scan through all the account storage in parallel fn scan_account_storage_no_bank( snapshot_storages: &SortedStorages, @@ -4357,12 +4409,12 @@ impl AccountsDb { for slot in start..end { let sub_storages = snapshot_storages.get(slot); if let Some(sub_storages) = sub_storages { - for storage in sub_storages { - let accounts = storage.accounts.accounts(0); - accounts.into_iter().for_each(|stored_account| { - scan_func(LoadedAccount::Stored(stored_account), &mut retval, slot) - }); - } + Self::scan_multiple_account_storages_one_slot( + sub_storages, + &scan_func, + slot, + &mut retval, + ); } } retval @@ -4456,7 +4508,6 @@ impl AccountsDb { return; } - let version = loaded_account.write_version(); let raw_lamports = loaded_account.lamports(); let zero_raw_lamports = raw_lamports == 0; let balance = if zero_raw_lamports { @@ -4465,11 +4516,9 @@ impl AccountsDb { raw_lamports }; - let source_item = CalculateHashIntermediate::new( - version, + let source_item = CalculateHashIntermediate::new_without_slot( loaded_account.loaded_hash(), balance, - slot, *pubkey, ); @@ -5822,7 +5871,7 @@ pub mod tests { accounts_hash::MERKLE_FANOUT, accounts_index::RefCount, accounts_index::{tests::*, AccountSecondaryIndexesIncludeExclude}, - append_vec::AccountMeta, + append_vec::{test_utils::TempFile, AccountMeta}, inline_spl_token_v2_0, }; use assert_matches::assert_matches; @@ -6266,6 +6315,130 @@ pub mod tests { assert_eq!(result, vec![vec![expected]]); } + #[test] + fn test_accountsdb_scan_account_storage_no_bank_one_slot() { + solana_logger::setup(); + + let expected = 1; + let tf = crate::append_vec::test_utils::get_append_vec_path( + "test_accountsdb_scan_account_storage_no_bank", + ); + let (_temp_dirs, paths) = get_temp_accounts_paths(1).unwrap(); + let slot_expected: Slot = 0; + let size: usize = 123; + let mut data = AccountStorageEntry::new(&paths[0], slot_expected, 0, size as u64); + let av = AppendVec::new(&tf.path, true, 1024 * 1024); + data.accounts = av; + + let arc = Arc::new(data); + let storages = vec![vec![arc]]; + let pubkey = solana_sdk::pubkey::new_rand(); + let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner()); + let sm = StoredMeta { + data_len: 1, + pubkey, + write_version: 1, + }; + storages[0][0] + .accounts + .append_accounts(&[(sm, Some(&acc))], &[&Hash::default()]); + + let calls = AtomicU64::new(0); + let mut accum = Vec::new(); + let scan_func = |loaded_account: LoadedAccount, accum: &mut Vec, slot: Slot| { + calls.fetch_add(1, Ordering::Relaxed); + assert_eq!(loaded_account.pubkey(), &pubkey); + assert_eq!(slot_expected, slot); + accum.push(expected); + }; + AccountsDb::scan_multiple_account_storages_one_slot( + &storages[0], + &scan_func, + slot_expected, + &mut accum, + ); + assert_eq!(calls.load(Ordering::Relaxed), 1); + assert_eq!(accum, vec![expected]); + } + + fn sample_storage_with_entries( + tf: &TempFile, + write_version: StoredMetaWriteVersion, + slot: Slot, + pubkey: &Pubkey, + ) -> SnapshotStorages { + let (_temp_dirs, paths) = get_temp_accounts_paths(1).unwrap(); + let size: usize = 123; + let mut data = AccountStorageEntry::new(&paths[0], slot, 0, size as u64); + let av = AppendVec::new(&tf.path, true, 1024 * 1024); + data.accounts = av; + + let arc = Arc::new(data); + let storages = vec![vec![arc]]; + let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner()); + let sm = StoredMeta { + data_len: 1, + pubkey: *pubkey, + write_version, + }; + storages[0][0] + .accounts + .append_accounts(&[(sm, Some(&acc))], &[&Hash::default()]); + storages + } + + #[test] + fn test_accountsdb_scan_multiple_account_storage_no_bank_one_slot() { + solana_logger::setup(); + + let slot_expected: Slot = 0; + let tf = crate::append_vec::test_utils::get_append_vec_path( + "test_accountsdb_scan_account_storage_no_bank", + ); + let write_version1 = 0; + let write_version2 = 1; + let pubkey1 = solana_sdk::pubkey::new_rand(); + let pubkey2 = solana_sdk::pubkey::new_rand(); + for swap in [false, true].iter() { + let mut storages = [ + sample_storage_with_entries(&tf, write_version1, slot_expected, &pubkey1) + .remove(0) + .remove(0), + sample_storage_with_entries(&tf, write_version2, slot_expected, &pubkey2) + .remove(0) + .remove(0), + ]; + if *swap { + storages[..].swap(0, 1); + } + let calls = AtomicU64::new(0); + let scan_func = |loaded_account: LoadedAccount, accum: &mut Vec, slot: Slot| { + calls.fetch_add(1, Ordering::Relaxed); + let write_version = loaded_account.write_version(); + let first = loaded_account.pubkey() == &pubkey1 && write_version == write_version1; + assert!( + first || loaded_account.pubkey() == &pubkey2 && write_version == write_version2 + ); + assert_eq!(slot_expected, slot); + if first { + assert!(accum.is_empty()); + } else { + assert!(accum.len() == 1); + } + accum.push(write_version); + }; + let mut accum = Vec::new(); + AccountsDb::scan_multiple_account_storages_one_slot( + &storages, + &scan_func, + slot_expected, + &mut accum, + ); + assert_eq!(calls.load(Ordering::Relaxed), storages.len() as u64); + assert_eq!(accum, vec![write_version1, write_version2]); + } + } + #[test] fn test_accountsdb_add_root() { solana_logger::setup(); diff --git a/runtime/src/accounts_hash.rs b/runtime/src/accounts_hash.rs index 474b2a4e0a..c6e73b1b1c 100644 --- a/runtime/src/accounts_hash.rs +++ b/runtime/src/accounts_hash.rs @@ -69,20 +69,25 @@ impl HashStats { #[derive(Default, Debug, PartialEq, Clone)] pub struct CalculateHashIntermediate { - pub version: u64, pub hash: Hash, pub lamports: u64, - pub slot: Slot, pub pubkey: Pubkey, } impl CalculateHashIntermediate { - pub fn new(version: u64, hash: Hash, lamports: u64, slot: Slot, pubkey: Pubkey) -> Self { + pub fn new_without_slot(hash: Hash, lamports: u64, pubkey: Pubkey) -> Self { + Self { + hash, + lamports, + pubkey, + } + } + + // exists so tests and benches don't have to change yet + pub fn new(_version: u64, hash: Hash, lamports: u64, _slot: Slot, pubkey: Pubkey) -> Self { Self { - version, hash, lamports, - slot, pubkey, } } @@ -551,13 +556,7 @@ impl AccountsHash { b: &CalculateHashIntermediate, ) -> std::cmp::Ordering { // note partial_cmp only returns None with floating point comparisons - match a.pubkey.partial_cmp(&b.pubkey).unwrap() { - std::cmp::Ordering::Equal => match b.slot.partial_cmp(&a.slot).unwrap() { - std::cmp::Ordering::Equal => b.version.partial_cmp(&a.version).unwrap(), - other => other, - }, - other => other, - } + a.pubkey.partial_cmp(&b.pubkey).unwrap() } fn sort_hash_intermediate( @@ -569,7 +568,8 @@ impl AccountsHash { let sorted_data_by_pubkey: Vec> = data_by_pubkey .into_par_iter() .map(|mut pk_range| { - pk_range.par_sort_unstable_by(Self::compare_two_hash_entries); + // has to be a stable sort because items are in slot order already + pk_range.sort_by(Self::compare_two_hash_entries); pk_range }) .collect(); @@ -634,7 +634,8 @@ impl AccountsHash { .map(|chunk_index| { let mut start_index = chunk_index * chunk_size; let mut end_index = start_index + chunk_size; - if chunk_index == max - 1 { + let last = chunk_index == max - 1; + if last { end_index = len; } @@ -645,7 +646,7 @@ impl AccountsHash { } let (result, sum) = Self::de_dup_accounts_from_stores( - chunk_index == 0, + last, &pubkey_division[start_index..end_index], ); let mut overall = overall_sum.lock().unwrap(); @@ -660,7 +661,7 @@ impl AccountsHash { } fn de_dup_accounts_from_stores( - is_first_slice: bool, + is_last_slice: bool, slice: &[CalculateHashIntermediate], ) -> (Vec, u128) { let len = slice.len(); @@ -669,29 +670,43 @@ impl AccountsHash { let mut sum: u128 = 0; if len > 0 { let mut i = 0; + let mut insert_item = false; // look_for_first_key means the first key we find in our slice may be a // continuation of accounts belonging to a key that started in the last slice. // so, look_for_first_key=true means we have to find the first key different than // the first key we encounter in our slice. Note that if this is true, // our slice begins one index prior to the 'actual' start of our logical range. - let mut look_for_first_key = !is_first_slice; 'outer: loop { // at start of loop, item at 'i' is the first entry for a given pubkey - unless look_for_first - let now = &slice[i]; - let last = now.pubkey; - if !look_for_first_key && now.lamports != ZERO_RAW_LAMPORTS_SENTINEL { - // first entry for this key that starts in our slice - result.push(now.hash); - sum += now.lamports as u128; + let mut now = &slice[i]; + let mut last = now.pubkey; + if insert_item { + if now.lamports != ZERO_RAW_LAMPORTS_SENTINEL { + // first entry for this key that starts in our slice + result.push(now.hash); + sum += now.lamports as u128; + } + if i + 1 == len { + break; + } + i += 1; + now = &slice[i]; + last = now.pubkey; } for (k, now) in slice.iter().enumerate().skip(i + 1) { if now.pubkey != last { - i = k; - look_for_first_key = false; + i = k - 1; + insert_item = true; continue 'outer; } } + if is_last_slice { + insert_item = true; + i = len - 1; + continue 'outer; + } + break; // ran out of items in our slice, so our slice is done } } @@ -1266,37 +1281,37 @@ pub mod tests { type ExpectedType = (String, bool, u64, String); let expected:Vec = vec![ // ("key/lamports key2/lamports ...", - // is_first_slice + // is_last_slice // result lamports // result hashes) // "a5" = key_a, 5 lamports ("a1", false, 0, "[]"), - ("a1b2", false, 2, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("a1b2b3", false, 2, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("a1b2b3b4", false, 2, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("a1b2b3b4c5", false, 7, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("a1b2", false, 1, "[11111111111111111111111111111111]"), + ("a1b2b3", false, 1, "[11111111111111111111111111111111]"), + ("a1b2b3b4", false, 1, "[11111111111111111111111111111111]"), + ("a1b2b3b4c5", false, 5, "[11111111111111111111111111111111, CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), ("b2", false, 0, "[]"), ("b2b3", false, 0, "[]"), ("b2b3b4", false, 0, "[]"), - ("b2b3b4c5", false, 5, "[GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b2b3b4c5", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), ("b3", false, 0, "[]"), ("b3b4", false, 0, "[]"), - ("b3b4c5", false, 5, "[GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b3b4c5", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), ("b4", false, 0, "[]"), - ("b4c5", false, 5, "[GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b4c5", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), ("c5", false, 0, "[]"), ("a1", true, 1, "[11111111111111111111111111111111]"), ("a1b2", true, 3, "[11111111111111111111111111111111, 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("a1b2b3", true, 3, "[11111111111111111111111111111111, 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("a1b2b3b4", true, 3, "[11111111111111111111111111111111, 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("a1b2b3b4c5", true, 8, "[11111111111111111111111111111111, 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("a1b2b3", true, 4, "[11111111111111111111111111111111, 8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), + ("a1b2b3b4", true, 5, "[11111111111111111111111111111111, CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), + ("a1b2b3b4c5", true, 10, "[11111111111111111111111111111111, CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), ("b2", true, 2, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("b2b3", true, 2, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("b2b3b4", true, 2, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), - ("b2b3b4c5", true, 7, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b2b3", true, 3, "[8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), + ("b2b3b4", true, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), + ("b2b3b4c5", true, 9, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), ("b3", true, 3, "[8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), - ("b3b4", true, 3, "[8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), - ("b3b4c5", true, 8, "[8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b3b4", true, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), + ("b3b4c5", true, 9, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), ("b4", true, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), ("b4c5", true, 9, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), ("c5", true, 5, "[GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), @@ -1311,14 +1326,14 @@ pub mod tests { }).collect(); let mut expected_index = 0; - for first_slice in 0..2 { + for last_slice in 0..2 { for start in 0..COUNT { for end in start + 1..COUNT { - let is_first_slice = first_slice == 1; + let is_last_slice = last_slice == 1; let accounts = accounts.clone(); let slice = &accounts[start..end]; - let result = AccountsHash::de_dup_accounts_from_stores(is_first_slice, slice); + let result = AccountsHash::de_dup_accounts_from_stores(is_last_slice, slice); let (hashes2, lamports2) = AccountsHash::de_dup_accounts_in_parallel(slice, 1); let (hashes3, lamports3) = AccountsHash::de_dup_accounts_in_parallel(slice, 2); let (hashes4, lamports4) = AccountsHash::de_dup_and_eliminate_zeros( @@ -1384,12 +1399,12 @@ pub mod tests { let packaged_result: ExpectedType = ( human_readable, - is_first_slice, + is_last_slice, result.1 as u64, hash_result_as_string, ); - if is_first_slice { + if is_last_slice { // the parallel version always starts with 'first slice' assert_eq!( result.0, hashes, @@ -1406,7 +1421,7 @@ pub mod tests { assert_eq!(expected[expected_index], packaged_result); // for generating expected results - // error!("{:?},", packaged_result); + //error!("{:?},", packaged_result); expected_index += 1; } } @@ -1440,7 +1455,7 @@ pub mod tests { vec![val2.clone(), val.clone()], vec![val3.clone(), val4.clone()], ]; - let sorted = vec![vec![val, val2], vec![val4, val3]]; + let sorted = vec![vec![val2, val], vec![val3, val4]]; let result = AccountsHash::sort_hash_intermediate(src, &mut stats); assert_eq!(result, sorted); @@ -1464,7 +1479,7 @@ pub mod tests { let hash2 = Hash::new_unique(); let val2 = CalculateHashIntermediate::new(0, hash2, 4, 1, key); assert_eq!( - std::cmp::Ordering::Less, + std::cmp::Ordering::Equal, // no longer comparing slots or versions AccountsHash::compare_two_hash_entries(&val, &val2) ); @@ -1492,7 +1507,7 @@ pub mod tests { let hash4 = Hash::new_unique(); let val4 = CalculateHashIntermediate::new(2, hash4, 6, 1, key); assert_eq!( - std::cmp::Ordering::Greater, + std::cmp::Ordering::Equal, // no longer comparing slots or versions AccountsHash::compare_two_hash_entries(&val, &val4) ); @@ -1500,7 +1515,7 @@ pub mod tests { let hash5 = Hash::new_unique(); let val5 = CalculateHashIntermediate::new(0, hash5, 8, 2, key); assert_eq!( - std::cmp::Ordering::Greater, + std::cmp::Ordering::Equal, // no longer comparing slots or versions AccountsHash::compare_two_hash_entries(&val, &val5) ); } @@ -1526,7 +1541,7 @@ pub mod tests { Slot::default(), key, ); - account_maps.insert(0, val); // has to be before other entry since sort order matters + account_maps.push(val); // has to be after previous entry since account_maps are in slot order let result = AccountsHash::de_dup_accounts_from_stores(true, &account_maps[..]); assert_eq!(result, (vec![], 0));