Fix rooted accounts cleanup, simplify locking (#12194)

Co-authored-by: Carl Lin <carl@solana.com>

Author: carllin
Date: 2020-09-28 16:04:46 -07:00
Committed by: GitHub
Parent: 35208c5ee7
Commit: 06f84c65f1

18 changed files with 758 additions and 399 deletions
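
A note on the overall shape of this change: the commit deletes the core-crate
AccountsBackgroundService (first hunk below) and re-homes it in solana-runtime,
replacing ReplayStage's direct accounts_hash_sender plumbing with a
snapshot-request channel; rooting a bank now queues a SnapshotRequest that the
background service turns into an accounts package. Below is a minimal,
self-contained sketch of that flow, using std::sync::mpsc in place of the real
crossbeam channel; the SnapshotRequest payload is an assumption, since the
runtime-side definitions are not shown on this page.

use std::sync::mpsc::{channel, Receiver, Sender};

// Stand-in for the real SnapshotRequest; the actual field set lives in
// the runtime crate and is not part of this diff.
struct SnapshotRequest {
    snapshot_root: u64, // assumed: the newly rooted slot to snapshot
}

type SnapshotRequestSender = Sender<SnapshotRequest>;

struct SnapshotRequestHandler {
    snapshot_request_receiver: Receiver<SnapshotRequest>,
}

impl SnapshotRequestHandler {
    // Mirrors the handle_snapshot_requests() calls visible in the test hunks:
    // drain pending requests and turn each into snapshot work (elided here).
    fn handle_snapshot_requests(&self) {
        for request in self.snapshot_request_receiver.try_iter() {
            println!("would snapshot root {}", request.snapshot_root);
        }
    }
}

fn main() {
    let (snapshot_request_sender, snapshot_request_receiver): (SnapshotRequestSender, _) =
        channel();
    let handler = SnapshotRequestHandler {
        snapshot_request_receiver,
    };
    // ReplayStage side: rooting a bank now just queues a request...
    snapshot_request_sender
        .send(SnapshotRequest { snapshot_root: 100 })
        .unwrap();
    // ...and AccountsBackgroundService drains it on its own thread.
    handler.handle_snapshot_requests();
}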

View File

@@ -1,59 +0,0 @@
-// Service to clean up dead slots in accounts_db
-//
-// This can be expensive since we have to walk the append vecs being cleaned up.
-use rand::{thread_rng, Rng};
-use solana_runtime::bank_forks::BankForks;
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Arc, RwLock,
-};
-use std::thread::{self, sleep, Builder, JoinHandle};
-use std::time::Duration;
-
-pub struct AccountsBackgroundService {
-    t_background: JoinHandle<()>,
-}
-
-const INTERVAL_MS: u64 = 100;
-const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
-const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
-    SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
-const CLEAN_INTERVAL_SLOTS: u64 = 100;
-
-impl AccountsBackgroundService {
-    pub fn new(bank_forks: Arc<RwLock<BankForks>>, exit: &Arc<AtomicBool>) -> Self {
-        info!("AccountsBackgroundService active");
-        let exit = exit.clone();
-        let mut consumed_budget = 0;
-        let mut last_cleaned_slot = 0;
-        let t_background = Builder::new()
-            .name("solana-accounts-background".to_string())
-            .spawn(move || loop {
-                if exit.load(Ordering::Relaxed) {
-                    break;
-                }
-                let bank = bank_forks.read().unwrap().root_bank().clone();
-
-                bank.process_dead_slots();
-
-                consumed_budget = bank
-                    .process_stale_slot_with_budget(consumed_budget, SHRUNKEN_ACCOUNT_PER_INTERVAL);
-
-                if bank.block_height() - last_cleaned_slot
-                    > (CLEAN_INTERVAL_SLOTS + thread_rng().gen_range(0, 10))
-                {
-                    bank.clean_accounts();
-                    last_cleaned_slot = bank.block_height();
-                }
-
-                sleep(Duration::from_millis(INTERVAL_MS));
-            })
-            .unwrap();
-        Self { t_background }
-    }
-
-    pub fn join(self) -> thread::Result<()> {
-        self.t_background.join()
-    }
-}
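
The hunk above removes the core-crate version of the service; its runtime-crate
counterpart now also takes a snapshot_request_handler, as the Tvu call site
later in this commit shows (AccountsBackgroundService::new(bank_forks.clone(),
&exit, snapshot_request_handler)). A hedged, self-contained sketch of how the
relocated loop might fold request handling in; the bodies here are illustrative
stand-ins, not the actual runtime implementation:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;

const INTERVAL_MS: u64 = 100;

// Stand-in for the runtime-side handler; the real one drains a snapshot
// request channel and produces accounts packages.
struct SnapshotRequestHandler;
impl SnapshotRequestHandler {
    fn handle_snapshot_requests(&self) {}
}

struct AccountsBackgroundService {
    t_background: JoinHandle<()>,
}

impl AccountsBackgroundService {
    fn new(exit: Arc<AtomicBool>, snapshot_request_handler: Option<SnapshotRequestHandler>) -> Self {
        let t_background = Builder::new()
            .name("solana-accounts-background".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }
                // New responsibility: service snapshot requests queued by set_root...
                if let Some(handler) = snapshot_request_handler.as_ref() {
                    handler.handle_snapshot_requests();
                }
                // ...alongside the periodic clean/shrink work shown above.
                sleep(Duration::from_millis(INTERVAL_MS));
            })
            .unwrap();
        Self { t_background }
    }

    fn join(self) -> std::thread::Result<()> {
        self.t_background.join()
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let service = AccountsBackgroundService::new(exit.clone(), Some(SnapshotRequestHandler));
    exit.store(true, Ordering::Relaxed);
    service.join().unwrap();
}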

View File

@@ -9,7 +9,6 @@
 #[macro_use]
 extern crate solana_bpf_loader_program;
-pub mod accounts_background_service;
 pub mod accounts_hash_verifier;
 pub mod banking_stage;
 pub mod bigtable_upload_service;

View File

@@ -29,8 +29,8 @@ use solana_ledger::{
 use solana_measure::{measure::Measure, thread_mem_usage};
 use solana_metrics::inc_new_counter_info;
 use solana_runtime::{
-    bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache,
-    snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender,
+    accounts_background_service::SnapshotRequestSender, bank::Bank, bank_forks::BankForks,
+    commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender,
 };
 use solana_sdk::{
     clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
@@ -103,7 +103,7 @@ pub struct ReplayStageConfig {
     pub subscriptions: Arc<RpcSubscriptions>,
     pub leader_schedule_cache: Arc<LeaderScheduleCache>,
     pub latest_root_senders: Vec<Sender<Slot>>,
-    pub accounts_hash_sender: Option<AccountsPackageSender>,
+    pub snapshot_request_sender: Option<SnapshotRequestSender>,
     pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
     pub transaction_status_sender: Option<TransactionStatusSender>,
     pub rewards_recorder_sender: Option<RewardsRecorderSender>,
@@ -234,7 +234,7 @@ impl ReplayStage {
             subscriptions,
             leader_schedule_cache,
             latest_root_senders,
-            accounts_hash_sender,
+            snapshot_request_sender,
             block_commitment_cache,
             transaction_status_sender,
             rewards_recorder_sender,
@@ -455,7 +455,7 @@
                             &blockstore,
                             &leader_schedule_cache,
                             &lockouts_sender,
-                            &accounts_hash_sender,
+                            &snapshot_request_sender,
                             &latest_root_senders,
                             &mut all_pubkeys,
                             &subscriptions,
@@ -1025,7 +1025,7 @@
         blockstore: &Arc<Blockstore>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         lockouts_sender: &Sender<CommitmentAggregationData>,
-        accounts_hash_sender: &Option<AccountsPackageSender>,
+        snapshot_request_sender: &Option<SnapshotRequestSender>,
         latest_root_senders: &[Sender<Slot>],
         all_pubkeys: &mut PubkeyReferences,
         subscriptions: &Arc<RpcSubscriptions>,
@@ -1081,7 +1081,7 @@
                 new_root,
                 &bank_forks,
                 progress,
-                accounts_hash_sender,
+                snapshot_request_sender,
                 all_pubkeys,
                 highest_confirmed_root,
                 heaviest_subtree_fork_choice,
@@ -1778,7 +1778,7 @@
         new_root: Slot,
         bank_forks: &RwLock<BankForks>,
         progress: &mut ProgressMap,
-        accounts_hash_sender: &Option<AccountsPackageSender>,
+        snapshot_request_sender: &Option<SnapshotRequestSender>,
         all_pubkeys: &mut PubkeyReferences,
         highest_confirmed_root: Option<Slot>,
         heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
@@ -1786,7 +1786,7 @@
         let old_epoch = bank_forks.read().unwrap().root_bank().epoch();
         bank_forks.write().unwrap().set_root(
            new_root,
-            accounts_hash_sender,
+            snapshot_request_sender,
            highest_confirmed_root,
        );
        let r_bank_forks = bank_forks.read().unwrap();
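
Threading snapshot_request_sender through ReplayStage means set_root no longer
produces an accounts package inline (the old test code's
bank_forks.generate_accounts_package call, removed later in this commit,
suggests that was the previous behavior); it only enqueues work, and the heavy
hashing and cleanup presumably move to the background thread, which fits the
commit title's "simplify locking". A toy sketch of the resulting shape, with
std types swapped in for the real ones:

use std::sync::mpsc::{channel, Sender};

// Simplified stand-in for the new handle_new_root: rooting only sends a
// request instead of hashing and packaging while replay locks are held.
fn handle_new_root(new_root: u64, snapshot_request_sender: &Option<Sender<u64>>) {
    if let Some(sender) = snapshot_request_sender {
        // Snapshot work now runs on AccountsBackgroundService's thread, so
        // ReplayStage holds its bank_forks lock only long enough to send.
        let _ = sender.send(new_root);
    }
}

fn main() {
    let (sender, receiver) = channel();
    handle_new_root(7, &Some(sender));
    assert_eq!(receiver.recv().unwrap(), 7);
}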

View File

@@ -2,7 +2,6 @@
 //! validation pipeline in software.

 use crate::{
-    accounts_background_service::AccountsBackgroundService,
     accounts_hash_verifier::AccountsHashVerifier,
     broadcast_stage::RetransmitSlotsSender,
     cache_block_time_service::CacheBlockTimeSender,
@@ -28,8 +27,11 @@ use solana_ledger::{
     leader_schedule_cache::LeaderScheduleCache,
 };
 use solana_runtime::{
-    bank_forks::BankForks, commitment::BlockCommitmentCache,
-    snapshot_package::AccountsPackageSender, vote_sender_types::ReplayVoteSender,
+    accounts_background_service::{AccountsBackgroundService, SnapshotRequestHandler},
+    bank_forks::{BankForks, SnapshotConfig},
+    commitment::BlockCommitmentCache,
+    snapshot_package::AccountsPackageSender,
+    vote_sender_types::ReplayVoteSender,
 };
 use solana_sdk::{
     pubkey::Pubkey,
@@ -100,7 +102,7 @@ impl Tvu {
         transaction_status_sender: Option<TransactionStatusSender>,
         rewards_recorder_sender: Option<RewardsRecorderSender>,
         cache_block_time_sender: Option<CacheBlockTimeSender>,
-        snapshot_package_sender: Option<AccountsPackageSender>,
+        snapshot_config_and_package_sender: Option<(SnapshotConfig, AccountsPackageSender)>,
         vote_tracker: Arc<VoteTracker>,
         retransmit_slots_sender: RetransmitSlotsSender,
         verified_vote_receiver: VerifiedVoteReceiver,
@@ -171,10 +173,15 @@
             }
         };
         info!("snapshot_interval_slots: {}", snapshot_interval_slots);
+        let (snapshot_config, accounts_package_sender) = snapshot_config_and_package_sender
+            .map(|(snapshot_config, accounts_package_sender)| {
+                (Some(snapshot_config), Some(accounts_package_sender))
+            })
+            .unwrap_or((None, None));
         let (accounts_hash_sender, accounts_hash_receiver) = channel();
         let accounts_hash_verifier = AccountsHashVerifier::new(
             accounts_hash_receiver,
-            snapshot_package_sender,
+            accounts_package_sender,
             exit,
             &cluster_info,
             tvu_config.trusted_validators.clone(),
@@ -183,6 +190,22 @@
             snapshot_interval_slots,
         );

+        let (snapshot_request_sender, snapshot_request_handler) = {
+            snapshot_config
+                .map(|snapshot_config| {
+                    let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
+                    (
+                        Some(snapshot_request_sender),
+                        Some(SnapshotRequestHandler {
+                            snapshot_config,
+                            snapshot_request_receiver,
+                            accounts_package_sender: accounts_hash_sender,
+                        }),
+                    )
+                })
+                .unwrap_or((None, None))
+        };
+
         let replay_stage_config = ReplayStageConfig {
             my_pubkey: keypair.pubkey(),
             vote_account: *vote_account,
@@ -191,7 +214,7 @@
             subscriptions: subscriptions.clone(),
             leader_schedule_cache: leader_schedule_cache.clone(),
             latest_root_senders: vec![ledger_cleanup_slot_sender],
-            accounts_hash_sender: Some(accounts_hash_sender),
+            snapshot_request_sender,
             block_commitment_cache,
             transaction_status_sender,
             rewards_recorder_sender,
@@ -222,7 +245,8 @@
             )
         });

-        let accounts_background_service = AccountsBackgroundService::new(bank_forks.clone(), &exit);
+        let accounts_background_service =
+            AccountsBackgroundService::new(bank_forks.clone(), &exit, snapshot_request_handler);

         Tvu {
             fetch_stage,
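
Note that the SnapshotRequestHandler built here reuses accounts_hash_sender as
its accounts_package_sender, so packages generated from snapshot requests still
flow through AccountsHashVerifier before reaching SnapshotPackagerService. A
toy model of that channel chain, with string payloads standing in for real
accounts packages:

use std::sync::mpsc::channel;

fn main() {
    // Miniature of the pipeline wired up in this hunk (names follow the diff;
    // real code uses a crossbeam channel for the request side).
    let (accounts_hash_sender, accounts_hash_receiver) = channel::<String>();
    let (package_sender, package_receiver) = channel::<String>();

    // AccountsBackgroundService: a snapshot request becomes an accounts package.
    accounts_hash_sender.send("package@slot100".to_string()).unwrap();

    // AccountsHashVerifier: verify the hash, then forward to the packager.
    let package = accounts_hash_receiver.recv().unwrap();
    package_sender.send(package).unwrap();

    // SnapshotPackagerService: archive the snapshot.
    assert_eq!(package_receiver.recv().unwrap(), "package@slot100");
}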

View File

@@ -454,13 +454,16 @@
             cluster_info.set_entrypoint(cluster_entrypoint.clone());
         }

-        let (snapshot_packager_service, snapshot_package_sender) =
-            if config.snapshot_config.is_some() {
+        let (snapshot_packager_service, snapshot_config_and_package_sender) =
+            if let Some(snapshot_config) = config.snapshot_config.clone() {
                 // Start a snapshot packaging service
                 let (sender, receiver) = channel();
                 let snapshot_packager_service =
                     SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info);
-                (Some(snapshot_packager_service), Some(sender))
+                (
+                    Some(snapshot_packager_service),
+                    Some((snapshot_config, sender)),
+                )
             } else {
                 (None, None)
             };
@@ -523,7 +526,7 @@
             transaction_status_sender.clone(),
             rewards_recorder_sender,
             cache_block_time_sender,
-            snapshot_package_sender,
+            snapshot_config_and_package_sender,
             vote_tracker.clone(),
             retransmit_slots_sender,
             verified_vote_receiver,

View File

@@ -35,12 +35,15 @@ macro_rules! DEFINE_SNAPSHOT_VERSION_PARAMETERIZED_TEST_FUNCTIONS {
 #[cfg(test)]
 mod tests {
     use bincode::serialize_into;
+    use crossbeam_channel::unbounded;
     use fs_extra::dir::CopyOptions;
     use itertools::Itertools;
-    use solana_core::cluster_info::ClusterInfo;
-    use solana_core::contact_info::ContactInfo;
-    use solana_core::snapshot_packager_service::SnapshotPackagerService;
+    use solana_core::{
+        cluster_info::ClusterInfo, contact_info::ContactInfo,
+        snapshot_packager_service::SnapshotPackagerService,
+    };
     use solana_runtime::{
+        accounts_background_service::SnapshotRequestHandler,
         bank::{Bank, BankSlotDelta},
         bank_forks::{BankForks, CompressionType, SnapshotConfig},
         genesis_utils::{create_genesis_config, GenesisConfigInfo},
@@ -182,8 +185,14 @@
         let bank_forks = &mut snapshot_test_config.bank_forks;
         let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;

-        let (s, _r) = channel();
-        let sender = Some(s);
+        let (s, snapshot_request_receiver) = unbounded();
+        let (accounts_package_sender, _r) = channel();
+        let snapshot_request_sender = Some(s);
+        let snapshot_request_handler = SnapshotRequestHandler {
+            snapshot_config: snapshot_test_config.snapshot_config.clone(),
+            snapshot_request_receiver,
+            accounts_package_sender,
+        };
         for slot in 0..last_slot {
             let mut bank = Bank::new_from_parent(&bank_forks[slot], &Pubkey::default(), slot + 1);
             f(&mut bank, &mint_keypair);
@@ -192,7 +201,9 @@
             // and to allow snapshotting of bank and the purging logic on status_cache to
             // kick in
             if slot % set_root_interval == 0 || slot == last_slot - 1 {
-                bank_forks.set_root(bank.slot(), &sender, None);
+                // set_root should send a snapshot request
+                bank_forks.set_root(bank.slot(), &snapshot_request_sender, None);
+                snapshot_request_handler.handle_snapshot_requests();
             }
         }
@@ -207,7 +218,7 @@
             last_bank,
             &last_slot_snapshot_path,
             snapshot_path,
-            &last_bank.src.roots(),
+            last_bank.src.slot_deltas(&last_bank.src.roots()),
             &snapshot_config.snapshot_package_output_path,
             last_bank.get_snapshot_storages(),
             CompressionType::Bzip2,
@@ -312,7 +323,6 @@
             assert_eq!(bank.process_transaction(&tx), Ok(()));
             bank.squash();
             let accounts_hash = bank.update_accounts_hash();
-            bank_forks.insert(bank);

             let package_sender = {
                 if slot == saved_slot as u64 {
@@ -325,10 +335,18 @@
                 }
             };

-            bank_forks
-                .generate_accounts_package(slot, &[], &package_sender)
-                .unwrap();
+            snapshot_utils::snapshot_bank(
+                &bank,
+                vec![],
+                &package_sender,
+                &snapshot_path,
+                &snapshot_package_output_path,
+                snapshot_config.snapshot_version,
+                &snapshot_config.compression,
+            )
+            .unwrap();
+            bank_forks.insert(bank);

             if slot == saved_slot as u64 {
                 let options = CopyOptions::new();
                 fs_extra::dir::copy(accounts_dir, &saved_accounts_dir, &options).unwrap();
@@ -359,7 +377,7 @@
         // Purge all the outdated snapshots, including the ones needed to generate the package
         // currently sitting in the channel
-        bank_forks.purge_old_snapshots();
+        snapshot_utils::purge_old_snapshots(&snapshot_path);
         assert!(snapshot_utils::get_snapshot_paths(&snapshots_dir)
             .into_iter()
             .map(|path| path.slot)
@@ -418,7 +436,7 @@
     let num_set_roots = MAX_CACHE_ENTRIES * 2;

     for add_root_interval in &[1, 3, 9] {
-        let (snapshot_sender, _snapshot_receiver) = channel();
+        let (snapshot_sender, _snapshot_receiver) = unbounded();
         // Make sure this test never clears bank.slots_since_snapshot
         let mut snapshot_test_config = SnapshotTestConfig::new(
             snapshot_version,
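
The test changes follow the same pattern as the production code: set_root
queues a snapshot request, and the test then calls handle_snapshot_requests()
itself so the work runs inline and deterministically instead of racing a
background thread. A self-contained sketch of that drain-inline pattern, with
simplified stand-in types (the real handler uses a crossbeam channel and a
richer request payload):

use std::sync::mpsc::{channel, Receiver};

struct SnapshotRequest {
    slot: u64, // assumed payload; simplified for the sketch
}

struct TestRequestHandler {
    snapshot_request_receiver: Receiver<SnapshotRequest>,
}

impl TestRequestHandler {
    // Drain every pending request and report which slots would be snapshotted.
    fn handle_snapshot_requests(&self) -> Vec<u64> {
        self.snapshot_request_receiver
            .try_iter()
            .map(|request| request.slot)
            .collect()
    }
}

fn main() {
    let (sender, snapshot_request_receiver) = channel();
    let handler = TestRequestHandler { snapshot_request_receiver };
    // set_root queues a request...
    sender.send(SnapshotRequest { slot: 4 }).unwrap();
    // ...and the test drains it inline, so assertions never race a thread.
    assert_eq!(handler.handle_snapshot_requests(), vec![4]);
}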