- Only commit successfully executed transactions' cost to cost_tracker;

- In-fly transactions are pended in cost_tracker until being committed
  or cancelled;

(cherry picked from commit 9e07272af8)

# Conflicts:
#	core/src/qos_service.rs
#	runtime/src/cost_model.rs
#	runtime/src/cost_tracker.rs
This commit is contained in:
Tao Zhu
2022-04-07 15:42:39 -05:00
committed by Tao Zhu
parent a225421737
commit 6cb6b9206f
6 changed files with 408 additions and 37 deletions

View File

@ -1360,8 +1360,7 @@ impl BankingStage {
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let mut lock_time = Measure::start("lock_time");
let batch =
bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.into_iter());
let batch = bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.iter());
lock_time.stop();
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
@ -1376,21 +1375,29 @@ impl BankingStage {
gossip_vote_sender,
);
let mut unlock_time = Measure::start("unlock_time");
// Once the accounts are new transactions can enter the pipeline to process them
drop(batch);
unlock_time.stop();
let ExecuteAndCommitTransactionsOutput {
ref mut retryable_transaction_indexes,
ref execute_and_commit_timings,
..
} = execute_and_commit_transactions_output;
Self::commit_or_cancel_transaction_cost(
txs.iter(),
transactions_qos_results.iter(),
retryable_transaction_indexes,
qos_service,
bank,
);
retryable_transaction_indexes
.iter_mut()
.for_each(|x| *x += chunk_offset);
let mut unlock_time = Measure::start("unlock_time");
// Once the accounts are new transactions can enter the pipeline to process them
drop(batch);
unlock_time.stop();
let (cu, us) =
Self::accumulate_execute_units_and_time(&execute_and_commit_timings.execute_timings);
qos_service.accumulate_actual_execute_cu(cu);
@ -1411,6 +1418,30 @@ impl BankingStage {
}
}
/// To commit transaction cost to cost_tracker if it was executed successfully;
/// Otherwise cancel it from being committed, therefore prevents cost_tracker
/// being inflated with unsuccessfully executed transactions.
fn commit_or_cancel_transaction_cost<'a>(
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
retryable_transaction_indexes: &[usize],
qos_service: &QosService,
bank: &Arc<Bank>,
) {
transactions
.zip(transaction_results)
.enumerate()
.for_each(|(index, (tx, result))| {
if result.is_ok() && retryable_transaction_indexes.contains(&index) {
qos_service.cancel_transaction_cost(bank, tx);
} else {
// TODO the 3rd param is for transaction's actual units. Will have
// to plumb it in next; For now, it simply commit estimated units.
qos_service.commit_transaction_cost(bank, tx, None);
}
});
}
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
// execution_cost from the batch of transactions selected for block.
fn accumulate_batched_transaction_costs<'a>(

View File

@ -151,6 +151,33 @@ impl QosService {
(select_results, num_included)
}
<<<<<<< HEAD
=======
pub fn commit_transaction_cost(
&self,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
actual_units: Option<u64>,
) {
bank.write_cost_tracker()
.unwrap()
.commit_transaction(transaction, actual_units);
}
pub fn cancel_transaction_cost(&self, bank: &Arc<Bank>, transaction: &SanitizedTransaction) {
bank.write_cost_tracker()
.unwrap()
.cancel_transaction(transaction);
}
// metrics are reported by bank slot
pub fn report_metrics(&self, bank: Arc<Bank>) {
self.report_sender
.send(QosMetrics::BlockBatchUpdate { bank })
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
}
>>>>>>> 9e07272af (- Only commit successfully executed transactions' cost to cost_tracker;)
pub fn accumulate_estimated_transaction_costs(
&self,
cost_details: &BatchedTransactionCostDetails,