allows only one thread to update cluster-nodes cache entry for an epoch
If two threads simultaneously call into ClusterNodesCache::get for the same epoch, and the cache entry is outdated, then both threads recompute cluster-nodes for the epoch and redundantly overwrite each other. This commit wraps ClusterNodesCache entries in Arc<Mutex<...>>, so that when needed only one thread does the computations to update the entry.
This commit is contained in:
@ -18,6 +18,7 @@ use {
|
||||
cmp::Reverse,
|
||||
collections::HashMap,
|
||||
marker::PhantomData,
|
||||
ops::Deref,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
},
|
||||
@ -47,9 +48,12 @@ pub struct ClusterNodes<T> {
|
||||
_phantom: PhantomData<T>,
|
||||
}
|
||||
|
||||
type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
|
||||
|
||||
pub(crate) struct ClusterNodesCache<T> {
|
||||
#[allow(clippy::type_complexity)]
|
||||
cache: Mutex<LruCache<Epoch, (Instant, Arc<ClusterNodes<T>>)>>,
|
||||
// Cache entries are wrapped in Arc<Mutex<...>>, so that, when needed, only
|
||||
// one thread does the computations to update the entry for the epoch.
|
||||
cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
|
||||
ttl: Duration, // Time to live.
|
||||
}
|
||||
|
||||
@ -241,6 +245,18 @@ impl<T> ClusterNodesCache<T> {
|
||||
}
|
||||
|
||||
impl<T: 'static> ClusterNodesCache<T> {
|
||||
fn get_cache_entry(&self, epoch: Epoch) -> Arc<Mutex<CacheEntry<T>>> {
|
||||
let mut cache = self.cache.lock().unwrap();
|
||||
match cache.get(&epoch) {
|
||||
Some(entry) => Arc::clone(entry),
|
||||
None => {
|
||||
let entry = Arc::default();
|
||||
cache.put(epoch, Arc::clone(&entry));
|
||||
entry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get(
|
||||
&self,
|
||||
shred_slot: Slot,
|
||||
@ -248,13 +264,13 @@ impl<T: 'static> ClusterNodesCache<T> {
|
||||
cluster_info: &ClusterInfo,
|
||||
) -> Arc<ClusterNodes<T>> {
|
||||
let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
|
||||
{
|
||||
let mut cache = self.cache.lock().unwrap();
|
||||
if let Some((asof, nodes)) = cache.get(&epoch) {
|
||||
if asof.elapsed() < self.ttl {
|
||||
return Arc::clone(nodes);
|
||||
}
|
||||
cache.pop(&epoch);
|
||||
let entry = self.get_cache_entry(epoch);
|
||||
// Hold the lock on the entry here so that, if needed, only
|
||||
// one thread recomputes cluster-nodes for this epoch.
|
||||
let mut entry = entry.lock().unwrap();
|
||||
if let Some((asof, nodes)) = entry.deref() {
|
||||
if asof.elapsed() < self.ttl {
|
||||
return Arc::clone(nodes);
|
||||
}
|
||||
}
|
||||
let epoch_staked_nodes = root_bank.epoch_staked_nodes(epoch);
|
||||
@ -269,10 +285,7 @@ impl<T: 'static> ClusterNodesCache<T> {
|
||||
cluster_info,
|
||||
&epoch_staked_nodes.unwrap_or_default(),
|
||||
));
|
||||
{
|
||||
let mut cache = self.cache.lock().unwrap();
|
||||
cache.put(epoch, (Instant::now(), Arc::clone(&nodes)));
|
||||
}
|
||||
*entry = Some((Instant::now(), Arc::clone(&nodes)));
|
||||
nodes
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user