Add ReplayStageConfig (#7195)

Author: TristanDebrunner
Date: 2019-12-04 11:17:17 -07:00
Committed by: GitHub
Parent: c3e7deb4b6
Commit: fae9c08815

3 changed files with 66 additions and 64 deletions
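
This commit collapses ReplayStage::new's fifteen parameters into a single ReplayStageConfig struct that is passed by value and destructured inside the constructor. A minimal sketch of that parameter-object pattern follows; the struct, field names, and types here are hypothetical stand-ins, not the real Solana ones.

    // Sketch of the pattern: one owned config struct instead of a long argument list.
    use std::sync::{atomic::AtomicBool, Arc};

    pub struct StageConfig {
        pub exit: Arc<AtomicBool>, // owned handle instead of a &Arc<_> parameter
        pub num_threads: usize,    // hypothetical field, for illustration only
    }

    pub struct Stage;

    impl Stage {
        pub fn new(config: StageConfig) -> Self {
            // Destructuring turns the fields into ordinary, already-owned locals,
            // so the body reads the same as it would with separate parameters.
            let StageConfig { exit, num_threads } = config;
            let _ = (exit, num_threads);
            Stage
        }
    }

Callers then build the struct with named fields, which is also why the clippy::too_many_arguments and clippy::type_complexity suppressions disappear from the signature below.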

Changed file 1 of 3:

@@ -25,7 +25,7 @@ use solana_sdk::{
     clock::Slot,
     hash::Hash,
     pubkey::Pubkey,
-    signature::KeypairUtil,
+    signature::{Keypair, KeypairUtil},
     timing::{self, duration_as_ms},
     transaction::Transaction,
 };
@@ -64,6 +64,24 @@ impl Drop for Finalizer {
     }
 }
 
+pub struct ReplayStageConfig {
+    pub my_pubkey: Pubkey,
+    pub vote_account: Pubkey,
+    pub voting_keypair: Option<Arc<Keypair>>,
+    pub blocktree: Arc<Blocktree>,
+    pub bank_forks: Arc<RwLock<BankForks>>,
+    pub cluster_info: Arc<RwLock<ClusterInfo>>,
+    pub exit: Arc<AtomicBool>,
+    pub ledger_signal_receiver: Receiver<bool>,
+    pub subscriptions: Arc<RpcSubscriptions>,
+    pub poh_recorder: Arc<Mutex<PohRecorder>>,
+    pub leader_schedule_cache: Arc<LeaderScheduleCache>,
+    pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
+    pub snapshot_package_sender: Option<SnapshotPackageSender>,
+    pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+    pub transaction_status_sender: Option<TransactionStatusSender>,
+}
+
 pub struct ReplayStage {
     t_replay: JoinHandle<Result<()>>,
     commitment_service: AggregateCommitmentService,
@@ -162,51 +180,39 @@ impl ForkProgress {
     }
 }
 
 impl ReplayStage {
-    #[allow(
-        clippy::new_ret_no_self,
-        clippy::too_many_arguments,
-        clippy::type_complexity
-    )]
-    pub fn new<T>(
-        my_pubkey: &Pubkey,
-        vote_account: &Pubkey,
-        voting_keypair: Option<&Arc<T>>,
-        blocktree: Arc<Blocktree>,
-        bank_forks: &Arc<RwLock<BankForks>>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
-        exit: &Arc<AtomicBool>,
-        ledger_signal_receiver: Receiver<bool>,
-        subscriptions: &Arc<RpcSubscriptions>,
-        poh_recorder: &Arc<Mutex<PohRecorder>>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
-        snapshot_package_sender: Option<SnapshotPackageSender>,
-        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
-        transaction_status_sender: Option<TransactionStatusSender>,
-    ) -> (Self, Receiver<Vec<Arc<Bank>>>)
-    where
-        T: 'static + KeypairUtil + Send + Sync,
-    {
+    #[allow(clippy::new_ret_no_self)]
+    pub fn new(config: ReplayStageConfig) -> (Self, Receiver<Vec<Arc<Bank>>>) {
+        let ReplayStageConfig {
+            my_pubkey,
+            vote_account,
+            voting_keypair,
+            blocktree,
+            bank_forks,
+            cluster_info,
+            exit,
+            ledger_signal_receiver,
+            subscriptions,
+            poh_recorder,
+            leader_schedule_cache,
+            slot_full_senders,
+            snapshot_package_sender,
+            block_commitment_cache,
+            transaction_status_sender,
+        } = config;
+
         let (root_bank_sender, root_bank_receiver) = channel();
         trace!("replay stage");
-        let exit_ = exit.clone();
-        let subscriptions = subscriptions.clone();
-        let bank_forks = bank_forks.clone();
-        let poh_recorder = poh_recorder.clone();
-        let my_pubkey = *my_pubkey;
         let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap());
 
         // Start the replay stage loop
-        let leader_schedule_cache = leader_schedule_cache.clone();
-        let vote_account = *vote_account;
-        let voting_keypair = voting_keypair.cloned();
         let (lockouts_sender, commitment_service) =
-            AggregateCommitmentService::new(exit, block_commitment_cache);
+            AggregateCommitmentService::new(&exit, block_commitment_cache);
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
             .spawn(move || {
-                let _exit = Finalizer::new(exit_.clone());
+                let _exit = Finalizer::new(exit.clone());
                 let mut progress = HashMap::new();
                 // Initialize progress map with any root banks
                 for bank in bank_forks.read().unwrap().frozen_banks().values() {
@@ -224,7 +230,7 @@ impl ReplayStage {
                     thread_mem_usage::datapoint("solana-replay-stage");
                     let now = Instant::now();
                     // Stop getting entries if we get exit signal
-                    if exit_.load(Ordering::Relaxed) {
+                    if exit.load(Ordering::Relaxed) {
                         break;
                     }
@@ -593,13 +599,13 @@ impl ReplayStage {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn handle_votable_bank<T>(
+    fn handle_votable_bank(
         bank: &Arc<Bank>,
         bank_forks: &Arc<RwLock<BankForks>>,
         tower: &mut Tower,
         progress: &mut HashMap<u64, ForkProgress>,
         vote_account: &Pubkey,
-        voting_keypair: &Option<Arc<T>>,
+        voting_keypair: &Option<Arc<Keypair>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         blocktree: &Arc<Blocktree>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
@@ -607,10 +613,7 @@ impl ReplayStage {
         total_staked: u64,
         lockouts_sender: &Sender<CommitmentAggregationData>,
         snapshot_package_sender: &Option<SnapshotPackageSender>,
-    ) -> Result<()>
-    where
-        T: 'static + KeypairUtil + Send + Sync,
-    {
+    ) -> Result<()> {
         if bank.is_empty() {
             inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
         }
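
A side effect visible in the hunk above: the old signature borrowed handles (for example exit: &Arc<AtomicBool>) and immediately cloned them into locals such as exit_ so the clones could move into the spawned replay thread. With the config moved in by value, the destructured fields are already owned and move straight into the closure, so those re-bindings disappear. A rough illustration, with assumed names:

    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread,
    };

    // The owned Arc moves directly into the thread closure; no `exit_` clone needed.
    fn spawn_worker(exit: Arc<AtomicBool>) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            while !exit.load(Ordering::Relaxed) {
                thread::yield_now();
            }
        })
    }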

Changed file 2 of 3:

@@ -8,7 +8,7 @@ use crate::{
     ledger_cleanup_service::LedgerCleanupService,
     partition_cfg::PartitionCfg,
     poh_recorder::PohRecorder,
-    replay_stage::ReplayStage,
+    replay_stage::{ReplayStage, ReplayStageConfig},
     retransmit_stage::RetransmitStage,
     rpc_subscriptions::RpcSubscriptions,
     shred_fetch_stage::ShredFetchStage,
@@ -65,9 +65,9 @@ impl Tvu {
     /// * `sockets` - fetch, repair, and retransmit sockets
     /// * `blocktree` - the ledger itself
     #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
-    pub fn new<T>(
+    pub fn new(
         vote_account: &Pubkey,
-        voting_keypair: Option<&Arc<T>>,
+        voting_keypair: Option<Arc<Keypair>>,
         storage_keypair: &Arc<Keypair>,
         bank_forks: &Arc<RwLock<BankForks>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -87,10 +87,7 @@ impl Tvu {
         cfg: Option<PartitionCfg>,
         shred_version: u16,
         transaction_status_sender: Option<TransactionStatusSender>,
-    ) -> Self
-    where
-        T: 'static + KeypairUtil + Sync + Send,
-    {
+    ) -> Self {
         let keypair: Arc<Keypair> = cluster_info
             .read()
             .expect("Unable to read from cluster_info during Tvu creation")
@@ -162,23 +159,25 @@ impl Tvu {
             }
         };
 
-        let (replay_stage, root_bank_receiver) = ReplayStage::new(
-            &keypair.pubkey(),
-            vote_account,
-            voting_keypair,
-            blocktree.clone(),
-            &bank_forks,
-            cluster_info.clone(),
-            &exit,
-            ledger_signal_receiver,
-            subscriptions,
-            poh_recorder,
-            leader_schedule_cache,
-            vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
-            snapshot_package_sender,
-            block_commitment_cache,
-            transaction_status_sender,
-        );
+        let replay_stage_config = ReplayStageConfig {
+            my_pubkey: keypair.pubkey(),
+            vote_account: *vote_account,
+            voting_keypair,
+            blocktree: blocktree.clone(),
+            bank_forks: bank_forks.clone(),
+            cluster_info: cluster_info.clone(),
+            exit: exit.clone(),
+            ledger_signal_receiver,
+            subscriptions: subscriptions.clone(),
+            poh_recorder: poh_recorder.clone(),
+            leader_schedule_cache: leader_schedule_cache.clone(),
+            slot_full_senders: vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
+            snapshot_package_sender,
+            block_commitment_cache,
+            transaction_status_sender,
+        };
+
+        let (replay_stage, root_bank_receiver) = ReplayStage::new(replay_stage_config);
 
         let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket {
             let blockstream_service = BlockstreamService::new(
@@ -284,7 +283,7 @@ pub mod tests {
         let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
         let tvu = Tvu::new(
             &voting_keypair.pubkey(),
-            Some(&Arc::new(voting_keypair)),
+            Some(Arc::new(voting_keypair)),
             &storage_keypair,
             &Arc::new(RwLock::new(bank_forks)),
             &cref1,
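
The caller-side construction above leans on Rust's field-init shorthand: wherever a local variable has the same name as the struct field (voting_keypair, ledger_signal_receiver, snapshot_package_sender, block_commitment_cache, transaction_status_sender), writing just the name is equivalent to field: field. A small illustration with made-up fields:

    struct Config {
        retries: u32,
        verbose: bool,
    }

    fn build() -> Config {
        let retries = 3;
        let verbose = true;
        // Shorthand for `Config { retries: retries, verbose: verbose }`.
        Config { retries, verbose }
    }

    fn main() {
        let cfg = build();
        assert!(cfg.verbose && cfg.retries == 3);
    }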

Changed file 3 of 3:

@@ -350,7 +350,7 @@ impl Validator {
         let voting_keypair = if config.voting_disabled {
             None
         } else {
-            Some(voting_keypair)
+            Some(voting_keypair.clone())
         };
 
         let tvu = Tvu::new(
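
Because Tvu::new now takes voting_keypair: Option<Arc<Keypair>> by value, the validator clones the Arc handle before handing it over. Cloning an Arc only bumps a reference count; the keypair itself is not copied. A tiny sketch of that behavior (Keypair here is a stand-in type, not the real one):

    use std::sync::Arc;

    struct Keypair; // stand-in for the real type

    fn main() {
        let voting_keypair = Arc::new(Keypair);
        // Both handles point at the same Keypair; only the refcount changes.
        let for_tvu = Some(voting_keypair.clone());
        assert_eq!(Arc::strong_count(&voting_keypair), 2);
        drop(for_tvu);
    }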