From 41142a7d760cee7ffdac14f0a0fc0b579fe96992 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Mon, 7 Feb 2022 18:25:33 +0000
Subject: [PATCH] Optimize batching of transactions during replay for parallel
 processing (backport #22917) (#22982)

* Optimize batching of transactions during replay for parallel processing

(cherry picked from commit 4de14e530b09cceba10cf47d91478af800011d38)

* fix build

(cherry picked from commit dfef68f985698418ceb58f21124b537c0d9dd68b)

* updates to address review feedback

(cherry picked from commit c5d8560cdb52c22750bbd11b63da29f54bfecb74)

* suppress clippy

(cherry picked from commit a146f2d8537d4a4302057099856f76923d045e21)

Co-authored-by: Pankaj Garg
---
 ledger/src/blockstore_processor.rs | 73 +++++++++++++++++++++++++++++-
 1 file changed, 72 insertions(+), 1 deletion(-)

diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index b5e36ef2b0..dd9f8860ad 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -28,6 +28,7 @@ use {
         bank_utils,
         block_cost_limits::*,
         commitment::VOTE_THRESHOLD_SIZE,
+        cost_model::CostModel,
         snapshot_config::SnapshotConfig,
         snapshot_package::{AccountsPackageSender, SnapshotType},
         snapshot_utils::{self, BankFromArchiveTimings},
@@ -53,6 +54,7 @@ use {
         collect_token_balances, TransactionTokenBalancesSet,
     },
     std::{
+        borrow::Cow,
         cell::RefCell,
         collections::{HashMap, HashSet},
         path::PathBuf,
@@ -247,7 +249,7 @@ fn execute_batch(
     first_err.map(|(result, _)| result).unwrap_or(Ok(()))
 }
 
-fn execute_batches(
+fn execute_batches_internal(
     bank: &Arc<Bank>,
     batches: &[TransactionBatch],
     entry_callback: Option<&ProcessCallback>,
@@ -290,6 +292,75 @@ fn execute_batches(
     first_err(&results)
 }
 
+fn execute_batches(
+    bank: &Arc<Bank>,
+    batches: &[TransactionBatch],
+    entry_callback: Option<&ProcessCallback>,
+    transaction_status_sender: Option<&TransactionStatusSender>,
+    replay_vote_sender: Option<&ReplayVoteSender>,
+    timings: &mut ExecuteTimings,
+    cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
+) -> Result<()> {
+    let lock_results = batches
+        .iter()
+        .flat_map(|batch| batch.lock_results().clone())
+        .collect::<Vec<_>>();
+    let sanitized_txs = batches
+        .iter()
+        .flat_map(|batch| batch.sanitized_transactions().to_vec())
+        .collect::<Vec<_>>();
+
+    let cost_model = CostModel::new();
+    let mut minimal_tx_cost = u64::MAX;
+    let mut total_cost: u64 = 0;
+    // Allowing collect here, since it also computes the minimal tx cost, and aggregate cost.
+    // These two values are later used for checking if the tx_costs vector needs to be iterated over.
+    #[allow(clippy::needless_collect)]
+    let tx_costs = sanitized_txs
+        .iter()
+        .map(|tx| {
+            let cost = cost_model.calculate_cost(tx).sum();
+            minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
+            total_cost = total_cost.saturating_add(cost);
+            cost
+        })
+        .collect::<Vec<_>>();
+
+    let target_batch_count = get_thread_count() as u64;
+
+    let mut tx_batches: Vec<TransactionBatch> = vec![];
+    let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
+        let target_batch_cost = total_cost / target_batch_count;
+        let mut batch_cost: u64 = 0;
+        let mut slice_start = 0;
+        tx_costs.into_iter().enumerate().for_each(|(index, cost)| {
+            let next_index = index + 1;
+            batch_cost = batch_cost.saturating_add(cost);
+            if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() {
+                let txs = &sanitized_txs[slice_start..=index];
+                let results = &lock_results[slice_start..=index];
+                let tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs));
+                slice_start = next_index;
+                tx_batches.push(tx_batch);
+                batch_cost = 0;
+            }
+        });
+        &tx_batches[..]
+    } else {
+        batches
+    };
+
+    execute_batches_internal(
+        bank,
+        rebatched_txs,
+        entry_callback,
+        transaction_status_sender,
+        replay_vote_sender,
+        timings,
+        cost_capacity_meter,
+    )
+}
+
 /// Process an ordered list of entries in parallel
 /// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
 /// 2. Process the locked group in parallel
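
The new execute_batches above flattens the incoming entry batches, prices each
transaction with CostModel, and re-slices only when total_cost exceeds
target_batch_count * minimal_tx_cost, i.e. when the block carries enough
aggregate work that each replay thread can be handed more than one
minimal-cost transaction. What follows is a minimal, self-contained sketch of
that greedy equal-cost partitioning, not the patch itself: a hypothetical Tx
struct with a plain u64 cost stands in for SanitizedTransaction plus
CostModel::calculate_cost, rebatch_by_cost is an illustrative name, and a
target_batch_count argument replaces get_thread_count().

// Sketch of the greedy cost-based rebatching used in the patch, assuming
// plain u64 costs instead of Solana's CostModel/TransactionBatch types.
// `Tx` and `rebatch_by_cost` are hypothetical stand-ins for illustration.
#[derive(Debug)]
struct Tx {
    cost: u64,
}

/// Splits `txs` into contiguous slices whose summed cost is roughly
/// `total_cost / target_batch_count`, mirroring the hunk above.
fn rebatch_by_cost(txs: &[Tx], target_batch_count: u64) -> Vec<&[Tx]> {
    let mut minimal_tx_cost = u64::MAX;
    let mut total_cost: u64 = 0;
    for tx in txs {
        minimal_tx_cost = minimal_tx_cost.min(tx.cost);
        total_cost = total_cost.saturating_add(tx.cost);
    }

    // Rebatching is only worthwhile when the block is expensive enough that
    // every thread can receive at least one minimal-cost transaction.
    if total_cost <= target_batch_count.saturating_mul(minimal_tx_cost) {
        return vec![txs];
    }

    let target_batch_cost = total_cost / target_batch_count;
    let mut batches = vec![];
    let mut batch_cost: u64 = 0;
    let mut slice_start = 0;
    for (index, tx) in txs.iter().enumerate() {
        let next_index = index + 1;
        batch_cost = batch_cost.saturating_add(tx.cost);
        // Close the slice once it reaches the target cost, or at the end.
        if batch_cost >= target_batch_cost || next_index == txs.len() {
            batches.push(&txs[slice_start..=index]);
            slice_start = next_index;
            batch_cost = 0;
        }
    }
    batches
}

fn main() {
    let txs: Vec<Tx> = [5u64, 1, 9, 3, 3, 7, 2, 6]
        .iter()
        .map(|&cost| Tx { cost })
        .collect();
    // With 4 "threads" and a total cost of 36, each slice targets ~9 units.
    for (i, batch) in rebatch_by_cost(&txs, 4).iter().enumerate() {
        let sum: u64 = batch.iter().map(|tx| tx.cost).sum();
        println!("batch {}: {:?} (cost {})", i, batch, sum);
    }
}

Because the slices are contiguous, the transaction order of the block is
preserved, and the diff reuses the already-acquired lock_results when building
each new TransactionBatch rather than re-locking accounts; when the guard is
false, the original batches are executed unchanged.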