Copy-on-write semantics for cached executors can be implemented by a
simple Arc<CachedExecutors> as opposed to CowCachedExecutors:
https://github.com/solana-labs/solana/blob/f1e2598ba/runtime/src/bank.rs#L244-L247
This will also avoid the need for double locking as in:
https://github.com/solana-labs/solana/blob/f1e2598ba/runtime/src/bank.rs#L3490-L3491
https://github.com/solana-labs/solana/blob/f1e2598ba/runtime/src/bank.rs#L3525-L3526
(cherry picked from commit c2389fc209)
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
			
			
This commit is contained in:
		@@ -242,51 +242,6 @@ pub struct SquashTiming {
 | 
			
		||||
 | 
			
		||||
type EpochCount = u64;
 | 
			
		||||
 | 
			
		||||
/// Copy-on-write holder of CachedExecutors
 | 
			
		||||
#[derive(AbiExample, Debug, Default)]
 | 
			
		||||
struct CowCachedExecutors {
 | 
			
		||||
    shared: bool,
 | 
			
		||||
    executors: Arc<RwLock<CachedExecutors>>,
 | 
			
		||||
}
 | 
			
		||||
impl Clone for CowCachedExecutors {
 | 
			
		||||
    fn clone(&self) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            shared: true,
 | 
			
		||||
            executors: self.executors.clone(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
impl CowCachedExecutors {
 | 
			
		||||
    fn clone_with_epoch(&self, epoch: u64) -> Self {
 | 
			
		||||
        let executors_raw = self.read().unwrap();
 | 
			
		||||
        if executors_raw.current_epoch() == epoch {
 | 
			
		||||
            self.clone()
 | 
			
		||||
        } else {
 | 
			
		||||
            Self {
 | 
			
		||||
                shared: false,
 | 
			
		||||
                executors: Arc::new(RwLock::new(executors_raw.clone_with_epoch(epoch))),
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    fn new(executors: Arc<RwLock<CachedExecutors>>) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            shared: true,
 | 
			
		||||
            executors,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    fn read(&self) -> LockResult<RwLockReadGuard<CachedExecutors>> {
 | 
			
		||||
        self.executors.read()
 | 
			
		||||
    }
 | 
			
		||||
    fn write(&mut self) -> LockResult<RwLockWriteGuard<CachedExecutors>> {
 | 
			
		||||
        if self.shared {
 | 
			
		||||
            self.shared = false;
 | 
			
		||||
            let local_cache = (*self.executors.read().unwrap()).clone();
 | 
			
		||||
            self.executors = Arc::new(RwLock::new(local_cache));
 | 
			
		||||
        }
 | 
			
		||||
        self.executors.write()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const MAX_CACHED_EXECUTORS: usize = 100; // 10 MB assuming programs are around 100k
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct CachedExecutorsEntry {
 | 
			
		||||
@@ -305,8 +260,8 @@ impl Default for CachedExecutors {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            max: MAX_CACHED_EXECUTORS,
 | 
			
		||||
            current_epoch: 0,
 | 
			
		||||
            executors: HashMap::new(),
 | 
			
		||||
            current_epoch: Epoch::default(),
 | 
			
		||||
            executors: HashMap::default(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -323,45 +278,43 @@ impl AbiExample for CachedExecutors {
 | 
			
		||||
 | 
			
		||||
impl Clone for CachedExecutors {
 | 
			
		||||
    fn clone(&self) -> Self {
 | 
			
		||||
        self.clone_with_epoch(self.current_epoch)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
impl CachedExecutors {
 | 
			
		||||
    fn current_epoch(&self) -> Epoch {
 | 
			
		||||
        self.current_epoch
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn clone_with_epoch(&self, epoch: Epoch) -> Self {
 | 
			
		||||
        let mut executors = HashMap::new();
 | 
			
		||||
        for (key, entry) in self.executors.iter() {
 | 
			
		||||
            // The total_count = prev_epoch_count + epoch_count will be used for LFU eviction.
 | 
			
		||||
            // If the epoch has changed, we store the prev_epoch_count and reset the epoch_count to 0.
 | 
			
		||||
            if epoch > self.current_epoch {
 | 
			
		||||
                executors.insert(
 | 
			
		||||
                    *key,
 | 
			
		||||
                    CachedExecutorsEntry {
 | 
			
		||||
                        prev_epoch_count: entry.epoch_count.load(Relaxed),
 | 
			
		||||
                        epoch_count: AtomicU64::new(0),
 | 
			
		||||
                        executor: entry.executor.clone(),
 | 
			
		||||
                    },
 | 
			
		||||
                );
 | 
			
		||||
            } else {
 | 
			
		||||
                executors.insert(
 | 
			
		||||
                    *key,
 | 
			
		||||
                    CachedExecutorsEntry {
 | 
			
		||||
        let executors = self.executors.iter().map(|(&key, entry)| {
 | 
			
		||||
            let entry = CachedExecutorsEntry {
 | 
			
		||||
                prev_epoch_count: entry.prev_epoch_count,
 | 
			
		||||
                epoch_count: AtomicU64::new(entry.epoch_count.load(Relaxed)),
 | 
			
		||||
                executor: entry.executor.clone(),
 | 
			
		||||
                    },
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
            };
 | 
			
		||||
            (key, entry)
 | 
			
		||||
        });
 | 
			
		||||
        Self {
 | 
			
		||||
            max: self.max,
 | 
			
		||||
            current_epoch: epoch,
 | 
			
		||||
            executors,
 | 
			
		||||
            current_epoch: self.current_epoch,
 | 
			
		||||
            executors: executors.collect(),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl CachedExecutors {
 | 
			
		||||
    fn clone_with_epoch(self: &Arc<Self>, epoch: Epoch) -> Arc<Self> {
 | 
			
		||||
        if self.current_epoch == epoch {
 | 
			
		||||
            return self.clone();
 | 
			
		||||
        }
 | 
			
		||||
        let executors = self.executors.iter().map(|(&key, entry)| {
 | 
			
		||||
            // The total_count = prev_epoch_count + epoch_count will be used for LFU eviction.
 | 
			
		||||
            // If the epoch has changed, we store the prev_epoch_count and reset the epoch_count to 0.
 | 
			
		||||
            let entry = CachedExecutorsEntry {
 | 
			
		||||
                prev_epoch_count: entry.epoch_count.load(Relaxed),
 | 
			
		||||
                epoch_count: AtomicU64::default(),
 | 
			
		||||
                executor: entry.executor.clone(),
 | 
			
		||||
            };
 | 
			
		||||
            (key, entry)
 | 
			
		||||
        });
 | 
			
		||||
        Arc::new(Self {
 | 
			
		||||
            max: self.max,
 | 
			
		||||
            current_epoch: epoch,
 | 
			
		||||
            executors: executors.collect(),
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn new(max: usize, current_epoch: Epoch) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
@@ -1081,7 +1034,9 @@ pub struct Bank {
 | 
			
		||||
    pub rewards_pool_pubkeys: Arc<HashSet<Pubkey>>,
 | 
			
		||||
 | 
			
		||||
    /// Cached executors
 | 
			
		||||
    cached_executors: RwLock<CowCachedExecutors>,
 | 
			
		||||
    // Inner Arc is meant to implement copy-on-write semantics as opposed to
 | 
			
		||||
    // sharing mutations (hence RwLock<Arc<...>> instead of Arc<RwLock<...>>).
 | 
			
		||||
    cached_executors: RwLock<Arc<CachedExecutors>>,
 | 
			
		||||
 | 
			
		||||
    transaction_debug_keys: Option<Arc<HashSet<Pubkey>>>,
 | 
			
		||||
 | 
			
		||||
@@ -1237,7 +1192,7 @@ impl Bank {
 | 
			
		||||
            cluster_type: Option::<ClusterType>::default(),
 | 
			
		||||
            lazy_rent_collection: AtomicBool::default(),
 | 
			
		||||
            rewards_pool_pubkeys: Arc::<HashSet<Pubkey>>::default(),
 | 
			
		||||
            cached_executors: RwLock::<CowCachedExecutors>::default(),
 | 
			
		||||
            cached_executors: RwLock::<Arc<CachedExecutors>>::default(),
 | 
			
		||||
            transaction_debug_keys: Option::<Arc<HashSet<Pubkey>>>::default(),
 | 
			
		||||
            transaction_log_collector_config: Arc::<RwLock<TransactionLogCollectorConfig>>::default(
 | 
			
		||||
            ),
 | 
			
		||||
@@ -1481,9 +1436,10 @@ impl Bank {
 | 
			
		||||
            cluster_type: parent.cluster_type,
 | 
			
		||||
            lazy_rent_collection: AtomicBool::new(parent.lazy_rent_collection.load(Relaxed)),
 | 
			
		||||
            rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(),
 | 
			
		||||
            cached_executors: RwLock::new(
 | 
			
		||||
                (*parent.cached_executors.read().unwrap()).clone_with_epoch(epoch),
 | 
			
		||||
            ),
 | 
			
		||||
            cached_executors: {
 | 
			
		||||
                let cached_executors = parent.cached_executors.read().unwrap();
 | 
			
		||||
                RwLock::new(cached_executors.clone_with_epoch(epoch))
 | 
			
		||||
            },
 | 
			
		||||
            transaction_debug_keys: parent.transaction_debug_keys.clone(),
 | 
			
		||||
            transaction_log_collector_config: parent.transaction_log_collector_config.clone(),
 | 
			
		||||
            transaction_log_collector: Arc::new(RwLock::new(TransactionLogCollector::default())),
 | 
			
		||||
@@ -1668,9 +1624,10 @@ impl Bank {
 | 
			
		||||
            cluster_type: Some(genesis_config.cluster_type),
 | 
			
		||||
            lazy_rent_collection: new(),
 | 
			
		||||
            rewards_pool_pubkeys: new(),
 | 
			
		||||
            cached_executors: RwLock::new(CowCachedExecutors::new(Arc::new(RwLock::new(
 | 
			
		||||
                CachedExecutors::new(MAX_CACHED_EXECUTORS, fields.epoch),
 | 
			
		||||
            )))),
 | 
			
		||||
            cached_executors: RwLock::new(Arc::new(CachedExecutors::new(
 | 
			
		||||
                MAX_CACHED_EXECUTORS,
 | 
			
		||||
                fields.epoch,
 | 
			
		||||
            ))),
 | 
			
		||||
            transaction_debug_keys: debug_keys,
 | 
			
		||||
            transaction_log_collector_config: new(),
 | 
			
		||||
            transaction_log_collector: new(),
 | 
			
		||||
@@ -3517,8 +3474,7 @@ impl Bank {
 | 
			
		||||
            num_executors += program_indices_of_instruction.len();
 | 
			
		||||
        }
 | 
			
		||||
        let mut executors = HashMap::with_capacity(num_executors);
 | 
			
		||||
        let cow_cache = self.cached_executors.read().unwrap();
 | 
			
		||||
        let cache = cow_cache.read().unwrap();
 | 
			
		||||
        let cache = self.cached_executors.read().unwrap();
 | 
			
		||||
 | 
			
		||||
        for key in message.account_keys_iter() {
 | 
			
		||||
            if let Some(executor) = cache.get(key) {
 | 
			
		||||
@@ -3552,8 +3508,8 @@ impl Bank {
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        if !dirty_executors.is_empty() {
 | 
			
		||||
            let mut cow_cache = self.cached_executors.write().unwrap();
 | 
			
		||||
            let mut cache = cow_cache.write().unwrap();
 | 
			
		||||
            let mut cache = self.cached_executors.write().unwrap();
 | 
			
		||||
            let cache = Arc::make_mut(&mut cache);
 | 
			
		||||
            for (key, executor) in dirty_executors.into_iter() {
 | 
			
		||||
                cache.put(key, executor);
 | 
			
		||||
            }
 | 
			
		||||
@@ -3561,10 +3517,9 @@ impl Bank {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Remove an executor from the bank's cache
 | 
			
		||||
    pub fn remove_executor(&self, pubkey: &Pubkey) {
 | 
			
		||||
        let mut cow_cache = self.cached_executors.write().unwrap();
 | 
			
		||||
        let mut cache = cow_cache.write().unwrap();
 | 
			
		||||
        cache.remove(pubkey);
 | 
			
		||||
    fn remove_executor(&self, pubkey: &Pubkey) {
 | 
			
		||||
        let mut cache = self.cached_executors.write().unwrap();
 | 
			
		||||
        Arc::make_mut(&mut cache).remove(pubkey);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Execute a transaction using the provided loaded accounts and update
 | 
			
		||||
@@ -12859,26 +12814,26 @@ pub(crate) mod tests {
 | 
			
		||||
        assert!(cache.get(&key1).is_some());
 | 
			
		||||
        assert!(cache.get(&key1).is_some());
 | 
			
		||||
 | 
			
		||||
        cache = cache.clone_with_epoch(1);
 | 
			
		||||
        let mut cache = Arc::new(cache).clone_with_epoch(1);
 | 
			
		||||
        assert!(cache.current_epoch == 1);
 | 
			
		||||
 | 
			
		||||
        assert!(cache.get(&key2).is_some());
 | 
			
		||||
        assert!(cache.get(&key2).is_some());
 | 
			
		||||
        assert!(cache.get(&key3).is_some());
 | 
			
		||||
        cache.put(&key4, executor.clone());
 | 
			
		||||
        Arc::make_mut(&mut cache).put(&key4, executor.clone());
 | 
			
		||||
 | 
			
		||||
        assert!(cache.get(&key4).is_some());
 | 
			
		||||
        assert!(cache.get(&key3).is_none());
 | 
			
		||||
 | 
			
		||||
        cache.put(&key1, executor.clone());
 | 
			
		||||
        cache.put(&key3, executor.clone());
 | 
			
		||||
        Arc::make_mut(&mut cache).put(&key1, executor.clone());
 | 
			
		||||
        Arc::make_mut(&mut cache).put(&key3, executor.clone());
 | 
			
		||||
        assert!(cache.get(&key1).is_some());
 | 
			
		||||
        assert!(cache.get(&key4).is_none());
 | 
			
		||||
 | 
			
		||||
        cache = cache.clone_with_epoch(2);
 | 
			
		||||
        assert!(cache.current_epoch == 2);
 | 
			
		||||
 | 
			
		||||
        cache.put(&key3, executor.clone());
 | 
			
		||||
        Arc::make_mut(&mut cache).put(&key3, executor.clone());
 | 
			
		||||
        assert!(cache.get(&key3).is_some());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user