diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 388f3e6168..f065ceb612 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -22,7 +22,7 @@ use { solana_measure::measure::Measure, solana_metrics::{inc_new_counter_error, inc_new_counter_info}, solana_poh::poh_recorder::WorkingBankEntry, - solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_runtime::bank_forks::BankForks, solana_sdk::{ clock::Slot, pubkey::Pubkey, @@ -34,7 +34,7 @@ use { socket::SocketAddrSpace, }, std::{ - collections::HashMap, + collections::{HashMap, HashSet}, iter::repeat, net::UdpSocket, sync::{ @@ -58,8 +58,8 @@ const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); pub(crate) const NUM_INSERT_THREADS: usize = 2; -pub(crate) type RetransmitSlotsSender = CrossbeamSender>>; -pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver>>; +pub(crate) type RetransmitSlotsSender = CrossbeamSender; +pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver; pub(crate) type RecordReceiver = Receiver<(Arc>, Option)>; pub(crate) type TransmitReceiver = Receiver<(Arc>, Option)>; @@ -216,8 +216,8 @@ impl BroadcastStage { return Some(BroadcastStageReturnType::ChannelDisconnected); } Error::RecvTimeout(RecvTimeoutError::Timeout) - | Error::CrossbeamRecvTimeout(CrossbeamRecvTimeoutError::Timeout) => (), - Error::ClusterInfo(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? + | Error::CrossbeamRecvTimeout(CrossbeamRecvTimeoutError::Timeout) + | Error::ClusterInfo(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? _ => { inc_new_counter_error!("streamer-broadcaster-error", 1, 1); error!("{} broadcaster error: {:?}", name, e); @@ -342,33 +342,34 @@ impl BroadcastStage { retransmit_slots_receiver: &RetransmitSlotsReceiver, socket_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { - let timer = Duration::from_millis(100); + const RECV_TIMEOUT: Duration = Duration::from_millis(100); + let retransmit_slots: HashSet = + std::iter::once(retransmit_slots_receiver.recv_timeout(RECV_TIMEOUT)?) + .chain(retransmit_slots_receiver.try_iter()) + .collect(); - // Check for a retransmit signal - let mut retransmit_slots = retransmit_slots_receiver.recv_timeout(timer)?; - while let Ok(new_retransmit_slots) = retransmit_slots_receiver.try_recv() { - retransmit_slots.extend(new_retransmit_slots); - } - - for (_, bank) in retransmit_slots.iter() { - let slot = bank.slot(); + for new_retransmit_slot in retransmit_slots { let data_shreds = Arc::new( blockstore - .get_data_shreds_for_slot(slot, 0) + .get_data_shreds_for_slot(new_retransmit_slot, 0) .expect("My own shreds must be reconstructable"), ); - debug_assert!(data_shreds.iter().all(|shred| shred.slot() == slot)); + debug_assert!(data_shreds + .iter() + .all(|shred| shred.slot() == new_retransmit_slot)); if !data_shreds.is_empty() { socket_sender.send((data_shreds, None))?; } let coding_shreds = Arc::new( blockstore - .get_coding_shreds_for_slot(slot, 0) + .get_coding_shreds_for_slot(new_retransmit_slot, 0) .expect("My own shreds must be reconstructable"), ); - debug_assert!(coding_shreds.iter().all(|shred| shred.slot() == slot)); + debug_assert!(coding_shreds + .iter() + .all(|shred| shred.slot() == new_retransmit_slot)); if !coding_shreds.is_empty() { socket_sender.send((coding_shreds, None))?; } @@ -546,8 +547,6 @@ pub mod test { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let (transmit_sender, transmit_receiver) = channel(); let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000); - let bank0 = Arc::new(Bank::new_for_tests(&genesis_config)); // Make some shreds let updated_slot = 0; @@ -567,12 +566,8 @@ pub mod test { // Insert duplicate retransmit signal, blocks should // only be retransmitted once - retransmit_slots_sender - .send(vec![(updated_slot, bank0.clone())].into_iter().collect()) - .unwrap(); - retransmit_slots_sender - .send(vec![(updated_slot, bank0)].into_iter().collect()) - .unwrap(); + retransmit_slots_sender.send(updated_slot).unwrap(); + retransmit_slots_sender.send(updated_slot).unwrap(); BroadcastStage::check_retransmit_signals( &blockstore, &retransmit_slots_receiver, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 83f633fd8a..048787b591 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1432,21 +1432,14 @@ impl ReplayStage { progress_map.log_propagated_stats(latest_unconfirmed_leader_slot, bank_forks); skipped_slots_info.last_skipped_slot = poh_slot; } - let bank = bank_forks - .read() - .unwrap() - .get(latest_unconfirmed_leader_slot) - .expect( - "In order for propagated check to fail, \ - latest leader must exist in progress map, and thus also in BankForks", - ) - .clone(); // Signal retransmit if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) { - datapoint_info!("replay_stage-retransmit", ("slot", bank.slot(), i64),); - let _ = retransmit_slots_sender - .send(vec![(bank.slot(), bank.clone())].into_iter().collect()); + datapoint_info!( + "replay_stage-retransmit", + ("slot", latest_unconfirmed_leader_slot, i64), + ); + let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot); } return; }