runtime: Executor usage counts retain only single-epoch memory (#17162)

This commit is contained in:
jon-chuang
2021-05-25 03:01:56 +08:00
committed by GitHub
parent e867d7f3b8
commit ad1f24d487

View File

@ -216,6 +216,17 @@ impl Clone for CowCachedExecutors {
} }
} }
impl CowCachedExecutors { impl CowCachedExecutors {
fn clone_with_epoch(&self, epoch: u64) -> Self {
let executors_raw = self.read().unwrap();
if executors_raw.current_epoch() == epoch {
self.clone()
} else {
Self {
shared: false,
executors: Arc::new(RwLock::new(executors_raw.clone_with_epoch(epoch))),
}
}
}
fn new(executors: Arc<RwLock<CachedExecutors>>) -> Self { fn new(executors: Arc<RwLock<CachedExecutors>>) -> Self {
Self { Self {
shared: true, shared: true,
@ -256,17 +267,24 @@ pub struct Builtins {
} }
const MAX_CACHED_EXECUTORS: usize = 100; // 10 MB assuming programs are around 100k const MAX_CACHED_EXECUTORS: usize = 100; // 10 MB assuming programs are around 100k
#[derive(Debug)]
/// LFU Cache of executors struct CachedExecutorsEntry {
prev_epoch_count: u64,
epoch_count: AtomicU64,
executor: Arc<dyn Executor>,
}
/// LFU Cache of executors with single-epoch memory of usage counts
#[derive(Debug)] #[derive(Debug)]
struct CachedExecutors { struct CachedExecutors {
max: usize, max: usize,
executors: HashMap<Pubkey, (AtomicU64, Arc<dyn Executor>)>, current_epoch: Epoch,
executors: HashMap<Pubkey, CachedExecutorsEntry>,
} }
impl Default for CachedExecutors { impl Default for CachedExecutors {
fn default() -> Self { fn default() -> Self {
Self { Self {
max: MAX_CACHED_EXECUTORS, max: MAX_CACHED_EXECUTORS,
current_epoch: 0,
executors: HashMap::new(), executors: HashMap::new(),
} }
} }
@ -284,30 +302,57 @@ impl AbiExample for CachedExecutors {
impl Clone for CachedExecutors { impl Clone for CachedExecutors {
fn clone(&self) -> Self { fn clone(&self) -> Self {
let mut executors = HashMap::new(); self.clone_with_epoch(self.current_epoch)
for (key, (count, executor)) in self.executors.iter() {
executors.insert(
*key,
(AtomicU64::new(count.load(Relaxed)), executor.clone()),
);
}
Self {
max: self.max,
executors,
}
} }
} }
impl CachedExecutors { impl CachedExecutors {
fn new(max: usize) -> Self { fn current_epoch(&self) -> Epoch {
self.current_epoch
}
fn clone_with_epoch(&self, epoch: Epoch) -> Self {
let mut executors = HashMap::new();
for (key, entry) in self.executors.iter() {
// The total_count = prev_epoch_count + epoch_count will be used for LFU eviction.
// If the epoch has changed, we store the prev_epoch_count and reset the epoch_count to 0.
if epoch > self.current_epoch {
executors.insert(
*key,
CachedExecutorsEntry {
prev_epoch_count: entry.epoch_count.load(Relaxed),
epoch_count: AtomicU64::new(0),
executor: entry.executor.clone(),
},
);
} else {
executors.insert(
*key,
CachedExecutorsEntry {
prev_epoch_count: entry.prev_epoch_count,
epoch_count: AtomicU64::new(entry.epoch_count.load(Relaxed)),
executor: entry.executor.clone(),
},
);
}
}
Self {
max: self.max,
current_epoch: epoch,
executors,
}
}
fn new(max: usize, current_epoch: Epoch) -> Self {
Self { Self {
max, max,
current_epoch,
executors: HashMap::new(), executors: HashMap::new(),
} }
} }
fn get(&self, pubkey: &Pubkey) -> Option<Arc<dyn Executor>> { fn get(&self, pubkey: &Pubkey) -> Option<Arc<dyn Executor>> {
self.executors.get(pubkey).map(|(count, executor)| { self.executors.get(pubkey).map(|entry| {
count.fetch_add(1, Relaxed); entry.epoch_count.fetch_add(1, Relaxed);
executor.clone() entry.executor.clone()
}) })
} }
fn put(&mut self, pubkey: &Pubkey, executor: Arc<dyn Executor>) { fn put(&mut self, pubkey: &Pubkey, executor: Arc<dyn Executor>) {
@ -315,8 +360,9 @@ impl CachedExecutors {
let mut least = u64::MAX; let mut least = u64::MAX;
let default_key = Pubkey::default(); let default_key = Pubkey::default();
let mut least_key = &default_key; let mut least_key = &default_key;
for (key, (count, _)) in self.executors.iter() {
let count = count.load(Relaxed); for (key, entry) in self.executors.iter() {
let count = entry.prev_epoch_count + entry.epoch_count.load(Relaxed);
if count < least { if count < least {
least = count; least = count;
least_key = key; least_key = key;
@ -325,9 +371,14 @@ impl CachedExecutors {
let least_key = *least_key; let least_key = *least_key;
let _ = self.executors.remove(&least_key); let _ = self.executors.remove(&least_key);
} }
let _ = self let _ = self.executors.insert(
.executors *pubkey,
.insert(*pubkey, (AtomicU64::new(0), executor)); CachedExecutorsEntry {
prev_epoch_count: 0,
epoch_count: AtomicU64::new(0),
executor,
},
);
} }
fn remove(&mut self, pubkey: &Pubkey) { fn remove(&mut self, pubkey: &Pubkey) {
let _ = self.executors.remove(pubkey); let _ = self.executors.remove(pubkey);
@ -1107,7 +1158,9 @@ impl Bank {
lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)), lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)),
no_stake_rewrite: AtomicBool::new(parent.no_stake_rewrite.load(Relaxed)), no_stake_rewrite: AtomicBool::new(parent.no_stake_rewrite.load(Relaxed)),
rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(), rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(),
cached_executors: RwLock::new((*parent.cached_executors.read().unwrap()).clone()), cached_executors: RwLock::new(
(*parent.cached_executors.read().unwrap()).clone_with_epoch(epoch),
),
transaction_debug_keys: parent.transaction_debug_keys.clone(), transaction_debug_keys: parent.transaction_debug_keys.clone(),
transaction_log_collector_config: parent.transaction_log_collector_config.clone(), transaction_log_collector_config: parent.transaction_log_collector_config.clone(),
transaction_log_collector: Arc::new(RwLock::new(TransactionLogCollector::default())), transaction_log_collector: Arc::new(RwLock::new(TransactionLogCollector::default())),
@ -1263,7 +1316,7 @@ impl Bank {
no_stake_rewrite: new(), no_stake_rewrite: new(),
rewards_pool_pubkeys: new(), rewards_pool_pubkeys: new(),
cached_executors: RwLock::new(CowCachedExecutors::new(Arc::new(RwLock::new( cached_executors: RwLock::new(CowCachedExecutors::new(Arc::new(RwLock::new(
CachedExecutors::new(MAX_CACHED_EXECUTORS), CachedExecutors::new(MAX_CACHED_EXECUTORS, fields.epoch),
)))), )))),
transaction_debug_keys: debug_keys, transaction_debug_keys: debug_keys,
transaction_log_collector_config: new(), transaction_log_collector_config: new(),
@ -11136,7 +11189,7 @@ pub(crate) mod tests {
let key3 = solana_sdk::pubkey::new_rand(); let key3 = solana_sdk::pubkey::new_rand();
let key4 = solana_sdk::pubkey::new_rand(); let key4 = solana_sdk::pubkey::new_rand();
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {}); let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(3); let mut cache = CachedExecutors::new(3, 0);
cache.put(&key1, executor.clone()); cache.put(&key1, executor.clone());
cache.put(&key2, executor.clone()); cache.put(&key2, executor.clone());
@ -11164,6 +11217,41 @@ pub(crate) mod tests {
assert!(cache.get(&key4).is_some()); assert!(cache.get(&key4).is_some());
} }
#[test]
fn test_cached_executors_eviction() {
let key1 = solana_sdk::pubkey::new_rand();
let key2 = solana_sdk::pubkey::new_rand();
let key3 = solana_sdk::pubkey::new_rand();
let key4 = solana_sdk::pubkey::new_rand();
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(3, 0);
assert!(cache.current_epoch == 0);
cache.put(&key1, executor.clone());
cache.put(&key2, executor.clone());
cache.put(&key3, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
cache = cache.clone_with_epoch(1);
assert!(cache.current_epoch == 1);
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());
cache.put(&key4, executor.clone());
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key3).is_none());
cache = cache.clone_with_epoch(2);
assert!(cache.current_epoch == 2);
cache.put(&key3, executor.clone());
assert!(cache.get(&key3).is_some());
}
#[test] #[test]
fn test_bank_executor_cache() { fn test_bank_executor_cache() {
solana_logger::setup(); solana_logger::setup();