use rand::{thread_rng, Rng}; use solana_measure::measure::Measure; use std::{ sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, Weak, }, time::Instant, }; pub const DEFAULT_MINIMUM_OBJECT_COUNT: u32 = 1000; pub const DEFAULT_SHRINK_PCT: u32 = 80; pub const DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT: u32 = 10; pub const DEFAULT_CHECK_SHRINK_INTERVAL_MS: u32 = 10000; enum AllocationDecision { Reuse(T), Allocate(u32, usize), AllocationLimitReached, } #[derive(Debug, Default)] struct RecyclerStats { total: AtomicUsize, freed: AtomicUsize, reuse: AtomicUsize, max_gc: AtomicUsize, } #[derive(Debug, Default)] struct RecyclerShrinkStats { resulting_size: u32, target_size: u32, ideal_num_to_remove: u32, shrink_elapsed: u64, drop_elapsed: u64, } impl RecyclerShrinkStats { fn report(&self, shrink_metric_name: &'static str) { datapoint_info!( shrink_metric_name, ("target_size", self.target_size as i64, i64), ("resulting_size", self.resulting_size as i64, i64), ("ideal_num_to_remove", self.ideal_num_to_remove as i64, i64), ("recycler_shrink_elapsed", self.shrink_elapsed as i64, i64), ("drop_elapsed", self.drop_elapsed as i64, i64) ); } } #[derive(Clone)] pub struct Recycler { recycler: Arc>, shrink_metric_name: &'static str, } impl Recycler { pub fn new_without_limit(shrink_metric_name: &'static str) -> Self { Self { recycler: Arc::new(RecyclerX::default()), shrink_metric_name, } } pub fn new_with_limit(shrink_metric_name: &'static str, limit: u32) -> Self { Self { recycler: Arc::new(RecyclerX::new(Some(limit))), shrink_metric_name, } } } #[derive(Debug)] pub struct ObjectPool { objects: Vec, shrink_pct: u32, minimum_object_count: u32, above_shrink_pct_count: u32, max_above_shrink_pct_count: u32, check_shrink_interval_ms: u32, last_shrink_check_time: Instant, pub total_allocated_count: u32, limit: Option, } impl Default for ObjectPool { fn default() -> Self { ObjectPool { objects: vec![], shrink_pct: DEFAULT_SHRINK_PCT, minimum_object_count: DEFAULT_MINIMUM_OBJECT_COUNT, above_shrink_pct_count: 0, max_above_shrink_pct_count: DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT, check_shrink_interval_ms: DEFAULT_CHECK_SHRINK_INTERVAL_MS, last_shrink_check_time: Instant::now(), total_allocated_count: 0, limit: None, } } } impl ObjectPool { fn new(limit: Option) -> Self { Self { limit, ..Self::default() } } fn len(&self) -> usize { self.objects.len() } fn get_shrink_target(shrink_pct: u32, current_size: u32) -> u32 { ((shrink_pct * current_size) + 99) / 100 } fn shrink_if_necessary( &mut self, recycler_name: &'static str, ) -> Option<(RecyclerShrinkStats, Vec)> { let is_consistent = self.total_allocated_count as usize >= self.len(); assert!( is_consistent, "Object pool inconsistent: {} {} {}", self.total_allocated_count, self.len(), recycler_name ); if self.last_shrink_check_time.elapsed().as_millis() > self.check_shrink_interval_ms as u128 { self.last_shrink_check_time = Instant::now(); let shrink_threshold_count = Self::get_shrink_target(self.shrink_pct, self.total_allocated_count); // If more than the shrink threshold of all allocated objects are sitting doing nothing, // increment the `above_shrink_pct_count`. if self.len() > self.minimum_object_count as usize && self.len() > shrink_threshold_count as usize { self.above_shrink_pct_count += 1; } else { self.above_shrink_pct_count = 0; } if self.above_shrink_pct_count as usize >= self.max_above_shrink_pct_count as usize { let mut recycler_shrink_elapsed = Measure::start("recycler_shrink"); // Do the shrink let target_size = std::cmp::max(self.minimum_object_count, shrink_threshold_count); let ideal_num_to_remove = self.total_allocated_count - target_size; let mut shrink_removed_objects = Vec::with_capacity(ideal_num_to_remove as usize); for _ in 0..ideal_num_to_remove { if let Some(mut expired_object) = self.objects.pop() { expired_object.unset_recycler(); // Drop these outside of the lock because the Drop() implmentation for // certain objects like PinnedVec's can be expensive shrink_removed_objects.push(expired_object); // May not be able to shrink exactly `ideal_num_to_remove` objects since // in the case of new allocations, `total_allocated_count` is incremented // before the object is allocated (see `should_allocate_new` logic below). // This race allows a difference of up to the number of threads allocating // with this recycler. self.total_allocated_count -= 1; } else { break; } } recycler_shrink_elapsed.stop(); self.above_shrink_pct_count = 0; Some(( RecyclerShrinkStats { resulting_size: self.total_allocated_count, target_size, ideal_num_to_remove, shrink_elapsed: recycler_shrink_elapsed.as_us(), // Filled in later drop_elapsed: 0, }, shrink_removed_objects, )) } else { None } } else { None } } fn make_allocation_decision(&mut self) -> AllocationDecision { if let Some(reused_object) = self.objects.pop() { AllocationDecision::Reuse(reused_object) } else if let Some(limit) = self.limit { if self.total_allocated_count < limit { self.total_allocated_count += 1; AllocationDecision::Allocate(self.total_allocated_count, self.len()) } else { AllocationDecision::AllocationLimitReached } } else { self.total_allocated_count += 1; AllocationDecision::Allocate(self.total_allocated_count, self.len()) } } } #[derive(Debug)] pub struct RecyclerX { gc: Mutex>, stats: RecyclerStats, id: usize, } impl Default for RecyclerX { fn default() -> RecyclerX { let id = thread_rng().gen_range(0, 1000); trace!("new recycler..{}", id); RecyclerX { gc: Mutex::new(ObjectPool::default()), stats: RecyclerStats::default(), id, } } } impl RecyclerX { fn new(limit: Option) -> Self { RecyclerX { gc: Mutex::new(ObjectPool::new(limit)), ..Self::default() } } } pub trait Reset { fn reset(&mut self); fn warm(&mut self, size_hint: usize); fn set_recycler(&mut self, recycler: Weak>) where Self: std::marker::Sized; fn unset_recycler(&mut self) where Self: std::marker::Sized; } lazy_static! { static ref WARM_RECYCLERS: AtomicBool = AtomicBool::new(false); } pub fn enable_recycler_warming() { WARM_RECYCLERS.store(true, Ordering::Relaxed); } fn warm_recyclers() -> bool { WARM_RECYCLERS.load(Ordering::Relaxed) } impl Recycler { pub fn warmed( num: u32, size_hint: usize, limit: Option, shrink_metric_name: &'static str, ) -> Self { assert!(num <= limit.unwrap_or(std::u32::MAX)); let new = Self { recycler: Arc::new(RecyclerX::new(limit)), shrink_metric_name, }; if warm_recyclers() { let warmed_items: Vec<_> = (0..num) .map(|_| { let mut item = new.allocate().unwrap(); item.warm(size_hint); item }) .collect(); warmed_items .into_iter() .for_each(|i| new.recycler.recycle(i)); } new } pub fn allocate(&self) -> Option { let (allocation_decision, shrink_output) = { let mut object_pool = self .recycler .gc .lock() .expect("recycler lock in pb fn allocate"); let shrink_output = object_pool.shrink_if_necessary(self.shrink_metric_name); // Grab the allocation decision and shrinking stats, do the expensive // allocations/deallocations outside of the lock. (object_pool.make_allocation_decision(), shrink_output) }; if let Some((mut shrink_stats, shrink_removed_objects)) = shrink_output { let mut shrink_removed_object_elapsed = Measure::start("shrink_removed_object_elapsed"); drop(shrink_removed_objects); shrink_removed_object_elapsed.stop(); shrink_stats.drop_elapsed = shrink_removed_object_elapsed.as_us(); shrink_stats.report(self.shrink_metric_name); } match allocation_decision { AllocationDecision::Reuse(mut reused_object) => { self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); reused_object.reset(); Some(reused_object) } AllocationDecision::Allocate(total_allocated_count, recycled_len) => { let mut t = T::default(); t.set_recycler(Arc::downgrade(&self.recycler)); if total_allocated_count % 1000 == 0 { datapoint_info!( "recycler_total_allocated_count", ("name", self.shrink_metric_name, String), ("count", total_allocated_count as i64, i64), ("recycled_len", recycled_len as i64, i64), ) } Some(t) } AllocationDecision::AllocationLimitReached => None, } } pub fn recycle_for_test(&self, x: T) { self.recycler.recycle(x); } } impl RecyclerX { pub fn recycle(&self, x: T) { let len = { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); gc.objects.push(x); gc.len() }; let max_gc = self.stats.max_gc.load(Ordering::Relaxed); if len > max_gc { // this is not completely accurate, but for most cases should be fine. let _ = self.stats.max_gc.compare_exchange( max_gc, len, Ordering::Relaxed, Ordering::Relaxed, ); } let total = self.stats.total.load(Ordering::Relaxed); let reuse = self.stats.reuse.load(Ordering::Relaxed); let freed = self.stats.total.fetch_add(1, Ordering::Relaxed); datapoint_debug!( "recycler", ("gc_len", len as i64, i64), ("total", total as i64, i64), ("freed", freed as i64, i64), ("reuse", reuse as i64, i64), ); } } #[cfg(test)] mod tests { use super::*; use crate::packet::PacketsRecycler; use std::{thread::sleep, time::Duration}; impl Reset for u64 { fn reset(&mut self) { *self = 10; } fn warm(&mut self, _size_hint: usize) {} fn set_recycler(&mut self, _recycler: Weak>) {} fn unset_recycler(&mut self) {} } #[test] fn test_recycler() { let recycler = Recycler::new_without_limit(""); let mut y: u64 = recycler.allocate().unwrap(); assert_eq!(y, 0); y = 20; let recycler2 = recycler.clone(); recycler2.recycler.recycle(y); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); let z = recycler.allocate().unwrap(); assert_eq!(z, 10); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); } #[test] fn test_recycler_limit() { let limit = 10; assert!(limit <= DEFAULT_MINIMUM_OBJECT_COUNT); // Use PacketRecycler so that dropping the allocated object // actually recycles let recycler = PacketsRecycler::new_with_limit("", limit); let mut allocated_items = vec![]; for i in 0..limit * 2 { let x = recycler.allocate(); if i < limit { allocated_items.push(x.unwrap()); } else { assert!(x.is_none()); } } assert_eq!( recycler.recycler.gc.lock().unwrap().total_allocated_count, limit ); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0_usize); drop(allocated_items); assert_eq!( recycler.recycler.gc.lock().unwrap().total_allocated_count, limit ); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize); } #[test] fn test_recycler_shrink() { let limit = DEFAULT_MINIMUM_OBJECT_COUNT * 2; let max_above_shrink_pct_count = 2; let shrink_pct = 80; let recycler = PacketsRecycler::new_with_limit("", limit); { let mut locked_recycler = recycler.recycler.gc.lock().unwrap(); // Make the shrink interval a long time so shrinking doesn't happen yet locked_recycler.check_shrink_interval_ms = std::u32::MAX; // Set the count to one so that we shrink on every other allocation later. locked_recycler.max_above_shrink_pct_count = max_above_shrink_pct_count; locked_recycler.shrink_pct = shrink_pct; } let mut allocated_items = vec![]; for _ in 0..limit { allocated_items.push(recycler.allocate().unwrap()); } assert_eq!( recycler.recycler.gc.lock().unwrap().total_allocated_count, limit ); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); drop(allocated_items); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize); let shrink_interval = 10; { let mut locked_recycler = recycler.recycler.gc.lock().unwrap(); locked_recycler.check_shrink_interval_ms = shrink_interval; } let mut current_total_allocated_count = recycler.recycler.gc.lock().unwrap().total_allocated_count; // Shrink the recycler until it hits the minimum let mut i = 0; while current_total_allocated_count != DEFAULT_MINIMUM_OBJECT_COUNT { sleep(Duration::from_millis(shrink_interval as u64 * 2)); recycler.allocate().unwrap(); let expected_above_shrink_pct_count = (i + 1) % max_above_shrink_pct_count; assert_eq!( recycler.recycler.gc.lock().unwrap().above_shrink_pct_count, (i + 1) % max_above_shrink_pct_count ); if expected_above_shrink_pct_count == 0 { // Shrink happened, update the expected `current_total_allocated_count`; current_total_allocated_count = std::cmp::max( ObjectPool::::get_shrink_target(shrink_pct, current_total_allocated_count), DEFAULT_MINIMUM_OBJECT_COUNT, ); assert_eq!( recycler.recycler.gc.lock().unwrap().total_allocated_count, current_total_allocated_count ); assert_eq!( recycler.recycler.gc.lock().unwrap().len(), current_total_allocated_count as usize ); } i += 1; } } }