parallel storage -> accounts folder (#17955) (#18004)

(cherry picked from commit 7de79425ce)

Co-authored-by: Jeff Washington (jwash) <75863576+jeffwashington@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2021-06-16 15:51:00 +00:00
committed by GitHub
parent 67e1814581
commit bff7259111

View File

@ -17,6 +17,7 @@ use {
bincode, bincode,
bincode::{config::Options, Error}, bincode::{config::Options, Error},
log::*, log::*,
rayon::prelude::*,
serde::{de::DeserializeOwned, Deserialize, Serialize}, serde::{de::DeserializeOwned, Deserialize, Serialize},
solana_sdk::{ solana_sdk::{
clock::{Epoch, Slot, UnixTimestamp}, clock::{Epoch, Slot, UnixTimestamp},
@ -35,7 +36,6 @@ use {
path::PathBuf, path::PathBuf,
result::Result, result::Result,
sync::{atomic::Ordering, Arc, RwLock}, sync::{atomic::Ordering, Arc, RwLock},
time::Instant,
}, },
}; };
@ -76,7 +76,8 @@ trait TypeContext<'a> {
type SerializableAccountStorageEntry: Serialize type SerializableAccountStorageEntry: Serialize
+ DeserializeOwned + DeserializeOwned
+ From<&'a AccountStorageEntry> + From<&'a AccountStorageEntry>
+ SerializableStorage; + SerializableStorage
+ Sync;
fn serialize_bank_and_storage<S: serde::ser::Serializer>( fn serialize_bank_and_storage<S: serde::ser::Serializer>(
serializer: S, serializer: S,
@ -248,7 +249,7 @@ fn reconstruct_bank_from_fields<E>(
limit_load_slot_count_from_snapshot: Option<usize>, limit_load_slot_count_from_snapshot: Option<usize>,
) -> Result<Bank, Error> ) -> Result<Bank, Error>
where where
E: SerializableStorage, E: SerializableStorage + std::marker::Sync,
{ {
let mut accounts_db = reconstruct_accountsdb_from_fields( let mut accounts_db = reconstruct_accountsdb_from_fields(
accounts_db_fields, accounts_db_fields,
@ -286,7 +287,7 @@ fn reconstruct_accountsdb_from_fields<E>(
limit_load_slot_count_from_snapshot: Option<usize>, limit_load_slot_count_from_snapshot: Option<usize>,
) -> Result<AccountsDb, Error> ) -> Result<AccountsDb, Error>
where where
E: SerializableStorage, E: SerializableStorage + std::marker::Sync,
{ {
let mut accounts_db = AccountsDb::new_with_config( let mut accounts_db = AccountsDb::new_with_config(
account_paths.to_vec(), account_paths.to_vec(),
@ -302,23 +303,16 @@ where
.unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err)); .unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err));
} }
let mut last_log_update = Instant::now(); let storage = storage.into_iter().collect::<Vec<_>>();
let mut remaining_slots_to_process = storage.len();
// Remap the deserialized AppendVec paths to point to correct local paths // Remap the deserialized AppendVec paths to point to correct local paths
let mut storage = storage let mut storage = (0..storage.len())
.into_iter() .into_par_iter()
.map(|(slot, mut slot_storage)| { .map(|i| {
let now = Instant::now(); let (slot, slot_storage) = &storage[i];
if now.duration_since(last_log_update).as_secs() >= 10 {
info!("{} slots remaining...", remaining_slots_to_process);
last_log_update = now;
}
remaining_slots_to_process -= 1;
let mut new_slot_storage = HashMap::new(); let mut new_slot_storage = HashMap::new();
for storage_entry in slot_storage.drain(..) { for storage_entry in slot_storage {
let file_name = AppendVec::file_name(slot, storage_entry.id()); let file_name = AppendVec::file_name(*slot, storage_entry.id());
let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| { let append_vec_path = unpacked_append_vec_map.get(&file_name).ok_or_else(|| {
io::Error::new( io::Error::new(
@ -330,7 +324,7 @@ where
let (accounts, num_accounts) = let (accounts, num_accounts) =
AppendVec::new_from_file(append_vec_path, storage_entry.current_len())?; AppendVec::new_from_file(append_vec_path, storage_entry.current_len())?;
let u_storage_entry = AccountStorageEntry::new_existing( let u_storage_entry = AccountStorageEntry::new_existing(
slot, *slot,
storage_entry.id(), storage_entry.id(),
accounts, accounts,
num_accounts, num_accounts,
@ -338,7 +332,7 @@ where
new_slot_storage.insert(storage_entry.id(), Arc::new(u_storage_entry)); new_slot_storage.insert(storage_entry.id(), Arc::new(u_storage_entry));
} }
Ok((slot, new_slot_storage)) Ok((*slot, new_slot_storage))
}) })
.collect::<Result<HashMap<Slot, _>, Error>>()?; .collect::<Result<HashMap<Slot, _>, Error>>()?;