diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 5d19f631da..153d762a59 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -4,13 +4,12 @@ use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::leader_schedule_cache::LeaderScheduleCache; -use crate::packet::Packets; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; use crate::shred::Shred; use crate::streamer::{PacketReceiver, PacketSender}; -use rayon::iter::{ParallelBridge, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; use rayon::ThreadPool; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; use solana_runtime::bank::Bank; @@ -77,18 +76,18 @@ where let now = Instant::now(); inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); - let (shreds, packets): (Vec<_>, Vec<_>) = thread_pool.install(|| { + let (shreds, packets_ix): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets .packets - .drain(..) - .par_bridge() - .filter_map(|mut packet| { + .par_iter_mut() + .enumerate() + .filter_map(|(i, packet)| { if let Ok(s) = bincode::deserialize(&packet.data) { let shred: Shred = s; if shred_filter(&shred, &packet.data) { packet.meta.slot = shred.slot(); packet.meta.seed = shred.seed(); - Some((shred, packet)) + Some((shred, i)) } else { None } @@ -98,7 +97,21 @@ where }) .unzip() }); - let packets = Packets::new(packets); + // to avoid lookups into the `packets_ix` vec, this block manually tracks where we are in that vec + // and since `packets.packets.retain` and the `packets_ix` vec are both in order, + // we should be able to automatically drop any packets in the index gaps. + let mut retain_ix = 0; + let mut i = 0; + packets.packets.retain(|_| { + let retain = if !packets_ix.is_empty() && i == packets_ix[retain_ix] { + retain_ix = (packets_ix.len() - 1).min(retain_ix + 1); + true + } else { + false + }; + i += 1; + retain + }); trace!("{:?} shreds from packets", shreds.len()); @@ -258,22 +271,29 @@ impl Service for WindowService { mod test { use super::*; use crate::bank_forks::BankForks; + use crate::blocktree::tests::make_many_slot_entries; use crate::blocktree::{get_tmp_ledger_path, Blocktree}; use crate::cluster_info::{ClusterInfo, Node}; + use crate::contact_info::ContactInfo; use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry}; use crate::genesis_utils::create_genesis_block_with_leader; + use crate::packet::{Packet, Packets}; use crate::recycler::Recycler; + use crate::repair_service::RepairSlotRange; use crate::service::Service; use crate::shred::Shredder; use crate::streamer::{receiver, responder}; + use rand::seq::SliceRandom; + use rand::thread_rng; use solana_runtime::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; + use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; + use std::thread::sleep; use std::time::Duration; fn local_entries_to_shred(entries: Vec, keypair: &Arc) -> Vec { @@ -539,4 +559,68 @@ mod test { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); let _ignored = remove_dir_all(&blocktree_path); } + + fn make_test_window( + packet_receiver: Receiver, + exit: Arc, + ) -> WindowService { + let blocktree_path = get_tmp_ledger_path!(); + let (blocktree, _, _) = Blocktree::open_with_signal(&blocktree_path) + .expect("Expected to be able to open database ledger"); + + let blocktree = Arc::new(blocktree); + let (retransmit_sender, _retransmit_receiver) = channel(); + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + ContactInfo::new_localhost(&Pubkey::default(), 0), + ))); + let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap()); + let window = WindowService::new( + blocktree, + cluster_info, + packet_receiver, + retransmit_sender, + repair_sock, + &exit, + RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }), + &Arc::new(LeaderScheduleCache::default()), + |_, _, _, _| true, + ); + window + } + + #[test] + fn test_recv_window() { + let (packet_sender, packet_receiver) = channel(); + let exit = Arc::new(AtomicBool::new(false)); + let window = make_test_window(packet_receiver, exit.clone()); + // send 5 slots worth of data to the window + let (shreds, _) = make_many_slot_entries(0, 5, 10); + let packets: Vec<_> = shreds + .into_iter() + .map(|s| { + let mut p = Packet::default(); + p.data + .copy_from_slice(&mut bincode::serialize(&s).unwrap().as_ref()); + p + }) + .collect(); + let mut packets = Packets::new(packets); + packet_sender.send(packets.clone()).unwrap(); + sleep(Duration::from_millis(500)); + + // add some empty packets to the data set. These should fail to deserialize + packets.packets.append(&mut vec![Packet::default(); 10]); + packets.packets.shuffle(&mut thread_rng()); + packet_sender.send(packets.clone()).unwrap(); + sleep(Duration::from_millis(500)); + + // send 1 empty packet that cannot deserialize into a shred + packet_sender + .send(Packets::new(vec![Packet::default(); 1])) + .unwrap(); + sleep(Duration::from_millis(500)); + + exit.store(true, Ordering::Relaxed); + window.join().unwrap(); + } }