Revert "Rename Packets to PacketBatch (backport #21794) (#21804)"

This reverts commit 39e27b130f.
This commit is contained in:
Trent Nelson
2021-12-13 08:48:37 -07:00
committed by Tao Zhu
parent 09c68ce696
commit 13d40d6a66
25 changed files with 645 additions and 695 deletions

View File

@@ -14,7 +14,7 @@ use {
solana_perf::{
cuda_runtime::PinnedVec,
data_budget::DataBudget,
packet::{limited_deserialize, Packet, PacketBatch, PACKETS_PER_BATCH},
packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH},
perf_libs,
},
solana_poh::poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder},
@@ -65,10 +65,10 @@ use {
};
/// (packets, valid_indexes, forwarded)
/// Batch of packets with a list of which are valid and if this batch has been forwarded.
type PacketBatchAndOffsets = (PacketBatch, Vec<usize>, bool);
/// Set of packets with a list of which are valid and if this batch has been forwarded.
type PacketsAndOffsets = (Packets, Vec<usize>, bool);
pub type UnprocessedPacketBatches = VecDeque<PacketBatchAndOffsets>;
pub type UnprocessedPackets = VecDeque<PacketsAndOffsets>;
/// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
@@ -287,9 +287,9 @@ impl BankingStage {
pub fn new(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
@@ -310,9 +310,9 @@ impl BankingStage {
fn new_num_threads(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
@@ -379,12 +379,12 @@ impl BankingStage {
}
fn filter_valid_packets_for_forwarding<'a>(
packet_batches: impl Iterator<Item = &'a PacketBatchAndOffsets>,
all_packets: impl Iterator<Item = &'a PacketsAndOffsets>,
) -> Vec<&'a Packet> {
packet_batches
.filter(|(_batch, _indexes, forwarded)| !forwarded)
.flat_map(|(batch, valid_indexes, _forwarded)| {
valid_indexes.iter().map(move |x| &batch.packets[*x])
all_packets
.filter(|(_p, _indexes, forwarded)| !forwarded)
.flat_map(|(p, valid_indexes, _forwarded)| {
valid_indexes.iter().map(move |x| &p.packets[*x])
})
.collect()
}
@@ -392,10 +392,10 @@ impl BankingStage {
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
tpu_forwards: &std::net::SocketAddr,
buffered_packet_batches: &UnprocessedPacketBatches,
unprocessed_packets: &UnprocessedPackets,
data_budget: &DataBudget,
) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter());
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
@@ -413,7 +413,7 @@ impl BankingStage {
Ok(())
}
// Returns whether the given `PacketBatch` has any more remaining unprocessed
// Returns whether the given `Packets` has any more remaining unprocessed
// transactions
fn update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes: &mut Vec<usize>,
@@ -432,7 +432,7 @@ impl BankingStage {
my_pubkey: &Pubkey,
max_tx_ingestion_ns: u128,
poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packet_batches: &mut UnprocessedPacketBatches,
buffered_packets: &mut UnprocessedPackets,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
test_fn: Option<impl Fn()>,
@@ -440,21 +440,19 @@ impl BankingStage {
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
) {
let mut rebuffered_packet_count = 0;
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
let buffered_packet_batches_len = buffered_packet_batches.len();
let buffered_len = buffered_packets.len();
let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot = None;
buffered_packet_batches.retain_mut(|buffered_packet_batch_and_offsets| {
let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
buffered_packet_batch_and_offsets;
buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes, _forwarded)| {
if let Some((next_leader, bank)) = &reached_end_of_slot {
// We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones
let new_unprocessed_indexes = Self::filter_unprocessed_packets(
bank,
packet_batch,
msgs,
original_unprocessed_indexes,
my_pubkey,
*next_leader,
@@ -473,7 +471,7 @@ impl BankingStage {
&bank,
&bank_creation_time,
recorder,
packet_batch,
msgs,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
gossip_vote_sender,
@@ -492,7 +490,7 @@ impl BankingStage {
new_tx_count += processed;
// Out of the buffered packets just retried, collect any still unprocessed
// transactions in this batch for forwarding
rebuffered_packet_count += new_unprocessed_indexes.len();
rebuffered_packets_len += new_unprocessed_indexes.len();
let has_more_unprocessed_transactions =
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
@@ -503,7 +501,7 @@ impl BankingStage {
}
has_more_unprocessed_transactions
} else {
rebuffered_packet_count += original_unprocessed_indexes.len();
rebuffered_packets_len += original_unprocessed_indexes.len();
// `original_unprocessed_indexes` must have remaining packets to process
// if not yet processed.
assert!(Self::packet_has_more_unprocessed_transactions(
@@ -519,7 +517,7 @@ impl BankingStage {
debug!(
"@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}",
timestamp(),
buffered_packet_batches_len,
buffered_len,
proc_start.as_ms(),
new_tx_count,
(new_tx_count as f32) / (proc_start.as_s())
@@ -530,7 +528,7 @@ impl BankingStage {
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
banking_stage_stats
.rebuffered_packets_count
.fetch_add(rebuffered_packet_count, Ordering::Relaxed);
.fetch_add(rebuffered_packets_len, Ordering::Relaxed);
banking_stage_stats
.consumed_buffered_packets_count
.fetch_add(new_tx_count, Ordering::Relaxed);
@@ -575,7 +573,7 @@ impl BankingStage {
socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
buffered_packets: &mut UnprocessedPackets,
forward_option: &ForwardOption,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@@ -617,7 +615,7 @@ impl BankingStage {
my_pubkey,
max_tx_ingestion_ns,
poh_recorder,
buffered_packet_batches,
buffered_packets,
transaction_status_sender,
gossip_vote_sender,
None::<Box<dyn Fn()>>,
@@ -630,7 +628,7 @@ impl BankingStage {
Self::handle_forwarding(
forward_option,
cluster_info,
buffered_packet_batches,
buffered_packets,
poh_recorder,
socket,
false,
@@ -641,7 +639,7 @@ impl BankingStage {
Self::handle_forwarding(
forward_option,
cluster_info,
buffered_packet_batches,
buffered_packets,
poh_recorder,
socket,
true,
@@ -656,7 +654,7 @@ impl BankingStage {
fn handle_forwarding(
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
buffered_packets: &mut UnprocessedPackets,
poh_recorder: &Arc<Mutex<PohRecorder>>,
socket: &UdpSocket,
hold: bool,
@@ -665,7 +663,7 @@ impl BankingStage {
let addr = match forward_option {
ForwardOption::NotForward => {
if !hold {
buffered_packet_batches.clear();
buffered_packets.clear();
}
return;
}
@@ -678,21 +676,21 @@ impl BankingStage {
Some(addr) => addr,
None => return,
};
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget);
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packets, data_budget);
if hold {
buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
buffered_packets.retain(|(_, index, _)| !index.is_empty());
for (_, _, forwarded) in buffered_packets.iter_mut() {
*forwarded = true;
}
} else {
buffered_packet_batches.clear();
buffered_packets.clear();
}
}
#[allow(clippy::too_many_arguments)]
pub fn process_loop(
my_pubkey: Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
recv_start: &mut Instant,
@@ -707,16 +705,16 @@ impl BankingStage {
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
let banking_stage_stats = BankingStageStats::new(id);
loop {
while !buffered_packet_batches.is_empty() {
while !buffered_packets.is_empty() {
let decision = Self::process_buffered_packets(
&my_pubkey,
&socket,
poh_recorder,
cluster_info,
&mut buffered_packet_batches,
&mut buffered_packets,
&forward_option,
transaction_status_sender.clone(),
&gossip_vote_sender,
@@ -734,7 +732,7 @@ impl BankingStage {
}
}
let recv_timeout = if !buffered_packet_batches.is_empty() {
let recv_timeout = if !buffered_packets.is_empty() {
// If packets are buffered, let's wait for less time on recv from the channel.
// This helps detect the next leader faster, and processing the buffered
// packets quickly
@@ -754,7 +752,7 @@ impl BankingStage {
batch_limit,
transaction_status_sender.clone(),
&gossip_vote_sender,
&mut buffered_packet_batches,
&mut buffered_packets,
&banking_stage_stats,
duplicates,
&recorder,
@@ -1093,7 +1091,7 @@ impl BankingStage {
// Also returned is packet indexes for transaction should be retried due to cost limits.
#[allow(clippy::needless_collect)]
fn transactions_from_packets(
packet_batch: &PacketBatch,
msgs: &Packets,
transaction_indexes: &[usize],
feature_set: &Arc<feature_set::FeatureSet>,
read_cost_tracker: &RwLockReadGuard<CostTracker>,
@@ -1107,7 +1105,7 @@ impl BankingStage {
let verified_transactions_with_packet_indexes: Vec<_> = transaction_indexes
.iter()
.filter_map(|tx_index| {
let p = &packet_batch.packets[*tx_index];
let p = &msgs.packets[*tx_index];
if votes_only && !p.meta.is_simple_vote_tx {
return None;
}
@@ -1130,7 +1128,7 @@ impl BankingStage {
.filter_map(|(tx, tx_index)| {
// put transaction into retry queue if it wouldn't fit
// into current bank
let is_vote = &packet_batch.packets[tx_index].meta.is_simple_vote_tx;
let is_vote = &msgs.packets[tx_index].meta.is_simple_vote_tx;
// excluding vote TX from cost_model, for now
if !is_vote
@@ -1158,7 +1156,7 @@ impl BankingStage {
filtered_transactions_with_packet_indexes
.into_iter()
.filter_map(|(tx, tx_index)| {
let p = &packet_batch.packets[tx_index];
let p = &msgs.packets[tx_index];
let message_bytes = Self::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
Some((
@@ -1227,7 +1225,7 @@ impl BankingStage {
bank: &Arc<Bank>,
bank_creation_time: &Instant,
poh: &TransactionRecorder,
packet_batch: &PacketBatch,
msgs: &Packets,
packet_indexes: Vec<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@@ -1237,7 +1235,7 @@ impl BankingStage {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) =
Self::transactions_from_packets(
packet_batch,
msgs,
&packet_indexes,
&bank.feature_set,
&bank.read_cost_tracker().unwrap(),
@@ -1328,7 +1326,7 @@ impl BankingStage {
fn filter_unprocessed_packets(
bank: &Arc<Bank>,
packet_batch: &PacketBatch,
msgs: &Packets,
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
@@ -1348,7 +1346,7 @@ impl BankingStage {
Measure::start("unprocessed_packet_conversion");
let (transactions, transaction_to_packet_indexes, retry_packet_indexes) =
Self::transactions_from_packets(
packet_batch,
msgs,
&transaction_indexes,
&bank.feature_set,
&bank.read_cost_tracker().unwrap(),
@@ -1404,7 +1402,7 @@ impl BankingStage {
/// Process the incoming packets
pub fn process_packets(
my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
poh: &Arc<Mutex<PohRecorder>>,
recv_start: &mut Instant,
recv_timeout: Duration,
@@ -1412,40 +1410,40 @@ impl BankingStage {
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
buffered_packet_batches: &mut UnprocessedPacketBatches,
buffered_packets: &mut UnprocessedPackets,
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_model: &Arc<RwLock<CostModel>>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
let mms = verified_receiver.recv_timeout(recv_timeout)?;
recv_time.stop();
let packet_batches_len = packet_batches.len();
let packet_count: usize = packet_batches.iter().map(|x| x.packets.len()).sum();
let mms_len = mms.len();
let count: usize = mms.iter().map(|x| x.packets.len()).sum();
debug!(
"@{:?} process start stalled for: {:?}ms txs: {} id: {}",
timestamp(),
duration_as_ms(&recv_start.elapsed()),
packet_count,
count,
id,
);
inc_new_counter_debug!("banking_stage-transactions_received", packet_count);
inc_new_counter_debug!("banking_stage-transactions_received", count);
let mut proc_start = Measure::start("process_packets_transactions_process");
let mut new_tx_count = 0;
let mut packet_batch_iter = packet_batches.into_iter();
let mut mms_iter = mms.into_iter();
let mut dropped_packets_count = 0;
let mut dropped_packet_batches_count = 0;
let mut newly_buffered_packets_count = 0;
while let Some(packet_batch) = packet_batch_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let bank_start = poh.lock().unwrap().bank_start();
if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() {
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
buffered_packets,
msgs,
packet_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
@@ -1463,7 +1461,7 @@ impl BankingStage {
&bank,
&bank_creation_time,
recorder,
&packet_batch,
&msgs,
packet_indexes,
transaction_status_sender.clone(),
gossip_vote_sender,
@@ -1475,8 +1473,8 @@ impl BankingStage {
// Collect any unprocessed transactions in this batch for forwarding
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
@@ -1492,11 +1490,11 @@ impl BankingStage {
let next_leader = poh.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
#[allow(clippy::while_let_on_iterator)]
while let Some(packet_batch) = packet_batch_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let unprocessed_indexes = Self::filter_unprocessed_packets(
&bank,
&packet_batch,
&msgs,
&packet_indexes,
my_pubkey,
next_leader,
@@ -1504,8 +1502,8 @@ impl BankingStage {
cost_model,
);
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
@@ -1526,11 +1524,11 @@ impl BankingStage {
debug!(
"@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}",
timestamp(),
packet_batches_len,
mms_len,
proc_start.as_ms(),
new_tx_count,
(new_tx_count as f32) / (proc_start.as_s()),
packet_count,
count,
id,
);
banking_stage_stats
@@ -1538,7 +1536,7 @@ impl BankingStage {
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
banking_stage_stats
.process_packets_count
.fetch_add(packet_count, Ordering::Relaxed);
.fetch_add(count, Ordering::Relaxed);
banking_stage_stats
.new_tx_count
.fetch_add(new_tx_count, Ordering::Relaxed);
@@ -1553,12 +1551,9 @@ impl BankingStage {
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
banking_stage_stats
.current_buffered_packet_batches_count
.swap(buffered_packet_batches.len(), Ordering::Relaxed);
.swap(buffered_packets.len(), Ordering::Relaxed);
banking_stage_stats.current_buffered_packets_count.swap(
buffered_packet_batches
.iter()
.map(|packets| packets.1.len())
.sum(),
buffered_packets.iter().map(|packets| packets.1.len()).sum(),
Ordering::Relaxed,
);
*recv_start = Instant::now();
@@ -1566,8 +1561,8 @@ impl BankingStage {
}
fn push_unprocessed(
unprocessed_packet_batches: &mut UnprocessedPacketBatches,
packet_batch: PacketBatch,
unprocessed_packets: &mut UnprocessedPackets,
packets: Packets,
mut packet_indexes: Vec<usize>,
dropped_packet_batches_count: &mut usize,
dropped_packets_count: &mut usize,
@@ -1582,7 +1577,7 @@ impl BankingStage {
let mut duplicates = duplicates.lock().unwrap();
let (cache, hasher) = duplicates.deref_mut();
packet_indexes.retain(|i| {
let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]);
let packet_hash = hasher.hash_packet(&packets.packets[*i]);
match cache.get_mut(&packet_hash) {
Some(_hash) => false,
None => {
@@ -1603,14 +1598,14 @@ impl BankingStage {
);
}
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packet_batches.len() >= batch_limit {
if unprocessed_packets.len() >= batch_limit {
*dropped_packet_batches_count += 1;
if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() {
if let Some(dropped_batch) = unprocessed_packets.pop_front() {
*dropped_packets_count += dropped_batch.1.len();
}
}
*newly_buffered_packets_count += packet_indexes.len();
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
unprocessed_packets.push_back((packets, packet_indexes, false));
}
}
@@ -1680,7 +1675,7 @@ mod tests {
get_tmp_ledger_path,
leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::to_packet_batches,
solana_perf::packet::to_packets_chunked,
solana_poh::{
poh_recorder::{create_test_recorder, Record, WorkingBank, WorkingBankEntry},
poh_service::PohService,
@@ -1817,9 +1812,7 @@ mod tests {
Blockstore::destroy(&ledger_path).unwrap();
}
pub fn convert_from_old_verified(
mut with_vers: Vec<(PacketBatch, Vec<u8>)>,
) -> Vec<PacketBatch> {
pub fn convert_from_old_verified(mut with_vers: Vec<(Packets, Vec<u8>)>) -> Vec<Packets> {
with_vers.iter_mut().for_each(|(b, v)| {
b.packets
.iter_mut()
@@ -1891,18 +1884,18 @@ mod tests {
let tx_anf = system_transaction::transfer(&keypair, &to3, 1, start_hash);
// send 'em over
let packet_batches = to_packet_batches(&[tx_no_ver, tx_anf, tx], 3);
let packets = to_packets_chunked(&[tx_no_ver, tx_anf, tx], 3);
// glad they all fit
assert_eq!(packet_batches.len(), 1);
assert_eq!(packets.len(), 1);
let packet_batches = packet_batches
let packets = packets
.into_iter()
.map(|batch| (batch, vec![0u8, 1u8, 1u8]))
.map(|packets| (packets, vec![0u8, 1u8, 1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
let packets = convert_from_old_verified(packets);
verified_sender // no_ver, anf, tx
.send(packet_batches)
.send(packets)
.unwrap();
drop(verified_sender);
@@ -1968,24 +1961,24 @@ mod tests {
let tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash());
let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = packet_batches
let packets = to_packets_chunked(&[tx], 1);
let packets = packets
.into_iter()
.map(|batch| (batch, vec![1u8]))
.map(|packets| (packets, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
let packets = convert_from_old_verified(packets);
verified_sender.send(packets).unwrap();
// Process a second batch that uses the same from account, so conflicts with above TX
let tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash());
let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = packet_batches
let packets = to_packets_chunked(&[tx], 1);
let packets = packets
.into_iter()
.map(|batch| (batch, vec![1u8]))
.map(|packets| (packets, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
let packets = convert_from_old_verified(packets);
verified_sender.send(packets).unwrap();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
@@ -2527,9 +2520,9 @@ mod tests {
fn test_filter_valid_packets() {
solana_logger::setup();
let mut packet_batches = (0..16)
let mut all_packets = (0..16)
.map(|packets_id| {
let packet_batch = PacketBatch::new(
let packets = Packets::new(
(0..32)
.map(|packet_id| {
let mut p = Packet::default();
@@ -2541,11 +2534,11 @@ mod tests {
let valid_indexes = (0..32)
.filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
.collect_vec();
(packet_batch, valid_indexes, false)
(packets, valid_indexes, false)
})
.collect_vec();
let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter());
let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter());
assert_eq!(result.len(), 256);
@@ -2559,8 +2552,8 @@ mod tests {
})
.collect_vec();
packet_batches[0].2 = true;
let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter());
all_packets[0].2 = true;
let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter());
assert_eq!(result.len(), 240);
}
@@ -2814,15 +2807,12 @@ mod tests {
setup_conflicting_transactions(&ledger_path);
let recorder = poh_recorder.lock().unwrap().recorder();
let num_conflicting_transactions = transactions.len();
let mut packet_batches = to_packet_batches(&transactions, num_conflicting_transactions);
assert_eq!(packet_batches.len(), 1);
assert_eq!(
packet_batches[0].packets.len(),
num_conflicting_transactions
);
let packet_batch = packet_batches.pop().unwrap();
let mut buffered_packet_batches: UnprocessedPacketBatches = vec![(
packet_batch,
let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions);
assert_eq!(packets_vec.len(), 1);
assert_eq!(packets_vec[0].packets.len(), num_conflicting_transactions);
let all_packets = packets_vec.pop().unwrap();
let mut buffered_packets: UnprocessedPackets = vec![(
all_packets,
(0..num_conflicting_transactions).into_iter().collect(),
false,
)]
@@ -2838,7 +2828,7 @@ mod tests {
&Pubkey::default(),
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&mut buffered_packets,
None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
@@ -2846,10 +2836,7 @@ mod tests {
&recorder,
&Arc::new(RwLock::new(CostModel::default())),
);
assert_eq!(
buffered_packet_batches[0].1.len(),
num_conflicting_transactions
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
// Processes one packet per iteration of the loop
for num_expected_unprocessed in (0..num_conflicting_transactions).rev() {
@@ -2858,7 +2845,7 @@ mod tests {
&Pubkey::default(),
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&mut buffered_packets,
None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
@@ -2867,9 +2854,9 @@ mod tests {
&Arc::new(RwLock::new(CostModel::default())),
);
if num_expected_unprocessed == 0 {
assert!(buffered_packet_batches.is_empty())
assert!(buffered_packets.is_empty())
} else {
assert_eq!(buffered_packet_batches[0].1.len(), num_expected_unprocessed);
assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed);
}
}
poh_recorder
@@ -2889,12 +2876,12 @@ mod tests {
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
setup_conflicting_transactions(&ledger_path);
let num_conflicting_transactions = transactions.len();
let packet_batches = to_packet_batches(&transactions, 1);
assert_eq!(packet_batches.len(), num_conflicting_transactions);
for single_packet_batch in &packet_batches {
assert_eq!(single_packet_batch.packets.len(), 1);
let packets_vec = to_packets_chunked(&transactions, 1);
assert_eq!(packets_vec.len(), num_conflicting_transactions);
for single_packets in &packets_vec {
assert_eq!(single_packets.packets.len(), 1);
}
let mut buffered_packet_batches: UnprocessedPacketBatches = packet_batches
let mut buffered_packets: UnprocessedPackets = packets_vec
.clone()
.into_iter()
.map(|single_packets| (single_packets, vec![0], false))
@@ -2908,8 +2895,8 @@ mod tests {
continue_receiver.recv().unwrap();
});
// When the poh recorder has a bank, it should process all non conflicting buffered packets.
// Because each conflicting transaction is in it's own `Packet` within a `PacketBatch`, then
// each iteration of this loop will process one element of the batch per iteration of the
// Because each conflicting transaction is in it's own `Packet` within `packets_vec`, then
// each iteration of this loop will process one element of `packets_vec`per iteration of the
// loop.
let interrupted_iteration = 1;
poh_recorder.lock().unwrap().set_bank(&bank);
@@ -2924,7 +2911,7 @@ mod tests {
&Pubkey::default(),
std::u128::MAX,
&poh_recorder_,
&mut buffered_packet_batches,
&mut buffered_packets,
None,
&gossip_vote_sender,
test_fn,
@@ -2936,13 +2923,13 @@ mod tests {
// Check everything is correct. All indexes after `interrupted_iteration`
// should still be unprocessed
assert_eq!(
buffered_packet_batches.len(),
packet_batches[interrupted_iteration + 1..].len()
buffered_packets.len(),
packets_vec[interrupted_iteration + 1..].len()
);
for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in
buffered_packet_batches
buffered_packets
.iter()
.zip(&packet_batches[interrupted_iteration + 1..])
.zip(&packets_vec[interrupted_iteration + 1..])
{
assert_eq!(
remaining_unprocessed_packet.packets[0],
@@ -2977,10 +2964,10 @@ mod tests {
#[test]
fn test_forwarder_budget() {
solana_logger::setup();
// Create `PacketBatch` with 1 unprocessed packet
let single_packet_batch = PacketBatch::new(vec![Packet::default()]);
let mut unprocessed_packets: UnprocessedPacketBatches =
vec![(single_packet_batch, vec![0], false)]
// Create `Packets` with 1 unprocessed element
let single_element_packets = Packets::new(vec![Packet::default()]);
let mut unprocessed_packets: UnprocessedPackets =
vec![(single_element_packets, vec![0], false)]
.into_iter()
.collect();
@@ -3026,16 +3013,14 @@ mod tests {
#[test]
fn test_push_unprocessed_batch_limit() {
solana_logger::setup();
// Create `PacketBatch` with 2 unprocessed packets
let new_packet_batch = PacketBatch::new(vec![Packet::default(); 2]);
let mut unprocessed_packets: UnprocessedPacketBatches =
vec![(new_packet_batch, vec![0, 1], false)]
.into_iter()
.collect();
// Create `Packets` with 2 unprocessed elements
let new_packets = Packets::new(vec![Packet::default(); 2]);
let mut unprocessed_packets: UnprocessedPackets =
vec![(new_packets, vec![0, 1], false)].into_iter().collect();
// Set the limit to 2
let batch_limit = 2;
// Create new unprocessed packets and add to a batch
let new_packet_batch = PacketBatch::new(vec![Packet::default()]);
// Create some new unprocessed packets
let new_packets = Packets::new(vec![Packet::default()]);
let packet_indexes = vec![];
let duplicates = Arc::new(Mutex::new((
@@ -3050,7 +3035,7 @@ mod tests {
// packets are not added to the unprocessed queue
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packet_batch.clone(),
new_packets.clone(),
packet_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
@@ -3069,7 +3054,7 @@ mod tests {
let packet_indexes = vec![0];
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packet_batch,
new_packets,
packet_indexes.clone(),
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
@@ -3085,7 +3070,7 @@ mod tests {
// Because we've reached the batch limit, old unprocessed packets are
// dropped and the new one is appended to the end
let new_packet_batch = PacketBatch::new(vec![Packet::from_data(
let new_packets = Packets::new(vec![Packet::from_data(
Some(&SocketAddr::from(([127, 0, 0, 1], 8001))),
42,
)
@@ -3093,7 +3078,7 @@ mod tests {
assert_eq!(unprocessed_packets.len(), batch_limit);
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packet_batch.clone(),
new_packets.clone(),
packet_indexes.clone(),
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
@@ -3103,10 +3088,7 @@ mod tests {
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(
unprocessed_packets[1].0.packets[0],
new_packet_batch.packets[0]
);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(dropped_packets_count, 2);
assert_eq!(newly_buffered_packets_count, 2);
@@ -3114,7 +3096,7 @@ mod tests {
// Check duplicates are dropped (newly buffered shouldn't change)
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packet_batch.clone(),
new_packets.clone(),
packet_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
@@ -3124,10 +3106,7 @@ mod tests {
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(
unprocessed_packets[1].0.packets[0],
new_packet_batch.packets[0]
);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_packet_batches_count, 1);
assert_eq!(dropped_packets_count, 2);
assert_eq!(newly_buffered_packets_count, 2);
@@ -3150,19 +3129,19 @@ mod tests {
fn make_test_packets(
transactions: Vec<Transaction>,
vote_indexes: Vec<usize>,
) -> (PacketBatch, Vec<usize>) {
) -> (Packets, Vec<usize>) {
let capacity = transactions.len();
let mut packet_batch = PacketBatch::with_capacity(capacity);
let mut packets = Packets::with_capacity(capacity);
let mut packet_indexes = Vec::with_capacity(capacity);
packet_batch.packets.resize(capacity, Packet::default());
packets.packets.resize(capacity, Packet::default());
for (index, tx) in transactions.iter().enumerate() {
Packet::populate_packet(&mut packet_batch.packets[index], None, tx).ok();
Packet::populate_packet(&mut packets.packets[index], None, tx).ok();
packet_indexes.push(index);
}
for index in vote_indexes.iter() {
packet_batch.packets[*index].meta.is_simple_vote_tx = true;
packets.packets[*index].meta.is_simple_vote_tx = true;
}
(packet_batch, packet_indexes)
(packets, packet_indexes)
}
#[test]
@@ -3183,13 +3162,13 @@ mod tests {
// packets with no votes
{
let vote_indexes = vec![];
let (packet_batch, packet_indexes) =
let (packets, packet_indexes) =
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
let mut votes_only = false;
let (txs, tx_packet_index, _retryable_packet_indexes) =
BankingStage::transactions_from_packets(
&packet_batch,
&packets,
&packet_indexes,
&Arc::new(feature_set::FeatureSet::default()),
&RwLock::new(CostTracker::default()).read().unwrap(),
@@ -3204,7 +3183,7 @@ mod tests {
votes_only = true;
let (txs, tx_packet_index, _retryable_packet_indexes) =
BankingStage::transactions_from_packets(
&packet_batch,
&packets,
&packet_indexes,
&Arc::new(feature_set::FeatureSet::default()),
&RwLock::new(CostTracker::default()).read().unwrap(),
@@ -3220,7 +3199,7 @@ mod tests {
// packets with some votes
{
let vote_indexes = vec![0, 2];
let (packet_batch, packet_indexes) = make_test_packets(
let (packets, packet_indexes) = make_test_packets(
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
vote_indexes,
);
@@ -3228,7 +3207,7 @@ mod tests {
let mut votes_only = false;
let (txs, tx_packet_index, _retryable_packet_indexes) =
BankingStage::transactions_from_packets(
&packet_batch,
&packets,
&packet_indexes,
&Arc::new(feature_set::FeatureSet::default()),
&RwLock::new(CostTracker::default()).read().unwrap(),
@@ -3243,7 +3222,7 @@ mod tests {
votes_only = true;
let (txs, tx_packet_index, _retryable_packet_indexes) =
BankingStage::transactions_from_packets(
&packet_batch,
&packets,
&packet_indexes,
&Arc::new(feature_set::FeatureSet::default()),
&RwLock::new(CostTracker::default()).read().unwrap(),
@@ -3259,7 +3238,7 @@ mod tests {
// packets with all votes
{
let vote_indexes = vec![0, 1, 2];
let (packet_batch, packet_indexes) = make_test_packets(
let (packets, packet_indexes) = make_test_packets(
vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
vote_indexes,
);
@@ -3267,7 +3246,7 @@ mod tests {
let mut votes_only = false;
let (txs, tx_packet_index, _retryable_packet_indexes) =
BankingStage::transactions_from_packets(
&packet_batch,
&packets,
&packet_indexes,
&Arc::new(feature_set::FeatureSet::default()),
&RwLock::new(CostTracker::default()).read().unwrap(),
@@ -3282,7 +3261,7 @@ mod tests {
votes_only = true;
let (txs, tx_packet_index, _retryable_packet_indexes) =
BankingStage::transactions_from_packets(
&packet_batch,
&packets,
&packet_indexes,
&Arc::new(feature_set::FeatureSet::default()),
&RwLock::new(CostTracker::default()).read().unwrap(),

View File

@@ -20,7 +20,7 @@ use {
},
solana_ledger::blockstore::Blockstore,
solana_metrics::inc_new_counter_debug,
solana_perf::packet::{self, PacketBatch},
solana_perf::packet::{self, Packets},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
@@ -56,9 +56,8 @@ use {
// Map from a vote account to the authorized voter for an epoch
pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>;
pub type VotedHashUpdates = HashMap<Hash, Vec<Pubkey>>;
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Slot, PacketBatch)>>;
pub type VerifiedLabelVotePacketsReceiver =
CrossbeamReceiver<Vec<(CrdsValueLabel, Slot, PacketBatch)>>;
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec<Slot>)>;
@@ -254,7 +253,7 @@ impl ClusterInfoVoteListener {
pub fn new(
exit: &Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
verified_packets_sender: CrossbeamSender<Vec<PacketBatch>>,
verified_packets_sender: CrossbeamSender<Vec<Packets>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>,
@@ -353,35 +352,35 @@ impl ClusterInfoVoteListener {
fn verify_votes(
votes: Vec<Transaction>,
labels: Vec<CrdsValueLabel>,
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Slot, PacketBatch)>) {
let mut packet_batches = packet::to_packet_batches(&votes, 1);
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Slot, Packets)>) {
let mut msgs = packet::to_packets_chunked(&votes, 1);
// Votes should already be filtered by this point.
let reject_non_vote = false;
sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote);
sigverify::ed25519_verify_cpu(&mut msgs, reject_non_vote);
let (vote_txs, packet_batch) = izip!(labels.into_iter(), votes.into_iter(), packet_batches)
.filter_map(|(label, vote, packet_batch)| {
let (vote_txs, packets) = izip!(labels.into_iter(), votes.into_iter(), msgs,)
.filter_map(|(label, vote, packet)| {
let slot = vote_transaction::parse_vote_transaction(&vote)
.and_then(|(_, vote, _)| vote.slots.last().copied())?;
// to_packet_batches() above split into 1 packet long chunks
assert_eq!(packet_batch.packets.len(), 1);
if !packet_batch.packets[0].meta.discard {
Some((vote, (label, slot, packet_batch)))
// to_packets_chunked() above split into 1 packet long chunks
assert_eq!(packet.packets.len(), 1);
if !packet.packets[0].meta.discard {
Some((vote, (label, slot, packet)))
} else {
None
}
})
.unzip();
(vote_txs, packet_batch)
(vote_txs, packets)
}
fn bank_send_loop(
exit: Arc<AtomicBool>,
verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver,
poh_recorder: Arc<Mutex<PohRecorder>>,
verified_packets_sender: &CrossbeamSender<Vec<PacketBatch>>,
verified_packets_sender: &CrossbeamSender<Vec<Packets>>,
) -> Result<()> {
let mut verified_vote_packets = VerifiedVotePackets::default();
let mut time_since_lock = Instant::now();
@@ -415,11 +414,10 @@ impl ClusterInfoVoteListener {
let bank = poh_recorder.lock().unwrap().bank();
if let Some(bank) = bank {
let last_version = bank.last_vote_sync.load(Ordering::Relaxed);
let (new_version, packet_batch) =
verified_vote_packets.get_latest_votes(last_version);
inc_new_counter_info!("bank_send_loop_batch_size", packet_batch.packets.len());
let (new_version, msgs) = verified_vote_packets.get_latest_votes(last_version);
inc_new_counter_info!("bank_send_loop_batch_size", msgs.packets.len());
inc_new_counter_info!("bank_send_loop_num_batches", 1);
verified_packets_sender.send(vec![packet_batch])?;
verified_packets_sender.send(vec![msgs])?;
#[allow(deprecated)]
bank.last_vote_sync.compare_and_swap(
last_version,
@@ -878,9 +876,9 @@ mod tests {
use bincode::serialized_size;
info!("max vote size {}", serialized_size(&vote_tx).unwrap());
let packet_batches = packet::to_packet_batches(&[vote_tx], 1); // panics if won't fit
let msgs = packet::to_packets_chunked(&[vote_tx], 1); // panics if won't fit
assert_eq!(packet_batches.len(), 1);
assert_eq!(msgs.len(), 1);
}
fn run_vote_contains_authorized_voter(hash: Option<Hash>) {
@@ -1708,11 +1706,8 @@ mod tests {
assert!(packets.is_empty());
}
fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, PacketBatch)], ref_value: usize) {
let num_packets: usize = packets
.iter()
.map(|(_, _, batch)| batch.packets.len())
.sum();
fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, Packets)], ref_value: usize) {
let num_packets: usize = packets.iter().map(|(_, _, p)| p.packets.len()).sum();
assert_eq!(num_packets, ref_value);
}

View File

@@ -6,10 +6,10 @@ use {
result::{Error, Result},
},
solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
solana_perf::{packet::PacketsRecycler, recycler::Recycler},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::clock::DEFAULT_TICKS_PER_SLOT,
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
solana_streamer::streamer::{self, PacketReceiver, PacketSender},
std::{
net::UdpSocket,
sync::{
@@ -34,7 +34,7 @@ impl FetchStage {
exit: &Arc<AtomicBool>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
) -> (Self, PacketReceiver, PacketReceiver) {
let (sender, receiver) = channel();
let (vote_sender, vote_receiver) = channel();
(
@@ -58,8 +58,8 @@ impl FetchStage {
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
sender: &PacketBatchSender,
vote_sender: &PacketBatchSender,
sender: &PacketSender,
vote_sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
) -> Self {
@@ -79,18 +79,18 @@ impl FetchStage {
}
fn handle_forwarded_packets(
recvr: &PacketBatchReceiver,
sendr: &PacketBatchSender,
recvr: &PacketReceiver,
sendr: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Result<()> {
let packet_batch = recvr.recv()?;
let mut num_packets = packet_batch.packets.len();
let mut packet_batches = vec![packet_batch];
while let Ok(packet_batch) = recvr.try_recv() {
num_packets += packet_batch.packets.len();
packet_batches.push(packet_batch);
let msgs = recvr.recv()?;
let mut len = msgs.packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
len += more.packets.len();
batch.push(more);
// Read at most 1K transactions in a loop
if num_packets > 1024 {
if len > 1024 {
break;
}
}
@@ -100,14 +100,14 @@ impl FetchStage {
.unwrap()
.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET.saturating_mul(DEFAULT_TICKS_PER_SLOT))
{
inc_new_counter_debug!("fetch_stage-honor_forwards", num_packets);
for packet_batch in packet_batches {
if sendr.send(packet_batch).is_err() {
inc_new_counter_debug!("fetch_stage-honor_forwards", len);
for packets in batch {
if sendr.send(packets).is_err() {
return Err(Error::Send);
}
}
} else {
inc_new_counter_info!("fetch_stage-discard_forwards", num_packets);
inc_new_counter_info!("fetch_stage-discard_forwards", len);
}
Ok(())
@@ -118,12 +118,12 @@ impl FetchStage {
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
sender: &PacketBatchSender,
vote_sender: &PacketBatchSender,
sender: &PacketSender,
vote_sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
coalesce_ms: u64,
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);
let recycler: PacketsRecycler = Recycler::warmed(1000, 1024);
let tpu_threads = sockets.into_iter().map(|socket| {
streamer::receiver(

View File

@@ -21,7 +21,7 @@ use {
blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache, shred::Shred,
},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::packet::Packets,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks},
@@ -438,7 +438,7 @@ impl RetransmitStage {
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<PacketBatch>>,
verified_receiver: Receiver<Vec<Packets>>,
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
@@ -603,10 +603,10 @@ mod tests {
let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
// it should send this over the sockets.
retransmit_sender.send(vec![shred]).unwrap();
let mut packet_batch = PacketBatch::new(vec![]);
solana_streamer::packet::recv_from(&mut packet_batch, &me_retransmit, 1).unwrap();
assert_eq!(packet_batch.packets.len(), 1);
assert!(!packet_batch.packets[0].meta.repair);
let mut packets = Packets::new(vec![]);
solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
assert_eq!(packets.packets.len(), 1);
assert!(!packets.packets[0].meta.repair);
}
#[test]

View File

@@ -23,14 +23,14 @@ use {
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_debug,
solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler},
solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler},
solana_sdk::{
clock::Slot,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::duration_as_ms,
},
solana_streamer::streamer::{PacketBatchReceiver, PacketBatchSender},
solana_streamer::streamer::{PacketReceiver, PacketSender},
std::{
collections::HashSet,
net::SocketAddr,
@@ -183,12 +183,12 @@ impl ServeRepair {
fn handle_repair(
me: &Arc<RwLock<Self>>,
recycler: &PacketBatchRecycler,
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
request: RepairProtocol,
stats: &mut ServeRepairStats,
) -> Option<PacketBatch> {
) -> Option<Packets> {
let now = Instant::now();
//TODO verify from is signed
@@ -264,10 +264,10 @@ impl ServeRepair {
/// Process messages from the network
fn run_listen(
obj: &Arc<RwLock<Self>>,
recycler: &PacketBatchRecycler,
recycler: &PacketsRecycler,
blockstore: Option<&Arc<Blockstore>>,
requests_receiver: &PacketBatchReceiver,
response_sender: &PacketBatchSender,
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
max_packets: &mut usize,
) -> Result<()> {
@@ -336,12 +336,12 @@ impl ServeRepair {
pub fn listen(
me: Arc<RwLock<Self>>,
blockstore: Option<Arc<Blockstore>>,
requests_receiver: PacketBatchReceiver,
response_sender: PacketBatchSender,
requests_receiver: PacketReceiver,
response_sender: PacketSender,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
let recycler = PacketBatchRecycler::default();
let recycler = PacketsRecycler::default();
Builder::new()
.name("solana-repair-listen".to_string())
.spawn(move || {
@@ -376,14 +376,14 @@ impl ServeRepair {
fn handle_packets(
me: &Arc<RwLock<Self>>,
recycler: &PacketBatchRecycler,
recycler: &PacketsRecycler,
blockstore: Option<&Arc<Blockstore>>,
packet_batch: PacketBatch,
response_sender: &PacketBatchSender,
packets: Packets,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
) {
// iter over the packets
packet_batch.packets.iter().for_each(|packet| {
packets.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
@@ -526,7 +526,7 @@ impl ServeRepair {
}
fn run_window_request(
recycler: &PacketBatchRecycler,
recycler: &PacketsRecycler,
from: &ContactInfo,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
@@ -534,7 +534,7 @@ impl ServeRepair {
slot: Slot,
shred_index: u64,
nonce: Nonce,
) -> Option<PacketBatch> {
) -> Option<Packets> {
if let Some(blockstore) = blockstore {
// Try to find the requested index in one of the slots
let packet = repair_response::repair_response_packet(
@@ -547,7 +547,7 @@ impl ServeRepair {
if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(PacketBatch::new_unpinned_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
@@ -568,13 +568,13 @@ impl ServeRepair {
}
fn run_highest_window_request(
recycler: &PacketBatchRecycler,
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
slot: Slot,
highest_index: u64,
nonce: Nonce,
) -> Option<PacketBatch> {
) -> Option<Packets> {
let blockstore = blockstore?;
// Try to find the requested index in one of the slots
let meta = blockstore.meta(slot).ok()??;
@@ -587,7 +587,7 @@ impl ServeRepair {
from_addr,
nonce,
)?;
return Some(PacketBatch::new_unpinned_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
@@ -597,14 +597,14 @@ impl ServeRepair {
}
fn run_orphan(
recycler: &PacketBatchRecycler,
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
mut slot: Slot,
max_responses: usize,
nonce: Nonce,
) -> Option<PacketBatch> {
let mut res = PacketBatch::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan");
) -> Option<Packets> {
let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan");
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {
@@ -661,7 +661,7 @@ mod tests {
/// test run_window_request responds with the right shred, and do not overrun
fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
let recycler = PacketBatchRecycler::default();
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
@@ -731,7 +731,7 @@ mod tests {
/// test window requests respond with the right shred, and do not overrun
fn run_window_request(slot: Slot, nonce: Nonce) {
let recycler = PacketBatchRecycler::default();
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
@@ -900,7 +900,7 @@ mod tests {
fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
solana_logger::setup();
let recycler = PacketBatchRecycler::default();
let recycler = PacketsRecycler::default();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
@@ -974,7 +974,7 @@ mod tests {
#[test]
fn run_orphan_corrupted_shred_size() {
solana_logger::setup();
let recycler = PacketBatchRecycler::default();
let recycler = PacketsRecycler::default();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());

View File

@@ -6,12 +6,12 @@ use {
solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats},
solana_perf::{
cuda_runtime::PinnedVec,
packet::{Packet, PacketBatchRecycler},
packet::{Packet, PacketsRecycler},
recycler::Recycler,
},
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
solana_streamer::streamer::{self, PacketReceiver, PacketSender},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, mpsc::channel, Arc, RwLock},
@@ -63,8 +63,8 @@ impl ShredFetchStage {
// updates packets received on a channel and sends them on another channel
fn modify_packets<F>(
recvr: PacketBatchReceiver,
sendr: PacketBatchSender,
recvr: PacketReceiver,
sendr: PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>,
name: &'static str,
modify: F,
@@ -83,7 +83,7 @@ impl ShredFetchStage {
let mut stats = ShredFetchStats::default();
let mut packet_hasher = PacketHasher::default();
while let Some(mut packet_batch) = recvr.iter().next() {
while let Some(mut p) = recvr.iter().next() {
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
packet_hasher.reset();
@@ -97,8 +97,8 @@ impl ShredFetchStage {
slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
}
}
stats.shred_count += packet_batch.packets.len();
packet_batch.packets.iter_mut().for_each(|mut packet| {
stats.shred_count += p.packets.len();
p.packets.iter_mut().for_each(|mut packet| {
Self::process_packet(
&mut packet,
&mut shreds_received,
@@ -124,7 +124,7 @@ impl ShredFetchStage {
stats = ShredFetchStats::default();
last_stats = Instant::now();
}
if sendr.send(packet_batch).is_err() {
if sendr.send(p).is_err() {
break;
}
}
@@ -133,7 +133,7 @@ impl ShredFetchStage {
fn packet_modifier<F>(
sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
sender: PacketBatchSender,
sender: PacketSender,
recycler: Recycler<PinnedVec<Packet>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
name: &'static str,
@@ -169,11 +169,11 @@ impl ShredFetchStage {
sockets: Vec<Arc<UdpSocket>>,
forward_sockets: Vec<Arc<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
sender: &PacketBatchSender,
sender: &PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(100, 1024);
let recycler: PacketsRecycler = Recycler::warmed(100, 1024);
let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
sockets,

View File

@@ -5,11 +5,11 @@
//!
pub use solana_perf::sigverify::{
count_packets_in_batches, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset,
batch_size, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset,
};
use {
crate::sigverify_stage::SigVerifier,
solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify},
solana_perf::{cuda_runtime::PinnedVec, packet::Packets, recycler::Recycler, sigverify},
};
#[derive(Clone)]
@@ -40,13 +40,13 @@ impl Default for TransactionSigVerifier {
}
impl SigVerifier for TransactionSigVerifier {
fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
sigverify::ed25519_verify(
&mut batches,
&mut batch,
&self.recycler,
&self.recycler_out,
self.reject_non_vote,
);
batches
batch
}
}

View File

@@ -5,7 +5,7 @@ use {
leader_schedule_cache::LeaderScheduleCache, shred::Shred,
sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_perf::{self, packet::Packets, recycler_cache::RecyclerCache},
solana_runtime::bank_forks::BankForks,
std::{
collections::{HashMap, HashSet},
@@ -32,7 +32,7 @@ impl ShredSigVerifier {
recycler_cache: RecyclerCache::warmed(),
}
}
fn read_slots(batches: &[PacketBatch]) -> HashSet<u64> {
fn read_slots(batches: &[Packets]) -> HashSet<u64> {
batches
.iter()
.flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet))
@@ -41,7 +41,7 @@ impl ShredSigVerifier {
}
impl SigVerifier for ShredSigVerifier {
fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
fn verify_batch(&self, mut batches: Vec<Packets>) -> Vec<Packets> {
let r_bank = self.bank_forks.read().unwrap().working_bank();
let slots: HashSet<u64> = Self::read_slots(&batches);
let mut leader_slots: HashMap<u64, [u8; 32]> = slots
@@ -88,13 +88,13 @@ pub mod tests {
0,
0xc0de,
);
let mut batches = [PacketBatch::default(), PacketBatch::default()];
let mut batch = [Packets::default(), Packets::default()];
let keypair = Keypair::new();
Shredder::sign_shred(&keypair, &mut shred);
batches[0].packets.resize(1, Packet::default());
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batch[0].packets.resize(1, Packet::default());
batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batch[0].packets[0].meta.size = shred.payload.len();
let mut shred = Shred::new_from_data(
0xc0de_dead,
@@ -108,16 +108,16 @@ pub mod tests {
0xc0de,
);
Shredder::sign_shred(&keypair, &mut shred);
batches[1].packets.resize(1, Packet::default());
batches[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[1].packets[0].meta.size = shred.payload.len();
batch[1].packets.resize(1, Packet::default());
batch[1].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batch[1].packets[0].meta.size = shred.payload.len();
let expected: HashSet<u64> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect();
assert_eq!(ShredSigVerifier::read_slots(&batches), expected);
assert_eq!(ShredSigVerifier::read_slots(&batch), expected);
}
#[test]
fn test_sigverify_shreds_verify_batches() {
fn test_sigverify_shreds_verify_batch() {
let leader_keypair = Arc::new(Keypair::new());
let leader_pubkey = leader_keypair.pubkey();
let bank =
@@ -126,8 +126,8 @@ pub mod tests {
let bf = Arc::new(RwLock::new(BankForks::new(bank)));
let verifier = ShredSigVerifier::new(bf, cache);
let mut batches = vec![PacketBatch::default()];
batches[0].packets.resize(2, Packet::default());
let mut batch = vec![Packets::default()];
batch[0].packets.resize(2, Packet::default());
let mut shred = Shred::new_from_data(
0,
@@ -141,8 +141,8 @@ pub mod tests {
0xc0de,
);
Shredder::sign_shred(&leader_keypair, &mut shred);
batches[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[0].meta.size = shred.payload.len();
batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batch[0].packets[0].meta.size = shred.payload.len();
let mut shred = Shred::new_from_data(
0,
@@ -157,10 +157,10 @@ pub mod tests {
);
let wrong_keypair = Keypair::new();
Shredder::sign_shred(&wrong_keypair, &mut shred);
batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batches[0].packets[1].meta.size = shred.payload.len();
batch[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batch[0].packets[1].meta.size = shred.payload.len();
let rv = verifier.verify_batches(batches);
let rv = verifier.verify_batch(batch);
assert!(!rv[0].packets[0].meta.discard);
assert!(rv[0].packets[1].meta.discard);
}

View File

@@ -9,9 +9,9 @@ use {
crate::sigverify,
crossbeam_channel::{SendError, Sender as CrossbeamSender},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::packet::Packets,
solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
solana_streamer::streamer::{self, PacketReceiver, StreamerError},
std::{
collections::HashMap,
sync::mpsc::{Receiver, RecvTimeoutError},
@@ -26,7 +26,7 @@ const MAX_SIGVERIFY_BATCH: usize = 10_000;
#[derive(Error, Debug)]
pub enum SigVerifyServiceError {
#[error("send packets batch error")]
Send(#[from] SendError<Vec<PacketBatch>>),
Send(#[from] SendError<Vec<Packets>>),
#[error("streamer error")]
Streamer(#[from] StreamerError),
@@ -39,7 +39,7 @@ pub struct SigVerifyStage {
}
pub trait SigVerifier {
fn verify_batches(&self, batches: Vec<PacketBatch>) -> Vec<PacketBatch>;
fn verify_batch(&self, batch: Vec<Packets>) -> Vec<Packets>;
}
#[derive(Default, Clone)]
@@ -49,7 +49,7 @@ pub struct DisabledSigVerifier {}
struct SigVerifierStats {
recv_batches_us_hist: histogram::Histogram, // time to call recv_batch
verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
batches_hist: histogram::Histogram, // number of packet batches per verify call
batches_hist: histogram::Histogram, // number of Packets structures per verify call
packets_hist: histogram::Histogram, // number of packets per verify call
total_batches: usize,
total_packets: usize,
@@ -122,24 +122,24 @@ impl SigVerifierStats {
}
impl SigVerifier for DisabledSigVerifier {
fn verify_batches(&self, mut batches: Vec<PacketBatch>) -> Vec<PacketBatch> {
sigverify::ed25519_verify_disabled(&mut batches);
batches
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
sigverify::ed25519_verify_disabled(&mut batch);
batch
}
}
impl SigVerifyStage {
#[allow(clippy::new_ret_no_self)]
pub fn new<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: Receiver<PacketBatch>,
verified_sender: CrossbeamSender<Vec<PacketBatch>>,
packet_receiver: Receiver<Packets>,
verified_sender: CrossbeamSender<Vec<Packets>>,
verifier: T,
) -> Self {
let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier);
Self { thread_hdl }
}
pub fn discard_excess_packets(batches: &mut Vec<PacketBatch>, max_packets: usize) {
pub fn discard_excess_packets(batches: &mut Vec<Packets>, max_packets: usize) {
let mut received_ips = HashMap::new();
for (batch_index, batch) in batches.iter().enumerate() {
for (packet_index, packets) in batch.packets.iter().enumerate() {
@@ -169,8 +169,8 @@ impl SigVerifyStage {
}
fn verifier<T: SigVerifier>(
recvr: &PacketBatchReceiver,
sendr: &CrossbeamSender<Vec<PacketBatch>>,
recvr: &PacketReceiver,
sendr: &CrossbeamSender<Vec<Packets>>,
verifier: &T,
stats: &mut SigVerifierStats,
) -> Result<()> {
@@ -182,7 +182,7 @@ impl SigVerifyStage {
if len > MAX_SIGVERIFY_BATCH {
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH);
}
sendr.send(verifier.verify_batches(batches))?;
sendr.send(verifier.verify_batch(batches))?;
verify_batch_time.stop();
debug!(
@@ -219,8 +219,8 @@ impl SigVerifyStage {
}
fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: PacketBatchReceiver,
verified_sender: CrossbeamSender<Vec<PacketBatch>>,
packet_receiver: PacketReceiver,
verified_sender: CrossbeamSender<Vec<Packets>>,
verifier: &T,
) -> JoinHandle<()> {
let verifier = verifier.clone();
@@ -255,8 +255,8 @@ impl SigVerifyStage {
}
fn verifier_services<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: PacketBatchReceiver,
verified_sender: CrossbeamSender<Vec<PacketBatch>>,
packet_receiver: PacketReceiver,
verified_sender: CrossbeamSender<Vec<Packets>>,
verifier: T,
) -> JoinHandle<()> {
Self::verifier_service(packet_receiver, verified_sender, &verifier)
@@ -271,12 +271,11 @@ impl SigVerifyStage {
mod tests {
use {super::*, solana_perf::packet::Packet};
fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
packet_batches
fn count_non_discard(packets: &[Packets]) -> usize {
packets
.iter()
.map(|batch| {
batch
.packets
.map(|pp| {
pp.packets
.iter()
.map(|p| if p.meta.discard { 0 } else { 1 })
.sum::<usize>()
@@ -287,14 +286,14 @@ mod tests {
#[test]
fn test_packet_discard() {
solana_logger::setup();
let mut batch = PacketBatch::default();
batch.packets.resize(10, Packet::default());
batch.packets[3].meta.addr = [1u16; 8];
let mut batches = vec![batch];
let mut p = Packets::default();
p.packets.resize(10, Packet::default());
p.packets[3].meta.addr = [1u16; 8];
let mut packets = vec![p];
let max = 3;
SigVerifyStage::discard_excess_packets(&mut batches, max);
assert_eq!(count_non_discard(&batches), max);
assert!(!batches[0].packets[0].meta.discard);
assert!(!batches[0].packets[3].meta.discard);
SigVerifyStage::discard_excess_packets(&mut packets, max);
assert_eq!(count_non_discard(&packets), max);
assert!(!packets[0].packets[0].meta.discard);
assert!(!packets[0].packets[3].meta.discard);
}
}

View File

@@ -1,7 +1,7 @@
use {
crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result},
solana_gossip::crds_value::CrdsValueLabel,
solana_perf::packet::PacketBatch,
solana_perf::packet::Packets,
solana_sdk::clock::Slot,
std::{
collections::{hash_map::Entry, HashMap},
@@ -10,7 +10,7 @@ use {
};
#[derive(Default)]
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Slot, PacketBatch)>);
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Slot, Packets)>);
impl VerifiedVotePackets {
pub fn receive_and_process_vote_packets(
@@ -41,11 +41,11 @@ impl VerifiedVotePackets {
}
#[cfg(test)]
fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, PacketBatch)> {
fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, Packets)> {
self.0.get(key)
}
pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, PacketBatch) {
pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Packets) {
let mut new_update_version = last_update_version;
let mut votes = HashMap::new();
for (label, (version, slot, packets)) in &self.0 {
@@ -70,7 +70,7 @@ impl VerifiedVotePackets {
.flat_map(|(_, (_, packets))| &packets.packets)
.cloned()
.collect();
(new_update_version, PacketBatch::new(packets))
(new_update_version, Packets::new(packets))
}
}
@@ -98,14 +98,14 @@ mod tests {
..Packet::default()
};
let none_empty_packets = PacketBatch::new(vec![data, Packet::default()]);
let none_empty_packets = Packets::new(vec![data, Packet::default()]);
verified_vote_packets
.0
.insert(label1, (2, 42, none_empty_packets));
verified_vote_packets
.0
.insert(label2, (1, 23, PacketBatch::default()));
.insert(label2, (1, 23, Packets::default()));
// Both updates have timestamps greater than 0, so both should be returned
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0);
@@ -132,9 +132,9 @@ mod tests {
let label1 = CrdsValueLabel::Vote(0, pubkey);
let label2 = CrdsValueLabel::Vote(1, pubkey);
let mut update_version = 0;
s.send(vec![(label1.clone(), 17, PacketBatch::default())])
s.send(vec![(label1.clone(), 17, Packets::default())])
.unwrap();
s.send(vec![(label2.clone(), 23, PacketBatch::default())])
s.send(vec![(label2.clone(), 23, Packets::default())])
.unwrap();
let data = Packet {
@@ -145,7 +145,7 @@ mod tests {
..Packet::default()
};
let later_packets = PacketBatch::new(vec![data, Packet::default()]);
let later_packets = Packets::new(vec![data, Packet::default()]);
s.send(vec![(label1.clone(), 42, later_packets)]).unwrap();
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
@@ -180,7 +180,7 @@ mod tests {
);
// Test timestamp for next batch overwrites the original
s.send(vec![(label2.clone(), 51, PacketBatch::default())])
s.send(vec![(label2.clone(), 51, Packets::default())])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, &mut update_version, true)

View File

@@ -22,7 +22,7 @@ use {
},
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
solana_perf::packet::{Packet, PacketBatch},
solana_perf::packet::{Packet, Packets},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey},
@@ -348,7 +348,7 @@ fn recv_window<F>(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
retransmit_sender: &Sender<Vec<Shred>>,
shred_filter: F,
thread_pool: &ThreadPool,
@@ -454,7 +454,7 @@ impl WindowService {
pub(crate) fn new<F>(
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
retransmit_sender: Sender<Vec<Shred>>,
repair_socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
@@ -624,7 +624,7 @@ impl WindowService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
shred_filter: F,
bank_forks: Arc<RwLock<BankForks>>,
retransmit_sender: Sender<Vec<Shred>>,