diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index df9714e5aa..48b2a0985c 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -78,9 +78,14 @@ impl RpcSolPubSubImpl { subscription: Arc::new(RpcSubscriptions::default()), } } +} - pub fn subscribe_to_account_updates( +impl RpcSolPubSub for RpcSolPubSubImpl { + type Metadata = Arc; + + fn account_subscribe( &self, + _meta: Self::Metadata, subscriber: Subscriber, pubkey_str: String, ) { @@ -106,8 +111,26 @@ impl RpcSolPubSubImpl { .add_account_subscription(&pubkey, &sub_id, &sink) } - pub fn subscribe_to_signature_updates( + fn account_unsubscribe( &self, + _meta: Option, + id: SubscriptionId, + ) -> Result { + info!("account_unsubscribe: id={:?}", id); + if self.subscription.remove_account_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + + fn signature_subscribe( + &self, + _meta: Self::Metadata, subscriber: Subscriber, signature_str: String, ) { @@ -151,45 +174,6 @@ impl RpcSolPubSubImpl { .add_signature_subscription(&signature, &sub_id, &sink), } } -} - -impl RpcSolPubSub for RpcSolPubSubImpl { - type Metadata = Arc; - - fn account_subscribe( - &self, - _meta: Self::Metadata, - subscriber: Subscriber, - pubkey_str: String, - ) { - self.subscribe_to_account_updates(subscriber, pubkey_str) - } - - fn account_unsubscribe( - &self, - _meta: Option, - id: SubscriptionId, - ) -> Result { - info!("account_unsubscribe: id={:?}", id); - if self.subscription.remove_account_subscription(&id) { - Ok(true) - } else { - Err(Error { - code: ErrorCode::InvalidParams, - message: "Invalid Request: Subscription id does not exist".into(), - data: None, - }) - } - } - - fn signature_subscribe( - &self, - _meta: Self::Metadata, - subscriber: Subscriber, - signature_str: String, - ) { - self.subscribe_to_signature_updates(subscriber, signature_str) - } fn signature_unsubscribe( &self, @@ -224,6 +208,10 @@ mod tests { use std::time::Duration; use tokio::prelude::{Async, Stream}; + fn create_session() -> Arc { + Arc::new(Session::new(mpsc::channel(1).0)) + } + #[test] fn test_signature_subscribe() { let (genesis_block, alice) = GenesisBlock::new(10_000); @@ -241,9 +229,10 @@ mod tests { // Test signature subscription let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); + let session = create_session(); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("signatureNotification"); - rpc.subscribe_to_signature_updates(subscriber, tx.signatures[0].to_string()); + rpc.signature_subscribe(session, subscriber, tx.signatures[0].to_string()); arc_bank .process_transaction(&tx) @@ -266,8 +255,7 @@ mod tests { let arc_bank = Arc::new(bank); let last_id = arc_bank.last_id(); - let (sender, _receiver) = mpsc::channel(1); - let session = Arc::new(Session::new(sender)); + let session = create_session(); let mut io = PubSubHandler::default(); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); @@ -324,8 +312,9 @@ mod tests { let subscription = rpc.subscription.clone(); arc_bank.set_subscriptions(subscription); + let session = create_session(); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); - rpc.subscribe_to_account_updates(subscriber, contract_state.pubkey().to_string()); + rpc.account_subscribe(session, subscriber, contract_state.pubkey().to_string()); let tx = SystemTransaction::new_program_account( &alice, @@ -465,8 +454,7 @@ mod tests { let bank = Bank::new(&genesis_block); let arc_bank = Arc::new(bank); - let (sender, _receiver) = mpsc::channel(1); - let session = Arc::new(Session::new(sender)); + let session = create_session(); let mut io = PubSubHandler::default(); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));