From 0ca255220ed0ae167f9b6a97c67c10162ad3e45f Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Thu, 18 Nov 2021 15:35:30 -0600 Subject: [PATCH] - Encapsulate QoS Service metrics reporting within QosService, so client (#21191) code (eg banking_stage) doesn't need to worry about it. - Remove dead cost_* stats from banking_stage, clean up call path. --- core/benches/banking_stage.rs | 3 +- core/src/banking_stage.rs | 74 +++++------ core/src/qos_service.rs | 223 +++++++++++++++++++++++++--------- 3 files changed, 197 insertions(+), 103 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 9375807025..14959b9844 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -8,6 +8,7 @@ use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_core::banking_stage::{BankingStage, BankingStageStats}; +use solana_core::qos_service::QosService; use solana_entry::entry::{next_hash, Entry}; use solana_gossip::cluster_info::ClusterInfo; use solana_gossip::cluster_info::Node; @@ -93,7 +94,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0baf490472..200866d664 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,10 +1,7 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. 
-use crate::{ - packet_hasher::PacketHasher, - qos_service::{QosService, QosServiceStats}, -}; +use crate::{packet_hasher::PacketHasher, qos_service::QosService}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use lru::LruCache; @@ -295,6 +292,7 @@ impl BankingStage { PacketHasher::default(), ))); let data_budget = Arc::new(DataBudget::default()); + let qos_service = Arc::new(QosService::new(cost_model)); // Many banks that process transactions in parallel. assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); let bank_thread_hdls: Vec> = (0..num_threads) @@ -319,7 +317,7 @@ impl BankingStage { let gossip_vote_sender = gossip_vote_sender.clone(); let duplicates = duplicates.clone(); let data_budget = data_budget.clone(); - let cost_model = cost_model.clone(); + let qos_service = qos_service.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -335,7 +333,7 @@ impl BankingStage { gossip_vote_sender, &duplicates, &data_budget, - cost_model, + qos_service, ); }) .unwrap() @@ -409,7 +407,7 @@ impl BankingStage { test_fn: Option, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, - cost_model: &Arc>, + qos_service: &Arc, ) { let mut rebuffered_packets_len = 0; let mut new_tx_count = 0; @@ -450,7 +448,7 @@ impl BankingStage { transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, - cost_model, + qos_service, ); if processed < verified_txs_len || !Bank::should_bank_still_be_processing_txs( @@ -556,7 +554,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, data_budget: &DataBudget, - cost_model: &Arc>, + qos_service: &Arc, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -597,7 +595,7 @@ impl BankingStage { None::>, banking_stage_stats, recorder, - cost_model, + qos_service, ); } BufferedPacketsDecision::Forward => { @@ -676,7 +674,7 @@ impl BankingStage { 
gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, data_budget: &DataBudget, - cost_model: Arc>, + qos_service: Arc, ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -697,7 +695,7 @@ impl BankingStage { &banking_stage_stats, &recorder, data_budget, - &cost_model, + &qos_service, ); if matches!(decision, BufferedPacketsDecision::Hold) || matches!(decision, BufferedPacketsDecision::ForwardAndHold) @@ -732,7 +730,7 @@ impl BankingStage { &banking_stage_stats, duplicates, &recorder, - &cost_model, + &qos_service, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, @@ -930,22 +928,13 @@ impl BankingStage { chunk_offset: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - cost_model: &Arc>, + qos_service: &Arc, ) -> (Result, Vec) { - let mut qos_service_stats = QosServiceStats::default(); - let qos_service = QosService::new(cost_model.clone()); - let tx_costs = qos_service.compute_transaction_costs( - txs.iter(), - bank.demote_program_write_locks(), - &mut qos_service_stats, - ); + let tx_costs = + qos_service.compute_transaction_costs(txs.iter(), bank.demote_program_write_locks()); - let transactions_qos_results = qos_service.select_transactions_per_cost( - txs.iter(), - tx_costs.iter(), - bank, - &mut qos_service_stats, - ); + let transactions_qos_results = + qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); // Only lock accounts for those transactions are selected for the block; // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -978,8 +967,6 @@ impl BankingStage { txs.len(), ); - qos_service_stats.report(); - (result, retryable_txs) } @@ -994,7 +981,7 @@ impl BankingStage { poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - cost_model: &Arc>, + qos_service: &Arc, ) -> (usize, Vec) { let 
mut chunk_start = 0; let mut unprocessed_txs = vec![]; @@ -1011,7 +998,7 @@ impl BankingStage { chunk_start, transaction_status_sender.clone(), gossip_vote_sender, - cost_model, + qos_service, ); trace!("process_transactions result: {:?}", result); @@ -1163,7 +1150,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, - cost_model: &Arc>, + qos_service: &Arc, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( @@ -1185,7 +1172,7 @@ impl BankingStage { poh, transaction_status_sender, gossip_vote_sender, - cost_model, + qos_service, ); process_tx_time.stop(); let unprocessed_tx_count = unprocessed_tx_indexes.len(); @@ -1303,7 +1290,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, - cost_model: &Arc>, + qos_service: &Arc, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; @@ -1361,7 +1348,7 @@ impl BankingStage { transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, - cost_model, + qos_service, ); new_tx_count += processed; @@ -1574,7 +1561,6 @@ mod tests { poh_service::PohService, }; use solana_rpc::transaction_status_service::TransactionStatusService; - use solana_runtime::cost_model::CostModel; use solana_sdk::{ hash::Hash, instruction::InstructionError, @@ -2239,7 +2225,7 @@ mod tests { 0, None, &gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ) .0 .unwrap(); @@ -2281,7 +2267,7 @@ mod tests { 0, None, &gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ) .0, 
Err(PohRecorderError::MaxHeightReached) @@ -2369,7 +2355,7 @@ mod tests { 0, None, &gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); poh_recorder @@ -2478,7 +2464,7 @@ mod tests { &recorder, None, &gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); assert_eq!(processed_transactions_count, 0,); @@ -2571,7 +2557,7 @@ mod tests { enable_cpi_and_log_storage: false, }), &gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); transaction_status_service.join().unwrap(); @@ -2699,7 +2685,7 @@ mod tests { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); // When the poh recorder has a bank, should process all non conflicting buffered packets. @@ -2716,7 +2702,7 @@ mod tests { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); if num_expected_unprocessed == 0 { assert!(buffered_packets.is_empty()) @@ -2782,7 +2768,7 @@ mod tests { test_fn, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), + &Arc::new(QosService::new(Arc::new(RwLock::new(CostModel::default())))), ); // Check everything is correct. 
All indexes after `interrupted_iteration` diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index b5f88db38f..52f6f75d98 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -9,58 +9,69 @@ use { cost_model::{CostModel, TransactionCost}, cost_tracker::CostTrackerError, }, - solana_sdk::transaction::{self, SanitizedTransaction, TransactionError}, - std::sync::{Arc, RwLock}, + solana_sdk::{ + timing::AtomicInterval, + transaction::{self, SanitizedTransaction, TransactionError}, + }, + std::{ + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, }; -#[derive(Default)] -pub struct QosServiceStats { - compute_cost_time: u64, - cost_tracking_time: u64, - selected_txs_count: u64, - retried_txs_per_block_limit_count: u64, - retried_txs_per_account_limit_count: u64, -} - -impl QosServiceStats { - pub fn report(&mut self) { - datapoint_info!( - "qos-service-stats", - ("compute_cost_time", self.compute_cost_time, i64), - ("cost_tracking_time", self.cost_tracking_time, i64), - ("selected_txs_count", self.selected_txs_count, i64), - ( - "retried_txs_per_block_limit_count", - self.retried_txs_per_block_limit_count, - i64 - ), - ( - "retried_txs_per_account_limit_count", - self.retried_txs_per_account_limit_count, - i64 - ), - ); - } -} - pub struct QosService { cost_model: Arc>, + metrics: Arc, + reporting_thread: Option>, + running_flag: Arc, +} + +impl Drop for QosService { + fn drop(&mut self) { + self.running_flag.store(false, Ordering::Relaxed); + self.reporting_thread + .take() + .unwrap() + .join() + .expect("qos service metrics reporting thread failed to join"); + } } impl QosService { pub fn new(cost_model: Arc>) -> Self { - Self { cost_model } + let running_flag = Arc::new(AtomicBool::new(true)); + let metrics = Arc::new(QosServiceMetrics::default()); + + let running_flag_clone = running_flag.clone(); + let metrics_clone = metrics.clone(); + let 
reporting_thread = Some( + Builder::new() + .name("solana-qos-service-metrics-repoting".to_string()) + .spawn(move || { + Self::reporting_loop(running_flag_clone, metrics_clone); + }) + .unwrap(), + ); + Self { + cost_model, + metrics, + reporting_thread, + running_flag, + } } pub fn compute_transaction_costs<'a>( &self, transactions: impl Iterator, demote_program_write_locks: bool, - stats: &mut QosServiceStats, ) -> Vec { let mut compute_cost_time = Measure::start("compute_cost_time"); let cost_model = self.cost_model.read().unwrap(); - let txs_costs = transactions + let txs_costs: Vec<_> = transactions .map(|tx| { let cost = cost_model.calculate_cost(tx, demote_program_write_locks); debug!( @@ -73,7 +84,12 @@ impl QosService { }) .collect(); compute_cost_time.stop(); - stats.compute_cost_time += compute_cost_time.as_us(); + self.metrics + .compute_cost_time + .fetch_add(compute_cost_time.as_us(), Ordering::Relaxed); + self.metrics + .compute_cost_count + .fetch_add(txs_costs.len() as u64, Ordering::Relaxed); txs_costs } @@ -84,7 +100,6 @@ impl QosService { transactions: impl Iterator, transactions_costs: impl Iterator, bank: &Arc, - stats: &mut QosServiceStats, ) -> Vec> { let mut cost_tracking_time = Measure::start("cost_tracking_time"); let mut cost_tracker = bank.write_cost_tracker().unwrap(); @@ -93,18 +108,18 @@ impl QosService { .map(|(tx, cost)| match cost_tracker.try_add(tx, cost) { Ok(current_block_cost) => { debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost); - stats.selected_txs_count += 1; + self.metrics.selected_txs_count.fetch_add(1, Ordering::Relaxed); Ok(()) }, Err(e) => { debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e); match e { CostTrackerError::WouldExceedBlockMaxLimit => { - stats.retried_txs_per_block_limit_count += 1; + self.metrics.retried_txs_per_block_limit_count.fetch_add(1, 
Ordering::Relaxed); Err(TransactionError::WouldExceedMaxBlockCostLimit) } CostTrackerError::WouldExceedAccountMaxLimit => { - stats.retried_txs_per_account_limit_count += 1; + self.metrics.retried_txs_per_account_limit_count.fetch_add(1, Ordering::Relaxed); Err(TransactionError::WouldExceedMaxAccountCostLimit) } } @@ -112,9 +127,72 @@ impl QosService { }) .collect(); cost_tracking_time.stop(); - stats.cost_tracking_time += cost_tracking_time.as_us(); + self.metrics + .cost_tracking_time + .fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed); select_results } + + fn reporting_loop(running_flag: Arc, metrics: Arc) { + while running_flag.load(Ordering::Relaxed) { + // hardcode to report every 1000ms + metrics.report(1000u64); + thread::sleep(Duration::from_millis(100)); + } + } +} + +#[derive(Default)] +struct QosServiceMetrics { + last_report: AtomicInterval, + compute_cost_time: AtomicU64, + compute_cost_count: AtomicU64, + cost_tracking_time: AtomicU64, + selected_txs_count: AtomicU64, + retried_txs_per_block_limit_count: AtomicU64, + retried_txs_per_account_limit_count: AtomicU64, +} + +impl QosServiceMetrics { + pub fn report(&self, report_interval_ms: u64) { + if self.last_report.should_update(report_interval_ms) { + datapoint_info!( + "qos-service-stats", + ( + "compute_cost_time", + self.compute_cost_time.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "compute_cost_count", + self.compute_cost_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "cost_tracking_time", + self.cost_tracking_time.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "selected_txs_count", + self.selected_txs_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "retried_txs_per_block_limit_count", + self.retried_txs_per_block_limit_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "retried_txs_per_account_limit_count", + self.retried_txs_per_account_limit_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ); + } + } } #[cfg(test)] @@ -135,7 +213,7 
@@ mod tests { }; #[test] - fn test_compute_transactions_costs() { + fn test_compute_transaction_costs() { solana_logger::setup(); // make a vec of txs @@ -158,11 +236,7 @@ mod tests { let cost_model = Arc::new(RwLock::new(CostModel::default())); let qos_service = QosService::new(cost_model.clone()); - let txs_costs = qos_service.compute_transaction_costs( - txs.iter(), - false, - &mut QosServiceStats::default(), - ); + let txs_costs = qos_service.compute_transaction_costs(txs.iter(), false); // verify the size of txs_costs and its contents assert_eq!(txs_costs.len(), txs.len()); @@ -214,23 +288,14 @@ mod tests { let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx]; let qos_service = QosService::new(cost_model); - let txs_costs = qos_service.compute_transaction_costs( - txs.iter(), - false, - &mut QosServiceStats::default(), - ); + let txs_costs = qos_service.compute_transaction_costs(txs.iter(), false); // set cost tracker limit to fit 1 transfer tx, vote tx bypasses limit check let cost_limit = transfer_tx_cost; bank.write_cost_tracker() .unwrap() .set_limits(cost_limit, cost_limit); - let results = qos_service.select_transactions_per_cost( - txs.iter(), - txs_costs.iter(), - &bank, - &mut QosServiceStats::default(), - ); + let results = qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); // verify that first transfer tx and all votes are allowed assert_eq!(results.len(), txs.len()); @@ -239,4 +304,46 @@ mod tests { assert!(results[2].is_err()); assert!(results[3].is_ok()); } + + #[test] + fn test_async_report_metrics() { + solana_logger::setup(); + + // make a vec of txs + let txs_count = 2048usize; + let keypair = Keypair::new(); + let transfer_tx = SanitizedTransaction::from_transaction_for_tests( + system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()), + ); + let mut txs_1 = Vec::with_capacity(txs_count); + let mut txs_2 = Vec::with_capacity(txs_count); + for _i in 0..txs_count { + 
txs_1.push(transfer_tx.clone()); + txs_2.push(transfer_tx.clone()); + } + + let cost_model = Arc::new(RwLock::new(CostModel::default())); + let qos_service = Arc::new(QosService::new(cost_model)); + let qos_service_1 = qos_service.clone(); + let qos_service_2 = qos_service.clone(); + + let th_1 = thread::spawn(move || { + qos_service_1.compute_transaction_costs(txs_1.iter(), false); + }); + + let th_2 = thread::spawn(move || { + qos_service_2.compute_transaction_costs(txs_2.iter(), false); + }); + + th_1.join().expect("qos service 1 faield to join"); + th_2.join().expect("qos service 2 faield to join"); + + assert_eq!( + txs_count as u64 * 2, + qos_service + .metrics + .compute_cost_count + .load(Ordering::Relaxed) + ); + } }