Banking stage: Deserialize packets only once

Benchmarks show roughly a 6% improvement. The impact could be more
significant when transactions need to be retried a lot.

after patch:
{'name': 'banking_bench_total', 'median': '72767.43'}
{'name': 'banking_bench_tx_total', 'median': '80240.38'}
{'name': 'banking_bench_success_tx_total', 'median': '72767.43'}
test bench_banking_stage_multi_accounts
... bench:   6,137,264 ns/iter (+/- 1,364,111)
test bench_banking_stage_multi_programs
... bench:  10,086,435 ns/iter (+/- 2,921,440)

before patch:
{'name': 'banking_bench_total', 'median': '68572.26'}
{'name': 'banking_bench_tx_total', 'median': '75704.75'}
{'name': 'banking_bench_success_tx_total', 'median': '68572.26'}
test bench_banking_stage_multi_accounts
... bench:   6,521,007 ns/iter (+/- 1,926,741)
test bench_banking_stage_multi_programs
... bench:  10,526,433 ns/iter (+/- 2,736,530)

(cherry picked from commit 97f2eb8e65)
This commit is contained in:
Christian Kamm
2022-04-11 17:37:45 +02:00
committed by mergify-bot
parent c289cd2a4b
commit f5c8c0ffad
2 changed files with 48 additions and 64 deletions

View File

@ -23,7 +23,7 @@ use {
solana_perf::{ solana_perf::{
cuda_runtime::PinnedVec, cuda_runtime::PinnedVec,
data_budget::DataBudget, data_budget::DataBudget,
packet::{limited_deserialize, Packet, PacketBatch, PACKETS_PER_BATCH}, packet::{Packet, PacketBatch, PACKETS_PER_BATCH},
perf_libs, perf_libs,
}, },
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder},
@ -45,13 +45,10 @@ use {
MAX_TRANSACTION_FORWARDING_DELAY_GPU, MAX_TRANSACTION_FORWARDING_DELAY_GPU,
}, },
feature_set, feature_set,
message::Message,
pubkey::Pubkey, pubkey::Pubkey,
saturating_add_assign, saturating_add_assign,
timing::{duration_as_ms, timestamp, AtomicInterval}, timing::{duration_as_ms, timestamp, AtomicInterval},
transaction::{ transaction::{self, AddressLoader, SanitizedTransaction, TransactionError},
self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
},
transport::TransportError, transport::TransportError,
}, },
solana_transaction_status::token_balances::{ solana_transaction_status::token_balances::{
@ -558,7 +555,6 @@ impl BankingStage {
let mut reached_end_of_slot: Option<EndOfSlot> = None; let mut reached_end_of_slot: Option<EndOfSlot> = None;
RetainMut::retain_mut(buffered_packet_batches, |deserialized_packet_batch| { RetainMut::retain_mut(buffered_packet_batches, |deserialized_packet_batch| {
let packet_batch = &deserialized_packet_batch.packet_batch;
let original_unprocessed_indexes = deserialized_packet_batch let original_unprocessed_indexes = deserialized_packet_batch
.unprocessed_packets .unprocessed_packets
.keys() .keys()
@ -572,8 +568,7 @@ impl BankingStage {
let should_retain = if let Some(bank) = &end_of_slot.working_bank { let should_retain = if let Some(bank) = &end_of_slot.working_bank {
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
bank, bank,
packet_batch, &deserialized_packet_batch.unprocessed_packets,
&original_unprocessed_indexes,
my_pubkey, my_pubkey,
end_of_slot.next_slot_leader, end_of_slot.next_slot_leader,
banking_stage_stats, banking_stage_stats,
@ -624,8 +619,7 @@ impl BankingStage {
&working_bank, &working_bank,
&bank_creation_time, &bank_creation_time,
recorder, recorder,
packet_batch, &deserialized_packet_batch.unprocessed_packets,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(), transaction_status_sender.clone(),
gossip_vote_sender, gossip_vote_sender,
banking_stage_stats, banking_stage_stats,
@ -1687,32 +1681,27 @@ impl BankingStage {
// with their packet indexes. // with their packet indexes.
#[allow(clippy::needless_collect)] #[allow(clippy::needless_collect)]
fn transactions_from_packets( fn transactions_from_packets(
packet_batch: &PacketBatch, deserialized_packet_batch: &HashMap<usize, DeserializedPacket>,
transaction_indexes: &[usize],
feature_set: &Arc<feature_set::FeatureSet>, feature_set: &Arc<feature_set::FeatureSet>,
votes_only: bool, votes_only: bool,
address_loader: impl AddressLoader, address_loader: impl AddressLoader,
) -> (Vec<SanitizedTransaction>, Vec<usize>) { ) -> (Vec<SanitizedTransaction>, Vec<usize>) {
transaction_indexes deserialized_packet_batch
.iter() .iter()
.filter_map(|tx_index| { .filter_map(|(&tx_index, deserialized_packet)| {
let p = &packet_batch.packets[*tx_index]; if votes_only && !deserialized_packet.is_simple_vote {
if votes_only && !p.meta.is_simple_vote_tx() {
return None; return None;
} }
let tx: VersionedTransaction = limited_deserialize(&p.data[0..p.meta.size]).ok()?;
let message_bytes = DeserializedPacketBatch::packet_message(p)?;
let message_hash = Message::hash_raw_message(message_bytes);
let tx = SanitizedTransaction::try_create( let tx = SanitizedTransaction::try_create(
tx, deserialized_packet.versioned_transaction.clone(),
message_hash, deserialized_packet.message_hash,
Some(p.meta.is_simple_vote_tx()), Some(deserialized_packet.is_simple_vote),
address_loader.clone(), address_loader.clone(),
) )
.ok()?; .ok()?;
tx.verify_precompiles(feature_set).ok()?; tx.verify_precompiles(feature_set).ok()?;
Some((tx, *tx_index)) Some((tx, tx_index))
}) })
.unzip() .unzip()
} }
@ -1761,8 +1750,7 @@ impl BankingStage {
bank: &Arc<Bank>, bank: &Arc<Bank>,
bank_creation_time: &Instant, bank_creation_time: &Instant,
poh: &TransactionRecorder, poh: &TransactionRecorder,
packet_batch: &PacketBatch, deserialized_packet_batch: &HashMap<usize, DeserializedPacket>,
packet_indexes: Vec<usize>,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender, gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
@ -1773,8 +1761,7 @@ impl BankingStage {
let ((transactions, transaction_to_packet_indexes), packet_conversion_time) = Measure::this( let ((transactions, transaction_to_packet_indexes), packet_conversion_time) = Measure::this(
|_| { |_| {
Self::transactions_from_packets( Self::transactions_from_packets(
packet_batch, deserialized_packet_batch,
&packet_indexes,
&bank.feature_set, &bank.feature_set,
bank.vote_only_bank(), bank.vote_only_bank(),
bank.as_ref(), bank.as_ref(),
@ -1862,8 +1849,7 @@ impl BankingStage {
fn filter_unprocessed_packets_at_end_of_slot( fn filter_unprocessed_packets_at_end_of_slot(
bank: &Arc<Bank>, bank: &Arc<Bank>,
packet_batch: &PacketBatch, deserialized_packet_batch: &HashMap<usize, DeserializedPacket>,
transaction_indexes: &[usize],
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
next_leader: Option<Pubkey>, next_leader: Option<Pubkey>,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
@ -1873,15 +1859,17 @@ impl BankingStage {
// Filtering helps if we were going to forward the packets to some other node // Filtering helps if we were going to forward the packets to some other node
if let Some(leader) = next_leader { if let Some(leader) = next_leader {
if leader == *my_pubkey { if leader == *my_pubkey {
return transaction_indexes.to_vec(); return deserialized_packet_batch
.keys()
.cloned()
.collect::<Vec<usize>>();
} }
} }
let mut unprocessed_packet_conversion_time = let mut unprocessed_packet_conversion_time =
Measure::start("unprocessed_packet_conversion"); Measure::start("unprocessed_packet_conversion");
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
packet_batch, deserialized_packet_batch,
transaction_indexes,
&bank.feature_set, &bank.feature_set,
bank.vote_only_bank(), bank.vote_only_bank(),
bank.as_ref(), bank.as_ref(),
@ -2119,7 +2107,7 @@ mod tests {
get_tmp_ledger_path_auto_delete, get_tmp_ledger_path_auto_delete,
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
}, },
solana_perf::packet::{to_packet_batches, PacketFlags}, solana_perf::packet::{limited_deserialize, to_packet_batches, PacketFlags},
solana_poh::{ solana_poh::{
poh_recorder::{create_test_recorder, Record, WorkingBankEntry}, poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
poh_service::PohService, poh_service::PohService,
@ -2139,7 +2127,10 @@ mod tests {
signature::{Keypair, Signer}, signature::{Keypair, Signer},
system_instruction::SystemError, system_instruction::SystemError,
system_transaction, system_transaction,
transaction::{MessageHash, SimpleAddressLoader, Transaction, TransactionError}, transaction::{
MessageHash, SimpleAddressLoader, Transaction, TransactionError,
VersionedTransaction,
},
}, },
solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace},
solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta},
@ -4245,7 +4236,7 @@ mod tests {
fn make_test_packets( fn make_test_packets(
transactions: Vec<Transaction>, transactions: Vec<Transaction>,
vote_indexes: Vec<usize>, vote_indexes: Vec<usize>,
) -> (PacketBatch, Vec<usize>) { ) -> DeserializedPacketBatch {
let capacity = transactions.len(); let capacity = transactions.len();
let mut packet_batch = PacketBatch::with_capacity(capacity); let mut packet_batch = PacketBatch::with_capacity(capacity);
let mut packet_indexes = Vec::with_capacity(capacity); let mut packet_indexes = Vec::with_capacity(capacity);
@ -4257,7 +4248,7 @@ mod tests {
for index in vote_indexes.iter() { for index in vote_indexes.iter() {
packet_batch.packets[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX; packet_batch.packets[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX;
} }
(packet_batch, packet_indexes) DeserializedPacketBatch::new(packet_batch, packet_indexes, false)
} }
#[test] #[test]
@ -4275,28 +4266,30 @@ mod tests {
&keypair, &keypair,
None, None,
); );
let sorted = |mut v: Vec<usize>| {
v.sort_unstable();
v
};
// packets with no votes // packets with no votes
{ {
let vote_indexes = vec![]; let vote_indexes = vec![];
let (packet_batch, packet_indexes) = let deserialized_packet_batch =
make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
let mut votes_only = false; let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets( let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch, &deserialized_packet_batch.unprocessed_packets,
&packet_indexes,
&Arc::new(FeatureSet::default()), &Arc::new(FeatureSet::default()),
votes_only, votes_only,
SimpleAddressLoader::Disabled, SimpleAddressLoader::Disabled,
); );
assert_eq!(2, txs.len()); assert_eq!(2, txs.len());
assert_eq!(vec![0, 1], tx_packet_index); assert_eq!(vec![0, 1], sorted(tx_packet_index));
votes_only = true; votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets( let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch, &deserialized_packet_batch.unprocessed_packets,
&packet_indexes,
&Arc::new(FeatureSet::default()), &Arc::new(FeatureSet::default()),
votes_only, votes_only,
SimpleAddressLoader::Disabled, SimpleAddressLoader::Disabled,
@ -4308,63 +4301,59 @@ mod tests {
// packets with some votes // packets with some votes
{ {
let vote_indexes = vec![0, 2]; let vote_indexes = vec![0, 2];
let (packet_batch, packet_indexes) = make_test_packets( let deserialized_packet_batch = make_test_packets(
vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
vote_indexes, vote_indexes,
); );
let mut votes_only = false; let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets( let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch, &deserialized_packet_batch.unprocessed_packets,
&packet_indexes,
&Arc::new(FeatureSet::default()), &Arc::new(FeatureSet::default()),
votes_only, votes_only,
SimpleAddressLoader::Disabled, SimpleAddressLoader::Disabled,
); );
assert_eq!(3, txs.len()); assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index); assert_eq!(vec![0, 1, 2], sorted(tx_packet_index));
votes_only = true; votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets( let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch, &deserialized_packet_batch.unprocessed_packets,
&packet_indexes,
&Arc::new(FeatureSet::default()), &Arc::new(FeatureSet::default()),
votes_only, votes_only,
SimpleAddressLoader::Disabled, SimpleAddressLoader::Disabled,
); );
assert_eq!(2, txs.len()); assert_eq!(2, txs.len());
assert_eq!(vec![0, 2], tx_packet_index); assert_eq!(vec![0, 2], sorted(tx_packet_index));
} }
// packets with all votes // packets with all votes
{ {
let vote_indexes = vec![0, 1, 2]; let vote_indexes = vec![0, 1, 2];
let (packet_batch, packet_indexes) = make_test_packets( let deserialized_packet_batch = make_test_packets(
vec![vote_tx.clone(), vote_tx.clone(), vote_tx], vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
vote_indexes, vote_indexes,
); );
let mut votes_only = false; let mut votes_only = false;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets( let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch, &deserialized_packet_batch.unprocessed_packets,
&packet_indexes,
&Arc::new(FeatureSet::default()), &Arc::new(FeatureSet::default()),
votes_only, votes_only,
SimpleAddressLoader::Disabled, SimpleAddressLoader::Disabled,
); );
assert_eq!(3, txs.len()); assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index); assert_eq!(vec![0, 1, 2], sorted(tx_packet_index));
votes_only = true; votes_only = true;
let (txs, tx_packet_index) = BankingStage::transactions_from_packets( let (txs, tx_packet_index) = BankingStage::transactions_from_packets(
&packet_batch, &deserialized_packet_batch.unprocessed_packets,
&packet_indexes,
&Arc::new(FeatureSet::default()), &Arc::new(FeatureSet::default()),
votes_only, votes_only,
SimpleAddressLoader::Disabled, SimpleAddressLoader::Disabled,
); );
assert_eq!(3, txs.len()); assert_eq!(3, txs.len());
assert_eq!(vec![0, 1, 2], tx_packet_index); assert_eq!(vec![0, 1, 2], sorted(tx_packet_index));
} }
} }

View File

@ -15,14 +15,9 @@ use {
/// SanitizedTransaction /// SanitizedTransaction
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct DeserializedPacket { pub struct DeserializedPacket {
#[allow(dead_code)] pub versioned_transaction: VersionedTransaction,
versioned_transaction: VersionedTransaction, pub message_hash: Hash,
pub is_simple_vote: bool,
#[allow(dead_code)]
message_hash: Hash,
#[allow(dead_code)]
is_simple_vote: bool,
} }
/// Defines the type of entry in `UnprocessedPacketBatches`, it holds original packet_batch /// Defines the type of entry in `UnprocessedPacketBatches`, it holds original packet_batch