diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index ee8aa6f590..696b29ab43 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -107,12 +107,13 @@ impl BankForks { } } - pub fn insert(&mut self, bank: Bank) { + pub fn insert(&mut self, bank: Bank) -> Arc { let bank = Arc::new(bank); let prev = self.banks.insert(bank.slot(), bank.clone()); assert!(prev.is_none()); self.working_bank = bank.clone(); + bank } // TODO: really want to kill this... diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4badc640c7..48f1447b65 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -78,9 +78,6 @@ pub struct ClusterInfo { pub gossip: CrdsGossip, /// set the keypair that will be used to sign crds values generated. It is unset only in tests. pub(crate) keypair: Arc, - // TODO: remove gossip_leader_pubkey once all usage of `set_leader()` and `leader_data()` is - // purged - gossip_leader_pubkey: Pubkey, /// The network entrypoint entrypoint: Option, } @@ -181,7 +178,6 @@ impl ClusterInfo { let mut me = Self { gossip: CrdsGossip::default(), keypair, - gossip_leader_pubkey: Pubkey::default(), entrypoint: None, }; let id = contact_info.id; @@ -237,15 +233,6 @@ impl ClusterInfo { self.lookup(&self.id()).cloned().unwrap() } - // Deprecated: don't use leader_data(). - pub fn leader_data(&self) -> Option<&ContactInfo> { - let leader_pubkey = self.gossip_leader_pubkey; - if leader_pubkey == Pubkey::default() { - return None; - } - self.lookup(&leader_pubkey) - } - pub fn contact_info_trace(&self) -> String { let now = timestamp(); let mut spy_nodes = 0; @@ -302,17 +289,6 @@ impl ClusterInfo { ) } - /// Record the id of the current leader for use by `leader_tpu_via_blobs()` - pub fn set_leader(&mut self, leader_pubkey: &Pubkey) { - if *leader_pubkey != self.gossip_leader_pubkey { - warn!( - "{}: LEADER_UPDATE TO {} from {}", - self.gossip.id, leader_pubkey, self.gossip_leader_pubkey, - ); - self.gossip_leader_pubkey = *leader_pubkey; - } - } - pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: BTreeSet) { let now = timestamp(); let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now)); @@ -1949,17 +1925,6 @@ mod tests { Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); } - #[test] - fn test_default_leader() { - solana_logger::setup(); - let contact_info = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); - let network_entry_point = - ContactInfo::new_gossip_entry_point(&socketaddr!("127.0.0.1:1239")); - cluster_info.insert_info(network_entry_point); - assert!(cluster_info.leader_data().is_none()); - } - fn assert_in_range(x: u16, range: (u16, u16)) { assert!(x >= range.0); assert!(x < range.1); @@ -2043,12 +2008,9 @@ mod tests { //create new cluster info, leader, and peer let keypair = Keypair::new(); let peer_keypair = Keypair::new(); - let leader_keypair = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); - let leader = ContactInfo::new_localhost(&leader_keypair.pubkey(), 0); let peer = ContactInfo::new_localhost(&peer_keypair.pubkey(), 0); let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair)); - cluster_info.set_leader(&leader.id); cluster_info.insert_info(peer.clone()); cluster_info.gossip.refresh_push_active_set(&HashMap::new()); //check that all types of gossip messages are signed correctly diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 361a5cb40e..9d77a20bff 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -147,7 +147,7 @@ impl ContactInfo { } #[cfg(test)] - fn new_with_pubkey_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self { + pub(crate) fn new_with_pubkey_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self { fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { let mut nxt_addr = *addr; nxt_addr.set_port(addr.port() + nxt); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 652916fe84..2d7666408b 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -163,7 +163,6 @@ impl ReplayStage { &my_pubkey, &bank_forks, &poh_recorder, - &cluster_info, &leader_schedule_cache, ); @@ -189,10 +188,9 @@ impl ReplayStage { my_pubkey: &Pubkey, bank_forks: &Arc>, poh_recorder: &Arc>, - cluster_info: &Arc>, leader_schedule_cache: &Arc, ) { - let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) = { + let (grace_ticks, poh_slot, parent_slot) = { let poh_recorder = poh_recorder.lock().unwrap(); // we're done @@ -201,19 +199,26 @@ impl ReplayStage { return; } - poh_recorder.reached_leader_tick() + let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) = + poh_recorder.reached_leader_tick(); + + if !reached_leader_tick { + trace!("{} poh_recorder hasn't reached_leader_tick", my_pubkey); + return; + } + + (grace_ticks, poh_slot, parent_slot) }; trace!( - "{} reached_leader_tick: {} poh_slot: {} parent_slot: {}", + "{} reached_leader_tick, poh_slot: {} parent_slot: {}", my_pubkey, - reached_leader_tick, poh_slot, parent_slot, ); if bank_forks.read().unwrap().get(poh_slot).is_some() { - trace!("{} already have bank in forks at {}", my_pubkey, poh_slot); + warn!("{} already have bank in forks at {}", my_pubkey, poh_slot); return; } @@ -242,33 +247,30 @@ impl ReplayStage { next_leader, poh_slot ); - // TODO: remove me? - cluster_info.write().unwrap().set_leader(&next_leader); - if next_leader == *my_pubkey && reached_leader_tick { - trace!("{} starting tpu for slot {}", my_pubkey, poh_slot); - datapoint_warn!( - "replay_stage-new_leader", - ("count", poh_slot, i64), - ("grace", grace_ticks, i64) - ); - - let tpu_bank = Bank::new_from_parent(&parent, my_pubkey, poh_slot); - bank_forks.write().unwrap().insert(tpu_bank); - if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { - assert_eq!( - bank_forks.read().unwrap().working_bank().slot(), - tpu_bank.slot() - ); - debug!( - "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", - my_pubkey, - tpu_bank.slot(), - next_leader - ); - poh_recorder.lock().unwrap().set_bank(&tpu_bank); - } + // I guess I missed my slot + if next_leader != *my_pubkey { + return; } + + datapoint_warn!( + "replay_stage-new_leader", + ("count", poh_slot, i64), + ("grace", grace_ticks, i64) + ); + + let tpu_bank = bank_forks + .write() + .unwrap() + .insert(Bank::new_from_parent(&parent, my_pubkey, poh_slot)); + + info!( + "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", + my_pubkey, + tpu_bank.slot(), + next_leader + ); + poh_recorder.lock().unwrap().set_bank(&tpu_bank); } else { error!("{} No next leader found", my_pubkey); } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 7065b8d610..0b6c3b9752 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -109,6 +109,10 @@ impl JsonRpcRequestProcessor { Ok(self.bank().slot()) } + fn get_slot_leader(&self) -> Result { + Ok(self.bank().collector_id().to_string()) + } + fn get_transaction_count(&self) -> Result { Ok(self.bank().transaction_count() as u64) } @@ -512,12 +516,7 @@ impl RpcSol for RpcSolImpl { } fn get_slot_leader(&self, meta: Self::Metadata) -> Result { - let cluster_info = meta.cluster_info.read().unwrap(); - let leader_data_option = cluster_info.leader_data(); - Ok(leader_data_option - .and_then(|leader_data| Some(leader_data.id)) - .unwrap_or_default() - .to_string()) + meta.request_processor.read().unwrap().get_slot_leader() } fn get_epoch_vote_accounts(&self, meta: Self::Metadata) -> Result> { @@ -577,6 +576,7 @@ mod tests { ) -> (MetaIoHandler, Meta, Arc, Hash, Keypair, Pubkey) { let (bank_forks, alice) = new_bank_forks(); let bank = bank_forks.read().unwrap().working_bank(); + let leader_pubkey = *bank.collector_id(); let exit = Arc::new(AtomicBool::new(false)); let blockhash = bank.confirmed_last_blockhash().0; @@ -595,9 +595,14 @@ mod tests { let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( ContactInfo::default(), ))); - let leader = ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - cluster_info.write().unwrap().insert_info(leader.clone()); + cluster_info + .write() + .unwrap() + .insert_info(ContactInfo::new_with_pubkey_socketaddr( + &leader_pubkey, + &socketaddr!("127.0.0.1:1234"), + )); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; @@ -606,7 +611,7 @@ mod tests { request_processor, cluster_info, }; - (io, meta, bank, blockhash, alice, leader.id) + (io, meta, bank, blockhash, alice, leader_pubkey) } #[test] @@ -674,13 +679,12 @@ mod tests { #[test] fn test_rpc_get_slot_leader() { let bob_pubkey = Pubkey::new_rand(); - let (io, meta, _bank, _blockhash, _alice, _leader_pubkey) = + let (io, meta, _bank, _blockhash, _alice, leader_pubkey) = start_rpc_handler_with_tx(&bob_pubkey); let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeader"}}"#); let res = io.handle_request_sync(&req, meta); - let expected = - format!(r#"{{"jsonrpc":"2.0","result":"11111111111111111111111111111111","id":1}}"#); + let expected = format!(r#"{{"jsonrpc":"2.0","result":"{}","id":1}}"#, leader_pubkey); let expected: Response = serde_json::from_str(&expected).expect("expected response deserialization"); let result: Response = serde_json::from_str(&res.expect("actual response")) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4b0b416801..630a1b8780 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -11,7 +11,6 @@ use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; use crossbeam_channel::unbounded; -use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; @@ -29,7 +28,6 @@ pub struct Tpu { impl Tpu { #[allow(clippy::too_many_arguments)] pub fn new( - id: &Pubkey, cluster_info: &Arc>, poh_recorder: &Arc>, entry_receiver: Receiver, @@ -41,8 +39,6 @@ impl Tpu { broadcast_type: &BroadcastStageType, exit: &Arc, ) -> Self { - cluster_info.write().unwrap().set_leader(id); - let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, diff --git a/core/src/validator.rs b/core/src/validator.rs index b46f917683..8268aa8270 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -256,7 +256,6 @@ impl Validator { } let tpu = Tpu::new( - &id, &cluster_info, &poh_recorder, entry_receiver,