parallelize index_read in shrink (#19506)

This commit is contained in:
Jeff Washington (jwash)
2021-09-01 15:40:59 -05:00
committed by GitHub
parent dd9fdd7b68
commit dd9481c403

View File

@ -2266,36 +2266,64 @@ impl AccountsDb {
} }
let mut index_read_elapsed = Measure::start("index_read_elapsed"); let mut index_read_elapsed = Measure::start("index_read_elapsed");
let mut alive_total = 0; let alive_total_collect = AtomicUsize::new(0);
let mut alive_accounts: Vec<_> = Vec::with_capacity(stored_accounts.len()); let len = stored_accounts.len();
let mut unrefed_pubkeys = vec![]; let alive_accounts_collect = Mutex::new(Vec::with_capacity(len));
for (pubkey, stored_account) in &stored_accounts { let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
let lookup = self.accounts_index.get_account_read_entry(pubkey); self.thread_pool.install(|| {
if let Some(locked_entry) = lookup { let chunk_size = 50; // # accounts/thread
let is_alive = locked_entry.slot_list().iter().any(|(_slot, i)| { let chunks = len / chunk_size + 1;
i.store_id == stored_account.store_id (0..chunks).into_par_iter().for_each(|chunk| {
&& i.offset == stored_account.account.offset let mut alive_accounts = Vec::with_capacity(chunk_size);
}); let mut unrefed_pubkeys = Vec::with_capacity(chunk_size);
if !is_alive { let mut alive_total = 0;
// This pubkey was found in the storage, but no longer exists in the index. let skip = chunk * chunk_size;
// It would have had a ref to the storage from the initial store, but it will stored_accounts.iter().skip(skip).take(chunk_size).for_each(
// not exist in the re-written slot. Unref it to keep the index consistent with |(pubkey, stored_account)| {
// rewriting the storage entries. let lookup = self.accounts_index.get_account_read_entry(pubkey);
unrefed_pubkeys.push(pubkey); if let Some(locked_entry) = lookup {
locked_entry.unref(); let is_alive = locked_entry.slot_list().iter().any(|(_slot, i)| {
self.shrink_stats i.store_id == stored_account.store_id
.dead_accounts && i.offset == stored_account.account.offset
.fetch_add(1, Ordering::Relaxed); });
} else { if !is_alive {
alive_accounts.push((pubkey, stored_account)); // This pubkey was found in the storage, but no longer exists in the index.
alive_total += stored_account.account_size; // It would have had a ref to the storage from the initial store, but it will
self.shrink_stats // not exist in the re-written slot. Unref it to keep the index consistent with
.alive_accounts // rewriting the storage entries.
.fetch_add(1, Ordering::Relaxed); unrefed_pubkeys.push(pubkey);
} locked_entry.unref();
} self.shrink_stats
} .dead_accounts
.fetch_add(1, Ordering::Relaxed);
} else {
alive_accounts.push((pubkey, stored_account));
alive_total += stored_account.account_size;
self.shrink_stats
.alive_accounts
.fetch_add(1, Ordering::Relaxed);
}
}
},
);
// collect
alive_accounts_collect
.lock()
.unwrap()
.append(&mut alive_accounts);
unrefed_pubkeys_collect
.lock()
.unwrap()
.append(&mut unrefed_pubkeys);
alive_total_collect.fetch_add(alive_total, Ordering::Relaxed);
});
});
let alive_accounts = alive_accounts_collect.into_inner().unwrap();
let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap();
let alive_total = alive_total_collect.load(Ordering::Relaxed);
index_read_elapsed.stop(); index_read_elapsed.stop();
let aligned_total: u64 = Self::page_align(alive_total as u64); let aligned_total: u64 = Self::page_align(alive_total as u64);