diff --git a/core/src/drop_bank_service.rs b/core/src/drop_bank_service.rs new file mode 100644 index 0000000000..a53f6f1520 --- /dev/null +++ b/core/src/drop_bank_service.rs @@ -0,0 +1,38 @@ +use solana_measure::measure::Measure; +use solana_runtime::bank::Bank; +use std::{ + sync::{mpsc::Receiver, Arc}, + thread::{self, Builder, JoinHandle}, +}; + +pub struct DropBankService { + thread_hdl: JoinHandle<()>, +} + +impl DropBankService { + pub fn new(bank_receiver: Receiver>>) -> Self { + let thread_hdl = Builder::new() + .name("sol-drop-b-service".to_string()) + .spawn(move || { + for banks in bank_receiver.iter() { + let len = banks.len(); + let mut dropped_banks_time = Measure::start("drop_banks"); + drop(banks); + dropped_banks_time.stop(); + if dropped_banks_time.as_ms() > 10 { + datapoint_info!( + "handle_new_root-dropped_banks", + ("elapsed_ms", dropped_banks_time.as_ms(), i64), + ("len", len, i64) + ); + } + } + }) + .unwrap(); + Self { thread_hdl } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 45f58e6e21..525a1b0d8f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -21,6 +21,7 @@ pub mod commitment_service; pub mod completed_data_sets_service; pub mod consensus; pub mod cost_update_service; +pub mod drop_bank_service; pub mod duplicate_repair_status; pub mod fetch_stage; pub mod fork_choice; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 62684d008b..f6e33960f1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -326,6 +326,7 @@ impl ReplayStage { cluster_slots_update_sender: ClusterSlotsUpdateSender, cost_update_sender: Sender, voting_sender: Sender, + drop_bank_sender: Sender>>, ) -> Self { let ReplayStageConfig { vote_account, @@ -644,6 +645,7 @@ impl ReplayStage { &mut replay_timing, &voting_sender, &mut epoch_slots_frozen_slots, + &drop_bank_sender, ); }; voting_time.stop(); @@ -1615,6 +1617,7 @@ impl ReplayStage { replay_timing: &mut ReplayTiming, voting_sender: &Sender, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, + bank_drop_sender: &Sender>>, ) { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1665,6 +1668,7 @@ impl ReplayStage { has_new_vote_been_rooted, vote_signatures, epoch_slots_frozen_slots, + bank_drop_sender, ); rpc_subscriptions.notify_roots(rooted_slots); if let Some(sender) = bank_notification_sender { @@ -2723,21 +2727,17 @@ impl ReplayStage { has_new_vote_been_rooted: &mut bool, voted_signatures: &mut Vec, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, + bank_drop_sender: &Sender>>, ) { let removed_banks = bank_forks.write().unwrap().set_root( new_root, accounts_background_request_sender, highest_confirmed_root, ); - let mut dropped_banks_time = Measure::start("handle_new_root::drop_banks"); - drop(removed_banks); - dropped_banks_time.stop(); - if dropped_banks_time.as_ms() > 10 { - datapoint_info!( - "handle_new_root-dropped_banks", - ("elapsed_ms", dropped_banks_time.as_ms(), i64) - ); - } + bank_drop_sender + .send(removed_banks) + .unwrap_or_else(|err| warn!("bank drop failed: {:?}", err)); + // Dropping the bank_forks write lock and reacquiring as a read lock is // safe because updates to bank_forks are only made by a single thread. let r_bank_forks = bank_forks.read().unwrap(); @@ -3209,6 +3209,7 @@ pub mod tests { .into_iter() .map(|slot| (slot, Hash::default())) .collect(); + let (bank_drop_sender, _bank_drop_receiver) = channel(); ReplayStage::handle_new_root( root, &bank_forks, @@ -3222,6 +3223,7 @@ pub mod tests { &mut true, &mut Vec::new(), &mut epoch_slots_frozen_slots, + &bank_drop_sender, ); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); @@ -3288,6 +3290,7 @@ pub mod tests { for i in 0..=root { progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); } + let (bank_drop_sender, _bank_drop_receiver) = channel(); ReplayStage::handle_new_root( root, &bank_forks, @@ -3301,6 +3304,7 @@ pub mod tests { &mut true, &mut Vec::new(), &mut EpochSlotsFrozenSlots::default(), + &bank_drop_sender, ); assert_eq!(bank_forks.read().unwrap().root(), root); assert!(bank_forks.read().unwrap().get(confirmed_root).is_some()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index eb790a13ce..387d6baeb1 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -13,6 +13,7 @@ use crate::{ completed_data_sets_service::CompletedDataSetsSender, consensus::Tower, cost_update_service::CostUpdateService, + drop_bank_service::DropBankService, ledger_cleanup_service::LedgerCleanupService, replay_stage::{ReplayStage, ReplayStageConfig}, retransmit_stage::RetransmitStage, @@ -69,6 +70,7 @@ pub struct Tvu { accounts_hash_verifier: AccountsHashVerifier, cost_update_service: CostUpdateService, voting_service: VotingService, + drop_bank_service: DropBankService, } pub struct Sockets { @@ -302,6 +304,9 @@ impl Tvu { cost_update_receiver, ); + let (drop_bank_sender, drop_bank_receiver) = channel(); + let drop_bank_service = DropBankService::new(drop_bank_receiver); + let replay_stage = ReplayStage::new( replay_stage_config, blockstore.clone(), @@ -321,6 +326,7 @@ impl Tvu { cluster_slots_update_sender, cost_update_sender, voting_sender, + drop_bank_sender, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -354,6 +360,7 @@ impl Tvu { accounts_hash_verifier, cost_update_service, voting_service, + drop_bank_service, } } @@ -369,6 +376,7 @@ impl Tvu { self.accounts_hash_verifier.join()?; self.cost_update_service.join()?; self.voting_service.join()?; + self.drop_bank_service.join()?; Ok(()) } } diff --git a/core/src/vote_simulator.rs b/core/src/vote_simulator.rs index b03fd5ec78..43a0ba2d83 100644 --- a/core/src/vote_simulator.rs +++ b/core/src/vote_simulator.rs @@ -202,6 +202,7 @@ impl VoteSimulator { } pub fn set_root(&mut self, new_root: Slot) { + let (drop_bank_sender, _drop_bank_receiver) = std::sync::mpsc::channel(); ReplayStage::handle_new_root( new_root, &self.bank_forks, @@ -215,6 +216,7 @@ impl VoteSimulator { &mut true, &mut Vec::new(), &mut EpochSlotsFrozenSlots::default(), + &drop_bank_sender, ) }