Hold read lock during startup shrink (#17309)
* hold read lock during account scan of shrink * rename and improve rusty
This commit is contained in:
committed by
GitHub
parent
96cde36784
commit
3f3324231d
@ -1911,7 +1911,7 @@ impl AccountsDb {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I)
|
fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I, is_startup: bool)
|
||||||
where
|
where
|
||||||
I: Iterator<Item = &'a Arc<AccountStorageEntry>>,
|
I: Iterator<Item = &'a Arc<AccountStorageEntry>>,
|
||||||
{
|
{
|
||||||
@ -1950,31 +1950,47 @@ impl AccountsDb {
|
|||||||
|
|
||||||
let mut index_read_elapsed = Measure::start("index_read_elapsed");
|
let mut index_read_elapsed = Measure::start("index_read_elapsed");
|
||||||
let mut alive_total = 0;
|
let mut alive_total = 0;
|
||||||
let alive_accounts: Vec<_> = {
|
let accounts_index_map_lock = if is_startup {
|
||||||
stored_accounts
|
// at startup, there is nobody else to contend with the accounts_index read lock, so it is more efficient for us to keep it held
|
||||||
.iter()
|
Some(self.accounts_index.get_account_maps_read_lock())
|
||||||
.filter(|(pubkey, stored_account)| {
|
} else {
|
||||||
if let Some(locked_entry) = self.accounts_index.get_account_read_entry(pubkey) {
|
None
|
||||||
let is_alive = locked_entry.slot_list().iter().any(|(_slot, i)| {
|
|
||||||
i.store_id == stored_account.store_id
|
|
||||||
&& i.offset == stored_account.account.offset
|
|
||||||
});
|
|
||||||
if !is_alive {
|
|
||||||
// This pubkey was found in the storage, but no longer exists in the index.
|
|
||||||
// It would have had a ref to the storage from the initial store, but it will
|
|
||||||
// not exist in the re-written slot. Unref it to keep the index consistent with
|
|
||||||
// rewriting the storage entries.
|
|
||||||
locked_entry.unref()
|
|
||||||
} else {
|
|
||||||
alive_total += stored_account.account_size as u64;
|
|
||||||
}
|
|
||||||
is_alive
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
};
|
};
|
||||||
|
let accounts_index_map_lock_ref = accounts_index_map_lock.as_ref();
|
||||||
|
|
||||||
|
let alive_accounts: Vec<_> = stored_accounts
|
||||||
|
.iter()
|
||||||
|
.filter(|(pubkey, stored_account)| {
|
||||||
|
let lookup = if is_startup {
|
||||||
|
self.accounts_index.get_account_read_entry_with_lock(
|
||||||
|
pubkey,
|
||||||
|
accounts_index_map_lock_ref.unwrap(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
self.accounts_index.get_account_read_entry(pubkey)
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(locked_entry) = lookup {
|
||||||
|
let is_alive = locked_entry.slot_list().iter().any(|(_slot, i)| {
|
||||||
|
i.store_id == stored_account.store_id
|
||||||
|
&& i.offset == stored_account.account.offset
|
||||||
|
});
|
||||||
|
if !is_alive {
|
||||||
|
// This pubkey was found in the storage, but no longer exists in the index.
|
||||||
|
// It would have had a ref to the storage from the initial store, but it will
|
||||||
|
// not exist in the re-written slot. Unref it to keep the index consistent with
|
||||||
|
// rewriting the storage entries.
|
||||||
|
locked_entry.unref()
|
||||||
|
} else {
|
||||||
|
alive_total += stored_account.account_size as u64;
|
||||||
|
}
|
||||||
|
is_alive
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
drop(accounts_index_map_lock);
|
||||||
index_read_elapsed.stop();
|
index_read_elapsed.stop();
|
||||||
let aligned_total: u64 = self.page_align(alive_total);
|
let aligned_total: u64 = self.page_align(alive_total);
|
||||||
|
|
||||||
@ -2136,7 +2152,7 @@ impl AccountsDb {
|
|||||||
|
|
||||||
// Reads all accounts in given slot's AppendVecs and filter only to alive,
|
// Reads all accounts in given slot's AppendVecs and filter only to alive,
|
||||||
// then create a minimum AppendVec filled with the alive.
|
// then create a minimum AppendVec filled with the alive.
|
||||||
fn shrink_slot_forced(&self, slot: Slot) -> usize {
|
fn shrink_slot_forced(&self, slot: Slot, is_startup: bool) -> usize {
|
||||||
debug!("shrink_slot_forced: slot: {}", slot);
|
debug!("shrink_slot_forced: slot: {}", slot);
|
||||||
|
|
||||||
if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
|
if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
|
||||||
@ -2157,7 +2173,7 @@ impl AccountsDb {
|
|||||||
);
|
);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
self.do_shrink_slot_stores(slot, stores.iter());
|
self.do_shrink_slot_stores(slot, stores.iter(), is_startup);
|
||||||
alive_count
|
alive_count
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
@ -2177,7 +2193,7 @@ impl AccountsDb {
|
|||||||
let num_candidates = shrink_slots.len();
|
let num_candidates = shrink_slots.len();
|
||||||
for (slot, slot_shrink_candidates) in shrink_slots {
|
for (slot, slot_shrink_candidates) in shrink_slots {
|
||||||
let mut measure = Measure::start("shrink_candidate_slots-ms");
|
let mut measure = Measure::start("shrink_candidate_slots-ms");
|
||||||
self.do_shrink_slot_stores(slot, slot_shrink_candidates.values());
|
self.do_shrink_slot_stores(slot, slot_shrink_candidates.values(), false);
|
||||||
measure.stop();
|
measure.stop();
|
||||||
inc_new_counter_info!("shrink_candidate_slots-ms", measure.as_ms() as usize);
|
inc_new_counter_info!("shrink_candidate_slots-ms", measure.as_ms() as usize);
|
||||||
}
|
}
|
||||||
@ -2190,13 +2206,13 @@ impl AccountsDb {
|
|||||||
let chunk_size = std::cmp::max(slots.len() / 8, 1); // approximately 400k slots in a snapshot
|
let chunk_size = std::cmp::max(slots.len() / 8, 1); // approximately 400k slots in a snapshot
|
||||||
slots.par_chunks(chunk_size).for_each(|slots| {
|
slots.par_chunks(chunk_size).for_each(|slots| {
|
||||||
for slot in slots {
|
for slot in slots {
|
||||||
self.shrink_slot_forced(*slot);
|
self.shrink_slot_forced(*slot, is_startup);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
for slot in self.all_slots_in_storage() {
|
for slot in self.all_slots_in_storage() {
|
||||||
if self.caching_enabled {
|
if self.caching_enabled {
|
||||||
self.shrink_slot_forced(slot);
|
self.shrink_slot_forced(slot, false);
|
||||||
} else {
|
} else {
|
||||||
self.do_shrink_slot_forced_v1(slot);
|
self.do_shrink_slot_forced_v1(slot);
|
||||||
}
|
}
|
||||||
|
@ -838,10 +838,16 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_account_read_entry(&self, pubkey: &Pubkey) -> Option<ReadAccountMapEntry<T>> {
|
pub fn get_account_read_entry(&self, pubkey: &Pubkey) -> Option<ReadAccountMapEntry<T>> {
|
||||||
self.account_maps
|
let lock = self.get_account_maps_read_lock();
|
||||||
.read()
|
self.get_account_read_entry_with_lock(pubkey, &lock)
|
||||||
.unwrap()
|
}
|
||||||
.get(pubkey)
|
|
||||||
|
pub fn get_account_read_entry_with_lock(
|
||||||
|
&self,
|
||||||
|
pubkey: &Pubkey,
|
||||||
|
lock: &AccountMapsReadLock<'_, T>,
|
||||||
|
) -> Option<ReadAccountMapEntry<T>> {
|
||||||
|
lock.get(pubkey)
|
||||||
.cloned()
|
.cloned()
|
||||||
.map(ReadAccountMapEntry::from_account_map_entry)
|
.map(ReadAccountMapEntry::from_account_map_entry)
|
||||||
}
|
}
|
||||||
@ -1184,6 +1190,10 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
|
|||||||
self.account_maps.write().unwrap()
|
self.account_maps.write().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_account_maps_read_lock(&self) -> AccountMapsReadLock<T> {
|
||||||
|
self.account_maps.read().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
// Same functionally to upsert, but:
|
// Same functionally to upsert, but:
|
||||||
// 1. operates on a batch of items
|
// 1. operates on a batch of items
|
||||||
// 2. holds the write lock for the duration of adding the items
|
// 2. holds the write lock for the duration of adding the items
|
||||||
@ -2411,8 +2421,21 @@ pub mod tests {
|
|||||||
}
|
}
|
||||||
assert!(gc.is_empty());
|
assert!(gc.is_empty());
|
||||||
|
|
||||||
{
|
for lock in &[false, true] {
|
||||||
let entry = index.get_account_read_entry(&key).unwrap();
|
let read_lock = if *lock {
|
||||||
|
Some(index.get_account_maps_read_lock())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let entry = if *lock {
|
||||||
|
index
|
||||||
|
.get_account_read_entry_with_lock(&key, read_lock.as_ref().unwrap())
|
||||||
|
.unwrap()
|
||||||
|
} else {
|
||||||
|
index.get_account_read_entry(&key).unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
entry.ref_count().load(Ordering::Relaxed),
|
entry.ref_count().load(Ordering::Relaxed),
|
||||||
if is_cached { 0 } else { 2 }
|
if is_cached { 0 } else { 2 }
|
||||||
|
Reference in New Issue
Block a user