* Optimize batching of transactions during replay for parallel processing (cherry picked from commit4de14e530b
) * fix build (cherry picked from commitdfef68f985
) * updates to address review feedback (cherry picked from commitc5d8560cdb
) * suppress clippy (cherry picked from commita146f2d853
) Co-authored-by: Pankaj Garg <pankaj@solana.com>
This commit is contained in:
@ -28,6 +28,7 @@ use {
|
|||||||
bank_utils,
|
bank_utils,
|
||||||
block_cost_limits::*,
|
block_cost_limits::*,
|
||||||
commitment::VOTE_THRESHOLD_SIZE,
|
commitment::VOTE_THRESHOLD_SIZE,
|
||||||
|
cost_model::CostModel,
|
||||||
snapshot_config::SnapshotConfig,
|
snapshot_config::SnapshotConfig,
|
||||||
snapshot_package::{AccountsPackageSender, SnapshotType},
|
snapshot_package::{AccountsPackageSender, SnapshotType},
|
||||||
snapshot_utils::{self, BankFromArchiveTimings},
|
snapshot_utils::{self, BankFromArchiveTimings},
|
||||||
@ -53,6 +54,7 @@ use {
|
|||||||
collect_token_balances, TransactionTokenBalancesSet,
|
collect_token_balances, TransactionTokenBalancesSet,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
|
borrow::Cow,
|
||||||
cell::RefCell,
|
cell::RefCell,
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
@ -247,7 +249,7 @@ fn execute_batch(
|
|||||||
first_err.map(|(result, _)| result).unwrap_or(Ok(()))
|
first_err.map(|(result, _)| result).unwrap_or(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn execute_batches(
|
fn execute_batches_internal(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
batches: &[TransactionBatch],
|
batches: &[TransactionBatch],
|
||||||
entry_callback: Option<&ProcessCallback>,
|
entry_callback: Option<&ProcessCallback>,
|
||||||
@ -290,6 +292,75 @@ fn execute_batches(
|
|||||||
first_err(&results)
|
first_err(&results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn execute_batches(
|
||||||
|
bank: &Arc<Bank>,
|
||||||
|
batches: &[TransactionBatch],
|
||||||
|
entry_callback: Option<&ProcessCallback>,
|
||||||
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
|
replay_vote_sender: Option<&ReplayVoteSender>,
|
||||||
|
timings: &mut ExecuteTimings,
|
||||||
|
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let lock_results = batches
|
||||||
|
.iter()
|
||||||
|
.flat_map(|batch| batch.lock_results().clone())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let sanitized_txs = batches
|
||||||
|
.iter()
|
||||||
|
.flat_map(|batch| batch.sanitized_transactions().to_vec())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let cost_model = CostModel::new();
|
||||||
|
let mut minimal_tx_cost = u64::MAX;
|
||||||
|
let mut total_cost: u64 = 0;
|
||||||
|
// Allowing collect here, since it also computes the minimal tx cost, and aggregate cost.
|
||||||
|
// These two values are later used for checking if the tx_costs vector needs to be iterated over.
|
||||||
|
#[allow(clippy::needless_collect)]
|
||||||
|
let tx_costs = sanitized_txs
|
||||||
|
.iter()
|
||||||
|
.map(|tx| {
|
||||||
|
let cost = cost_model.calculate_cost(tx).sum();
|
||||||
|
minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
|
||||||
|
total_cost = total_cost.saturating_add(cost);
|
||||||
|
cost
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let target_batch_count = get_thread_count() as u64;
|
||||||
|
|
||||||
|
let mut tx_batches: Vec<TransactionBatch> = vec![];
|
||||||
|
let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
|
||||||
|
let target_batch_cost = total_cost / target_batch_count;
|
||||||
|
let mut batch_cost: u64 = 0;
|
||||||
|
let mut slice_start = 0;
|
||||||
|
tx_costs.into_iter().enumerate().for_each(|(index, cost)| {
|
||||||
|
let next_index = index + 1;
|
||||||
|
batch_cost = batch_cost.saturating_add(cost);
|
||||||
|
if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() {
|
||||||
|
let txs = &sanitized_txs[slice_start..=index];
|
||||||
|
let results = &lock_results[slice_start..=index];
|
||||||
|
let tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs));
|
||||||
|
slice_start = next_index;
|
||||||
|
tx_batches.push(tx_batch);
|
||||||
|
batch_cost = 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
&tx_batches[..]
|
||||||
|
} else {
|
||||||
|
batches
|
||||||
|
};
|
||||||
|
|
||||||
|
execute_batches_internal(
|
||||||
|
bank,
|
||||||
|
rebatched_txs,
|
||||||
|
entry_callback,
|
||||||
|
transaction_status_sender,
|
||||||
|
replay_vote_sender,
|
||||||
|
timings,
|
||||||
|
cost_capacity_meter,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/// Process an ordered list of entries in parallel
|
/// Process an ordered list of entries in parallel
|
||||||
/// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
|
/// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
|
||||||
/// 2. Process the locked group in parallel
|
/// 2. Process the locked group in parallel
|
||||||
|
Reference in New Issue
Block a user