Add Measure abstraction over measuring time intervals (#4851)

Allows one to swap in different implementations. This provides the
normal Instant::now() -> .elapsed() path.
Author: sakridge
Date: 2019-06-29 15:34:49 +02:00
Committed by: GitHub
Parent: 41bda18046
Commit: a89589a1d5
12 changed files with 158 additions and 61 deletions


@@ -17,6 +17,7 @@ use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize;
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn};
use solana_runtime::accounts_db::ErrorCounters;
use solana_runtime::bank::Bank;
@@ -24,7 +25,7 @@ use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{
self, duration_as_us, DEFAULT_NUM_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE,
self, DEFAULT_NUM_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE,
MAX_TRANSACTION_FORWARDING_DELAY,
};
use solana_sdk::transaction::{self, Transaction, TransactionError};
@@ -160,7 +161,7 @@ impl BankingStage {
let mut buffered_packets_iter = buffered_packets.drain(..);
let mut dropped_batches_count = 0;
let proc_start = Instant::now();
let mut proc_start = Measure::start("consume_buffered_process");
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let bank = poh_recorder.lock().unwrap().bank();
if bank.is_none() {
@@ -218,16 +219,15 @@
}
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
proc_start.stop();
debug!(
"@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}",
timing::timestamp(),
buffered_len,
total_time_ms,
proc_start.as_ms(),
new_tx_count,
(new_tx_count as f32) / (total_time_s)
(new_tx_count as f32) / (proc_start.as_s())
);
inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
@@ -423,6 +423,7 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>,
) -> (Result<()>, Vec<usize>) {
let mut ok_txs = vec![];
let mut processed_generation = Measure::start("record::process_generation");
let processed_transactions: Vec<_> = results
.iter()
.zip(txs.iter())
@@ -438,6 +439,7 @@
}
})
.collect();
processed_generation.stop();
debug!("processed: {} ", processed_transactions.len());
// unlock all the accounts with errors which are filtered by the above `filter_map`
@@ -446,7 +448,12 @@
"banking_stage-record_transactions",
processed_transactions.len()
);
let mut hash_time = Measure::start("record::hash");
let hash = hash_transactions(&processed_transactions[..]);
hash_time.stop();
let mut poh_record = Measure::start("record::poh_record");
// record and unlock will unlock all the successful transactions
let res = poh
.lock()
@@ -462,6 +469,7 @@
}
Err(_) => panic!("Poh recorder returned unexpected error"),
}
poh_record.stop();
}
(Ok(()), vec![])
}
@@ -472,32 +480,34 @@ impl BankingStage {
poh: &Arc<Mutex<PohRecorder>>,
lock_results: &LockedAccountsResults,
) -> (Result<()>, Vec<usize>) {
let now = Instant::now();
let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster then this queue
// expires.
let (mut loaded_accounts, results, mut retryable_txs) =
bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE);
let load_execute_time = now.elapsed();
load_execute_time.stop();
let freeze_lock = bank.freeze_lock();
let record_time = {
let now = Instant::now();
let mut record_time = Measure::start("record_time");
let (res, retryable_record_txs) =
Self::record_transactions(bank.slot(), txs, &results, poh);
retryable_txs.extend(retryable_record_txs);
if res.is_err() {
return (res, retryable_txs);
}
now.elapsed()
record_time.stop();
record_time
};
let commit_time = {
let now = Instant::now();
let mut commit_time = Measure::start("commit_time");
bank.commit_transactions(txs, &mut loaded_accounts, &results);
now.elapsed()
commit_time.stop();
commit_time
};
drop(freeze_lock);
@@ -505,9 +515,9 @@ impl BankingStage {
debug!(
"bank: {} load_execute: {}us record: {}us commit: {}us txs_len: {}",
bank.slot(),
duration_as_us(&load_execute_time),
duration_as_us(&record_time),
duration_as_us(&commit_time),
load_execute_time.as_us(),
record_time.as_us(),
commit_time.as_us(),
txs.len(),
);
@@ -520,26 +530,26 @@
poh: &Arc<Mutex<PohRecorder>>,
chunk_offset: usize,
) -> (Result<()>, Vec<usize>) {
let now = Instant::now();
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let lock_results = bank.lock_accounts(txs);
let lock_time = now.elapsed();
lock_time.stop();
let (result, mut retryable_txs) =
Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results);
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);
let now = Instant::now();
let mut unlock_time = Measure::start("unlock_time");
// Once the accounts are new transactions can enter the pipeline to process them
drop(lock_results);
let unlock_time = now.elapsed();
unlock_time.stop();
debug!(
"bank: {} lock: {}us unlock: {}us txs_len: {}",
bank.slot(),
duration_as_us(&lock_time),
duration_as_us(&unlock_time),
lock_time.as_us(),
unlock_time.as_us(),
txs.len(),
);
@@ -780,7 +790,9 @@ impl BankingStage {
id: u32,
batch_limit: usize,
) -> Result<UnprocessedPackets> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
recv_time.stop();
let mms_len = mms.len();
let count: usize = mms.iter().map(|x| x.1.len()).sum();
@@ -792,7 +804,7 @@
id,
);
inc_new_counter_debug!("banking_stage-transactions_received", count);
let proc_start = Instant::now();
let mut proc_start = Measure::start("process_received_packets_process");
let mut new_tx_count = 0;
let mut mms_iter = mms.into_iter();
@@ -850,19 +862,16 @@
}
}
inc_new_counter_debug!(
"banking_stage-time_ms",
timing::duration_as_ms(&proc_start.elapsed()) as usize
);
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
proc_start.stop();
inc_new_counter_debug!("banking_stage-time_ms", proc_start.as_ms() as usize);
debug!(
"@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}",
timing::timestamp(),
mms_len,
total_time_ms,
proc_start.as_ms(),
new_tx_count,
(new_tx_count as f32) / (total_time_s),
(new_tx_count as f32) / (proc_start.as_s()),
count,
id,
);


@@ -14,12 +14,12 @@ use crate::sigverify;
use crate::sigverify::TxOffset;
use crate::streamer::{self, PacketReceiver};
use crossbeam_channel::Sender as CrossbeamSender;
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_info, inc_new_counter_info};
use solana_sdk::timing;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
#[cfg(feature = "cuda")]
const RECV_BATCH_MAX: usize = 5_000;
@@ -74,7 +74,7 @@ impl SigVerifyStage {
)?;
inc_new_counter_info!("sigverify_stage-packets_received", len);
let now = Instant::now();
let mut verify_batch_time = Measure::start("sigverify_batch_time");
let batch_len = batch.len();
debug!(
"@{:?} verifier: verifying: {} id: {}",
@@ -86,31 +86,33 @@
let verified_batch = Self::verify_batch(batch, sigverify_disabled, recycler, recycler_out);
inc_new_counter_info!("sigverify_stage-verified_packets_send", len);
if sendr.send(verified_batch).is_err() {
return Err(Error::SendError);
for v in verified_batch {
if sendr.send(vec![v]).is_err() {
return Err(Error::SendError);
}
}
let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
verify_batch_time.stop();
inc_new_counter_info!(
"sigverify_stage-time_ms",
(total_time_ms + recv_time) as usize
(verify_batch_time.as_ms() + recv_time) as usize
);
debug!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
timing::timestamp(),
batch_len,
total_time_ms,
verify_batch_time.as_ms(),
id,
len,
(len as f32 / total_time_s)
(len as f32 / verify_batch_time.as_s())
);
datapoint_info!(
"sigverify_stage-total_verify_time",
("batch_len", batch_len, i64),
("len", len, i64),
("total_time_ms", total_time_ms, i64)
("total_time_ms", verify_batch_time.as_ms(), i64)
);
Ok(())