From a9553cb40153e60904ca39e05d9c3932f0533916 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 17 Jun 2020 19:48:05 +0000 Subject: [PATCH] Entry verify cleanup and gossip counters (#10632) (#10650) * Add prune message counter * Switch to us verification time to match other counters * Add separate transaction/poh verify timing Co-authored-by: sakridge Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- core/src/cluster_info.rs | 23 ++++-- core/src/progress_map.rs | 11 ++- ledger/src/blockstore_processor.rs | 12 ++-- ledger/src/entry.rs | 110 ++++++++++++++++------------- 4 files changed, 94 insertions(+), 62 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4cff79f08c..07caaeb457 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -226,6 +226,8 @@ struct GossipStats { process_prune: Counter, process_push_message: Counter, prune_received_cache: Counter, + prune_message_count: Counter, + prune_message_len: Counter, purge: Counter, epoch_slots_lookup: Counter, epoch_slots_push: Counter, @@ -1641,7 +1643,7 @@ impl ClusterInfo { } ret }); - let rsp = Self::handle_push_message(self, recycler, &from, data, stakes); + let rsp = self.handle_push_message(recycler, &from, data, stakes); if let Some(rsp) = rsp { let _ignore_disconnect = response_sender.send(rsp); } @@ -1653,11 +1655,10 @@ impl ClusterInfo { Protocol::PruneMessage(from, data) => { let start = allocated.get(); if data.verify() { - inc_new_counter_debug!("cluster_info-prune_message", 1); - inc_new_counter_debug!( - "cluster_info-prune_message-size", - data.prunes.len() - ); + self.stats.prune_message_count.add_relaxed(1); + self.stats + .prune_message_len + .add_relaxed(data.prunes.len() as u64); match self .time_gossip_write_lock("process_prune", &self.stats.process_prune) .process_prune_msg( @@ -2242,6 +2243,16 @@ impl ClusterInfo { self.stats.new_pull_requests_count.clear(), i64 ), + ( + "prune_message_count", + self.stats.prune_message_count.clear(), + i64 + ), + ( + "prune_message_len", + self.stats.prune_message_len.clear(), + i64 + ), ); *last_print = Instant::now(); diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs index 15a50b62c8..70e5c874ab 100644 --- a/core/src/progress_map.rs +++ b/core/src/progress_map.rs @@ -42,7 +42,16 @@ impl ReplaySlotStats { self.fetch_fail_elapsed as i64, i64 ), - ("entry_verification_time", self.verify_elapsed as i64, i64), + ( + "entry_poh_verification_time", + self.poh_verify_elapsed as i64, + i64 + ), + ( + "entry_transaction_verification_time", + self.transaction_verify_elapsed as i64, + i64 + ), ("replay_time", self.replay_elapsed as i64, i64), ( "replay_total_elapsed", diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 915294f703..d1de25eefa 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -484,7 +484,8 @@ fn confirm_full_slot( pub struct ConfirmationTiming { pub started: Instant, pub replay_elapsed: u64, - pub verify_elapsed: u64, + pub poh_verify_elapsed: u64, + pub transaction_verify_elapsed: u64, pub fetch_elapsed: u64, pub fetch_fail_elapsed: u64, } @@ -494,7 +495,8 @@ impl Default for ConfirmationTiming { Self { started: Instant::now(), replay_elapsed: 0, - verify_elapsed: 0, + poh_verify_elapsed: 0, + transaction_verify_elapsed: 0, fetch_elapsed: 0, fetch_fail_elapsed: 0, } @@ -599,11 +601,13 @@ pub fn confirm_slot( timing.replay_elapsed += replay_elapsed.as_us(); if let Some(mut verifier) = verifier { - if !verifier.finish_verify(&entries) { + let verified = verifier.finish_verify(&entries); + timing.poh_verify_elapsed += verifier.poh_duration_us(); + timing.transaction_verify_elapsed += verifier.transaction_duration_us(); + if !verified { warn!("Ledger proof of history failed at slot: {}", bank.slot()); return Err(BlockError::InvalidEntryHash.into()); } - timing.verify_elapsed += verifier.duration_ms(); } process_result?; diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index 0ea0c4221f..96ea6bb91a 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -151,12 +151,22 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction } } -pub struct VerificationData { +pub struct GpuVerificationData { thread_h: Option>, - verification_status: EntryVerificationStatus, hashes: Option>>>, tx_hashes: Vec>, - duration_ms: u64, +} + +pub enum DeviceVerificationData { + CPU(), + GPU(GpuVerificationData), +} + +pub struct EntryVerificationState { + verification_status: EntryVerificationStatus, + poh_duration_us: u64, + transaction_duration_us: u64, + device_verification_data: DeviceVerificationData, } #[derive(Default, Clone)] @@ -172,33 +182,30 @@ pub enum EntryVerificationStatus { Pending, } -pub enum EntryVerificationState { - CPU(VerificationData), - GPU(VerificationData), -} - impl EntryVerificationState { pub fn status(&self) -> EntryVerificationStatus { - match self { - EntryVerificationState::CPU(state) => state.verification_status, - EntryVerificationState::GPU(state) => state.verification_status, - } + self.verification_status } - pub fn duration_ms(&self) -> u64 { - match self { - EntryVerificationState::CPU(state) => state.duration_ms, - EntryVerificationState::GPU(state) => state.duration_ms, - } + pub fn poh_duration_us(&self) -> u64 { + self.poh_duration_us + } + + pub fn set_transaction_duration_us(&mut self, new: u64) { + self.transaction_duration_us = new; + } + + pub fn transaction_duration_us(&self) -> u64 { + self.transaction_duration_us } pub fn finish_verify(&mut self, entries: &[Entry]) -> bool { - match self { - EntryVerificationState::GPU(verification_state) => { - let gpu_time_ms = verification_state.thread_h.take().unwrap().join().unwrap(); + match &mut self.device_verification_data { + DeviceVerificationData::GPU(verification_state) => { + let gpu_time_us = verification_state.thread_h.take().unwrap().join().unwrap(); let mut verify_check_time = Measure::start("verify_check"); - let hashes = verification_state.hashes.take().expect("hashes.as_ref"); + let hashes = verification_state.hashes.take().unwrap(); let hashes = Arc::try_unwrap(hashes) .expect("unwrap Arc") .into_inner() @@ -225,21 +232,17 @@ impl EntryVerificationState { }); verify_check_time.stop(); - verification_state.duration_ms += gpu_time_ms + verify_check_time.as_ms(); - inc_new_counter_info!( - "entry_verify-duration", - verification_state.duration_ms as usize - ); + self.poh_duration_us += gpu_time_us + verify_check_time.as_us(); - verification_state.verification_status = if res { + self.verification_status = if res { EntryVerificationStatus::Success } else { EntryVerificationStatus::Failure }; res } - EntryVerificationState::CPU(verification_state) => { - verification_state.verification_status == EntryVerificationStatus::Success + DeviceVerificationData::CPU() => { + self.verification_status == EntryVerificationStatus::Success } } } @@ -290,19 +293,17 @@ impl EntrySlice for [Entry] { }) }) }); - let duration_ms = timing::duration_as_ms(&now.elapsed()); - inc_new_counter_warn!("entry_verify-duration", duration_ms as usize); - EntryVerificationState::CPU(VerificationData { - thread_h: None, + let poh_duration_us = timing::duration_as_us(&now.elapsed()); + EntryVerificationState { verification_status: if res { EntryVerificationStatus::Success } else { EntryVerificationStatus::Failure }, - hashes: None, - tx_hashes: vec![], - duration_ms, - }) + poh_duration_us, + transaction_duration_us: 0, + device_verification_data: DeviceVerificationData::CPU(), + } } fn verify_transaction_signatures(&self) -> bool { @@ -324,19 +325,22 @@ impl EntrySlice for [Entry] { ) -> EntryVerificationState { let start = Instant::now(); let res = self.verify_transaction_signatures(); + let transaction_duration_us = timing::duration_as_us(&start.elapsed()); if !res { - return EntryVerificationState::CPU(VerificationData { - thread_h: None, + return EntryVerificationState { verification_status: EntryVerificationStatus::Failure, - duration_ms: timing::duration_as_ms(&start.elapsed()), - hashes: None, - tx_hashes: vec![], - }); + transaction_duration_us, + poh_duration_us: 0, + device_verification_data: DeviceVerificationData::CPU(), + }; } + let start = Instant::now(); let api = perf_libs::api(); if api.is_none() { - return self.verify_cpu(start_hash); + let mut res: EntryVerificationState = self.verify_cpu(start_hash); + res.set_transaction_duration_us(transaction_duration_us); + return res; } let api = api.unwrap(); inc_new_counter_warn!("entry_verify-num_entries", self.len() as usize); @@ -386,11 +390,11 @@ impl EntrySlice for [Entry] { if res != 0 { panic!("GPU PoH verify many failed"); } - inc_new_counter_warn!( + inc_new_counter_info!( "entry_verify-gpu_thread", - timing::duration_as_ms(&gpu_wait.elapsed()) as usize + timing::duration_as_us(&gpu_wait.elapsed()) as usize ); - timing::duration_as_ms(&gpu_wait.elapsed()) + timing::duration_as_us(&gpu_wait.elapsed()) }); let tx_hashes = PAR_THREAD_POOL.with(|thread_pool| { @@ -407,13 +411,17 @@ impl EntrySlice for [Entry] { }) }); - EntryVerificationState::GPU(VerificationData { + let device_verification_data = DeviceVerificationData::GPU(GpuVerificationData { thread_h: Some(gpu_verify_thread), - verification_status: EntryVerificationStatus::Pending, tx_hashes, - duration_ms: timing::duration_as_ms(&start.elapsed()), hashes: Some(hashes), - }) + }); + EntryVerificationState { + verification_status: EntryVerificationStatus::Pending, + poh_duration_us: timing::duration_as_us(&start.elapsed()), + transaction_duration_us, + device_verification_data, + } } fn verify_tick_hash_count(&self, tick_hash_count: &mut u64, hashes_per_tick: u64) -> bool {