parallel get_snapshot_storages (#17589)

This commit is contained in:
Jeff Washington (jwash)
2021-06-01 19:33:12 -05:00
committed by GitHub
parent 1b7f8777d6
commit 738cc9549f
3 changed files with 100 additions and 35 deletions

View File

@ -4251,7 +4251,7 @@ impl AccountsDb {
) -> Result<(Hash, u64), BankHashVerificationError> { ) -> Result<(Hash, u64), BankHashVerificationError> {
if !use_index { if !use_index {
let mut time = Measure::start("collect"); let mut time = Measure::start("collect");
let combined_maps = self.get_snapshot_storages(slot, Some(ancestors)); let combined_maps = self.get_snapshot_storages(slot, Some(ancestors)).0;
time.stop(); time.stop();
let timings = HashStats { let timings = HashStats {
@ -5148,26 +5148,74 @@ impl AccountsDb {
&self, &self,
snapshot_slot: Slot, snapshot_slot: Slot,
_ancestors: Option<&Ancestors>, _ancestors: Option<&Ancestors>,
) -> SnapshotStorages { ) -> (SnapshotStorages, Vec<Slot>) {
self.storage let mut m = Measure::start("get slots");
let slots = self
.storage
.0 .0
.iter() .iter()
.filter(|iter_item| { .map(|k| *k.key() as Slot)
let slot = *iter_item.key(); .collect::<Vec<_>>();
slot <= snapshot_slot && self.accounts_index.is_root(slot) m.stop();
}) let mut m2 = Measure::start("filter");
.map(|iter_item| {
iter_item let chunk_size = 5_000;
let wide = self.thread_pool_clean.install(|| {
slots
.par_chunks(chunk_size)
.map(|slots| {
slots
.iter()
.filter_map(|slot| {
if *slot <= snapshot_slot && self.accounts_index.is_root(*slot) {
self.storage.0.get(&slot).map_or_else(
|| None,
|item| {
let storages = item
.value() .value()
.read() .read()
.unwrap() .unwrap()
.values() .values()
.filter(|x| x.has_accounts()) .filter(|x| x.has_accounts())
.cloned() .cloned()
.collect() .collect::<Vec<_>>();
if !storages.is_empty() {
Some((storages, *slot))
} else {
None
}
},
)
} else {
None
}
}) })
.filter(|snapshot_storage: &SnapshotStorage| !snapshot_storage.is_empty()) .collect::<Vec<(SnapshotStorage, Slot)>>()
.collect() })
.collect::<Vec<_>>()
});
m2.stop();
let mut m3 = Measure::start("flatten");
// some slots we found above may not have been a root or met the slot # constraint.
// So the resulting 'slots' vector we return will be a subset of the raw keys we got initially.
let mut slots = Vec::with_capacity(slots.len());
let result = wide
.into_iter()
.flatten()
.map(|(storage, slot)| {
slots.push(slot);
storage
})
.collect::<Vec<_>>();
m3.stop();
debug!(
"hash_total: get slots: {}, filter: {}, flatten: {}",
m.as_us(),
m2.as_us(),
m3.as_us()
);
(result, slots)
} }
#[allow(clippy::needless_collect)] #[allow(clippy::needless_collect)]
@ -5747,7 +5795,16 @@ pub mod tests {
accounts.store_uncached(slot, &to_store[..]); accounts.store_uncached(slot, &to_store[..]);
accounts.add_root(slot); accounts.add_root(slot);
let storages = accounts.get_snapshot_storages(slot, None); let (storages, slots) = accounts.get_snapshot_storages(slot, None);
assert_eq!(storages.len(), slots.len());
storages
.iter()
.zip(slots.iter())
.for_each(|(storages, slot)| {
for storage in storages {
assert_eq!(&storage.slot(), slot);
}
});
(storages, raw_expected) (storages, raw_expected)
} }
@ -8157,7 +8214,7 @@ pub mod tests {
#[test] #[test]
fn test_get_snapshot_storages_empty() { fn test_get_snapshot_storages_empty() {
let db = AccountsDb::new(Vec::new(), &ClusterType::Development); let db = AccountsDb::new(Vec::new(), &ClusterType::Development);
assert!(db.get_snapshot_storages(0, None).is_empty()); assert!(db.get_snapshot_storages(0, None).0.is_empty());
} }
#[test] #[test]
@ -8172,10 +8229,10 @@ pub mod tests {
db.add_root(base_slot); db.add_root(base_slot);
db.store_uncached(base_slot, &[(&key, &account)]); db.store_uncached(base_slot, &[(&key, &account)]);
assert!(db.get_snapshot_storages(before_slot, None).is_empty()); assert!(db.get_snapshot_storages(before_slot, None).0.is_empty());
assert_eq!(1, db.get_snapshot_storages(base_slot, None).len()); assert_eq!(1, db.get_snapshot_storages(base_slot, None).0.len());
assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());
} }
#[test] #[test]
@ -8195,10 +8252,10 @@ pub mod tests {
.unwrap() .unwrap()
.clear(); .clear();
db.add_root(base_slot); db.add_root(base_slot);
assert!(db.get_snapshot_storages(after_slot, None).is_empty()); assert!(db.get_snapshot_storages(after_slot, None).0.is_empty());
db.store_uncached(base_slot, &[(&key, &account)]); db.store_uncached(base_slot, &[(&key, &account)]);
assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());
} }
#[test] #[test]
@ -8211,10 +8268,10 @@ pub mod tests {
let after_slot = base_slot + 1; let after_slot = base_slot + 1;
db.store_uncached(base_slot, &[(&key, &account)]); db.store_uncached(base_slot, &[(&key, &account)]);
assert!(db.get_snapshot_storages(after_slot, None).is_empty()); assert!(db.get_snapshot_storages(after_slot, None).0.is_empty());
db.add_root(base_slot); db.add_root(base_slot);
assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());
} }
#[test] #[test]
@ -8228,7 +8285,7 @@ pub mod tests {
db.store_uncached(base_slot, &[(&key, &account)]); db.store_uncached(base_slot, &[(&key, &account)]);
db.add_root(base_slot); db.add_root(base_slot);
assert_eq!(1, db.get_snapshot_storages(after_slot, None).len()); assert_eq!(1, db.get_snapshot_storages(after_slot, None).0.len());
db.storage db.storage
.get_slot_stores(0) .get_slot_stores(0)
@ -8239,7 +8296,7 @@ pub mod tests {
.next() .next()
.unwrap() .unwrap()
.remove_account(0, true); .remove_account(0, true);
assert!(db.get_snapshot_storages(after_slot, None).is_empty()); assert!(db.get_snapshot_storages(after_slot, None).0.is_empty());
} }
#[test] #[test]
@ -8403,7 +8460,7 @@ pub mod tests {
accounts.store_uncached(current_slot, &[(&pubkey2, &zero_lamport_account)]); accounts.store_uncached(current_slot, &[(&pubkey2, &zero_lamport_account)]);
accounts.store_uncached(current_slot, &[(&pubkey3, &zero_lamport_account)]); accounts.store_uncached(current_slot, &[(&pubkey3, &zero_lamport_account)]);
let snapshot_stores = accounts.get_snapshot_storages(current_slot, None); let snapshot_stores = accounts.get_snapshot_storages(current_slot, None).0;
let total_accounts: usize = snapshot_stores let total_accounts: usize = snapshot_stores
.iter() .iter()
.flatten() .flatten()

View File

@ -444,7 +444,10 @@ impl BankRc {
} }
pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages { pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages {
self.accounts.accounts_db.get_snapshot_storages(slot, None) self.accounts
.accounts_db
.get_snapshot_storages(slot, None)
.0
} }
} }

View File

@ -28,7 +28,7 @@ fn copy_append_vecs<P: AsRef<Path>>(
accounts_db: &AccountsDb, accounts_db: &AccountsDb,
output_dir: P, output_dir: P,
) -> std::io::Result<UnpackedAppendVecMap> { ) -> std::io::Result<UnpackedAppendVecMap> {
let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value(), None); let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value(), None).0;
let mut unpacked_append_vec_map = UnpackedAppendVecMap::new(); let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
for storage in storage_entries.iter().flatten() { for storage in storage_entries.iter().flatten() {
let storage_path = storage.get_path(); let storage_path = storage.get_path();
@ -142,7 +142,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) {
&mut writer, &mut writer,
&*accounts.accounts_db, &*accounts.accounts_db,
0, 0,
&accounts.accounts_db.get_snapshot_storages(0, None), &accounts.accounts_db.get_snapshot_storages(0, None).0,
) )
.unwrap(); .unwrap();
@ -243,7 +243,7 @@ pub(crate) fn reconstruct_accounts_db_via_serialization(
slot: Slot, slot: Slot,
) -> AccountsDb { ) -> AccountsDb {
let mut writer = Cursor::new(vec![]); let mut writer = Cursor::new(vec![]);
let snapshot_storages = accounts.get_snapshot_storages(slot, None); let snapshot_storages = accounts.get_snapshot_storages(slot, None).0;
accountsdb_to_stream( accountsdb_to_stream(
SerdeStyle::Newer, SerdeStyle::Newer,
&mut writer, &mut writer,
@ -301,7 +301,12 @@ mod test_bank_serialize {
where where
S: serde::Serializer, S: serde::Serializer,
{ {
let snapshot_storages = bank.rc.accounts.accounts_db.get_snapshot_storages(0, None); let snapshot_storages = bank
.rc
.accounts
.accounts_db
.get_snapshot_storages(0, None)
.0;
// ensure there is a single snapshot storage example for ABI digesting // ensure there is a single snapshot storage example for ABI digesting
assert_eq!(snapshot_storages.len(), 1); assert_eq!(snapshot_storages.len(), 1);