diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 776b6cfdf4..1c6c607baf 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -12,6 +12,7 @@ use { banking_stage::{BankingStage, BankingStageStats}, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, qos_service::QosService, + unprocessed_packet_batches::*, }, solana_entry::entry::{next_hash, Entry}, solana_gossip::cluster_info::{ClusterInfo, Node}, @@ -82,7 +83,11 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let mut packet_batches = VecDeque::new(); for batch in batches { let batch_len = batch.packets.len(); - packet_batches.push_back((batch, vec![0usize; batch_len], false)); + packet_batches.push_back(DeserializedPacketBatch::new( + batch, + vec![0usize; batch_len], + false, + )); } let (s, _r) = unbounded(); // This tests the performance of buffering packets. diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 00af7f1f09..598d459b6d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,6 +8,7 @@ use { LeaderExecuteAndCommitTimings, RecordTransactionsTimings, }, qos_service::QosService, + unprocessed_packet_batches::*, }, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, histogram::Histogram, @@ -45,8 +46,6 @@ use { feature_set, message::Message, pubkey::Pubkey, - short_vec::decode_shortu16_len, - signature::Signature, timing::{duration_as_ms, timestamp, AtomicInterval}, transaction::{ self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction, @@ -60,7 +59,6 @@ use { cmp, collections::{HashMap, VecDeque}, env, - mem::size_of, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, @@ -71,12 +69,6 @@ use { }, }; -/// (packets, valid_indexes, forwarded) -/// Batch of packets with a list of which are valid and if this batch has been forwarded. -type PacketBatchAndOffsets = (PacketBatch, Vec, bool); - -pub type UnprocessedPacketBatches = VecDeque; - /// Transaction forwarding pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20; @@ -458,12 +450,15 @@ impl BankingStage { } fn filter_valid_packets_for_forwarding<'a>( - packet_batches: impl Iterator, + packet_batches: impl Iterator, ) -> Vec<&'a Packet> { packet_batches - .filter(|(_batch, _indexes, forwarded)| !forwarded) - .flat_map(|(batch, valid_indexes, _forwarded)| { - valid_indexes.iter().map(move |x| &batch.packets[*x]) + .filter(|deserialized_packet_batch| !deserialized_packet_batch.forwarded) + .flat_map(|deserialized_packet_batch| { + deserialized_packet_batch + .unprocessed_packets + .iter() + .map(|(index, _)| &deserialized_packet_batch.packet_batch.packets[*index]) }) .collect() } @@ -509,20 +504,6 @@ impl BankingStage { (Ok(()), packet_vec.len()) } - // Returns whether the given `PacketBatch` has any more remaining unprocessed - // transactions - fn update_buffered_packets_with_new_unprocessed( - original_unprocessed_indexes: &mut Vec, - new_unprocessed_indexes: Vec, - ) -> bool { - let has_more_unprocessed_transactions = - Self::packet_has_more_unprocessed_transactions(&new_unprocessed_indexes); - if has_more_unprocessed_transactions { - *original_unprocessed_indexes = new_unprocessed_indexes - }; - has_more_unprocessed_transactions - } - #[allow(clippy::too_many_arguments)] pub fn consume_buffered_packets( my_pubkey: &Pubkey, @@ -543,154 +524,101 @@ impl BankingStage { let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot: Option = None; - RetainMut::retain_mut( - buffered_packet_batches, - |buffered_packet_batch_and_offsets| { - let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) = - buffered_packet_batch_and_offsets; - if let Some(end_of_slot) = &reached_end_of_slot { - let (should_retain, end_of_slot_filtering_time) = Measure::this( - |_| { - // We've hit the end of this slot, no need to perform more processing, - // just filter the remaining packets for the invalid (e.g. too old) ones - // if the working_bank is available - if let Some(bank) = &end_of_slot.working_bank { - let new_unprocessed_indexes = - Self::filter_unprocessed_packets_at_end_of_slot( - bank, - packet_batch, - original_unprocessed_indexes, - my_pubkey, - end_of_slot.next_slot_leader, - banking_stage_stats, - ); - - let end_of_slot_filtered_invalid_count = - original_unprocessed_indexes - .len() - .saturating_sub(new_unprocessed_indexes.len()); - - slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( - end_of_slot_filtered_invalid_count as u64, - ); - - banking_stage_stats - .end_of_slot_filtered_invalid_count - .fetch_add( - end_of_slot_filtered_invalid_count, - Ordering::Relaxed, - ); - - Self::update_buffered_packets_with_new_unprocessed( - original_unprocessed_indexes, - new_unprocessed_indexes, - ) - } else { - true - } - }, - (), - "end_of_slot_filtering", + RetainMut::retain_mut(buffered_packet_batches, |deserialized_packet_batch| { + let packet_batch = &deserialized_packet_batch.packet_batch; + let original_unprocessed_indexes = deserialized_packet_batch + .unprocessed_packets + .keys() + .cloned() + .collect::>(); + if let Some(end_of_slot) = &reached_end_of_slot { + // We've hit the end of this slot, no need to perform more processing, + // just filter the remaining packets for the invalid (e.g. too old) ones + // if the working_bank is available + let mut end_of_slot_filtering_time = Measure::start("end_of_slot_filtering"); + let should_retain = if let Some(bank) = &end_of_slot.working_bank { + let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( + bank, + packet_batch, + &original_unprocessed_indexes, + my_pubkey, + end_of_slot.next_slot_leader, + banking_stage_stats, ); - slot_metrics_tracker - .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us()); - should_retain + + let end_of_slot_filtered_invalid_count = original_unprocessed_indexes + .len() + .saturating_sub(new_unprocessed_indexes.len()); + + slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( + end_of_slot_filtered_invalid_count as u64, + ); + + banking_stage_stats + .end_of_slot_filtered_invalid_count + .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); + + deserialized_packet_batch.update_buffered_packets_with_new_unprocessed( + &original_unprocessed_indexes, + &new_unprocessed_indexes, + ) } else { - let (bank_start, poh_recorder_lock_time) = Measure::this( - |_| poh_recorder.lock().unwrap().bank_start(), - (), - "poh_recorder_lock", - ); - slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( - poh_recorder_lock_time.as_us(), - ); + true + }; + end_of_slot_filtering_time.stop(); + slot_metrics_tracker + .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us()); + should_retain + } else { + let (bank_start, poh_recorder_lock_time) = Measure::this( + |_| poh_recorder.lock().unwrap().bank_start(), + (), + "poh_recorder_lock", + ); + slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( + poh_recorder_lock_time.as_us(), + ); - if let Some(BankStart { - working_bank, - bank_creation_time, - }) = bank_start - { - let (process_transactions_summary, process_packets_transactions_time) = - Measure::this( - |_| { - Self::process_packets_transactions( - &working_bank, - &bank_creation_time, - recorder, - packet_batch, - original_unprocessed_indexes.to_owned(), - transaction_status_sender.clone(), - gossip_vote_sender, - banking_stage_stats, - qos_service, - slot_metrics_tracker, - ) - }, - (), - "process_packets_transactions", - ); - slot_metrics_tracker.increment_process_packets_transactions_us( - process_packets_transactions_time.as_us(), + if let Some(BankStart { + working_bank, + bank_creation_time, + }) = bank_start + { + let (process_transactions_summary, process_packets_transactions_time) = + Measure::this( + |_| { + Self::process_packets_transactions( + &working_bank, + &bank_creation_time, + recorder, + packet_batch, + original_unprocessed_indexes.to_owned(), + transaction_status_sender.clone(), + gossip_vote_sender, + banking_stage_stats, + qos_service, + slot_metrics_tracker, + ) + }, + (), + "process_packets_transactions", ); + slot_metrics_tracker.increment_process_packets_transactions_us( + process_packets_transactions_time.as_us(), + ); - let ProcessTransactionsSummary { - reached_max_poh_height, - retryable_transaction_indexes, - .. - } = process_transactions_summary; + let ProcessTransactionsSummary { + reached_max_poh_height, + retryable_transaction_indexes, + .. + } = process_transactions_summary; - if reached_max_poh_height - || !Bank::should_bank_still_be_processing_txs( - &bank_creation_time, - max_tx_ingestion_ns, - ) - { - let poh_recorder_lock_time = { - let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this( - |_| poh_recorder.lock().unwrap(), - (), - "poh_recorder_lock", - ); - - reached_end_of_slot = Some(EndOfSlot { - next_slot_leader: poh_recorder_locked.next_slot_leader(), - working_bank: Some(working_bank), - }); - poh_recorder_lock_time - }; - - slot_metrics_tracker - .increment_consume_buffered_packets_poh_recorder_lock_us( - poh_recorder_lock_time.as_us(), - ); - } - - // The difference between all transactions passed to execution and the ones that - // are retryable were the ones that were either: - // 1) Committed into the block - // 2) Dropped without being committed because they had some fatal error (too old, - // duplicate signature, etc.) - // - // Note: This assumes that every packet deserializes into one transaction! - consumed_buffered_packets_count += original_unprocessed_indexes - .len() - .saturating_sub(retryable_transaction_indexes.len()); - - // Out of the buffered packets just retried, collect any still unprocessed - // transactions in this batch for forwarding - rebuffered_packet_count += retryable_transaction_indexes.len(); - let has_more_unprocessed_transactions = - Self::update_buffered_packets_with_new_unprocessed( - original_unprocessed_indexes, - retryable_transaction_indexes, - ); - if let Some(test_fn) = &test_fn { - test_fn(); - } - has_more_unprocessed_transactions - } else { - // mark as end-of-slot to avoid aggressively lock poh for the remaining for - // packet batches in buffer + if reached_max_poh_height + || !Bank::should_bank_still_be_processing_txs( + &bank_creation_time, + max_tx_ingestion_ns, + ) + { let poh_recorder_lock_time = { let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this( |_| poh_recorder.lock().unwrap(), @@ -700,25 +628,69 @@ impl BankingStage { reached_end_of_slot = Some(EndOfSlot { next_slot_leader: poh_recorder_locked.next_slot_leader(), - working_bank: None, + working_bank: Some(working_bank), }); poh_recorder_lock_time }; + slot_metrics_tracker .increment_consume_buffered_packets_poh_recorder_lock_us( poh_recorder_lock_time.as_us(), ); - - // `original_unprocessed_indexes` must have remaining packets to process - // if not yet processed. - assert!(Self::packet_has_more_unprocessed_transactions( - original_unprocessed_indexes - )); - true } + + // The difference between all transactions passed to execution and the ones that + // are retryable were the ones that were either: + // 1) Committed into the block + // 2) Dropped without being committed because they had some fatal error (too old, + // duplicate signature, etc.) + // + // Note: This assumes that every packet deserializes into one transaction! + consumed_buffered_packets_count += original_unprocessed_indexes + .len() + .saturating_sub(retryable_transaction_indexes.len()); + + // Out of the buffered packets just retried, collect any still unprocessed + // transactions in this batch for forwarding + rebuffered_packet_count += retryable_transaction_indexes.len(); + let has_more_unprocessed_transactions = deserialized_packet_batch + .update_buffered_packets_with_new_unprocessed( + &original_unprocessed_indexes, + &retryable_transaction_indexes, + ); + if let Some(test_fn) = &test_fn { + test_fn(); + } + has_more_unprocessed_transactions + } else { + // mark as end-of-slot to avoid aggressively lock poh for the remaining for + // packet batches in buffer + let poh_recorder_lock_time = { + let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this( + |_| poh_recorder.lock().unwrap(), + (), + "poh_recorder_lock", + ); + + reached_end_of_slot = Some(EndOfSlot { + next_slot_leader: poh_recorder_locked.next_slot_leader(), + working_bank: None, + }); + poh_recorder_lock_time + }; + slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( + poh_recorder_lock_time.as_us(), + ); + + // `original_unprocessed_indexes` must have remaining packets to process + // if not yet processed. + assert!(Self::packet_has_more_unprocessed_transactions( + &original_unprocessed_indexes + )); + true } - }, - ); + } + }); proc_start.stop(); @@ -942,9 +914,11 @@ impl BankingStage { } if hold { - buffered_packet_batches.retain(|(_, index, _)| !index.is_empty()); - for (_, _, forwarded) in buffered_packet_batches.iter_mut() { - *forwarded = true; + buffered_packet_batches.retain(|deserialized_packet_batch| { + !deserialized_packet_batch.unprocessed_packets.is_empty() + }); + for deserialized_packet_batch in buffered_packet_batches.iter_mut() { + deserialized_packet_batch.forwarded = true; } } else { slot_metrics_tracker @@ -1634,16 +1608,6 @@ impl BankingStage { .collect_vec() } - /// Read the transaction message from packet data - fn packet_message(packet: &Packet) -> Option<&[u8]> { - let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?; - let msg_start = sig_len - .checked_mul(size_of::()) - .and_then(|v| v.checked_add(sig_size))?; - let msg_end = packet.meta.size; - Some(&packet.data[msg_start..msg_end]) - } - // This function deserializes packets into transactions, computes the blake3 hash of transaction // messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned // with their packet indexes. @@ -1664,7 +1628,7 @@ impl BankingStage { } let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; - let message_bytes = Self::packet_message(p)?; + let message_bytes = DeserializedPacketBatch::packet_message(p)?; let message_hash = Message::hash_raw_message(message_bytes); let tx = SanitizedTransaction::try_create( tx, @@ -1934,7 +1898,7 @@ impl BankingStage { batch_limit, banking_stage_stats, slot_metrics_tracker, - ); + ) } proc_start.stop(); @@ -1967,7 +1931,9 @@ impl BankingStage { banking_stage_stats.current_buffered_packets_count.swap( buffered_packet_batches .iter() - .map(|packets| packets.1.len()) + .map(|deserialized_packet_batch| { + deserialized_packet_batch.unprocessed_packets.len() + }) .sum(), Ordering::Relaxed, ); @@ -1990,9 +1956,9 @@ impl BankingStage { if unprocessed_packet_batches.len() >= batch_limit { *dropped_packet_batches_count += 1; if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() { - *dropped_packets_count += dropped_batch.1.len(); + *dropped_packets_count += dropped_batch.unprocessed_packets.len(); slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( - dropped_batch.1.len() as u64, + dropped_batch.unprocessed_packets.len() as u64, ); } } @@ -2003,7 +1969,12 @@ impl BankingStage { *newly_buffered_packets_count += packet_indexes.len(); slot_metrics_tracker .increment_newly_buffered_packets_count(packet_indexes.len() as u64); - unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); + + unprocessed_packet_batches.push_back(DeserializedPacketBatch::new( + packet_batch, + packet_indexes, + false, + )); } } @@ -2958,7 +2929,14 @@ mod tests { let packet_batch = PacketBatch::new( (0..32) .map(|packet_id| { - let mut p = Packet::default(); + // packets are deserialized upon receiving, failed packets will not be + // forwarded; Therefore we need to create real packets here. + let keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + let blockhash = Hash::new_unique(); + let transaction = + system_transaction::transfer(&keypair, &pubkey, 1, blockhash); + let mut p = Packet::from_data(None, &transaction).unwrap(); p.meta.port = packets_id << 8 | packet_id; p }) @@ -2967,7 +2945,7 @@ mod tests { let valid_indexes = (0..32) .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None }) .collect_vec(); - (packet_batch, valid_indexes, false) + DeserializedPacketBatch::new(packet_batch, valid_indexes, false) }) .collect_vec(); @@ -2975,17 +2953,22 @@ mod tests { assert_eq!(result.len(), 256); - let _ = result - .into_iter() - .enumerate() - .map(|(index, p)| { - let packets_id = index / 16; - let packet_id = (index % 16) * 2 + 1; - assert_eq!(p.meta.port, (packets_id << 8 | packet_id) as u16); + // packets in a batch are forwarded in arbitrary order; verify the ports match after + // sorting + let expected_ports: Vec<_> = (0..16) + .flat_map(|packets_id| { + (0..16).map(move |packet_id| { + let packet_id = packet_id * 2 + 1; + (packets_id << 8 | packet_id) as u16 + }) }) - .collect_vec(); + .collect(); - packet_batches[0].2 = true; + let mut forwarded_ports: Vec<_> = result.into_iter().map(|p| p.meta.port).collect(); + forwarded_ports.sort_unstable(); + assert_eq!(expected_ports, forwarded_ports); + + packet_batches[0].forwarded = true; let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter()); assert_eq!(result.len(), 240); } @@ -3580,13 +3563,14 @@ mod tests { num_conflicting_transactions ); let packet_batch = packet_batches.pop().unwrap(); - let mut buffered_packet_batches: UnprocessedPacketBatches = vec![( - packet_batch, - (0..num_conflicting_transactions).into_iter().collect(), - false, - )] - .into_iter() - .collect(); + let mut buffered_packet_batches: UnprocessedPacketBatches = + vec![DeserializedPacketBatch::new( + packet_batch, + (0..num_conflicting_transactions).into_iter().collect(), + false, + )] + .into_iter() + .collect(); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -3607,7 +3591,7 @@ mod tests { &mut LeaderSlotMetricsTracker::new(0), ); assert_eq!( - buffered_packet_batches[0].1.len(), + buffered_packet_batches[0].unprocessed_packets.len(), num_conflicting_transactions ); // When the poh recorder has a bank, should process all non conflicting buffered packets. @@ -3630,7 +3614,10 @@ mod tests { if num_expected_unprocessed == 0 { assert!(buffered_packet_batches.is_empty()) } else { - assert_eq!(buffered_packet_batches[0].1.len(), num_expected_unprocessed); + assert_eq!( + buffered_packet_batches[0].unprocessed_packets.len(), + num_expected_unprocessed + ); } } poh_recorder @@ -3658,7 +3645,7 @@ mod tests { let mut buffered_packet_batches: UnprocessedPacketBatches = packet_batches .clone() .into_iter() - .map(|single_packets| (single_packets, vec![0], false)) + .map(|single_packets| DeserializedPacketBatch::new(single_packets, vec![0], false)) .collect(); let (continue_sender, continue_receiver) = unbounded(); @@ -3701,13 +3688,12 @@ mod tests { buffered_packet_batches.len(), packet_batches[interrupted_iteration + 1..].len() ); - for ((remaining_unprocessed_packet, _, _forwarded), original_packet) in - buffered_packet_batches - .iter() - .zip(&packet_batches[interrupted_iteration + 1..]) + for (deserialized_packet_batch, original_packet) in buffered_packet_batches + .iter() + .zip(&packet_batches[interrupted_iteration + 1..]) { assert_eq!( - remaining_unprocessed_packet.packets[0], + deserialized_packet_batch.packet_batch.packets[0], original_packet.packets[0] ); } @@ -3740,7 +3726,13 @@ mod tests { fn test_forwarder_budget() { solana_logger::setup(); // Create `PacketBatch` with 1 unprocessed packet - let packet = Packet::from_data(None, &[0]).unwrap(); + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, &tx).unwrap(); let single_packet_batch = PacketBatch::new(vec![packet]); let genesis_config_info = create_slow_genesis_config(10_000); @@ -3779,9 +3771,13 @@ mod tests { for (name, data_budget, expected_num_forwarded) in test_cases { let mut unprocessed_packet_batches: UnprocessedPacketBatches = - vec![(single_packet_batch.clone(), vec![0], false)] - .into_iter() - .collect(); + vec![DeserializedPacketBatch::new( + single_packet_batch.clone(), + vec![0], + false, + )] + .into_iter() + .collect(); BankingStage::handle_forwarding( &ForwardOption::ForwardTransaction, &cluster_info, @@ -3811,22 +3807,34 @@ mod tests { #[test] fn test_handle_forwarding() { solana_logger::setup(); + // packets are deserialized upon receiving, failed packets will not be + // forwarded; Therefore need to create real packets here. + let keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); - const FWD_PACKET: u8 = 1; + let fwd_block_hash = Hash::new_unique(); let forwarded_packet = { - let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap(); + let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash); + let mut packet = Packet::from_data(None, &transaction).unwrap(); packet.meta.flags |= PacketFlags::FORWARDED; packet }; - const NORMAL_PACKET: u8 = 2; - let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap(); + let normal_block_hash = Hash::new_unique(); + let normal_packet = { + let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash); + Packet::from_data(None, &transaction).unwrap() + }; let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]); let mut unprocessed_packet_batches: UnprocessedPacketBatches = - vec![(packet_batch, vec![0, 1], false)] - .into_iter() - .collect(); + vec![DeserializedPacketBatch::new( + packet_batch, + vec![0, 1], + false, + )] + .into_iter() + .collect(); let genesis_config_info = create_slow_genesis_config(10_000); let GenesisConfigInfo { @@ -3862,7 +3870,7 @@ mod tests { "fwd-normal", ForwardOption::ForwardTransaction, true, - vec![NORMAL_PACKET], + vec![normal_block_hash], 2, ), ( @@ -3901,13 +3909,20 @@ mod tests { let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); assert_eq!(num_received, expected_ids.len(), "{}", name); for (i, expected_id) in expected_ids.iter().enumerate() { - assert_eq!(packets[i].meta.size, 1); - assert_eq!(packets[i].data[0], *expected_id, "{}", name); + assert_eq!(packets[i].meta.size, 215); + let recv_transaction: VersionedTransaction = + limited_deserialize(&packets[i].data[0..packets[i].meta.size]).unwrap(); + assert_eq!( + recv_transaction.message.recent_blockhash(), + expected_id, + "{}", + name + ); } let num_unprocessed_packets: usize = unprocessed_packet_batches .iter() - .map(|(b, ..)| b.packets.len()) + .map(|b| b.packet_batch.packets.len()) .sum(); assert_eq!( num_unprocessed_packets, expected_num_unprocessed, @@ -3926,11 +3941,24 @@ mod tests { fn test_push_unprocessed_batch_limit() { solana_logger::setup(); // Create `PacketBatch` with 2 unprocessed packets - let new_packet_batch = PacketBatch::new(vec![Packet::default(); 2]); - let mut unprocessed_packets: UnprocessedPacketBatches = - vec![(new_packet_batch, vec![0, 1], false)] - .into_iter() - .collect(); + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, &tx).unwrap(); + let new_packet_batch = PacketBatch::new(vec![packet; 2]); + let mut unprocessed_packets: UnprocessedPacketBatches = vec![DeserializedPacketBatch::new( + new_packet_batch, + vec![0, 1], + false, + )] + .into_iter() + .collect(); + assert_eq!(unprocessed_packets.len(), 1); + assert_eq!(unprocessed_packets[0].unprocessed_packets.len(), 2); + // Set the limit to 2 let batch_limit = 2; // Create new unprocessed packets and add to a batch @@ -3999,7 +4027,7 @@ mod tests { ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!( - unprocessed_packets[1].0.packets[0], + unprocessed_packets[1].packet_batch.packets[0], new_packet_batch.packets[0] ); assert_eq!(dropped_packet_batches_count, 1); @@ -4007,19 +4035,6 @@ mod tests { assert_eq!(newly_buffered_packets_count, 2); } - #[test] - fn test_packet_message() { - let keypair = Keypair::new(); - let pubkey = solana_sdk::pubkey::new_rand(); - let blockhash = Hash::new_unique(); - let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash); - let packet = Packet::from_data(None, &transaction).unwrap(); - assert_eq!( - BankingStage::packet_message(&packet).unwrap().to_vec(), - transaction.message_data() - ); - } - #[cfg(test)] fn make_test_packets( transactions: Vec, diff --git a/core/src/lib.rs b/core/src/lib.rs index 8cc6f046cd..8f8e9adc3d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -64,6 +64,7 @@ pub mod tpu; pub mod tree_diff; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; +pub mod unprocessed_packet_batches; pub mod validator; pub mod verified_vote_packets; pub mod vote_simulator; diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs new file mode 100644 index 0000000000..fd7426295b --- /dev/null +++ b/core/src/unprocessed_packet_batches.rs @@ -0,0 +1,131 @@ +use { + solana_perf::packet::{limited_deserialize, Packet, PacketBatch}, + solana_sdk::{ + hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature, + transaction::VersionedTransaction, + }, + std::{ + collections::{HashMap, VecDeque}, + mem::size_of, + }, +}; + +pub type UnprocessedPacketBatches = VecDeque; + +/// hold deserialized messages, as well as computed message_hash and other things needed to create +/// SanitizedTransaction +#[derive(Debug, Default)] +pub struct DeserializedPacket { + #[allow(dead_code)] + versioned_transaction: VersionedTransaction, + + #[allow(dead_code)] + message_hash: Hash, + + #[allow(dead_code)] + is_simple_vote: bool, +} + +#[derive(Debug, Default)] +pub struct DeserializedPacketBatch { + pub packet_batch: PacketBatch, + pub forwarded: bool, + // indexes of valid packets in batch, and their corrersponding deserialized_packet + pub unprocessed_packets: HashMap, +} + +impl DeserializedPacketBatch { + pub fn new(packet_batch: PacketBatch, packet_indexes: Vec, forwarded: bool) -> Self { + let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes); + Self { + packet_batch, + unprocessed_packets, + forwarded, + } + } + + fn deserialize_packets( + packet_batch: &PacketBatch, + packet_indexes: &[usize], + ) -> HashMap { + packet_indexes + .iter() + .filter_map(|packet_index| { + let deserialized_packet = + Self::deserialize_packet(&packet_batch.packets[*packet_index])?; + Some((*packet_index, deserialized_packet)) + }) + .collect() + } + + fn deserialize_packet(packet: &Packet) -> Option { + let versioned_transaction: VersionedTransaction = + match limited_deserialize(&packet.data[0..packet.meta.size]) { + Ok(tx) => tx, + Err(_) => return None, + }; + + if let Some(message_bytes) = Self::packet_message(packet) { + let message_hash = Message::hash_raw_message(message_bytes); + let is_simple_vote = packet.meta.is_simple_vote_tx(); + Some(DeserializedPacket { + versioned_transaction, + message_hash, + is_simple_vote, + }) + } else { + None + } + } + + /// Read the transaction message from packet data + pub fn packet_message(packet: &Packet) -> Option<&[u8]> { + let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?; + let msg_start = sig_len + .checked_mul(size_of::()) + .and_then(|v| v.checked_add(sig_size))?; + let msg_end = packet.meta.size; + Some(&packet.data[msg_start..msg_end]) + } + + // Returns whether the given `PacketBatch` has any more remaining unprocessed + // transactions + pub fn update_buffered_packets_with_new_unprocessed( + &mut self, + _original_unprocessed_indexes: &[usize], + new_unprocessed_indexes: &[usize], + ) -> bool { + let has_more_unprocessed_transactions = !new_unprocessed_indexes.is_empty(); + if has_more_unprocessed_transactions { + self.unprocessed_packets + .retain(|index, _| new_unprocessed_indexes.contains(index)); + } else { + self.unprocessed_packets.clear(); + } + + has_more_unprocessed_transactions + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + solana_sdk::{signature::Keypair, system_transaction}, + }; + + #[test] + fn test_packet_message() { + let keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + let blockhash = Hash::new_unique(); + let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash); + let packet = Packet::from_data(None, &transaction).unwrap(); + assert_eq!( + DeserializedPacketBatch::packet_message(&packet) + .unwrap() + .to_vec(), + transaction.message_data() + ); + } +}