sends shreds (instead of packets) to retransmit stage

Working towards channelling through shreds recovered from erasure codes
to retransmit stage.
This commit is contained in:
behzad nouri
2021-08-12 12:04:01 -04:00
parent 6e413331b5
commit 3efccbffab
6 changed files with 148 additions and 210 deletions

View File

@@ -26,15 +26,14 @@ use {
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey},
solana_streamer::streamer::PacketSender,
std::collections::HashSet,
std::{
cmp::Reverse,
collections::HashMap,
net::{SocketAddr, UdpSocket},
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Sender,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
@@ -309,11 +308,10 @@ where
fn recv_window<F>(
blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache,
bank_forks: &RwLock<BankForks>,
insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
retransmit: &PacketSender,
retransmit_sender: &Sender<Vec<Shred>>,
shred_filter: F,
thread_pool: &ThreadPool,
stats: &mut ReceiveWindowStats,
@@ -325,13 +323,9 @@ where
let mut packets = verified_receiver.recv_timeout(timer)?;
packets.extend(verified_receiver.try_iter().flatten());
let now = Instant::now();
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
let last_root = blockstore.last_root();
let handle_packet = |packet: &mut Packet| {
let working_bank = bank_forks.read().unwrap().working_bank();
let handle_packet = |packet: &Packet| {
if packet.meta.discard {
inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
return None;
@@ -341,20 +335,10 @@ where
// call to `new_from_serialized_shred` is safe.
assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
let serialized_shred = packet.data.to_vec();
let working_bank = Arc::clone(&working_bank);
let shred = match Shred::new_from_serialized_shred(serialized_shred) {
Ok(shred) if shred_filter(&shred, working_bank, last_root) => {
let leader_pubkey =
leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref()));
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed(leader_pubkey, root_bank.deref());
shred
}
Ok(_) | Err(_) => {
packet.meta.discard = true;
return None;
}
};
let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?;
if !shred_filter(&shred, working_bank.clone(), last_root) {
return None;
}
if packet.meta.repair {
let repair_info = RepairMeta {
_from_addr: packet.meta.addr(),
@@ -368,28 +352,31 @@ where
};
let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packets
.par_iter_mut()
.flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet))
.par_iter()
.flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet))
.unzip()
});
stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::<usize>();
// Exclude repair packets from retransmit.
let _ = retransmit_sender.send(
shreds
.iter()
.zip(&repair_infos)
.filter(|(_, repair_info)| repair_info.is_none())
.map(|(shred, _)| shred)
.cloned()
.collect(),
);
stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
stats.num_shreds += shreds.len();
for shred in &shreds {
*stats.slots.entry(shred.slot()).or_default() += 1;
}
insert_shred_sender.send((shreds, repair_infos))?;
stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::<usize>();
for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) {
*stats.addrs.entry(packet.meta.addr()).or_default() += 1;
}
for packets in packets.into_iter() {
if !packets.is_empty() {
// Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit)
let _ = retransmit.send(packets);
}
}
insert_shred_sender.send((shreds, repair_infos))?;
stats.elapsed += now.elapsed();
Ok(())
}
@@ -429,7 +416,7 @@ impl WindowService {
pub(crate) fn new<F>(
blockstore: Arc<Blockstore>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
retransmit: PacketSender,
retransmit_sender: Sender<Vec<Shred>>,
repair_socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
@@ -476,7 +463,7 @@ impl WindowService {
let t_insert = Self::start_window_insert_thread(
exit.clone(),
blockstore.clone(),
leader_schedule_cache.clone(),
leader_schedule_cache,
insert_receiver,
duplicate_sender,
completed_data_sets_sender,
@@ -490,9 +477,8 @@ impl WindowService {
insert_sender,
verified_receiver,
shred_filter,
leader_schedule_cache,
bank_forks,
retransmit,
retransmit_sender,
);
WindowService {
@@ -598,9 +584,8 @@ impl WindowService {
insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
shred_filter: F,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
retransmit: PacketSender,
retransmit_sender: Sender<Vec<Shred>>,
) -> JoinHandle<()>
where
F: 'static
@@ -635,11 +620,10 @@ impl WindowService {
};
if let Err(e) = recv_window(
&blockstore,
&leader_schedule_cache,
&bank_forks,
&insert_sender,
&verified_receiver,
&retransmit,
&retransmit_sender,
|shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root),
&thread_pool,
&mut stats,