Add estimated and actual block cost units metrics (backport #22326) (#22517)

* Add estimated and actual block cost units metrics (#22326)

* - report cost details for transactions selected to be packed into block;
- report estimated execution units packed into block, and actual units and time after execution

* revert reporting per-transaction details

* rollup transaction cost details (eg signature cost, wirte lock, data cost and execution costs) into block stats

* change naming from units to cu, use struct to replace tuple

(cherry picked from commit 1309a9cea0)

# Conflicts:
#	core/src/banking_stage.rs
#	core/src/qos_service.rs

* fix conflicts

Co-authored-by: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com>
Co-authored-by: Tao Zhu <tao@solana.com>
This commit is contained in:
mergify[bot]
2022-01-21 15:05:19 -07:00
committed by GitHub
parent 24345d8e63
commit 35ca3182ba
2 changed files with 227 additions and 13 deletions

View File

@ -24,7 +24,7 @@ use {
accounts_db::ErrorCounters,
bank::{Bank, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult},
bank_utils,
cost_model::CostModel,
cost_model::{CostModel, TransactionCost},
transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender,
},
@ -271,6 +271,14 @@ impl BankingStageStats {
}
}
#[derive(Debug, Default)]
pub struct BatchedTransactionCostDetails {
pub batched_signature_cost: u64,
pub batched_write_lock_cost: u64,
pub batched_data_bytes_cost: u64,
pub batched_execute_cost: u64,
}
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>,
@ -864,7 +872,7 @@ impl BankingStage {
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
) -> (Result<usize, PohRecorderError>, Vec<usize>, ExecuteTimings) {
let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
@ -914,7 +922,7 @@ impl BankingStage {
);
retryable_txs.extend(retryable_record_txs);
if num_to_commit.is_err() {
return (num_to_commit, retryable_txs);
return (num_to_commit, retryable_txs, execute_timings);
}
record_time.stop();
@ -964,7 +972,7 @@ impl BankingStage {
execute_timings
);
(Ok(num_to_commit), retryable_txs)
(Ok(num_to_commit), retryable_txs, execute_timings)
}
pub fn process_and_record_transactions(
@ -981,6 +989,13 @@ impl BankingStage {
let transactions_qos_results =
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
qos_service.accumulate_estimated_transaction_costs(
&Self::accumulate_batched_transaction_costs(
tx_costs.iter(),
transactions_qos_results.iter(),
),
);
// Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
@ -989,9 +1004,11 @@ impl BankingStage {
bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.into_iter());
lock_time.stop();
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit,
// WouldExceedMaxVoteCostLimit and WouldExceedMaxAccountCostLimit
let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
// and WouldExceedMaxAccountDataCostLimit
let (result, mut retryable_txs, execute_timings) =
Self::process_and_record_transactions_locked(
bank,
poh,
&batch,
@ -1005,6 +1022,10 @@ impl BankingStage {
drop(batch);
unlock_time.stop();
let (cu, us) = Self::accumulate_execute_units_and_time(&execute_timings);
qos_service.accumulate_actual_execute_cu(cu);
qos_service.accumulate_actual_execute_time(us);
debug!(
"bank: {} lock: {}us unlock: {}us txs_len: {}",
bank.slot(),
@ -1016,6 +1037,49 @@ impl BankingStage {
(result, retryable_txs)
}
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
// execution_cost from the batch of transactions selected for block.
fn accumulate_batched_transaction_costs<'a>(
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
) -> BatchedTransactionCostDetails {
let mut cost_details = BatchedTransactionCostDetails::default();
transactions_costs
.zip(transaction_results)
.for_each(|(cost, result)| {
if result.is_ok() {
cost_details.batched_signature_cost = cost_details
.batched_signature_cost
.saturating_add(cost.signature_cost);
cost_details.batched_write_lock_cost = cost_details
.batched_write_lock_cost
.saturating_add(cost.write_lock_cost);
cost_details.batched_data_bytes_cost = cost_details
.batched_data_bytes_cost
.saturating_add(cost.data_bytes_cost);
cost_details.batched_execute_cost = cost_details
.batched_execute_cost
.saturating_add(cost.execution_cost);
}
});
cost_details
}
fn accumulate_execute_units_and_time(execute_timings: &ExecuteTimings) -> (u64, u64) {
let (units, times): (Vec<_>, Vec<_>) = execute_timings
.details
.per_program_timings
.iter()
.map(|(_program_id, program_timings)| {
(
program_timings.accumulated_units,
program_timings.accumulated_us,
)
})
.unzip();
(units.iter().sum(), times.iter().sum())
}
/// Sends transactions to the bank.
///
/// Returns the number of transactions successfully processed by the bank, which may be less
@ -1579,6 +1643,7 @@ mod tests {
poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
poh_service::PohService,
},
solana_program_runtime::timings::ProgramTiming,
solana_rpc::transaction_status_service::TransactionStatusService,
solana_runtime::bank::TransactionExecutionDetails,
solana_sdk::{
@ -3271,4 +3336,74 @@ mod tests {
assert_eq!(vec![0, 1, 2], tx_packet_index);
}
}
#[test]
fn test_accumulate_batched_transaction_costs() {
let tx_costs = vec![
TransactionCost {
signature_cost: 1,
write_lock_cost: 2,
data_bytes_cost: 3,
execution_cost: 10,
..TransactionCost::default()
},
TransactionCost {
signature_cost: 4,
write_lock_cost: 5,
data_bytes_cost: 6,
execution_cost: 20,
..TransactionCost::default()
},
TransactionCost {
signature_cost: 7,
write_lock_cost: 8,
data_bytes_cost: 9,
execution_cost: 40,
..TransactionCost::default()
},
];
let tx_results = vec![
Ok(()),
Ok(()),
Err(TransactionError::WouldExceedMaxBlockCostLimit),
];
// should only accumulate first two cost that are OK
let expected_signatures = 5;
let expected_write_locks = 7;
let expected_data_bytes = 9;
let expected_executions = 30;
let cost_details =
BankingStage::accumulate_batched_transaction_costs(tx_costs.iter(), tx_results.iter());
assert_eq!(expected_signatures, cost_details.batched_signature_cost);
assert_eq!(expected_write_locks, cost_details.batched_write_lock_cost);
assert_eq!(expected_data_bytes, cost_details.batched_data_bytes_cost);
assert_eq!(expected_executions, cost_details.batched_execute_cost);
}
#[test]
fn test_accumulate_execute_units_and_time() {
let mut execute_timings = ExecuteTimings::default();
let mut expected_units = 0;
let mut expected_us = 0;
for n in 0..10 {
execute_timings.details.per_program_timings.insert(
Pubkey::new_unique(),
ProgramTiming {
accumulated_us: n * 100,
accumulated_units: n * 1000,
count: n as u32,
errored_txs_compute_consumed: vec![],
total_errored_units: 0,
},
);
expected_us += n * 100;
expected_units += n * 1000;
}
let (units, us) = BankingStage::accumulate_execute_units_and_time(&execute_timings);
assert_eq!(expected_units, units);
assert_eq!(expected_us, us);
}
}

View File

@ -3,6 +3,7 @@
//! how transactions are included in blocks, and optimize those blocks.
//!
use {
crate::banking_stage::BatchedTransactionCostDetails,
solana_measure::measure::Measure,
solana_runtime::{
bank::Bank,
@ -147,6 +148,36 @@ impl QosService {
select_results
}
pub fn accumulate_estimated_transaction_costs(
&self,
cost_details: &BatchedTransactionCostDetails,
) {
self.metrics
.estimated_signature_cu
.fetch_add(cost_details.batched_signature_cost, Ordering::Relaxed);
self.metrics
.estimated_write_lock_cu
.fetch_add(cost_details.batched_write_lock_cost, Ordering::Relaxed);
self.metrics
.estimated_data_bytes_cu
.fetch_add(cost_details.batched_data_bytes_cost, Ordering::Relaxed);
self.metrics
.estimated_execute_cu
.fetch_add(cost_details.batched_execute_cost, Ordering::Relaxed);
}
pub fn accumulate_actual_execute_cu(&self, units: u64) {
self.metrics
.actual_execute_cu
.fetch_add(units, Ordering::Relaxed);
}
pub fn accumulate_actual_execute_time(&self, micro_sec: u64) {
self.metrics
.actual_execute_time_us
.fetch_add(micro_sec, Ordering::Relaxed);
}
fn reporting_loop(
running_flag: Arc<AtomicBool>,
metrics: Arc<QosServiceMetrics>,
@ -169,6 +200,24 @@ struct QosServiceMetrics {
retried_txs_per_block_limit_count: AtomicU64,
retried_txs_per_vote_limit_count: AtomicU64,
retried_txs_per_account_limit_count: AtomicU64,
// accumulated estimated signature Compute Unites to be packed into block
estimated_signature_cu: AtomicU64,
// accumulated estimated write locks Compute Units to be packed into block
estimated_write_lock_cu: AtomicU64,
// accumulated estimated instructino data Compute Units to be packed into block
estimated_data_bytes_cu: AtomicU64,
// accumulated estimated program Compute Units to be packed into block
estimated_execute_cu: AtomicU64,
// accumulated actual program Compute Units that have been packed into block
actual_execute_cu: AtomicU64,
// accumulated actual program execute micro-sec that have been packed into block
actual_execute_time_us: AtomicU64,
}
impl QosServiceMetrics {
@ -214,6 +263,36 @@ impl QosServiceMetrics {
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_signature_cu",
self.estimated_signature_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_write_lock_cu",
self.estimated_write_lock_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_data_bytes_cu",
self.estimated_data_bytes_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_execute_cu",
self.estimated_execute_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"actual_execute_cu",
self.actual_execute_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"actual_execute_time_us",
self.actual_execute_time_us.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}