diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 951279476a..23fd7336ff 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -23,7 +23,7 @@ use { accounts_db::ErrorCounters, bank::{Bank, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult}, bank_utils, - cost_model::CostModel, + cost_model::{CostModel, TransactionCost}, transaction_batch::TransactionBatch, vote_sender_types::ReplayVoteSender, }, @@ -260,6 +260,14 @@ impl BankingStageStats { } } +#[derive(Debug, Default)] +pub struct BatchedTransactionCostDetails { + pub batched_signature_cost: u64, + pub batched_write_lock_cost: u64, + pub batched_data_bytes_cost: u64, + pub batched_execute_cost: u64, +} + /// Stores the stage's thread handle and output receiver. pub struct BankingStage { bank_thread_hdls: Vec>, @@ -856,7 +864,7 @@ impl BankingStage { batch: &TransactionBatch, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - ) -> (Result, Vec) { + ) -> (Result, Vec, ExecuteTimings) { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce // the likelihood of any single thread getting starved and processing old ids. 
@@ -906,7 +914,7 @@ impl BankingStage { ); retryable_txs.extend(retryable_record_txs); if num_to_commit.is_err() { - return (num_to_commit, retryable_txs); + return (num_to_commit, retryable_txs, execute_timings); } record_time.stop(); @@ -956,7 +964,7 @@ impl BankingStage { execute_timings ); - (Ok(num_to_commit), retryable_txs) + (Ok(num_to_commit), retryable_txs, execute_timings) } pub fn process_and_record_transactions( @@ -973,6 +981,13 @@ impl BankingStage { let transactions_qos_results = qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); + qos_service.accumulate_estimated_transaction_costs( + &Self::accumulate_batched_transaction_costs( + tx_costs.iter(), + transactions_qos_results.iter(), + ), + ); + // Only lock accounts for those transactions are selected for the block; // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state @@ -984,13 +999,14 @@ impl BankingStage { // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit // and WouldExceedMaxAccountDataCostLimit - let (result, mut retryable_txs) = Self::process_and_record_transactions_locked( - bank, - poh, - &batch, - transaction_status_sender, - gossip_vote_sender, - ); + let (result, mut retryable_txs, execute_timings) = + Self::process_and_record_transactions_locked( + bank, + poh, + &batch, + transaction_status_sender, + gossip_vote_sender, + ); retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); let mut unlock_time = Measure::start("unlock_time"); @@ -998,6 +1014,10 @@ impl BankingStage { drop(batch); unlock_time.stop(); + let (cu, us) = Self::accumulate_execute_units_and_time(&execute_timings); + qos_service.accumulate_actual_execute_cu(cu); + qos_service.accumulate_actual_execute_time(us); + // reports qos service stats for this batch qos_service.report_metrics(bank.clone()); @@ -1012,6 +1032,49 @@ impl BankingStage { (result, 
retryable_txs) } + // rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and + // execution_cost from the batch of transactions selected for block. + fn accumulate_batched_transaction_costs<'a>( + transactions_costs: impl Iterator, + transaction_results: impl Iterator>, + ) -> BatchedTransactionCostDetails { + let mut cost_details = BatchedTransactionCostDetails::default(); + transactions_costs + .zip(transaction_results) + .for_each(|(cost, result)| { + if result.is_ok() { + cost_details.batched_signature_cost = cost_details + .batched_signature_cost + .saturating_add(cost.signature_cost); + cost_details.batched_write_lock_cost = cost_details + .batched_write_lock_cost + .saturating_add(cost.write_lock_cost); + cost_details.batched_data_bytes_cost = cost_details + .batched_data_bytes_cost + .saturating_add(cost.data_bytes_cost); + cost_details.batched_execute_cost = cost_details + .batched_execute_cost + .saturating_add(cost.execution_cost); + } + }); + cost_details + } + + fn accumulate_execute_units_and_time(execute_timings: &ExecuteTimings) -> (u64, u64) { + let (units, times): (Vec<_>, Vec<_>) = execute_timings + .details + .per_program_timings + .iter() + .map(|(_program_id, program_timings)| { + ( + program_timings.accumulated_units, + program_timings.accumulated_us, + ) + }) + .unzip(); + (units.iter().sum(), times.iter().sum()) + } + /// Sends transactions to the bank. 
/// /// Returns the number of transactions successfully processed by the bank, which may be less @@ -1492,6 +1555,7 @@ mod tests { poh_recorder::{create_test_recorder, Record, WorkingBankEntry}, poh_service::PohService, }, + solana_program_runtime::timings::ProgramTiming, solana_rpc::transaction_status_service::TransactionStatusService, solana_runtime::bank::TransactionExecutionDetails, solana_sdk::{ @@ -3372,4 +3436,74 @@ mod tests { assert_eq!(vec![0, 1, 2], tx_packet_index); } } + + #[test] + fn test_accumulate_batched_transaction_costs() { + let tx_costs = vec![ + TransactionCost { + signature_cost: 1, + write_lock_cost: 2, + data_bytes_cost: 3, + execution_cost: 10, + ..TransactionCost::default() + }, + TransactionCost { + signature_cost: 4, + write_lock_cost: 5, + data_bytes_cost: 6, + execution_cost: 20, + ..TransactionCost::default() + }, + TransactionCost { + signature_cost: 7, + write_lock_cost: 8, + data_bytes_cost: 9, + execution_cost: 40, + ..TransactionCost::default() + }, + ]; + let tx_results = vec![ + Ok(()), + Ok(()), + Err(TransactionError::WouldExceedMaxBlockCostLimit), + ]; + // should only accumulate first two costs that are OK + let expected_signatures = 5; + let expected_write_locks = 7; + let expected_data_bytes = 9; + let expected_executions = 30; + let cost_details = + BankingStage::accumulate_batched_transaction_costs(tx_costs.iter(), tx_results.iter()); + assert_eq!(expected_signatures, cost_details.batched_signature_cost); + assert_eq!(expected_write_locks, cost_details.batched_write_lock_cost); + assert_eq!(expected_data_bytes, cost_details.batched_data_bytes_cost); + assert_eq!(expected_executions, cost_details.batched_execute_cost); + } + + #[test] + fn test_accumulate_execute_units_and_time() { + let mut execute_timings = ExecuteTimings::default(); + let mut expected_units = 0; + let mut expected_us = 0; + + for n in 0..10 { + execute_timings.details.per_program_timings.insert( + Pubkey::new_unique(), + ProgramTiming { 
accumulated_us: n * 100, + accumulated_units: n * 1000, + count: n as u32, + errored_txs_compute_consumed: vec![], + total_errored_units: 0, + }, + ); + expected_us += n * 100; + expected_units += n * 1000; + } + + let (units, us) = BankingStage::accumulate_execute_units_and_time(&execute_timings); + + assert_eq!(expected_units, units); + assert_eq!(expected_us, us); + } } diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index a14d0c889c..e7e002279f 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -3,6 +3,7 @@ //! how transactions are included in blocks, and optimize those blocks. //! use { + crate::banking_stage::BatchedTransactionCostDetails, crossbeam_channel::{unbounded, Receiver, Sender}, solana_measure::measure::Measure, solana_runtime::{ @@ -24,6 +25,10 @@ use { }, }; +pub enum QosMetrics { + BlockBatchUpdate { bank: Arc }, +} + // QosService is local to each banking thread, each instance of QosService provides services to // one banking thread. // It hosts a private thread for async metrics reporting, tagged with banking thredas ID. Banking @@ -39,7 +44,7 @@ pub struct QosService { cost_model: Arc>, // QosService hosts metrics object and a private reporting thread, as well as sender to // communicate with thread. 
- report_sender: Sender>, + report_sender: Sender, metrics: Arc, // metrics reporting runs on a private thread reporting_thread: Option>, @@ -163,7 +168,7 @@ impl QosService { // metrics are reported by bank slot pub fn report_metrics(&self, bank: Arc) { self.report_sender - .send(bank) + .send(QosMetrics::BlockBatchUpdate { bank }) .unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err)); } @@ -198,14 +203,48 @@ impl QosService { .fetch_add(count, Ordering::Relaxed); } + pub fn accumulate_estimated_transaction_costs( + &self, + cost_details: &BatchedTransactionCostDetails, + ) { + self.metrics + .estimated_signature_cu + .fetch_add(cost_details.batched_signature_cost, Ordering::Relaxed); + self.metrics + .estimated_write_lock_cu + .fetch_add(cost_details.batched_write_lock_cost, Ordering::Relaxed); + self.metrics + .estimated_data_bytes_cu + .fetch_add(cost_details.batched_data_bytes_cost, Ordering::Relaxed); + self.metrics + .estimated_execute_cu + .fetch_add(cost_details.batched_execute_cost, Ordering::Relaxed); + } + + pub fn accumulate_actual_execute_cu(&self, units: u64) { + self.metrics + .actual_execute_cu + .fetch_add(units, Ordering::Relaxed); + } + + pub fn accumulate_actual_execute_time(&self, micro_sec: u64) { + self.metrics + .actual_execute_time_us + .fetch_add(micro_sec, Ordering::Relaxed); + } + fn reporting_loop( running_flag: Arc, metrics: Arc, - report_receiver: Receiver>, + report_receiver: Receiver, ) { while running_flag.load(Ordering::Relaxed) { - for bank in report_receiver.try_iter() { - metrics.report(bank.slot()); + for qos_metrics in report_receiver.try_iter() { + match qos_metrics { + QosMetrics::BlockBatchUpdate { bank } => { + metrics.report(bank.slot()); + } + } } thread::sleep(Duration::from_millis(100)); } @@ -267,6 +306,24 @@ struct QosServiceMetrics { // number of transactions to be queued for retry due to its account data limits retried_txs_per_account_data_limit_count: AtomicU64, + + // accumulated 
estimated signature Compute Units to be packed into block + estimated_signature_cu: AtomicU64, + + // accumulated estimated write locks Compute Units to be packed into block + estimated_write_lock_cu: AtomicU64, + + // accumulated estimated instruction data Compute Units to be packed into block + estimated_data_bytes_cu: AtomicU64, + + // accumulated estimated program Compute Units to be packed into block + estimated_execute_cu: AtomicU64, + + // accumulated actual program Compute Units that have been packed into block + actual_execute_cu: AtomicU64, + + // accumulated actual program execute micro-sec that have been packed into block + actual_execute_time_us: AtomicU64, } impl QosServiceMetrics { @@ -352,6 +409,36 @@ impl QosServiceMetrics { .swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "estimated_signature_cu", + self.estimated_signature_cu.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "estimated_write_lock_cu", + self.estimated_write_lock_cu.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "estimated_data_bytes_cu", + self.estimated_data_bytes_cu.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "estimated_execute_cu", + self.estimated_execute_cu.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "actual_execute_cu", + self.actual_execute_cu.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "actual_execute_time_us", + self.actual_execute_time_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); self.slot.store(bank_slot, Ordering::Relaxed); }