diff --git a/Cargo.lock b/Cargo.lock index 0750e56d5b..bbbbb0e413 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2212,6 +2212,7 @@ dependencies = [ "solana-exchange-program 0.17.0", "solana-kvstore 0.17.0", "solana-logger 0.17.0", + "solana-measure 0.17.0", "solana-merkle-tree 0.17.0", "solana-metrics 0.17.0", "solana-netutil 0.17.0", @@ -2588,6 +2589,13 @@ dependencies = [ "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "solana-measure" +version = "0.17.0" +dependencies = [ + "solana-sdk 0.17.0", +] + [[package]] name = "solana-merkle-tree" version = "0.17.0" @@ -2667,6 +2675,7 @@ dependencies = [ "solana-bpf-loader-api 0.17.0", "solana-bpf-loader-program 0.17.0", "solana-logger 0.17.0", + "solana-measure 0.17.0", "solana-metrics 0.17.0", "solana-noop-program 0.17.0", "solana-sdk 0.17.0", diff --git a/Cargo.toml b/Cargo.toml index ffca17a82e..5e7bb9e3b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "ledger-tool", "logger", "merkle-tree", + "measure", "metrics", "netutil", "programs/bpf", diff --git a/core/Cargo.toml b/core/Cargo.toml index 23e095d883..adf5019046 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -58,6 +58,7 @@ solana-kvstore = { path = "../kvstore", version = "0.17.0", optional = true } solana-logger = { path = "../logger", version = "0.17.0" } solana-merkle-tree = { path = "../merkle-tree", version = "0.17.0" } solana-metrics = { path = "../metrics", version = "0.17.0" } +solana-measure = { path = "../measure", version = "0.17.0" } solana-netutil = { path = "../netutil", version = "0.17.0" } solana-runtime = { path = "../runtime", version = "0.17.0" } solana-sdk = { path = "../sdk", version = "0.17.0" } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 04b0a3a4f2..8df2c23dab 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -17,6 +17,7 @@ use crate::sigverify_stage::VerifiedPackets; use bincode::deserialize; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; +use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn}; use solana_runtime::accounts_db::ErrorCounters; use solana_runtime::bank::Bank; @@ -24,7 +25,7 @@ use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::poh_config::PohConfig; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{ - self, duration_as_us, DEFAULT_NUM_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, + self, DEFAULT_NUM_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY, }; use solana_sdk::transaction::{self, Transaction, TransactionError}; @@ -160,7 +161,7 @@ impl BankingStage { let mut buffered_packets_iter = buffered_packets.drain(..); let mut dropped_batches_count = 0; - let proc_start = Instant::now(); + let mut proc_start = Measure::start("consume_buffered_process"); while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { let bank = poh_recorder.lock().unwrap().bank(); if bank.is_none() { @@ -218,16 +219,15 @@ impl BankingStage { } } - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + proc_start.stop(); debug!( "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}", timing::timestamp(), buffered_len, - total_time_ms, + proc_start.as_ms(), new_tx_count, - (new_tx_count as f32) / (total_time_s) + (new_tx_count as f32) / (proc_start.as_s()) ); inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); @@ -423,6 +423,7 @@ impl BankingStage { poh: &Arc>, ) -> (Result<()>, Vec) { let mut ok_txs = vec![]; + let mut processed_generation = Measure::start("record::process_generation"); let processed_transactions: Vec<_> = results .iter() .zip(txs.iter()) @@ -438,6 +439,7 @@ impl BankingStage { } }) .collect(); + processed_generation.stop(); debug!("processed: {} ", processed_transactions.len()); // unlock all the accounts with errors which are filtered by the above `filter_map` @@ -446,7 +448,12 @@ impl BankingStage { "banking_stage-record_transactions", processed_transactions.len() ); + + let mut hash_time = Measure::start("record::hash"); let hash = hash_transactions(&processed_transactions[..]); + hash_time.stop(); + + let mut poh_record = Measure::start("record::poh_record"); // record and unlock will unlock all the successful transactions let res = poh .lock() @@ -462,6 +469,7 @@ impl BankingStage { } Err(_) => panic!("Poh recorder returned unexpected error"), } + poh_record.stop(); } (Ok(()), vec![]) } @@ -472,32 +480,34 @@ impl BankingStage { poh: &Arc>, lock_results: &LockedAccountsResults, ) -> (Result<()>, Vec) { - let now = Instant::now(); + let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce // the likelihood of any single thread getting starved and processing old ids. // TODO: Banking stage threads should be prioritized to complete faster then this queue // expires. let (mut loaded_accounts, results, mut retryable_txs) = bank.load_and_execute_transactions(txs, lock_results, MAX_PROCESSING_AGE); - let load_execute_time = now.elapsed(); + load_execute_time.stop(); let freeze_lock = bank.freeze_lock(); let record_time = { - let now = Instant::now(); + let mut record_time = Measure::start("record_time"); let (res, retryable_record_txs) = Self::record_transactions(bank.slot(), txs, &results, poh); retryable_txs.extend(retryable_record_txs); if res.is_err() { return (res, retryable_txs); } - now.elapsed() + record_time.stop(); + record_time }; let commit_time = { - let now = Instant::now(); + let mut commit_time = Measure::start("commit_time"); bank.commit_transactions(txs, &mut loaded_accounts, &results); - now.elapsed() + commit_time.stop(); + commit_time }; drop(freeze_lock); @@ -505,9 +515,9 @@ impl BankingStage { debug!( "bank: {} load_execute: {}us record: {}us commit: {}us txs_len: {}", bank.slot(), - duration_as_us(&load_execute_time), - duration_as_us(&record_time), - duration_as_us(&commit_time), + load_execute_time.as_us(), + record_time.as_us(), + commit_time.as_us(), txs.len(), ); @@ -520,26 +530,26 @@ impl BankingStage { poh: &Arc>, chunk_offset: usize, ) -> (Result<()>, Vec) { - let now = Instant::now(); + let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the // same account state let lock_results = bank.lock_accounts(txs); - let lock_time = now.elapsed(); + lock_time.stop(); let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(bank, txs, poh, &lock_results); retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); - let now = Instant::now(); + let mut unlock_time = Measure::start("unlock_time"); // Once the accounts are new transactions can enter the pipeline to process them drop(lock_results); - let unlock_time = now.elapsed(); + unlock_time.stop(); debug!( "bank: {} lock: {}us unlock: {}us txs_len: {}", bank.slot(), - duration_as_us(&lock_time), - duration_as_us(&unlock_time), + lock_time.as_us(), + unlock_time.as_us(), txs.len(), ); @@ -780,7 +790,9 @@ impl BankingStage { id: u32, batch_limit: usize, ) -> Result { + let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; + recv_time.stop(); let mms_len = mms.len(); let count: usize = mms.iter().map(|x| x.1.len()).sum(); @@ -792,7 +804,7 @@ impl BankingStage { id, ); inc_new_counter_debug!("banking_stage-transactions_received", count); - let proc_start = Instant::now(); + let mut proc_start = Measure::start("process_received_packets_process"); let mut new_tx_count = 0; let mut mms_iter = mms.into_iter(); @@ -850,19 +862,16 @@ impl BankingStage { } } - inc_new_counter_debug!( - "banking_stage-time_ms", - timing::duration_as_ms(&proc_start.elapsed()) as usize - ); - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + proc_start.stop(); + + inc_new_counter_debug!("banking_stage-time_ms", proc_start.as_ms() as usize); debug!( "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}", timing::timestamp(), mms_len, - total_time_ms, + proc_start.as_ms(), new_tx_count, - (new_tx_count as f32) / (total_time_s), + (new_tx_count as f32) / (proc_start.as_s()), count, id, ); diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index e9e72cf904..8155db2e05 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -14,12 +14,12 @@ use crate::sigverify; use crate::sigverify::TxOffset; use crate::streamer::{self, PacketReceiver}; use crossbeam_channel::Sender as CrossbeamSender; +use solana_measure::measure::Measure; use solana_metrics::{datapoint_info, inc_new_counter_info}; use solana_sdk::timing; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; -use std::time::Instant; #[cfg(feature = "cuda")] const RECV_BATCH_MAX: usize = 5_000; @@ -74,7 +74,7 @@ impl SigVerifyStage { )?; inc_new_counter_info!("sigverify_stage-packets_received", len); - let now = Instant::now(); + let mut verify_batch_time = Measure::start("sigverify_batch_time"); let batch_len = batch.len(); debug!( "@{:?} verifier: verifying: {} id: {}", @@ -86,31 +86,33 @@ impl SigVerifyStage { let verified_batch = Self::verify_batch(batch, sigverify_disabled, recycler, recycler_out); inc_new_counter_info!("sigverify_stage-verified_packets_send", len); - if sendr.send(verified_batch).is_err() { - return Err(Error::SendError); + for v in verified_batch { + if sendr.send(vec![v]).is_err() { + return Err(Error::SendError); + } } - let total_time_ms = timing::duration_as_ms(&now.elapsed()); - let total_time_s = timing::duration_as_s(&now.elapsed()); + verify_batch_time.stop(); + inc_new_counter_info!( "sigverify_stage-time_ms", - (total_time_ms + recv_time) as usize + (verify_batch_time.as_ms() + recv_time) as usize ); debug!( "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", timing::timestamp(), batch_len, - total_time_ms, + verify_batch_time.as_ms(), id, len, - (len as f32 / total_time_s) + (len as f32 / verify_batch_time.as_s()) ); datapoint_info!( "sigverify_stage-total_verify_time", ("batch_len", batch_len, i64), ("len", len, i64), - ("total_time_ms", total_time_ms, i64) + ("total_time_ms", verify_batch_time.as_ms(), i64) ); Ok(()) diff --git a/measure/.gitignore b/measure/.gitignore new file mode 100644 index 0000000000..b83d22266a --- /dev/null +++ b/measure/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/measure/Cargo.toml b/measure/Cargo.toml new file mode 100644 index 0000000000..62462ab0c1 --- /dev/null +++ b/measure/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "solana-measure" +description = "Blockchain, Rebuilt for Scale" +version = "0.17.0" +documentation = "https://docs.rs/solana" +homepage = "https://solana.com/" +readme = "../README.md" +repository = "https://github.com/solana-labs/solana" +authors = ["Solana Maintainers "] +license = "Apache-2.0" +edition = "2018" + +[dependencies] +solana-sdk = { path = "../sdk", version = "0.17.0" } diff --git a/measure/src/lib.rs b/measure/src/lib.rs new file mode 100644 index 0000000000..a669b00899 --- /dev/null +++ b/measure/src/lib.rs @@ -0,0 +1 @@ +pub mod measure; diff --git a/measure/src/measure.rs b/measure/src/measure.rs new file mode 100644 index 0000000000..bc0cd2a21a --- /dev/null +++ b/measure/src/measure.rs @@ -0,0 +1,49 @@ +use solana_sdk::timing::duration_as_ns; +use std::time::Instant; + +pub struct Measure { + start: Instant, + duration: u64, +} + +impl Measure { + pub fn start(_name: &'static str) -> Self { + Self { + start: Instant::now(), + duration: 0, + } + } + + pub fn stop(&mut self) { + self.duration = duration_as_ns(&self.start.elapsed()); + } + + pub fn as_us(&self) -> u64 { + self.duration / 1000 + } + + pub fn as_ms(&self) -> u64 { + self.duration / (1000 * 1000) + } + + pub fn as_s(&self) -> f32 { + self.duration as f32 / (1000.0f32 * 1000.0f32 * 1000.0f32) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread::sleep; + use std::time::Duration; + + #[test] + fn test_measure() { + let mut measure = Measure::start("test"); + sleep(Duration::from_secs(1)); + measure.stop(); + assert!(measure.as_s() >= 0.99f32 && measure.as_s() <= 1.01f32); + assert!(measure.as_ms() >= 990 && measure.as_ms() <= 1_010); + assert!(measure.as_us() >= 999_000 && measure.as_us() <= 1_010_000); + } +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index b2066218ad..e3bfb70c48 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -26,6 +26,7 @@ serde = "1.0.94" serde_derive = "1.0.94" serde_json = "1.0.38" solana-logger = { path = "../logger", version = "0.17.0" } +solana-measure = { path = "../measure", version = "0.17.0" } solana-metrics = { path = "../metrics", version = "0.17.0" } solana-bpf-loader-api = { path = "../programs/bpf_loader_api", version = "0.17.0" } solana-bpf-loader-program = { path = "../programs/bpf_loader_program", version = "0.17.0" } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index a540fafd26..77c95d065e 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -28,6 +28,7 @@ use rayon::ThreadPool; use serde::de::{MapAccess, Visitor}; use serde::ser::{SerializeMap, Serializer}; use serde::{Deserialize, Serialize}; +use solana_measure::measure::Measure; use solana_sdk::account::{Account, LamportCredit}; use solana_sdk::pubkey::Pubkey; use std::collections::{HashMap, HashSet}; @@ -530,10 +531,12 @@ impl AccountsDB { ) -> (Vec<(Fork, AccountInfo)>, u64) { let mut reclaims: Vec<(Fork, AccountInfo)> = Vec::with_capacity(infos.len() * 2); let mut index = self.accounts_index.write().unwrap(); + let mut update_index_work = Measure::start("update_index_work"); for (info, account) in infos.into_iter().zip(accounts.iter()) { let key = &account.0; index.insert(fork_id, key, info, &mut reclaims); } + update_index_work.stop(); (reclaims, index.last_root) } @@ -582,20 +585,30 @@ impl AccountsDB { /// Store the account update. pub fn store(&self, fork_id: Fork, accounts: &HashMap<&Pubkey, &Account>) { + let mut store_accounts = Measure::start("store::store_accounts"); let infos = self.store_accounts(fork_id, accounts); + store_accounts.stop(); + let mut update_index = Measure::start("store::update_index"); let (reclaims, last_root) = self.update_index(fork_id, infos, accounts); + update_index.stop(); trace!("reclaim: {}", reclaims.len()); + let mut remove_dead_accounts = Measure::start("store::remove_dead"); let mut dead_forks = self.remove_dead_accounts(reclaims); + remove_dead_accounts.stop(); trace!("dead_forks: {}", dead_forks.len()); + let mut cleanup_dead_forks = Measure::start("store::cleanup_dead_forks"); self.cleanup_dead_forks(&mut dead_forks, last_root); + cleanup_dead_forks.stop(); trace!("purge_forks: {}", dead_forks.len()); + let mut purge_forks = Measure::start("store::purge_forks"); for fork in dead_forks { self.purge_fork(fork); } + purge_forks.stop(); } pub fn add_root(&self, fork: Fork) { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 2683890a2e..58a618e0dc 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -21,6 +21,7 @@ use crate::storage_utils::StorageAccounts; use bincode::{deserialize_from, serialize, serialize_into, serialized_size}; use log::*; use serde::{Deserialize, Serialize}; +use solana_measure::measure::Measure; use solana_metrics::{ datapoint_info, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info, }; @@ -38,7 +39,7 @@ use solana_sdk::syscall::{ tick_height, }; use solana_sdk::system_transaction; -use solana_sdk::timing::{duration_as_ms, duration_as_ns, duration_as_us, MAX_RECENT_BLOCKHASHES}; +use solana_sdk::timing::{duration_as_ns, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::{Result, Transaction, TransactionError}; use std::cmp; use std::collections::HashMap; @@ -46,7 +47,6 @@ use std::fmt; use std::io::{BufReader, Cursor, Read}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock, RwLockReadGuard}; -use std::time::Instant; pub const SECONDS_PER_YEAR: f64 = (365.0 * 24.0 * 60.0 * 60.0); @@ -538,23 +538,23 @@ impl Bank { let parents = self.parents(); *self.rc.parent.write().unwrap() = None; - let squash_accounts_start = Instant::now(); + let mut squash_accounts_time = Measure::start("squash_accounts_time"); for p in parents.iter().rev() { // root forks cannot be purged self.rc.accounts.add_root(p.slot()); } - let squash_accounts_ms = duration_as_ms(&squash_accounts_start.elapsed()); + squash_accounts_time.stop(); - let squash_cache_start = Instant::now(); + let mut squash_cache_time = Measure::start("squash_cache_time"); parents .iter() .for_each(|p| self.src.status_cache.write().unwrap().add_root(p.slot())); - let squash_cache_ms = duration_as_ms(&squash_cache_start.elapsed()); + squash_cache_time.stop(); datapoint_info!( "tower-observed", - ("squash_accounts_ms", squash_accounts_ms, i64), - ("squash_cache_ms", squash_cache_ms, i64) + ("squash_accounts_ms", squash_accounts_time.as_ms(), i64), + ("squash_cache_ms", squash_cache_time.as_ms(), i64) ); } @@ -946,7 +946,7 @@ impl Bank { ) { debug!("processing transactions: {}", txs.len()); let mut error_counters = ErrorCounters::default(); - let now = Instant::now(); + let mut load_time = Measure::start("accounts_load"); let retryable_txs: Vec<_> = lock_results .locked_accounts_results() @@ -966,9 +966,9 @@ impl Bank { &mut error_counters, ); let mut loaded_accounts = self.load_accounts(txs, sig_results, &mut error_counters); + load_time.stop(); - let load_elapsed = now.elapsed(); - let now = Instant::now(); + let mut execution_time = Measure::start("execution_time"); let mut signature_count = 0; let executed: Vec> = loaded_accounts .iter_mut() @@ -983,12 +983,12 @@ impl Bank { }) .collect(); - let execution_elapsed = now.elapsed(); + execution_time.stop(); debug!( "load: {}us execute: {}us txs_len={}", - duration_as_us(&load_elapsed), - duration_as_us(&execution_elapsed), + load_time.as_us(), + execution_time.as_us(), txs.len(), ); let mut tx_count = 0; @@ -1083,7 +1083,7 @@ impl Bank { // TODO: put this assert back in // assert!(!self.is_frozen()); - let now = Instant::now(); + let mut write_time = Measure::start("write_time"); self.rc .accounts .store_accounts(self.slot(), txs, executed, loaded_accounts); @@ -1091,12 +1091,8 @@ impl Bank { self.update_cached_accounts(txs, executed, loaded_accounts); // once committed there is no way to unroll - let write_elapsed = now.elapsed(); - debug!( - "store: {}us txs_len={}", - duration_as_us(&write_elapsed), - txs.len(), - ); + write_time.stop(); + debug!("store: {}us txs_len={}", write_time.as_us(), txs.len(),); self.update_transaction_statuses(txs, &executed); self.filter_program_errors_and_collect_fee(txs, executed) }