Leader QoS service metrics (#21708)
* - qos_service metrics are tagged with leader thread ids to separate gossip/tpu votes and transactions; - qos_service metrics are reported with bank slot; - replaced timer-based reporting with a signal via channel; removed the async report test, as qos_service now lives within a thread * - add tpu live packets (e.g., not buffered packets) states to qos metrics reporting
This commit is contained in:
@@ -10,12 +10,13 @@ use {
|
||||
cost_tracker::CostTrackerError,
|
||||
},
|
||||
solana_sdk::{
|
||||
timing::AtomicInterval,
|
||||
clock::Slot,
|
||||
transaction::{self, SanitizedTransaction, TransactionError},
|
||||
},
|
||||
std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
@@ -23,13 +24,24 @@ use {
|
||||
},
|
||||
};
|
||||
|
||||
// QosService is local to each banking thread, each instance of QosService provides services to
|
||||
// one banking thread.
|
||||
// It hosts a private thread for async metrics reporting, tagged with the banking thread's ID. Banking
|
||||
// thread calls `report_metrics(&bank)` at end of `process_and_record_transaction()`, or any time
|
||||
// it wants; QosService sends `&bank` to the reporting thread via channel, signalling stats to be
|
||||
// reported if the bank slot has changed.
|
||||
//
|
||||
pub struct QosService {
|
||||
// cost_model instance is owned by validator, shared between replay_stage and
|
||||
// banking_stage. replay_stage writes the latest on-chain program timings to
|
||||
// it; banking_stage's qos_service reads that information to calculate
|
||||
// transaction cost, hence RwLock wrapped.
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
// QosService hosts metrics object and a private reporting thread, as well as sender to
|
||||
// communicate with thread.
|
||||
report_sender: Sender<Arc<Bank>>,
|
||||
metrics: Arc<QosServiceMetrics>,
|
||||
// metrics reporting runs on a private thread
|
||||
reporting_thread: Option<JoinHandle<()>>,
|
||||
running_flag: Arc<AtomicBool>,
|
||||
}
|
||||
@@ -46,16 +58,10 @@ impl Drop for QosService {
|
||||
}
|
||||
|
||||
impl QosService {
|
||||
pub fn new(cost_model: Arc<RwLock<CostModel>>) -> Self {
|
||||
Self::new_with_reporting_duration(cost_model, 1000u64)
|
||||
}
|
||||
|
||||
pub fn new_with_reporting_duration(
|
||||
cost_model: Arc<RwLock<CostModel>>,
|
||||
reporting_duration_ms: u64,
|
||||
) -> Self {
|
||||
pub fn new(cost_model: Arc<RwLock<CostModel>>, id: u32) -> Self {
|
||||
let (report_sender, report_receiver) = channel();
|
||||
let running_flag = Arc::new(AtomicBool::new(true));
|
||||
let metrics = Arc::new(QosServiceMetrics::default());
|
||||
let metrics = Arc::new(QosServiceMetrics::new(id));
|
||||
|
||||
let running_flag_clone = running_flag.clone();
|
||||
let metrics_clone = metrics.clone();
|
||||
@@ -63,18 +69,21 @@ impl QosService {
|
||||
Builder::new()
|
||||
.name("solana-qos-service-metrics-repoting".to_string())
|
||||
.spawn(move || {
|
||||
Self::reporting_loop(running_flag_clone, metrics_clone, reporting_duration_ms);
|
||||
Self::reporting_loop(running_flag_clone, metrics_clone, report_receiver);
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
Self {
|
||||
cost_model,
|
||||
metrics,
|
||||
reporting_thread,
|
||||
running_flag,
|
||||
report_sender,
|
||||
}
|
||||
}
|
||||
|
||||
// invoke cost_model to calculate cost for the given list of transactions
|
||||
pub fn compute_transaction_costs<'a>(
|
||||
&self,
|
||||
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
|
||||
@@ -147,13 +156,53 @@ impl QosService {
|
||||
select_results
|
||||
}
|
||||
|
||||
// metrics are reported by bank slot
|
||||
pub fn report_metrics(&self, bank: Arc<Bank>) {
|
||||
self.report_sender
|
||||
.send(bank)
|
||||
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
|
||||
}
|
||||
|
||||
// metrics accumulating apis
|
||||
pub fn accumulate_tpu_ingested_packets_count(&self, count: u64) {
|
||||
self.metrics
|
||||
.tpu_ingested_packets_count
|
||||
.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn accumulate_tpu_buffered_packets_count(&self, count: u64) {
|
||||
self.metrics
|
||||
.tpu_buffered_packets_count
|
||||
.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn accumulated_verified_txs_count(&self, count: u64) {
|
||||
self.metrics
|
||||
.verified_txs_count
|
||||
.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn accumulated_processed_txs_count(&self, count: u64) {
|
||||
self.metrics
|
||||
.processed_txs_count
|
||||
.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn accumulated_retryable_txs_count(&self, count: u64) {
|
||||
self.metrics
|
||||
.retryable_txs_count
|
||||
.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn reporting_loop(
|
||||
running_flag: Arc<AtomicBool>,
|
||||
metrics: Arc<QosServiceMetrics>,
|
||||
reporting_duration_ms: u64,
|
||||
report_receiver: Receiver<Arc<Bank>>,
|
||||
) {
|
||||
while running_flag.load(Ordering::Relaxed) {
|
||||
metrics.report(reporting_duration_ms);
|
||||
for bank in report_receiver.try_iter() {
|
||||
metrics.report(bank.slot());
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
@@ -161,21 +210,97 @@ impl QosService {
|
||||
|
||||
#[derive(Default)]
|
||||
struct QosServiceMetrics {
|
||||
last_report: AtomicInterval,
|
||||
// banking_stage creates one QosService instance per working thread, each uniquely
|
||||
// identified by id. This field allows categorizing metrics for gossip votes, TPU votes
|
||||
// and other transactions.
|
||||
id: u32,
|
||||
|
||||
// aggregate metrics per slot
|
||||
slot: AtomicU64,
|
||||
|
||||
// accumulated number of live packets TPU received from verified receiver for processing.
|
||||
tpu_ingested_packets_count: AtomicU64,
|
||||
|
||||
// accumulated number of live packets TPU put into buffer due to no active bank.
|
||||
tpu_buffered_packets_count: AtomicU64,
|
||||
|
||||
// accumulated number of verified txs, which excludes unsanitized transactions and
|
||||
// non-vote transactions when in vote-only mode from ingested packets
|
||||
verified_txs_count: AtomicU64,
|
||||
|
||||
// accumulated number of transactions that have been processed, including those landed and those to be
|
||||
// returned (due to AccountInUse, and other QoS related reasons)
|
||||
processed_txs_count: AtomicU64,
|
||||
|
||||
// accumulated number of transactions buffered for retry, often due to AccountInUse and QoS
|
||||
// reasons, includes retried_txs_per_block_limit_count and retried_txs_per_account_limit_count
|
||||
retryable_txs_count: AtomicU64,
|
||||
|
||||
// accumulated time in micro-sec spent in computing transaction cost. It is the main performance
|
||||
// overhead introduced by cost_model
|
||||
compute_cost_time: AtomicU64,
|
||||
|
||||
// total number of transactions in the reporting period to be computed for their cost. It is
|
||||
// usually the number of sanitized transactions leader receives.
|
||||
compute_cost_count: AtomicU64,
|
||||
|
||||
// accumulated time in micro-sec spent in tracking each bank's cost. It is the second part of
|
||||
// overhead introduced
|
||||
cost_tracking_time: AtomicU64,
|
||||
|
||||
// number of transactions to be included in blocks
|
||||
selected_txs_count: AtomicU64,
|
||||
|
||||
// number of transactions to be queued for retry due to their potential to breach block limit
|
||||
retried_txs_per_block_limit_count: AtomicU64,
|
||||
|
||||
// number of transactions to be queued for retry due to their potential to breach writable
|
||||
// account limit
|
||||
retried_txs_per_account_limit_count: AtomicU64,
|
||||
|
||||
// number of transactions to be queued for retry due to its account data limits
|
||||
retried_txs_per_account_data_limit_count: AtomicU64,
|
||||
}
|
||||
|
||||
impl QosServiceMetrics {
|
||||
pub fn report(&self, report_interval_ms: u64) {
|
||||
if self.last_report.should_update(report_interval_ms) {
|
||||
pub fn new(id: u32) -> Self {
|
||||
QosServiceMetrics {
|
||||
id,
|
||||
..QosServiceMetrics::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn report(&self, bank_slot: Slot) {
|
||||
if bank_slot != self.slot.load(Ordering::Relaxed) {
|
||||
datapoint_info!(
|
||||
"qos-service-stats",
|
||||
("id", self.id as i64, i64),
|
||||
("bank_slot", bank_slot as i64, i64),
|
||||
(
|
||||
"tpu_ingested_packets_count",
|
||||
self.tpu_ingested_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"tpu_buffered_packets_count",
|
||||
self.tpu_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"verified_txs_count",
|
||||
self.verified_txs_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"processed_txs_count",
|
||||
self.processed_txs_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"retryable_txs_count",
|
||||
self.retryable_txs_count.swap(0, Ordering::Relaxed) as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"compute_cost_time",
|
||||
self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
|
||||
@@ -215,6 +340,7 @@ impl QosServiceMetrics {
|
||||
i64
|
||||
),
|
||||
);
|
||||
self.slot.store(bank_slot, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -259,7 +385,7 @@ mod tests {
|
||||
let txs = vec![transfer_tx.clone(), vote_tx.clone(), vote_tx, transfer_tx];
|
||||
|
||||
let cost_model = Arc::new(RwLock::new(CostModel::default()));
|
||||
let qos_service = QosService::new(cost_model.clone());
|
||||
let qos_service = QosService::new(cost_model.clone(), 1);
|
||||
let txs_costs = qos_service.compute_transaction_costs(txs.iter());
|
||||
|
||||
// verify the size of txs_costs and its contents
|
||||
@@ -307,7 +433,7 @@ mod tests {
|
||||
// make a vec of txs
|
||||
let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx];
|
||||
|
||||
let qos_service = QosService::new(cost_model);
|
||||
let qos_service = QosService::new(cost_model, 1);
|
||||
let txs_costs = qos_service.compute_transaction_costs(txs.iter());
|
||||
|
||||
// set cost tracker limit to fit 1 transfer tx, vote tx bypasses limit check
|
||||
@@ -324,83 +450,4 @@ mod tests {
|
||||
assert!(results[2].is_err());
|
||||
assert!(results[3].is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_async_report_metrics() {
|
||||
solana_logger::setup();
|
||||
//solana_logger::setup_with_default("solana=info");
|
||||
|
||||
// make a vec of txs
|
||||
let txs_count = 128usize;
|
||||
let keypair = Keypair::new();
|
||||
let transfer_tx = SanitizedTransaction::from_transaction_for_tests(
|
||||
system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()),
|
||||
);
|
||||
let mut txs_1 = Vec::with_capacity(txs_count);
|
||||
let mut txs_2 = Vec::with_capacity(txs_count);
|
||||
for _i in 0..txs_count {
|
||||
txs_1.push(transfer_tx.clone());
|
||||
txs_2.push(transfer_tx.clone());
|
||||
}
|
||||
|
||||
// set reporting duration to long enough so the stats wouldn't reset during testing
|
||||
let ten_min = 600_000u64;
|
||||
let cost_model = Arc::new(RwLock::new(CostModel::default()));
|
||||
let qos_service = Arc::new(QosService::new_with_reporting_duration(cost_model, ten_min));
|
||||
let qos_service_1 = qos_service.clone();
|
||||
let qos_service_2 = qos_service.clone();
|
||||
|
||||
let th_1 = Builder::new()
|
||||
.name("test-producer-1".to_string())
|
||||
.spawn(move || {
|
||||
debug!("thread 1 starts with {} txs", txs_1.len());
|
||||
let tx_costs = qos_service_1.compute_transaction_costs(txs_1.iter());
|
||||
assert_eq!(txs_count, tx_costs.len());
|
||||
debug!(
|
||||
"thread 1 done, generated {} count, see service count as {}",
|
||||
txs_count,
|
||||
qos_service_1
|
||||
.metrics
|
||||
.compute_cost_count
|
||||
.load(Ordering::Relaxed)
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let th_2 = Builder::new()
|
||||
.name("test-producer-2".to_string())
|
||||
.spawn(move || {
|
||||
debug!("thread 2 starts with {} txs", txs_2.len());
|
||||
let tx_costs = qos_service_2.compute_transaction_costs(txs_2.iter());
|
||||
assert_eq!(txs_count, tx_costs.len());
|
||||
debug!(
|
||||
"thread 2 done, generated {} count, see service count as {}",
|
||||
txs_count,
|
||||
qos_service_2
|
||||
.metrics
|
||||
.compute_cost_count
|
||||
.load(Ordering::Relaxed)
|
||||
);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
th_1.join().expect("qos service 1 panicked");
|
||||
th_2.join().expect("qos service 2 panicked");
|
||||
|
||||
debug!(
|
||||
"all threads joined. count {}",
|
||||
qos_service
|
||||
.metrics
|
||||
.compute_cost_count
|
||||
.load(Ordering::Relaxed)
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
txs_count as u64 * 2,
|
||||
qos_service
|
||||
.metrics
|
||||
.compute_cost_count
|
||||
.load(Ordering::Relaxed)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user