From 8e69dd42c1dc219f7a0208d8089991f55ba59908 Mon Sep 17 00:00:00 2001 From: sakridge Date: Tue, 20 Apr 2021 09:37:33 -0700 Subject: [PATCH] Add non-default repair nonce values (#16512) * Track outstanding nonces in repair * Rework outstanding requests to use lru cache and randomize nonces Co-authored-by: Carl --- core/src/lib.rs | 2 + core/src/outstanding_requests.rs | 177 +++++++++++++++++++++++++++++++ core/src/repair_service.rs | 21 +++- core/src/request_response.rs | 5 + core/src/serve_repair.rs | 116 ++++++++++++++++++-- core/src/window_service.rs | 89 ++++++++++++++-- 6 files changed, 388 insertions(+), 22 deletions(-) create mode 100644 core/src/outstanding_requests.rs create mode 100644 core/src/request_response.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index a09a068bca..23b8d9009f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -45,6 +45,7 @@ pub mod ledger_cleanup_service; pub mod non_circulating_supply; pub mod optimistic_confirmation_verifier; pub mod optimistically_confirmed_bank_tracker; +pub mod outstanding_requests; pub mod packet_hasher; pub mod ping_pong; pub mod poh_recorder; @@ -55,6 +56,7 @@ pub mod repair_service; pub mod repair_weight; pub mod repair_weighted_traversal; pub mod replay_stage; +pub mod request_response; mod result; pub mod retransmit_stage; pub mod rewards_recorder_service; diff --git a/core/src/outstanding_requests.rs b/core/src/outstanding_requests.rs new file mode 100644 index 0000000000..fe542e92b1 --- /dev/null +++ b/core/src/outstanding_requests.rs @@ -0,0 +1,177 @@ +use crate::request_response::RequestResponse; +use lru::LruCache; +use rand::{thread_rng, Rng}; +use solana_ledger::shred::Nonce; + +pub const DEFAULT_REQUEST_EXPIRATION_MS: u64 = 60_000; + +pub struct OutstandingRequests { + requests: LruCache>, +} + +impl OutstandingRequests +where + T: RequestResponse, +{ + // Returns boolean indicating whether sufficient time has passed for a request with + // the given timestamp to be made + pub fn add_request(&mut self, request: T, now: u64) -> Nonce { + let num_expected_responses = request.num_expected_responses(); + let nonce = thread_rng().gen_range(0, Nonce::MAX); + self.requests.put( + nonce, + RequestStatus { + expire_timestamp: now + DEFAULT_REQUEST_EXPIRATION_MS, + num_expected_responses, + request, + }, + ); + nonce + } + + pub fn register_response(&mut self, nonce: u32, response: &S, now: u64) -> bool { + let (is_valid, should_delete) = self + .requests + .get_mut(&nonce) + .map(|status| { + if status.num_expected_responses > 0 + && now < status.expire_timestamp + && status.request.verify_response(response) + { + status.num_expected_responses -= 1; + (true, status.num_expected_responses == 0) + } else { + (false, true) + } + }) + .unwrap_or((false, false)); + + if should_delete { + self.requests + .pop(&nonce) + .expect("Delete must delete existing object"); + } + + is_valid + } +} + +impl Default for OutstandingRequests { + fn default() -> Self { + Self { + requests: LruCache::new(16 * 1024), + } + } +} + +pub struct RequestStatus { + expire_timestamp: u64, + num_expected_responses: u32, + request: T, +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::serve_repair::RepairType; + use solana_ledger::shred::Shred; + use solana_sdk::timing::timestamp; + + #[test] + fn test_add_request() { + let repair_type = RepairType::Orphan(9); + let mut outstanding_requests = OutstandingRequests::default(); + let nonce = outstanding_requests.add_request(repair_type, timestamp()); + let request_status = outstanding_requests.requests.get(&nonce).unwrap(); + assert_eq!(request_status.request, repair_type); + assert_eq!( + request_status.num_expected_responses, + repair_type.num_expected_responses() + ); + } + + #[test] + fn test_timeout_expired_remove() { + let repair_type = RepairType::Orphan(9); + let mut outstanding_requests = OutstandingRequests::default(); + let nonce = outstanding_requests.add_request(repair_type, timestamp()); + let shred = Shred::new_empty_data_shred(); + + let expire_timestamp = outstanding_requests + .requests + .get(&nonce) + .unwrap() + .expire_timestamp; + + assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp + 1)); + assert!(outstanding_requests.requests.get(&nonce).is_none()); + } + + #[test] + fn test_register_response() { + let repair_type = RepairType::Orphan(9); + let mut outstanding_requests = OutstandingRequests::default(); + let nonce = outstanding_requests.add_request(repair_type, timestamp()); + + let shred = Shred::new_empty_data_shred(); + let mut expire_timestamp = outstanding_requests + .requests + .get(&nonce) + .unwrap() + .expire_timestamp; + let mut num_expected_responses = outstanding_requests + .requests + .get(&nonce) + .unwrap() + .num_expected_responses; + assert!(num_expected_responses > 1); + + // Response that passes all checks should decrease num_expected_responses + assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1)); + num_expected_responses -= 1; + assert_eq!( + outstanding_requests + .requests + .get(&nonce) + .unwrap() + .num_expected_responses, + num_expected_responses + ); + + // Response with incorrect nonce is ignored + assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp - 1)); + assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp)); + assert_eq!( + outstanding_requests + .requests + .get(&nonce) + .unwrap() + .num_expected_responses, + num_expected_responses + ); + + // Response with timestamp over limit should remove status, preventing late + // responses from being accepted + assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp)); + assert!(outstanding_requests.requests.get(&nonce).is_none()); + + // If number of outstanding requests hits zero, should also remove the entry + let nonce = outstanding_requests.add_request(repair_type, timestamp()); + expire_timestamp = outstanding_requests + .requests + .get(&nonce) + .unwrap() + .expire_timestamp; + num_expected_responses = outstanding_requests + .requests + .get(&nonce) + .unwrap() + .num_expected_responses; + assert!(num_expected_responses > 1); + for _ in 0..num_expected_responses { + assert!(outstanding_requests.requests.get(&nonce).is_some()); + assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1)); + } + assert!(outstanding_requests.requests.get(&nonce).is_none()); + } +} diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 7848cf81ed..710e1bb97b 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -4,9 +4,10 @@ use crate::{ cluster_info::ClusterInfo, cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, + outstanding_requests::OutstandingRequests, repair_weight::RepairWeight, result::Result, - serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE}, + serve_repair::{RepairType, ServeRepair}, }; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use solana_ledger::{ @@ -33,6 +34,8 @@ use std::{ pub type DuplicateSlotsResetSender = CrossbeamSender; pub type DuplicateSlotsResetReceiver = CrossbeamReceiver; +pub type OutstandingRepairs = OutstandingRequests; + #[derive(Default, Debug)] pub struct SlotRepairs { highest_shred_index: u64, @@ -145,6 +148,7 @@ impl RepairService { repair_info: RepairInfo, cluster_slots: Arc, verified_vote_receiver: VerifiedVoteReceiver, + outstanding_requests: Arc>, ) -> Self { let t_repair = Builder::new() .name("solana-repair-service".to_string()) @@ -157,6 +161,7 @@ impl RepairService { repair_info, &cluster_slots, verified_vote_receiver, + &outstanding_requests, ) }) .unwrap(); @@ -172,6 +177,7 @@ impl RepairService { repair_info: RepairInfo, cluster_slots: &ClusterSlots, verified_vote_receiver: VerifiedVoteReceiver, + outstanding_requests: &RwLock, ) { let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root()); let serve_repair = ServeRepair::new(cluster_info.clone()); @@ -190,6 +196,7 @@ impl RepairService { let mut set_root_elapsed; let mut get_votes_elapsed; let mut add_votes_elapsed; + let repairs = { let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone(); let new_root = root_bank.slot(); @@ -261,6 +268,7 @@ impl RepairService { let mut cache = HashMap::new(); let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed"); + let mut outstanding_requests = outstanding_requests.write().unwrap(); repairs.into_iter().for_each(|repair_request| { if let Ok((to, req)) = serve_repair.repair_request( &cluster_slots, @@ -268,6 +276,7 @@ impl RepairService { &mut cache, &mut repair_stats, &repair_info.repair_validators, + &mut outstanding_requests, ) { repair_socket.send_to(&req, to).unwrap_or_else(|e| { info!("{} repair req send_to({}) error {:?}", id, to, e); @@ -467,6 +476,7 @@ impl RepairService { repair_stats: &mut RepairStats, repair_socket: &UdpSocket, repair_validators: &Option>, + outstanding_requests: &RwLock, ) { duplicate_slot_repair_statuses.retain(|slot, status| { Self::update_duplicate_slot_repair_addr( @@ -480,7 +490,9 @@ impl RepairService { let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot); if let Some(repairs) = repairs { + let mut outstanding_requests = outstanding_requests.write().unwrap(); for repair_type in repairs { + let nonce = outstanding_requests.add_request(repair_type, timestamp()); if let Err(e) = Self::serialize_and_send_request( &repair_type, repair_socket, @@ -488,7 +500,7 @@ impl RepairService { &repair_addr, serve_repair, repair_stats, - DEFAULT_NONCE, + nonce, ) { info!( "repair req send_to {} ({}) error {:?}", @@ -688,7 +700,7 @@ mod test { MAX_ORPHANS, MAX_REPAIR_LENGTH, &HashSet::default(), - None + None, ), vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)] ); @@ -987,6 +999,7 @@ mod test { &mut RepairStats::default(), &UdpSocket::bind("0.0.0.0:0").unwrap(), &None, + &RwLock::new(OutstandingRequests::default()), ); assert!(duplicate_slot_repair_statuses .get(&dead_slot) @@ -1011,6 +1024,7 @@ mod test { &mut RepairStats::default(), &UdpSocket::bind("0.0.0.0:0").unwrap(), &None, + &RwLock::new(OutstandingRequests::default()), ); assert_eq!(duplicate_slot_repair_statuses.len(), 1); assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); @@ -1028,6 +1042,7 @@ mod test { &mut RepairStats::default(), &UdpSocket::bind("0.0.0.0:0").unwrap(), &None, + &RwLock::new(OutstandingRequests::default()), ); assert!(duplicate_slot_repair_statuses.is_empty()); } diff --git a/core/src/request_response.rs b/core/src/request_response.rs new file mode 100644 index 0000000000..429e49b0a8 --- /dev/null +++ b/core/src/request_response.rs @@ -0,0 +1,5 @@ +pub trait RequestResponse { + type Response; + fn num_expected_responses(&self) -> u32; + fn verify_response(&self, response: &Self::Response) -> bool; +} diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 58682ef56f..c0ca3e971d 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -3,13 +3,17 @@ use crate::{ cluster_slots::ClusterSlots, contact_info::ContactInfo, repair_response, - repair_service::RepairStats, + repair_service::{OutstandingRepairs, RepairStats}, + request_response::RequestResponse, result::{Error, Result}, weighted_shuffle::weighted_best, }; use bincode::serialize; use rand::distributions::{Distribution, WeightedIndex}; -use solana_ledger::{blockstore::Blockstore, shred::Nonce}; +use solana_ledger::{ + blockstore::Blockstore, + shred::{Nonce, Shred}, +}; use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_debug; use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler}; @@ -31,7 +35,6 @@ use std::{ /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; -pub const DEFAULT_NONCE: u32 = 42; #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)] pub enum RepairType { @@ -50,6 +53,28 @@ impl RepairType { } } +impl RequestResponse for RepairType { + type Response = Shred; + fn num_expected_responses(&self) -> u32 { + match self { + RepairType::Orphan(_) => (MAX_ORPHAN_REPAIR_RESPONSES + 1) as u32, // run_orphan uses <= MAX_ORPHAN_REPAIR_RESPONSES + RepairType::HighestShred(_, _) => 1, + RepairType::Shred(_, _) => 1, + } + } + fn verify_response(&self, response_shred: &Shred) -> bool { + match self { + RepairType::Orphan(slot) => response_shred.slot() <= *slot, + RepairType::HighestShred(slot, index) => { + response_shred.slot() as u64 == *slot && response_shred.index() as u64 >= *index + } + RepairType::Shred(slot, index) => { + response_shred.slot() as u64 == *slot && response_shred.index() as u64 == *index + } + } + } +} + #[derive(Default)] pub struct ServeRepairStats { pub total_packets: usize, @@ -376,6 +401,7 @@ impl ServeRepair { cache: &mut RepairCache, repair_stats: &mut RepairStats, repair_validators: &Option>, + outstanding_requests: &mut OutstandingRepairs, ) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication and has the desired slot, as indicated // by a valid tvu port location @@ -395,13 +421,10 @@ impl ServeRepair { }; let n = weighted_index.sample(&mut rand::thread_rng()); let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port + let nonce = + outstanding_requests.add_request(repair_request, solana_sdk::timing::timestamp()); let repair_peer_id = repair_peers[n].id; - let out = self.map_repair_request( - &repair_request, - &repair_peer_id, - repair_stats, - DEFAULT_NONCE, - )?; + let out = self.map_repair_request(&repair_request, &repair_peer_id, repair_stats, nonce)?; Ok((addr, out)) } @@ -592,6 +615,7 @@ mod tests { max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, }, }; + use solana_perf::packet::Packet; use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp}; #[test] @@ -634,6 +658,8 @@ mod tests { nonce, ) .expect("packets"); + let request = RepairType::HighestShred(slot, index); + verify_responses(&request, rv.packets.iter()); let rv: Vec = rv .packets @@ -731,6 +757,8 @@ mod tests { nonce, ) .expect("packets"); + let request = RepairType::Shred(slot, index); + verify_responses(&request, rv.packets.iter()); let rv: Vec = rv .packets .into_iter() @@ -752,12 +780,14 @@ mod tests { let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(me)); let serve_repair = ServeRepair::new(cluster_info.clone()); + let mut outstanding_requests = OutstandingRepairs::default(); let rv = serve_repair.repair_request( &cluster_slots, RepairType::Shred(0, 0), &mut HashMap::new(), &mut RepairStats::default(), &None, + &mut outstanding_requests, ); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); @@ -785,6 +815,7 @@ mod tests { &mut HashMap::new(), &mut RepairStats::default(), &None, + &mut outstanding_requests, ) .unwrap(); assert_eq!(nxt.serve_repair, serve_repair_addr); @@ -818,6 +849,7 @@ mod tests { &mut HashMap::new(), &mut RepairStats::default(), &None, + &mut outstanding_requests, ) .unwrap(); if rv.0 == serve_repair_addr { @@ -886,6 +918,9 @@ mod tests { .collect(); // Verify responses + let request = RepairType::Orphan(slot); + verify_responses(&request, rv.iter()); + let expected: Vec<_> = (slot..slot + num_slots) .rev() .filter_map(|slot| { @@ -994,6 +1029,7 @@ mod tests { &mut HashMap::new(), &mut RepairStats::default(), &trusted_validators, + &mut OutstandingRepairs::default(), ) .is_err()); } @@ -1010,6 +1046,7 @@ mod tests { &mut HashMap::new(), &mut RepairStats::default(), &trusted_validators, + &mut OutstandingRepairs::default(), ) .is_ok()); @@ -1030,7 +1067,68 @@ mod tests { &mut HashMap::new(), &mut RepairStats::default(), &None, + &mut OutstandingRepairs::default(), ) .is_ok()); } + + #[test] + fn test_verify_response() { + let repair = RepairType::Orphan(9); + // Ensure new options are addded to this test + match repair { + RepairType::Orphan(_) => (), + RepairType::HighestShred(_, _) => (), + RepairType::Shred(_, _) => (), + }; + + let slot = 9; + let index = 5; + + // Orphan + let mut shred = Shred::new_empty_data_shred(); + shred.set_slot(slot); + let request = RepairType::Orphan(slot); + assert!(request.verify_response(&shred)); + shred.set_slot(slot - 1); + assert!(request.verify_response(&shred)); + shred.set_slot(slot + 1); + assert!(!request.verify_response(&shred)); + + // HighestShred + shred = Shred::new_empty_data_shred(); + shred.set_slot(slot); + shred.set_index(index); + let request = RepairType::HighestShred(slot, index as u64); + assert!(request.verify_response(&shred)); + shred.set_index(index + 1); + assert!(request.verify_response(&shred)); + shred.set_index(index - 1); + assert!(!request.verify_response(&shred)); + shred.set_slot(slot - 1); + shred.set_index(index); + assert!(!request.verify_response(&shred)); + shred.set_slot(slot + 1); + assert!(!request.verify_response(&shred)); + + // Shred + shred = Shred::new_empty_data_shred(); + shred.set_slot(slot); + shred.set_index(index); + let request = RepairType::Shred(slot, index as u64); + assert!(request.verify_response(&shred)); + shred.set_index(index + 1); + assert!(!request.verify_response(&shred)); + shred.set_slot(slot + 1); + shred.set_index(index); + assert!(!request.verify_response(&shred)); + } + + fn verify_responses<'a>(request: &RepairType, packets: impl Iterator) { + for packet in packets { + let shred_payload = packet.data.to_vec(); + let shred = Shred::new_from_serialized_shred(shred_payload).unwrap(); + request.verify_response(&shred); + } + } } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 1fca87e701..46d875fcd1 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -6,10 +6,10 @@ use crate::{ cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, + outstanding_requests::OutstandingRequests, repair_response, - repair_service::{RepairInfo, RepairService}, + repair_service::{OutstandingRepairs, RepairInfo, RepairService}, result::{Error, Result}, - serve_repair::DEFAULT_NONCE, }; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, @@ -28,6 +28,7 @@ use solana_rayon_threadlimit::get_thread_count; use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms}; use solana_streamer::streamer::PacketSender; +use std::collections::HashSet; use std::{ net::{SocketAddr, UdpSocket}, sync::atomic::{AtomicBool, Ordering}, @@ -123,13 +124,50 @@ fn run_check_duplicate( Ok(()) } -fn verify_repair(repair_info: &Option) -> bool { - repair_info +fn verify_repair( + outstanding_requests: &mut OutstandingRepairs, + shred: &Shred, + repair_meta: &Option, +) -> bool { + repair_meta .as_ref() - .map(|repair_info| repair_info.nonce == DEFAULT_NONCE) + .map(|repair_meta| { + outstanding_requests.register_response( + repair_meta.nonce, + &shred, + solana_sdk::timing::timestamp(), + ) + }) .unwrap_or(true) } +fn prune_shreds_invalid_repair( + shreds: &mut Vec, + repair_infos: &mut Vec>, + outstanding_requests: &Arc>, +) { + assert_eq!(shreds.len(), repair_infos.len()); + let mut i = 0; + let mut removed = HashSet::new(); + { + let mut outstanding_requests = outstanding_requests.write().unwrap(); + shreds.retain(|shred| { + let should_keep = ( + verify_repair(&mut outstanding_requests, &shred, &repair_infos[i]), + i += 1, + ) + .0; + if !should_keep { + removed.insert(i - 1); + } + should_keep + }); + } + i = 0; + repair_infos.retain(|_repair_info| (!removed.contains(&i), i += 1).0); + assert_eq!(shreds.len(), repair_infos.len()); +} + fn run_insert( shred_receiver: &CrossbeamReceiver<(Vec, Vec>)>, blockstore: &Arc, @@ -137,6 +175,7 @@ fn run_insert( handle_duplicate: F, metrics: &mut BlockstoreInsertionMetrics, completed_data_sets_sender: &CompletedDataSetsSender, + outstanding_requests: &Arc>, ) -> Result<()> where F: Fn(Shred), @@ -148,11 +187,7 @@ where repair_infos.extend(more_repair_infos); } - assert_eq!(shreds.len(), repair_infos.len()); - let mut i = 0; - shreds.retain(|_shred| (verify_repair(&repair_infos[i]), i += 1).0); - repair_infos.retain(|repair_info| verify_repair(&repair_info)); - assert_eq!(shreds.len(), repair_infos.len()); + prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests); let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate( shreds, @@ -334,6 +369,9 @@ impl WindowService { + std::marker::Send + std::marker::Sync, { + let outstanding_requests: Arc> = + Arc::new(RwLock::new(OutstandingRequests::default())); + let bank_forks = Some(repair_info.bank_forks.clone()); let repair_service = RepairService::new( @@ -344,6 +382,7 @@ impl WindowService { repair_info, cluster_slots, verified_vote_receiver, + outstanding_requests.clone(), ); let (insert_sender, insert_receiver) = unbounded(); @@ -364,6 +403,7 @@ impl WindowService { insert_receiver, duplicate_sender, completed_data_sets_sender, + outstanding_requests, ); let t_window = Self::start_recv_window_thread( @@ -424,6 +464,7 @@ impl WindowService { insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, check_duplicate_sender: CrossbeamSender, completed_data_sets_sender: CompletedDataSetsSender, + outstanding_requests: Arc>, ) -> JoinHandle<()> { let exit = exit.clone(); let blockstore = blockstore.clone(); @@ -453,6 +494,7 @@ impl WindowService { &handle_duplicate, &mut metrics, &completed_data_sets_sender, + &outstanding_requests, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { break; @@ -720,4 +762,31 @@ mod test { duplicate_shred_slot ); } + + #[test] + fn test_prune_shreds() { + use crate::serve_repair::RepairType; + use std::net::{IpAddr, Ipv4Addr}; + solana_logger::setup(); + let (common, coding) = Shredder::new_coding_shred_header(5, 5, 5, 6, 6, 0, 0); + let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding); + let mut shreds = vec![shred.clone(), shred.clone(), shred]; + let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let repair_meta = RepairMeta { + _from_addr, + nonce: 0, + }; + let outstanding_requests = Arc::new(RwLock::new(OutstandingRepairs::default())); + let repair_type = RepairType::Orphan(9); + let nonce = outstanding_requests + .write() + .unwrap() + .add_request(repair_type, timestamp()); + let repair_meta1 = RepairMeta { _from_addr, nonce }; + let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)]; + prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests); + assert_eq!(repair_infos.len(), 2); + assert!(repair_infos[0].is_none()); + assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce); + } }