* Rename Packets to PacketBatch (#21794)
(cherry picked from commit 254ef3e7b6
)
# Conflicts:
# core/src/verified_vote_packets.rs
* resolve conflicts
Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
@ -14,7 +14,7 @@ use {
|
||||
solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE},
|
||||
solana_measure::measure::Measure,
|
||||
solana_perf::{
|
||||
packet::{limited_deserialize, Packet, Packets},
|
||||
packet::{limited_deserialize, Packet, PacketBatch},
|
||||
recycler::Recycler,
|
||||
},
|
||||
solana_runtime::bank::Bank,
|
||||
@ -23,7 +23,7 @@ use {
|
||||
pubkey::Pubkey,
|
||||
timing::timestamp,
|
||||
},
|
||||
solana_streamer::streamer::{self, PacketReceiver},
|
||||
solana_streamer::streamer::{self, PacketBatchReceiver},
|
||||
std::{
|
||||
collections::HashSet,
|
||||
net::UdpSocket,
|
||||
@ -197,7 +197,7 @@ impl AncestorHashesService {
|
||||
/// Listen for responses to our ancestors hashes repair requests
|
||||
fn run_responses_listener(
|
||||
ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>>,
|
||||
response_receiver: PacketReceiver,
|
||||
response_receiver: PacketBatchReceiver,
|
||||
blockstore: Arc<Blockstore>,
|
||||
outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
@ -240,7 +240,7 @@ impl AncestorHashesService {
|
||||
/// Process messages from the network
|
||||
fn process_new_packets_from_channel(
|
||||
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
|
||||
response_receiver: &PacketReceiver,
|
||||
response_receiver: &PacketBatchReceiver,
|
||||
blockstore: &Blockstore,
|
||||
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
|
||||
stats: &mut AncestorHashesResponsesStats,
|
||||
@ -249,17 +249,17 @@ impl AncestorHashesService {
|
||||
retryable_slots_sender: &RetryableSlotsSender,
|
||||
) -> Result<()> {
|
||||
let timeout = Duration::new(1, 0);
|
||||
let mut responses = vec![response_receiver.recv_timeout(timeout)?];
|
||||
let mut total_packets = responses[0].packets.len();
|
||||
let mut packet_batches = vec![response_receiver.recv_timeout(timeout)?];
|
||||
let mut total_packets = packet_batches[0].packets.len();
|
||||
|
||||
let mut dropped_packets = 0;
|
||||
while let Ok(more) = response_receiver.try_recv() {
|
||||
total_packets += more.packets.len();
|
||||
while let Ok(batch) = response_receiver.try_recv() {
|
||||
total_packets += batch.packets.len();
|
||||
if total_packets < *max_packets {
|
||||
// Drop the rest in the channel in case of DOS
|
||||
responses.push(more);
|
||||
packet_batches.push(batch);
|
||||
} else {
|
||||
dropped_packets += more.packets.len();
|
||||
dropped_packets += batch.packets.len();
|
||||
}
|
||||
}
|
||||
|
||||
@ -267,10 +267,10 @@ impl AncestorHashesService {
|
||||
stats.total_packets += total_packets;
|
||||
|
||||
let mut time = Measure::start("ancestor_hashes::handle_packets");
|
||||
for response in responses {
|
||||
Self::process_single_packets(
|
||||
for packet_batch in packet_batches {
|
||||
Self::process_packet_batch(
|
||||
ancestor_hashes_request_statuses,
|
||||
response,
|
||||
packet_batch,
|
||||
stats,
|
||||
outstanding_requests,
|
||||
blockstore,
|
||||
@ -289,16 +289,16 @@ impl AncestorHashesService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn process_single_packets(
|
||||
fn process_packet_batch(
|
||||
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
|
||||
packets: Packets,
|
||||
packet_batch: PacketBatch,
|
||||
stats: &mut AncestorHashesResponsesStats,
|
||||
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
|
||||
blockstore: &Blockstore,
|
||||
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
|
||||
retryable_slots_sender: &RetryableSlotsSender,
|
||||
) {
|
||||
packets.packets.iter().for_each(|packet| {
|
||||
packet_batch.packets.iter().for_each(|packet| {
|
||||
let decision = Self::verify_and_process_ancestor_response(
|
||||
packet,
|
||||
ancestor_hashes_request_statuses,
|
||||
@ -871,7 +871,7 @@ mod test {
|
||||
t_listen: JoinHandle<()>,
|
||||
exit: Arc<AtomicBool>,
|
||||
responder_info: ContactInfo,
|
||||
response_receiver: PacketReceiver,
|
||||
response_receiver: PacketBatchReceiver,
|
||||
correct_bank_hashes: HashMap<Slot, Hash>,
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ use {
|
||||
solana_perf::{
|
||||
cuda_runtime::PinnedVec,
|
||||
data_budget::DataBudget,
|
||||
packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH},
|
||||
packet::{limited_deserialize, Packet, PacketBatch, PACKETS_PER_BATCH},
|
||||
perf_libs,
|
||||
},
|
||||
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder},
|
||||
@ -64,10 +64,10 @@ use {
|
||||
};
|
||||
|
||||
/// (packets, valid_indexes, forwarded)
|
||||
/// Set of packets with a list of which are valid and if this batch has been forwarded.
|
||||
type PacketsAndOffsets = (Packets, Vec<usize>, bool);
|
||||
/// Batch of packets with a list of which are valid and if this batch has been forwarded.
|
||||
type PacketBatchAndOffsets = (PacketBatch, Vec<usize>, bool);
|
||||
|
||||
pub type UnprocessedPackets = VecDeque<PacketsAndOffsets>;
|
||||
pub type UnprocessedPacketBatches = VecDeque<PacketBatchAndOffsets>;
|
||||
|
||||
/// Transaction forwarding
|
||||
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
|
||||
@ -255,9 +255,9 @@ impl BankingStage {
|
||||
pub fn new(
|
||||
cluster_info: &Arc<ClusterInfo>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: ReplayVoteSender,
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
@ -278,9 +278,9 @@ impl BankingStage {
|
||||
fn new_num_threads(
|
||||
cluster_info: &Arc<ClusterInfo>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
num_threads: u32,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: ReplayVoteSender,
|
||||
@ -346,12 +346,12 @@ impl BankingStage {
|
||||
}
|
||||
|
||||
fn filter_valid_packets_for_forwarding<'a>(
|
||||
all_packets: impl Iterator<Item = &'a PacketsAndOffsets>,
|
||||
packet_batches: impl Iterator<Item = &'a PacketBatchAndOffsets>,
|
||||
) -> Vec<&'a Packet> {
|
||||
all_packets
|
||||
.filter(|(_p, _indexes, forwarded)| !forwarded)
|
||||
.flat_map(|(p, valid_indexes, _forwarded)| {
|
||||
valid_indexes.iter().map(move |x| &p.packets[*x])
|
||||
packet_batches
|
||||
.filter(|(_batch, _indexes, forwarded)| !forwarded)
|
||||
.flat_map(|(batch, valid_indexes, _forwarded)| {
|
||||
valid_indexes.iter().map(move |x| &batch.packets[*x])
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
@ -359,10 +359,10 @@ impl BankingStage {
|
||||
fn forward_buffered_packets(
|
||||
socket: &std::net::UdpSocket,
|
||||
tpu_forwards: &std::net::SocketAddr,
|
||||
unprocessed_packets: &UnprocessedPackets,
|
||||
buffered_packet_batches: &UnprocessedPacketBatches,
|
||||
data_budget: &DataBudget,
|
||||
) -> std::io::Result<()> {
|
||||
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter());
|
||||
let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
|
||||
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
|
||||
const INTERVAL_MS: u64 = 100;
|
||||
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
|
||||
@ -385,7 +385,7 @@ impl BankingStage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Returns whether the given `Packets` has any more remaining unprocessed
|
||||
// Returns whether the given `PacketBatch` has any more remaining unprocessed
|
||||
// transactions
|
||||
fn update_buffered_packets_with_new_unprocessed(
|
||||
original_unprocessed_indexes: &mut Vec<usize>,
|
||||
@ -404,7 +404,7 @@ impl BankingStage {
|
||||
my_pubkey: &Pubkey,
|
||||
max_tx_ingestion_ns: u128,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
buffered_packets: &mut UnprocessedPackets,
|
||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
test_fn: Option<impl Fn()>,
|
||||
@ -412,19 +412,21 @@ impl BankingStage {
|
||||
recorder: &TransactionRecorder,
|
||||
qos_service: &Arc<QosService>,
|
||||
) {
|
||||
let mut rebuffered_packets_len = 0;
|
||||
let mut rebuffered_packet_count = 0;
|
||||
let mut new_tx_count = 0;
|
||||
let buffered_len = buffered_packets.len();
|
||||
let buffered_packet_batches_len = buffered_packet_batches.len();
|
||||
let mut proc_start = Measure::start("consume_buffered_process");
|
||||
let mut reached_end_of_slot = None;
|
||||
|
||||
buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| {
|
||||
buffered_packet_batches.retain_mut(|buffered_packet_batch_and_offsets| {
|
||||
let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
|
||||
buffered_packet_batch_and_offsets;
|
||||
if let Some((next_leader, bank)) = &reached_end_of_slot {
|
||||
// We've hit the end of this slot, no need to perform more processing,
|
||||
// just filter the remaining packets for the invalid (e.g. too old) ones
|
||||
let new_unprocessed_indexes = Self::filter_unprocessed_packets(
|
||||
bank,
|
||||
msgs,
|
||||
packet_batch,
|
||||
original_unprocessed_indexes,
|
||||
my_pubkey,
|
||||
*next_leader,
|
||||
@ -446,7 +448,7 @@ impl BankingStage {
|
||||
&working_bank,
|
||||
&bank_creation_time,
|
||||
recorder,
|
||||
msgs,
|
||||
packet_batch,
|
||||
original_unprocessed_indexes.to_owned(),
|
||||
transaction_status_sender.clone(),
|
||||
gossip_vote_sender,
|
||||
@ -467,7 +469,7 @@ impl BankingStage {
|
||||
new_tx_count += processed;
|
||||
// Out of the buffered packets just retried, collect any still unprocessed
|
||||
// transactions in this batch for forwarding
|
||||
rebuffered_packets_len += new_unprocessed_indexes.len();
|
||||
rebuffered_packet_count += new_unprocessed_indexes.len();
|
||||
let has_more_unprocessed_transactions =
|
||||
Self::update_buffered_packets_with_new_unprocessed(
|
||||
original_unprocessed_indexes,
|
||||
@ -478,7 +480,7 @@ impl BankingStage {
|
||||
}
|
||||
has_more_unprocessed_transactions
|
||||
} else {
|
||||
rebuffered_packets_len += original_unprocessed_indexes.len();
|
||||
rebuffered_packet_count += original_unprocessed_indexes.len();
|
||||
// `original_unprocessed_indexes` must have remaining packets to process
|
||||
// if not yet processed.
|
||||
assert!(Self::packet_has_more_unprocessed_transactions(
|
||||
@ -494,7 +496,7 @@ impl BankingStage {
|
||||
debug!(
|
||||
"@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}",
|
||||
timestamp(),
|
||||
buffered_len,
|
||||
buffered_packet_batches_len,
|
||||
proc_start.as_ms(),
|
||||
new_tx_count,
|
||||
(new_tx_count as f32) / (proc_start.as_s())
|
||||
@ -505,7 +507,7 @@ impl BankingStage {
|
||||
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
|
||||
banking_stage_stats
|
||||
.rebuffered_packets_count
|
||||
.fetch_add(rebuffered_packets_len, Ordering::Relaxed);
|
||||
.fetch_add(rebuffered_packet_count, Ordering::Relaxed);
|
||||
banking_stage_stats
|
||||
.consumed_buffered_packets_count
|
||||
.fetch_add(new_tx_count, Ordering::Relaxed);
|
||||
@ -550,7 +552,7 @@ impl BankingStage {
|
||||
socket: &std::net::UdpSocket,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
cluster_info: &ClusterInfo,
|
||||
buffered_packets: &mut UnprocessedPackets,
|
||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||
forward_option: &ForwardOption,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
@ -592,7 +594,7 @@ impl BankingStage {
|
||||
my_pubkey,
|
||||
max_tx_ingestion_ns,
|
||||
poh_recorder,
|
||||
buffered_packets,
|
||||
buffered_packet_batches,
|
||||
transaction_status_sender,
|
||||
gossip_vote_sender,
|
||||
None::<Box<dyn Fn()>>,
|
||||
@ -605,7 +607,7 @@ impl BankingStage {
|
||||
Self::handle_forwarding(
|
||||
forward_option,
|
||||
cluster_info,
|
||||
buffered_packets,
|
||||
buffered_packet_batches,
|
||||
poh_recorder,
|
||||
socket,
|
||||
false,
|
||||
@ -616,7 +618,7 @@ impl BankingStage {
|
||||
Self::handle_forwarding(
|
||||
forward_option,
|
||||
cluster_info,
|
||||
buffered_packets,
|
||||
buffered_packet_batches,
|
||||
poh_recorder,
|
||||
socket,
|
||||
true,
|
||||
@ -631,7 +633,7 @@ impl BankingStage {
|
||||
fn handle_forwarding(
|
||||
forward_option: &ForwardOption,
|
||||
cluster_info: &ClusterInfo,
|
||||
buffered_packets: &mut UnprocessedPackets,
|
||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
socket: &UdpSocket,
|
||||
hold: bool,
|
||||
@ -640,7 +642,7 @@ impl BankingStage {
|
||||
let addr = match forward_option {
|
||||
ForwardOption::NotForward => {
|
||||
if !hold {
|
||||
buffered_packets.clear();
|
||||
buffered_packet_batches.clear();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -653,20 +655,20 @@ impl BankingStage {
|
||||
Some(addr) => addr,
|
||||
None => return,
|
||||
};
|
||||
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets, data_budget);
|
||||
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget);
|
||||
if hold {
|
||||
buffered_packets.retain(|(_, index, _)| !index.is_empty());
|
||||
for (_, _, forwarded) in buffered_packets.iter_mut() {
|
||||
buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
|
||||
for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
|
||||
*forwarded = true;
|
||||
}
|
||||
} else {
|
||||
buffered_packets.clear();
|
||||
buffered_packet_batches.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn process_loop(
|
||||
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
cluster_info: &ClusterInfo,
|
||||
recv_start: &mut Instant,
|
||||
@ -681,17 +683,17 @@ impl BankingStage {
|
||||
) {
|
||||
let recorder = poh_recorder.lock().unwrap().recorder();
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
|
||||
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
|
||||
let banking_stage_stats = BankingStageStats::new(id);
|
||||
loop {
|
||||
let my_pubkey = cluster_info.id();
|
||||
while !buffered_packets.is_empty() {
|
||||
while !buffered_packet_batches.is_empty() {
|
||||
let decision = Self::process_buffered_packets(
|
||||
&my_pubkey,
|
||||
&socket,
|
||||
poh_recorder,
|
||||
cluster_info,
|
||||
&mut buffered_packets,
|
||||
&mut buffered_packet_batches,
|
||||
&forward_option,
|
||||
transaction_status_sender.clone(),
|
||||
&gossip_vote_sender,
|
||||
@ -709,7 +711,7 @@ impl BankingStage {
|
||||
}
|
||||
}
|
||||
|
||||
let recv_timeout = if !buffered_packets.is_empty() {
|
||||
let recv_timeout = if !buffered_packet_batches.is_empty() {
|
||||
// If packets are buffered, let's wait for less time on recv from the channel.
|
||||
// This helps detect the next leader faster, and processing the buffered
|
||||
// packets quickly
|
||||
@ -729,7 +731,7 @@ impl BankingStage {
|
||||
batch_limit,
|
||||
transaction_status_sender.clone(),
|
||||
&gossip_vote_sender,
|
||||
&mut buffered_packets,
|
||||
&mut buffered_packet_batches,
|
||||
&banking_stage_stats,
|
||||
duplicates,
|
||||
&recorder,
|
||||
@ -1076,7 +1078,7 @@ impl BankingStage {
|
||||
// with their packet indexes.
|
||||
#[allow(clippy::needless_collect)]
|
||||
fn transactions_from_packets(
|
||||
msgs: &Packets,
|
||||
packet_batch: &PacketBatch,
|
||||
transaction_indexes: &[usize],
|
||||
feature_set: &Arc<feature_set::FeatureSet>,
|
||||
votes_only: bool,
|
||||
@ -1084,7 +1086,7 @@ impl BankingStage {
|
||||
transaction_indexes
|
||||
.iter()
|
||||
.filter_map(|tx_index| {
|
||||
let p = &msgs.packets[*tx_index];
|
||||
let p = &packet_batch.packets[*tx_index];
|
||||
if votes_only && !p.meta.is_simple_vote_tx {
|
||||
return None;
|
||||
}
|
||||
@ -1149,7 +1151,7 @@ impl BankingStage {
|
||||
bank: &Arc<Bank>,
|
||||
bank_creation_time: &Instant,
|
||||
poh: &TransactionRecorder,
|
||||
msgs: &Packets,
|
||||
packet_batch: &PacketBatch,
|
||||
packet_indexes: Vec<usize>,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
@ -1158,7 +1160,7 @@ impl BankingStage {
|
||||
) -> (usize, usize, Vec<usize>) {
|
||||
let mut packet_conversion_time = Measure::start("packet_conversion");
|
||||
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
|
||||
msgs,
|
||||
packet_batch,
|
||||
&packet_indexes,
|
||||
&bank.feature_set,
|
||||
bank.vote_only_bank(),
|
||||
@ -1214,7 +1216,7 @@ impl BankingStage {
|
||||
|
||||
fn filter_unprocessed_packets(
|
||||
bank: &Arc<Bank>,
|
||||
msgs: &Packets,
|
||||
packet_batch: &PacketBatch,
|
||||
transaction_indexes: &[usize],
|
||||
my_pubkey: &Pubkey,
|
||||
next_leader: Option<Pubkey>,
|
||||
@ -1232,7 +1234,7 @@ impl BankingStage {
|
||||
let mut unprocessed_packet_conversion_time =
|
||||
Measure::start("unprocessed_packet_conversion");
|
||||
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
|
||||
msgs,
|
||||
packet_batch,
|
||||
transaction_indexes,
|
||||
&bank.feature_set,
|
||||
bank.vote_only_bank(),
|
||||
@ -1282,7 +1284,7 @@ impl BankingStage {
|
||||
/// Process the incoming packets
|
||||
fn process_packets(
|
||||
my_pubkey: &Pubkey,
|
||||
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
poh: &Arc<Mutex<PohRecorder>>,
|
||||
recv_start: &mut Instant,
|
||||
recv_timeout: Duration,
|
||||
@ -1290,41 +1292,41 @@ impl BankingStage {
|
||||
batch_limit: usize,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
buffered_packets: &mut UnprocessedPackets,
|
||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||
banking_stage_stats: &BankingStageStats,
|
||||
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
|
||||
recorder: &TransactionRecorder,
|
||||
qos_service: &Arc<QosService>,
|
||||
) -> Result<(), RecvTimeoutError> {
|
||||
let mut recv_time = Measure::start("process_packets_recv");
|
||||
let mms = verified_receiver.recv_timeout(recv_timeout)?;
|
||||
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
|
||||
recv_time.stop();
|
||||
|
||||
let mms_len = mms.len();
|
||||
let count: usize = mms.iter().map(|x| x.packets.len()).sum();
|
||||
let packet_batches_len = packet_batches.len();
|
||||
let packet_count: usize = packet_batches.iter().map(|x| x.packets.len()).sum();
|
||||
debug!(
|
||||
"@{:?} process start stalled for: {:?}ms txs: {} id: {}",
|
||||
timestamp(),
|
||||
duration_as_ms(&recv_start.elapsed()),
|
||||
count,
|
||||
packet_count,
|
||||
id,
|
||||
);
|
||||
inc_new_counter_debug!("banking_stage-transactions_received", count);
|
||||
inc_new_counter_debug!("banking_stage-transactions_received", packet_count);
|
||||
let mut proc_start = Measure::start("process_packets_transactions_process");
|
||||
let mut new_tx_count = 0;
|
||||
|
||||
let mut mms_iter = mms.into_iter();
|
||||
let mut packet_batch_iter = packet_batches.into_iter();
|
||||
let mut dropped_packets_count = 0;
|
||||
let mut dropped_packet_batches_count = 0;
|
||||
let mut newly_buffered_packets_count = 0;
|
||||
while let Some(msgs) = mms_iter.next() {
|
||||
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
|
||||
while let Some(packet_batch) = packet_batch_iter.next() {
|
||||
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
|
||||
let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank();
|
||||
let working_bank_start = poh_recorder_bank.working_bank_start();
|
||||
if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() {
|
||||
Self::push_unprocessed(
|
||||
buffered_packets,
|
||||
msgs,
|
||||
buffered_packet_batches,
|
||||
packet_batch,
|
||||
packet_indexes,
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
@ -1347,7 +1349,7 @@ impl BankingStage {
|
||||
working_bank,
|
||||
bank_creation_time,
|
||||
recorder,
|
||||
&msgs,
|
||||
&packet_batch,
|
||||
packet_indexes,
|
||||
transaction_status_sender.clone(),
|
||||
gossip_vote_sender,
|
||||
@ -1359,8 +1361,8 @@ impl BankingStage {
|
||||
|
||||
// Collect any unprocessed transactions in this batch for forwarding
|
||||
Self::push_unprocessed(
|
||||
buffered_packets,
|
||||
msgs,
|
||||
buffered_packet_batches,
|
||||
packet_batch,
|
||||
unprocessed_indexes,
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
@ -1376,19 +1378,19 @@ impl BankingStage {
|
||||
let next_leader = poh.lock().unwrap().next_slot_leader();
|
||||
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
|
||||
#[allow(clippy::while_let_on_iterator)]
|
||||
while let Some(msgs) = mms_iter.next() {
|
||||
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
|
||||
while let Some(packet_batch) = packet_batch_iter.next() {
|
||||
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
|
||||
let unprocessed_indexes = Self::filter_unprocessed_packets(
|
||||
working_bank,
|
||||
&msgs,
|
||||
&packet_batch,
|
||||
&packet_indexes,
|
||||
my_pubkey,
|
||||
next_leader,
|
||||
banking_stage_stats,
|
||||
);
|
||||
Self::push_unprocessed(
|
||||
buffered_packets,
|
||||
msgs,
|
||||
buffered_packet_batches,
|
||||
packet_batch,
|
||||
unprocessed_indexes,
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
@ -1409,11 +1411,11 @@ impl BankingStage {
|
||||
debug!(
|
||||
"@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}",
|
||||
timestamp(),
|
||||
mms_len,
|
||||
packet_batches_len,
|
||||
proc_start.as_ms(),
|
||||
new_tx_count,
|
||||
(new_tx_count as f32) / (proc_start.as_s()),
|
||||
count,
|
||||
packet_count,
|
||||
id,
|
||||
);
|
||||
banking_stage_stats
|
||||
@ -1421,7 +1423,7 @@ impl BankingStage {
|
||||
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
|
||||
banking_stage_stats
|
||||
.process_packets_count
|
||||
.fetch_add(count, Ordering::Relaxed);
|
||||
.fetch_add(packet_count, Ordering::Relaxed);
|
||||
banking_stage_stats
|
||||
.new_tx_count
|
||||
.fetch_add(new_tx_count, Ordering::Relaxed);
|
||||
@ -1436,9 +1438,12 @@ impl BankingStage {
|
||||
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
|
||||
banking_stage_stats
|
||||
.current_buffered_packet_batches_count
|
||||
.swap(buffered_packets.len(), Ordering::Relaxed);
|
||||
.swap(buffered_packet_batches.len(), Ordering::Relaxed);
|
||||
banking_stage_stats.current_buffered_packets_count.swap(
|
||||
buffered_packets.iter().map(|packets| packets.1.len()).sum(),
|
||||
buffered_packet_batches
|
||||
.iter()
|
||||
.map(|packets| packets.1.len())
|
||||
.sum(),
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
*recv_start = Instant::now();
|
||||
@ -1446,8 +1451,8 @@ impl BankingStage {
|
||||
}
|
||||
|
||||
fn push_unprocessed(
|
||||
unprocessed_packets: &mut UnprocessedPackets,
|
||||
packets: Packets,
|
||||
unprocessed_packet_batches: &mut UnprocessedPacketBatches,
|
||||
packet_batch: PacketBatch,
|
||||
mut packet_indexes: Vec<usize>,
|
||||
dropped_packet_batches_count: &mut usize,
|
||||
dropped_packets_count: &mut usize,
|
||||
@ -1462,7 +1467,7 @@ impl BankingStage {
|
||||
let mut duplicates = duplicates.lock().unwrap();
|
||||
let (cache, hasher) = duplicates.deref_mut();
|
||||
packet_indexes.retain(|i| {
|
||||
let packet_hash = hasher.hash_packet(&packets.packets[*i]);
|
||||
let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]);
|
||||
match cache.get_mut(&packet_hash) {
|
||||
Some(_hash) => false,
|
||||
None => {
|
||||
@ -1483,14 +1488,14 @@ impl BankingStage {
|
||||
);
|
||||
}
|
||||
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
||||
if unprocessed_packets.len() >= batch_limit {
|
||||
if unprocessed_packet_batches.len() >= batch_limit {
|
||||
*dropped_packet_batches_count += 1;
|
||||
if let Some(dropped_batch) = unprocessed_packets.pop_front() {
|
||||
if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() {
|
||||
*dropped_packets_count += dropped_batch.1.len();
|
||||
}
|
||||
}
|
||||
*newly_buffered_packets_count += packet_indexes.len();
|
||||
unprocessed_packets.push_back((packets, packet_indexes, false));
|
||||
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1560,7 +1565,7 @@ mod tests {
|
||||
get_tmp_ledger_path,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
},
|
||||
solana_perf::packet::to_packets_chunked,
|
||||
solana_perf::packet::to_packet_batches,
|
||||
solana_poh::{
|
||||
poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
|
||||
poh_service::PohService,
|
||||
@ -1697,7 +1702,9 @@ mod tests {
|
||||
Blockstore::destroy(&ledger_path).unwrap();
|
||||
}
|
||||
|
||||
pub fn convert_from_old_verified(mut with_vers: Vec<(Packets, Vec<u8>)>) -> Vec<Packets> {
|
||||
pub fn convert_from_old_verified(
|
||||
mut with_vers: Vec<(PacketBatch, Vec<u8>)>,
|
||||
) -> Vec<PacketBatch> {
|
||||
with_vers.iter_mut().for_each(|(b, v)| {
|
||||
b.packets
|
||||
.iter_mut()
|
||||
@ -1769,18 +1776,18 @@ mod tests {
|
||||
let tx_anf = system_transaction::transfer(&keypair, &to3, 1, start_hash);
|
||||
|
||||
// send 'em over
|
||||
let packets = to_packets_chunked(&[tx_no_ver, tx_anf, tx], 3);
|
||||
let packet_batches = to_packet_batches(&[tx_no_ver, tx_anf, tx], 3);
|
||||
|
||||
// glad they all fit
|
||||
assert_eq!(packets.len(), 1);
|
||||
assert_eq!(packet_batches.len(), 1);
|
||||
|
||||
let packets = packets
|
||||
let packet_batches = packet_batches
|
||||
.into_iter()
|
||||
.map(|packets| (packets, vec![0u8, 1u8, 1u8]))
|
||||
.map(|batch| (batch, vec![0u8, 1u8, 1u8]))
|
||||
.collect();
|
||||
let packets = convert_from_old_verified(packets);
|
||||
let packet_batches = convert_from_old_verified(packet_batches);
|
||||
verified_sender // no_ver, anf, tx
|
||||
.send(packets)
|
||||
.send(packet_batches)
|
||||
.unwrap();
|
||||
|
||||
drop(verified_sender);
|
||||
@ -1846,24 +1853,24 @@ mod tests {
|
||||
let tx =
|
||||
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash());
|
||||
|
||||
let packets = to_packets_chunked(&[tx], 1);
|
||||
let packets = packets
|
||||
let packet_batches = to_packet_batches(&[tx], 1);
|
||||
let packet_batches = packet_batches
|
||||
.into_iter()
|
||||
.map(|packets| (packets, vec![1u8]))
|
||||
.map(|batch| (batch, vec![1u8]))
|
||||
.collect();
|
||||
let packets = convert_from_old_verified(packets);
|
||||
verified_sender.send(packets).unwrap();
|
||||
let packet_batches = convert_from_old_verified(packet_batches);
|
||||
verified_sender.send(packet_batches).unwrap();
|
||||
|
||||
// Process a second batch that uses the same from account, so conflicts with above TX
|
||||
let tx =
|
||||
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash());
|
||||
let packets = to_packets_chunked(&[tx], 1);
|
||||
let packets = packets
|
||||
let packet_batches = to_packet_batches(&[tx], 1);
|
||||
let packet_batches = packet_batches
|
||||
.into_iter()
|
||||
.map(|packets| (packets, vec![1u8]))
|
||||
.map(|batch| (batch, vec![1u8]))
|
||||
.collect();
|
||||
let packets = convert_from_old_verified(packets);
|
||||
verified_sender.send(packets).unwrap();
|
||||
let packet_batches = convert_from_old_verified(packet_batches);
|
||||
verified_sender.send(packet_batches).unwrap();
|
||||
|
||||
let (vote_sender, vote_receiver) = unbounded();
|
||||
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
|
||||
@ -2381,9 +2388,9 @@ mod tests {
|
||||
fn test_filter_valid_packets() {
|
||||
solana_logger::setup();
|
||||
|
||||
let mut all_packets = (0..16)
|
||||
let mut packet_batches = (0..16)
|
||||
.map(|packets_id| {
|
||||
let packets = Packets::new(
|
||||
let packet_batch = PacketBatch::new(
|
||||
(0..32)
|
||||
.map(|packet_id| {
|
||||
let mut p = Packet::default();
|
||||
@ -2395,11 +2402,11 @@ mod tests {
|
||||
let valid_indexes = (0..32)
|
||||
.filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
|
||||
.collect_vec();
|
||||
(packets, valid_indexes, false)
|
||||
(packet_batch, valid_indexes, false)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter());
|
||||
let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter());
|
||||
|
||||
assert_eq!(result.len(), 256);
|
||||
|
||||
@ -2413,8 +2420,8 @@ mod tests {
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
all_packets[0].2 = true;
|
||||
let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter());
|
||||
packet_batches[0].2 = true;
|
||||
let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter());
|
||||
assert_eq!(result.len(), 240);
|
||||
}
|
||||
|
||||
@ -2666,12 +2673,15 @@ mod tests {
|
||||
setup_conflicting_transactions(&ledger_path);
|
||||
let recorder = poh_recorder.lock().unwrap().recorder();
|
||||
let num_conflicting_transactions = transactions.len();
|
||||
let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions);
|
||||
assert_eq!(packets_vec.len(), 1);
|
||||
assert_eq!(packets_vec[0].packets.len(), num_conflicting_transactions);
|
||||
let all_packets = packets_vec.pop().unwrap();
|
||||
let mut buffered_packets: UnprocessedPackets = vec![(
|
||||
all_packets,
|
||||
let mut packet_batches = to_packet_batches(&transactions, num_conflicting_transactions);
|
||||
assert_eq!(packet_batches.len(), 1);
|
||||
assert_eq!(
|
||||
packet_batches[0].packets.len(),
|
||||
num_conflicting_transactions
|
||||
);
|
||||
let packet_batch = packet_batches.pop().unwrap();
|
||||
let mut buffered_packet_batches: UnprocessedPacketBatches = vec![(
|
||||
packet_batch,
|
||||
(0..num_conflicting_transactions).into_iter().collect(),
|
||||
false,
|
||||
)]
|
||||
@ -2687,7 +2697,7 @@ mod tests {
|
||||
&Pubkey::default(),
|
||||
max_tx_processing_ns,
|
||||
&poh_recorder,
|
||||
&mut buffered_packets,
|
||||
&mut buffered_packet_batches,
|
||||
None,
|
||||
&gossip_vote_sender,
|
||||
None::<Box<dyn Fn()>>,
|
||||
@ -2695,7 +2705,10 @@ mod tests {
|
||||
&recorder,
|
||||
&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
|
||||
);
|
||||
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
|
||||
assert_eq!(
|
||||
buffered_packet_batches[0].1.len(),
|
||||
num_conflicting_transactions
|
||||
);
|
||||
// When the poh recorder has a bank, should process all non conflicting buffered packets.
|
||||
// Processes one packet per iteration of the loop
|
||||
for num_expected_unprocessed in (0..num_conflicting_transactions).rev() {
|
||||
@ -2704,7 +2717,7 @@ mod tests {
|
||||
&Pubkey::default(),
|
||||
max_tx_processing_ns,
|
||||
&poh_recorder,
|
||||
&mut buffered_packets,
|
||||
&mut buffered_packet_batches,
|
||||
None,
|
||||
&gossip_vote_sender,
|
||||
None::<Box<dyn Fn()>>,
|
||||
@ -2713,9 +2726,9 @@ mod tests {
|
||||
&Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))),
|
||||
);
|
||||
if num_expected_unprocessed == 0 {
|
||||
assert!(buffered_packets.is_empty())
|
||||
assert!(buffered_packet_batches.is_empty())
|
||||
} else {
|
||||
assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed);
|
||||
assert_eq!(buffered_packet_batches[0].1.len(), num_expected_unprocessed);
|
||||
}
|
||||
}
|
||||
poh_recorder
|
||||
@ -2735,12 +2748,12 @@ mod tests {
|
||||
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
|
||||
setup_conflicting_transactions(&ledger_path);
|
||||
let num_conflicting_transactions = transactions.len();
|
||||
let packets_vec = to_packets_chunked(&transactions, 1);
|
||||
assert_eq!(packets_vec.len(), num_conflicting_transactions);
|
||||
for single_packets in &packets_vec {
|
||||
assert_eq!(single_packets.packets.len(), 1);
|
||||
let packet_batches = to_packet_batches(&transactions, 1);
|
||||
assert_eq!(packet_batches.len(), num_conflicting_transactions);
|
||||
for single_packet_batch in &packet_batches {
|
||||
assert_eq!(single_packet_batch.packets.len(), 1);
|
||||
}
|
||||
let mut buffered_packets: UnprocessedPackets = packets_vec
|
||||
let mut buffered_packet_batches: UnprocessedPacketBatches = packet_batches
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|single_packets| (single_packets, vec![0], false))
|
||||
@ -2754,8 +2767,8 @@ mod tests {
|
||||
continue_receiver.recv().unwrap();
|
||||
});
|
||||
// When the poh recorder has a bank, it should process all non conflicting buffered packets.
|
||||
// Because each conflicting transaction is in it's own `Packet` within `packets_vec`, then
|
||||
// each iteration of this loop will process one element of `packets_vec`per iteration of the
|
||||
// Because each conflicting transaction is in it's own `Packet` within a `PacketBatch`, then
|
||||
// each iteration of this loop will process one element of the batch per iteration of the
|
||||
// loop.
|
||||
let interrupted_iteration = 1;
|
||||
poh_recorder.lock().unwrap().set_bank(&bank);
|
||||
@ -2770,7 +2783,7 @@ mod tests {
|
||||
&Pubkey::default(),
|
||||
std::u128::MAX,
|
||||
&poh_recorder_,
|
||||
&mut buffered_packets,
|
||||
&mut buffered_packet_batches,
|
||||
None,
|
||||
&gossip_vote_sender,
|
||||
test_fn,
|
||||
@ -2782,13 +2795,13 @@ mod tests {
|
||||
// Check everything is correct. All indexes after `interrupted_iteration`
|
||||
// should still be unprocessed
|
||||
assert_eq!(
|
||||
buffered_packets.len(),
|
||||
packets_vec[interrupted_iteration + 1..].len()
|
||||
buffered_packet_batches.len(),
|
||||
packet_batches[interrupted_iteration + 1..].len()
|
||||
);
|
||||
for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in
|
||||
buffered_packets
|
||||
buffered_packet_batches
|
||||
.iter()
|
||||
.zip(&packets_vec[interrupted_iteration + 1..])
|
||||
.zip(&packet_batches[interrupted_iteration + 1..])
|
||||
{
|
||||
assert_eq!(
|
||||
remaining_unprocessed_packet.packets[0],
|
||||
@ -2823,10 +2836,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_forwarder_budget() {
|
||||
solana_logger::setup();
|
||||
// Create `Packets` with 1 unprocessed element
|
||||
let single_element_packets = Packets::new(vec![Packet::default()]);
|
||||
let mut unprocessed_packets: UnprocessedPackets =
|
||||
vec![(single_element_packets, vec![0], false)]
|
||||
// Create `PacketBatch` with 1 unprocessed packet
|
||||
let single_packet_batch = PacketBatch::new(vec![Packet::default()]);
|
||||
let mut unprocessed_packets: UnprocessedPacketBatches =
|
||||
vec![(single_packet_batch, vec![0], false)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
@ -2872,14 +2885,16 @@ mod tests {
|
||||
#[test]
|
||||
fn test_push_unprocessed_batch_limit() {
|
||||
solana_logger::setup();
|
||||
// Create `Packets` with 2 unprocessed elements
|
||||
let new_packets = Packets::new(vec![Packet::default(); 2]);
|
||||
let mut unprocessed_packets: UnprocessedPackets =
|
||||
vec![(new_packets, vec![0, 1], false)].into_iter().collect();
|
||||
// Create `PacketBatch` with 2 unprocessed packets
|
||||
let new_packet_batch = PacketBatch::new(vec![Packet::default(); 2]);
|
||||
let mut unprocessed_packets: UnprocessedPacketBatches =
|
||||
vec![(new_packet_batch, vec![0, 1], false)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
// Set the limit to 2
|
||||
let batch_limit = 2;
|
||||
// Create some new unprocessed packets
|
||||
let new_packets = Packets::new(vec![Packet::default()]);
|
||||
// Create new unprocessed packets and add to a batch
|
||||
let new_packet_batch = PacketBatch::new(vec![Packet::default()]);
|
||||
let packet_indexes = vec![];
|
||||
|
||||
let duplicates = Arc::new(Mutex::new((
|
||||
@ -2894,7 +2909,7 @@ mod tests {
|
||||
// packets are not added to the unprocessed queue
|
||||
BankingStage::push_unprocessed(
|
||||
&mut unprocessed_packets,
|
||||
new_packets.clone(),
|
||||
new_packet_batch.clone(),
|
||||
packet_indexes,
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
@ -2913,7 +2928,7 @@ mod tests {
|
||||
let packet_indexes = vec![0];
|
||||
BankingStage::push_unprocessed(
|
||||
&mut unprocessed_packets,
|
||||
new_packets,
|
||||
new_packet_batch,
|
||||
packet_indexes.clone(),
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
@ -2929,7 +2944,7 @@ mod tests {
|
||||
|
||||
// Because we've reached the batch limit, old unprocessed packets are
|
||||
// dropped and the new one is appended to the end
|
||||
let new_packets = Packets::new(vec![Packet::from_data(
|
||||
let new_packet_batch = PacketBatch::new(vec![Packet::from_data(
|
||||
Some(&SocketAddr::from(([127, 0, 0, 1], 8001))),
|
||||
42,
|
||||
)
|
||||
@ -2937,7 +2952,7 @@ mod tests {
|
||||
assert_eq!(unprocessed_packets.len(), batch_limit);
|
||||
BankingStage::push_unprocessed(
|
||||
&mut unprocessed_packets,
|
||||
new_packets.clone(),
|
||||
new_packet_batch.clone(),
|
||||
packet_indexes.clone(),
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
@ -2947,7 +2962,10 @@ mod tests {
|
||||
&banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
||||
assert_eq!(
|
||||
unprocessed_packets[1].0.packets[0],
|
||||
new_packet_batch.packets[0]
|
||||
);
|
||||
assert_eq!(dropped_packet_batches_count, 1);
|
||||
assert_eq!(dropped_packets_count, 2);
|
||||
assert_eq!(newly_buffered_packets_count, 2);
|
||||
@ -2955,7 +2973,7 @@ mod tests {
|
||||
// Check duplicates are dropped (newly buffered shouldn't change)
|
||||
BankingStage::push_unprocessed(
|
||||
&mut unprocessed_packets,
|
||||
new_packets.clone(),
|
||||
new_packet_batch.clone(),
|
||||
packet_indexes,
|
||||
&mut dropped_packet_batches_count,
|
||||
&mut dropped_packets_count,
|
||||
@ -2965,7 +2983,10 @@ mod tests {
|
||||
&banking_stage_stats,
|
||||
);
|
||||
assert_eq!(unprocessed_packets.len(), 2);
|
||||
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
|
||||
assert_eq!(
|
||||
unprocessed_packets[1].0.packets[0],
|
||||
new_packet_batch.packets[0]
|
||||
);
|
||||
assert_eq!(dropped_packet_batches_count, 1);
|
||||
assert_eq!(dropped_packets_count, 2);
|
||||
assert_eq!(newly_buffered_packets_count, 2);
|
||||
@ -2988,19 +3009,19 @@ mod tests {
|
||||
fn make_test_packets(
|
||||
transactions: Vec<Transaction>,
|
||||
vote_indexes: Vec<usize>,
|
||||
) -> (Packets, Vec<usize>) {
|
||||
) -> (PacketBatch, Vec<usize>) {
|
||||
let capacity = transactions.len();
|
||||
let mut packets = Packets::with_capacity(capacity);
|
||||
let mut packet_batch = PacketBatch::with_capacity(capacity);
|
||||
let mut packet_indexes = Vec::with_capacity(capacity);
|
||||
packets.packets.resize(capacity, Packet::default());
|
||||
packet_batch.packets.resize(capacity, Packet::default());
|
||||
for (index, tx) in transactions.iter().enumerate() {
|
||||
Packet::populate_packet(&mut packets.packets[index], None, tx).ok();
|
||||
Packet::populate_packet(&mut packet_batch.packets[index], None, tx).ok();
|
||||
packet_indexes.push(index);
|
||||
}
|
||||
for index in vote_indexes.iter() {
|
||||
packets.packets[*index].meta.is_simple_vote_tx = true;
|
||||
packet_batch.packets[*index].meta.is_simple_vote_tx = true;
|
||||
}
|
||||
(packets, packet_indexes)
|
||||
(packet_batch, packet_indexes)
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -3022,12 +3043,12 @@ mod tests {
|
||||
// packets with no votes
|
||||
{
|
||||
let vote_indexes = vec![];
|
||||
let (packets, packet_indexes) =
|
||||
let (packet_batch, packet_indexes) =
|
||||
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
|
||||
|
||||
let mut votes_only = false;
|
||||
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
|
||||
&packets,
|
||||
&packet_batch,
|
||||
&packet_indexes,
|
||||
&Arc::new(FeatureSet::default()),
|
||||
votes_only,
|
||||
@ -3037,7 +3058,7 @@ mod tests {
|
||||
|
||||
votes_only = true;
|
||||
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
|
||||
&packets,
|
||||
&packet_batch,
|
||||
&packet_indexes,
|
||||
&Arc::new(FeatureSet::default()),
|
||||
votes_only,
|
||||
@ -3049,14 +3070,14 @@ mod tests {
|
||||
// packets with some votes
|
||||
{
|
||||
let vote_indexes = vec![0, 2];
|
||||
let (packets, packet_indexes) = make_test_packets(
|
||||
let (packet_batch, packet_indexes) = make_test_packets(
|
||||
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
|
||||
vote_indexes,
|
||||
);
|
||||
|
||||
let mut votes_only = false;
|
||||
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
|
||||
&packets,
|
||||
&packet_batch,
|
||||
&packet_indexes,
|
||||
&Arc::new(FeatureSet::default()),
|
||||
votes_only,
|
||||
@ -3066,7 +3087,7 @@ mod tests {
|
||||
|
||||
votes_only = true;
|
||||
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
|
||||
&packets,
|
||||
&packet_batch,
|
||||
&packet_indexes,
|
||||
&Arc::new(FeatureSet::default()),
|
||||
votes_only,
|
||||
@ -3078,14 +3099,14 @@ mod tests {
|
||||
// packets with all votes
|
||||
{
|
||||
let vote_indexes = vec![0, 1, 2];
|
||||
let (packets, packet_indexes) = make_test_packets(
|
||||
let (packet_batch, packet_indexes) = make_test_packets(
|
||||
vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
|
||||
vote_indexes,
|
||||
);
|
||||
|
||||
let mut votes_only = false;
|
||||
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
|
||||
&packets,
|
||||
&packet_batch,
|
||||
&packet_indexes,
|
||||
&Arc::new(FeatureSet::default()),
|
||||
votes_only,
|
||||
@ -3095,7 +3116,7 @@ mod tests {
|
||||
|
||||
votes_only = true;
|
||||
let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
|
||||
&packets,
|
||||
&packet_batch,
|
||||
&packet_indexes,
|
||||
&Arc::new(FeatureSet::default()),
|
||||
votes_only,
|
||||
|
@ -22,7 +22,7 @@ use {
|
||||
solana_ledger::blockstore::Blockstore,
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::inc_new_counter_debug,
|
||||
solana_perf::packet::{self, Packets},
|
||||
solana_perf::packet::{self, PacketBatch},
|
||||
solana_poh::poh_recorder::PohRecorder,
|
||||
solana_rpc::{
|
||||
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
|
||||
@ -296,7 +296,7 @@ impl ClusterInfoVoteListener {
|
||||
pub fn new(
|
||||
exit: &Arc<AtomicBool>,
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
verified_packets_sender: CrossbeamSender<Vec<Packets>>,
|
||||
verified_packets_sender: CrossbeamSender<Vec<PacketBatch>>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
vote_tracker: Arc<VoteTracker>,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
@ -393,14 +393,14 @@ impl ClusterInfoVoteListener {
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn verify_votes(votes: Vec<Transaction>) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) {
|
||||
let mut msgs = packet::to_packets_chunked(&votes, 1);
|
||||
let mut packet_batches = packet::to_packet_batches(&votes, 1);
|
||||
|
||||
// Votes should already be filtered by this point.
|
||||
let reject_non_vote = false;
|
||||
sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote);
|
||||
sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote);
|
||||
|
||||
let (vote_txs, vote_metadata) = izip!(votes.into_iter(), msgs,)
|
||||
.filter_map(|(vote_tx, packet)| {
|
||||
let (vote_txs, vote_metadata) = izip!(votes.into_iter(), packet_batches)
|
||||
.filter_map(|(vote_tx, packet_batch)| {
|
||||
let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx)
|
||||
.and_then(|(vote_account_key, vote, _)| {
|
||||
if vote.slots.is_empty() {
|
||||
@ -410,16 +410,16 @@ impl ClusterInfoVoteListener {
|
||||
}
|
||||
})?;
|
||||
|
||||
// to_packets_chunked() above split into 1 packet long chunks
|
||||
assert_eq!(packet.packets.len(), 1);
|
||||
if !packet.packets[0].meta.discard {
|
||||
// to_packet_batches() above splits into 1 packet long batches
|
||||
assert_eq!(packet_batch.packets.len(), 1);
|
||||
if !packet_batch.packets[0].meta.discard {
|
||||
if let Some(signature) = vote_tx.signatures.first().cloned() {
|
||||
return Some((
|
||||
vote_tx,
|
||||
VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet,
|
||||
packet_batch,
|
||||
signature,
|
||||
},
|
||||
));
|
||||
@ -435,7 +435,7 @@ impl ClusterInfoVoteListener {
|
||||
exit: Arc<AtomicBool>,
|
||||
verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver,
|
||||
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||
verified_packets_sender: &CrossbeamSender<Vec<Packets>>,
|
||||
verified_packets_sender: &CrossbeamSender<Vec<PacketBatch>>,
|
||||
) -> Result<()> {
|
||||
let mut verified_vote_packets = VerifiedVotePackets::default();
|
||||
let mut time_since_lock = Instant::now();
|
||||
@ -483,7 +483,7 @@ impl ClusterInfoVoteListener {
|
||||
fn check_for_leader_bank_and_send_votes(
|
||||
bank_vote_sender_state_option: &mut Option<BankVoteSenderState>,
|
||||
current_working_bank: Arc<Bank>,
|
||||
verified_packets_sender: &CrossbeamSender<Vec<Packets>>,
|
||||
verified_packets_sender: &CrossbeamSender<Vec<PacketBatch>>,
|
||||
verified_vote_packets: &VerifiedVotePackets,
|
||||
) -> Result<()> {
|
||||
// We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS`
|
||||
@ -983,9 +983,9 @@ mod tests {
|
||||
use bincode::serialized_size;
|
||||
info!("max vote size {}", serialized_size(&vote_tx).unwrap());
|
||||
|
||||
let msgs = packet::to_packets_chunked(&[vote_tx], 1); // panics if won't fit
|
||||
let packet_batches = packet::to_packet_batches(&[vote_tx], 1); // panics if won't fit
|
||||
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(packet_batches.len(), 1);
|
||||
}
|
||||
|
||||
fn run_vote_contains_authorized_voter(hash: Option<Hash>) {
|
||||
@ -1815,7 +1815,7 @@ mod tests {
|
||||
fn verify_packets_len(packets: &[VerifiedVoteMetadata], ref_value: usize) {
|
||||
let num_packets: usize = packets
|
||||
.iter()
|
||||
.map(|vote_metadata| vote_metadata.packet.packets.len())
|
||||
.map(|vote_metadata| vote_metadata.packet_batch.packets.len())
|
||||
.sum();
|
||||
assert_eq!(num_packets, ref_value);
|
||||
}
|
||||
|
@ -6,10 +6,10 @@ use {
|
||||
result::{Error, Result},
|
||||
},
|
||||
solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
|
||||
solana_perf::{packet::PacketsRecycler, recycler::Recycler},
|
||||
solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
|
||||
solana_poh::poh_recorder::PohRecorder,
|
||||
solana_sdk::clock::DEFAULT_TICKS_PER_SLOT,
|
||||
solana_streamer::streamer::{self, PacketReceiver, PacketSender},
|
||||
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
|
||||
std::{
|
||||
net::UdpSocket,
|
||||
sync::{
|
||||
@ -34,7 +34,7 @@ impl FetchStage {
|
||||
exit: &Arc<AtomicBool>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
coalesce_ms: u64,
|
||||
) -> (Self, PacketReceiver, PacketReceiver) {
|
||||
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
|
||||
let (sender, receiver) = channel();
|
||||
let (vote_sender, vote_receiver) = channel();
|
||||
(
|
||||
@ -58,8 +58,8 @@ impl FetchStage {
|
||||
tpu_forwards_sockets: Vec<UdpSocket>,
|
||||
tpu_vote_sockets: Vec<UdpSocket>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
sender: &PacketSender,
|
||||
vote_sender: &PacketSender,
|
||||
sender: &PacketBatchSender,
|
||||
vote_sender: &PacketBatchSender,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
coalesce_ms: u64,
|
||||
) -> Self {
|
||||
@ -79,18 +79,18 @@ impl FetchStage {
|
||||
}
|
||||
|
||||
fn handle_forwarded_packets(
|
||||
recvr: &PacketReceiver,
|
||||
sendr: &PacketSender,
|
||||
recvr: &PacketBatchReceiver,
|
||||
sendr: &PacketBatchSender,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
) -> Result<()> {
|
||||
let msgs = recvr.recv()?;
|
||||
let mut len = msgs.packets.len();
|
||||
let mut batch = vec![msgs];
|
||||
while let Ok(more) = recvr.try_recv() {
|
||||
len += more.packets.len();
|
||||
batch.push(more);
|
||||
let packet_batch = recvr.recv()?;
|
||||
let mut num_packets = packet_batch.packets.len();
|
||||
let mut packet_batches = vec![packet_batch];
|
||||
while let Ok(packet_batch) = recvr.try_recv() {
|
||||
num_packets += packet_batch.packets.len();
|
||||
packet_batches.push(packet_batch);
|
||||
// Read at most 1K transactions in a loop
|
||||
if len > 1024 {
|
||||
if num_packets > 1024 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -100,15 +100,15 @@ impl FetchStage {
|
||||
.unwrap()
|
||||
.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT))
|
||||
{
|
||||
inc_new_counter_debug!("fetch_stage-honor_forwards", len);
|
||||
for packets in batch {
|
||||
inc_new_counter_debug!("fetch_stage-honor_forwards", num_packets);
|
||||
for packet_batch in packet_batches {
|
||||
#[allow(clippy::question_mark)]
|
||||
if sendr.send(packets).is_err() {
|
||||
if sendr.send(packet_batch).is_err() {
|
||||
return Err(Error::Send);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
inc_new_counter_info!("fetch_stage-discard_forwards", len);
|
||||
inc_new_counter_info!("fetch_stage-discard_forwards", num_packets);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -119,12 +119,12 @@ impl FetchStage {
|
||||
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
|
||||
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
sender: &PacketSender,
|
||||
vote_sender: &PacketSender,
|
||||
sender: &PacketBatchSender,
|
||||
vote_sender: &PacketBatchSender,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
coalesce_ms: u64,
|
||||
) -> Self {
|
||||
let recycler: PacketsRecycler = Recycler::warmed(1000, 1024);
|
||||
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);
|
||||
|
||||
let tpu_threads = sockets.into_iter().map(|socket| {
|
||||
streamer::receiver(
|
||||
|
@ -27,7 +27,7 @@ use {
|
||||
shred::{Shred, ShredType},
|
||||
},
|
||||
solana_measure::measure::Measure,
|
||||
solana_perf::packet::Packets,
|
||||
solana_perf::packet::PacketBatch,
|
||||
solana_rayon_threadlimit::get_thread_count,
|
||||
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
|
||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
||||
@ -433,7 +433,7 @@ impl RetransmitStage {
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
retransmit_sockets: Arc<Vec<UdpSocket>>,
|
||||
repair_socket: Arc<UdpSocket>,
|
||||
verified_receiver: Receiver<Vec<Packets>>,
|
||||
verified_receiver: Receiver<Vec<PacketBatch>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
|
||||
epoch_schedule: EpochSchedule,
|
||||
@ -610,10 +610,10 @@ mod tests {
|
||||
let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
|
||||
// it should send this over the sockets.
|
||||
retransmit_sender.send(vec![shred]).unwrap();
|
||||
let mut packets = Packets::new(vec![]);
|
||||
solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
|
||||
assert_eq!(packets.packets.len(), 1);
|
||||
assert!(!packets.packets[0].meta.repair);
|
||||
let mut packet_batch = PacketBatch::new(vec![]);
|
||||
solana_streamer::packet::recv_from(&mut packet_batch, &me_retransmit, 1).unwrap();
|
||||
assert_eq!(packet_batch.packets.len(), 1);
|
||||
assert!(!packet_batch.packets[0].meta.repair);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -25,11 +25,11 @@ use {
|
||||
},
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::inc_new_counter_debug,
|
||||
solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler},
|
||||
solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler},
|
||||
solana_sdk::{
|
||||
clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
|
||||
},
|
||||
solana_streamer::streamer::{PacketReceiver, PacketSender},
|
||||
solana_streamer::streamer::{PacketBatchReceiver, PacketBatchSender},
|
||||
std::{
|
||||
collections::HashSet,
|
||||
net::SocketAddr,
|
||||
@ -229,12 +229,12 @@ impl ServeRepair {
|
||||
|
||||
fn handle_repair(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
recycler: &PacketBatchRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
request: RepairProtocol,
|
||||
stats: &mut ServeRepairStats,
|
||||
) -> Option<Packets> {
|
||||
) -> Option<PacketBatch> {
|
||||
let now = Instant::now();
|
||||
|
||||
let my_id = me.read().unwrap().my_id();
|
||||
@ -317,10 +317,10 @@ impl ServeRepair {
|
||||
/// Process messages from the network
|
||||
fn run_listen(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
recycler: &PacketBatchRecycler,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
requests_receiver: &PacketReceiver,
|
||||
response_sender: &PacketSender,
|
||||
requests_receiver: &PacketBatchReceiver,
|
||||
response_sender: &PacketBatchSender,
|
||||
stats: &mut ServeRepairStats,
|
||||
max_packets: &mut usize,
|
||||
) -> Result<()> {
|
||||
@ -392,12 +392,12 @@ impl ServeRepair {
|
||||
pub fn listen(
|
||||
me: Arc<RwLock<Self>>,
|
||||
blockstore: Option<Arc<Blockstore>>,
|
||||
requests_receiver: PacketReceiver,
|
||||
response_sender: PacketSender,
|
||||
requests_receiver: PacketBatchReceiver,
|
||||
response_sender: PacketBatchSender,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
let exit = exit.clone();
|
||||
let recycler = PacketsRecycler::default();
|
||||
let recycler = PacketBatchRecycler::default();
|
||||
Builder::new()
|
||||
.name("solana-repair-listen".to_string())
|
||||
.spawn(move || {
|
||||
@ -432,14 +432,14 @@ impl ServeRepair {
|
||||
|
||||
fn handle_packets(
|
||||
me: &Arc<RwLock<Self>>,
|
||||
recycler: &PacketsRecycler,
|
||||
recycler: &PacketBatchRecycler,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
packets: Packets,
|
||||
response_sender: &PacketSender,
|
||||
packet_batch: PacketBatch,
|
||||
response_sender: &PacketBatchSender,
|
||||
stats: &mut ServeRepairStats,
|
||||
) {
|
||||
// iter over the packets
|
||||
packets.packets.iter().for_each(|packet| {
|
||||
packet_batch.packets.iter().for_each(|packet| {
|
||||
let from_addr = packet.meta.addr();
|
||||
limited_deserialize(&packet.data[..packet.meta.size])
|
||||
.into_iter()
|
||||
@ -609,7 +609,7 @@ impl ServeRepair {
|
||||
}
|
||||
|
||||
fn run_window_request(
|
||||
recycler: &PacketsRecycler,
|
||||
recycler: &PacketBatchRecycler,
|
||||
from: &ContactInfo,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
@ -617,7 +617,7 @@ impl ServeRepair {
|
||||
slot: Slot,
|
||||
shred_index: u64,
|
||||
nonce: Nonce,
|
||||
) -> Option<Packets> {
|
||||
) -> Option<PacketBatch> {
|
||||
if let Some(blockstore) = blockstore {
|
||||
// Try to find the requested index in one of the slots
|
||||
let packet = repair_response::repair_response_packet(
|
||||
@ -630,7 +630,7 @@ impl ServeRepair {
|
||||
|
||||
if let Some(packet) = packet {
|
||||
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
|
||||
return Some(Packets::new_unpinned_with_recycler_data(
|
||||
return Some(PacketBatch::new_unpinned_with_recycler_data(
|
||||
recycler,
|
||||
"run_window_request",
|
||||
vec![packet],
|
||||
@ -651,13 +651,13 @@ impl ServeRepair {
|
||||
}
|
||||
|
||||
fn run_highest_window_request(
|
||||
recycler: &PacketsRecycler,
|
||||
recycler: &PacketBatchRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
slot: Slot,
|
||||
highest_index: u64,
|
||||
nonce: Nonce,
|
||||
) -> Option<Packets> {
|
||||
) -> Option<PacketBatch> {
|
||||
let blockstore = blockstore?;
|
||||
// Try to find the requested index in one of the slots
|
||||
let meta = blockstore.meta(slot).ok()??;
|
||||
@ -670,7 +670,7 @@ impl ServeRepair {
|
||||
from_addr,
|
||||
nonce,
|
||||
)?;
|
||||
return Some(Packets::new_unpinned_with_recycler_data(
|
||||
return Some(PacketBatch::new_unpinned_with_recycler_data(
|
||||
recycler,
|
||||
"run_highest_window_request",
|
||||
vec![packet],
|
||||
@ -680,14 +680,14 @@ impl ServeRepair {
|
||||
}
|
||||
|
||||
fn run_orphan(
|
||||
recycler: &PacketsRecycler,
|
||||
recycler: &PacketBatchRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
mut slot: Slot,
|
||||
max_responses: usize,
|
||||
nonce: Nonce,
|
||||
) -> Option<Packets> {
|
||||
let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan");
|
||||
) -> Option<PacketBatch> {
|
||||
let mut res = PacketBatch::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan");
|
||||
if let Some(blockstore) = blockstore {
|
||||
// Try to find the next "n" parent slots of the input slot
|
||||
while let Ok(Some(meta)) = blockstore.meta(slot) {
|
||||
@ -720,12 +720,12 @@ impl ServeRepair {
|
||||
}
|
||||
|
||||
fn run_ancestor_hashes(
|
||||
recycler: &PacketsRecycler,
|
||||
recycler: &PacketBatchRecycler,
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
slot: Slot,
|
||||
nonce: Nonce,
|
||||
) -> Option<Packets> {
|
||||
) -> Option<PacketBatch> {
|
||||
let blockstore = blockstore?;
|
||||
let ancestor_slot_hashes = if blockstore.is_duplicate_confirmed(slot) {
|
||||
let ancestor_iterator =
|
||||
@ -746,7 +746,7 @@ impl ServeRepair {
|
||||
from_addr,
|
||||
nonce,
|
||||
)?;
|
||||
Some(Packets::new_unpinned_with_recycler_data(
|
||||
Some(PacketBatch::new_unpinned_with_recycler_data(
|
||||
recycler,
|
||||
"run_ancestor_hashes",
|
||||
vec![packet],
|
||||
@ -778,7 +778,7 @@ mod tests {
|
||||
|
||||
/// test run_window_request responds with the right shred, and do not overrun
|
||||
fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
|
||||
let recycler = PacketsRecycler::default();
|
||||
let recycler = PacketBatchRecycler::default();
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
@ -848,7 +848,7 @@ mod tests {
|
||||
|
||||
/// test window requests respond with the right shred, and do not overrun
|
||||
fn run_window_request(slot: Slot, nonce: Nonce) {
|
||||
let recycler = PacketsRecycler::default();
|
||||
let recycler = PacketBatchRecycler::default();
|
||||
solana_logger::setup();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
@ -1017,7 +1017,7 @@ mod tests {
|
||||
|
||||
fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
|
||||
solana_logger::setup();
|
||||
let recycler = PacketsRecycler::default();
|
||||
let recycler = PacketBatchRecycler::default();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
@ -1091,7 +1091,7 @@ mod tests {
|
||||
#[test]
|
||||
fn run_orphan_corrupted_shred_size() {
|
||||
solana_logger::setup();
|
||||
let recycler = PacketsRecycler::default();
|
||||
let recycler = PacketBatchRecycler::default();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
|
||||
@ -1152,7 +1152,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_run_ancestor_hashes() {
|
||||
solana_logger::setup();
|
||||
let recycler = PacketsRecycler::default();
|
||||
let recycler = PacketBatchRecycler::default();
|
||||
let ledger_path = get_tmp_ledger_path!();
|
||||
{
|
||||
let slot = 0;
|
||||
|
@ -6,12 +6,12 @@ use {
|
||||
solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats},
|
||||
solana_perf::{
|
||||
cuda_runtime::PinnedVec,
|
||||
packet::{Packet, PacketsRecycler},
|
||||
packet::{Packet, PacketBatchRecycler},
|
||||
recycler::Recycler,
|
||||
},
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
|
||||
solana_streamer::streamer::{self, PacketReceiver, PacketSender},
|
||||
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
|
||||
std::{
|
||||
net::UdpSocket,
|
||||
sync::{atomic::AtomicBool, mpsc::channel, Arc, RwLock},
|
||||
@ -63,8 +63,8 @@ impl ShredFetchStage {
|
||||
|
||||
// updates packets received on a channel and sends them on another channel
|
||||
fn modify_packets<F>(
|
||||
recvr: PacketReceiver,
|
||||
sendr: PacketSender,
|
||||
recvr: PacketBatchReceiver,
|
||||
sendr: PacketBatchSender,
|
||||
bank_forks: Option<Arc<RwLock<BankForks>>>,
|
||||
name: &'static str,
|
||||
modify: F,
|
||||
@ -83,7 +83,7 @@ impl ShredFetchStage {
|
||||
let mut stats = ShredFetchStats::default();
|
||||
let mut packet_hasher = PacketHasher::default();
|
||||
|
||||
while let Some(mut p) = recvr.iter().next() {
|
||||
while let Some(mut packet_batch) = recvr.iter().next() {
|
||||
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
|
||||
last_updated = Instant::now();
|
||||
packet_hasher.reset();
|
||||
@ -97,8 +97,8 @@ impl ShredFetchStage {
|
||||
slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
|
||||
}
|
||||
}
|
||||
stats.shred_count += p.packets.len();
|
||||
p.packets.iter_mut().for_each(|packet| {
|
||||
stats.shred_count += packet_batch.packets.len();
|
||||
packet_batch.packets.iter_mut().for_each(|packet| {
|
||||
Self::process_packet(
|
||||
packet,
|
||||
&mut shreds_received,
|
||||
@ -124,7 +124,7 @@ impl ShredFetchStage {
|
||||
stats = ShredFetchStats::default();
|
||||
last_stats = Instant::now();
|
||||
}
|
||||
if sendr.send(p).is_err() {
|
||||
if sendr.send(packet_batch).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -133,7 +133,7 @@ impl ShredFetchStage {
|
||||
fn packet_modifier<F>(
|
||||
sockets: Vec<Arc<UdpSocket>>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
sender: PacketSender,
|
||||
sender: PacketBatchSender,
|
||||
recycler: Recycler<PinnedVec<Packet>>,
|
||||
bank_forks: Option<Arc<RwLock<BankForks>>>,
|
||||
name: &'static str,
|
||||
@ -169,11 +169,11 @@ impl ShredFetchStage {
|
||||
sockets: Vec<Arc<UdpSocket>>,
|
||||
forward_sockets: Vec<Arc<UdpSocket>>,
|
||||
repair_socket: Arc<UdpSocket>,
|
||||
sender: &PacketSender,
|
||||
sender: &PacketBatchSender,
|
||||
bank_forks: Option<Arc<RwLock<BankForks>>>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
let recycler: PacketsRecycler = Recycler::warmed(100, 1024);
|
||||
let recycler: PacketBatchRecycler = Recycler::warmed(100, 1024);
|
||||
|
||||
let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
|
||||
sockets,
|
||||
|
@ -5,11 +5,11 @@
|
||||
//!
|
||||
|
||||
pub use solana_perf::sigverify::{
|
||||
batch_size, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset,
|
||||
count_packets_in_batches, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset,
|
||||
};
|
||||
use {
|
||||
crate::sigverify_stage::SigVerifier,
|
||||
solana_perf::{cuda_runtime::PinnedVec, packet::Packets, recycler::Recycler, sigverify},
|
||||
solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify},
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -40,13 +40,13 @@ impl Default for TransactionSigVerifier {
|
||||
}
|
||||
|
||||
impl SigVerifier for TransactionSigVerifier {
|
||||
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
|
||||
fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
|
||||
sigverify::ed25519_verify(
|
||||
&mut batch,
|
||||
&mut batches,
|
||||
&self.recycler,
|
||||
&self.recycler_out,
|
||||
self.reject_non_vote,
|
||||
);
|
||||
batch
|
||||
batches
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use {
|
||||
leader_schedule_cache::LeaderScheduleCache, shred::Shred,
|
||||
sigverify_shreds::verify_shreds_gpu,
|
||||
},
|
||||
solana_perf::{self, packet::Packets, recycler_cache::RecyclerCache},
|
||||
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
std::{
|
||||
collections::{HashMap, HashSet},
|
||||
@ -32,7 +32,7 @@ impl ShredSigVerifier {
|
||||
recycler_cache: RecyclerCache::warmed(),
|
||||
}
|
||||
}
|
||||
fn read_slots(batches: &[Packets]) -> HashSet<u64> {
|
||||
fn read_slots(batches: &[PacketBatch]) -> HashSet<u64> {
|
||||
batches
|
||||
.iter()
|
||||
.flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet))
|
||||
@ -41,7 +41,7 @@ impl ShredSigVerifier {
|
||||
}
|
||||
|
||||
impl SigVerifier for ShredSigVerifier {
|
||||
fn verify_batch(&self, mut batches: Vec<Packets>) -> Vec<Packets> {
|
||||
fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
|
||||
let r_bank = self.bank_forks.read().unwrap().working_bank();
|
||||
let slots: HashSet<u64> = Self::read_slots(&batches);
|
||||
let mut leader_slots: HashMap<u64, [u8; 32]> = slots
|
||||
@ -88,13 +88,13 @@ pub mod tests {
|
||||
0,
|
||||
0xc0de,
|
||||
);
|
||||
let mut batch = [Packets::default(), Packets::default()];
|
||||
let mut batches = [PacketBatch::default(), PacketBatch::default()];
|
||||
|
||||
let keypair = Keypair::new();
|
||||
Shredder::sign_shred(&keypair, &mut shred);
|
||||
batch[0].packets.resize(1, Packet::default());
|
||||
batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batch[0].packets[0].meta.size = shred.payload.len();
|
||||
batches[0].packets.resize(1, Packet::default());
|
||||
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batches[0].packets[0].meta.size = shred.payload.len();
|
||||
|
||||
let mut shred = Shred::new_from_data(
|
||||
0xc0de_dead,
|
||||
@ -108,16 +108,16 @@ pub mod tests {
|
||||
0xc0de,
|
||||
);
|
||||
Shredder::sign_shred(&keypair, &mut shred);
|
||||
batch[1].packets.resize(1, Packet::default());
|
||||
batch[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batch[1].packets[0].meta.size = shred.payload.len();
|
||||
batches[1].packets.resize(1, Packet::default());
|
||||
batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batches[1].packets[0].meta.size = shred.payload.len();
|
||||
|
||||
let expected: HashSet<u64> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect();
|
||||
assert_eq!(ShredSigVerifier::read_slots(&batch), expected);
|
||||
assert_eq!(ShredSigVerifier::read_slots(&batches), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sigverify_shreds_verify_batch() {
|
||||
fn test_sigverify_shreds_verify_batches() {
|
||||
let leader_keypair = Arc::new(Keypair::new());
|
||||
let leader_pubkey = leader_keypair.pubkey();
|
||||
let bank = Bank::new_for_tests(
|
||||
@ -127,8 +127,8 @@ pub mod tests {
|
||||
let bf = Arc::new(RwLock::new(BankForks::new(bank)));
|
||||
let verifier = ShredSigVerifier::new(bf, cache);
|
||||
|
||||
let mut batch = vec![Packets::default()];
|
||||
batch[0].packets.resize(2, Packet::default());
|
||||
let mut batches = vec![PacketBatch::default()];
|
||||
batches[0].packets.resize(2, Packet::default());
|
||||
|
||||
let mut shred = Shred::new_from_data(
|
||||
0,
|
||||
@ -142,8 +142,8 @@ pub mod tests {
|
||||
0xc0de,
|
||||
);
|
||||
Shredder::sign_shred(&leader_keypair, &mut shred);
|
||||
batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batch[0].packets[0].meta.size = shred.payload.len();
|
||||
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batches[0].packets[0].meta.size = shred.payload.len();
|
||||
|
||||
let mut shred = Shred::new_from_data(
|
||||
0,
|
||||
@ -158,10 +158,10 @@ pub mod tests {
|
||||
);
|
||||
let wrong_keypair = Keypair::new();
|
||||
Shredder::sign_shred(&wrong_keypair, &mut shred);
|
||||
batch[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batch[0].packets[1].meta.size = shred.payload.len();
|
||||
batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
|
||||
batches[0].packets[1].meta.size = shred.payload.len();
|
||||
|
||||
let rv = verifier.verify_batch(batch);
|
||||
let rv = verifier.verify_batches(batches);
|
||||
assert!(!rv[0].packets[0].meta.discard);
|
||||
assert!(rv[0].packets[1].meta.discard);
|
||||
}
|
||||
|
@ -9,9 +9,9 @@ use {
|
||||
crate::sigverify,
|
||||
crossbeam_channel::{SendError, Sender as CrossbeamSender},
|
||||
solana_measure::measure::Measure,
|
||||
solana_perf::packet::Packets,
|
||||
solana_perf::packet::PacketBatch,
|
||||
solana_sdk::timing,
|
||||
solana_streamer::streamer::{self, PacketReceiver, StreamerError},
|
||||
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
|
||||
std::{
|
||||
collections::HashMap,
|
||||
sync::mpsc::{Receiver, RecvTimeoutError},
|
||||
@ -26,7 +26,7 @@ const MAX_SIGVERIFY_BATCH: usize = 10_000;
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SigVerifyServiceError {
|
||||
#[error("send packets batch error")]
|
||||
Send(#[from] SendError<Vec<Packets>>),
|
||||
Send(#[from] SendError<Vec<PacketBatch>>),
|
||||
|
||||
#[error("streamer error")]
|
||||
Streamer(#[from] StreamerError),
|
||||
@ -39,7 +39,7 @@ pub struct SigVerifyStage {
|
||||
}
|
||||
|
||||
pub trait SigVerifier {
|
||||
fn verify_batch(&self, batch: Vec<Packets>) -> Vec<Packets>;
|
||||
fn verify_batches(&self, batches: Vec<PacketBatch>) -> Vec<PacketBatch>;
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
@ -49,7 +49,7 @@ pub struct DisabledSigVerifier {}
|
||||
struct SigVerifierStats {
|
||||
recv_batches_us_hist: histogram::Histogram, // time to call recv_batch
|
||||
verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
|
||||
batches_hist: histogram::Histogram, // number of Packets structures per verify call
|
||||
batches_hist: histogram::Histogram, // number of packet batches per verify call
|
||||
packets_hist: histogram::Histogram, // number of packets per verify call
|
||||
total_batches: usize,
|
||||
total_packets: usize,
|
||||
@ -122,24 +122,24 @@ impl SigVerifierStats {
|
||||
}
|
||||
|
||||
impl SigVerifier for DisabledSigVerifier {
|
||||
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
|
||||
sigverify::ed25519_verify_disabled(&mut batch);
|
||||
batch
|
||||
fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
|
||||
sigverify::ed25519_verify_disabled(&mut batches);
|
||||
batches
|
||||
}
|
||||
}
|
||||
|
||||
impl SigVerifyStage {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new<T: SigVerifier + 'static + Send + Clone>(
|
||||
packet_receiver: Receiver<Packets>,
|
||||
verified_sender: CrossbeamSender<Vec<Packets>>,
|
||||
packet_receiver: Receiver<PacketBatch>,
|
||||
verified_sender: CrossbeamSender<Vec<PacketBatch>>,
|
||||
verifier: T,
|
||||
) -> Self {
|
||||
let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier);
|
||||
Self { thread_hdl }
|
||||
}
|
||||
|
||||
pub fn discard_excess_packets(batches: &mut Vec<Packets>, max_packets: usize) {
|
||||
pub fn discard_excess_packets(batches: &mut Vec<PacketBatch>, max_packets: usize) {
|
||||
let mut received_ips = HashMap::new();
|
||||
for (batch_index, batch) in batches.iter().enumerate() {
|
||||
for (packet_index, packets) in batch.packets.iter().enumerate() {
|
||||
@ -169,12 +169,12 @@ impl SigVerifyStage {
|
||||
}
|
||||
|
||||
fn verifier<T: SigVerifier>(
|
||||
recvr: &PacketReceiver,
|
||||
sendr: &CrossbeamSender<Vec<Packets>>,
|
||||
recvr: &PacketBatchReceiver,
|
||||
sendr: &CrossbeamSender<Vec<PacketBatch>>,
|
||||
verifier: &T,
|
||||
stats: &mut SigVerifierStats,
|
||||
) -> Result<()> {
|
||||
let (mut batches, num_packets, recv_duration) = streamer::recv_batch(recvr)?;
|
||||
let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?;
|
||||
|
||||
let batches_len = batches.len();
|
||||
debug!(
|
||||
@ -187,7 +187,7 @@ impl SigVerifyStage {
|
||||
}
|
||||
|
||||
let mut verify_batch_time = Measure::start("sigverify_batch_time");
|
||||
sendr.send(verifier.verify_batch(batches))?;
|
||||
sendr.send(verifier.verify_batches(batches))?;
|
||||
verify_batch_time.stop();
|
||||
|
||||
debug!(
|
||||
@ -216,8 +216,8 @@ impl SigVerifyStage {
|
||||
}
|
||||
|
||||
fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
|
||||
packet_receiver: PacketReceiver,
|
||||
verified_sender: CrossbeamSender<Vec<Packets>>,
|
||||
packet_receiver: PacketBatchReceiver,
|
||||
verified_sender: CrossbeamSender<Vec<PacketBatch>>,
|
||||
verifier: &T,
|
||||
) -> JoinHandle<()> {
|
||||
let verifier = verifier.clone();
|
||||
@ -252,8 +252,8 @@ impl SigVerifyStage {
|
||||
}
|
||||
|
||||
fn verifier_services<T: SigVerifier + 'static + Send + Clone>(
|
||||
packet_receiver: PacketReceiver,
|
||||
verified_sender: CrossbeamSender<Vec<Packets>>,
|
||||
packet_receiver: PacketBatchReceiver,
|
||||
verified_sender: CrossbeamSender<Vec<PacketBatch>>,
|
||||
verifier: T,
|
||||
) -> JoinHandle<()> {
|
||||
Self::verifier_service(packet_receiver, verified_sender, &verifier)
|
||||
@ -268,11 +268,12 @@ impl SigVerifyStage {
|
||||
mod tests {
|
||||
use {super::*, solana_perf::packet::Packet};
|
||||
|
||||
fn count_non_discard(packets: &[Packets]) -> usize {
|
||||
packets
|
||||
fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
|
||||
packet_batches
|
||||
.iter()
|
||||
.map(|pp| {
|
||||
pp.packets
|
||||
.map(|batch| {
|
||||
batch
|
||||
.packets
|
||||
.iter()
|
||||
.map(|p| if p.meta.discard { 0 } else { 1 })
|
||||
.sum::<usize>()
|
||||
@ -283,14 +284,14 @@ mod tests {
|
||||
#[test]
|
||||
fn test_packet_discard() {
|
||||
solana_logger::setup();
|
||||
let mut p = Packets::default();
|
||||
p.packets.resize(10, Packet::default());
|
||||
p.packets[3].meta.addr = [1u16; 8];
|
||||
let mut packets = vec![p];
|
||||
let mut batch = PacketBatch::default();
|
||||
batch.packets.resize(10, Packet::default());
|
||||
batch.packets[3].meta.addr = [1u16; 8];
|
||||
let mut batches = vec![batch];
|
||||
let max = 3;
|
||||
SigVerifyStage::discard_excess_packets(&mut packets, max);
|
||||
assert_eq!(count_non_discard(&packets), max);
|
||||
assert!(!packets[0].packets[0].meta.discard);
|
||||
assert!(!packets[0].packets[3].meta.discard);
|
||||
SigVerifyStage::discard_excess_packets(&mut batches, max);
|
||||
assert_eq!(count_non_discard(&batches), max);
|
||||
assert!(!batches[0].packets[0].meta.discard);
|
||||
assert!(!batches[0].packets[3].meta.discard);
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
use {
|
||||
crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result},
|
||||
crossbeam_channel::Select,
|
||||
solana_perf::packet::Packets,
|
||||
solana_perf::packet::PacketBatch,
|
||||
solana_runtime::bank::Bank,
|
||||
solana_sdk::{
|
||||
account::from_account, clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature,
|
||||
@ -20,7 +20,7 @@ const MAX_VOTES_PER_VALIDATOR: usize = 1000;
|
||||
pub struct VerifiedVoteMetadata {
|
||||
pub vote_account_key: Pubkey,
|
||||
pub vote: Vote,
|
||||
pub packet: Packets,
|
||||
pub packet_batch: PacketBatch,
|
||||
pub signature: Signature,
|
||||
}
|
||||
|
||||
@ -70,7 +70,7 @@ impl<'a> ValidatorGossipVotesIterator<'a> {
|
||||
///
|
||||
/// Iterator is done after iterating through all vote accounts
|
||||
impl<'a> Iterator for ValidatorGossipVotesIterator<'a> {
|
||||
type Item = Vec<Packets>;
|
||||
type Item = Vec<PacketBatch>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// TODO: Maybe prioritize by stake weight
|
||||
@ -116,7 +116,7 @@ impl<'a> Iterator for ValidatorGossipVotesIterator<'a> {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<Packets>>()
|
||||
.collect::<Vec<PacketBatch>>()
|
||||
})
|
||||
})
|
||||
});
|
||||
@ -130,7 +130,7 @@ impl<'a> Iterator for ValidatorGossipVotesIterator<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
pub type SingleValidatorVotes = BTreeMap<(Slot, Hash), (Packets, Signature)>;
|
||||
pub type SingleValidatorVotes = BTreeMap<(Slot, Hash), (PacketBatch, Signature)>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VerifiedVotePackets(HashMap<Pubkey, SingleValidatorVotes>);
|
||||
@ -150,7 +150,7 @@ impl VerifiedVotePackets {
|
||||
let VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet,
|
||||
packet_batch,
|
||||
signature,
|
||||
} = verfied_vote_metadata;
|
||||
if vote.slots.is_empty() {
|
||||
@ -161,7 +161,7 @@ impl VerifiedVotePackets {
|
||||
let hash = vote.hash;
|
||||
|
||||
let validator_votes = self.0.entry(vote_account_key).or_default();
|
||||
validator_votes.insert((*slot, hash), (packet, signature));
|
||||
validator_votes.insert((*slot, hash), (packet_batch, signature));
|
||||
|
||||
if validator_votes.len() > MAX_VOTES_PER_VALIDATOR {
|
||||
let smallest_key = validator_votes.keys().next().cloned().unwrap();
|
||||
@ -199,7 +199,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote: vote.clone(),
|
||||
packet: Packets::default(),
|
||||
packet_batch: PacketBatch::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
@ -219,7 +219,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
packet_batch: PacketBatch::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
@ -241,7 +241,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
packet_batch: PacketBatch::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
@ -264,7 +264,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
packet_batch: PacketBatch::default(),
|
||||
signature: Signature::new(&[2u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
@ -303,7 +303,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
packet_batch: PacketBatch::default(),
|
||||
signature: Signature::new(&[1u8; 64]),
|
||||
}])
|
||||
.unwrap();
|
||||
@ -340,7 +340,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
packet_batch: PacketBatch::default(),
|
||||
signature: Signature::new_unique(),
|
||||
}])
|
||||
.unwrap();
|
||||
@ -394,7 +394,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::new(vec![Packet::default(); num_packets]),
|
||||
packet_batch: PacketBatch::new(vec![Packet::default(); num_packets]),
|
||||
signature: Signature::new_unique(),
|
||||
}])
|
||||
.unwrap();
|
||||
@ -427,12 +427,12 @@ mod tests {
|
||||
// Get and verify batches
|
||||
let num_expected_batches = 2;
|
||||
for _ in 0..num_expected_batches {
|
||||
let validator_batch: Vec<Packets> = gossip_votes_iterator.next().unwrap();
|
||||
let validator_batch: Vec<PacketBatch> = gossip_votes_iterator.next().unwrap();
|
||||
assert_eq!(validator_batch.len(), slot_hashes.slot_hashes().len());
|
||||
let expected_len = validator_batch[0].packets.len();
|
||||
assert!(validator_batch
|
||||
.iter()
|
||||
.all(|p| p.packets.len() == expected_len));
|
||||
.all(|batch| batch.packets.len() == expected_len));
|
||||
}
|
||||
|
||||
// Should be empty now
|
||||
@ -461,7 +461,7 @@ mod tests {
|
||||
s.send(vec![VerifiedVoteMetadata {
|
||||
vote_account_key,
|
||||
vote,
|
||||
packet: Packets::default(),
|
||||
packet_batch: PacketBatch::default(),
|
||||
signature: Signature::new_unique(),
|
||||
}])
|
||||
.unwrap();
|
||||
|
@ -22,7 +22,7 @@ use {
|
||||
},
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
|
||||
solana_perf::packet::{Packet, Packets},
|
||||
solana_perf::packet::{Packet, PacketBatch},
|
||||
solana_rayon_threadlimit::get_thread_count,
|
||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
||||
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey},
|
||||
@ -353,7 +353,7 @@ fn recv_window<F>(
|
||||
blockstore: &Blockstore,
|
||||
bank_forks: &RwLock<BankForks>,
|
||||
insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
|
||||
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
retransmit_sender: &Sender<Vec<Shred>>,
|
||||
shred_filter: F,
|
||||
thread_pool: &ThreadPool,
|
||||
@ -458,7 +458,7 @@ impl WindowService {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new<F>(
|
||||
blockstore: Arc<Blockstore>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
retransmit_sender: Sender<Vec<Shred>>,
|
||||
repair_socket: Arc<UdpSocket>,
|
||||
exit: Arc<AtomicBool>,
|
||||
@ -629,7 +629,7 @@ impl WindowService {
|
||||
exit: Arc<AtomicBool>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
|
||||
shred_filter: F,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
retransmit_sender: Sender<Vec<Shred>>,
|
||||
|
Reference in New Issue
Block a user