From 4fb77183ef8acb5a6eeedfe9e21a1c67765b5c9c Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Wed, 22 Sep 2021 12:40:30 -0500 Subject: [PATCH] AcctIdx: move background() to bucket holder (#20085) --- runtime/src/accounts_index_storage.rs | 39 ++------------------------ runtime/src/bucket_map_holder.rs | 29 +++++++++++++++++-- runtime/src/bucket_map_holder_stats.rs | 5 ++-- 3 files changed, 31 insertions(+), 42 deletions(-) diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs index bc03d2f57e..911fa53efc 100644 --- a/runtime/src/accounts_index_storage.rs +++ b/runtime/src/accounts_index_storage.rs @@ -1,8 +1,7 @@ use crate::accounts_index::{AccountsIndexConfig, IndexValue}; -use crate::bucket_map_holder::{BucketMapHolder, AGE_MS}; +use crate::bucket_map_holder::BucketMapHolder; use crate::in_mem_accounts_index::InMemAccountsIndex; use std::fmt::Debug; -use std::time::Duration; use std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -71,7 +70,7 @@ impl AccountsIndexStorage { Builder::new() .name("solana-idx-flusher".to_string()) .spawn(move || { - Self::background(storage_, exit_, in_mem_); + storage_.background(exit_, in_mem_); }) .unwrap() }) @@ -85,38 +84,4 @@ impl AccountsIndexStorage { in_mem, } } - - pub fn storage(&self) -> &Arc> { - &self.storage - } - - // intended to execute in a bg thread - pub fn background( - storage: Arc>, - exit: Arc, - in_mem: Vec>>, - ) { - let bins = in_mem.len(); - let flush = storage.disk.is_some(); - loop { - // this will transition to thread throttling - storage - .wait_dirty_or_aged - .wait_timeout(Duration::from_millis(AGE_MS)); - if exit.load(Ordering::Relaxed) { - break; - } - - storage.maybe_advance_age(); - storage.stats.active_threads.fetch_add(1, Ordering::Relaxed); - for _ in 0..bins { - if flush { - let index = storage.next_bucket_to_flush(); - in_mem[index].flush(); - } - storage.stats.report_stats(&storage); - } - storage.stats.active_threads.fetch_sub(1, Ordering::Relaxed); - } - } } diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index 315a122305..9fd2c5c932 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -1,13 +1,14 @@ use crate::accounts_index::{AccountsIndexConfig, IndexValue}; use crate::bucket_map_holder_stats::BucketMapHolderStats; -use crate::in_mem_accounts_index::SlotT; +use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT}; use crate::waitable_condvar::WaitableCondvar; use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig}; use solana_sdk::clock::SLOT_MS; use solana_sdk::timing::AtomicInterval; use std::fmt::Debug; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use std::time::Duration; pub type Age = u8; pub const AGE_MS: u64 = SLOT_MS; // match one age per slot time @@ -150,6 +151,30 @@ impl BucketMapHolder { *lock = (result + 1) % self.bins; result } + + // intended to execute in a bg thread + pub fn background(&self, exit: Arc, in_mem: Vec>>) { + let bins = in_mem.len(); + let flush = self.disk.is_some(); + loop { + // this will transition to waits and thread throttling + self.wait_dirty_or_aged + .wait_timeout(Duration::from_millis(AGE_MS)); + if exit.load(Ordering::Relaxed) { + break; + } + + self.stats.active_threads.fetch_add(1, Ordering::Relaxed); + for _ in 0..bins { + if flush { + let index = self.next_bucket_to_flush(); + in_mem[index].flush(); + } + self.stats.report_stats(self); + } + self.stats.active_threads.fetch_sub(1, Ordering::Relaxed); + } + } } #[cfg(test)] diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs index 6073b03ecc..a04abe816d 100644 --- a/runtime/src/bucket_map_holder_stats.rs +++ b/runtime/src/bucket_map_holder_stats.rs @@ -3,7 +3,6 @@ use crate::bucket_map_holder::BucketMapHolder; use solana_sdk::timing::{timestamp, AtomicInterval}; use std::fmt::Debug; use std::sync::atomic::{AtomicU64, AtomicU8, Ordering}; -use std::sync::Arc; #[derive(Debug, Default)] pub struct BucketMapHolderStats { @@ -63,7 +62,7 @@ impl BucketMapHolderStats { now.saturating_sub(last) // could saturate to 0. That is ok. } - fn ms_per_age(&self, storage: &Arc>) -> u64 { + fn ms_per_age(&self, storage: &BucketMapHolder) -> u64 { let elapsed_ms = self.get_elapsed_ms_and_reset(); let mut age_now = storage.current_age(); let last_age = self.last_age.swap(age_now, Ordering::Relaxed); @@ -79,7 +78,7 @@ impl BucketMapHolderStats { } } - pub fn report_stats(&self, storage: &Arc>) { + pub fn report_stats(&self, storage: &BucketMapHolder) { // account index stats every 10 s if !self.last_time.should_update(10_000) { return;