From 7de79425ce74b5f736098b30778a61639bfd7183 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Wed, 16 Jun 2021 08:48:24 -0500 Subject: [PATCH] parallel storage -> accounts folder (#17955) --- runtime/src/serde_snapshot.rs | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 776f054c0e..756730efba 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -19,6 +19,7 @@ use { bincode, bincode::{config::Options, Error}, log::*, + rayon::prelude::*, serde::{de::DeserializeOwned, Deserialize, Serialize}, solana_sdk::{ clock::{Epoch, Slot, UnixTimestamp}, @@ -37,7 +38,6 @@ use { path::PathBuf, result::Result, sync::{atomic::Ordering, Arc, RwLock}, - time::Instant, }, }; @@ -78,7 +78,8 @@ trait TypeContext<'a> { type SerializableAccountStorageEntry: Serialize + DeserializeOwned + From<&'a AccountStorageEntry> - + SerializableStorage; + + SerializableStorage + + Sync; fn serialize_bank_and_storage( serializer: S, @@ -253,7 +254,7 @@ fn reconstruct_bank_from_fields( shrink_ratio: AccountShrinkThreshold, ) -> Result where - E: SerializableStorage, + E: SerializableStorage + std::marker::Sync, { let mut accounts_db = reconstruct_accountsdb_from_fields( accounts_db_fields, @@ -297,7 +298,7 @@ fn reconstruct_accountsdb_from_fields( shrink_ratio: AccountShrinkThreshold, ) -> Result where - E: SerializableStorage, + E: SerializableStorage + std::marker::Sync, { let mut accounts_db = AccountsDb::new_with_config( account_paths.to_vec(), @@ -314,23 +315,16 @@ where .unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err)); } - let mut last_log_update = Instant::now(); - let mut remaining_slots_to_process = storage.len(); + let storage = storage.into_iter().collect::>(); // Remap the deserialized AppendVec paths to point to correct local paths - let mut storage = storage - .into_iter() - .map(|(slot, mut slot_storage)| { - let now = Instant::now(); - if now.duration_since(last_log_update).as_secs() >= 10 { - info!("{} slots remaining...", remaining_slots_to_process); - last_log_update = now; - } - remaining_slots_to_process -= 1; - + let mut storage = (0..storage.len()) + .into_par_iter() + .map(|i| { + let (slot, slot_storage) = &storage[i]; let mut new_slot_storage = HashMap::new(); - for storage_entry in slot_storage.drain(..) { - let file_name = AppendVec::file_name(slot, storage_entry.id()); + for storage_entry in slot_storage { + let file_name = AppendVec::file_name(*slot, storage_entry.id()); let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| { io::Error::new( @@ -342,7 +336,7 @@ where let (accounts, num_accounts) = AppendVec::new_from_file(append_vec_path, storage_entry.current_len())?; let u_storage_entry = AccountStorageEntry::new_existing( - slot, + *slot, storage_entry.id(), accounts, num_accounts, @@ -350,7 +344,7 @@ where new_slot_storage.insert(storage_entry.id(), Arc::new(u_storage_entry)); } - Ok((slot, new_slot_storage)) + Ok((*slot, new_slot_storage)) }) .collect::, Error>>()?;