bin accounts index (#18111)

This commit is contained in:
Jeff Washington (jwash)
2021-06-28 15:03:57 -05:00
committed by GitHub
parent 7782a0855d
commit afd64d27c9
2 changed files with 107 additions and 80 deletions

View File

@@ -7,6 +7,7 @@ use crate::{
use bv::BitVec;
use log::*;
use ouroboros::self_referencing;
use rayon::prelude::*;
use solana_measure::measure::Measure;
use solana_sdk::{
clock::{BankId, Slot},
@@ -30,7 +31,7 @@ use std::{
use thiserror::Error;
pub const ITER_BATCH_SIZE: usize = 1000;
const BINS: usize = 16;
pub type ScanResult<T> = Result<T, ScanError>;
pub type SlotList<T> = Vec<(Slot, T)>;
pub type SlotSlice<'s, T> = &'s [(Slot, T)];
@@ -525,7 +526,7 @@ pub struct AccountsIndexRootsStats {
}
pub struct AccountsIndexIterator<'a, T> {
account_maps: &'a LockMapType<T>,
account_maps: &'a LockMapTypeSlice<T>,
start_bound: Bound<Pubkey>,
end_bound: Bound<Pubkey>,
is_finished: bool,
@@ -540,7 +541,7 @@ impl<'a, T> AccountsIndexIterator<'a, T> {
}
}
pub fn new<R>(account_maps: &'a LockMapType<T>, range: Option<R>) -> Self
pub fn new<R>(account_maps: &'a LockMapTypeSlice<T>, range: Option<R>) -> Self
where
R: RangeBounds<Pubkey>,
{
@@ -568,10 +569,15 @@ impl<'a, T: 'static + Clone> Iterator for AccountsIndexIterator<'a, T> {
let chunk: Vec<(Pubkey, AccountMapEntry<T>)> = self
.account_maps
.read()
.unwrap()
.range((self.start_bound, self.end_bound))
.map(|(pubkey, account_map_entry)| (*pubkey, account_map_entry.clone()))
.iter()
.map(|i| {
i.read()
.unwrap()
.range((self.start_bound, self.end_bound))
.map(|(pubkey, account_map_entry)| (*pubkey, account_map_entry.clone()))
.collect::<Vec<_>>()
})
.flatten()
.take(ITER_BATCH_SIZE)
.collect();
@@ -589,8 +595,14 @@ pub trait ZeroLamport {
fn is_zero_lamport(&self) -> bool;
}
/// Returns the index of the accounts-index bin that `pubkey` belongs to.
///
/// One byte of the pubkey is scaled from the `u8` value range down to
/// `0..BINS`, so the result is always a valid bin index.
fn get_bin_pubkey(pubkey: &Pubkey) -> usize {
    // TODO: this should not be 0. For now it needs to be due to requests for in-order pubkeys
    const BYTE_OF_PUBKEY_TO_BIN: usize = 0;
    // Number of distinct values a single byte can take (256).
    let byte_value_count = usize::from(u8::MAX) + 1;
    let selected_byte = pubkey.as_ref()[BYTE_OF_PUBKEY_TO_BIN] as usize;
    // Multiply before dividing so the integer math spreads the byte range evenly over the bins.
    selected_byte * BINS / byte_value_count
}
type MapType<T> = AccountMap<Pubkey, AccountMapEntry<T>>;
type LockMapType<T> = RwLock<MapType<T>>;
type LockMapType<T> = Vec<RwLock<MapType<T>>>;
type LockMapTypeSlice<T> = [RwLock<MapType<T>>];
type AccountMapsWriteLock<'a, T> = RwLockWriteGuard<'a, MapType<T>>;
type AccountMapsReadLock<'a, T> = RwLockReadGuard<'a, MapType<T>>;
@@ -634,7 +646,10 @@ pub struct AccountsIndex<T> {
impl<T> Default for AccountsIndex<T> {
fn default() -> Self {
Self {
account_maps: LockMapType::<T>::default(),
account_maps: (0..BINS)
.into_iter()
.map(|_| RwLock::new(AccountMap::<Pubkey, AccountMapEntry<T>>::default()))
.collect::<Vec<_>>(),
program_id_index: SecondaryIndex::<DashMapSecondaryIndexEntry>::new(
"program_id_index_stats",
),
@@ -651,7 +666,9 @@ impl<T> Default for AccountsIndex<T> {
}
}
impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
impl<T: 'static + Clone + IsCached + ZeroLamport + std::marker::Sync + std::marker::Send>
AccountsIndex<T>
{
fn iter<R>(&self, range: Option<R>) -> AccountsIndexIterator<T>
where
R: RangeBounds<Pubkey>,
@@ -976,7 +993,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
}
pub fn get_account_read_entry(&self, pubkey: &Pubkey) -> Option<ReadAccountMapEntry<T>> {
let lock = self.get_account_maps_read_lock();
let lock = self.get_account_maps_read_lock(pubkey);
self.get_account_read_entry_with_lock(pubkey, &lock)
}
@@ -991,7 +1008,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
}
fn get_account_write_entry(&self, pubkey: &Pubkey) -> Option<WriteAccountMapEntry<T>> {
self.account_maps
self.account_maps[get_bin_pubkey(pubkey)]
.read()
.unwrap()
.get(pubkey)
@@ -1012,7 +1029,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
self.insert_new_entry_if_missing_with_lock(*pubkey, w_account_maps, new_entry)
}
None => {
let mut w_account_maps = self.get_account_maps_write_lock();
let mut w_account_maps = self.get_account_maps_write_lock(pubkey);
self.insert_new_entry_if_missing_with_lock(*pubkey, &mut w_account_maps, new_entry)
}
}
@@ -1061,7 +1078,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
) {
if !dead_keys.is_empty() {
for key in dead_keys.iter() {
let mut w_index = self.get_account_maps_write_lock();
let mut w_index = self.get_account_maps_write_lock(key);
if let btree_map::Entry::Occupied(index_entry) = w_index.entry(**key) {
if index_entry.get().slot_list.read().unwrap().is_empty() {
index_entry.remove();
@@ -1250,7 +1267,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
ancestors: Option<&Ancestors>,
max_root: Option<Slot>,
) -> AccountIndexGetResult<'_, T> {
let read_lock = self.account_maps.read().unwrap();
let read_lock = self.account_maps[get_bin_pubkey(pubkey)].read().unwrap();
let account = read_lock
.get(pubkey)
.cloned()
@@ -1344,12 +1361,12 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
}
}
fn get_account_maps_write_lock(&self) -> AccountMapsWriteLock<T> {
self.account_maps.write().unwrap()
fn get_account_maps_write_lock(&self, pubkey: &Pubkey) -> AccountMapsWriteLock<T> {
self.account_maps[get_bin_pubkey(pubkey)].write().unwrap()
}
pub(crate) fn get_account_maps_read_lock(&self) -> AccountMapsReadLock<T> {
self.account_maps.read().unwrap()
pub(crate) fn get_account_maps_read_lock(&self, pubkey: &Pubkey) -> AccountMapsReadLock<T> {
self.account_maps[get_bin_pubkey(pubkey)].read().unwrap()
}
// Functionally the same as upsert, but:
@@ -1365,37 +1382,47 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
item_len: usize,
items: impl Iterator<Item = (&'a Pubkey, T)>,
) -> (Vec<Pubkey>, u64) {
// returns (duplicate pubkey mask, insertion time us)
let potentially_new_items = items
.map(|(pubkey, account_info)| {
// this value is equivalent to what update() below would have created if we inserted a new item
(
*pubkey,
WriteAccountMapEntry::new_entry_after_update(slot, account_info),
)
})
.collect::<Vec<_>>(); // collect here so we have created all data prior to obtaining lock
let mut _reclaims = SlotList::new();
let mut duplicate_keys = Vec::with_capacity(item_len / 100); // just an estimate
let mut w_account_maps = self.get_account_maps_write_lock();
let mut insert_time = Measure::start("insert_into_primary_index");
potentially_new_items
let expected_items_per_bin = item_len * 2 / BINS; // big enough so not likely to re-allocate, small enough to not over-allocate
let mut binned = (0..BINS)
.into_iter()
.for_each(|(pubkey, new_item)| {
let already_exists = self.insert_new_entry_if_missing_with_lock(
pubkey,
&mut w_account_maps,
new_item,
);
if let Some((mut w_account_entry, account_info, pubkey)) = already_exists {
w_account_entry.update(slot, account_info, &mut _reclaims);
duplicate_keys.push(pubkey);
}
});
.map(|pubkey_bin| (pubkey_bin, Vec::with_capacity(expected_items_per_bin)))
.collect::<Vec<_>>();
items.for_each(|(pubkey, account_info)| {
let bin = get_bin_pubkey(pubkey);
// this value is equivalent to what update() below would have created if we inserted a new item
let info = WriteAccountMapEntry::new_entry_after_update(slot, account_info);
binned[bin].1.push((*pubkey, info));
});
binned.retain(|x| !x.1.is_empty());
insert_time.stop();
(duplicate_keys, insert_time.as_us())
let insertion_time = AtomicU64::new(0);
let duplicate_keys = binned
.into_par_iter()
.map(|(pubkey_bin, items)| {
let mut _reclaims = SlotList::new();
let mut w_account_maps = self.account_maps[pubkey_bin].write().unwrap();
let mut insert_time = Measure::start("insert_into_primary_index"); // really should be in each loop
let mut duplicate_keys = Vec::with_capacity(items.len());
items.into_iter().for_each(|(pubkey, new_item)| {
let already_exists = self.insert_new_entry_if_missing_with_lock(
pubkey,
&mut w_account_maps,
new_item,
);
if let Some((mut w_account_entry, account_info, pubkey)) = already_exists {
w_account_entry.update(slot, account_info, &mut _reclaims);
duplicate_keys.push(pubkey);
}
});
insert_time.stop();
insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
duplicate_keys
})
.flatten()
.collect::<Vec<_>>();
(duplicate_keys, insertion_time.load(Ordering::Relaxed))
}
// Updates the given pubkey at the given slot with the new account information.
@@ -1509,7 +1536,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
// locked and inserted the pubkey in between when `is_slot_list_empty=true` and the call to
// remove() below.
if is_slot_list_empty {
let mut w_maps = self.get_account_maps_write_lock();
let mut w_maps = self.get_account_maps_write_lock(pubkey);
if let Some(x) = w_maps.get(pubkey) {
if x.slot_list.read().unwrap().is_empty() {
w_maps.remove(pubkey);
@@ -2605,7 +2632,14 @@ pub mod tests {
}
fn test_new_entry_code_paths_helper<
T: 'static + Clone + IsCached + ZeroLamport + std::cmp::PartialEq + std::fmt::Debug,
T: 'static
+ Sync
+ Send
+ Clone
+ IsCached
+ ZeroLamport
+ std::cmp::PartialEq
+ std::fmt::Debug,
>(
account_infos: [T; 2],
is_cached: bool,
@@ -2671,7 +2705,7 @@ pub mod tests {
for lock in &[false, true] {
let read_lock = if *lock {
Some(index.get_account_maps_read_lock())
Some(index.get_account_maps_read_lock(&key))
} else {
None
};
@@ -2721,7 +2755,7 @@ pub mod tests {
let account_info = true;
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info);
let mut w_account_maps = index.get_account_maps_write_lock();
let mut w_account_maps = index.get_account_maps_write_lock(&key.pubkey());
let write = index.insert_new_entry_if_missing_with_lock(
key.pubkey(),
&mut w_account_maps,