Add flag to mark a packet as discarded (#6427)
This commit is contained in:
		| @@ -69,8 +69,8 @@ fn retransmit( | |||||||
|     let mut compute_turbine_peers_total = 0; |     let mut compute_turbine_peers_total = 0; | ||||||
|     for mut packets in packet_v { |     for mut packets in packet_v { | ||||||
|         for packet in packets.packets.iter_mut() { |         for packet in packets.packets.iter_mut() { | ||||||
|             // skip repair packets |             // skip discarded packets and repair packets | ||||||
|             if packet.meta.repair { |             if packet.meta.discard || packet.meta.repair { | ||||||
|                 total_packets -= 1; |                 total_packets -= 1; | ||||||
|                 continue; |                 continue; | ||||||
|             } |             } | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ use crate::result::{Error, Result}; | |||||||
| use crate::service::Service; | use crate::service::Service; | ||||||
| use crate::shred::Shred; | use crate::shred::Shred; | ||||||
| use crate::streamer::{PacketReceiver, PacketSender}; | use crate::streamer::{PacketReceiver, PacketSender}; | ||||||
| use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; | use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; | ||||||
| use rayon::ThreadPool; | use rayon::ThreadPool; | ||||||
| use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; | use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; | ||||||
| use solana_rayon_threadlimit::get_thread_count; | use solana_rayon_threadlimit::get_thread_count; | ||||||
| @@ -23,8 +23,6 @@ use std::sync::{Arc, RwLock}; | |||||||
| use std::thread::{self, Builder, JoinHandle}; | use std::thread::{self, Builder, JoinHandle}; | ||||||
| use std::time::{Duration, Instant}; | use std::time::{Duration, Instant}; | ||||||
|  |  | ||||||
| pub const NUM_THREADS: u32 = 10; |  | ||||||
|  |  | ||||||
| fn verify_shred_slot(shred: &Shred, root: u64) -> bool { | fn verify_shred_slot(shred: &Shred, root: u64) -> bool { | ||||||
|     if shred.is_data() { |     if shred.is_data() { | ||||||
|         // Only data shreds have parent information |         // Only data shreds have parent information | ||||||
| @@ -89,40 +87,26 @@ where | |||||||
|     inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); |     inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); | ||||||
|  |  | ||||||
|     let last_root = blocktree.last_root(); |     let last_root = blocktree.last_root(); | ||||||
|     let (shreds, packets_ix): (Vec<_>, Vec<_>) = thread_pool.install(|| { |     let shreds: Vec<_> = thread_pool.install(|| { | ||||||
|         packets |         packets | ||||||
|             .packets |             .packets | ||||||
|             .par_iter_mut() |             .par_iter_mut() | ||||||
|             .enumerate() |             .filter_map(|packet| { | ||||||
|             .filter_map(|(i, packet)| { |  | ||||||
|                 if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) { |                 if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) { | ||||||
|                     if shred_filter(&shred, last_root) { |                     if shred_filter(&shred, last_root) { | ||||||
|                         packet.meta.slot = shred.slot(); |                         packet.meta.slot = shred.slot(); | ||||||
|                         packet.meta.seed = shred.seed(); |                         packet.meta.seed = shred.seed(); | ||||||
|                         Some((shred, i)) |                         Some(shred) | ||||||
|                     } else { |                     } else { | ||||||
|  |                         packet.meta.discard = true; | ||||||
|                         None |                         None | ||||||
|                     } |                     } | ||||||
|                 } else { |                 } else { | ||||||
|  |                     packet.meta.discard = true; | ||||||
|                     None |                     None | ||||||
|                 } |                 } | ||||||
|             }) |             }) | ||||||
|             .unzip() |             .collect() | ||||||
|     }); |  | ||||||
|     // to avoid lookups into the `packets_ix` vec, this block manually tracks where we are in that vec |  | ||||||
|     // and since `packets.packets.retain` and the `packets_ix` vec are both in order, |  | ||||||
|     // we should be able to automatically drop any packets in the index gaps. |  | ||||||
|     let mut retain_ix = 0; |  | ||||||
|     let mut i = 0; |  | ||||||
|     packets.packets.retain(|_| { |  | ||||||
|         let retain = if !packets_ix.is_empty() && i == packets_ix[retain_ix] { |  | ||||||
|             retain_ix = (packets_ix.len() - 1).min(retain_ix + 1); |  | ||||||
|             true |  | ||||||
|         } else { |  | ||||||
|             false |  | ||||||
|         }; |  | ||||||
|         i += 1; |  | ||||||
|         retain |  | ||||||
|     }); |     }); | ||||||
|  |  | ||||||
|     trace!("{:?} shreds from packets", shreds.len()); |     trace!("{:?} shreds from packets", shreds.len()); | ||||||
|   | |||||||
| @@ -13,6 +13,7 @@ pub struct Meta { | |||||||
|     pub size: usize, |     pub size: usize, | ||||||
|     pub forward: bool, |     pub forward: bool, | ||||||
|     pub repair: bool, |     pub repair: bool, | ||||||
|  |     pub discard: bool, | ||||||
|     pub addr: [u16; 8], |     pub addr: [u16; 8], | ||||||
|     pub port: u16, |     pub port: u16, | ||||||
|     pub v6: bool, |     pub v6: bool, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user