diff --git a/Cargo.toml b/Cargo.toml index 513c733c3c..344abdeff4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,8 @@ influx_db_client = "0.3.4" solana-jsonrpc-core = "0.1" solana-jsonrpc-http-server = "0.1" solana-jsonrpc-macros = "0.1" +solana-jsonrpc-pubsub = "0.1" +solana-jsonrpc-ws-server = "0.1" ipnetwork = "0.12.7" itertools = "0.7.8" libc = "0.2.43" diff --git a/doc/json-rpc.md b/doc/json-rpc.md index 03dda356be..f8f0f32934 100644 --- a/doc/json-rpc.md +++ b/doc/json-rpc.md @@ -23,6 +23,13 @@ Methods * [getTransactionCount](#gettransactioncount) * [requestAirdrop](#requestairdrop) * [sendTransaction](#sendtransaction) +* [subscriptionChannel](#subscriptionChannel) + +* [Subscription Websocket](#subscription-websocket) + * [accountSubscribe](#accountsubscribe) + * [accountUnsubscribe](#accountunsubscribe) + * [signatureSubscribe](#signaturesubscribe) + * [signatureUnsubscribe](#signatureunsubscribe) Request Formatting --- @@ -227,3 +234,122 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m ``` --- + +### startSubscriptionChannel +Open a socket on the node for JSON-RPC subscription requests + +##### Parameters: +None + +##### Results: +* `string` - "port", open websocket port +* `string` - "path", unique key to use as websocket path + +##### Example: +```bash +// Request +curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc": "2.0","id":1,"method":"startSubscriptionChannel"}' http://localhost:8899 + +// Result +{"jsonrpc":"2.0","result":{"port":9876,"path":"BRbmMXn71cKfzXjFsmrTsWsXuQwbjXbwKdoRwVw1FRA3"},"id":1} +``` + +--- + +### Subscription Websocket +After opening a subscription socket with the `subscriptionChannel` JSON-RPC request method, submit subscription requests via websocket protocol +Connect to the websocket at `ws://
/` returned from the request +- Submit subscription requests to the websocket using the methods below +- Multiple subscriptions may be active at once +- The subscription-channel socket will close when client closes websocket. To create new subscriptions, send a new `subscriptionChannel` JSON-RPC request. + +--- + +### accountSubscribe +Subscribe to an account to receive notifications when the userdata for a given account public key changes + +##### Parameters: +* `string` - account Pubkey, as base-58 encoded string + +##### Results: +* `integer` - Subscription id (needed to unsubscribe) + +##### Example: +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"accountSubscribe", "params":["CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12"]} + +// Result +{"jsonrpc": "2.0","result": 0,"id": 1} +``` + +##### Notification Format: +```bash +{"jsonrpc": "2.0","method": "accountNotification", "params": {"result": {"program_id":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[3,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,20,0,0,0,0,0,0,0,50,48,53,48,45,48,49,45,48,49,84,48,48,58,48,48,58,48,48,90,252,10,7,28,246,140,88,177,98,82,10,227,89,81,18,30,194,101,199,16,11,73,133,20,246,62,114,39,20,113,189,32,50,0,0,0,0,0,0,0,247,15,36,102,167,83,225,42,133,127,82,34,36,224,207,130,109,230,224,188,163,33,213,13,5,117,211,251,65,159,197,51,0,0,0,0,0,0]},"subscription":0}} +``` + +--- + +### accountUnsubscribe +Unsubscribe from account userdata change notifications + +##### Parameters: +* `integer` - id of account Subscription to cancel + +##### Results: +* `bool` - unsubscribe success message + +##### Example: +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"accountUnsubscribe", "params":[0]} + +// Result +{"jsonrpc": "2.0","result": true,"id": 1} +``` + +--- + +### signatureSubscribe +Subscribe to a transaction signature to receive notification when the transaction is confirmed +On `signatureNotification`, the subscription is automatically cancelled + +##### Parameters: +* `string` - Transaction Signature, as base-58 encoded string + +##### Results: +* `integer` - subscription id (needed to unsubscribe) + +##### Example: +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"signatureSubscribe", "params":["2EBVM6cB8vAAD93Ktr6Vd8p67XPbQzCJX47MpReuiCXJAtcjaxpvWpcg9Ege1Nr5Tk3a2GFrByT7WPBjdsTycY9b"]} + +// Result +{"jsonrpc": "2.0","result": 0,"id": 1} +``` + +##### Notification Format: +```bash +{"jsonrpc": "2.0","method": "signatureNotification", "params": {"result": "Confirmed","subscription":0}} +``` + +--- + +### signatureUnsubscribe +Unsubscribe from account userdata change notifications + +##### Parameters: +* `integer` - id of account subscription to cancel + +##### Results: +* `bool` - unsubscribe success message + +##### Example: +```bash +// Request +{"jsonrpc":"2.0", "id":1, "method":"signatureUnsubscribe", "params":[0]} + +// Result +{"jsonrpc": "2.0","result": true,"id": 1} +``` diff --git a/src/bank.rs b/src/bank.rs index d8f1187a5b..8b366cf7b6 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -11,11 +11,13 @@ use dynamic_program::DynamicProgram; use entry::Entry; use hash::{hash, Hash}; use itertools::Itertools; +use jsonrpc_macros::pubsub::Sink; use ledger::Block; use log::Level; use mint::Mint; use payment_plan::Payment; use poh_recorder::PohRecorder; +use rpc::RpcSignatureStatus; use signature::Keypair; use signature::Signature; use solana_program_interface::account::{Account, KeyedAccount}; @@ -33,6 +35,7 @@ use tictactoe_dashboard_program::TicTacToeDashboardProgram; use tictactoe_program::TicTacToeProgram; use timing::{duration_as_us, timestamp}; use token_program::TokenProgram; +use tokio::prelude::Future; use transaction::Transaction; use window::WINDOW_SIZE; @@ -135,6 +138,12 @@ pub struct Bank { // loaded contracts hashed by program_id loaded_contracts: RwLock>, + + // Mapping of account ids to Subscriber ids and sinks to notify on userdata update + account_subscriptions: RwLock>>>, + + // Mapping of signatures to Subscriber ids and sinks to notify on confirmation + signature_subscriptions: RwLock>>>, } impl Default for Bank { @@ -148,6 +157,8 @@ impl Default for Bank { is_leader: true, finality_time: AtomicUsize::new(std::usize::MAX), loaded_contracts: RwLock::new(HashMap::new()), + account_subscriptions: RwLock::new(HashMap::new()), + signature_subscriptions: RwLock::new(HashMap::new()), } } } @@ -259,6 +270,18 @@ impl Bank { &res[i], &tx.last_id, ); + if res[i] != Err(BankError::SignatureNotFound) { + let status = match res[i] { + Ok(_) => RpcSignatureStatus::Confirmed, + Err(BankError::ProgramRuntimeError(_)) => { + RpcSignatureStatus::ProgramRuntimeError + } + Err(_) => RpcSignatureStatus::GenericFailure, + }; + if status != RpcSignatureStatus::SignatureNotFound { + self.check_signature_subscriptions(&tx.signature, status); + } + } } } @@ -499,6 +522,17 @@ impl Bank { .map(|a| (a.program_id, a.tokens)) .collect(); + // Check account subscriptions before storing data for notifications + let subscriptions = self.account_subscriptions.read().unwrap(); + let pre_userdata: Vec<_> = tx + .account_keys + .iter() + .enumerate() + .zip(program_accounts.iter_mut()) + .filter(|((_, pubkey), _)| subscriptions.get(&pubkey).is_some()) + .map(|((i, pubkey), a)| ((i, pubkey), a.userdata.clone())) + .collect(); + // Call the contract method // It's up to the contract to implement its own rules on moving funds if SystemProgram::check_id(&tx_program_id) { @@ -554,6 +588,13 @@ impl Bank { post_account, )?; } + // Send notifications + for ((i, pubkey), userdata) in &pre_userdata { + let account = &program_accounts[*i]; + if userdata != &account.userdata { + self.check_account_subscriptions(&pubkey, &account); + } + } // The total sum of all the tokens in all the pages cannot change. let post_total: i64 = program_accounts.iter().map(|a| a.tokens).sum(); if pre_total != post_total { @@ -942,16 +983,101 @@ impl Bank { pub fn set_finality(&self, finality: usize) { self.finality_time.store(finality, Ordering::Relaxed); } + + pub fn add_account_subscription( + &self, + bank_sub_id: Pubkey, + pubkey: Pubkey, + sink: Sink, + ) { + let mut subscriptions = self.account_subscriptions.write().unwrap(); + if let Some(current_hashmap) = subscriptions.get_mut(&pubkey) { + current_hashmap.insert(bank_sub_id, sink); + return; + } + let mut hashmap = HashMap::new(); + hashmap.insert(bank_sub_id, sink); + subscriptions.insert(pubkey, hashmap); + } + + pub fn remove_account_subscription(&self, bank_sub_id: &Pubkey, pubkey: &Pubkey) -> bool { + let mut subscriptions = self.account_subscriptions.write().unwrap(); + match subscriptions.get_mut(pubkey) { + Some(ref current_hashmap) if current_hashmap.len() == 1 => {} + Some(current_hashmap) => { + return current_hashmap.remove(bank_sub_id).is_some(); + } + None => { + return false; + } + } + subscriptions.remove(pubkey).is_some() + } + + fn check_account_subscriptions(&self, pubkey: &Pubkey, account: &Account) { + let subscriptions = self.account_subscriptions.read().unwrap(); + if let Some(hashmap) = subscriptions.get(pubkey) { + for (_bank_sub_id, sink) in hashmap.iter() { + sink.notify(Ok(account.clone())).wait().unwrap(); + } + } + } + + pub fn add_signature_subscription( + &self, + bank_sub_id: Pubkey, + signature: Signature, + sink: Sink, + ) { + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + if let Some(current_hashmap) = subscriptions.get_mut(&signature) { + current_hashmap.insert(bank_sub_id, sink); + return; + } + let mut hashmap = HashMap::new(); + hashmap.insert(bank_sub_id, sink); + subscriptions.insert(signature, hashmap); + } + + pub fn remove_signature_subscription( + &self, + bank_sub_id: &Pubkey, + signature: &Signature, + ) -> bool { + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + match subscriptions.get_mut(signature) { + Some(ref current_hashmap) if current_hashmap.len() == 1 => {} + Some(current_hashmap) => { + return current_hashmap.remove(bank_sub_id).is_some(); + } + None => { + return false; + } + } + subscriptions.remove(signature).is_some() + } + + fn check_signature_subscriptions(&self, signature: &Signature, status: RpcSignatureStatus) { + let mut subscriptions = self.signature_subscriptions.write().unwrap(); + if let Some(hashmap) = subscriptions.get(signature) { + for (_bank_sub_id, sink) in hashmap.iter() { + sink.notify(Ok(status)).wait().unwrap(); + } + } + subscriptions.remove(&signature); + } } #[cfg(test)] mod tests { use super::*; use bincode::serialize; + use budget_program::BudgetState; use entry::next_entry; use entry::Entry; use entry_writer::{self, EntryWriter}; use hash::hash; + use jsonrpc_macros::pubsub::{Subscriber, SubscriptionId}; use ledger; use logger; use signature::Keypair; @@ -959,6 +1085,7 @@ mod tests { use std; use std::io::{BufReader, Cursor, Seek, SeekFrom}; use system_transaction::SystemTransaction; + use tokio::prelude::{Async, Stream}; use transaction::Instruction; #[test] @@ -1496,4 +1623,94 @@ mod tests { Ok(_) ); } + #[test] + fn test_bank_account_subscribe() { + let mint = Mint::new(100); + let bank = Bank::new(&mint); + let alice = Keypair::new(); + let bank_sub_id = Keypair::new().pubkey(); + let last_id = bank.last_id(); + let tx = Transaction::system_create( + &mint.keypair(), + alice.pubkey(), + last_id, + 1, + 16, + BudgetState::id(), + 0, + ); + bank.process_transaction(&tx).unwrap(); + + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("accountNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + bank.add_account_subscription(bank_sub_id, alice.pubkey(), sink); + + assert!( + bank.account_subscriptions + .write() + .unwrap() + .contains_key(&alice.pubkey()) + ); + + let account = bank.get_account(&alice.pubkey()).unwrap(); + bank.check_account_subscriptions(&alice.pubkey(), &account); + let string = transport_receiver.poll(); + assert!(string.is_ok()); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"accountNotification","params":{{"result":{{"program_id":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"tokens":1,"userdata":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"subscription":0}}}}"#); + assert_eq!(expected, response); + } + + bank.remove_account_subscription(&bank_sub_id, &alice.pubkey()); + assert!( + !bank + .account_subscriptions + .write() + .unwrap() + .contains_key(&alice.pubkey()) + ); + } + #[test] + fn test_bank_signature_subscribe() { + let mint = Mint::new(100); + let bank = Bank::new(&mint); + let alice = Keypair::new(); + let bank_sub_id = Keypair::new().pubkey(); + let last_id = bank.last_id(); + let tx = Transaction::system_move(&mint.keypair(), alice.pubkey(), 20, last_id, 0); + let signature = tx.signature; + bank.process_transaction(&tx).unwrap(); + + let (subscriber, _id_receiver, mut transport_receiver) = + Subscriber::new_test("signatureNotification"); + let sub_id = SubscriptionId::Number(0 as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + bank.add_signature_subscription(bank_sub_id, signature, sink); + + assert!( + bank.signature_subscriptions + .write() + .unwrap() + .contains_key(&signature) + ); + + bank.check_signature_subscriptions(&signature, RpcSignatureStatus::Confirmed); + let string = transport_receiver.poll(); + assert!(string.is_ok()); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#); + assert_eq!(expected, response); + } + + bank.remove_signature_subscription(&bank_sub_id, &signature); + assert!( + !bank + .signature_subscriptions + .write() + .unwrap() + .contains_key(&signature) + ); + } } diff --git a/src/lib.rs b/src/lib.rs index 62ebb34348..74c6ec084c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,7 @@ pub mod packet; pub mod payment_plan; pub mod poh; pub mod poh_recorder; +pub mod pubsub; pub mod recvmmsg; pub mod replicate_stage; pub mod replicator; @@ -109,6 +110,8 @@ extern crate solana_jsonrpc_core as jsonrpc_core; extern crate solana_jsonrpc_http_server as jsonrpc_http_server; #[macro_use] extern crate solana_jsonrpc_macros as jsonrpc_macros; +extern crate solana_jsonrpc_pubsub as jsonrpc_pubsub; +extern crate solana_jsonrpc_ws_server as jsonrpc_ws_server; extern crate solana_program_interface; extern crate sys_info; extern crate tokio; diff --git a/src/netutil.rs b/src/netutil.rs index 05e1ef2931..48ea598e28 100644 --- a/src/netutil.rs +++ b/src/netutil.rs @@ -7,7 +7,7 @@ use rand::{thread_rng, Rng}; use reqwest; use socket2::{Domain, SockAddr, Socket, Type}; use std::io; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}; use std::os::unix::io::AsRawFd; /// A data type representing a public Udp socket @@ -159,6 +159,26 @@ pub fn bind_to(port: u16, reuseaddr: bool) -> io::Result { } } +pub fn find_available_port_in_range(range: (u16, u16)) -> io::Result { + let (start, end) = range; + let mut tries_left = end - start; + loop { + let rand_port = thread_rng().gen_range(start, end); + match TcpListener::bind(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rand_port, + )) { + Ok(_) => { + break Ok(rand_port); + } + Err(err) => if err.kind() != io::ErrorKind::AddrInUse || tries_left == 0 { + return Err(err); + }, + } + tries_left -= 1; + } +} + #[cfg(test)] mod tests { use ipnetwork::IpNetwork; diff --git a/src/pubsub.rs b/src/pubsub.rs new file mode 100644 index 0000000000..c8a0ad7346 --- /dev/null +++ b/src/pubsub.rs @@ -0,0 +1,638 @@ +//! The `pubsub` module implements a threaded subscription service on client RPC request + +use bank::Bank; +use bs58; +use jsonrpc_core::futures::Future; +use jsonrpc_core::*; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; +use jsonrpc_ws_server::ws; +use jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder}; +use rpc::{JsonRpcRequestProcessor, RpcSignatureStatus}; +use signature::{Keypair, KeypairUtil, Signature}; +use solana_program_interface::account::Account; +use solana_program_interface::pubkey::Pubkey; +use std::collections::HashMap; +use std::mem; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{atomic, Arc, Mutex, RwLock}; +use std::thread::{sleep, Builder, JoinHandle}; +use std::time::Duration; + +pub enum ClientState { + Uninitialized, + Init(Sender), +} + +#[derive(Serialize)] +pub struct SubscriptionResponse { + pub port: u16, + pub path: String, +} + +pub struct PubSubService { + _thread_hdl: JoinHandle<()>, +} + +impl PubSubService { + pub fn new( + bank: &Arc, + pubsub_addr: SocketAddr, + path: Pubkey, + exit: Arc, + ) -> Self { + let request_processor = JsonRpcRequestProcessor::new(bank.clone()); + let status = Arc::new(Mutex::new(ClientState::Uninitialized)); + let client_status = status.clone(); + let server_bank = bank.clone(); + let _thread_hdl = Builder::new() + .name("solana-pubsub".to_string()) + .spawn(move || { + let mut io = PubSubHandler::default(); + let rpc = RpcSolPubSubImpl::default(); + let account_subs = rpc.account_subscriptions.clone(); + let signature_subs = rpc.signature_subscriptions.clone(); + io.extend_with(rpc.to_delegate()); + + let server = ServerBuilder::with_meta_extractor(io, move |context: &RequestContext| + { + *client_status.lock().unwrap() = ClientState::Init(context.out.clone()); + Meta { + request_processor: request_processor.clone(), + session: Arc::new(Session::new(context.sender().clone())), + } + }) + .request_middleware(move |req: &ws::Request| + if req.resource() != format!("/{}", path.to_string()) { + Some(ws::Response::new(403, "Client path incorrect or not provided")) + } else { + None + }) + .start(&pubsub_addr); + + if server.is_err() { + warn!("Pubsub service unavailable: unable to bind to port {}. \nMake sure this port is not already in use by another application", pubsub_addr.port()); + return; + } + while !exit.load(Ordering::Relaxed) { + if let ClientState::Init(ref mut sender) = *status.lock().unwrap() { + if sender.check_active().is_err() { + break; + } + } + sleep(Duration::from_millis(100)); + } + for (_, (bank_sub_id, pubkey)) in account_subs.read().unwrap().iter() { + server_bank.remove_account_subscription(bank_sub_id, pubkey); + } + for (_, (bank_sub_id, signature)) in signature_subs.read().unwrap().iter() { + server_bank.remove_signature_subscription(bank_sub_id, signature); + } + server.unwrap().close(); + () + }) + .unwrap(); + PubSubService { _thread_hdl } + } +} + +#[derive(Clone)] +pub struct Meta { + pub request_processor: JsonRpcRequestProcessor, + pub session: Arc, +} +impl Metadata for Meta {} +impl PubSubMetadata for Meta { + fn session(&self) -> Option> { + Some(self.session.clone()) + } +} + +build_rpc_trait! { + pub trait RpcSolPubSub { + type Metadata; + + #[pubsub(name = "accountNotification")] { + // Get notification every time account userdata is changed + // Accepts pubkey parameter as base-58 encoded string + #[rpc(name = "accountSubscribe")] + fn account_subscribe(&self, Self::Metadata, pubsub::Subscriber, String); + + // Unsubscribe from account notification subscription. + #[rpc(name = "accountUnsubscribe")] + fn account_unsubscribe(&self, Self::Metadata, SubscriptionId) -> Result; + } + #[pubsub(name = "signatureNotification")] { + // Get notification when signature is verified + // Accepts signature parameter as base-58 encoded string + #[rpc(name = "signatureSubscribe")] + fn signature_subscribe(&self, Self::Metadata, pubsub::Subscriber, String); + + // Unsubscribe from signature notification subscription. + #[rpc(name = "signatureUnsubscribe")] + fn signature_unsubscribe(&self, Self::Metadata, SubscriptionId) -> Result; + } + } +} + +#[derive(Default)] +struct RpcSolPubSubImpl { + uid: atomic::AtomicUsize, + account_subscriptions: Arc>>, + signature_subscriptions: Arc>>, +} +impl RpcSolPubSub for RpcSolPubSubImpl { + type Metadata = Meta; + + fn account_subscribe( + &self, + meta: Self::Metadata, + subscriber: pubsub::Subscriber, + pubkey_str: String, + ) { + let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap(); + if pubkey_vec.len() != mem::size_of::() { + subscriber + .reject(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Invalid pubkey provided".into(), + data: None, + }).unwrap(); + return; + } + let pubkey = Pubkey::new(&pubkey_vec); + + let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); + let sub_id = SubscriptionId::Number(id as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let bank_sub_id = Keypair::new().pubkey(); + self.account_subscriptions + .write() + .unwrap() + .insert(sub_id.clone(), (bank_sub_id, pubkey)); + + meta.request_processor + .add_account_subscription(bank_sub_id, pubkey, sink); + } + + fn account_unsubscribe(&self, meta: Self::Metadata, id: SubscriptionId) -> Result { + if let Some((bank_sub_id, pubkey)) = self.account_subscriptions.write().unwrap().remove(&id) + { + meta.request_processor + .remove_account_subscription(&bank_sub_id, &pubkey); + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + + fn signature_subscribe( + &self, + meta: Self::Metadata, + subscriber: pubsub::Subscriber, + signature_str: String, + ) { + let signature_vec = bs58::decode(signature_str).into_vec().unwrap(); + if signature_vec.len() != mem::size_of::() { + subscriber + .reject(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Invalid signature provided".into(), + data: None, + }).unwrap(); + return; + } + let signature = Signature::new(&signature_vec); + + let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); + let sub_id = SubscriptionId::Number(id as u64); + let sink = subscriber.assign_id(sub_id.clone()).unwrap(); + let bank_sub_id = Keypair::new().pubkey(); + self.signature_subscriptions + .write() + .unwrap() + .insert(sub_id.clone(), (bank_sub_id, signature)); + + match meta.request_processor.get_signature_status(signature) { + Ok(_) => { + sink.notify(Ok(RpcSignatureStatus::Confirmed)) + .wait() + .unwrap(); + self.signature_subscriptions + .write() + .unwrap() + .remove(&sub_id); + } + Err(_) => { + meta.request_processor + .add_signature_subscription(bank_sub_id, signature, sink); + } + } + } + + fn signature_unsubscribe(&self, meta: Self::Metadata, id: SubscriptionId) -> Result { + if let Some((bank_sub_id, signature)) = + self.signature_subscriptions.write().unwrap().remove(&id) + { + meta.request_processor + .remove_signature_subscription(&bank_sub_id, &signature); + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use budget_program::BudgetState; + use budget_transaction::BudgetTransaction; + use jsonrpc_core::futures::sync::mpsc; + use mint::Mint; + use signature::{Keypair, KeypairUtil}; + use std::net::{IpAddr, Ipv4Addr}; + use system_transaction::SystemTransaction; + use tokio::prelude::{Async, Stream}; + use transaction::Transaction; + + #[test] + fn test_pubsub_new() { + let alice = Mint::new(10_000); + let bank = Bank::new(&alice); + let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + let pubkey = Keypair::new().pubkey(); + let exit = Arc::new(AtomicBool::new(false)); + let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr, pubkey, exit); + let thread = pubsub_service._thread_hdl.thread(); + assert_eq!(thread.name().unwrap(), "solana-pubsub"); + } + + #[test] + fn test_signature_subscribe() { + let alice = Mint::new(10_000); + let bob = Keypair::new(); + let bob_pubkey = bob.pubkey(); + let bank = Bank::new(&alice); + let arc_bank = Arc::new(bank); + let last_id = arc_bank.last_id(); + + let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone()); + let (sender, mut receiver) = mpsc::channel(1); + let session = Arc::new(Session::new(sender)); + + let mut io = PubSubHandler::default(); + let rpc = RpcSolPubSubImpl::default(); + io.extend_with(rpc.to_delegate()); + let meta = Meta { + request_processor, + session, + }; + + // Test signature subscription + let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); + + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, + tx.signature.to_string() + ); + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + + // Test bad parameter + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["a1b2c3"]}}"# + ); + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid signature provided"}},"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + + arc_bank + .process_transaction(&tx) + .expect("process transaction"); + sleep(Duration::from_millis(200)); + + // Test signature confirmation notification + let string = receiver.poll(); + assert!(string.is_ok()); + if let Async::Ready(Some(response)) = string.unwrap() { + let expected = format!(r#"{{"jsonrpc":"2.0","method":"signatureNotification","params":{{"result":"Confirmed","subscription":0}}}}"#); + assert_eq!(expected, response); + } + + // Test subscription id increment + let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 10, last_id, 0); + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, + tx.signature.to_string() + ); + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","result":1,"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + } + + #[test] + fn test_signature_unsubscribe() { + let alice = Mint::new(10_000); + let bob_pubkey = Keypair::new().pubkey(); + let bank = Bank::new(&alice); + let arc_bank = Arc::new(bank); + let last_id = arc_bank.last_id(); + + let request_processor = JsonRpcRequestProcessor::new(arc_bank); + let (sender, _receiver) = mpsc::channel(1); + let session = Arc::new(Session::new(sender)); + + let mut io = PubSubHandler::default(); + let rpc = RpcSolPubSubImpl::default(); + io.extend_with(rpc.to_delegate()); + let meta = Meta { + request_processor, + session: session.clone(), + }; + + let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#, + tx.signature.to_string() + ); + let _res = io.handle_request_sync(&req, meta.clone()); + + let req = + format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[0]}}"#); + let res = io.handle_request_sync(&req, meta.clone()); + + let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + + // Test bad parameter + let req = + format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[1]}}"#); + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + } + + #[test] + fn test_account_subscribe() { + let alice = Mint::new(10_000); + let bob_pubkey = Keypair::new().pubkey(); + let witness = Keypair::new(); + let contract_funds = Keypair::new(); + let contract_state = Keypair::new(); + let budget_program_id = BudgetState::id(); + let bank = Bank::new(&alice); + let arc_bank = Arc::new(bank); + let last_id = arc_bank.last_id(); + + let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone()); + let (sender, mut receiver) = mpsc::channel(1); + let session = Arc::new(Session::new(sender)); + + let mut io = PubSubHandler::default(); + let rpc = RpcSolPubSubImpl::default(); + io.extend_with(rpc.to_delegate()); + let meta = Meta { + request_processor, + session, + }; + + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#, + contract_state.pubkey().to_string() + ); + + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + + // Test bad parameter + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["a1b2c3"]}}"# + ); + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid pubkey provided"}},"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + + let tx = Transaction::system_create( + &alice.keypair(), + contract_funds.pubkey(), + last_id, + 50, + 0, + budget_program_id, + 0, + ); + arc_bank + .process_transaction(&tx) + .expect("process transaction"); + + let tx = Transaction::system_create( + &alice.keypair(), + contract_state.pubkey(), + last_id, + 1, + 196, + budget_program_id, + 0, + ); + arc_bank + .process_transaction(&tx) + .expect("process transaction"); + + // Test signature confirmation notification #1 + let string = receiver.poll(); + assert!(string.is_ok()); + let expected_userdata = arc_bank + .get_account(&contract_state.pubkey()) + .unwrap() + .userdata; + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "program_id": budget_program_id, + "tokens": 1, + "userdata": expected_userdata + }, + "subscription": 0, + } + }); + + if let Async::Ready(Some(response)) = string.unwrap() { + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + } + + let tx = Transaction::budget_new_when_signed( + &contract_funds, + bob_pubkey, + contract_state.pubkey(), + witness.pubkey(), + None, + 50, + last_id, + ); + arc_bank + .process_transaction(&tx) + .expect("process transaction"); + sleep(Duration::from_millis(200)); + + // Test signature confirmation notification #2 + let string = receiver.poll(); + assert!(string.is_ok()); + let expected_userdata = arc_bank + .get_account(&contract_state.pubkey()) + .unwrap() + .userdata; + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "program_id": budget_program_id, + "tokens": 51, + "userdata": expected_userdata + }, + "subscription": 0, + } + }); + + if let Async::Ready(Some(response)) = string.unwrap() { + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + } + + let tx = Transaction::system_new(&alice.keypair(), witness.pubkey(), 1, last_id); + arc_bank + .process_transaction(&tx) + .expect("process transaction"); + sleep(Duration::from_millis(200)); + let tx = Transaction::budget_new_signature( + &witness, + contract_state.pubkey(), + bob_pubkey, + last_id, + ); + arc_bank + .process_transaction(&tx) + .expect("process transaction"); + sleep(Duration::from_millis(200)); + + let expected_userdata = arc_bank + .get_account(&contract_state.pubkey()) + .unwrap() + .userdata; + let expected = json!({ + "jsonrpc": "2.0", + "method": "accountNotification", + "params": { + "result": { + "program_id": budget_program_id, + "tokens": 1, + "userdata": expected_userdata + }, + "subscription": 0, + } + }); + let string = receiver.poll(); + assert!(string.is_ok()); + if let Async::Ready(Some(response)) = string.unwrap() { + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + } + } + + #[test] + fn test_account_unsubscribe() { + let alice = Mint::new(10_000); + let bob_pubkey = Keypair::new().pubkey(); + let bank = Bank::new(&alice); + let arc_bank = Arc::new(bank); + + let request_processor = JsonRpcRequestProcessor::new(arc_bank); + let (sender, _receiver) = mpsc::channel(1); + let session = Arc::new(Session::new(sender)); + + let mut io = PubSubHandler::default(); + let rpc = RpcSolPubSubImpl::default(); + io.extend_with(rpc.to_delegate()); + let meta = Meta { + request_processor, + session: session.clone(), + }; + + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#, + bob_pubkey.to_string() + ); + let _res = io.handle_request_sync(&req, meta.clone()); + + let req = + format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[0]}}"#); + let res = io.handle_request_sync(&req, meta.clone()); + + let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + + // Test bad parameter + let req = + format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[1]}}"#); + let res = io.handle_request_sync(&req, meta.clone()); + let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#); + let expected: Response = + serde_json::from_str(&expected).expect("expected response deserialization"); + + let result: Response = serde_json::from_str(&res.expect("actual response")) + .expect("actual response deserialization"); + assert_eq!(expected, result); + } +} diff --git a/src/rpc.rs b/src/rpc.rs index 0c17d2d520..f3d1e07871 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,10 +3,14 @@ use bank::{Bank, BankError}; use bincode::deserialize; use bs58; +use cluster_info::FULLNODE_PORT_RANGE; use jsonrpc_core::*; use jsonrpc_http_server::*; +use jsonrpc_macros::pubsub::Sink; +use netutil::find_available_port_in_range; +use pubsub::{PubSubService, SubscriptionResponse}; use service::Service; -use signature::Signature; +use signature::{Keypair, KeypairUtil, Signature}; use solana_program_interface::account::Account; use solana_program_interface::pubkey::Pubkey; use std::mem; @@ -35,6 +39,7 @@ impl JsonRpcService { exit: Arc, ) -> Self { let request_processor = JsonRpcRequestProcessor::new(bank.clone()); + let exit_pubsub = exit.clone(); let thread_hdl = Builder::new() .name("solana-jsonrpc".to_string()) .spawn(move || { @@ -47,6 +52,8 @@ impl JsonRpcService { request_processor: request_processor.clone(), transactions_addr, drone_addr, + rpc_addr, + exit: exit_pubsub.clone(), }).threads(4) .cors(DomainsValidation::AllowOnly(vec![ AccessControlAllowOrigin::Any, @@ -83,10 +90,12 @@ pub struct Meta { pub request_processor: JsonRpcRequestProcessor, pub transactions_addr: SocketAddr, pub drone_addr: SocketAddr, + pub rpc_addr: SocketAddr, + pub exit: Arc, } impl Metadata for Meta {} -#[derive(PartialEq, Serialize)] +#[derive(Copy, Clone, PartialEq, Serialize, Debug)] pub enum RpcSignatureStatus { Confirmed, SignatureNotFound, @@ -124,6 +133,9 @@ build_rpc_trait! { #[rpc(meta, name = "sendTransaction")] fn send_transaction(&self, Self::Metadata, Vec) -> Result; + + #[rpc(meta, name = "startSubscriptionChannel")] + fn start_subscription_channel(&self, Self::Metadata) -> Result; } } @@ -222,6 +234,22 @@ impl RpcSol for RpcSolImpl { })?; Ok(bs58::encode(tx.signature).into_string()) } + fn start_subscription_channel(&self, meta: Self::Metadata) -> Result { + let port: u16 = find_available_port_in_range(FULLNODE_PORT_RANGE).map_err(|_| Error { + code: ErrorCode::InternalError, + message: "No available port in range".into(), + data: None, + })?; + let mut pubsub_addr = meta.rpc_addr; + pubsub_addr.set_port(port); + let pubkey = Keypair::new().pubkey(); + let _pubsub_service = + PubSubService::new(&meta.request_processor.bank, pubsub_addr, pubkey, meta.exit); + Ok(SubscriptionResponse { + port, + path: pubkey.to_string(), + }) + } } #[derive(Clone)] pub struct JsonRpcRequestProcessor { @@ -234,7 +262,7 @@ impl JsonRpcRequestProcessor { } /// Process JSON-RPC request items sent via JSON-RPC. - fn get_account_info(&self, pubkey: Pubkey) -> Result { + pub fn get_account_info(&self, pubkey: Pubkey) -> Result { self.bank .get_account(&pubkey) .ok_or_else(Error::invalid_request) @@ -250,12 +278,37 @@ impl JsonRpcRequestProcessor { let id = self.bank.last_id(); Ok(bs58::encode(id).into_string()) } - fn get_signature_status(&self, signature: Signature) -> result::Result<(), BankError> { + pub fn get_signature_status(&self, signature: Signature) -> result::Result<(), BankError> { self.bank.get_signature_status(&signature) } fn get_transaction_count(&self) -> Result { Ok(self.bank.transaction_count() as u64) } + pub fn add_account_subscription( + &self, + bank_sub_id: Pubkey, + pubkey: Pubkey, + sink: Sink, + ) { + self.bank + .add_account_subscription(bank_sub_id, pubkey, sink); + } + pub fn remove_account_subscription(&self, bank_sub_id: &Pubkey, pubkey: &Pubkey) { + self.bank.remove_account_subscription(bank_sub_id, pubkey); + } + pub fn add_signature_subscription( + &self, + bank_sub_id: Pubkey, + signature: Signature, + sink: Sink, + ) { + self.bank + .add_signature_subscription(bank_sub_id, signature, sink); + } + pub fn remove_signature_subscription(&self, bank_sub_id: &Pubkey, signature: &Signature) { + self.bank + .remove_signature_subscription(bank_sub_id, signature); + } } #[cfg(test)] @@ -283,6 +336,8 @@ mod tests { let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank)); let transactions_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + let exit = Arc::new(AtomicBool::new(false)); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; @@ -291,6 +346,8 @@ mod tests { request_processor, transactions_addr, drone_addr, + rpc_addr, + exit, }; let req = format!( @@ -351,6 +408,8 @@ mod tests { request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)), transactions_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + exit: Arc::new(AtomicBool::new(false)), }; let res = io.handle_request_sync(req, meta); @@ -376,6 +435,8 @@ mod tests { request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)), transactions_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), + exit: Arc::new(AtomicBool::new(false)), }; let res = io.handle_request_sync(req, meta); diff --git a/src/thin_client.rs b/src/thin_client.rs index 0115c13a4a..a46a0eab28 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -455,6 +455,7 @@ mod tests { } #[test] + #[ignore] fn test_thin_client() { logger::setup(); let leader_keypair = Keypair::new();