Adjustments to cost_tracker updates
- don't store pending tx signatures and costs in CostTracker
- apply tx costs to global state immediately again
- go from commit_or_cancel to update_or_remove, where the cost tracker
is either updated with the true costs for successful tx, or the costs
of a retryable tx is removed
- move the function into qos_service and hold the cost tracker lock for
the whole loop
(cherry picked from commit 924b8ea1eb
)
# Conflicts:
# core/src/qos_service.rs
# runtime/src/cost_tracker.rs
This commit is contained in:
@ -1330,31 +1330,34 @@ impl BankingStage {
|
|||||||
gossip_vote_sender: &ReplayVoteSender,
|
gossip_vote_sender: &ReplayVoteSender,
|
||||||
qos_service: &Arc<QosService>,
|
qos_service: &Arc<QosService>,
|
||||||
) -> ProcessTransactionBatchOutput {
|
) -> ProcessTransactionBatchOutput {
|
||||||
let ((transactions_qos_results, cost_model_throttled_transactions_count), cost_model_time) =
|
let (
|
||||||
Measure::this(
|
(transactions_qos_results, cost_model_throttled_transactions_count, transaction_costs),
|
||||||
|_| {
|
cost_model_time,
|
||||||
let tx_costs = qos_service.compute_transaction_costs(txs.iter());
|
) = Measure::this(
|
||||||
|
|_| {
|
||||||
|
let tx_costs = qos_service.compute_transaction_costs(txs.iter());
|
||||||
|
|
||||||
let (transactions_qos_results, num_included) =
|
let (transactions_qos_results, num_included) =
|
||||||
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
|
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
|
||||||
|
|
||||||
let cost_model_throttled_transactions_count =
|
let cost_model_throttled_transactions_count =
|
||||||
txs.len().saturating_sub(num_included);
|
txs.len().saturating_sub(num_included);
|
||||||
|
|
||||||
qos_service.accumulate_estimated_transaction_costs(
|
qos_service.accumulate_estimated_transaction_costs(
|
||||||
&Self::accumulate_batched_transaction_costs(
|
&Self::accumulate_batched_transaction_costs(
|
||||||
tx_costs.iter(),
|
tx_costs.iter(),
|
||||||
transactions_qos_results.iter(),
|
transactions_qos_results.iter(),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
(
|
(
|
||||||
transactions_qos_results,
|
transactions_qos_results,
|
||||||
cost_model_throttled_transactions_count,
|
cost_model_throttled_transactions_count,
|
||||||
)
|
tx_costs,
|
||||||
},
|
)
|
||||||
(),
|
},
|
||||||
"cost_model",
|
(),
|
||||||
);
|
"cost_model",
|
||||||
|
);
|
||||||
|
|
||||||
// Only lock accounts for those transactions are selected for the block;
|
// Only lock accounts for those transactions are selected for the block;
|
||||||
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
||||||
@ -1386,11 +1389,10 @@ impl BankingStage {
|
|||||||
..
|
..
|
||||||
} = execute_and_commit_transactions_output;
|
} = execute_and_commit_transactions_output;
|
||||||
|
|
||||||
Self::commit_or_cancel_transaction_cost(
|
QosService::update_or_remove_transaction_costs(
|
||||||
txs.iter(),
|
transaction_costs.iter(),
|
||||||
transactions_qos_results.iter(),
|
transactions_qos_results.iter(),
|
||||||
retryable_transaction_indexes,
|
retryable_transaction_indexes,
|
||||||
qos_service,
|
|
||||||
bank,
|
bank,
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -1418,30 +1420,6 @@ impl BankingStage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// To commit transaction cost to cost_tracker if it was executed successfully;
|
|
||||||
/// Otherwise cancel it from being committed, therefore prevents cost_tracker
|
|
||||||
/// being inflated with unsuccessfully executed transactions.
|
|
||||||
fn commit_or_cancel_transaction_cost<'a>(
|
|
||||||
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
|
|
||||||
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
|
|
||||||
retryable_transaction_indexes: &[usize],
|
|
||||||
qos_service: &QosService,
|
|
||||||
bank: &Arc<Bank>,
|
|
||||||
) {
|
|
||||||
transactions
|
|
||||||
.zip(transaction_results)
|
|
||||||
.enumerate()
|
|
||||||
.for_each(|(index, (tx, result))| {
|
|
||||||
if result.is_ok() && retryable_transaction_indexes.contains(&index) {
|
|
||||||
qos_service.cancel_transaction_cost(bank, tx);
|
|
||||||
} else {
|
|
||||||
// TODO the 3rd param is for transaction's actual units. Will have
|
|
||||||
// to plumb it in next; For now, it simply commit estimated units.
|
|
||||||
qos_service.commit_transaction_cost(bank, tx, None);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
|
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
|
||||||
// execution_cost from the batch of transactions selected for block.
|
// execution_cost from the batch of transactions selected for block.
|
||||||
fn accumulate_batched_transaction_costs<'a>(
|
fn accumulate_batched_transaction_costs<'a>(
|
||||||
|
@ -151,23 +151,38 @@ impl QosService {
|
|||||||
(select_results, num_included)
|
(select_results, num_included)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
<<<<<<< HEAD
|
<<<<<<< HEAD
|
||||||
=======
|
=======
|
||||||
pub fn commit_transaction_cost(
|
pub fn commit_transaction_cost(
|
||||||
&self,
|
&self,
|
||||||
|
=======
|
||||||
|
/// Update the transaction cost in the cost_tracker with the real cost for
|
||||||
|
/// transactions that were executed successfully;
|
||||||
|
/// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker
|
||||||
|
/// being inflated with unsuccessfully executed transactions.
|
||||||
|
pub fn update_or_remove_transaction_costs<'a>(
|
||||||
|
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
|
||||||
|
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
|
||||||
|
retryable_transaction_indexes: &[usize],
|
||||||
|
>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates)
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
transaction: &SanitizedTransaction,
|
|
||||||
actual_units: Option<u64>,
|
|
||||||
) {
|
) {
|
||||||
bank.write_cost_tracker()
|
let mut cost_tracker = bank.write_cost_tracker().unwrap();
|
||||||
.unwrap()
|
transaction_costs
|
||||||
.commit_transaction(transaction, actual_units);
|
.zip(transaction_qos_results)
|
||||||
}
|
.enumerate()
|
||||||
|
.for_each(|(index, (tx_cost, qos_result))| {
|
||||||
pub fn cancel_transaction_cost(&self, bank: &Arc<Bank>, transaction: &SanitizedTransaction) {
|
if qos_result.is_ok() && retryable_transaction_indexes.contains(&index) {
|
||||||
bank.write_cost_tracker()
|
cost_tracker.remove(tx_cost);
|
||||||
.unwrap()
|
} else {
|
||||||
.cancel_transaction(transaction);
|
// TODO: Update the cost tracker with the actual execution compute units.
|
||||||
|
// Will have to plumb it in next; For now, keep estimated costs.
|
||||||
|
//
|
||||||
|
// let actual_execution_cost = 0;
|
||||||
|
// cost_tracker.update_execution_cost(tx_cost, actual_execution_cost);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// metrics are reported by bank slot
|
// metrics are reported by bank slot
|
||||||
|
@ -5,9 +5,7 @@
|
|||||||
//!
|
//!
|
||||||
use {
|
use {
|
||||||
crate::{block_cost_limits::*, cost_model::TransactionCost},
|
crate::{block_cost_limits::*, cost_model::TransactionCost},
|
||||||
solana_sdk::{
|
solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction},
|
||||||
clock::Slot, pubkey::Pubkey, signature::Signature, transaction::SanitizedTransaction,
|
|
||||||
},
|
|
||||||
std::collections::HashMap,
|
std::collections::HashMap,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -41,6 +39,7 @@ pub struct CostTracker {
|
|||||||
/// The amount of total account data size remaining. If `Some`, then do not add transactions
|
/// The amount of total account data size remaining. If `Some`, then do not add transactions
|
||||||
/// that would cause `account_data_size` to exceed this limit.
|
/// that would cause `account_data_size` to exceed this limit.
|
||||||
account_data_size_limit: Option<u64>,
|
account_data_size_limit: Option<u64>,
|
||||||
|
<<<<<<< HEAD
|
||||||
|
|
||||||
// Transactions have passed would_fit check, is being executed.
|
// Transactions have passed would_fit check, is being executed.
|
||||||
// If the execution is successful, it's actual Units can be committed
|
// If the execution is successful, it's actual Units can be committed
|
||||||
@ -48,6 +47,8 @@ pub struct CostTracker {
|
|||||||
// cost_tracker.
|
// cost_tracker.
|
||||||
pending_transactions: HashMap<Signature, TransactionCost>,
|
pending_transactions: HashMap<Signature, TransactionCost>,
|
||||||
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
||||||
|
=======
|
||||||
|
>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CostTracker {
|
impl Default for CostTracker {
|
||||||
@ -71,7 +72,6 @@ impl Default for CostTracker {
|
|||||||
transaction_count: 0,
|
transaction_count: 0,
|
||||||
account_data_size: 0,
|
account_data_size: 0,
|
||||||
account_data_size_limit: None,
|
account_data_size_limit: None,
|
||||||
pending_transactions: HashMap::new(),
|
|
||||||
}
|
}
|
||||||
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
||||||
}
|
}
|
||||||
@ -125,7 +125,7 @@ impl CostTracker {
|
|||||||
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
||||||
pub fn try_add(
|
pub fn try_add(
|
||||||
&mut self,
|
&mut self,
|
||||||
transaction: &SanitizedTransaction,
|
_transaction: &SanitizedTransaction,
|
||||||
tx_cost: &TransactionCost,
|
tx_cost: &TransactionCost,
|
||||||
) -> Result<u64, CostTrackerError> {
|
) -> Result<u64, CostTrackerError> {
|
||||||
<<<<<<< HEAD
|
<<<<<<< HEAD
|
||||||
@ -134,28 +134,26 @@ impl CostTracker {
|
|||||||
self.add_transaction(&tx_cost.writable_accounts, cost, transaction);
|
self.add_transaction(&tx_cost.writable_accounts, cost, transaction);
|
||||||
=======
|
=======
|
||||||
self.would_fit(tx_cost)?;
|
self.would_fit(tx_cost)?;
|
||||||
|
<<<<<<< HEAD
|
||||||
self.pending_transactions
|
self.pending_transactions
|
||||||
.insert(*transaction.signature(), tx_cost.clone());
|
.insert(*transaction.signature(), tx_cost.clone());
|
||||||
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
||||||
|
=======
|
||||||
|
self.add_transaction_cost(tx_cost);
|
||||||
|
>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates)
|
||||||
Ok(self.block_cost)
|
Ok(self.block_cost)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn commit_transaction(
|
pub fn update_execution_cost(
|
||||||
&mut self,
|
&mut self,
|
||||||
transaction: &SanitizedTransaction,
|
_estimated_tx_cost: &TransactionCost,
|
||||||
actual_units: Option<u64>,
|
_actual_execution_cost: u64,
|
||||||
) {
|
) {
|
||||||
if let Some(mut tx_cost) = self.pending_transactions.remove(transaction.signature()) {
|
// adjust block_cost / vote_cost / account_cost by (actual_execution_cost - execution_cost)
|
||||||
if let Some(actual_units) = actual_units {
|
|
||||||
// using actual units to update cost tracker if available
|
|
||||||
tx_cost.execution_cost = actual_units;
|
|
||||||
}
|
|
||||||
self.add_transaction(&tx_cost);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cancel_transaction(&mut self, transaction: &SanitizedTransaction) {
|
pub fn remove(&mut self, tx_cost: &TransactionCost) {
|
||||||
self.pending_transactions.remove(transaction.signature());
|
self.remove_transaction_cost(tx_cost);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn report_stats(&self, bank_slot: Slot) {
|
pub fn report_stats(&self, bank_slot: Slot) {
|
||||||
@ -196,12 +194,11 @@ impl CostTracker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> {
|
fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> {
|
||||||
let mut writable_account = vec![];
|
let writable_accounts = &tx_cost.writable_accounts;
|
||||||
writable_account.extend(&tx_cost.writable_accounts);
|
let cost = tx_cost.sum();
|
||||||
let mut cost = tx_cost.sum();
|
let vote_cost = if tx_cost.is_simple_vote { cost } else { 0 };
|
||||||
let mut account_data_size = tx_cost.account_data_size;
|
|
||||||
let mut vote_cost = if tx_cost.is_simple_vote { cost } else { 0 };
|
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
for tx_cost in self.pending_transactions.values() {
|
for tx_cost in self.pending_transactions.values() {
|
||||||
writable_account.extend(&tx_cost.writable_accounts);
|
writable_account.extend(&tx_cost.writable_accounts);
|
||||||
cost = cost.saturating_add(tx_cost.sum());
|
cost = cost.saturating_add(tx_cost.sum());
|
||||||
@ -228,6 +225,8 @@ impl CostTracker {
|
|||||||
vote_cost: u64,
|
vote_cost: u64,
|
||||||
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
||||||
) -> Result<(), CostTrackerError> {
|
) -> Result<(), CostTrackerError> {
|
||||||
|
=======
|
||||||
|
>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates)
|
||||||
// check against the total package cost
|
// check against the total package cost
|
||||||
if self.block_cost.saturating_add(cost) > self.block_cost_limit {
|
if self.block_cost.saturating_add(cost) > self.block_cost_limit {
|
||||||
return Err(CostTrackerError::WouldExceedBlockMaxLimit);
|
return Err(CostTrackerError::WouldExceedBlockMaxLimit);
|
||||||
@ -243,8 +242,26 @@ impl CostTracker {
|
|||||||
return Err(CostTrackerError::WouldExceedAccountMaxLimit);
|
return Err(CostTrackerError::WouldExceedAccountMaxLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
|
=======
|
||||||
|
// NOTE: Check if the total accounts data size is exceeded *before* the block accounts data
|
||||||
|
// size. This way, transactions are not unnecessarily retried.
|
||||||
|
let account_data_size = self
|
||||||
|
.account_data_size
|
||||||
|
.saturating_add(tx_cost.account_data_size);
|
||||||
|
if let Some(account_data_size_limit) = self.account_data_size_limit {
|
||||||
|
if account_data_size > account_data_size_limit {
|
||||||
|
return Err(CostTrackerError::WouldExceedAccountDataTotalLimit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if account_data_size > MAX_ACCOUNT_DATA_BLOCK_LEN {
|
||||||
|
return Err(CostTrackerError::WouldExceedAccountDataBlockLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates)
|
||||||
// check each account against account_cost_limit,
|
// check each account against account_cost_limit,
|
||||||
for account_key in keys.iter() {
|
for account_key in writable_accounts.iter() {
|
||||||
match self.cost_by_writable_accounts.get(account_key) {
|
match self.cost_by_writable_accounts.get(account_key) {
|
||||||
Some(chained_cost) => {
|
Some(chained_cost) => {
|
||||||
if chained_cost.saturating_add(cost) > self.account_cost_limit {
|
if chained_cost.saturating_add(cost) > self.account_cost_limit {
|
||||||
@ -260,11 +277,15 @@ impl CostTracker {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
<<<<<<< HEAD
|
<<<<<<< HEAD
|
||||||
fn add_transaction(&mut self, keys: &[Pubkey], cost: u64, transaction: &SanitizedTransaction) {
|
fn add_transaction(&mut self, keys: &[Pubkey], cost: u64, transaction: &SanitizedTransaction) {
|
||||||
for account_key in keys.iter() {
|
for account_key in keys.iter() {
|
||||||
=======
|
=======
|
||||||
fn add_transaction(&mut self, tx_cost: &TransactionCost) {
|
fn add_transaction(&mut self, tx_cost: &TransactionCost) {
|
||||||
|
=======
|
||||||
|
fn add_transaction_cost(&mut self, tx_cost: &TransactionCost) {
|
||||||
|
>>>>>>> 924b8ea1e (Adjustments to cost_tracker updates)
|
||||||
let cost = tx_cost.sum();
|
let cost = tx_cost.sum();
|
||||||
for account_key in tx_cost.writable_accounts.iter() {
|
for account_key in tx_cost.writable_accounts.iter() {
|
||||||
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
||||||
@ -286,6 +307,25 @@ impl CostTracker {
|
|||||||
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
|
||||||
self.transaction_count = self.transaction_count.saturating_add(1);
|
self.transaction_count = self.transaction_count.saturating_add(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn remove_transaction_cost(&mut self, tx_cost: &TransactionCost) {
|
||||||
|
let cost = tx_cost.sum();
|
||||||
|
for account_key in tx_cost.writable_accounts.iter() {
|
||||||
|
let account_cost = self
|
||||||
|
.cost_by_writable_accounts
|
||||||
|
.entry(*account_key)
|
||||||
|
.or_insert(0);
|
||||||
|
*account_cost = account_cost.saturating_sub(cost);
|
||||||
|
}
|
||||||
|
self.block_cost = self.block_cost.saturating_sub(cost);
|
||||||
|
if tx_cost.is_simple_vote {
|
||||||
|
self.vote_cost = self.vote_cost.saturating_sub(cost);
|
||||||
|
}
|
||||||
|
self.account_data_size = self
|
||||||
|
.account_data_size
|
||||||
|
.saturating_sub(tx_cost.account_data_size);
|
||||||
|
self.transaction_count = self.transaction_count.saturating_sub(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
Reference in New Issue
Block a user