Replace type alias with newtype for UnprocesedPacketBatches
This commit is contained in:
@ -37,7 +37,6 @@ use {
|
|||||||
},
|
},
|
||||||
solana_streamer::socket::SocketAddrSpace,
|
solana_streamer::socket::SocketAddrSpace,
|
||||||
std::{
|
std::{
|
||||||
collections::VecDeque,
|
|
||||||
sync::{atomic::Ordering, Arc, RwLock},
|
sync::{atomic::Ordering, Arc, RwLock},
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
},
|
},
|
||||||
@ -80,7 +79,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
|
|||||||
let len = 4096;
|
let len = 4096;
|
||||||
let chunk_size = 1024;
|
let chunk_size = 1024;
|
||||||
let batches = to_packet_batches(&vec![tx; len], chunk_size);
|
let batches = to_packet_batches(&vec![tx; len], chunk_size);
|
||||||
let mut packet_batches = VecDeque::new();
|
let mut packet_batches = UnprocessedPacketBatches::new();
|
||||||
for batch in batches {
|
for batch in batches {
|
||||||
let batch_len = batch.packets.len();
|
let batch_len = batch.packets.len();
|
||||||
packet_batches.push_back(DeserializedPacketBatch::new(
|
packet_batches.push_back(DeserializedPacketBatch::new(
|
||||||
|
@ -58,7 +58,7 @@ use {
|
|||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
cmp,
|
cmp,
|
||||||
collections::{HashMap, VecDeque},
|
collections::HashMap,
|
||||||
env,
|
env,
|
||||||
net::{SocketAddr, UdpSocket},
|
net::{SocketAddr, UdpSocket},
|
||||||
sync::{
|
sync::{
|
||||||
@ -959,7 +959,7 @@ impl BankingStage {
|
|||||||
) {
|
) {
|
||||||
let recorder = poh_recorder.lock().unwrap().recorder();
|
let recorder = poh_recorder.lock().unwrap().recorder();
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
|
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
|
||||||
let mut banking_stage_stats = BankingStageStats::new(id);
|
let mut banking_stage_stats = BankingStageStats::new(id);
|
||||||
let qos_service = QosService::new(cost_model, id);
|
let qos_service = QosService::new(cost_model, id);
|
||||||
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
|
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use {
|
use {
|
||||||
|
retain_mut::RetainMut,
|
||||||
solana_perf::packet::{limited_deserialize, Packet, PacketBatch},
|
solana_perf::packet::{limited_deserialize, Packet, PacketBatch},
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature,
|
hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature,
|
||||||
@ -10,8 +11,6 @@ use {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub type UnprocessedPacketBatches = VecDeque<DeserializedPacketBatch>;
|
|
||||||
|
|
||||||
/// hold deserialized messages, as well as computed message_hash and other things needed to create
|
/// hold deserialized messages, as well as computed message_hash and other things needed to create
|
||||||
/// SanitizedTransaction
|
/// SanitizedTransaction
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
@ -34,6 +33,52 @@ pub struct DeserializedPacketBatch {
|
|||||||
pub unprocessed_packets: HashMap<usize, DeserializedPacket>,
|
pub unprocessed_packets: HashMap<usize, DeserializedPacket>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct UnprocessedPacketBatches(VecDeque<DeserializedPacketBatch>);
|
||||||
|
|
||||||
|
impl std::ops::Deref for UnprocessedPacketBatches {
|
||||||
|
type Target = VecDeque<DeserializedPacketBatch>;
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::DerefMut for UnprocessedPacketBatches {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RetainMut<DeserializedPacketBatch> for UnprocessedPacketBatches {
|
||||||
|
fn retain_mut<F>(&mut self, f: F)
|
||||||
|
where
|
||||||
|
F: FnMut(&mut DeserializedPacketBatch) -> bool,
|
||||||
|
{
|
||||||
|
RetainMut::retain_mut(&mut self.0, f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromIterator<DeserializedPacketBatch> for UnprocessedPacketBatches {
|
||||||
|
fn from_iter<I: IntoIterator<Item = DeserializedPacketBatch>>(iter: I) -> Self {
|
||||||
|
Self(iter.into_iter().collect())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for UnprocessedPacketBatches {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UnprocessedPacketBatches {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
UnprocessedPacketBatches(VecDeque::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_capacity(capacity: usize) -> Self {
|
||||||
|
UnprocessedPacketBatches(VecDeque::with_capacity(capacity))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl DeserializedPacketBatch {
|
impl DeserializedPacketBatch {
|
||||||
pub fn new(packet_batch: PacketBatch, packet_indexes: Vec<usize>, forwarded: bool) -> Self {
|
pub fn new(packet_batch: PacketBatch, packet_indexes: Vec<usize>, forwarded: bool) -> Self {
|
||||||
let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes);
|
let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes);
|
||||||
|
Reference in New Issue
Block a user