(cherry picked from commit 0bda0c3e0c
)
Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
@ -1548,6 +1548,7 @@ pub mod test {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_root(&mut self, new_root: Slot) {
|
pub fn set_root(&mut self, new_root: Slot) {
|
||||||
|
let (bank_drop_sender, _bank_drop_receiver) = std::sync::mpsc::channel();
|
||||||
ReplayStage::handle_new_root(
|
ReplayStage::handle_new_root(
|
||||||
new_root,
|
new_root,
|
||||||
&self.bank_forks,
|
&self.bank_forks,
|
||||||
@ -1560,6 +1561,7 @@ pub mod test {
|
|||||||
&mut UnfrozenGossipVerifiedVoteHashes::default(),
|
&mut UnfrozenGossipVerifiedVoteHashes::default(),
|
||||||
&mut true,
|
&mut true,
|
||||||
&mut Vec::new(),
|
&mut Vec::new(),
|
||||||
|
&bank_drop_sender,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
38
core/src/drop_bank_service.rs
Normal file
38
core/src/drop_bank_service.rs
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
use solana_measure::measure::Measure;
|
||||||
|
use solana_runtime::bank::Bank;
|
||||||
|
use std::{
|
||||||
|
sync::{mpsc::Receiver, Arc},
|
||||||
|
thread::{self, Builder, JoinHandle},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct DropBankService {
|
||||||
|
thread_hdl: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DropBankService {
|
||||||
|
pub fn new(bank_receiver: Receiver<Vec<Arc<Bank>>>) -> Self {
|
||||||
|
let thread_hdl = Builder::new()
|
||||||
|
.name("sol-drop-b-service".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
for banks in bank_receiver.iter() {
|
||||||
|
let len = banks.len();
|
||||||
|
let mut dropped_banks_time = Measure::start("drop_banks");
|
||||||
|
drop(banks);
|
||||||
|
dropped_banks_time.stop();
|
||||||
|
if dropped_banks_time.as_ms() > 10 {
|
||||||
|
datapoint_info!(
|
||||||
|
"handle_new_root-dropped_banks",
|
||||||
|
("elapsed_ms", dropped_banks_time.as_ms(), i64),
|
||||||
|
("len", len, i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
Self { thread_hdl }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
self.thread_hdl.join()
|
||||||
|
}
|
||||||
|
}
|
@ -20,6 +20,7 @@ pub mod commitment_service;
|
|||||||
pub mod completed_data_sets_service;
|
pub mod completed_data_sets_service;
|
||||||
pub mod consensus;
|
pub mod consensus;
|
||||||
pub mod cost_update_service;
|
pub mod cost_update_service;
|
||||||
|
pub mod drop_bank_service;
|
||||||
pub mod fetch_stage;
|
pub mod fetch_stage;
|
||||||
pub mod fork_choice;
|
pub mod fork_choice;
|
||||||
pub mod gen_keys;
|
pub mod gen_keys;
|
||||||
|
@ -316,6 +316,7 @@ impl ReplayStage {
|
|||||||
cluster_slots_update_sender: ClusterSlotsUpdateSender,
|
cluster_slots_update_sender: ClusterSlotsUpdateSender,
|
||||||
voting_sender: Sender<VoteOp>,
|
voting_sender: Sender<VoteOp>,
|
||||||
cost_update_sender: Sender<CostUpdate>,
|
cost_update_sender: Sender<CostUpdate>,
|
||||||
|
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let ReplayStageConfig {
|
let ReplayStageConfig {
|
||||||
my_pubkey,
|
my_pubkey,
|
||||||
@ -610,6 +611,7 @@ impl ReplayStage {
|
|||||||
&mut has_new_vote_been_rooted,
|
&mut has_new_vote_been_rooted,
|
||||||
&mut replay_timing,
|
&mut replay_timing,
|
||||||
&voting_sender,
|
&voting_sender,
|
||||||
|
&drop_bank_sender,
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
voting_time.stop();
|
voting_time.stop();
|
||||||
@ -1334,6 +1336,7 @@ impl ReplayStage {
|
|||||||
has_new_vote_been_rooted: &mut bool,
|
has_new_vote_been_rooted: &mut bool,
|
||||||
replay_timing: &mut ReplayTiming,
|
replay_timing: &mut ReplayTiming,
|
||||||
voting_sender: &Sender<VoteOp>,
|
voting_sender: &Sender<VoteOp>,
|
||||||
|
bank_drop_sender: &Sender<Vec<Arc<Bank>>>,
|
||||||
) {
|
) {
|
||||||
if bank.is_empty() {
|
if bank.is_empty() {
|
||||||
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
||||||
@ -1383,6 +1386,7 @@ impl ReplayStage {
|
|||||||
unfrozen_gossip_verified_vote_hashes,
|
unfrozen_gossip_verified_vote_hashes,
|
||||||
has_new_vote_been_rooted,
|
has_new_vote_been_rooted,
|
||||||
vote_signatures,
|
vote_signatures,
|
||||||
|
bank_drop_sender,
|
||||||
);
|
);
|
||||||
rpc_subscriptions.notify_roots(rooted_slots);
|
rpc_subscriptions.notify_roots(rooted_slots);
|
||||||
if let Some(sender) = bank_notification_sender {
|
if let Some(sender) = bank_notification_sender {
|
||||||
@ -2405,21 +2409,19 @@ impl ReplayStage {
|
|||||||
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
|
unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
|
||||||
has_new_vote_been_rooted: &mut bool,
|
has_new_vote_been_rooted: &mut bool,
|
||||||
voted_signatures: &mut Vec<Signature>,
|
voted_signatures: &mut Vec<Signature>,
|
||||||
|
bank_drop_sender: &Sender<Vec<Arc<Bank>>>,
|
||||||
) {
|
) {
|
||||||
let removed_banks = bank_forks.write().unwrap().set_root(
|
let removed_banks = bank_forks.write().unwrap().set_root(
|
||||||
new_root,
|
new_root,
|
||||||
accounts_background_request_sender,
|
accounts_background_request_sender,
|
||||||
highest_confirmed_root,
|
highest_confirmed_root,
|
||||||
);
|
);
|
||||||
let mut dropped_banks_time = Measure::start("handle_new_root::drop_banks");
|
bank_drop_sender
|
||||||
drop(removed_banks);
|
.send(removed_banks)
|
||||||
dropped_banks_time.stop();
|
.unwrap_or_else(|err| warn!("bank drop failed: {:?}", err));
|
||||||
if dropped_banks_time.as_ms() > 10 {
|
|
||||||
datapoint_info!(
|
// Dropping the bank_forks write lock and reacquiring as a read lock is
|
||||||
"handle_new_root-dropped_banks",
|
// safe because updates to bank_forks are only made by a single thread.
|
||||||
("elapsed_ms", dropped_banks_time.as_ms(), i64)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let r_bank_forks = bank_forks.read().unwrap();
|
let r_bank_forks = bank_forks.read().unwrap();
|
||||||
let new_root_bank = &r_bank_forks[new_root];
|
let new_root_bank = &r_bank_forks[new_root];
|
||||||
if !*has_new_vote_been_rooted {
|
if !*has_new_vote_been_rooted {
|
||||||
@ -2876,6 +2878,7 @@ mod tests {
|
|||||||
.map(|s| (s, HashMap::new()))
|
.map(|s| (s, HashMap::new()))
|
||||||
.collect(),
|
.collect(),
|
||||||
};
|
};
|
||||||
|
let (bank_drop_sender, _bank_drop_receiver) = channel();
|
||||||
ReplayStage::handle_new_root(
|
ReplayStage::handle_new_root(
|
||||||
root,
|
root,
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
@ -2888,6 +2891,7 @@ mod tests {
|
|||||||
&mut unfrozen_gossip_verified_vote_hashes,
|
&mut unfrozen_gossip_verified_vote_hashes,
|
||||||
&mut true,
|
&mut true,
|
||||||
&mut Vec::new(),
|
&mut Vec::new(),
|
||||||
|
&bank_drop_sender,
|
||||||
);
|
);
|
||||||
assert_eq!(bank_forks.read().unwrap().root(), root);
|
assert_eq!(bank_forks.read().unwrap().root(), root);
|
||||||
assert_eq!(progress.len(), 1);
|
assert_eq!(progress.len(), 1);
|
||||||
@ -2947,6 +2951,7 @@ mod tests {
|
|||||||
for i in 0..=root {
|
for i in 0..=root {
|
||||||
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
|
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
|
||||||
}
|
}
|
||||||
|
let (bank_drop_sender, _bank_drop_receiver) = channel();
|
||||||
ReplayStage::handle_new_root(
|
ReplayStage::handle_new_root(
|
||||||
root,
|
root,
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
@ -2959,6 +2964,7 @@ mod tests {
|
|||||||
&mut UnfrozenGossipVerifiedVoteHashes::default(),
|
&mut UnfrozenGossipVerifiedVoteHashes::default(),
|
||||||
&mut true,
|
&mut true,
|
||||||
&mut Vec::new(),
|
&mut Vec::new(),
|
||||||
|
&bank_drop_sender,
|
||||||
);
|
);
|
||||||
assert_eq!(bank_forks.read().unwrap().root(), root);
|
assert_eq!(bank_forks.read().unwrap().root(), root);
|
||||||
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
|
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
|
||||||
|
@ -13,6 +13,7 @@ use crate::{
|
|||||||
completed_data_sets_service::CompletedDataSetsSender,
|
completed_data_sets_service::CompletedDataSetsSender,
|
||||||
consensus::Tower,
|
consensus::Tower,
|
||||||
cost_update_service::CostUpdateService,
|
cost_update_service::CostUpdateService,
|
||||||
|
drop_bank_service::DropBankService,
|
||||||
ledger_cleanup_service::LedgerCleanupService,
|
ledger_cleanup_service::LedgerCleanupService,
|
||||||
replay_stage::{ReplayStage, ReplayStageConfig},
|
replay_stage::{ReplayStage, ReplayStageConfig},
|
||||||
retransmit_stage::RetransmitStage,
|
retransmit_stage::RetransmitStage,
|
||||||
@ -70,6 +71,7 @@ pub struct Tvu {
|
|||||||
accounts_hash_verifier: AccountsHashVerifier,
|
accounts_hash_verifier: AccountsHashVerifier,
|
||||||
voting_service: VotingService,
|
voting_service: VotingService,
|
||||||
cost_update_service: CostUpdateService,
|
cost_update_service: CostUpdateService,
|
||||||
|
drop_bank_service: DropBankService,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Sockets {
|
pub struct Sockets {
|
||||||
@ -297,6 +299,9 @@ impl Tvu {
|
|||||||
cost_update_receiver,
|
cost_update_receiver,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let (drop_bank_sender, drop_bank_receiver) = channel();
|
||||||
|
let drop_bank_service = DropBankService::new(drop_bank_receiver);
|
||||||
|
|
||||||
let replay_stage = ReplayStage::new(
|
let replay_stage = ReplayStage::new(
|
||||||
replay_stage_config,
|
replay_stage_config,
|
||||||
blockstore.clone(),
|
blockstore.clone(),
|
||||||
@ -316,6 +321,7 @@ impl Tvu {
|
|||||||
cluster_slots_update_sender,
|
cluster_slots_update_sender,
|
||||||
voting_sender,
|
voting_sender,
|
||||||
cost_update_sender,
|
cost_update_sender,
|
||||||
|
drop_bank_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
|
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
|
||||||
@ -348,6 +354,7 @@ impl Tvu {
|
|||||||
accounts_hash_verifier,
|
accounts_hash_verifier,
|
||||||
voting_service,
|
voting_service,
|
||||||
cost_update_service,
|
cost_update_service,
|
||||||
|
drop_bank_service,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -363,6 +370,7 @@ impl Tvu {
|
|||||||
self.accounts_hash_verifier.join()?;
|
self.accounts_hash_verifier.join()?;
|
||||||
self.voting_service.join()?;
|
self.voting_service.join()?;
|
||||||
self.cost_update_service.join()?;
|
self.cost_update_service.join()?;
|
||||||
|
self.drop_bank_service.join()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user