From f5c8c0ffad319a135d5a644e868c85be74fe6a89 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Mon, 11 Apr 2022 17:37:45 +0200 Subject: [PATCH] Banking stage: Deserialize packets only once Benchmarks show roughly a 6% improvement. The impact could be more significant when transactions need to be retried a lot. after patch: {'name': 'banking_bench_total', 'median': '72767.43'} {'name': 'banking_bench_tx_total', 'median': '80240.38'} {'name': 'banking_bench_success_tx_total', 'median': '72767.43'} test bench_banking_stage_multi_accounts ... bench: 6,137,264 ns/iter (+/- 1,364,111) test bench_banking_stage_multi_programs ... bench: 10,086,435 ns/iter (+/- 2,921,440) before patch: {'name': 'banking_bench_total', 'median': '68572.26'} {'name': 'banking_bench_tx_total', 'median': '75704.75'} {'name': 'banking_bench_success_tx_total', 'median': '68572.26'} test bench_banking_stage_multi_accounts ... bench: 6,521,007 ns/iter (+/- 1,926,741) test bench_banking_stage_multi_programs ... bench: 10,526,433 ns/iter (+/- 2,736,530) (cherry picked from commit 97f2eb8e65ffbf6d8b2abb1b1af7800f772dc406) --- core/src/banking_stage.rs | 101 +++++++++++-------------- core/src/unprocessed_packet_batches.rs | 11 +-- 2 files changed, 48 insertions(+), 64 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1ee6a4fc6d..eb03a60a7d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -23,7 +23,7 @@ use { solana_perf::{ cuda_runtime::PinnedVec, data_budget::DataBudget, - packet::{limited_deserialize, Packet, PacketBatch, PACKETS_PER_BATCH}, + packet::{Packet, PacketBatch, PACKETS_PER_BATCH}, perf_libs, }, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, @@ -45,13 +45,10 @@ use { MAX_TRANSACTION_FORWARDING_DELAY_GPU, }, feature_set, - message::Message, pubkey::Pubkey, saturating_add_assign, timing::{duration_as_ms, timestamp, AtomicInterval}, - transaction::{ - self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction, - }, + transaction::{self, AddressLoader, SanitizedTransaction, TransactionError}, transport::TransportError, }, solana_transaction_status::token_balances::{ @@ -558,7 +555,6 @@ impl BankingStage { let mut reached_end_of_slot: Option = None; RetainMut::retain_mut(buffered_packet_batches, |deserialized_packet_batch| { - let packet_batch = &deserialized_packet_batch.packet_batch; let original_unprocessed_indexes = deserialized_packet_batch .unprocessed_packets .keys() @@ -572,8 +568,7 @@ impl BankingStage { let should_retain = if let Some(bank) = &end_of_slot.working_bank { let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( bank, - packet_batch, - &original_unprocessed_indexes, + &deserialized_packet_batch.unprocessed_packets, my_pubkey, end_of_slot.next_slot_leader, banking_stage_stats, @@ -624,8 +619,7 @@ impl BankingStage { &working_bank, &bank_creation_time, recorder, - packet_batch, - original_unprocessed_indexes.to_owned(), + &deserialized_packet_batch.unprocessed_packets, transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, @@ -1687,32 +1681,27 @@ impl BankingStage { // with their packet indexes. #[allow(clippy::needless_collect)] fn transactions_from_packets( - packet_batch: &PacketBatch, - transaction_indexes: &[usize], + deserialized_packet_batch: &HashMap, feature_set: &Arc, votes_only: bool, address_loader: impl AddressLoader, ) -> (Vec, Vec) { - transaction_indexes + deserialized_packet_batch .iter() - .filter_map(|tx_index| { - let p = &packet_batch.packets[*tx_index]; - if votes_only && !p.meta.is_simple_vote_tx() { + .filter_map(|(&tx_index, deserialized_packet)| { + if votes_only && !deserialized_packet.is_simple_vote { return None; } - let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?; - let message_bytes = DeserializedPacketBatch::packet_message(p)?; - let message_hash = Message::hash_raw_message(message_bytes); let tx = SanitizedTransaction::try_create( - tx, - message_hash, - Some(p.meta.is_simple_vote_tx()), + deserialized_packet.versioned_transaction.clone(), + deserialized_packet.message_hash, + Some(deserialized_packet.is_simple_vote), address_loader.clone(), ) .ok()?; tx.verify_precompiles(feature_set).ok()?; - Some((tx, *tx_index)) + Some((tx, tx_index)) }) .unzip() } @@ -1761,8 +1750,7 @@ impl BankingStage { bank: &Arc, bank_creation_time: &Instant, poh: &TransactionRecorder, - packet_batch: &PacketBatch, - packet_indexes: Vec, + deserialized_packet_batch: &HashMap, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, @@ -1773,8 +1761,7 @@ impl BankingStage { let ((transactions, transaction_to_packet_indexes), packet_conversion_time) = Measure::this( |_| { Self::transactions_from_packets( - packet_batch, - &packet_indexes, + deserialized_packet_batch, &bank.feature_set, bank.vote_only_bank(), bank.as_ref(), @@ -1862,8 +1849,7 @@ impl BankingStage { fn filter_unprocessed_packets_at_end_of_slot( bank: &Arc, - packet_batch: &PacketBatch, - transaction_indexes: &[usize], + deserialized_packet_batch: &HashMap, my_pubkey: &Pubkey, next_leader: Option, banking_stage_stats: &BankingStageStats, @@ -1873,15 +1859,17 @@ impl BankingStage { // Filtering helps if we were going to forward the packets to some other node if let Some(leader) = next_leader { if leader == *my_pubkey { - return transaction_indexes.to_vec(); + return deserialized_packet_batch + .keys() + .cloned() + .collect::>(); } } let mut unprocessed_packet_conversion_time = Measure::start("unprocessed_packet_conversion"); let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( - packet_batch, - transaction_indexes, + deserialized_packet_batch, &bank.feature_set, bank.vote_only_bank(), bank.as_ref(), @@ -2119,7 +2107,7 @@ mod tests { get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, - solana_perf::packet::{to_packet_batches, PacketFlags}, + solana_perf::packet::{limited_deserialize, to_packet_batches, PacketFlags}, solana_poh::{ poh_recorder::{create_test_recorder, Record, WorkingBankEntry}, poh_service::PohService, @@ -2139,7 +2127,10 @@ mod tests { signature::{Keypair, Signer}, system_instruction::SystemError, system_transaction, - transaction::{MessageHash, SimpleAddressLoader, Transaction, TransactionError}, + transaction::{ + MessageHash, SimpleAddressLoader, Transaction, TransactionError, + VersionedTransaction, + }, }, solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, @@ -4245,7 +4236,7 @@ mod tests { fn make_test_packets( transactions: Vec, vote_indexes: Vec, - ) -> (PacketBatch, Vec) { + ) -> DeserializedPacketBatch { let capacity = transactions.len(); let mut packet_batch = PacketBatch::with_capacity(capacity); let mut packet_indexes = Vec::with_capacity(capacity); @@ -4257,7 +4248,7 @@ mod tests { for index in vote_indexes.iter() { packet_batch.packets[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX; } - (packet_batch, packet_indexes) + DeserializedPacketBatch::new(packet_batch, packet_indexes, false) } #[test] @@ -4275,28 +4266,30 @@ mod tests { &keypair, None, ); + let sorted = |mut v: Vec| { + v.sort_unstable(); + v + }; // packets with no votes { let vote_indexes = vec![]; - let (packet_batch, packet_indexes) = + let deserialized_packet_batch = make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); let mut votes_only = false; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packet_batch, - &packet_indexes, + &deserialized_packet_batch.unprocessed_packets, &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, ); assert_eq!(2, txs.len()); - assert_eq!(vec![0, 1], tx_packet_index); + assert_eq!(vec![0, 1], sorted(tx_packet_index)); votes_only = true; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packet_batch, - &packet_indexes, + &deserialized_packet_batch.unprocessed_packets, &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -4308,63 +4301,59 @@ mod tests { // packets with some votes { let vote_indexes = vec![0, 2]; - let (packet_batch, packet_indexes) = make_test_packets( + let deserialized_packet_batch = make_test_packets( vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], vote_indexes, ); let mut votes_only = false; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packet_batch, - &packet_indexes, + &deserialized_packet_batch.unprocessed_packets, &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, ); assert_eq!(3, txs.len()); - assert_eq!(vec![0, 1, 2], tx_packet_index); + assert_eq!(vec![0, 1, 2], sorted(tx_packet_index)); votes_only = true; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packet_batch, - &packet_indexes, + &deserialized_packet_batch.unprocessed_packets, &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, ); assert_eq!(2, txs.len()); - assert_eq!(vec![0, 2], tx_packet_index); + assert_eq!(vec![0, 2], sorted(tx_packet_index)); } // packets with all votes { let vote_indexes = vec![0, 1, 2]; - let (packet_batch, packet_indexes) = make_test_packets( + let deserialized_packet_batch = make_test_packets( vec![vote_tx.clone(), vote_tx.clone(), vote_tx], vote_indexes, ); let mut votes_only = false; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packet_batch, - &packet_indexes, + &deserialized_packet_batch.unprocessed_packets, &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, ); assert_eq!(3, txs.len()); - assert_eq!(vec![0, 1, 2], tx_packet_index); + assert_eq!(vec![0, 1, 2], sorted(tx_packet_index)); votes_only = true; let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &packet_batch, - &packet_indexes, + &deserialized_packet_batch.unprocessed_packets, &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, ); assert_eq!(3, txs.len()); - assert_eq!(vec![0, 1, 2], tx_packet_index); + assert_eq!(vec![0, 1, 2], sorted(tx_packet_index)); } } diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 87af6f0445..56aae6b44c 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -15,14 +15,9 @@ use { /// SanitizedTransaction #[derive(Debug, Default)] pub struct DeserializedPacket { - #[allow(dead_code)] - versioned_transaction: VersionedTransaction, - - #[allow(dead_code)] - message_hash: Hash, - - #[allow(dead_code)] - is_simple_vote: bool, + pub versioned_transaction: VersionedTransaction, + pub message_hash: Hash, + pub is_simple_vote: bool, } /// Defines the type of entry in `UnprocessedPacketBatches`, it holds original packet_batch