Move slot cleanup to AccountsBackgroundService (#13911)

* Move bank drop to AccountsBackgroundService

* Send to ABS on drop instead, protects against other places banks are dropped

* Fix Abi

* test

Co-authored-by: Carl Lin <carl@solana.com>
This commit is contained in:
carllin
2020-12-12 17:22:34 -08:00
committed by GitHub
parent 549a3107cb
commit 55fc963595
11 changed files with 297 additions and 67 deletions

View File

@ -3,21 +3,26 @@
// This can be expensive since we have to walk the append vecs being cleaned up.
use crate::{
bank::{Bank, BankSlotDelta},
bank::{Bank, BankSlotDelta, DropCallback},
bank_forks::{BankForks, SnapshotConfig},
snapshot_package::AccountsPackageSender,
snapshot_utils,
};
use crossbeam_channel::{Receiver, Sender};
use crossbeam_channel::{Receiver, SendError, Sender};
use log::*;
use rand::{thread_rng, Rng};
use solana_measure::measure::Measure;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
use solana_sdk::clock::Slot;
use std::{
boxed::Box,
fmt::{Debug, Formatter},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
const INTERVAL_MS: u64 = 100;
const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
@ -27,6 +32,37 @@ const CLEAN_INTERVAL_BLOCKS: u64 = 100;
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<Slot>;
pub type DroppedSlotsReceiver = Receiver<Slot>;
/// Drop callback that reports a dropped bank's slot over a channel.
///
/// Its `DropCallback::callback` implementation sends `bank.slot()` through
/// `sender`.
#[derive(Clone)]
pub struct SendDroppedBankCallback {
// Sending half of the dropped-slots channel (`DroppedSlotsSender` = `Sender<Slot>`).
sender: DroppedSlotsSender,
}
impl DropCallback for SendDroppedBankCallback {
    /// Sends the slot of `bank` over the dropped-slots channel.
    ///
    /// A failed send is logged at `warn` level and otherwise ignored.
    fn callback(&self, bank: &Bank) {
        let dropped_slot = bank.slot();
        self.sender.send(dropped_slot).unwrap_or_else(|e| {
            warn!("Error sending dropped banks: {:?}", e);
        });
    }

    /// Returns a boxed clone of this callback (the struct derives `Clone`).
    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
        let duplicate: Self = self.clone();
        Box::new(duplicate)
    }
}
impl Debug for SendDroppedBankCallback {
    /// Formats the callback as its type name followed by the address of
    /// `self`; the wrapped sender is not printed.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        f.write_fmt(format_args!("SendDroppedBankCallback({:p})", self))
    }
}
impl SendDroppedBankCallback {
pub fn new(sender: DroppedSlotsSender) -> Self {
Self { sender }
}
}
pub struct SnapshotRequest {
pub snapshot_root_bank: Arc<Bank>,
@ -109,6 +145,60 @@ impl SnapshotRequestHandler {
}
}
/// Sending side of the requests made to the accounts background service.
///
/// The derived `Default` yields a sender with no snapshot channel
/// (`snapshot_request_sender == None`).
#[derive(Default)]
pub struct ABSRequestSender {
// Channel for snapshot requests; `None` means snapshot creation is disabled
// (see `is_snapshot_creation_enabled`).
snapshot_request_sender: Option<SnapshotRequestSender>,
}
impl ABSRequestSender {
    /// Wraps an optional snapshot request channel.
    pub fn new(snapshot_request_sender: Option<SnapshotRequestSender>) -> Self {
        Self {
            snapshot_request_sender,
        }
    }

    /// Returns `true` when a snapshot request channel is configured.
    pub fn is_snapshot_creation_enabled(&self) -> bool {
        self.snapshot_request_sender.is_some()
    }

    /// Forwards `snapshot_request` over the snapshot request channel.
    ///
    /// When no channel is configured the request is dropped and `Ok(())` is
    /// returned. Otherwise the result of the channel send is returned as-is.
    pub fn send_snapshot_request(
        &self,
        snapshot_request: SnapshotRequest,
    ) -> Result<(), SendError<SnapshotRequest>> {
        match self.snapshot_request_sender {
            Some(ref sender) => sender.send(snapshot_request),
            None => Ok(()),
        }
    }
}
/// Receiving side of the requests handled by the accounts background service.
pub struct ABSRequestHandler {
// Optional handler for snapshot requests; `handle_snapshot_requests`
// returns `None` when this is `None`.
pub snapshot_request_handler: Option<SnapshotRequestHandler>,
// Receives slots whose accounts should be purged (drained by
// `handle_pruned_banks`).
pub pruned_banks_receiver: DroppedSlotsReceiver,
}
impl ABSRequestHandler {
    /// Delegates to the configured `SnapshotRequestHandler`, if any.
    ///
    /// Returns the latest requested snapshot block height, if one exists.
    /// Returns `None` when no snapshot request handler is configured.
    pub fn handle_snapshot_requests(&self) -> Option<u64> {
        self.snapshot_request_handler
            .as_ref()
            .and_then(|snapshot_request_handler| {
                snapshot_request_handler.handle_snapshot_requests()
            })
    }

    /// Purges, through `bank`'s accounts, every slot currently queued on
    /// `pruned_banks_receiver`.
    ///
    /// Uses `try_iter()`, so only slots already in the channel are consumed;
    /// the call does not wait for new ones.
    ///
    /// Returns the number of slots that were purged by this call.
    pub fn handle_pruned_banks(&self, bank: &Bank) -> usize {
        let mut count = 0;
        for pruned_slot in self.pruned_banks_receiver.try_iter() {
            count += 1;
            bank.rc.accounts.purge_slot(pruned_slot);
        }
        count
    }
}
/// Handle to the accounts background thread.
pub struct AccountsBackgroundService {
// Join handle of the spawned "solana-accounts-background" thread; consumed by `join`.
t_background: JoinHandle<()>,
}
@ -117,12 +207,14 @@ impl AccountsBackgroundService {
pub fn new(
bank_forks: Arc<RwLock<BankForks>>,
exit: &Arc<AtomicBool>,
snapshot_request_handler: Option<SnapshotRequestHandler>,
request_handler: ABSRequestHandler,
) -> Self {
info!("AccountsBackgroundService active");
let exit = exit.clone();
let mut consumed_budget = 0;
let mut last_cleaned_block_height = 0;
let mut removed_slots_count = 0;
let mut total_remove_slots_time = 0;
let t_background = Builder::new()
.name("solana-accounts-background".to_string())
.spawn(move || loop {
@ -133,6 +225,14 @@ impl AccountsBackgroundService {
// Grab the current root bank
let bank = bank_forks.read().unwrap().root_bank().clone();
// Purge accounts of any dead slots
Self::remove_dead_slots(
&bank,
&request_handler,
&mut removed_slots_count,
&mut total_remove_slots_time,
);
// Check to see if there were any requests for snapshotting banks
// < the current root bank `bank` above.
@ -148,14 +248,9 @@ impl AccountsBackgroundService {
//
// However, this is impossible because BankForks.set_root() will always flush the snapshot
// request for `N` to the snapshot request channel before setting a root `R > N`, and
// snapshot_request_handler.handle_snapshot_requests() will always look for the latest
// request_handler.handle_snapshot_requests() will always look for the latest
// available snapshot in the channel.
let snapshot_block_height =
snapshot_request_handler
.as_ref()
.and_then(|snapshot_request_handler| {
snapshot_request_handler.handle_snapshot_requests()
});
let snapshot_block_height = request_handler.handle_snapshot_requests();
if let Some(snapshot_block_height) = snapshot_block_height {
// Safe, see proof above
@ -174,7 +269,6 @@ impl AccountsBackgroundService {
last_cleaned_block_height = bank.block_height();
}
}
sleep(Duration::from_millis(INTERVAL_MS));
})
.unwrap();
@ -184,4 +278,55 @@ impl AccountsBackgroundService {
pub fn join(self) -> thread::Result<()> {
self.t_background.join()
}
/// Purges all slots currently queued on the request handler's pruned-banks
/// channel and accumulates timing metrics.
///
/// * `bank` - bank whose accounts are used to purge the slots.
/// * `request_handler` - source of the pruned slots.
/// * `removed_slots_count` - running count of purged slots, carried across calls.
/// * `total_remove_slots_time` - running purge time in microseconds, carried
///   across calls.
///
/// Once at least 100 slots have accumulated, a `remove_slots_timing`
/// datapoint is emitted and both running totals are reset to zero.
fn remove_dead_slots(
    bank: &Bank,
    request_handler: &ABSRequestHandler,
    removed_slots_count: &mut usize,
    total_remove_slots_time: &mut u64,
) {
    let mut remove_slots_time = Measure::start("remove_slots_time");
    // `bank` is already a reference; pass it through without re-borrowing.
    *removed_slots_count += request_handler.handle_pruned_banks(bank);
    remove_slots_time.stop();
    *total_remove_slots_time += remove_slots_time.as_us();

    // Report in batches so a datapoint is not emitted on every loop iteration.
    if *removed_slots_count >= 100 {
        datapoint_info!(
            "remove_slots_timing",
            ("remove_slots_time", *total_remove_slots_time, i64),
            ("removed_slots_count", *removed_slots_count, i64),
        );
        *total_remove_slots_time = 0;
        *removed_slots_count = 0;
    }
}
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::genesis_utils::create_genesis_config;
    use crossbeam_channel::unbounded;
    use solana_sdk::{account::Account, pubkey::Pubkey};

    /// A slot sent on the pruned-banks channel must have its accounts purged
    /// by `remove_dead_slots`.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let request_handler = ABSRequestHandler {
            snapshot_request_handler: None,
            pruned_banks_receiver,
        };

        // Store an account in slot 0
        let account_key = Pubkey::new_unique();
        let stored_account = Account::new(264, 0, &Pubkey::default());
        bank0.store_account(&account_key, &stored_account);
        assert!(bank0.get_account(&account_key).is_some());

        // Queue slot 0 for removal and let the service helper process it.
        pruned_banks_sender.send(0).unwrap();
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        AccountsBackgroundService::remove_dead_slots(
            &bank0,
            &request_handler,
            &mut removed_slots_count,
            &mut total_remove_slots_time,
        );

        // Slot should be removed
        assert!(bank0.get_account(&account_key).is_none());
    }
}

View File

@ -699,6 +699,11 @@ impl fmt::Display for RewardType {
}
}
/// Hook invoked from `Bank`'s `Drop` implementation in place of the default
/// slot purge.
pub trait DropCallback: fmt::Debug {
// Called with the bank that is being dropped.
fn callback(&self, b: &Bank);
// Returns a boxed copy of the callback; used to hand a parent bank's
// callback on to a newly created child bank.
fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync>;
}
#[derive(Debug, PartialEq, Serialize, Deserialize, AbiExample, Clone, Copy)]
pub struct RewardInfo {
pub reward_type: RewardType,
@ -706,6 +711,16 @@ pub struct RewardInfo {
pub post_balance: u64, // Account balance in lamports after `lamports` was applied
}
/// Newtype around an optional boxed `DropCallback`.
///
/// The derived `Default` is `OptionalDropCallback(None)`, i.e. no callback.
#[derive(Debug, Default)]
pub struct OptionalDropCallback(Option<Box<dyn DropCallback + Send + Sync>>);
#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl AbiExample for OptionalDropCallback {
    /// The ABI example value carries no callback.
    fn example() -> Self {
        OptionalDropCallback(None)
    }
}
/// Manager for the state of all accounts and programs after processing its entries.
/// AbiExample is needed even without Serialize/Deserialize; actual (de-)serialization
/// are implemented elsewhere for versioning
@ -849,6 +864,8 @@ pub struct Bank {
pub transaction_log_collector: Arc<RwLock<TransactionLogCollector>>,
pub feature_set: Arc<FeatureSet>,
pub drop_callback: RwLock<OptionalDropCallback>,
}
impl Default for BlockhashQueue {
@ -995,6 +1012,15 @@ impl Bank {
transaction_log_collector_config: parent.transaction_log_collector_config.clone(),
transaction_log_collector: Arc::new(RwLock::new(TransactionLogCollector::default())),
feature_set: parent.feature_set.clone(),
drop_callback: RwLock::new(OptionalDropCallback(
parent
.drop_callback
.read()
.unwrap()
.0
.as_ref()
.map(|drop_callback| drop_callback.clone_box()),
)),
};
datapoint_info!(
@ -1035,6 +1061,10 @@ impl Bank {
new
}
/// Replaces this bank's drop callback with `callback`.
///
/// Passing `None` clears the callback. Panics if the `drop_callback` lock
/// is poisoned.
pub fn set_callback(&self, callback: Option<Box<dyn DropCallback + Send + Sync>>) {
    let mut current = self.drop_callback.write().unwrap();
    *current = OptionalDropCallback(callback);
}
/// Like `new_from_parent` but additionally:
/// * Doesn't assume that the parent is anywhere near `slot`, parent could be millions of slots
/// in the past
@ -1113,6 +1143,7 @@ impl Bank {
transaction_log_collector_config: new(),
transaction_log_collector: new(),
feature_set: new(),
drop_callback: RwLock::new(OptionalDropCallback(None)),
};
bank.finish_init(genesis_config, additional_builtins);
@ -4615,9 +4646,16 @@ impl Bank {
impl Drop for Bank {
    /// Cleans up this bank's slot when the bank is dropped.
    ///
    /// Nothing is done when `skip_drop` is set (for root slots this is a
    /// noop). Otherwise the registered drop callback, if any, is invoked
    /// with this bank and takes over the cleanup; without a callback the
    /// slot is purged here.
    ///
    /// Panics if the `drop_callback` lock is poisoned.
    fn drop(&mut self) {
        // For root slots this is a noop
        if self.skip_drop.load(Relaxed) {
            return;
        }

        // The slot must be purged by exactly one path: either the callback
        // or the synchronous fallback below, never both.
        if let Some(drop_callback) = self.drop_callback.read().unwrap().0.as_ref() {
            drop_callback.callback(self);
        } else {
            // Default case
            // 1. Tests
            // 2. At startup when replaying blockstore and there's no
            // AccountsBackgroundService to perform cleanups yet.
            self.rc.accounts.purge_slot(self.slot());
        }
    }
}

View File

@ -1,7 +1,7 @@
//! The `bank_forks` module implements BankForks a DAG of checkpointed Banks
use crate::{
accounts_background_service::{SnapshotRequest, SnapshotRequestSender},
accounts_background_service::{ABSRequestSender, SnapshotRequest},
bank::Bank,
};
use log::*;
@ -170,7 +170,7 @@ impl BankForks {
pub fn set_root(
&mut self,
root: Slot,
snapshot_request_sender: &Option<SnapshotRequestSender>,
accounts_background_request_sender: &ABSRequestSender,
highest_confirmed_root: Option<Slot>,
) {
let old_epoch = self.root_bank().epoch();
@ -216,20 +216,19 @@ impl BankForks {
bank.squash();
is_root_bank_squashed = bank_slot == root;
if self.snapshot_config.is_some() && snapshot_request_sender.is_some() {
if self.snapshot_config.is_some()
&& accounts_background_request_sender.is_snapshot_creation_enabled()
{
let snapshot_root_bank = self.root_bank().clone();
let root_slot = snapshot_root_bank.slot();
if let Err(e) =
snapshot_request_sender
.as_ref()
.unwrap()
.send(SnapshotRequest {
snapshot_root_bank,
// Save off the status cache because these may get pruned
// if another `set_root()` is called before the snapshots package
// can be generated
status_cache_slot_deltas: bank.src.slot_deltas(&bank.src.roots()),
})
accounts_background_request_sender.send_snapshot_request(SnapshotRequest {
snapshot_root_bank,
// Save off the status cache because these may get pruned
// if another `set_root()` is called before the snapshots package
// can be generated
status_cache_slot_deltas: bank.src.slot_deltas(&bank.src.roots()),
})
{
warn!(
"Error sending snapshot request for bank: {}, err: {:?}",
@ -244,7 +243,6 @@ impl BankForks {
root_bank.squash();
}
let new_tx_count = root_bank.transaction_count();
self.prune_non_root(root, highest_confirmed_root);
inc_new_counter_info!(
@ -405,7 +403,7 @@ mod tests {
let bank0 = Bank::new(&genesis_config);
let mut bank_forks0 = BankForks::new(bank0);
bank_forks0.set_root(0, &None, None);
bank_forks0.set_root(0, &ABSRequestSender::default(), None);
let bank1 = Bank::new(&genesis_config);
let mut bank_forks1 = BankForks::new(bank1);
@ -438,7 +436,7 @@ mod tests {
// Set root in bank_forks0 to truncate the ancestor history
bank_forks0.insert(child1);
bank_forks0.set_root(slot, &None, None);
bank_forks0.set_root(slot, &ABSRequestSender::default(), None);
// Don't set root in bank_forks1 to keep the ancestor history
bank_forks1.insert(child2);