diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 33ca72178e..1385f104c4 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -17,6 +17,7 @@ use { bincode, bincode::{config::Options, Error}, log::*, + rayon::prelude::*, serde::{de::DeserializeOwned, Deserialize, Serialize}, solana_sdk::{ clock::{Epoch, Slot, UnixTimestamp}, @@ -35,7 +36,6 @@ use { path::PathBuf, result::Result, sync::{atomic::Ordering, Arc, RwLock}, - time::Instant, }, }; @@ -76,7 +76,8 @@ trait TypeContext<'a> { type SerializableAccountStorageEntry: Serialize + DeserializeOwned + From<&'a AccountStorageEntry> - + SerializableStorage; + + SerializableStorage + + Sync; fn serialize_bank_and_storage( serializer: S, @@ -248,7 +249,7 @@ fn reconstruct_bank_from_fields( limit_load_slot_count_from_snapshot: Option, ) -> Result where - E: SerializableStorage, + E: SerializableStorage + std::marker::Sync, { let mut accounts_db = reconstruct_accountsdb_from_fields( accounts_db_fields, @@ -286,7 +287,7 @@ fn reconstruct_accountsdb_from_fields( limit_load_slot_count_from_snapshot: Option, ) -> Result where - E: SerializableStorage, + E: SerializableStorage + std::marker::Sync, { let mut accounts_db = AccountsDb::new_with_config( account_paths.to_vec(), @@ -302,23 +303,16 @@ where .unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err)); } - let mut last_log_update = Instant::now(); - let mut remaining_slots_to_process = storage.len(); + let storage = storage.into_iter().collect::>(); // Remap the deserialized AppendVec paths to point to correct local paths - let mut storage = storage - .into_iter() - .map(|(slot, mut slot_storage)| { - let now = Instant::now(); - if now.duration_since(last_log_update).as_secs() >= 10 { - info!("{} slots remaining...", remaining_slots_to_process); - last_log_update = now; - } - remaining_slots_to_process -= 1; - + let mut storage = (0..storage.len()) + .into_par_iter() + .map(|i| { + let (slot, slot_storage) = &storage[i]; let mut new_slot_storage = HashMap::new(); - for storage_entry in slot_storage.drain(..) { - let file_name = AppendVec::file_name(slot, storage_entry.id()); + for storage_entry in slot_storage { + let file_name = AppendVec::file_name(*slot, storage_entry.id()); let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| { io::Error::new( @@ -330,7 +324,7 @@ where let (accounts, num_accounts) = AppendVec::new_from_file(append_vec_path, storage_entry.current_len())?; let u_storage_entry = AccountStorageEntry::new_existing( - slot, + *slot, storage_entry.id(), accounts, num_accounts, @@ -338,7 +332,7 @@ where new_slot_storage.insert(storage_entry.id(), Arc::new(u_storage_entry)); } - Ok((slot, new_slot_storage)) + Ok((*slot, new_slot_storage)) }) .collect::, Error>>()?;