From 4a4a1db836eceab55a73cfeeb1baaef58906cdbb Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 1 Apr 2022 14:50:12 +0000 Subject: [PATCH] expands lifetime of SlotStats (#23872) (#24002) Current slot stats are removed when the slot is full or every 30 seconds if the slot is before root: https://github.com/solana-labs/solana/blob/493a8e234/ledger/src/blockstore.rs#L2017-L2027 In order to track if the slot is ultimately marked as dead or rooted and emit more metrics, this commit expands lifetime of SlotStats while bounding total size of cache using an LRU eviction policy. (cherry picked from commit 1f9c89c1e8506346f7583a20c9203ab7360f1fd4) Co-authored-by: behzad nouri --- Cargo.lock | 2 + ledger/Cargo.toml | 2 + ledger/src/blockstore.rs | 107 ++++++++------------------------------- ledger/src/lib.rs | 1 + ledger/src/slot_stats.rs | 90 ++++++++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 87 deletions(-) create mode 100644 ledger/src/slot_stats.rs diff --git a/Cargo.lock b/Cargo.lock index f0970c210e..fbdfeaabda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5074,6 +5074,7 @@ version = "1.10.6" dependencies = [ "assert_matches", "bincode", + "bitflags", "byteorder", "chrono", "chrono-humanize", @@ -5084,6 +5085,7 @@ dependencies = [ "lazy_static", "libc", "log", + "lru", "matches", "num-derive", "num-traits", diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index d53ec45973..18c9c8fc63 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" +bitflags = "1.3.1" byteorder = "1.4.3" chrono = { version = "0.4.11", features = ["serde"] } chrono-humanize = "0.2.1" @@ -21,6 +22,7 @@ itertools = "0.10.3" lazy_static = "1.4.0" libc = "0.2.120" log = { version = "0.4.14" } +lru = "0.7.3" num-derive = "0.3" num-traits = "0.2" num_cpus = "1.13.1" diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 5bed5c8e05..392be5393b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -16,6 +16,7 @@ use { max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType, Shredder, SHRED_PAYLOAD_SIZE, }, + slot_stats::{ShredSource, SlotsStats}, }, bincode::deserialize, crossbeam_channel::{bounded, Receiver, Sender, TrySendError}, @@ -50,7 +51,7 @@ use { borrow::Cow, cell::RefCell, cmp, - collections::{hash_map::Entry as HashMapEntry, BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{hash_map::Entry as HashMapEntry, BTreeSet, HashMap, HashSet}, convert::TryInto, fs, io::{Error as IoError, ErrorKind}, @@ -60,7 +61,6 @@ use { atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock, RwLockWriteGuard, }, - time::Instant, }, tempfile::{Builder, TempDir}, thiserror::Error, @@ -181,26 +181,6 @@ pub struct Blockstore { column_options: LedgerColumnOptions, } -struct SlotsStats { - last_cleanup_ts: Instant, - stats: BTreeMap, -} - -impl Default for SlotsStats { - fn default() -> Self { - SlotsStats { - last_cleanup_ts: Instant::now(), - stats: BTreeMap::new(), - } - } -} - -#[derive(Default)] -struct SlotStats { - num_repaired: usize, - num_recovered: usize, -} - pub struct IndexMetaWorkingSetEntry { index: Index, // true only if at least one shred for this Index was inserted since the time this @@ -223,13 +203,6 @@ pub struct SlotMetaWorkingSetEntry { did_insert_occur: bool, } -#[derive(PartialEq, Debug, Clone)] -enum ShredSource { - Turbine, - Repaired, - Recovered, -} - #[derive(Default)] pub struct BlockstoreInsertionMetrics { pub num_shreds: usize, @@ -1250,13 +1223,13 @@ impl Blockstore { let mut newly_completed_data_sets: Vec = vec![]; let mut inserted_indices = Vec::new(); for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() { + let shred_source = if is_repaired { + ShredSource::Repaired + } else { + ShredSource::Turbine + }; match shred.shred_type() { ShredType::Data => { - let shred_source = if is_repaired { - ShredSource::Repaired - } else { - ShredSource::Turbine - }; match self.check_insert_data_shred( shred, &mut erasure_metas, @@ -1295,7 +1268,7 @@ impl Blockstore { &mut index_meta_time, handle_duplicate, is_trusted, - is_repaired, + shred_source, metrics, ); } @@ -1480,7 +1453,7 @@ impl Blockstore { index_meta_time: &mut u64, handle_duplicate: &F, is_trusted: bool, - is_repaired: bool, + shred_source: ShredSource, metrics: &mut BlockstoreInsertionMetrics, ) -> bool where @@ -1547,13 +1520,10 @@ impl Blockstore { return false; } - - if is_repaired { - let mut slots_stats = self.slots_stats.lock().unwrap(); - let mut e = slots_stats.stats.entry(slot).or_default(); - e.num_repaired += 1; - } - + self.slots_stats + .lock() + .unwrap() + .add_shred(slot, shred_source); // insert coding shred into rocks let result = self .insert_coding_shred(index_meta, &shred, write_batch) @@ -1699,7 +1669,7 @@ impl Blockstore { just_inserted_shreds, &self.last_root, leader_schedule, - shred_source.clone(), + shred_source, ) { return Err(InsertDataShredError::InvalidShred); } @@ -1971,49 +1941,12 @@ impl Blockstore { end_index, }) .collect(); - if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered { + { let mut slots_stats = self.slots_stats.lock().unwrap(); - let mut e = slots_stats.stats.entry(slot_meta.slot).or_default(); - if shred_source == ShredSource::Repaired { - e.num_repaired += 1; + slots_stats.add_shred(slot_meta.slot, shred_source); + if slot_meta.is_full() { + slots_stats.set_full(slot_meta); } - if shred_source == ShredSource::Recovered { - e.num_recovered += 1; - } - } - if slot_meta.is_full() { - let (num_repaired, num_recovered) = { - let mut slots_stats = self.slots_stats.lock().unwrap(); - if let Some(e) = slots_stats.stats.remove(&slot_meta.slot) { - if slots_stats.last_cleanup_ts.elapsed().as_secs() > 30 { - let root = self.last_root(); - slots_stats.stats = slots_stats.stats.split_off(&root); - slots_stats.last_cleanup_ts = Instant::now(); - } - (e.num_repaired, e.num_recovered) - } else { - (0, 0) - } - }; - datapoint_info!( - "shred_insert_is_full", - ( - "total_time_ms", - solana_sdk::timing::timestamp() - slot_meta.first_shred_timestamp, - i64 - ), - ("slot", slot_meta.slot, i64), - ( - "last_index", - slot_meta - .last_index - .and_then(|ix| i64::try_from(ix).ok()) - .unwrap_or(-1), - i64 - ), - ("num_repaired", num_repaired, i64), - ("num_recovered", num_recovered, i64), - ); } trace!("inserted shred into slot {:?} and index {:?}", slot, index); Ok(newly_completed_data_sets) @@ -6354,7 +6287,7 @@ pub mod tests { panic!("no dupes"); }, false, - false, + ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); @@ -6372,7 +6305,7 @@ pub mod tests { counter.fetch_add(1, Ordering::Relaxed); }, false, - false, + ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); assert_eq!(counter.load(Ordering::Relaxed), 1); diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 75b51a77ff..8931c46788 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -8,6 +8,7 @@ pub mod bigtable_delete; pub mod bigtable_upload; pub mod bigtable_upload_service; pub mod block_error; +mod slot_stats; #[macro_use] pub mod blockstore; pub mod ancestor_iterator; diff --git a/ledger/src/slot_stats.rs b/ledger/src/slot_stats.rs new file mode 100644 index 0000000000..13e90fe19e --- /dev/null +++ b/ledger/src/slot_stats.rs @@ -0,0 +1,90 @@ +use { + crate::blockstore_meta::SlotMeta, bitflags::bitflags, lru::LruCache, solana_sdk::clock::Slot, +}; + +const SLOTS_STATS_CACHE_CAPACITY: usize = 300; + +macro_rules! get_mut_entry ( + ($cache:expr, $key:expr) => ( + match $cache.get_mut(&$key) { + Some(entry) => entry, + None => { + $cache.put($key, SlotStats::default()); + $cache.get_mut(&$key).unwrap() + } + } + ); +); + +#[derive(Copy, Clone, Debug)] +pub(crate) enum ShredSource { + Turbine, + Repaired, + Recovered, +} + +bitflags! { + #[derive(Default)] + struct SlotFlags: u8 { + const DEAD = 0b00000001; + const FULL = 0b00000010; + const ROOTED = 0b00000100; + } +} + +#[derive(Default)] +struct SlotStats { + flags: SlotFlags, + num_repaired: usize, + num_recovered: usize, +} + +pub(crate) struct SlotsStats(LruCache); + +impl Default for SlotsStats { + fn default() -> Self { + // LruCache::unbounded because capacity is enforced manually. + Self(LruCache::unbounded()) + } +} + +impl SlotsStats { + pub(crate) fn add_shred(&mut self, slot: Slot, source: ShredSource) { + let entry = get_mut_entry!(self.0, slot); + match source { + ShredSource::Turbine => (), + ShredSource::Repaired => entry.num_repaired += 1, + ShredSource::Recovered => entry.num_recovered += 1, + } + self.maybe_evict_cache(); + } + + pub(crate) fn set_full(&mut self, slot_meta: &SlotMeta) { + let total_time_ms = + solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp); + let last_index = slot_meta + .last_index + .and_then(|ix| i64::try_from(ix).ok()) + .unwrap_or(-1); + let entry = get_mut_entry!(self.0, slot_meta.slot); + if !entry.flags.contains(SlotFlags::FULL) { + datapoint_info!( + "shred_insert_is_full", + ("total_time_ms", total_time_ms, i64), + ("slot", slot_meta.slot, i64), + ("last_index", last_index, i64), + ("num_repaired", entry.num_repaired, i64), + ("num_recovered", entry.num_recovered, i64), + ); + } + entry.flags |= SlotFlags::FULL; + self.maybe_evict_cache(); + } + + fn maybe_evict_cache(&mut self) { + while self.0.len() > SLOTS_STATS_CACHE_CAPACITY { + let (_slot, _entry) = self.0.pop_lru().unwrap(); + // TODO: submit metrics for (slot, entry). + } + } +}