diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 712dec1007..777161d01a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -3,7 +3,6 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::blocktree_processor; -use crate::blocktree_processor::BankForksInfo; use crate::cluster_info::ClusterInfo; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_schedule_utils; @@ -59,7 +58,6 @@ impl ReplayStage { voting_keypair: Option>, blocktree: Arc, bank_forks: &Arc>, - _bank_forks_info: &[BankForksInfo], cluster_info: Arc>, exit: Arc, ledger_signal_receiver: Receiver, @@ -77,14 +75,14 @@ impl ReplayStage { let bank_forks = bank_forks.clone(); let poh_recorder = poh_recorder.clone(); - let mut progress = HashMap::new(); - // Start the replay stage loop let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_.clone()); let mut first_block = false; + let mut progress = HashMap::new(); + loop { let now = Instant::now(); // Stop getting entries if we get exit signal @@ -97,7 +95,7 @@ impl ReplayStage { let mut votable: Vec = vec![]; for bank_slot in &active_banks { let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); - if !Self::is_tpu(&bank, my_id) { + if bank.collector_id() != my_id { Self::replay_blocktree_into_bank( &bank, &blocktree, @@ -111,8 +109,9 @@ impl ReplayStage { info!("bank frozen {}", bank.slot()); votable.push(*bank_slot); progress.remove(bank_slot); - let id = leader_schedule_utils::slot_leader_at(bank.slot(), &bank); - if let Err(e) = slot_full_sender.send((bank.slot(), id)) { + if let Err(e) = + slot_full_sender.send((bank.slot(), bank.collector_id())) + { info!("{} slot_full alert failed: {:?}", my_id, e); } } @@ -139,8 +138,9 @@ impl ReplayStage { .unwrap() .clone(); let next_slot = *latest_slot_vote + 1; - let next_leader = leader_schedule_utils::slot_leader_at(next_slot, &parent); - cluster_info.write().unwrap().set_leader(next_leader); + let next_leader_id = + leader_schedule_utils::slot_leader_at(next_slot, &parent); + cluster_info.write().unwrap().set_leader(next_leader_id); subscriptions.notify_subscribers(&parent); @@ -159,9 +159,9 @@ impl ReplayStage { .unwrap() .reset(parent.tick_height(), parent.last_blockhash()); - if next_leader == my_id { - let frozen = bank_forks.read().unwrap().frozen_banks(); - assert!(frozen.get(&next_slot).is_none()); + if next_leader_id == my_id { + let frozen = bank_forks.read().unwrap().frozen_banks(); + assert!(frozen.get(&next_slot).is_none()); assert!(bank_forks.read().unwrap().get(next_slot).is_none()); let tpu_bank = Bank::new_from_parent(&parent, my_id, next_slot); @@ -169,14 +169,17 @@ impl ReplayStage { if let Some(tpu_bank) = bank_forks.read().unwrap().get(next_slot).cloned() { - assert_eq!(bank_forks.read().unwrap().working_bank().slot(), tpu_bank.slot()); + assert_eq!( + bank_forks.read().unwrap().working_bank().slot(), + tpu_bank.slot() + ); debug!( - "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", + "new working bank: me: {} next_slot: {} next_leader: {}", my_id, tpu_bank.slot(), - next_leader + next_leader_id ); - poh_recorder.lock().unwrap().set_bank(&tpu_bank); + poh_recorder.lock().unwrap().set_bank(&tpu_bank); } } } @@ -256,10 +259,6 @@ impl ReplayStage { result } - pub fn is_tpu(bank: &Bank, my_id: Pubkey) -> bool { - my_id == leader_schedule_utils::slot_leader(&bank) - } - pub fn close(self) -> thread::Result<()> { self.exit(); self.join() @@ -372,7 +371,7 @@ mod test { let exit = Arc::new(AtomicBool::new(false)); let voting_keypair = Arc::new(Keypair::new()); { - let (bank_forks, bank_forks_info, blocktree, l_receiver) = + let (bank_forks, _bank_forks_info, blocktree, l_receiver) = new_banks_from_blocktree(&my_ledger_path, None); let bank = bank_forks.working_bank(); @@ -383,7 +382,6 @@ mod test { Some(voting_keypair.clone()), blocktree.clone(), &Arc::new(RwLock::new(bank_forks)), - &bank_forks_info, cluster_info_me.clone(), exit.clone(), l_receiver, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c3c35c83c9..5488b79d64 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -111,7 +111,6 @@ impl Tvu { voting_keypair, blocktree.clone(), &bank_forks, - &bank_forks_info, cluster_info.clone(), exit.clone(), ledger_signal_receiver, diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d83bb4953a..ea3cac5201 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -191,6 +191,10 @@ impl Bank { bank } + pub fn collector_id(&self) -> Pubkey { + self.collector_id + } + pub fn slot(&self) -> u64 { self.slot }