Poh timing service (#23736)

* initial work for poh timing report service

* add poh_timing_report_service to validator

* fix comments

* clippy

* improve test coverage

* delete record when complete

* rename shred full to slot full.

* debug logging

* fix slot full

* remove debug comments

* adding fmt trait

* derive default

* default for poh timing reporter

* better comments

* remove commented code

* fix test

* more test fixes

* delete timestamps for slots that are older than root_slot

* debug log

* record poh start end in bank reset

* report full to start time instead

* fix poh slot offset

* report poh start for normal ticks

* fix typo

* refactor out poh point report fn

* rename

* optimize delete - delete only when last_root changed

* change log level to trace

* convert if to match

* remove redundant check

* fix SlotPohTiming comments

* review feedback on poh timing reporter

* review feedback on poh_recorder

* add test case for out-of-order arrival of timing points and incomplete timing points

* refactor poh_timing_points into its own mod

* remove option for poh_timing_report service

* move poh_timing_point_sender to constructor

* clippy

* better comments

* more clippy

* more clippy

* add slot poh timing point macro

* clippy

* assert in test

* comments and display fmt

* fix check

* assert format

* revise comments

* refactor

* extract send fn

* revert reporting_poh_timing_point

* align logging

* small refactor

* move type declaration to the top of the module

* replace macro with constructor

* clippy: remove redundant closure

* review comments

* simplify poh timing point creation

Co-authored-by: Haoran Yi <hyi@Haorans-MacBook-Air.local>
Author: HaoranYi
Date: 2022-03-30 09:04:49 -05:00 (committed by GitHub)
Parent: cda3d66b21
Commit: ba770832d0
8 changed files with 649 additions and 3 deletions
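
The plumbing added here is a one-way channel: producers such as poh_recorder and blockstore hold an optional PohTimingSender and push SlotPohTimingInfo points into it, while the new PohTimingReportService drains the receiver and feeds each point to PohTimingReporter, which emits a poh_slot_timing datapoint once a slot's start, end, and full times are all known. Below is a minimal producer-side sketch, not part of this diff; it assumes PohTimingSender is the crossbeam Sender<SlotPohTimingInfo> alias implied by the validator wiring, and uses the SlotPohTimingInfo constructors that appear in the tests further down.

use {
    crossbeam_channel::unbounded,
    solana_metrics::poh_timing_point::{PohTimingSender, SlotPohTimingInfo},
};

/// Hypothetical helper: report a slot-start timing point if a sender is wired up.
fn report_slot_start(sender: &Option<PohTimingSender>, slot: u64, root_slot: Option<u64>, ts: u64) {
    if let Some(sender) = sender {
        // Reporting is best-effort; a send error only means the receiver side is gone.
        let _ = sender.send(SlotPohTimingInfo::new_slot_start_poh_time_point(
            slot, root_slot, ts,
        ));
    }
}

fn main() {
    // The receiver end would normally be handed to PohTimingReportService::new.
    let (sender, _receiver): (PohTimingSender, _) = unbounded();
    report_slot_start(&Some(sender), 42, None, 100);
}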


@@ -37,6 +37,8 @@ pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_hasher;
pub mod packet_threshold;
pub mod poh_timing_report_service;
pub mod poh_timing_reporter;
pub mod progress_map;
pub mod qos_service;
pub mod repair_generic_traversal;


@@ -0,0 +1,87 @@
//! PohTimingReportService module
use {
crate::poh_timing_reporter::PohTimingReporter,
solana_metrics::poh_timing_point::{PohTimingReceiver, SlotPohTimingInfo},
std::{
string::ToString,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
/// Timeout to wait on the poh timing points from the channel
const POH_TIMING_RECEIVER_TIMEOUT_MILLISECONDS: u64 = 1000;
/// The `poh_timing_report_service` receives signals of relevant timing points
/// during the processing of a slot (i.e. from blockstore and poh), aggregates
/// them, and reports the results as datapoints.
pub struct PohTimingReportService {
t_poh_timing: JoinHandle<()>,
}
impl PohTimingReportService {
pub fn new(receiver: PohTimingReceiver, exit: Arc<AtomicBool>) -> Self {
let exit_signal = exit;
let mut poh_timing_reporter = PohTimingReporter::default();
let t_poh_timing = Builder::new()
.name("poh_timing_report".to_string())
.spawn(move || loop {
if exit_signal.load(Ordering::Relaxed) {
break;
}
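// recv with a timeout so the loop can re-check the exit flag even when no timing points arrive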
if let Ok(SlotPohTimingInfo {
slot,
root_slot,
timing_point,
}) = receiver.recv_timeout(Duration::from_millis(
POH_TIMING_RECEIVER_TIMEOUT_MILLISECONDS,
)) {
poh_timing_reporter.process(slot, root_slot, timing_point);
}
})
.unwrap();
Self { t_poh_timing }
}
pub fn join(self) -> thread::Result<()> {
self.t_poh_timing.join()
}
}
#[cfg(test)]
mod test {
use {
super::*, crossbeam_channel::unbounded, solana_metrics::poh_timing_point::SlotPohTimingInfo,
};
#[test]
/// Test the life cycle of the PohTimingReportService
fn test_poh_timing_report_service() {
let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded();
let exit = Arc::new(AtomicBool::new(false));
// Create the service
let poh_timing_report_service =
PohTimingReportService::new(poh_timing_point_receiver, exit.clone());
// Send SlotPohTimingPoint
let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_start_poh_time_point(
42, None, 100,
));
let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_end_poh_time_point(
42, None, 200,
));
let _ = poh_timing_point_sender.send(SlotPohTimingInfo::new_slot_full_poh_time_point(
42, None, 150,
));
// Shutdown the service
exit.store(true, Ordering::Relaxed);
poh_timing_report_service
.join()
.expect("poh_timing_report_service completed");
}
}


@@ -0,0 +1,239 @@
//! The poh_timing_reporter module implements the poh timing point and timing
//! reporter structs.
use {
solana_metrics::{datapoint_info, poh_timing_point::PohTimingPoint},
solana_sdk::clock::Slot,
std::{collections::HashMap, fmt},
};
/// A SlotPohTimestamp records the timing of events during the processing of a
/// slot by the validator
#[derive(Debug, Clone, Copy, Default)]
pub struct SlotPohTimestamp {
/// Slot start time from poh
pub start_time: u64,
/// Slot end time from poh
pub end_time: u64,
/// Last shred received time from block producer
pub full_time: u64,
}
/// Display trait
impl fmt::Display for SlotPohTimestamp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"SlotPohTimestamp: start={} end={} full={}",
self.start_time, self.end_time, self.full_time
)
}
}
impl SlotPohTimestamp {
/// Return true if the timing points of all events are received.
pub fn is_complete(&self) -> bool {
self.start_time != 0 && self.end_time != 0 && self.full_time != 0
}
/// Update with timing point
pub fn update(&mut self, timing_point: PohTimingPoint) {
match timing_point {
PohTimingPoint::PohSlotStart(ts) => self.start_time = ts,
PohTimingPoint::PohSlotEnd(ts) => self.end_time = ts,
PohTimingPoint::FullSlotReceived(ts) => self.full_time = ts,
}
}
/// Return the time difference from slot start to slot full
fn slot_start_to_full_time(&self) -> i64 {
(self.full_time as i64).saturating_sub(self.start_time as i64)
}
/// Return the time difference from slot full to slot end
fn slot_full_to_end_time(&self) -> i64 {
(self.end_time as i64).saturating_sub(self.full_time as i64)
}
/// Report PohTiming for a slot
pub fn report(&self, slot: Slot) {
datapoint_info!(
"poh_slot_timing",
("slot", slot as i64, i64),
("start_time", self.start_time as i64, i64),
("end_time", self.end_time as i64, i64),
("full_time", self.full_time as i64, i64),
(
"start_to_full_time_diff",
self.slot_start_to_full_time(),
i64
),
("full_to_end_time_diff", self.slot_full_to_end_time(), i64),
);
}
}
/// A PohTimingReporter manages and reports the timing of events for incoming
/// slots
#[derive(Default)]
pub struct PohTimingReporter {
/// Storage map of SlotPohTimestamp per slot
slot_timestamps: HashMap<Slot, SlotPohTimestamp>,
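/// Last root slot observed; entries for slots older than it are pruned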
last_root_slot: Slot,
}
impl PohTimingReporter {
/// Return true if PohTiming is complete for the slot
pub fn is_complete(&self, slot: Slot) -> bool {
if let Some(slot_timestamp) = self.slot_timestamps.get(&slot) {
slot_timestamp.is_complete()
} else {
false
}
}
/// Process incoming PohTimingPoint from the channel
pub fn process(&mut self, slot: Slot, root_slot: Option<Slot>, t: PohTimingPoint) -> bool {
let slot_timestamp = self
.slot_timestamps
.entry(slot)
.or_insert_with(SlotPohTimestamp::default);
slot_timestamp.update(t);
let is_completed = slot_timestamp.is_complete();
if is_completed {
slot_timestamp.report(slot);
}
// delete slots that are older than the root_slot
if let Some(root_slot) = root_slot {
if root_slot > self.last_root_slot {
self.slot_timestamps.retain(|&k, _| k >= root_slot);
self.last_root_slot = root_slot;
}
}
is_completed
}
/// Return the count of slot_timestamps in tracking
pub fn slot_count(&self) -> usize {
self.slot_timestamps.len()
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
/// Test poh_timing_reporter
fn test_poh_timing_reporter() {
// create a reporter
let mut reporter = PohTimingReporter::default();
// process all relevant PohTimingPoints for slot 42
let complete = reporter.process(42, None, PohTimingPoint::PohSlotStart(100));
assert!(!complete);
let complete = reporter.process(42, None, PohTimingPoint::PohSlotEnd(200));
assert!(!complete);
let complete = reporter.process(42, None, PohTimingPoint::FullSlotReceived(150));
// assert that the PohTiming is complete
assert!(complete);
// Move root to slot 43
let root = Some(43);
// process all relevant PohTimingPoints for slot 45
let complete = reporter.process(45, None, PohTimingPoint::PohSlotStart(100));
assert!(!complete);
let complete = reporter.process(45, None, PohTimingPoint::PohSlotEnd(200));
assert!(!complete);
let complete = reporter.process(45, root, PohTimingPoint::FullSlotReceived(150));
// assert that the PohTiming is complete
assert!(complete);
// assert that only one timestamp remains tracked
assert_eq!(reporter.slot_count(), 1)
}
#[test]
/// Test poh_timing_reporter with out-of-order timing points
fn test_poh_timing_reporter_out_of_order() {
// create a reporter
let mut reporter = PohTimingReporter::default();
// process all relevant PohTimingPoints for slot 42/43 out of order
let mut c = 0;
// slot_start 42
c += reporter.process(42, None, PohTimingPoint::PohSlotStart(100)) as i32;
// slot_full 42
c += reporter.process(42, None, PohTimingPoint::FullSlotReceived(120)) as i32;
// slot_full 43
c += reporter.process(43, None, PohTimingPoint::FullSlotReceived(140)) as i32;
// slot_end 42
c += reporter.process(42, None, PohTimingPoint::PohSlotEnd(200)) as i32;
// slot start 43
c += reporter.process(43, None, PohTimingPoint::PohSlotStart(100)) as i32;
// slot end 43
c += reporter.process(43, None, PohTimingPoint::PohSlotEnd(200)) as i32;
// assert that both timing points are complete
assert_eq!(c, 2);
// assert that both timestamps remain tracked
assert_eq!(reporter.slot_count(), 2)
}
#[test]
/// Test poh_timing_reporter when a slot never completes
fn test_poh_timing_reporter_never_complete() {
// create a reporter
let mut reporter = PohTimingReporter::default();
let mut c = 0;
// process all relevant PohTimingPoints for slot 42/43 out of order
// slot_start 42
c += reporter.process(42, None, PohTimingPoint::PohSlotStart(100)) as i32;
// slot_full 42
c += reporter.process(42, None, PohTimingPoint::FullSlotReceived(120)) as i32;
// slot_full 43
c += reporter.process(43, None, PohTimingPoint::FullSlotReceived(140)) as i32;
// skip slot 42, jump to slot 43
// slot start 43
c += reporter.process(43, None, PohTimingPoint::PohSlotStart(100)) as i32;
// slot end 43
c += reporter.process(43, None, PohTimingPoint::PohSlotEnd(200)) as i32;
// assert that only one timing point is complete
assert_eq!(c, 1);
// assert that both timestamps are still tracked
assert_eq!(reporter.slot_count(), 2)
}
#[test]
fn test_poh_timing_reporter_overflow() {
// create a reporter
let mut reporter = PohTimingReporter::default();
// process all relevant PohTimingPoints for a slot
let complete = reporter.process(42, None, PohTimingPoint::PohSlotStart(1647624609896));
assert!(!complete);
let complete = reporter.process(42, None, PohTimingPoint::PohSlotEnd(1647624610286));
assert!(!complete);
let complete = reporter.process(42, None, PohTimingPoint::FullSlotReceived(1647624610281));
// assert that the PohTiming is complete
assert!(complete);
}
#[test]
fn test_slot_poh_timestamp_fmt() {
let t = SlotPohTimestamp::default();
assert_eq!(format!("{}", t), "SlotPohTimestamp: start=0 end=0 full=0");
}
}


@@ -8,6 +8,7 @@ use {
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower},
poh_timing_report_service::PohTimingReportService,
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
sample_performance_service::SamplePerformanceService,
serve_repair::ServeRepair,
@@ -44,7 +45,7 @@ use {
leader_schedule_cache::LeaderScheduleCache,
},
solana_measure::measure::Measure,
solana_metrics::datapoint_info,
solana_metrics::{datapoint_info, poh_timing_point::PohTimingSender},
solana_poh::{
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::{self, PohService},
@@ -323,6 +324,7 @@ pub struct Validator {
cache_block_meta_service: Option<CacheBlockMetaService>,
system_monitor_service: Option<SystemMonitorService>,
sample_performance_service: Option<SamplePerformanceService>,
poh_timing_report_service: PohTimingReportService,
stats_reporter_service: StatsReporterService,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
@@ -482,6 +484,10 @@ impl Validator {
!config.no_os_network_stats_reporting,
));
let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded();
let poh_timing_report_service =
PohTimingReportService::new(poh_timing_point_receiver, exit.clone());
let (
genesis_config,
mut bank_forks,
@@ -508,6 +514,7 @@
&start_progress,
accounts_update_notifier,
transaction_notifier,
Some(poh_timing_point_sender.clone()),
);
let last_full_snapshot_slot = process_blockstore(
@@ -525,7 +532,6 @@
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
maybe_warp_slot(config, ledger_path, &mut bank_forks, &leader_schedule_cache);
let tower = {
let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
if let Ok(tower) = &restored_tower {
@@ -653,6 +659,7 @@
blockstore.new_shreds_signals.first().cloned(),
&leader_schedule_cache,
&poh_config,
Some(poh_timing_point_sender),
exit.clone(),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
@@ -973,6 +980,7 @@
cache_block_meta_service,
system_monitor_service,
sample_performance_service,
poh_timing_report_service,
snapshot_packager_service,
completed_data_sets_service,
tpu,
@@ -1109,6 +1117,10 @@
if let Some(geyser_plugin_service) = self.geyser_plugin_service {
geyser_plugin_service.join().expect("geyser_plugin_service");
}
self.poh_timing_report_service
.join()
.expect("poh_timing_report_service");
}
}
@@ -1247,6 +1259,7 @@ fn load_blockstore(
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
poh_timing_point_sender: Option<PohTimingSender>,
) -> (
GenesisConfig,
BankForks,
@@ -1301,6 +1314,7 @@
)
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
blockstore.shred_timing_point_sender = poh_timing_point_sender;
let blockstore = Arc::new(blockstore);
let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit);