From 738cc9549fad2e6a8e7138c77a9c77c6238b6ccd Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Tue, 1 Jun 2021 19:33:12 -0500 Subject: [PATCH] parallel get_snapshot_storages (#17589) --- runtime/src/accounts_db.rs | 117 +++++++++++++++++++++------- runtime/src/bank.rs | 5 +- runtime/src/serde_snapshot/tests.rs | 13 +++- 3 files changed, 100 insertions(+), 35 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 25564af1fb..67f9ddf181 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -4251,7 +4251,7 @@ impl AccountsDb { ) -> Result<(Hash, u64), BankHashVerificationError> { if !use_index { let mut time = Measure::start("collect"); - let combined_maps = self.get_snapshot_storages(slot, Some(ancestors)); + let combined_maps = self.get_snapshot_storages(slot, Some(ancestors)).0; time.stop(); let timings = HashStats { @@ -5148,26 +5148,74 @@ impl AccountsDb { &self, snapshot_slot: Slot, _ancestors: Option<&Ancestors>, - ) -> SnapshotStorages { - self.storage + ) -> (SnapshotStorages, Vec) { + let mut m = Measure::start("get slots"); + let slots = self + .storage .0 .iter() - .filter(|iter_item| { - let slot = *iter_item.key(); - slot <= snapshot_slot && self.accounts_index.is_root(slot) + .map(|k| *k.key() as Slot) + .collect::>(); + m.stop(); + let mut m2 = Measure::start("filter"); + + let chunk_size = 5_000; + let wide = self.thread_pool_clean.install(|| { + slots + .par_chunks(chunk_size) + .map(|slots| { + slots + .iter() + .filter_map(|slot| { + if *slot <= snapshot_slot && self.accounts_index.is_root(*slot) { + self.storage.0.get(&slot).map_or_else( + || None, + |item| { + let storages = item + .value() + .read() + .unwrap() + .values() + .filter(|x| x.has_accounts()) + .cloned() + .collect::>(); + if !storages.is_empty() { + Some((storages, *slot)) + } else { + None + } + }, + ) + } else { + None + } + }) + .collect::>() + }) + .collect::>() + }); + m2.stop(); + let mut m3 = Measure::start("flatten"); + // some slots we found above may not have been a root or met the slot # constraint. + // So the resulting 'slots' vector we return will be a subset of the raw keys we got initially. + let mut slots = Vec::with_capacity(slots.len()); + let result = wide + .into_iter() + .flatten() + .map(|(storage, slot)| { + slots.push(slot); + storage }) - .map(|iter_item| { - iter_item - .value() - .read() - .unwrap() - .values() - .filter(|x| x.has_accounts()) - .cloned() - .collect() - }) - .filter(|snapshot_storage: &SnapshotStorage| !snapshot_storage.is_empty()) - .collect() + .collect::>(); + m3.stop(); + + debug!( + "hash_total: get slots: {}, filter: {}, flatten: {}", + m.as_us(), + m2.as_us(), + m3.as_us() + ); + (result, slots) } #[allow(clippy::needless_collect)] @@ -5747,7 +5795,16 @@ pub mod tests { accounts.store_uncached(slot, &to_store[..]); accounts.add_root(slot); - let storages = accounts.get_snapshot_storages(slot, None); + let (storages, slots) = accounts.get_snapshot_storages(slot, None); + assert_eq!(storages.len(), slots.len()); + storages + .iter() + .zip(slots.iter()) + .for_each(|(storages, slot)| { + for storage in storages { + assert_eq!(&storage.slot(), slot); + } + }); (storages, raw_expected) } @@ -8157,7 +8214,7 @@ pub mod tests { #[test] fn test_get_snapshot_storages_empty() { let db = AccountsDb::new(Vec::new(), &ClusterType::Development); - assert!(db.get_snapshot_storages(0, None).is_empty()); + assert!(db.get_snapshot_storages(0, None).0.is_empty()); } #[test] @@ -8172,10 +8229,10 @@ pub mod tests { db.add_root(base_slot); db.store_uncached(base_slot, &[(&key, &account)]); - assert!(db.get_snapshot_storages(before_slot, None).is_empty()); + assert!(db.get_snapshot_storages(before_slot, None).0.is_empty()); - assert_eq!(1, db.get_snapshot_storages(base_slot, None).len()); - assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); + assert_eq!(1, db.get_snapshot_storages(base_slot, None).0.len()); + assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len()); } #[test] @@ -8195,10 +8252,10 @@ pub mod tests { .unwrap() .clear(); db.add_root(base_slot); - assert!(db.get_snapshot_storages(after_slot, None).is_empty()); + assert!(db.get_snapshot_storages(after_slot, None).0.is_empty()); db.store_uncached(base_slot, &[(&key, &account)]); - assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); + assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len()); } #[test] @@ -8211,10 +8268,10 @@ pub mod tests { let after_slot = base_slot + 1; db.store_uncached(base_slot, &[(&key, &account)]); - assert!(db.get_snapshot_storages(after_slot, None).is_empty()); + assert!(db.get_snapshot_storages(after_slot, None).0.is_empty()); db.add_root(base_slot); - assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); + assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len()); } #[test] @@ -8228,7 +8285,7 @@ pub mod tests { db.store_uncached(base_slot, &[(&key, &account)]); db.add_root(base_slot); - assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); + assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len()); db.storage .get_slot_stores(0) @@ -8239,7 +8296,7 @@ pub mod tests { .next() .unwrap() .remove_account(0, true); - assert!(db.get_snapshot_storages(after_slot, None).is_empty()); + assert!(db.get_snapshot_storages(after_slot, None).0.is_empty()); } #[test] @@ -8403,7 +8460,7 @@ pub mod tests { accounts.store_uncached(current_slot, &[(&pubkey2, &zero_lamport_account)]); accounts.store_uncached(current_slot, &[(&pubkey3, &zero_lamport_account)]); - let snapshot_stores = accounts.get_snapshot_storages(current_slot, None); + let snapshot_stores = accounts.get_snapshot_storages(current_slot, None).0; let total_accounts: usize = snapshot_stores .iter() .flatten() diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index fd6013e1c3..f4f04b6863 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -444,7 +444,10 @@ impl BankRc { } pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages { - self.accounts.accounts_db.get_snapshot_storages(slot, None) + self.accounts + .accounts_db + .get_snapshot_storages(slot, None) + .0 } } diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 73a53e9351..8369ad3b86 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -28,7 +28,7 @@ fn copy_append_vecs>( accounts_db: &AccountsDb, output_dir: P, ) -> std::io::Result { - let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value(), None); + let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value(), None).0; let mut unpacked_append_vec_map = UnpackedAppendVecMap::new(); for storage in storage_entries.iter().flatten() { let storage_path = storage.get_path(); @@ -142,7 +142,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) { &mut writer, &*accounts.accounts_db, 0, - &accounts.accounts_db.get_snapshot_storages(0, None), + &accounts.accounts_db.get_snapshot_storages(0, None).0, ) .unwrap(); @@ -243,7 +243,7 @@ pub(crate) fn reconstruct_accounts_db_via_serialization( slot: Slot, ) -> AccountsDb { let mut writer = Cursor::new(vec![]); - let snapshot_storages = accounts.get_snapshot_storages(slot, None); + let snapshot_storages = accounts.get_snapshot_storages(slot, None).0; accountsdb_to_stream( SerdeStyle::Newer, &mut writer, @@ -301,7 +301,12 @@ mod test_bank_serialize { where S: serde::Serializer, { - let snapshot_storages = bank.rc.accounts.accounts_db.get_snapshot_storages(0, None); + let snapshot_storages = bank + .rc + .accounts + .accounts_db + .get_snapshot_storages(0, None) + .0; // ensure there is a single snapshot storage example for ABI digesting assert_eq!(snapshot_storages.len(), 1);