diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index 0bfeb478e1..e22d7a2bd4 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -10,9 +10,10 @@ use solana_clap_utils::{ commitment::commitment_arg, input_parsers::*, input_validators::*, keypair::signer_from_path, }; use solana_client::{ - pubsub_client::{PubsubClient, SlotInfoMessage}, + pubsub_client::PubsubClient, rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient}, rpc_config::{RpcLargestAccountsConfig, RpcLargestAccountsFilter}, + rpc_response::SlotInfo, }; use solana_remote_wallet::remote_wallet::RemoteWalletManager; use solana_sdk::{ @@ -1033,7 +1034,7 @@ pub fn process_live_slots(url: &str) -> ProcessResult { })?; */ - let mut current: Option = None; + let mut current: Option = None; let mut message = "".to_string(); let slot_progress = new_spinner_progress_bar(); diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 116c145bbe..daa9bac12a 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -1,13 +1,12 @@ +use crate::rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo}; use log::*; -use serde::{de::DeserializeOwned, Deserialize}; +use serde::de::DeserializeOwned; use serde_json::{ json, value::Value::{Number, Object}, Map, Value, }; -use solana_sdk::{ - commitment_config::CommitmentConfig, signature::Signature, transaction::TransactionError, -}; +use solana_sdk::signature::Signature; use std::{ marker::PhantomData, sync::{ @@ -21,7 +20,7 @@ use thiserror::Error; use tungstenite::{client::AutoStream, connect, Message, WebSocket}; use url::{ParseError, Url}; -type PubsubSignatureResponse = PubsubClientSubscription>; +type PubsubSignatureResponse = PubsubClientSubscription>; #[derive(Debug, Error)] pub enum PubsubClientError { @@ -38,45 +37,6 @@ pub enum PubsubClientError { UnexpectedMessageError, } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -#[serde(rename_all = "camelCase", tag = "type", content = "result")] -pub enum SignatureResult { - ProcessedSignatureResult(ProcessedSignatureResult), - ReceivedSignature, -} - -#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] -pub struct RpcResponseContext { - pub slot: u64, -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct ProcessedSignatureResult { - pub err: Option, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct RpcResponse { - pub context: RpcResponseContext, - pub value: T, -} - -#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] -pub struct SlotInfoMessage { - pub parent: u64, - pub root: u64, - pub slot: u64, -} - -#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct RpcSignatureSubscribeConfig { - #[serde(flatten)] - pub commitment: Option, - pub enable_received_notification: Option, -} - pub struct PubsubClientSubscription where T: DeserializeOwned, @@ -186,22 +146,16 @@ pub struct PubsubClient {} impl PubsubClient { pub fn slot_subscribe( url: &str, - ) -> Result< - ( - PubsubClientSubscription, - Receiver, - ), - PubsubClientError, - > { + ) -> Result<(PubsubClientSubscription, Receiver), PubsubClientError> { let url = Url::parse(url)?; let (socket, _response) = connect(url)?; - let (sender, receiver) = channel::(); + let (sender, receiver) = channel::(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); let exit = Arc::new(AtomicBool::new(false)); let exit_clone = exit.clone(); - let subscription_id = PubsubClientSubscription::::send_subscribe( + let subscription_id = PubsubClientSubscription::::send_subscribe( &socket_clone, json!({ "jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[] @@ -216,11 +170,11 @@ impl PubsubClient { break; } - let message: Result = + let message: Result = PubsubClientSubscription::read_message(&socket_clone); if let Ok(msg) = message { - match sender.send(msg.clone()) { + match sender.send(msg) { Ok(_) => (), Err(err) => { info!("receive error: {:?}", err); @@ -236,7 +190,7 @@ impl PubsubClient { info!("websocket - exited receive loop"); }); - let result: PubsubClientSubscription = PubsubClientSubscription { + let result: PubsubClientSubscription = PubsubClientSubscription { message_type: PhantomData, operation: SLOT_OPERATION, socket, @@ -254,13 +208,13 @@ impl PubsubClient { ) -> Result< ( PubsubSignatureResponse, - Receiver>, + Receiver>, ), PubsubClientError, > { let url = Url::parse(url)?; let (socket, _response) = connect(url)?; - let (sender, receiver) = channel::>(); + let (sender, receiver) = channel::>(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -277,7 +231,7 @@ impl PubsubClient { }) .to_string(); let subscription_id = - PubsubClientSubscription::>::send_subscribe( + PubsubClientSubscription::>::send_subscribe( &socket_clone, body, ) @@ -289,7 +243,7 @@ impl PubsubClient { break; } - let message: Result, PubsubClientError> = + let message: Result, PubsubClientError> = PubsubClientSubscription::read_message(&socket_clone); if let Ok(msg) = message { @@ -309,7 +263,7 @@ impl PubsubClient { info!("websocket - exited receive loop"); }); - let result: PubsubClientSubscription> = + let result: PubsubClientSubscription> = PubsubClientSubscription { message_type: PhantomData, operation: SIGNATURE_OPERATION, diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index afd0e1d7f2..bcafe19238 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -94,11 +94,18 @@ pub struct RpcKeyedAccount { pub account: UiAccount, } +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)] +pub struct SlotInfo { + pub slot: Slot, + pub parent: Slot, + pub root: Slot, +} + #[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(rename_all = "camelCase", tag = "type", content = "result")] +#[serde(rename_all = "camelCase", untagged)] pub enum RpcSignatureResult { - ProcessedSignatureResult(ProcessedSignatureResult), - ReceivedSignature, + ProcessedSignature(ProcessedSignatureResult), + ReceivedSignature(ReceivedSignatureResult), } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -107,6 +114,12 @@ pub struct ProcessedSignatureResult { pub err: Option, } +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub enum ReceivedSignatureResult { + ReceivedSignature, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RpcContactInfo { /// Pubkey of the node as a base-58 string diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 4a0a1148dd..0808746df2 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -1,13 +1,13 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote, SlotInfo}; +use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote}; use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use solana_account_decoder::UiAccount; use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, - rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult}, + rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult, SlotInfo}, }; #[cfg(test)] use solana_runtime::bank_forks::BankForks; @@ -362,7 +362,7 @@ mod tests { use serial_test_derive::serial; use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding}; use solana_budget_program::{self, budget_instruction}; - use solana_client::rpc_response::ProcessedSignatureResult; + use solana_client::rpc_response::{ProcessedSignatureResult, ReceivedSignatureResult}; use solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -444,7 +444,7 @@ mod tests { // Test signature confirmation notification let (response, _) = robust_poll_or_panic(receiver); let expected_res = - RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None }); + RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None }); let expected = json!({ "jsonrpc": "2.0", "method": "signatureNotification", @@ -476,7 +476,8 @@ mod tests { .notify_signatures_received((received_slot, vec![tx.signatures[0]])); // Test signature confirmation notification let (response, _) = robust_poll_or_panic(receiver); - let expected_res = RpcSignatureResult::ReceivedSignature; + let expected_res = + RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature); let expected = json!({ "jsonrpc": "2.0", "method": "signatureNotification", diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index bce1a7c080..eda4b5fdaf 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -13,7 +13,8 @@ use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_filter::RpcFilterType, rpc_response::{ - ProcessedSignatureResult, Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult, + ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount, + RpcResponseContext, RpcSignatureResult, SlotInfo, }, }; use solana_runtime::{ @@ -47,13 +48,6 @@ use tokio_01::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor}; const RECEIVE_DELAY_MILLIS: u64 = 100; -#[derive(Serialize, Deserialize, Clone, Copy, Debug)] -pub struct SlotInfo { - pub slot: Slot, - pub parent: Slot, - pub root: Slot, -} - // A more human-friendly version of Vote, with the bank state signature base58 encoded. #[derive(Serialize, Deserialize, Debug)] pub struct RpcVote { @@ -291,9 +285,7 @@ fn filter_signature_result( ) -> (Box>, Slot) { ( Box::new(result.into_iter().map(|result| { - RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { - err: result.err(), - }) + RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() }) })), last_notified_slot, ) @@ -986,7 +978,9 @@ impl RpcSubscriptions { context: RpcResponseContext { slot: *received_slot, }, - value: RpcSignatureResult::ReceivedSignature, + value: RpcSignatureResult::ReceivedSignature( + ReceivedSignatureResult::ReceivedSignature, + ), }, &sink, ); @@ -1371,8 +1365,9 @@ pub(crate) mod tests { .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]])); subscriptions.notify_subscribers(commitment_slots); let expected_res = - RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None }); - let received_expected_res = RpcSignatureResult::ReceivedSignature; + RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None }); + let received_expected_res = + RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature); struct Notification { slot: Slot, id: u64, diff --git a/core/tests/client.rs b/core/tests/client.rs index c67ffe6da8..bffe2e913d 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -1,7 +1,4 @@ -use solana_client::{ - pubsub_client::{PubsubClient, SlotInfoMessage}, - rpc_client::RpcClient, -}; +use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo}; use solana_core::{ rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, validator::TestValidator, @@ -105,7 +102,7 @@ fn test_slot_subscription() { let (mut client, receiver) = PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); - let mut errors: Vec<(SlotInfoMessage, SlotInfoMessage)> = Vec::new(); + let mut errors: Vec<(SlotInfo, SlotInfo)> = Vec::new(); for i in 0..3 { subscriptions.notify_slot(i + 1, i, i); @@ -114,7 +111,7 @@ fn test_slot_subscription() { match maybe_actual { Ok(actual) => { - let expected = SlotInfoMessage { + let expected = SlotInfo { slot: i + 1, parent: i, root: i, diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 8c52ef5de9..59cc32ff36 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -285,7 +285,7 @@ fn test_rpc_subscriptions() { let timeout = deadline.saturating_duration_since(Instant::now()); match status_receiver.recv_timeout(timeout) { Ok((sig, result)) => { - if let RpcSignatureResult::ProcessedSignatureResult(result) = result.value { + if let RpcSignatureResult::ProcessedSignature(result) = result.value { assert!(result.err.is_none()); assert!(signature_set.remove(&sig)); } else { diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index a30bc00ce2..b5ec085313 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -3,8 +3,7 @@ use gag::BufferRedirect; use log::*; use serial_test_derive::serial; use solana_client::{ - pubsub_client::{PubsubClient, SignatureResult}, - rpc_client::RpcClient, + pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::RpcSignatureResult, thin_client::create_client, }; use solana_core::{ @@ -179,11 +178,11 @@ fn test_local_cluster_signature_subscribe() { let mut should_break = false; for response in responses { match response.value { - SignatureResult::ProcessedSignatureResult(_) => { + RpcSignatureResult::ProcessedSignature(_) => { should_break = true; break; } - SignatureResult::ReceivedSignature => { + RpcSignatureResult::ReceivedSignature(_) => { got_received_notification = true; } }