From a6a8a712e54564617fd7cf335f3ed1aa1bd7b84e Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 26 Dec 2021 21:04:56 +0300 Subject: [PATCH] Add async WebSocket PubsubClient --- client-test/Cargo.toml | 1 + client-test/tests/client.rs | 83 ++++++++ client/Cargo.toml | 7 + client/src/lib.rs | 2 + client/src/pubsub_client.rs | 2 +- client/src/pubsub_client_async.rs | 323 ++++++++++++++++++++++++++++++ 6 files changed, 417 insertions(+), 1 deletion(-) create mode 100644 client/src/pubsub_client_async.rs diff --git a/client-test/Cargo.toml b/client-test/Cargo.toml index b87247e27f..f10aa2a32b 100644 --- a/client-test/Cargo.toml +++ b/client-test/Cargo.toml @@ -27,6 +27,7 @@ solana-test-validator = { path = "../test-validator", version = "=1.10.0" } solana-transaction-status = { path = "../transaction-status", version = "=1.10.0" } solana-version = { path = "../version", version = "=1.10.0" } systemstat = "0.1.10" +tokio = { version = "1", features = ["full"] } [dev-dependencies] solana-logger = { path = "../logger", version = "=1.10.0" } diff --git a/client-test/tests/client.rs b/client-test/tests/client.rs index 21597ad185..a012a0924e 100644 --- a/client-test/tests/client.rs +++ b/client-test/tests/client.rs @@ -514,3 +514,86 @@ fn test_slot_subscription() { assert_eq!(errors, [].to_vec()); } + +#[tokio::test] +#[serial] +async fn test_slot_subscription_async() { + use {futures_util::StreamExt, solana_client::pubsub_client_async::PubsubClient}; + + let sync_service = Arc::new(AtomicU64::new(0)); + let sync_client = Arc::clone(&sync_service); + fn wait_until(atomic: &Arc, value: u64) { + while atomic.load(Ordering::Relaxed) != value { + sleep(Duration::from_millis(1)) + } + } + + let pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rpc_port::DEFAULT_RPC_PUBSUB_PORT, + ); + + tokio::task::spawn_blocking(move || { + let exit = Arc::new(AtomicBool::new(false)); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( + &exit, + max_complete_transaction_status_slot, + bank_forks, + Arc::new(RwLock::new(BlockCommitmentCache::default())), + optimistically_confirmed_bank, + )); + let (trigger, pubsub_service) = + PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); + sleep(Duration::from_millis(100)); + sync_service.store(1, Ordering::Relaxed); + + wait_until(&sync_service, 2); + subscriptions.notify_slot(1, 0, 0); + sync_service.store(3, Ordering::Relaxed); + + wait_until(&sync_service, 4); + subscriptions.notify_slot(2, 1, 1); + sync_service.store(5, Ordering::Relaxed); + + wait_until(&sync_service, 6); + exit.store(true, Ordering::Relaxed); + trigger.cancel(); + pubsub_service.close().unwrap(); + }); + + wait_until(&sync_client, 1); + let url = format!("ws://0.0.0.0:{}/", pubsub_addr.port()); + let pubsub_client = PubsubClient::connect(&url).await.unwrap(); + let (mut notifications, unsubscribe) = pubsub_client.slot_subscribe().await.unwrap(); + sync_client.store(2, Ordering::Relaxed); + + wait_until(&sync_client, 3); + assert_eq!( + tokio::time::timeout(Duration::from_millis(25), notifications.next()).await, + Ok(Some(SlotInfo { + slot: 1, + parent: 0, + root: 0, + })) + ); + sync_client.store(4, Ordering::Relaxed); + + wait_until(&sync_client, 5); + assert_eq!( + tokio::time::timeout(Duration::from_millis(25), notifications.next()).await, + Ok(Some(SlotInfo { + slot: 2, + parent: 1, + root: 1, + })) + ); + sync_client.store(6, Ordering::Relaxed); + + unsubscribe().await; +} diff --git a/client/Cargo.toml b/client/Cargo.toml index 52752b49ea..16b5e17874 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -16,6 +16,7 @@ bincode = "1.3.3" bs58 = "0.4.0" clap = "2.33.0" crossbeam-channel = "0.5" +futures-util = { version = "0.3.19", optional = true } indicatif = "0.16.2" jsonrpc-core = "18.0.0" log = "0.4.14" @@ -36,6 +37,8 @@ solana-version = { path = "../version", version = "=1.10.0" } solana-vote-program = { path = "../programs/vote", version = "=1.10.0" } thiserror = "1.0" tokio = { version = "1", features = ["full"] } +tokio-stream = { version = "0.1.8", optional = true } +tokio-tungstenite = { version = "0.16.0", features = ["rustls-tls-webpki-roots"], optional = true } tungstenite = { version = "0.16.0", features = ["rustls-tls-webpki-roots"] } url = "2.2.2" @@ -46,3 +49,7 @@ solana-logger = { path = "../logger", version = "=1.10.0" } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] + +[features] +default = ["async"] +async = ["futures-util", "tokio-stream", "tokio-tungstenite"] diff --git a/client/src/lib.rs b/client/src/lib.rs index 72cb649686..26930b1721 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -10,6 +10,8 @@ pub mod nonblocking; pub mod nonce_utils; pub mod perf_utils; pub mod pubsub_client; +#[cfg(feature = "async")] +pub mod pubsub_client_async; pub mod rpc_cache; pub mod rpc_client; pub mod rpc_config; diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index edb742e4d7..22d5182ae4 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -615,5 +615,5 @@ impl PubsubClient { #[cfg(test)] mod tests { - // see core/tests/client.rs#test_slot_subscription() + // see client-test/test/client.rs } diff --git a/client/src/pubsub_client_async.rs b/client/src/pubsub_client_async.rs new file mode 100644 index 0000000000..4750050ae0 --- /dev/null +++ b/client/src/pubsub_client_async.rs @@ -0,0 +1,323 @@ +use { + crate::{ + rpc_config::{ + RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, + RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, + }, + rpc_response::{ + Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse, + RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, + }, + }, + futures_util::{ + future::{ready, BoxFuture, FutureExt}, + sink::SinkExt, + stream::{BoxStream, StreamExt}, + }, + serde::de::DeserializeOwned, + serde_json::{json, Map, Value}, + solana_account_decoder::UiAccount, + solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}, + std::collections::BTreeMap, + thiserror::Error, + tokio::{ + net::TcpStream, + sync::{mpsc, oneshot}, + task::JoinHandle, + time::{sleep, Duration}, + }, + tokio_stream::wrappers::UnboundedReceiverStream, + tokio_tungstenite::{ + connect_async, + tungstenite::{ + protocol::frame::{coding::CloseCode, CloseFrame}, + Message, + }, + MaybeTlsStream, WebSocketStream, + }, + url::Url, +}; + +pub type PubsubClientResult = Result; + +#[derive(Debug, Error)] +pub enum PubsubClientError { + #[error("url parse error")] + UrlParseError(#[from] url::ParseError), + + #[error("unable to connect to server")] + ConnectionError(tokio_tungstenite::tungstenite::Error), + #[error("websocket error")] + WsError(#[from] tokio_tungstenite::tungstenite::Error), + #[error("connection closed")] + ConnectionClosed, + + #[error("json parse error")] + JsonParseError(#[from] serde_json::error::Error), + #[error("subscribe failed: {reason}")] + SubscribeFailed { + reason: &'static str, + message: String, + }, +} + +type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; +type SubscribeResponseMsg = (mpsc::UnboundedReceiver, UnsubscribeFn); +type SubscribeRequestMsg = (String, Value, oneshot::Sender); +type SubscribeResult<'a, T> = PubsubClientResult<(BoxStream<'a, T>, UnsubscribeFn)>; + +#[derive(Debug)] +pub struct PubsubClient { + subscribe_tx: mpsc::UnboundedSender, + shutdown_tx: oneshot::Sender<()>, + ws: JoinHandle, +} + +impl PubsubClient { + pub async fn connect(url: &str) -> PubsubClientResult { + let url = Url::parse(url)?; + let (ws, _response) = connect_async(url) + .await + .map_err(PubsubClientError::ConnectionError)?; + + let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + Ok(Self { + subscribe_tx, + shutdown_tx, + ws: tokio::spawn(PubsubClient::run_ws(ws, subscribe_rx, shutdown_rx)), + }) + } + + pub async fn shutdown(self) -> PubsubClientResult { + let _ = self.shutdown_tx.send(()); + self.ws.await.unwrap() // WS future should not be cancelled or panicked + } + + async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T> + where + T: DeserializeOwned + Send + 'a, + { + let (response_tx, response_rx) = oneshot::channel(); + self.subscribe_tx + .send((operation.to_string(), params, response_tx)) + .map_err(|_| PubsubClientError::ConnectionClosed)?; + let (notifications, unsubscribe) = response_rx + .await + .map_err(|_| PubsubClientError::ConnectionClosed)?; + Ok(( + UnboundedReceiverStream::new(notifications) + .filter_map(|value| ready(serde_json::from_value::(value).ok())) + .boxed(), + unsubscribe, + )) + } + + pub async fn account_subscribe( + &self, + pubkey: &Pubkey, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + let params = json!([pubkey.to_string(), config]); + self.subscribe("account", params).await + } + + pub async fn block_subscribe( + &self, + filter: RpcBlockSubscribeFilter, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + self.subscribe("block", json!([filter, config])).await + } + + pub async fn logs_subscribe( + &self, + filter: RpcTransactionLogsFilter, + config: RpcTransactionLogsConfig, + ) -> SubscribeResult<'_, RpcResponse> { + self.subscribe("logs", json!([filter, config])).await + } + + pub async fn program_subscribe( + &self, + pubkey: &Pubkey, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + let params = json!([pubkey.to_string(), config]); + self.subscribe("program", params).await + } + + pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> { + self.subscribe("vote", json!([])).await + } + + pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> { + self.subscribe("root", json!([])).await + } + + pub async fn signature_subscribe( + &self, + signature: &Signature, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + let params = json!([signature.to_string(), config]); + self.subscribe("signature", params).await + } + + pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> { + self.subscribe("slot", json!([])).await + } + + pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> { + self.subscribe("slotsUpdates", json!([])).await + } + + async fn run_ws( + mut ws: WebSocketStream>, + mut subscribe_rx: mpsc::UnboundedReceiver, + mut shutdown_rx: oneshot::Receiver<()>, + ) -> PubsubClientResult { + let mut request_id: u64 = 0; + + let mut requests_subscribe = BTreeMap::new(); + let mut requests_unsubscribe = BTreeMap::>::new(); + let mut subscriptions = BTreeMap::new(); + let (unsubscribe_tx, mut unsubscribe_rx) = mpsc::unbounded_channel(); + + loop { + tokio::select! { + // Send close on shutdown signal + _ = (&mut shutdown_rx) => { + let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() }; + ws.send(Message::Close(Some(frame))).await?; + ws.flush().await?; + break; + }, + // Send `Message::Ping` each 10s if no any other communication + () = sleep(Duration::from_secs(10)) => { + ws.send(Message::Ping(Vec::new())).await?; + }, + // Read message for subscribe + Some((operation, params, response_tx)) = subscribe_rx.recv() => { + request_id += 1; + let method = format!("{}Subscribe", operation); + let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string(); + ws.send(Message::Text(text)).await?; + requests_subscribe.insert(request_id, (operation, response_tx)); + }, + // Read message for unsubscribe + Some((operation, sid, response_tx)) = unsubscribe_rx.recv() => { + subscriptions.remove(&sid); + request_id += 1; + let method = format!("{}Unsubscribe", operation); + let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string(); + ws.send(Message::Text(text)).await?; + requests_unsubscribe.insert(request_id, response_tx); + }, + // Read incoming WebSocket message + next_msg = ws.next() => { + let msg = match next_msg { + Some(msg) => msg?, + None => break, + }; + + // Get text from the message + let text = match msg { + Message::Text(text) => text, + Message::Binary(_data) => continue, // Ignore + Message::Ping(data) => { + ws.send(Message::Pong(data)).await?; + continue + }, + Message::Pong(_data) => continue, + Message::Close(_frame) => break, + }; + let mut json: Map = serde_json::from_str(&text)?; + + // Subscribe/Unsubscribe response, example: + // `{"jsonrpc":"2.0","result":5308752,"id":1}` + if let Some(id) = json.get("id") { + // Request Id + let id = id.as_u64().ok_or_else(|| { + PubsubClientError::SubscribeFailed { reason: "invalid `id` field", message: text.clone() } + })?; + + // Check that response is unsubscribe + if let Some(response_tx) = requests_unsubscribe.remove(&id) { + let _ = response_tx.send(()); // do not care if receiver is closed + } else { + // Subscribe Id + let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| { + PubsubClientError::SubscribeFailed { reason: "invalid `result` field", message: text.clone() } + })?; + + // Get subscribe request details + let (operation, response_tx) = requests_subscribe.remove(&id).ok_or_else(|| { + PubsubClientError::SubscribeFailed { reason: "request for received `id` not found", message: text.clone() } + })?; + + // Create notifications channel and unsubscribe function + let (notifications_tx, notifications_rx) = mpsc::unbounded_channel(); + let unsubscribe_tx = unsubscribe_tx.clone(); + let unsubscribe = Box::new(move || async move { + let (response_tx, response_rx) = oneshot::channel(); + // do nothing if ws already closed + if unsubscribe_tx.send((operation, sid, response_tx)).is_ok() { + let _ = response_rx.await; // channel can be closed only if ws is closed + } + }.boxed()); + + // Resolve subscribe request + match response_tx.send((notifications_rx, unsubscribe)) { + Ok(()) => { + subscriptions.insert(sid, notifications_tx); + } + Err((_notifications_rx, unsubscribe)) => { + unsubscribe(); + } + }; + } + + continue; + } + + // Notification, example: + // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}` + if let Some(Value::Object(params)) = json.get_mut("params") { + if let Some(sid) = params.get("subscription").and_then(Value::as_u64) { + let mut unsubscribe_required = false; + + if let Some(notifications_tx) = subscriptions.get(&sid) { + if let Some(result) = params.remove("result") { + if notifications_tx.send(result).is_err() { + unsubscribe_required = true; + } + } + } else { + unsubscribe_required = true; + } + + if unsubscribe_required { + if let Some(Value::String(method)) = json.remove("method") { + if let Some(operation) = method.strip_suffix("Notification") { + let (response_tx, _response_rx) = oneshot::channel(); + let _ = unsubscribe_tx.send((operation.to_string(), sid, response_tx)); + } + } + } + } + } + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + // see client-test/test/client.rs +}