diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index d4538e5d01..0488910e7d 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -10,6 +10,7 @@ //! For Entries: //! * recorded entry must be >= WorkingBank::min_tick_height && entry must be < WorkingBank::max_tick_height //! +use crate::poh_service::PohService; use solana_ledger::blockstore::Blockstore; use solana_ledger::entry::Entry; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; @@ -151,6 +152,7 @@ pub struct PohRecorder { leader_schedule_cache: Arc, poh_config: Arc, ticks_per_slot: u64, + target_ns_per_tick: u64, record_lock_contention_us: u64, flush_cache_no_tick_us: u64, flush_cache_tick_us: u64, @@ -158,6 +160,7 @@ pub struct PohRecorder { send_us: u64, tick_lock_contention_us: u64, tick_overhead_us: u64, + total_sleep_us: u64, record_us: u64, ticks_from_record: u64, last_metric: Instant, @@ -462,7 +465,17 @@ impl PohRecorder { pub fn tick(&mut self) { let now = Instant::now(); - let poh_entry = self.poh.lock().unwrap().tick(); + let (poh_entry, target_time) = { + let mut poh_l = self.poh.lock().unwrap(); + let poh_entry = poh_l.tick(); + let target_time = if poh_entry.is_some() { + Some(poh_l.target_poh_time(self.target_ns_per_tick)) + } else { + None + }; + + (poh_entry, target_time) + }; self.tick_lock_contention_us += timing::duration_as_us(&now.elapsed()); let now = Instant::now(); if let Some(poh_entry) = poh_entry { @@ -485,6 +498,15 @@ impl PohRecorder { self.tick_cache.push((entry, self.tick_height)); let _ = self.flush_cache(true); self.flush_cache_tick_us += timing::duration_as_us(&now.elapsed()); + let target_time = target_time.unwrap(); + // sleep is not accurate enough to get a predictable time. + // Kernel can not schedule the thread for a while. + let started_waiting = Instant::now(); + while Instant::now() < target_time { + // TODO: a caller could possibly desire to reset or record while we're spinning here + std::hint::spin_loop(); + } + self.total_sleep_us += started_waiting.elapsed().as_nanos() as u64 / 1000; } } @@ -500,6 +522,7 @@ impl PohRecorder { ("prepare_send_us", self.prepare_send_us, i64), ("send_us", self.send_us, i64), ("ticks_from_record", self.ticks_from_record, i64), + ("total_sleep_us", self.total_sleep_us, i64), ("tick_overhead", self.tick_overhead_us, i64), ( "record_lock_contention", @@ -511,6 +534,7 @@ impl PohRecorder { self.tick_lock_contention_us = 0; self.record_us = 0; self.tick_overhead_us = 0; + self.total_sleep_us = 0; self.record_lock_contention_us = 0; self.flush_cache_no_tick_us = 0; self.flush_cache_tick_us = 0; @@ -588,10 +612,18 @@ impl PohRecorder { leader_schedule_cache: &Arc, poh_config: &Arc, ) -> (Self, Receiver, Receiver) { - let poh = Arc::new(Mutex::new(Poh::new( + let tick_number = 0; + let poh = Arc::new(Mutex::new(Poh::new_with_slot_info( last_entry_hash, poh_config.hashes_per_tick, + ticks_per_slot, + tick_number, ))); + + let target_ns_per_tick = PohService::target_ns_per_tick( + ticks_per_slot, + poh_config.target_tick_duration.as_nanos() as u64, + ); let (sender, receiver) = channel(); let (record_sender, record_receiver) = channel(); let (leader_first_tick_height, leader_last_tick_height, grace_ticks) = @@ -613,6 +645,7 @@ impl PohRecorder { blockstore: blockstore.clone(), leader_schedule_cache: leader_schedule_cache.clone(), ticks_per_slot, + target_ns_per_tick, poh_config: poh_config.clone(), record_lock_contention_us: 0, flush_cache_tick_us: 0, @@ -622,6 +655,7 @@ impl PohRecorder { tick_lock_contention_us: 0, record_us: 0, tick_overhead_us: 0, + total_sleep_us: 0, ticks_from_record: 0, last_metric: Instant::now(), record_sender, diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index ab7c2665ce..d9aedda190 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -116,17 +116,9 @@ impl PohService { if let Some(cores) = core_affinity::get_core_ids() { core_affinity::set_for_current(cores[pinned_cpu_core]); } - // Account for some extra time outside of PoH generation to account - // for processing time outside PoH. - let adjustment_per_tick = if ticks_per_slot > 0 { - TARGET_SLOT_ADJUSTMENT_NS / ticks_per_slot - } else { - 0 - }; Self::tick_producer( poh_recorder, &poh_exit_, - poh_config.target_tick_duration.as_nanos() as u64 - adjustment_per_tick, ticks_per_slot, hashes_per_batch, record_receiver, @@ -139,6 +131,17 @@ impl PohService { Self { tick_producer } } + pub fn target_ns_per_tick(ticks_per_slot: u64, target_tick_duration_ns: u64) -> u64 { + // Account for some extra time outside of PoH generation to account + // for processing time outside PoH. + let adjustment_per_tick = if ticks_per_slot > 0 { + TARGET_SLOT_ADJUSTMENT_NS / ticks_per_slot + } else { + 0 + }; + target_tick_duration_ns.saturating_sub(adjustment_per_tick) + } + fn sleepy_tick_producer( poh_recorder: Arc>, poh_config: &PohConfig, @@ -199,6 +202,7 @@ impl PohService { } } + // returns true if we need to tick fn record_or_hash( next_record: &mut Option, poh_recorder: &Arc>, @@ -253,7 +257,8 @@ impl PohService { hash_time.stop(); timing.total_hash_time_ns += hash_time.as_ns(); if should_tick { - return true; // nothing else can be done. tick required. + // nothing else can be done. tick required. + return true; } // check to see if a record request has been sent let get_again = record_receiver.try_recv(); @@ -276,13 +281,11 @@ impl PohService { fn tick_producer( poh_recorder: Arc>, poh_exit: &AtomicBool, - target_tick_ns: u64, ticks_per_slot: u64, hashes_per_batch: u64, record_receiver: Receiver, ) { let poh = poh_recorder.lock().unwrap().poh.clone(); - let mut now = Instant::now(); let mut timing = PohTiming::new(); let mut next_record = None; loop { @@ -307,14 +310,6 @@ impl PohService { timing.total_tick_time_ns += tick_time.as_ns(); } timing.num_ticks += 1; - let elapsed_ns = now.elapsed().as_nanos() as u64; - // sleep is not accurate enough to get a predictable time. - // Kernel can not schedule the thread for a while. - while (now.elapsed().as_nanos() as u64) < target_tick_ns { - std::hint::spin_loop(); - } - timing.total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000; - now = Instant::now(); timing.report(ticks_per_slot); if poh_exit.load(Ordering::Relaxed) { diff --git a/ledger/src/poh.rs b/ledger/src/poh.rs index 7edc629f59..c230afc8b9 100644 --- a/ledger/src/poh.rs +++ b/ledger/src/poh.rs @@ -8,6 +8,9 @@ pub struct Poh { num_hashes: u64, hashes_per_tick: u64, remaining_hashes: u64, + ticks_per_slot: u64, + tick_number: u64, + slot_start_time: Instant, } #[derive(Debug)] @@ -18,23 +21,47 @@ pub struct PohEntry { impl Poh { pub fn new(hash: Hash, hashes_per_tick: Option) -> Self { + Self::new_with_slot_info(hash, hashes_per_tick, 0, 0) + } + + pub fn new_with_slot_info( + hash: Hash, + hashes_per_tick: Option, + ticks_per_slot: u64, + tick_number: u64, + ) -> Self { let hashes_per_tick = hashes_per_tick.unwrap_or(std::u64::MAX); assert!(hashes_per_tick > 1); + let now = Instant::now(); Poh { hash, num_hashes: 0, hashes_per_tick, remaining_hashes: hashes_per_tick, + ticks_per_slot, + tick_number, + slot_start_time: now, } } pub fn reset(&mut self, hash: Hash, hashes_per_tick: Option) { - let mut poh = Poh::new(hash, hashes_per_tick); + // retains ticks_per_slot: this cannot change without restarting the validator + let tick_number = 0; + let mut poh = + Poh::new_with_slot_info(hash, hashes_per_tick, self.ticks_per_slot, tick_number); std::mem::swap(&mut poh, self); } + pub fn target_poh_time(&self, target_ns_per_tick: u64) -> Instant { + assert!(self.hashes_per_tick > 0); + let offset_tick_ns = target_ns_per_tick * self.tick_number; + let offset_ns = target_ns_per_tick * self.num_hashes / self.hashes_per_tick; + self.slot_start_time + Duration::from_nanos(offset_ns + offset_tick_ns) + } + pub fn hash(&mut self, max_num_hashes: u64) -> bool { let num_hashes = std::cmp::min(self.remaining_hashes - 1, max_num_hashes); + for _ in 0..num_hashes { self.hash = hash(&self.hash.as_ref()); } @@ -75,6 +102,7 @@ impl Poh { let num_hashes = self.num_hashes; self.remaining_hashes = self.hashes_per_tick; self.num_hashes = 0; + self.tick_number += 1; Some(PohEntry { num_hashes, hash: self.hash, @@ -102,6 +130,7 @@ mod tests { use crate::poh::{Poh, PohEntry}; use matches::assert_matches; use solana_sdk::hash::{hash, hashv, Hash}; + use std::time::Duration; fn verify(initial_hash: Hash, entries: &[(PohEntry, Option)]) -> bool { let mut current_hash = initial_hash; @@ -124,6 +153,42 @@ mod tests { true } + #[test] + fn test_target_poh_time() { + let zero = Hash::default(); + for target_ns_per_tick in 10..12 { + let mut poh = Poh::new(zero, None); + assert_eq!(poh.target_poh_time(target_ns_per_tick), poh.slot_start_time); + poh.tick_number = 2; + assert_eq!( + poh.target_poh_time(target_ns_per_tick), + poh.slot_start_time + Duration::from_nanos(target_ns_per_tick * 2) + ); + let mut poh = Poh::new(zero, Some(5)); + assert_eq!(poh.target_poh_time(target_ns_per_tick), poh.slot_start_time); + poh.tick_number = 2; + assert_eq!( + poh.target_poh_time(target_ns_per_tick), + poh.slot_start_time + Duration::from_nanos(target_ns_per_tick * 2) + ); + poh.num_hashes = 3; + assert_eq!( + poh.target_poh_time(target_ns_per_tick), + poh.slot_start_time + + Duration::from_nanos(target_ns_per_tick * 2 + target_ns_per_tick * 3 / 5) + ); + } + } + + #[test] + #[should_panic(expected = "assertion failed: hashes_per_tick > 1")] + fn test_target_poh_time_hashes_per_tick() { + let zero = Hash::default(); + let poh = Poh::new(zero, Some(0)); + let target_ns_per_tick = 10; + poh.target_poh_time(target_ns_per_tick); + } + #[test] fn test_poh_verify() { let zero = Hash::default();