(cherry picked from commit c1995c647b)
Co-authored-by: Nikita <bananaelecitrus@gmail.com>
			
			
This commit is contained in:
		@@ -180,9 +180,10 @@ pub struct SignatureSubscriptionParams {
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
 | 
			
		||||
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
 | 
			
		||||
 | 
			
		||||
struct SubscriptionControlInner {
 | 
			
		||||
    subscriptions: DashMap<SubscriptionParams, Weak<SubscriptionTokenInner>>,
 | 
			
		||||
    subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
 | 
			
		||||
    next_id: AtomicU64,
 | 
			
		||||
    max_active_subscriptions: usize,
 | 
			
		||||
    sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
 | 
			
		||||
@@ -216,33 +217,44 @@ impl SubscriptionControl {
 | 
			
		||||
            self.0.subscriptions.len()
 | 
			
		||||
        );
 | 
			
		||||
        let count = self.0.subscriptions.len();
 | 
			
		||||
        match self.0.subscriptions.entry(params) {
 | 
			
		||||
            DashEntry::Occupied(entry) => Ok(SubscriptionToken(
 | 
			
		||||
                entry
 | 
			
		||||
                    .get()
 | 
			
		||||
                    .upgrade()
 | 
			
		||||
                    .expect("dead subscription encountered in SubscriptionControl"),
 | 
			
		||||
        let create_token_and_weak_ref = |id, params| {
 | 
			
		||||
            let token = SubscriptionToken(
 | 
			
		||||
                Arc::new(SubscriptionTokenInner {
 | 
			
		||||
                    control: Arc::clone(&self.0),
 | 
			
		||||
                    params,
 | 
			
		||||
                    id,
 | 
			
		||||
                }),
 | 
			
		||||
                self.0.counter.create_token(),
 | 
			
		||||
            )),
 | 
			
		||||
            );
 | 
			
		||||
            let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
 | 
			
		||||
            (token, weak_ref)
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        match self.0.subscriptions.entry(params) {
 | 
			
		||||
            DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
 | 
			
		||||
                Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
 | 
			
		||||
                // This means the last Arc for this Weak pointer entered the drop just before us,
 | 
			
		||||
                // but could not remove the entry since we are holding the write lock.
 | 
			
		||||
                // See `Drop` implementation for `SubscriptionTokenInner` for further info.
 | 
			
		||||
                None => {
 | 
			
		||||
                    let (token, weak_ref) =
 | 
			
		||||
                        create_token_and_weak_ref(entry.get().1, entry.key().clone());
 | 
			
		||||
                    entry.insert(weak_ref);
 | 
			
		||||
                    Ok(token)
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            DashEntry::Vacant(entry) => {
 | 
			
		||||
                if count >= self.0.max_active_subscriptions {
 | 
			
		||||
                    inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
 | 
			
		||||
                    return Err(Error::TooManySubscriptions);
 | 
			
		||||
                }
 | 
			
		||||
                let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
 | 
			
		||||
                let token = SubscriptionToken(
 | 
			
		||||
                    Arc::new(SubscriptionTokenInner {
 | 
			
		||||
                        control: Arc::clone(&self.0),
 | 
			
		||||
                        params: entry.key().clone(),
 | 
			
		||||
                        id,
 | 
			
		||||
                    }),
 | 
			
		||||
                    self.0.counter.create_token(),
 | 
			
		||||
                );
 | 
			
		||||
                let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
 | 
			
		||||
                let _ = self
 | 
			
		||||
                    .0
 | 
			
		||||
                    .sender
 | 
			
		||||
                    .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
 | 
			
		||||
                entry.insert(Arc::downgrade(&token.0));
 | 
			
		||||
                entry.insert(weak_ref);
 | 
			
		||||
                datapoint_info!(
 | 
			
		||||
                    "rpc-subscription",
 | 
			
		||||
                    ("total", self.0.subscriptions.len(), i64)
 | 
			
		||||
@@ -529,7 +541,9 @@ impl Drop for SubscriptionTokenInner {
 | 
			
		||||
            DashEntry::Vacant(_) => {
 | 
			
		||||
                warn!("Subscriptions inconsistency (missing entry in by_params)");
 | 
			
		||||
            }
 | 
			
		||||
            DashEntry::Occupied(entry) => {
 | 
			
		||||
            // Check the strong refs count to ensure no other thread recreated this subscription (not token)
 | 
			
		||||
            // while we were acquiring the lock.
 | 
			
		||||
            DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
 | 
			
		||||
                let _ = self
 | 
			
		||||
                    .control
 | 
			
		||||
                    .sender
 | 
			
		||||
@@ -540,6 +554,9 @@ impl Drop for SubscriptionTokenInner {
 | 
			
		||||
                    ("total", self.control.subscriptions.len(), i64)
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
            // This branch handles the case in which this entry got recreated
 | 
			
		||||
            // while we were waiting for the lock (inside the `DashMap::entry` method).
 | 
			
		||||
            DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user