From 9ae13f26e5ced96df6b28af6732a82de4f3276d5 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Fri, 8 Apr 2022 14:22:31 +0200 Subject: [PATCH] Adjustments to cost_tracker updates - don't store pending tx signatures and costs in CostTracker - apply tx costs to global state immediately again - go from commit_or_cancel to update_or_remove, where the cost tracker is either updated with the true costs for successful tx, or the costs of a retryable tx is removed - move the function into qos_service and hold the cost tracker lock for the whole loop (cherry picked from commit 924b8ea1eb4a9a59de64414d24e85fe0345685fe) # Conflicts: # core/src/qos_service.rs # runtime/src/cost_tracker.rs --- core/src/banking_stage.rs | 76 ++++++++++++-------------------- core/src/qos_service.rs | 37 +++++++++++----- runtime/src/cost_tracker.rs | 86 +++++++++++++++++++++++++++---------- 3 files changed, 116 insertions(+), 83 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index c0cec357a6..0d011e5df5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1330,31 +1330,34 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, qos_service: &Arc, ) -> ProcessTransactionBatchOutput { - let ((transactions_qos_results, cost_model_throttled_transactions_count), cost_model_time) = - Measure::this( - |_| { - let tx_costs = qos_service.compute_transaction_costs(txs.iter()); + let ( + (transactions_qos_results, cost_model_throttled_transactions_count, transaction_costs), + cost_model_time, + ) = Measure::this( + |_| { + let tx_costs = qos_service.compute_transaction_costs(txs.iter()); - let (transactions_qos_results, num_included) = - qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); + let (transactions_qos_results, num_included) = + qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); - let cost_model_throttled_transactions_count = - txs.len().saturating_sub(num_included); + let cost_model_throttled_transactions_count = + txs.len().saturating_sub(num_included); - qos_service.accumulate_estimated_transaction_costs( - &Self::accumulate_batched_transaction_costs( - tx_costs.iter(), - transactions_qos_results.iter(), - ), - ); - ( - transactions_qos_results, - cost_model_throttled_transactions_count, - ) - }, - (), - "cost_model", - ); + qos_service.accumulate_estimated_transaction_costs( + &Self::accumulate_batched_transaction_costs( + tx_costs.iter(), + transactions_qos_results.iter(), + ), + ); + ( + transactions_qos_results, + cost_model_throttled_transactions_count, + tx_costs, + ) + }, + (), + "cost_model", + ); // Only lock accounts for those transactions are selected for the block; // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -1386,11 +1389,10 @@ impl BankingStage { .. } = execute_and_commit_transactions_output; - Self::commit_or_cancel_transaction_cost( - txs.iter(), + QosService::update_or_remove_transaction_costs( + transaction_costs.iter(), transactions_qos_results.iter(), retryable_transaction_indexes, - qos_service, bank, ); @@ -1418,30 +1420,6 @@ impl BankingStage { } } - /// To commit transaction cost to cost_tracker if it was executed successfully; - /// Otherwise cancel it from being committed, therefore prevents cost_tracker - /// being inflated with unsuccessfully executed transactions. - fn commit_or_cancel_transaction_cost<'a>( - transactions: impl Iterator, - transaction_results: impl Iterator>, - retryable_transaction_indexes: &[usize], - qos_service: &QosService, - bank: &Arc, - ) { - transactions - .zip(transaction_results) - .enumerate() - .for_each(|(index, (tx, result))| { - if result.is_ok() && retryable_transaction_indexes.contains(&index) { - qos_service.cancel_transaction_cost(bank, tx); - } else { - // TODO the 3rd param is for transaction's actual units. Will have - // to plumb it in next; For now, it simply commit estimated units. - qos_service.commit_transaction_cost(bank, tx, None); - } - }); - } - // rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and // execution_cost from the batch of transactions selected for block. fn accumulate_batched_transaction_costs<'a>( diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index e306af4f72..26c9f61816 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -151,23 +151,38 @@ impl QosService { (select_results, num_included) } +<<<<<<< HEAD <<<<<<< HEAD ======= pub fn commit_transaction_cost( &self, +======= + /// Update the transaction cost in the cost_tracker with the real cost for + /// transactions that were executed successfully; + /// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker + /// being inflated with unsuccessfully executed transactions. + pub fn update_or_remove_transaction_costs<'a>( + transaction_costs: impl Iterator, + transaction_qos_results: impl Iterator>, + retryable_transaction_indexes: &[usize], +>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates) bank: &Arc, - transaction: &SanitizedTransaction, - actual_units: Option, ) { - bank.write_cost_tracker() - .unwrap() - .commit_transaction(transaction, actual_units); - } - - pub fn cancel_transaction_cost(&self, bank: &Arc, transaction: &SanitizedTransaction) { - bank.write_cost_tracker() - .unwrap() - .cancel_transaction(transaction); + let mut cost_tracker = bank.write_cost_tracker().unwrap(); + transaction_costs + .zip(transaction_qos_results) + .enumerate() + .for_each(|(index, (tx_cost, qos_result))| { + if qos_result.is_ok() && retryable_transaction_indexes.contains(&index) { + cost_tracker.remove(tx_cost); + } else { + // TODO: Update the cost tracker with the actual execution compute units. + // Will have to plumb it in next; For now, keep estimated costs. + // + // let actual_execution_cost = 0; + // cost_tracker.update_execution_cost(tx_cost, actual_execution_cost); + } + }); } // metrics are reported by bank slot diff --git a/runtime/src/cost_tracker.rs b/runtime/src/cost_tracker.rs index 846988867a..20f566fe63 100644 --- a/runtime/src/cost_tracker.rs +++ b/runtime/src/cost_tracker.rs @@ -5,9 +5,7 @@ //! use { crate::{block_cost_limits::*, cost_model::TransactionCost}, - solana_sdk::{ - clock::Slot, pubkey::Pubkey, signature::Signature, transaction::SanitizedTransaction, - }, + solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction}, std::collections::HashMap, }; @@ -41,6 +39,7 @@ pub struct CostTracker { /// The amount of total account data size remaining. If `Some`, then do not add transactions /// that would cause `account_data_size` to exceed this limit. account_data_size_limit: Option, +<<<<<<< HEAD // Transactions have passed would_fit check, is being executed. // If the execution is successful, it's actual Units can be committed @@ -48,6 +47,8 @@ pub struct CostTracker { // cost_tracker. pending_transactions: HashMap, >>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;) +======= +>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates) } impl Default for CostTracker { @@ -71,7 +72,6 @@ impl Default for CostTracker { transaction_count: 0, account_data_size: 0, account_data_size_limit: None, - pending_transactions: HashMap::new(), } >>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;) } @@ -125,7 +125,7 @@ impl CostTracker { >>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;) pub fn try_add( &mut self, - transaction: &SanitizedTransaction, + _transaction: &SanitizedTransaction, tx_cost: &TransactionCost, ) -> Result { <<<<<<< HEAD @@ -134,28 +134,26 @@ impl CostTracker { self.add_transaction(&tx_cost.writable_accounts, cost, transaction); ======= self.would_fit(tx_cost)?; +<<<<<<< HEAD self.pending_transactions .insert(*transaction.signature(), tx_cost.clone()); >>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;) +======= + self.add_transaction_cost(tx_cost); +>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates) Ok(self.block_cost) } - pub fn commit_transaction( + pub fn update_execution_cost( &mut self, - transaction: &SanitizedTransaction, - actual_units: Option, + _estimated_tx_cost: &TransactionCost, + _actual_execution_cost: u64, ) { - if let Some(mut tx_cost) = self.pending_transactions.remove(transaction.signature()) { - if let Some(actual_units) = actual_units { - // using actual units to update cost tracker if available - tx_cost.execution_cost = actual_units; - } - self.add_transaction(&tx_cost); - } + // adjust block_cost / vote_cost / account_cost by (actual_execution_cost - execution_cost) } - pub fn cancel_transaction(&mut self, transaction: &SanitizedTransaction) { - self.pending_transactions.remove(transaction.signature()); + pub fn remove(&mut self, tx_cost: &TransactionCost) { + self.remove_transaction_cost(tx_cost); } pub fn report_stats(&self, bank_slot: Slot) { @@ -196,12 +194,11 @@ impl CostTracker { } fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> { - let mut writable_account = vec![]; - writable_account.extend(&tx_cost.writable_accounts); - let mut cost = tx_cost.sum(); - let mut account_data_size = tx_cost.account_data_size; - let mut vote_cost = if tx_cost.is_simple_vote { cost } else { 0 }; + let writable_accounts = &tx_cost.writable_accounts; + let cost = tx_cost.sum(); + let vote_cost = if tx_cost.is_simple_vote { cost } else { 0 }; +<<<<<<< HEAD for tx_cost in self.pending_transactions.values() { writable_account.extend(&tx_cost.writable_accounts); cost = cost.saturating_add(tx_cost.sum()); @@ -228,6 +225,8 @@ impl CostTracker { vote_cost: u64, >>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;) ) -> Result<(), CostTrackerError> { +======= +>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates) // check against the total package cost if self.block_cost.saturating_add(cost) > self.block_cost_limit { return Err(CostTrackerError::WouldExceedBlockMaxLimit); @@ -243,8 +242,26 @@ impl CostTracker { return Err(CostTrackerError::WouldExceedAccountMaxLimit); } +<<<<<<< HEAD +======= + // NOTE: Check if the total accounts data size is exceeded *before* the block accounts data + // size. This way, transactions are not unnecessarily retried. + let account_data_size = self + .account_data_size + .saturating_add(tx_cost.account_data_size); + if let Some(account_data_size_limit) = self.account_data_size_limit { + if account_data_size > account_data_size_limit { + return Err(CostTrackerError::WouldExceedAccountDataTotalLimit); + } + } + + if account_data_size > MAX_ACCOUNT_DATA_BLOCK_LEN { + return Err(CostTrackerError::WouldExceedAccountDataBlockLimit); + } + +>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates) // check each account against account_cost_limit, - for account_key in keys.iter() { + for account_key in writable_accounts.iter() { match self.cost_by_writable_accounts.get(account_key) { Some(chained_cost) => { if chained_cost.saturating_add(cost) > self.account_cost_limit { @@ -260,11 +277,15 @@ impl CostTracker { Ok(()) } +<<<<<<< HEAD <<<<<<< HEAD fn add_transaction(&mut self, keys: &[Pubkey], cost: u64, transaction: &SanitizedTransaction) { for account_key in keys.iter() { ======= fn add_transaction(&mut self, tx_cost: &TransactionCost) { +======= + fn add_transaction_cost(&mut self, tx_cost: &TransactionCost) { +>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates) let cost = tx_cost.sum(); for account_key in tx_cost.writable_accounts.iter() { >>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;) @@ -286,6 +307,25 @@ impl CostTracker { >>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;) self.transaction_count = self.transaction_count.saturating_add(1); } + + fn remove_transaction_cost(&mut self, tx_cost: &TransactionCost) { + let cost = tx_cost.sum(); + for account_key in tx_cost.writable_accounts.iter() { + let account_cost = self + .cost_by_writable_accounts + .entry(*account_key) + .or_insert(0); + *account_cost = account_cost.saturating_sub(cost); + } + self.block_cost = self.block_cost.saturating_sub(cost); + if tx_cost.is_simple_vote { + self.vote_cost = self.vote_cost.saturating_sub(cost); + } + self.account_data_size = self + .account_data_size + .saturating_sub(tx_cost.account_data_size); + self.transaction_count = self.transaction_count.saturating_sub(1); + } } #[cfg(test)]