Remove need to send bank in retransmit request from ReplayStage (#21943)
* Remove need to send bank in retransmitter
This commit is contained in:
@ -22,7 +22,7 @@ use {
|
|||||||
solana_measure::measure::Measure,
|
solana_measure::measure::Measure,
|
||||||
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
|
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
|
||||||
solana_poh::poh_recorder::WorkingBankEntry,
|
solana_poh::poh_recorder::WorkingBankEntry,
|
||||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
solana_runtime::bank_forks::BankForks,
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
clock::Slot,
|
clock::Slot,
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
@ -34,7 +34,7 @@ use {
|
|||||||
socket::SocketAddrSpace,
|
socket::SocketAddrSpace,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
collections::HashMap,
|
collections::{HashMap, HashSet},
|
||||||
iter::repeat,
|
iter::repeat,
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
sync::{
|
sync::{
|
||||||
@ -58,8 +58,8 @@ const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
|
|||||||
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
|
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
pub(crate) const NUM_INSERT_THREADS: usize = 2;
|
pub(crate) const NUM_INSERT_THREADS: usize = 2;
|
||||||
pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
|
pub(crate) type RetransmitSlotsSender = CrossbeamSender<Slot>;
|
||||||
pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
|
pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<Slot>;
|
||||||
pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
|
pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
|
||||||
pub(crate) type TransmitReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
|
pub(crate) type TransmitReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
|
||||||
|
|
||||||
@ -216,8 +216,8 @@ impl BroadcastStage {
|
|||||||
return Some(BroadcastStageReturnType::ChannelDisconnected);
|
return Some(BroadcastStageReturnType::ChannelDisconnected);
|
||||||
}
|
}
|
||||||
Error::RecvTimeout(RecvTimeoutError::Timeout)
|
Error::RecvTimeout(RecvTimeoutError::Timeout)
|
||||||
| Error::CrossbeamRecvTimeout(CrossbeamRecvTimeoutError::Timeout) => (),
|
| Error::CrossbeamRecvTimeout(CrossbeamRecvTimeoutError::Timeout)
|
||||||
Error::ClusterInfo(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
| Error::ClusterInfo(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
|
||||||
_ => {
|
_ => {
|
||||||
inc_new_counter_error!("streamer-broadcaster-error", 1, 1);
|
inc_new_counter_error!("streamer-broadcaster-error", 1, 1);
|
||||||
error!("{} broadcaster error: {:?}", name, e);
|
error!("{} broadcaster error: {:?}", name, e);
|
||||||
@ -342,33 +342,34 @@ impl BroadcastStage {
|
|||||||
retransmit_slots_receiver: &RetransmitSlotsReceiver,
|
retransmit_slots_receiver: &RetransmitSlotsReceiver,
|
||||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::from_millis(100);
|
const RECV_TIMEOUT: Duration = Duration::from_millis(100);
|
||||||
|
let retransmit_slots: HashSet<Slot> =
|
||||||
|
std::iter::once(retransmit_slots_receiver.recv_timeout(RECV_TIMEOUT)?)
|
||||||
|
.chain(retransmit_slots_receiver.try_iter())
|
||||||
|
.collect();
|
||||||
|
|
||||||
// Check for a retransmit signal
|
for new_retransmit_slot in retransmit_slots {
|
||||||
let mut retransmit_slots = retransmit_slots_receiver.recv_timeout(timer)?;
|
|
||||||
while let Ok(new_retransmit_slots) = retransmit_slots_receiver.try_recv() {
|
|
||||||
retransmit_slots.extend(new_retransmit_slots);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (_, bank) in retransmit_slots.iter() {
|
|
||||||
let slot = bank.slot();
|
|
||||||
let data_shreds = Arc::new(
|
let data_shreds = Arc::new(
|
||||||
blockstore
|
blockstore
|
||||||
.get_data_shreds_for_slot(slot, 0)
|
.get_data_shreds_for_slot(new_retransmit_slot, 0)
|
||||||
.expect("My own shreds must be reconstructable"),
|
.expect("My own shreds must be reconstructable"),
|
||||||
);
|
);
|
||||||
debug_assert!(data_shreds.iter().all(|shred| shred.slot() == slot));
|
debug_assert!(data_shreds
|
||||||
|
.iter()
|
||||||
|
.all(|shred| shred.slot() == new_retransmit_slot));
|
||||||
if !data_shreds.is_empty() {
|
if !data_shreds.is_empty() {
|
||||||
socket_sender.send((data_shreds, None))?;
|
socket_sender.send((data_shreds, None))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let coding_shreds = Arc::new(
|
let coding_shreds = Arc::new(
|
||||||
blockstore
|
blockstore
|
||||||
.get_coding_shreds_for_slot(slot, 0)
|
.get_coding_shreds_for_slot(new_retransmit_slot, 0)
|
||||||
.expect("My own shreds must be reconstructable"),
|
.expect("My own shreds must be reconstructable"),
|
||||||
);
|
);
|
||||||
|
|
||||||
debug_assert!(coding_shreds.iter().all(|shred| shred.slot() == slot));
|
debug_assert!(coding_shreds
|
||||||
|
.iter()
|
||||||
|
.all(|shred| shred.slot() == new_retransmit_slot));
|
||||||
if !coding_shreds.is_empty() {
|
if !coding_shreds.is_empty() {
|
||||||
socket_sender.send((coding_shreds, None))?;
|
socket_sender.send((coding_shreds, None))?;
|
||||||
}
|
}
|
||||||
@ -546,8 +547,6 @@ pub mod test {
|
|||||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||||
let (transmit_sender, transmit_receiver) = channel();
|
let (transmit_sender, transmit_receiver) = channel();
|
||||||
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
|
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
|
||||||
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000);
|
|
||||||
let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
|
|
||||||
|
|
||||||
// Make some shreds
|
// Make some shreds
|
||||||
let updated_slot = 0;
|
let updated_slot = 0;
|
||||||
@ -567,12 +566,8 @@ pub mod test {
|
|||||||
|
|
||||||
// Insert duplicate retransmit signal, blocks should
|
// Insert duplicate retransmit signal, blocks should
|
||||||
// only be retransmitted once
|
// only be retransmitted once
|
||||||
retransmit_slots_sender
|
retransmit_slots_sender.send(updated_slot).unwrap();
|
||||||
.send(vec![(updated_slot, bank0.clone())].into_iter().collect())
|
retransmit_slots_sender.send(updated_slot).unwrap();
|
||||||
.unwrap();
|
|
||||||
retransmit_slots_sender
|
|
||||||
.send(vec![(updated_slot, bank0)].into_iter().collect())
|
|
||||||
.unwrap();
|
|
||||||
BroadcastStage::check_retransmit_signals(
|
BroadcastStage::check_retransmit_signals(
|
||||||
&blockstore,
|
&blockstore,
|
||||||
&retransmit_slots_receiver,
|
&retransmit_slots_receiver,
|
||||||
|
@ -1432,21 +1432,14 @@ impl ReplayStage {
|
|||||||
progress_map.log_propagated_stats(latest_unconfirmed_leader_slot, bank_forks);
|
progress_map.log_propagated_stats(latest_unconfirmed_leader_slot, bank_forks);
|
||||||
skipped_slots_info.last_skipped_slot = poh_slot;
|
skipped_slots_info.last_skipped_slot = poh_slot;
|
||||||
}
|
}
|
||||||
let bank = bank_forks
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.get(latest_unconfirmed_leader_slot)
|
|
||||||
.expect(
|
|
||||||
"In order for propagated check to fail, \
|
|
||||||
latest leader must exist in progress map, and thus also in BankForks",
|
|
||||||
)
|
|
||||||
.clone();
|
|
||||||
|
|
||||||
// Signal retransmit
|
// Signal retransmit
|
||||||
if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
|
if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
|
||||||
datapoint_info!("replay_stage-retransmit", ("slot", bank.slot(), i64),);
|
datapoint_info!(
|
||||||
let _ = retransmit_slots_sender
|
"replay_stage-retransmit",
|
||||||
.send(vec![(bank.slot(), bank.clone())].into_iter().collect());
|
("slot", latest_unconfirmed_leader_slot, i64),
|
||||||
|
);
|
||||||
|
let _ = retransmit_slots_sender.send(latest_unconfirmed_leader_slot);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user