Banking Stage drops transactions that'll exceed the total account data size limit (#23537)

This commit is contained in:
Brooks Prumo
2022-03-13 10:58:57 -05:00
committed by GitHub
parent bf57252298
commit 7758c32035
9 changed files with 418 additions and 149 deletions

View File

@@ -3,7 +3,7 @@
//! how transactions are included in blocks, and optimize those blocks.
//!
use {
crate::banking_stage::BatchedTransactionCostDetails,
crate::banking_stage::BatchedTransactionDetails,
crossbeam_channel::{unbounded, Receiver, Sender},
solana_measure::measure::Measure,
solana_runtime::{
@@ -68,8 +68,8 @@ impl QosService {
let running_flag = Arc::new(AtomicBool::new(true));
let metrics = Arc::new(QosServiceMetrics::new(id));
let running_flag_clone = running_flag.clone();
let metrics_clone = metrics.clone();
let running_flag_clone = Arc::clone(&running_flag);
let metrics_clone = Arc::clone(&metrics);
let reporting_thread = Some(
Builder::new()
.name("solana-qos-service-metrics-repoting".to_string())
@@ -109,9 +109,11 @@ impl QosService {
.collect();
compute_cost_time.stop();
self.metrics
.stats
.compute_cost_time
.fetch_add(compute_cost_time.as_us(), Ordering::Relaxed);
self.metrics
.stats
.compute_cost_count
.fetch_add(txs_costs.len() as u64, Ordering::Relaxed);
txs_costs
@@ -134,7 +136,7 @@ impl QosService {
.map(|(tx, cost)| match cost_tracker.try_add(tx, cost) {
Ok(current_block_cost) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost);
self.metrics.selected_txs_count.fetch_add(1, Ordering::Relaxed);
self.metrics.stats.selected_txs_count.fetch_add(1, Ordering::Relaxed);
num_included += 1;
Ok(())
},
@@ -142,20 +144,19 @@ impl QosService {
debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e);
match e {
CostTrackerError::WouldExceedBlockMaxLimit => {
self.metrics.retried_txs_per_block_limit_count.fetch_add(1, Ordering::Relaxed);
Err(TransactionError::WouldExceedMaxBlockCostLimit)
}
CostTrackerError::WouldExceedVoteMaxLimit => {
self.metrics.retried_txs_per_vote_limit_count.fetch_add(1, Ordering::Relaxed);
Err(TransactionError::WouldExceedMaxVoteCostLimit)
}
CostTrackerError::WouldExceedAccountMaxLimit => {
self.metrics.retried_txs_per_account_limit_count.fetch_add(1, Ordering::Relaxed);
Err(TransactionError::WouldExceedMaxAccountCostLimit)
}
CostTrackerError::WouldExceedAccountDataMaxLimit => {
self.metrics.retried_txs_per_account_data_limit_count.fetch_add(1, Ordering::Relaxed);
Err(TransactionError::WouldExceedMaxAccountDataCostLimit)
CostTrackerError::WouldExceedAccountDataBlockLimit => {
Err(TransactionError::WouldExceedAccountDataBlockLimit)
}
CostTrackerError::WouldExceedAccountDataTotalLimit => {
Err(TransactionError::WouldExceedAccountDataTotalLimit)
}
}
}
@@ -163,6 +164,7 @@ impl QosService {
.collect();
cost_tracking_time.stop();
self.metrics
.stats
.cost_tracking_time
.fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed);
(select_results, num_included)
@@ -177,30 +179,82 @@ impl QosService {
pub fn accumulate_estimated_transaction_costs(
&self,
cost_details: &BatchedTransactionCostDetails,
batched_transaction_details: &BatchedTransactionDetails,
) {
self.metrics.stats.estimated_signature_cu.fetch_add(
batched_transaction_details.costs.batched_signature_cost,
Ordering::Relaxed,
);
self.metrics.stats.estimated_write_lock_cu.fetch_add(
batched_transaction_details.costs.batched_write_lock_cost,
Ordering::Relaxed,
);
self.metrics.stats.estimated_data_bytes_cu.fetch_add(
batched_transaction_details.costs.batched_data_bytes_cost,
Ordering::Relaxed,
);
self.metrics.stats.estimated_execute_cu.fetch_add(
batched_transaction_details.costs.batched_execute_cost,
Ordering::Relaxed,
);
self.metrics
.estimated_signature_cu
.fetch_add(cost_details.batched_signature_cost, Ordering::Relaxed);
.errors
.retried_txs_per_block_limit_count
.fetch_add(
batched_transaction_details
.errors
.batched_retried_txs_per_block_limit_count,
Ordering::Relaxed,
);
self.metrics
.estimated_write_lock_cu
.fetch_add(cost_details.batched_write_lock_cost, Ordering::Relaxed);
.errors
.retried_txs_per_vote_limit_count
.fetch_add(
batched_transaction_details
.errors
.batched_retried_txs_per_vote_limit_count,
Ordering::Relaxed,
);
self.metrics
.estimated_data_bytes_cu
.fetch_add(cost_details.batched_data_bytes_cost, Ordering::Relaxed);
.errors
.retried_txs_per_account_limit_count
.fetch_add(
batched_transaction_details
.errors
.batched_retried_txs_per_account_limit_count,
Ordering::Relaxed,
);
self.metrics
.estimated_execute_cu
.fetch_add(cost_details.batched_execute_cost, Ordering::Relaxed);
.errors
.retried_txs_per_account_data_block_limit_count
.fetch_add(
batched_transaction_details
.errors
.batched_retried_txs_per_account_data_block_limit_count,
Ordering::Relaxed,
);
self.metrics
.errors
.dropped_txs_per_account_data_total_limit_count
.fetch_add(
batched_transaction_details
.errors
.batched_dropped_txs_per_account_data_total_limit_count,
Ordering::Relaxed,
);
}
pub fn accumulate_actual_execute_cu(&self, units: u64) {
self.metrics
.stats
.actual_execute_cu
.fetch_add(units, Ordering::Relaxed);
}
pub fn accumulate_actual_execute_time(&self, micro_sec: u64) {
self.metrics
.stats
.actual_execute_time_us
.fetch_add(micro_sec, Ordering::Relaxed);
}
@@ -223,63 +277,77 @@ impl QosService {
}
}
#[derive(Default)]
#[derive(Debug, Default)]
struct QosServiceMetrics {
// banking_stage creates one QosService instance per working threads, that is uniquely
// identified by id. This field allows to categorize metrics for gossip votes, TPU votes
// and other transactions.
/// banking_stage creates one QosService instance per working threads, that is uniquely
/// identified by id. This field allows to categorize metrics for gossip votes, TPU votes
/// and other transactions.
id: u32,
// aggregate metrics per slot
/// aggregate metrics per slot
slot: AtomicU64,
// accumulated time in micro-sec spent in computing transaction cost. It is the main performance
// overhead introduced by cost_model
stats: QosServiceMetricsStats,
errors: QosServiceMetricsErrors,
}
#[derive(Debug, Default)]
struct QosServiceMetricsStats {
/// accumulated time in micro-sec spent in computing transaction cost. It is the main performance
/// overhead introduced by cost_model
compute_cost_time: AtomicU64,
// total nummber of transactions in the reporting period to be computed for theit cost. It is
// usually the number of sanitized transactions leader receives.
/// total nummber of transactions in the reporting period to be computed for theit cost. It is
/// usually the number of sanitized transactions leader receives.
compute_cost_count: AtomicU64,
// acumulated time in micro-sec spent in tracking each bank's cost. It is the second part of
// overhead introduced
/// acumulated time in micro-sec spent in tracking each bank's cost. It is the second part of
/// overhead introduced
cost_tracking_time: AtomicU64,
// number of transactions to be included in blocks
/// number of transactions to be included in blocks
selected_txs_count: AtomicU64,
// number of transactions to be queued for retry due to its potential to breach block limit
retried_txs_per_block_limit_count: AtomicU64,
// number of transactions to be queued for retry due to its potential to breach vote limit
retried_txs_per_vote_limit_count: AtomicU64,
// number of transactions to be queued for retry due to its potential to breach writable
// account limit
retried_txs_per_account_limit_count: AtomicU64,
// number of transactions to be queued for retry due to its account data limits
retried_txs_per_account_data_limit_count: AtomicU64,
// accumulated estimated signature Compute Unites to be packed into block
/// accumulated estimated signature Compute Unites to be packed into block
estimated_signature_cu: AtomicU64,
// accumulated estimated write locks Compute Units to be packed into block
/// accumulated estimated write locks Compute Units to be packed into block
estimated_write_lock_cu: AtomicU64,
// accumulated estimated instructino data Compute Units to be packed into block
/// accumulated estimated instructino data Compute Units to be packed into block
estimated_data_bytes_cu: AtomicU64,
// accumulated estimated program Compute Units to be packed into block
/// accumulated estimated program Compute Units to be packed into block
estimated_execute_cu: AtomicU64,
// accumulated actual program Compute Units that have been packed into block
/// accumulated actual program Compute Units that have been packed into block
actual_execute_cu: AtomicU64,
// accumulated actual program execute micro-sec that have been packed into block
/// accumulated actual program execute micro-sec that have been packed into block
actual_execute_time_us: AtomicU64,
}
#[derive(Debug, Default)]
struct QosServiceMetricsErrors {
/// number of transactions to be queued for retry due to their potential to breach block limit
retried_txs_per_block_limit_count: AtomicU64,
/// number of transactions to be queued for retry due to their potential to breach vote limit
retried_txs_per_vote_limit_count: AtomicU64,
/// number of transactions to be queued for retry due to their potential to breach writable
/// account limit
retried_txs_per_account_limit_count: AtomicU64,
/// number of transactions to be queued for retry due to their potential to breach account data
/// block limits
retried_txs_per_account_data_block_limit_count: AtomicU64,
/// number of transactions to be dropped due to their potential to breach account data total
/// limits
dropped_txs_per_account_data_total_limit_count: AtomicU64,
}
impl QosServiceMetrics {
pub fn new(id: u32) -> Self {
QosServiceMetrics {
@@ -296,76 +364,96 @@ impl QosServiceMetrics {
("bank_slot", bank_slot as i64, i64),
(
"compute_cost_time",
self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
self.stats.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"compute_cost_count",
self.compute_cost_count.swap(0, Ordering::Relaxed) as i64,
self.stats.compute_cost_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"cost_tracking_time",
self.cost_tracking_time.swap(0, Ordering::Relaxed) as i64,
self.stats.cost_tracking_time.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"selected_txs_count",
self.selected_txs_count.swap(0, Ordering::Relaxed) as i64,
self.stats.selected_txs_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_signature_cu",
self.stats.estimated_signature_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_write_lock_cu",
self.stats
.estimated_write_lock_cu
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_data_bytes_cu",
self.stats
.estimated_data_bytes_cu
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_execute_cu",
self.stats.estimated_execute_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"actual_execute_cu",
self.stats.actual_execute_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"actual_execute_time_us",
self.stats.actual_execute_time_us.swap(0, Ordering::Relaxed) as i64,
i64
),
);
datapoint_info!(
"qos-service-errors",
("id", self.id as i64, i64),
("bank_slot", bank_slot as i64, i64),
(
"retried_txs_per_block_limit_count",
self.retried_txs_per_block_limit_count
self.errors
.retried_txs_per_block_limit_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"retried_txs_per_vote_limit_count",
self.retried_txs_per_vote_limit_count
self.errors
.retried_txs_per_vote_limit_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"retried_txs_per_account_limit_count",
self.retried_txs_per_account_limit_count
self.errors
.retried_txs_per_account_limit_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"retried_txs_per_account_data_limit_count",
self.retried_txs_per_account_data_limit_count
"retried_txs_per_account_data_block_limit_count",
self.errors
.retried_txs_per_account_data_block_limit_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_signature_cu",
self.estimated_signature_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_write_lock_cu",
self.estimated_write_lock_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_data_bytes_cu",
self.estimated_data_bytes_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"estimated_execute_cu",
self.estimated_execute_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"actual_execute_cu",
self.actual_execute_cu.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"actual_execute_time_us",
self.actual_execute_time_us.swap(0, Ordering::Relaxed) as i64,
"dropped_txs_per_account_data_total_limit_count",
self.errors
.dropped_txs_per_account_data_total_limit_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
);