AcctIdx: move background() to bucket holder (#20085)
This commit is contained in:
parent
681e8728a1
commit
4fb77183ef
@ -1,8 +1,7 @@
|
|||||||
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
|
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
|
||||||
use crate::bucket_map_holder::{BucketMapHolder, AGE_MS};
|
use crate::bucket_map_holder::BucketMapHolder;
|
||||||
use crate::in_mem_accounts_index::InMemAccountsIndex;
|
use crate::in_mem_accounts_index::InMemAccountsIndex;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::time::Duration;
|
|
||||||
use std::{
|
use std::{
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, Ordering},
|
||||||
@ -71,7 +70,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-idx-flusher".to_string())
|
.name("solana-idx-flusher".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
Self::background(storage_, exit_, in_mem_);
|
storage_.background(exit_, in_mem_);
|
||||||
})
|
})
|
||||||
.unwrap()
|
.unwrap()
|
||||||
})
|
})
|
||||||
@ -85,38 +84,4 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||||||
in_mem,
|
in_mem,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn storage(&self) -> &Arc<BucketMapHolder<T>> {
|
|
||||||
&self.storage
|
|
||||||
}
|
|
||||||
|
|
||||||
// intended to execute in a bg thread
|
|
||||||
pub fn background(
|
|
||||||
storage: Arc<BucketMapHolder<T>>,
|
|
||||||
exit: Arc<AtomicBool>,
|
|
||||||
in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
|
|
||||||
) {
|
|
||||||
let bins = in_mem.len();
|
|
||||||
let flush = storage.disk.is_some();
|
|
||||||
loop {
|
|
||||||
// this will transition to thread throttling
|
|
||||||
storage
|
|
||||||
.wait_dirty_or_aged
|
|
||||||
.wait_timeout(Duration::from_millis(AGE_MS));
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
storage.maybe_advance_age();
|
|
||||||
storage.stats.active_threads.fetch_add(1, Ordering::Relaxed);
|
|
||||||
for _ in 0..bins {
|
|
||||||
if flush {
|
|
||||||
let index = storage.next_bucket_to_flush();
|
|
||||||
in_mem[index].flush();
|
|
||||||
}
|
|
||||||
storage.stats.report_stats(&storage);
|
|
||||||
}
|
|
||||||
storage.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
|
use crate::accounts_index::{AccountsIndexConfig, IndexValue};
|
||||||
use crate::bucket_map_holder_stats::BucketMapHolderStats;
|
use crate::bucket_map_holder_stats::BucketMapHolderStats;
|
||||||
use crate::in_mem_accounts_index::SlotT;
|
use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT};
|
||||||
use crate::waitable_condvar::WaitableCondvar;
|
use crate::waitable_condvar::WaitableCondvar;
|
||||||
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
|
use solana_bucket_map::bucket_map::{BucketMap, BucketMapConfig};
|
||||||
use solana_sdk::clock::SLOT_MS;
|
use solana_sdk::clock::SLOT_MS;
|
||||||
use solana_sdk::timing::AtomicInterval;
|
use solana_sdk::timing::AtomicInterval;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||||
use std::sync::Mutex;
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
pub type Age = u8;
|
pub type Age = u8;
|
||||||
|
|
||||||
pub const AGE_MS: u64 = SLOT_MS; // match one age per slot time
|
pub const AGE_MS: u64 = SLOT_MS; // match one age per slot time
|
||||||
@ -150,6 +151,30 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||||||
*lock = (result + 1) % self.bins;
|
*lock = (result + 1) % self.bins;
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// intended to execute in a bg thread
|
||||||
|
pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
|
||||||
|
let bins = in_mem.len();
|
||||||
|
let flush = self.disk.is_some();
|
||||||
|
loop {
|
||||||
|
// this will transition to waits and thread throttling
|
||||||
|
self.wait_dirty_or_aged
|
||||||
|
.wait_timeout(Duration::from_millis(AGE_MS));
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.stats.active_threads.fetch_add(1, Ordering::Relaxed);
|
||||||
|
for _ in 0..bins {
|
||||||
|
if flush {
|
||||||
|
let index = self.next_bucket_to_flush();
|
||||||
|
in_mem[index].flush();
|
||||||
|
}
|
||||||
|
self.stats.report_stats(self);
|
||||||
|
}
|
||||||
|
self.stats.active_threads.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -3,7 +3,6 @@ use crate::bucket_map_holder::BucketMapHolder;
|
|||||||
use solana_sdk::timing::{timestamp, AtomicInterval};
|
use solana_sdk::timing::{timestamp, AtomicInterval};
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
|
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct BucketMapHolderStats {
|
pub struct BucketMapHolderStats {
|
||||||
@ -63,7 +62,7 @@ impl BucketMapHolderStats {
|
|||||||
now.saturating_sub(last) // could saturate to 0. That is ok.
|
now.saturating_sub(last) // could saturate to 0. That is ok.
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ms_per_age<T: IndexValue>(&self, storage: &Arc<BucketMapHolder<T>>) -> u64 {
|
fn ms_per_age<T: IndexValue>(&self, storage: &BucketMapHolder<T>) -> u64 {
|
||||||
let elapsed_ms = self.get_elapsed_ms_and_reset();
|
let elapsed_ms = self.get_elapsed_ms_and_reset();
|
||||||
let mut age_now = storage.current_age();
|
let mut age_now = storage.current_age();
|
||||||
let last_age = self.last_age.swap(age_now, Ordering::Relaxed);
|
let last_age = self.last_age.swap(age_now, Ordering::Relaxed);
|
||||||
@ -79,7 +78,7 @@ impl BucketMapHolderStats {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn report_stats<T: IndexValue>(&self, storage: &Arc<BucketMapHolder<T>>) {
|
pub fn report_stats<T: IndexValue>(&self, storage: &BucketMapHolder<T>) {
|
||||||
// account index stats every 10 s
|
// account index stats every 10 s
|
||||||
if !self.last_time.should_update(10_000) {
|
if !self.last_time.should_update(10_000) {
|
||||||
return;
|
return;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user