diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 48b2a0985c..ac9b619df3 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -13,7 +13,7 @@ use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use std::mem; -use std::sync::{atomic, Arc, RwLock}; +use std::sync::{atomic, Arc}; #[rpc] pub trait RpcSolPubSub { @@ -54,30 +54,25 @@ pub trait RpcSolPubSub { fn signature_unsubscribe(&self, _: Option, _: SubscriptionId) -> Result; } -pub struct RpcPubSubBank { - pub bank: Arc, -} - -impl RpcPubSubBank { - pub fn new(bank: Arc) -> Self { - RpcPubSubBank { bank } - } -} - pub struct RpcSolPubSubImpl { uid: Arc, - bank: Arc>, - pub subscription: Arc, + bank: Arc, + subscriptions: Arc, } impl RpcSolPubSubImpl { - pub fn new(bank: Arc>) -> Self { - RpcSolPubSubImpl { - uid: Arc::new(atomic::AtomicUsize::default()), + pub fn new_with_subscriptions(bank: Arc, subscriptions: Arc) -> Self { + let uid = Arc::new(atomic::AtomicUsize::default()); + Self { + uid, bank, - subscription: Arc::new(RpcSubscriptions::default()), + subscriptions, } } + + pub fn new(bank: Arc) -> Self { + Self::new_with_subscriptions(bank, Arc::new(RpcSubscriptions::default())) + } } impl RpcSolPubSub for RpcSolPubSubImpl { @@ -107,7 +102,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { info!("account_subscribe: account={:?} id={:?}", pubkey, sub_id); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - self.subscription + self.subscriptions .add_account_subscription(&pubkey, &sub_id, &sink) } @@ -117,7 +112,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { id: SubscriptionId, ) -> Result { info!("account_unsubscribe: id={:?}", id); - if self.subscription.remove_account_subscription(&id) { + if self.subscriptions.remove_account_subscription(&id) { Ok(true) } else { Err(Error { @@ -151,14 +146,9 @@ impl RpcSolPubSub for RpcSolPubSubImpl { let sub_id = SubscriptionId::Number(id as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); - let status = self - .bank - .read() - .unwrap() - .bank - .get_signature_status(&signature); + let status = self.bank.get_signature_status(&signature); if status.is_none() { - self.subscription + self.subscriptions .add_signature_subscription(&signature, &sub_id, &sink); return; } @@ -170,7 +160,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { .unwrap(); } _ => self - .subscription + .subscriptions .add_signature_subscription(&signature, &sub_id, &sink), } } @@ -181,7 +171,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { id: SubscriptionId, ) -> Result { info!("signature_unsubscribe"); - if self.subscription.remove_signature_subscription(&id) { + if self.subscriptions.remove_signature_subscription(&id) { Ok(true) } else { Err(Error { @@ -221,12 +211,11 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); - let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); - let subscription = rpc.subscription.clone(); - arc_bank.set_subscriptions(subscription); + let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let subscriptions = rpc.subscriptions.clone(); + arc_bank.set_subscriptions(subscriptions); - // Test signature subscription + // Test signature subscriptions let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); let session = create_session(); @@ -258,8 +247,7 @@ mod tests { let session = create_session(); let mut io = PubSubHandler::default(); - let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); - let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); + let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); io.extend_with(rpc.to_delegate()); let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); @@ -307,10 +295,9 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); - let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); - let subscription = rpc.subscription.clone(); - arc_bank.set_subscriptions(subscription); + let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); + let subscriptions = rpc.subscriptions.clone(); + arc_bank.set_subscriptions(subscriptions); let session = create_session(); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); @@ -457,8 +444,7 @@ mod tests { let session = create_session(); let mut io = PubSubHandler::default(); - let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); - let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); + let rpc = RpcSolPubSubImpl::new(arc_bank.clone()); io.extend_with(rpc.to_delegate()); diff --git a/src/rpc_pubsub_service.rs b/src/rpc_pubsub_service.rs index fb110c294c..163bef1168 100644 --- a/src/rpc_pubsub_service.rs +++ b/src/rpc_pubsub_service.rs @@ -1,22 +1,20 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request use crate::bank::Bank; -use crate::rpc_pubsub::{RpcPubSubBank, RpcSolPubSub, RpcSolPubSubImpl}; +use crate::rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_ws_server::{RequestContext, ServerBuilder}; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; pub struct PubSubService { thread_hdl: JoinHandle<()>, exit: Arc, - rpc_bank: Arc>, - subscription: Arc, } impl Service for PubSubService { @@ -30,10 +28,9 @@ impl Service for PubSubService { impl PubSubService { pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { info!("rpc_pubsub bound to {:?}", pubsub_addr); - let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone()))); - let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); - let subscription = rpc.subscription.clone(); - bank.set_subscriptions(subscription.clone()); + let subscriptions = Arc::new(RpcSubscriptions::default()); + let rpc = RpcSolPubSubImpl::new_with_subscriptions(bank.clone(), subscriptions.clone()); + bank.set_subscriptions(subscriptions); let exit = Arc::new(AtomicBool::new(false)); let exit_ = exit.clone(); let thread_hdl = Builder::new() @@ -62,17 +59,7 @@ impl PubSubService { server.unwrap().close(); }) .unwrap(); - PubSubService { - thread_hdl, - exit, - rpc_bank, - subscription, - } - } - - pub fn set_bank(&self, bank: &Arc) { - self.rpc_bank.write().unwrap().bank = bank.clone(); - bank.set_subscriptions(self.subscription.clone()); + Self { thread_hdl, exit } } pub fn exit(&self) {