diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 1c6c607baf..da4a9a899b 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -37,7 +37,6 @@ use { }, solana_streamer::socket::SocketAddrSpace, std::{ - collections::VecDeque, sync::{atomic::Ordering, Arc, RwLock}, time::{Duration, Instant}, }, @@ -80,7 +79,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let len = 4096; let chunk_size = 1024; let batches = to_packet_batches(&vec![tx; len], chunk_size); - let mut packet_batches = VecDeque::new(); + let mut packet_batches = UnprocessedPacketBatches::new(); for batch in batches { let batch_len = batch.packets.len(); packet_batches.push_back(DeserializedPacketBatch::new( diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b2494d9c86..5ba171bc3c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -58,7 +58,7 @@ use { }, std::{ cmp, - collections::{HashMap, VecDeque}, + collections::HashMap, env, net::{SocketAddr, UdpSocket}, sync::{ @@ -959,7 +959,7 @@ impl BankingStage { ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); + let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); let qos_service = QosService::new(cost_model, id); let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index fd7426295b..3beca7e55e 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,4 +1,5 @@ use { + retain_mut::RetainMut, solana_perf::packet::{limited_deserialize, Packet, PacketBatch}, solana_sdk::{ hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature, @@ -10,8 +11,6 @@ use { }, }; -pub type UnprocessedPacketBatches = VecDeque; - /// hold deserialized messages, as well as computed message_hash and other things needed to create /// SanitizedTransaction #[derive(Debug, Default)] @@ -34,6 +33,52 @@ pub struct DeserializedPacketBatch { pub unprocessed_packets: HashMap, } +pub struct UnprocessedPacketBatches(VecDeque); + +impl std::ops::Deref for UnprocessedPacketBatches { + type Target = VecDeque; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for UnprocessedPacketBatches { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl RetainMut for UnprocessedPacketBatches { + fn retain_mut(&mut self, f: F) + where + F: FnMut(&mut DeserializedPacketBatch) -> bool, + { + RetainMut::retain_mut(&mut self.0, f); + } +} + +impl FromIterator for UnprocessedPacketBatches { + fn from_iter>(iter: I) -> Self { + Self(iter.into_iter().collect()) + } +} + +impl Default for UnprocessedPacketBatches { + fn default() -> Self { + Self::new() + } +} + +impl UnprocessedPacketBatches { + pub fn new() -> Self { + UnprocessedPacketBatches(VecDeque::new()) + } + + pub fn with_capacity(capacity: usize) -> Self { + UnprocessedPacketBatches(VecDeque::with_capacity(capacity)) + } +} + impl DeserializedPacketBatch { pub fn new(packet_batch: PacketBatch, packet_indexes: Vec, forwarded: bool) -> Self { let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes);