- move unprocessed_packet_batches from BankingStage to its own module (#23508)
- deserialize packets during receiving and buffering
Tao Zhu
2022-03-10 12:47:46 -06:00
committed by GitHub
parent 3c6840050c
commit 35d1235ed0
4 changed files with 411 additions and 259 deletions
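In short: the ad-hoc `(PacketBatch, Vec<usize>, bool)` tuple that BankingStage used to buffer is replaced by a dedicated struct in a new `unprocessed_packet_batches` module, and packet deserialization moves from consume time to receive/buffer time. The shape of the change, condensed from the hunks below (full definitions are in the new file at the end of the diff):

// Before: a buffered batch was an anonymous tuple, and packets were
// deserialized again on every consume pass.
// type PacketBatchAndOffsets = (PacketBatch, Vec<usize>, bool);
// pub type UnprocessedPacketBatches = VecDeque<PacketBatchAndOffsets>;

// After: each packet is deserialized once, when the batch is received and
// buffered, and the result is carried alongside the raw batch.
pub struct DeserializedPacketBatch {
    pub packet_batch: PacketBatch,
    pub forwarded: bool,
    // indexes of valid packets in batch, and their corresponding deserialized_packet
    pub unprocessed_packets: HashMap<usize, DeserializedPacket>,
}
pub type UnprocessedPacketBatches = VecDeque<DeserializedPacketBatch>;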

core/benches/banking_stage.rs

@@ -12,6 +12,7 @@ use {
         banking_stage::{BankingStage, BankingStageStats},
         leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
         qos_service::QosService,
+        unprocessed_packet_batches::*,
     },
     solana_entry::entry::{next_hash, Entry},
     solana_gossip::cluster_info::{ClusterInfo, Node},
@@ -82,7 +83,11 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
     let mut packet_batches = VecDeque::new();
     for batch in batches {
         let batch_len = batch.packets.len();
-        packet_batches.push_back((batch, vec![0usize; batch_len], false));
+        packet_batches.push_back(DeserializedPacketBatch::new(
+            batch,
+            vec![0usize; batch_len],
+            false,
+        ));
     }
     let (s, _r) = unbounded();
     // This tests the performance of buffering packets.

core/src/banking_stage.rs

@@ -8,6 +8,7 @@ use {
             LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
         },
         qos_service::QosService,
+        unprocessed_packet_batches::*,
     },
     crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
     histogram::Histogram,
@@ -45,8 +46,6 @@ use {
         feature_set,
         message::Message,
         pubkey::Pubkey,
-        short_vec::decode_shortu16_len,
-        signature::Signature,
         timing::{duration_as_ms, timestamp, AtomicInterval},
         transaction::{
             self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
@@ -60,7 +59,6 @@ use {
        cmp,
        collections::{HashMap, VecDeque},
        env,
-       mem::size_of,
        net::{SocketAddr, UdpSocket},
        sync::{
            atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -71,12 +69,6 @@ use {
     },
 };
 
-/// (packets, valid_indexes, forwarded)
-/// Batch of packets with a list of which are valid and if this batch has been forwarded.
-type PacketBatchAndOffsets = (PacketBatch, Vec<usize>, bool);
-
-pub type UnprocessedPacketBatches = VecDeque<PacketBatchAndOffsets>;
-
 /// Transaction forwarding
 pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
 pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20;
@@ -458,12 +450,15 @@ impl BankingStage {
     }
 
     fn filter_valid_packets_for_forwarding<'a>(
-        packet_batches: impl Iterator<Item = &'a PacketBatchAndOffsets>,
+        packet_batches: impl Iterator<Item = &'a DeserializedPacketBatch>,
     ) -> Vec<&'a Packet> {
         packet_batches
-            .filter(|(_batch, _indexes, forwarded)| !forwarded)
-            .flat_map(|(batch, valid_indexes, _forwarded)| {
-                valid_indexes.iter().map(move |x| &batch.packets[*x])
+            .filter(|deserialized_packet_batch| !deserialized_packet_batch.forwarded)
+            .flat_map(|deserialized_packet_batch| {
+                deserialized_packet_batch
+                    .unprocessed_packets
+                    .iter()
+                    .map(|(index, _)| &deserialized_packet_batch.packet_batch.packets[*index])
             })
             .collect()
     }
@@ -509,20 +504,6 @@ impl BankingStage {
         (Ok(()), packet_vec.len())
     }
 
-    // Returns whether the given `PacketBatch` has any more remaining unprocessed
-    // transactions
-    fn update_buffered_packets_with_new_unprocessed(
-        original_unprocessed_indexes: &mut Vec<usize>,
-        new_unprocessed_indexes: Vec<usize>,
-    ) -> bool {
-        let has_more_unprocessed_transactions =
-            Self::packet_has_more_unprocessed_transactions(&new_unprocessed_indexes);
-        if has_more_unprocessed_transactions {
-            *original_unprocessed_indexes = new_unprocessed_indexes
-        };
-        has_more_unprocessed_transactions
-    }
-
     #[allow(clippy::too_many_arguments)]
     pub fn consume_buffered_packets(
         my_pubkey: &Pubkey,
@@ -543,154 +524,101 @@ impl BankingStage {
         let mut proc_start = Measure::start("consume_buffered_process");
         let mut reached_end_of_slot: Option<EndOfSlot> = None;
 
-        RetainMut::retain_mut(
-            buffered_packet_batches,
-            |buffered_packet_batch_and_offsets| {
-                let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
-                    buffered_packet_batch_and_offsets;
-                if let Some(end_of_slot) = &reached_end_of_slot {
-                    let (should_retain, end_of_slot_filtering_time) = Measure::this(
-                        |_| {
-                            // We've hit the end of this slot, no need to perform more processing,
-                            // just filter the remaining packets for the invalid (e.g. too old) ones
-                            // if the working_bank is available
-                            if let Some(bank) = &end_of_slot.working_bank {
-                                let new_unprocessed_indexes =
-                                    Self::filter_unprocessed_packets_at_end_of_slot(
-                                        bank,
-                                        packet_batch,
-                                        original_unprocessed_indexes,
-                                        my_pubkey,
-                                        end_of_slot.next_slot_leader,
-                                        banking_stage_stats,
-                                    );
-
-                                let end_of_slot_filtered_invalid_count =
-                                    original_unprocessed_indexes
-                                        .len()
-                                        .saturating_sub(new_unprocessed_indexes.len());
-
-                                slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
-                                    end_of_slot_filtered_invalid_count as u64,
-                                );
-
-                                banking_stage_stats
-                                    .end_of_slot_filtered_invalid_count
-                                    .fetch_add(
-                                        end_of_slot_filtered_invalid_count,
-                                        Ordering::Relaxed,
-                                    );
-
-                                Self::update_buffered_packets_with_new_unprocessed(
-                                    original_unprocessed_indexes,
-                                    new_unprocessed_indexes,
-                                )
-                            } else {
-                                true
-                            }
-                        },
-                        (),
-                        "end_of_slot_filtering",
-                    );
-                    slot_metrics_tracker
-                        .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us());
-                    should_retain
-                } else {
-                    let (bank_start, poh_recorder_lock_time) = Measure::this(
-                        |_| poh_recorder.lock().unwrap().bank_start(),
-                        (),
-                        "poh_recorder_lock",
-                    );
-                    slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
-                        poh_recorder_lock_time.as_us(),
-                    );
-
-                    if let Some(BankStart {
-                        working_bank,
-                        bank_creation_time,
-                    }) = bank_start
-                    {
-                        let (process_transactions_summary, process_packets_transactions_time) =
-                            Measure::this(
-                                |_| {
-                                    Self::process_packets_transactions(
-                                        &working_bank,
-                                        &bank_creation_time,
-                                        recorder,
-                                        packet_batch,
-                                        original_unprocessed_indexes.to_owned(),
-                                        transaction_status_sender.clone(),
-                                        gossip_vote_sender,
-                                        banking_stage_stats,
-                                        qos_service,
-                                        slot_metrics_tracker,
-                                    )
-                                },
-                                (),
-                                "process_packets_transactions",
-                            );
-                        slot_metrics_tracker.increment_process_packets_transactions_us(
-                            process_packets_transactions_time.as_us(),
-                        );
-
-                        let ProcessTransactionsSummary {
-                            reached_max_poh_height,
-                            retryable_transaction_indexes,
-                            ..
-                        } = process_transactions_summary;
-
-                        if reached_max_poh_height
-                            || !Bank::should_bank_still_be_processing_txs(
-                                &bank_creation_time,
-                                max_tx_ingestion_ns,
-                            )
-                        {
-                            let poh_recorder_lock_time = {
-                                let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this(
-                                    |_| poh_recorder.lock().unwrap(),
-                                    (),
-                                    "poh_recorder_lock",
-                                );
-
-                                reached_end_of_slot = Some(EndOfSlot {
-                                    next_slot_leader: poh_recorder_locked.next_slot_leader(),
-                                    working_bank: Some(working_bank),
-                                });
-                                poh_recorder_lock_time
-                            };
-
-                            slot_metrics_tracker
-                                .increment_consume_buffered_packets_poh_recorder_lock_us(
-                                    poh_recorder_lock_time.as_us(),
-                                );
-                        }
-
-                        // The difference between all transactions passed to execution and the ones that
-                        // are retryable were the ones that were either:
-                        // 1) Committed into the block
-                        // 2) Dropped without being committed because they had some fatal error (too old,
-                        // duplicate signature, etc.)
-                        //
-                        // Note: This assumes that every packet deserializes into one transaction!
-                        consumed_buffered_packets_count += original_unprocessed_indexes
-                            .len()
-                            .saturating_sub(retryable_transaction_indexes.len());
-
-                        // Out of the buffered packets just retried, collect any still unprocessed
-                        // transactions in this batch for forwarding
-                        rebuffered_packet_count += retryable_transaction_indexes.len();
-                        let has_more_unprocessed_transactions =
-                            Self::update_buffered_packets_with_new_unprocessed(
-                                original_unprocessed_indexes,
-                                retryable_transaction_indexes,
-                            );
-                        if let Some(test_fn) = &test_fn {
-                            test_fn();
-                        }
-                        has_more_unprocessed_transactions
-                    } else {
-                        // mark as end-of-slot to avoid aggressively lock poh for the remaining for
-                        // packet batches in buffer
+        RetainMut::retain_mut(buffered_packet_batches, |deserialized_packet_batch| {
+            let packet_batch = &deserialized_packet_batch.packet_batch;
+            let original_unprocessed_indexes = deserialized_packet_batch
+                .unprocessed_packets
+                .keys()
+                .cloned()
+                .collect::<Vec<usize>>();
+            if let Some(end_of_slot) = &reached_end_of_slot {
+                // We've hit the end of this slot, no need to perform more processing,
+                // just filter the remaining packets for the invalid (e.g. too old) ones
+                // if the working_bank is available
+                let mut end_of_slot_filtering_time = Measure::start("end_of_slot_filtering");
+                let should_retain = if let Some(bank) = &end_of_slot.working_bank {
+                    let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
+                        bank,
+                        packet_batch,
+                        &original_unprocessed_indexes,
+                        my_pubkey,
+                        end_of_slot.next_slot_leader,
+                        banking_stage_stats,
+                    );
+
+                    let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
+                        .len()
+                        .saturating_sub(new_unprocessed_indexes.len());
+
+                    slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
+                        end_of_slot_filtered_invalid_count as u64,
+                    );
+
+                    banking_stage_stats
+                        .end_of_slot_filtered_invalid_count
+                        .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
+
+                    deserialized_packet_batch.update_buffered_packets_with_new_unprocessed(
+                        &original_unprocessed_indexes,
+                        &new_unprocessed_indexes,
+                    )
+                } else {
+                    true
+                };
+                end_of_slot_filtering_time.stop();
+                slot_metrics_tracker
+                    .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us());
+                should_retain
+            } else {
+                let (bank_start, poh_recorder_lock_time) = Measure::this(
+                    |_| poh_recorder.lock().unwrap().bank_start(),
+                    (),
+                    "poh_recorder_lock",
+                );
+                slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
+                    poh_recorder_lock_time.as_us(),
+                );
+
+                if let Some(BankStart {
+                    working_bank,
+                    bank_creation_time,
+                }) = bank_start
+                {
+                    let (process_transactions_summary, process_packets_transactions_time) =
+                        Measure::this(
+                            |_| {
+                                Self::process_packets_transactions(
+                                    &working_bank,
+                                    &bank_creation_time,
+                                    recorder,
+                                    packet_batch,
+                                    original_unprocessed_indexes.to_owned(),
+                                    transaction_status_sender.clone(),
+                                    gossip_vote_sender,
+                                    banking_stage_stats,
+                                    qos_service,
+                                    slot_metrics_tracker,
+                                )
+                            },
+                            (),
+                            "process_packets_transactions",
+                        );
+                    slot_metrics_tracker.increment_process_packets_transactions_us(
+                        process_packets_transactions_time.as_us(),
+                    );
+
+                    let ProcessTransactionsSummary {
+                        reached_max_poh_height,
+                        retryable_transaction_indexes,
+                        ..
+                    } = process_transactions_summary;
+
+                    if reached_max_poh_height
+                        || !Bank::should_bank_still_be_processing_txs(
+                            &bank_creation_time,
+                            max_tx_ingestion_ns,
+                        )
+                    {
                         let poh_recorder_lock_time = {
                             let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this(
                                 |_| poh_recorder.lock().unwrap(),
                                 (),
                                 "poh_recorder_lock",
                             );
@@ -700,25 +628,69 @@ impl BankingStage {
                             reached_end_of_slot = Some(EndOfSlot {
                                 next_slot_leader: poh_recorder_locked.next_slot_leader(),
-                                working_bank: None,
+                                working_bank: Some(working_bank),
                             });
                             poh_recorder_lock_time
                         };
 
                         slot_metrics_tracker
                             .increment_consume_buffered_packets_poh_recorder_lock_us(
                                 poh_recorder_lock_time.as_us(),
                             );
+                    }
 
-                        // `original_unprocessed_indexes` must have remaining packets to process
-                        // if not yet processed.
-                        assert!(Self::packet_has_more_unprocessed_transactions(
-                            original_unprocessed_indexes
-                        ));
-                        true
-                    }
-                }
-            },
-        );
+                    // The difference between all transactions passed to execution and the ones that
+                    // are retryable were the ones that were either:
+                    // 1) Committed into the block
+                    // 2) Dropped without being committed because they had some fatal error (too old,
+                    // duplicate signature, etc.)
+                    //
+                    // Note: This assumes that every packet deserializes into one transaction!
+                    consumed_buffered_packets_count += original_unprocessed_indexes
+                        .len()
+                        .saturating_sub(retryable_transaction_indexes.len());
+
+                    // Out of the buffered packets just retried, collect any still unprocessed
+                    // transactions in this batch for forwarding
+                    rebuffered_packet_count += retryable_transaction_indexes.len();
+                    let has_more_unprocessed_transactions = deserialized_packet_batch
+                        .update_buffered_packets_with_new_unprocessed(
+                            &original_unprocessed_indexes,
+                            &retryable_transaction_indexes,
+                        );
+                    if let Some(test_fn) = &test_fn {
+                        test_fn();
+                    }
+                    has_more_unprocessed_transactions
+                } else {
+                    // mark as end-of-slot to avoid aggressively lock poh for the remaining for
+                    // packet batches in buffer
+                    let poh_recorder_lock_time = {
+                        let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this(
+                            |_| poh_recorder.lock().unwrap(),
+                            (),
+                            "poh_recorder_lock",
+                        );
+
+                        reached_end_of_slot = Some(EndOfSlot {
+                            next_slot_leader: poh_recorder_locked.next_slot_leader(),
+                            working_bank: None,
+                        });
+                        poh_recorder_lock_time
+                    };
+
+                    slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
+                        poh_recorder_lock_time.as_us(),
+                    );
+
+                    // `original_unprocessed_indexes` must have remaining packets to process
+                    // if not yet processed.
+                    assert!(Self::packet_has_more_unprocessed_transactions(
+                        &original_unprocessed_indexes
+                    ));
+                    true
+                }
+            }
+        });
 
         proc_start.stop();
@@ -942,9 +914,11 @@ impl BankingStage {
         }
 
         if hold {
-            buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
-            for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
-                *forwarded = true;
+            buffered_packet_batches.retain(|deserialized_packet_batch| {
+                !deserialized_packet_batch.unprocessed_packets.is_empty()
+            });
+            for deserialized_packet_batch in buffered_packet_batches.iter_mut() {
+                deserialized_packet_batch.forwarded = true;
             }
         } else {
             slot_metrics_tracker
@@ -1634,16 +1608,6 @@ impl BankingStage {
             .collect_vec()
     }
 
-    /// Read the transaction message from packet data
-    fn packet_message(packet: &Packet) -> Option<&[u8]> {
-        let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?;
-        let msg_start = sig_len
-            .checked_mul(size_of::<Signature>())
-            .and_then(|v| v.checked_add(sig_size))?;
-        let msg_end = packet.meta.size;
-        Some(&packet.data[msg_start..msg_end])
-    }
-
     // This function deserializes packets into transactions, computes the blake3 hash of transaction
     // messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned
     // with their packet indexes.
@@ -1664,7 +1628,7 @@ impl BankingStage {
             }
             let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
-            let message_bytes = Self::packet_message(p)?;
+            let message_bytes = DeserializedPacketBatch::packet_message(p)?;
             let message_hash = Message::hash_raw_message(message_bytes);
             let tx = SanitizedTransaction::try_create(
                 tx,
@@ -1934,7 +1898,7 @@ impl BankingStage {
                 batch_limit,
                 banking_stage_stats,
                 slot_metrics_tracker,
-            );
+            )
         }
         proc_start.stop();
@@ -1967,7 +1931,9 @@ impl BankingStage {
         banking_stage_stats.current_buffered_packets_count.swap(
             buffered_packet_batches
                 .iter()
-                .map(|packets| packets.1.len())
+                .map(|deserialized_packet_batch| {
+                    deserialized_packet_batch.unprocessed_packets.len()
+                })
                 .sum(),
             Ordering::Relaxed,
         );
@@ -1990,9 +1956,9 @@ impl BankingStage {
         if unprocessed_packet_batches.len() >= batch_limit {
             *dropped_packet_batches_count += 1;
             if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() {
-                *dropped_packets_count += dropped_batch.1.len();
+                *dropped_packets_count += dropped_batch.unprocessed_packets.len();
                 slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count(
-                    dropped_batch.1.len() as u64,
+                    dropped_batch.unprocessed_packets.len() as u64,
                 );
             }
         }
@@ -2003,7 +1969,12 @@ impl BankingStage {
             *newly_buffered_packets_count += packet_indexes.len();
             slot_metrics_tracker
                 .increment_newly_buffered_packets_count(packet_indexes.len() as u64);
-            unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
+            unprocessed_packet_batches.push_back(DeserializedPacketBatch::new(
+                packet_batch,
+                packet_indexes,
+                false,
+            ));
         }
     }
@@ -2958,7 +2929,14 @@ mod tests {
                 let packet_batch = PacketBatch::new(
                     (0..32)
                         .map(|packet_id| {
-                            let mut p = Packet::default();
+                            // packets are deserialized upon receiving, failed packets will not be
+                            // forwarded; Therefore we need to create real packets here.
+                            let keypair = Keypair::new();
+                            let pubkey = solana_sdk::pubkey::new_rand();
+                            let blockhash = Hash::new_unique();
+                            let transaction =
+                                system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
+                            let mut p = Packet::from_data(None, &transaction).unwrap();
                             p.meta.port = packets_id << 8 | packet_id;
                             p
                         })
@@ -2967,7 +2945,7 @@ mod tests {
                 let valid_indexes = (0..32)
                     .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None })
                     .collect_vec();
-                (packet_batch, valid_indexes, false)
+                DeserializedPacketBatch::new(packet_batch, valid_indexes, false)
             })
             .collect_vec();
@@ -2975,17 +2953,22 @@ mod tests {
         assert_eq!(result.len(), 256);
 
-        let _ = result
-            .into_iter()
-            .enumerate()
-            .map(|(index, p)| {
-                let packets_id = index / 16;
-                let packet_id = (index % 16) * 2 + 1;
-                assert_eq!(p.meta.port, (packets_id << 8 | packet_id) as u16);
-            })
-            .collect_vec();
-
-        packet_batches[0].2 = true;
+        // packets in a batch are forwarded in arbitrary order; verify the ports match after
+        // sorting
+        let expected_ports: Vec<_> = (0..16)
+            .flat_map(|packets_id| {
+                (0..16).map(move |packet_id| {
+                    let packet_id = packet_id * 2 + 1;
+                    (packets_id << 8 | packet_id) as u16
+                })
+            })
+            .collect();
+
+        let mut forwarded_ports: Vec<_> = result.into_iter().map(|p| p.meta.port).collect();
+        forwarded_ports.sort_unstable();
+        assert_eq!(expected_ports, forwarded_ports);
+
+        packet_batches[0].forwarded = true;
         let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter());
         assert_eq!(result.len(), 240);
     }
@@ -3580,13 +3563,14 @@ mod tests {
                 num_conflicting_transactions
             );
             let packet_batch = packet_batches.pop().unwrap();
-            let mut buffered_packet_batches: UnprocessedPacketBatches = vec![(
-                packet_batch,
-                (0..num_conflicting_transactions).into_iter().collect(),
-                false,
-            )]
-            .into_iter()
-            .collect();
+            let mut buffered_packet_batches: UnprocessedPacketBatches =
+                vec![DeserializedPacketBatch::new(
+                    packet_batch,
+                    (0..num_conflicting_transactions).into_iter().collect(),
+                    false,
+                )]
+                .into_iter()
+                .collect();
 
             let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
@@ -3607,7 +3591,7 @@ mod tests {
                 &mut LeaderSlotMetricsTracker::new(0),
             );
             assert_eq!(
-                buffered_packet_batches[0].1.len(),
+                buffered_packet_batches[0].unprocessed_packets.len(),
                 num_conflicting_transactions
             );
             // When the poh recorder has a bank, should process all non conflicting buffered packets.
@@ -3630,7 +3614,10 @@ mod tests {
                 if num_expected_unprocessed == 0 {
                     assert!(buffered_packet_batches.is_empty())
                 } else {
-                    assert_eq!(buffered_packet_batches[0].1.len(), num_expected_unprocessed);
+                    assert_eq!(
+                        buffered_packet_batches[0].unprocessed_packets.len(),
+                        num_expected_unprocessed
+                    );
                 }
             }
             poh_recorder
@@ -3658,7 +3645,7 @@ mod tests {
             let mut buffered_packet_batches: UnprocessedPacketBatches = packet_batches
                 .clone()
                 .into_iter()
-                .map(|single_packets| (single_packets, vec![0], false))
+                .map(|single_packets| DeserializedPacketBatch::new(single_packets, vec![0], false))
                 .collect();
 
             let (continue_sender, continue_receiver) = unbounded();
@@ -3701,13 +3688,12 @@ mod tests {
                 buffered_packet_batches.len(),
                 packet_batches[interrupted_iteration + 1..].len()
             );
-            for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in
-                buffered_packet_batches
-                    .iter()
-                    .zip(&packet_batches[interrupted_iteration + 1..])
+            for (deserialized_packet_batch, original_packet) in buffered_packet_batches
+                .iter()
+                .zip(&packet_batches[interrupted_iteration + 1..])
             {
                 assert_eq!(
-                    remaining_unprocessed_packet.packets[0],
+                    deserialized_packet_batch.packet_batch.packets[0],
                     original_packet.packets[0]
                 );
             }
@@ -3740,7 +3726,13 @@ mod tests {
     fn test_forwarder_budget() {
         solana_logger::setup();
         // Create `PacketBatch` with 1 unprocessed packet
-        let packet = Packet::from_data(None, &[0]).unwrap();
+        let tx = system_transaction::transfer(
+            &Keypair::new(),
+            &solana_sdk::pubkey::new_rand(),
+            1,
+            Hash::new_unique(),
+        );
+        let packet = Packet::from_data(None, &tx).unwrap();
         let single_packet_batch = PacketBatch::new(vec![packet]);
 
         let genesis_config_info = create_slow_genesis_config(10_000);
@@ -3779,9 +3771,13 @@ mod tests {
         for (name, data_budget, expected_num_forwarded) in test_cases {
             let mut unprocessed_packet_batches: UnprocessedPacketBatches =
-                vec![(single_packet_batch.clone(), vec![0], false)]
-                    .into_iter()
-                    .collect();
+                vec![DeserializedPacketBatch::new(
+                    single_packet_batch.clone(),
+                    vec![0],
+                    false,
+                )]
+                .into_iter()
+                .collect();
             BankingStage::handle_forwarding(
                 &ForwardOption::ForwardTransaction,
                 &cluster_info,
@@ -3811,22 +3807,34 @@ mod tests {
     #[test]
     fn test_handle_forwarding() {
         solana_logger::setup();
-        const FWD_PACKET: u8 = 1;
+        // packets are deserialized upon receiving, failed packets will not be
+        // forwarded; Therefore need to create real packets here.
+        let keypair = Keypair::new();
+        let pubkey = solana_sdk::pubkey::new_rand();
+
+        let fwd_block_hash = Hash::new_unique();
         let forwarded_packet = {
-            let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap();
+            let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash);
+            let mut packet = Packet::from_data(None, &transaction).unwrap();
             packet.meta.flags |= PacketFlags::FORWARDED;
             packet
         };
-        const NORMAL_PACKET: u8 = 2;
-        let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap();
+
+        let normal_block_hash = Hash::new_unique();
+        let normal_packet = {
+            let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash);
+            Packet::from_data(None, &transaction).unwrap()
+        };
 
         let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]);
         let mut unprocessed_packet_batches: UnprocessedPacketBatches =
-            vec![(packet_batch, vec![0, 1], false)]
-                .into_iter()
-                .collect();
+            vec![DeserializedPacketBatch::new(
+                packet_batch,
+                vec![0, 1],
+                false,
+            )]
+            .into_iter()
+            .collect();
 
         let genesis_config_info = create_slow_genesis_config(10_000);
         let GenesisConfigInfo {
@@ -3862,7 +3870,7 @@ mod tests {
                 "fwd-normal",
                 ForwardOption::ForwardTransaction,
                 true,
-                vec![NORMAL_PACKET],
+                vec![normal_block_hash],
                 2,
             ),
             (
@@ -3901,13 +3909,20 @@ mod tests {
                 let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
                 assert_eq!(num_received, expected_ids.len(), "{}", name);
                 for (i, expected_id) in expected_ids.iter().enumerate() {
-                    assert_eq!(packets[i].meta.size, 1);
-                    assert_eq!(packets[i].data[0], *expected_id, "{}", name);
+                    assert_eq!(packets[i].meta.size, 215);
+                    let recv_transaction: VersionedTransaction =
+                        limited_deserialize(&packets[i].data[0..packets[i].meta.size]).unwrap();
+                    assert_eq!(
+                        recv_transaction.message.recent_blockhash(),
+                        expected_id,
+                        "{}",
+                        name
+                    );
                 }
 
                 let num_unprocessed_packets: usize = unprocessed_packet_batches
                     .iter()
-                    .map(|(b, ..)| b.packets.len())
+                    .map(|b| b.packet_batch.packets.len())
                     .sum();
                 assert_eq!(
                     num_unprocessed_packets, expected_num_unprocessed,
@@ -3926,11 +3941,24 @@ mod tests {
     fn test_push_unprocessed_batch_limit() {
         solana_logger::setup();
         // Create `PacketBatch` with 2 unprocessed packets
-        let new_packet_batch = PacketBatch::new(vec![Packet::default(); 2]);
-        let mut unprocessed_packets: UnprocessedPacketBatches =
-            vec![(new_packet_batch, vec![0, 1], false)]
-                .into_iter()
-                .collect();
+        let tx = system_transaction::transfer(
+            &Keypair::new(),
+            &solana_sdk::pubkey::new_rand(),
+            1,
+            Hash::new_unique(),
+        );
+        let packet = Packet::from_data(None, &tx).unwrap();
+        let new_packet_batch = PacketBatch::new(vec![packet; 2]);
+        let mut unprocessed_packets: UnprocessedPacketBatches = vec![DeserializedPacketBatch::new(
+            new_packet_batch,
+            vec![0, 1],
+            false,
+        )]
+        .into_iter()
+        .collect();
+        assert_eq!(unprocessed_packets.len(), 1);
+        assert_eq!(unprocessed_packets[0].unprocessed_packets.len(), 2);
+
         // Set the limit to 2
         let batch_limit = 2;
         // Create new unprocessed packets and add to a batch
@@ -3999,7 +4027,7 @@ mod tests {
         );
         assert_eq!(unprocessed_packets.len(), 2);
         assert_eq!(
-            unprocessed_packets[1].0.packets[0],
+            unprocessed_packets[1].packet_batch.packets[0],
             new_packet_batch.packets[0]
         );
         assert_eq!(dropped_packet_batches_count, 1);
@@ -4007,19 +4035,6 @@ mod tests {
         assert_eq!(newly_buffered_packets_count, 2);
     }
 
-    #[test]
-    fn test_packet_message() {
-        let keypair = Keypair::new();
-        let pubkey = solana_sdk::pubkey::new_rand();
-        let blockhash = Hash::new_unique();
-        let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
-        let packet = Packet::from_data(None, &transaction).unwrap();
-        assert_eq!(
-            BankingStage::packet_message(&packet).unwrap().to_vec(),
-            transaction.message_data()
-        );
-    }
-
     #[cfg(test)]
     fn make_test_packets(
         transactions: Vec<Transaction>,

core/src/lib.rs

@@ -64,6 +64,7 @@ pub mod tpu;
 pub mod tree_diff;
 pub mod tvu;
 pub mod unfrozen_gossip_verified_vote_hashes;
+pub mod unprocessed_packet_batches;
 pub mod validator;
 pub mod verified_vote_packets;
 pub mod vote_simulator;

core/src/unprocessed_packet_batches.rs (new file)

@@ -0,0 +1,131 @@
use {
    solana_perf::packet::{limited_deserialize, Packet, PacketBatch},
    solana_sdk::{
        hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature,
        transaction::VersionedTransaction,
    },
    std::{
        collections::{HashMap, VecDeque},
        mem::size_of,
    },
};

pub type UnprocessedPacketBatches = VecDeque<DeserializedPacketBatch>;

/// Holds deserialized messages, as well as the computed message_hash and other things needed to
/// create a SanitizedTransaction
#[derive(Debug, Default)]
pub struct DeserializedPacket {
    #[allow(dead_code)]
    versioned_transaction: VersionedTransaction,

    #[allow(dead_code)]
    message_hash: Hash,

    #[allow(dead_code)]
    is_simple_vote: bool,
}

#[derive(Debug, Default)]
pub struct DeserializedPacketBatch {
    pub packet_batch: PacketBatch,
    pub forwarded: bool,
    // indexes of valid packets in batch, and their corresponding deserialized_packet
    pub unprocessed_packets: HashMap<usize, DeserializedPacket>,
}

impl DeserializedPacketBatch {
    pub fn new(packet_batch: PacketBatch, packet_indexes: Vec<usize>, forwarded: bool) -> Self {
        let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes);
        Self {
            packet_batch,
            unprocessed_packets,
            forwarded,
        }
    }

    fn deserialize_packets(
        packet_batch: &PacketBatch,
        packet_indexes: &[usize],
    ) -> HashMap<usize, DeserializedPacket> {
        packet_indexes
            .iter()
            .filter_map(|packet_index| {
                let deserialized_packet =
                    Self::deserialize_packet(&packet_batch.packets[*packet_index])?;
                Some((*packet_index, deserialized_packet))
            })
            .collect()
    }

    fn deserialize_packet(packet: &Packet) -> Option<DeserializedPacket> {
        let versioned_transaction: VersionedTransaction =
            match limited_deserialize(&packet.data[0..packet.meta.size]) {
                Ok(tx) => tx,
                Err(_) => return None,
            };

        if let Some(message_bytes) = Self::packet_message(packet) {
            let message_hash = Message::hash_raw_message(message_bytes);
            let is_simple_vote = packet.meta.is_simple_vote_tx();
            Some(DeserializedPacket {
                versioned_transaction,
                message_hash,
                is_simple_vote,
            })
        } else {
            None
        }
    }

    /// Read the transaction message from packet data
    pub fn packet_message(packet: &Packet) -> Option<&[u8]> {
        let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?;
        let msg_start = sig_len
            .checked_mul(size_of::<Signature>())
            .and_then(|v| v.checked_add(sig_size))?;
        let msg_end = packet.meta.size;
        Some(&packet.data[msg_start..msg_end])
    }

    // Returns whether the given `PacketBatch` has any more remaining unprocessed
    // transactions
    pub fn update_buffered_packets_with_new_unprocessed(
        &mut self,
        _original_unprocessed_indexes: &[usize],
        new_unprocessed_indexes: &[usize],
    ) -> bool {
        let has_more_unprocessed_transactions = !new_unprocessed_indexes.is_empty();
        if has_more_unprocessed_transactions {
            self.unprocessed_packets
                .retain(|index, _| new_unprocessed_indexes.contains(index));
        } else {
            self.unprocessed_packets.clear();
        }
        has_more_unprocessed_transactions
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_sdk::{signature::Keypair, system_transaction},
    };

    #[test]
    fn test_packet_message() {
        let keypair = Keypair::new();
        let pubkey = solana_sdk::pubkey::new_rand();
        let blockhash = Hash::new_unique();
        let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
        let packet = Packet::from_data(None, &transaction).unwrap();
        assert_eq!(
            DeserializedPacketBatch::packet_message(&packet)
                .unwrap()
                .to_vec(),
            transaction.message_data()
        );
    }
}
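For orientation, a minimal end-to-end sketch of how the new module is used, mirroring the tests above. This is an illustration, not part of the commit: the transfer amount and blockhash are arbitrary test values, and the `solana_core::unprocessed_packet_batches` crate path is assumed from the module's location in core/src.

use {
    solana_core::unprocessed_packet_batches::{
        DeserializedPacketBatch, UnprocessedPacketBatches,
    },
    solana_perf::packet::{Packet, PacketBatch},
    solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
    std::collections::VecDeque,
};

fn main() {
    // Build a batch containing one well-formed transfer transaction; a malformed
    // packet would fail deserialization and be left out of unprocessed_packets.
    let tx = system_transaction::transfer(
        &Keypair::new(),
        &solana_sdk::pubkey::new_rand(),
        1,
        Hash::new_unique(),
    );
    let packet = Packet::from_data(None, &tx).unwrap();
    let packet_batch = PacketBatch::new(vec![packet]);

    // Deserialization now happens here, at buffering time. Packet index 0 is
    // marked valid, and the batch has not been forwarded yet.
    let deserialized_batch = DeserializedPacketBatch::new(packet_batch, vec![0], false);
    assert_eq!(deserialized_batch.unprocessed_packets.len(), 1);

    // Buffer it the way BankingStage does.
    let mut buffer: UnprocessedPacketBatches = VecDeque::new();
    buffer.push_back(deserialized_batch);
}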