From 2c93062f5499a4d54a1154fa8dcb53dfcd6d714b Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Tue, 9 Apr 2019 11:17:15 -0700 Subject: [PATCH] Improve banking_stage performance messages Use transaction count instead of batch count, and set the recv_start from when we finished processing the previous batch to get a more accurate number. --- core/src/banking_stage.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 73c13fc74e..1859f1e022 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -76,10 +76,16 @@ impl BankingStage { let poh_recorder = poh_recorder.clone(); let cluster_info = cluster_info.clone(); let exit = exit.clone(); + let mut recv_start = Instant::now(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { - Self::process_loop(&verified_receiver, &poh_recorder, &cluster_info); + Self::process_loop( + &verified_receiver, + &poh_recorder, + &cluster_info, + &mut recv_start, + ); exit.store(true, Ordering::Relaxed); }) .unwrap() @@ -168,6 +174,7 @@ impl BankingStage { verified_receiver: &Arc>>, poh_recorder: &Arc>, cluster_info: &Arc>, + recv_start: &mut Instant, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; @@ -181,7 +188,7 @@ impl BankingStage { buffered_packets.clear(); } - match Self::process_packets(&verified_receiver, &poh_recorder) { + match Self::process_packets(&verified_receiver, &poh_recorder, recv_start) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Ok(unprocessed_packets) => { if Self::should_buffer_packets(poh_recorder, cluster_info) { @@ -360,23 +367,22 @@ impl BankingStage { pub fn process_packets( verified_receiver: &Arc>>, poh: &Arc>, + recv_start: &mut Instant, ) -> Result { - let recv_start = Instant::now(); let mms = verified_receiver .lock() .unwrap() .recv_timeout(Duration::from_millis(100))?; - let mut reqs_len = 0; let mms_len = mms.len(); + let count = mms.iter().map(|x| x.1.len()).sum(); debug!( - "@{:?} process start stalled for: {:?}ms batches: {}", + "@{:?} process start stalled for: {:?}ms txs: {}", timing::timestamp(), timing::duration_as_ms(&recv_start.elapsed()), - mms.len(), + count, ); inc_new_counter_info!("banking_stage-entries_received", mms_len); - let count = mms.iter().map(|x| x.1.len()).sum(); let proc_start = Instant::now(); let mut new_tx_count = 0; @@ -397,7 +403,6 @@ impl BankingStage { debug!("banking-stage-tx bank {}", bank.slot()); let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); - reqs_len += transactions.len(); debug!( "bank: {} transactions received {}", @@ -443,16 +448,18 @@ impl BankingStage { let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); debug!( - "@{:?} done processing transaction batches: {} time: {:?}ms reqs: {} reqs/s: {}", + "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {}", timing::timestamp(), mms_len, total_time_ms, - reqs_len, - (reqs_len as f32) / (total_time_s) + new_tx_count, + (new_tx_count as f32) / (total_time_s) ); inc_new_counter_info!("banking_stage-process_packets", count); inc_new_counter_info!("banking_stage-process_transactions", new_tx_count); + *recv_start = Instant::now(); + Ok(unprocessed_packets) } }