diff --git a/client/src/generic_rpc_client_request.rs b/client/src/generic_rpc_client_request.rs new file mode 100644 index 0000000000..fad8cef426 --- /dev/null +++ b/client/src/generic_rpc_client_request.rs @@ -0,0 +1,10 @@ +use crate::rpc_request::RpcRequest; + +pub(crate) trait GenericRpcClientRequest { + fn send( + &self, + request: &RpcRequest, + params: Option, + retries: usize, + ) -> Result>; +} diff --git a/client/src/lib.rs b/client/src/lib.rs index 527164335b..4398b15888 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,4 +1,6 @@ -pub mod rpc_mock; +mod generic_rpc_client_request; +pub mod mock_rpc_client_request; +pub mod rpc_client_request; pub mod rpc_request; pub mod rpc_signature_status; pub mod thin_client; diff --git a/client/src/rpc_mock.rs b/client/src/mock_rpc_client_request.rs similarity index 60% rename from client/src/rpc_mock.rs rename to client/src/mock_rpc_client_request.rs index f99a266e0f..fa119b92ed 100644 --- a/client/src/rpc_mock.rs +++ b/client/src/mock_rpc_client_request.rs @@ -1,42 +1,28 @@ -// Implementation of RpcRequestHandler trait for testing Rpc requests without i/o - -use crate::rpc_request::{RpcRequest, RpcRequestHandler}; -use serde_json::{json, Number, Value}; -use solana_sdk::pubkey::Pubkey; -use std::error; +use crate::generic_rpc_client_request::GenericRpcClientRequest; +use crate::rpc_request::RpcRequest; +use serde_json::{Number, Value}; pub const PUBKEY: &str = "7RoSF9fUmdphVCpabEoefH81WwrW7orsWonXWqTXkKV8"; pub const SIGNATURE: &str = "43yNSFC6fYTuPgTNFFhF4axw7AfWxB2BPdurme8yrsWEYwm8299xh8n6TAHjGymiSub1XtyxTNyd9GBfY2hxoBw8"; -#[derive(Clone)] -pub struct MockRpcClient { - pub url: String, +pub struct MockRpcClientRequest { + url: String, } -impl MockRpcClient { +impl MockRpcClientRequest { pub fn new(url: String) -> Self { - MockRpcClient { url } + Self { url } } +} - pub fn retry_get_balance( - &self, - pubkey: &Pubkey, - retries: usize, - ) -> Result, Box> { - let params = json!([format!("{}", pubkey)]); - let res = self - .retry_make_rpc_request(&RpcRequest::GetBalance, Some(params), retries)? - .as_u64(); - Ok(res) - } - - pub fn retry_make_rpc_request( +impl GenericRpcClientRequest for MockRpcClientRequest { + fn send( &self, request: &RpcRequest, - params: Option, - mut _retries: usize, - ) -> Result> { + params: Option, + _retries: usize, + ) -> Result> { if self.url == "fails" { return Ok(Value::Null); } @@ -74,13 +60,3 @@ impl MockRpcClient { Ok(val) } } - -impl RpcRequestHandler for MockRpcClient { - fn make_rpc_request( - &self, - request: RpcRequest, - params: Option, - ) -> Result> { - self.retry_make_rpc_request(&request, params, 0) - } -} diff --git a/client/src/rpc_client_request.rs b/client/src/rpc_client_request.rs new file mode 100644 index 0000000000..feb02a17af --- /dev/null +++ b/client/src/rpc_client_request.rs @@ -0,0 +1,81 @@ +use crate::generic_rpc_client_request::GenericRpcClientRequest; +use crate::rpc_request::{RpcError, RpcRequest}; +use log::*; +use reqwest; +use reqwest::header::CONTENT_TYPE; +use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND}; +use std::thread::sleep; +use std::time::Duration; + +pub struct RpcClientRequest { + client: reqwest::Client, + url: String, +} + +impl RpcClientRequest { + pub fn new(url: String) -> Self { + Self { + client: reqwest::Client::new(), + url, + } + } + + pub fn new_with_timeout(url: String, timeout: Duration) -> Self { + let client = reqwest::Client::builder() + .timeout(timeout) + .build() + .expect("build rpc client"); + + Self { client, url } + } +} + +impl GenericRpcClientRequest for RpcClientRequest { + fn send( + &self, + request: &RpcRequest, + params: Option, + mut retries: usize, + ) -> Result> { + // Concurrent requests are not supported so reuse the same request id for all requests + let request_id = 1; + + let request_json = request.build_request_json(request_id, params); + + loop { + match self + .client + .post(&self.url) + .header(CONTENT_TYPE, "application/json") + .body(request_json.to_string()) + .send() + { + Ok(mut response) => { + let json: serde_json::Value = serde_json::from_str(&response.text()?)?; + if json["error"].is_object() { + Err(RpcError::RpcRequestError(format!( + "RPC Error response: {}", + serde_json::to_string(&json["error"]).unwrap() + )))? + } + return Ok(json["result"].clone()); + } + Err(e) => { + info!( + "make_rpc_request() failed, {} retries left: {:?}", + retries, e + ); + if retries == 0 { + Err(e)?; + } + retries -= 1; + + // Sleep for approximately half a slot + sleep(Duration::from_millis( + 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, + )); + } + } + } + } +} diff --git a/client/src/rpc_request.rs b/client/src/rpc_request.rs index 94af49a766..6f440a272e 100644 --- a/client/src/rpc_request.rs +++ b/client/src/rpc_request.rs @@ -1,46 +1,47 @@ +use crate::generic_rpc_client_request::GenericRpcClientRequest; +use crate::mock_rpc_client_request::MockRpcClientRequest; +use crate::rpc_client_request::RpcClientRequest; use bs58; use log::*; -use reqwest; -use reqwest::header::CONTENT_TYPE; use serde_json::{json, Value}; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; -use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND}; use std::io; use std::net::SocketAddr; use std::thread::sleep; use std::time::{Duration, Instant}; use std::{error, fmt}; -#[derive(Clone)] pub struct RpcClient { - pub client: reqwest::Client, - pub url: String, + client: Box, } impl RpcClient { pub fn new(url: String) -> Self { - RpcClient { - client: reqwest::Client::new(), - url, + Self { + client: Box::new(RpcClientRequest::new(url)), } } - pub fn new_socket_with_timeout(addr: SocketAddr, timeout: Duration) -> Self { - let url = get_rpc_request_str(addr, false); - let client = reqwest::Client::builder() - .timeout(timeout) - .build() - .expect("build rpc client"); - RpcClient { client, url } + pub fn new_mock(url: String) -> Self { + Self { + client: Box::new(MockRpcClientRequest::new(url)), + } } pub fn new_socket(addr: SocketAddr) -> Self { Self::new(get_rpc_request_str(addr, false)) } + pub fn new_socket_with_timeout(addr: SocketAddr, timeout: Duration) -> Self { + let url = get_rpc_request_str(addr, false); + Self { + client: Box::new(RpcClientRequest::new_with_timeout(url, timeout)), + } + } + pub fn retry_get_balance( &self, pubkey: &Pubkey, @@ -48,14 +49,17 @@ impl RpcClient { ) -> Result, Box> { let params = json!([format!("{}", pubkey)]); let res = self - .retry_make_rpc_request(&RpcRequest::GetBalance, Some(params), retries)? + .client + .send(&RpcRequest::GetBalance, Some(params), retries)? .as_u64(); Ok(res) } pub fn get_account_data(&self, pubkey: &Pubkey) -> io::Result> { let params = json!([format!("{}", pubkey)]); - let response = self.make_rpc_request(RpcRequest::GetAccountInfo, Some(params)); + let response = self + .client + .send(&RpcRequest::GetAccountInfo, Some(params), 0); match response { Ok(account_json) => { let account: Account = @@ -77,7 +81,9 @@ impl RpcClient { /// by the network, this method will hang indefinitely. pub fn get_balance(&self, pubkey: &Pubkey) -> io::Result { let params = json!([format!("{}", pubkey)]); - let response = self.make_rpc_request(RpcRequest::GetAccountInfo, Some(params)); + let response = self + .client + .send(&RpcRequest::GetAccountInfo, Some(params), 0); response .and_then(|account_json| { @@ -98,7 +104,7 @@ impl RpcClient { pub fn transaction_count(&self) -> u64 { debug!("transaction_count"); for _tries in 0..5 { - let response = self.make_rpc_request(RpcRequest::GetTransactionCount, None); + let response = self.client.send(&RpcRequest::GetTransactionCount, None, 0); match response { Ok(value) => { @@ -118,7 +124,7 @@ impl RpcClient { /// Returns the blockhash Hash or None if there was no response from the server. pub fn try_get_recent_blockhash(&self, mut num_retries: u64) -> Option { loop { - let response = self.make_rpc_request(RpcRequest::GetRecentBlockhash, None); + let response = self.client.send(&RpcRequest::GetRecentBlockhash, None, 0); match response { Ok(value) => { @@ -213,7 +219,8 @@ impl RpcClient { loop { let response = - self.make_rpc_request(RpcRequest::ConfirmTransaction, Some(params.clone())); + self.client + .send(&RpcRequest::ConfirmTransaction, Some(params.clone()), 0); match response { Ok(confirmation) => { @@ -234,7 +241,8 @@ impl RpcClient { } pub fn fullnode_exit(&self) -> io::Result { let response = self - .make_rpc_request(RpcRequest::FullnodeExit, None) + .client + .send(&RpcRequest::FullnodeExit, None, 0) .map_err(|err| { io::Error::new( io::ErrorKind::Other, @@ -249,52 +257,14 @@ impl RpcClient { }) } + // TODO: Remove pub fn retry_make_rpc_request( &self, request: &RpcRequest, params: Option, - mut retries: usize, + retries: usize, ) -> Result> { - // Concurrent requests are not supported so reuse the same request id for all requests - let request_id = 1; - - let request_json = request.build_request_json(request_id, params); - - loop { - match self - .client - .post(&self.url) - .header(CONTENT_TYPE, "application/json") - .body(request_json.to_string()) - .send() - { - Ok(mut response) => { - let json: Value = serde_json::from_str(&response.text()?)?; - if json["error"].is_object() { - Err(RpcError::RpcRequestError(format!( - "RPC Error response: {}", - serde_json::to_string(&json["error"]).unwrap() - )))? - } - return Ok(json["result"].clone()); - } - Err(e) => { - info!( - "make_rpc_request() failed, {} retries left: {:?}", - retries, e - ); - if retries == 0 { - Err(e)?; - } - retries -= 1; - - // Sleep for approximately half a slot - sleep(Duration::from_millis( - 500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND, - )); - } - } - } + self.client.send(request, params, retries) } } @@ -306,24 +276,6 @@ pub fn get_rpc_request_str(rpc_addr: SocketAddr, tls: bool) -> String { } } -pub trait RpcRequestHandler { - fn make_rpc_request( - &self, - request: RpcRequest, - params: Option, - ) -> Result>; -} - -impl RpcRequestHandler for RpcClient { - fn make_rpc_request( - &self, - request: RpcRequest, - params: Option, - ) -> Result> { - self.retry_make_rpc_request(&request, params, 0) - } -} - #[derive(Debug, PartialEq)] pub enum RpcRequest { ConfirmTransaction, @@ -344,7 +296,7 @@ pub enum RpcRequest { } impl RpcRequest { - fn build_request_json(&self, id: u64, params: Option) -> Value { + pub(crate) fn build_request_json(&self, id: u64, params: Option) -> Value { let jsonrpc = "2.0"; let method = match self { RpcRequest::ConfirmTransaction => "confirmTransaction", @@ -470,21 +422,25 @@ mod tests { let rpc_addr = receiver.recv().unwrap(); let rpc_client = RpcClient::new_socket(rpc_addr); - let balance = rpc_client.make_rpc_request( - RpcRequest::GetBalance, + let balance = rpc_client.retry_make_rpc_request( + &RpcRequest::GetBalance, Some(json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"])), + 0, ); assert_eq!(balance.unwrap().as_u64().unwrap(), 50); - let blockhash = rpc_client.make_rpc_request(RpcRequest::GetRecentBlockhash, None); + let blockhash = rpc_client.retry_make_rpc_request(&RpcRequest::GetRecentBlockhash, None, 0); assert_eq!( blockhash.unwrap().as_str().unwrap(), "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx" ); // Send erroneous parameter - let blockhash = - rpc_client.make_rpc_request(RpcRequest::GetRecentBlockhash, Some(json!("paramter"))); + let blockhash = rpc_client.retry_make_rpc_request( + &RpcRequest::GetRecentBlockhash, + Some(json!("paramter")), + 0, + ); assert_eq!(blockhash.is_err(), true); } diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 00c1bbaff0..9cb5eb19b3 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -57,7 +57,7 @@ impl ThinClient { transactions_socket: UdpSocket, rpc_client: RpcClient, ) -> Self { - ThinClient { + Self { rpc_client, transactions_addr, transactions_socket, diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 8734c7b834..2ff3704cc1 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -12,9 +12,8 @@ use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; use crate::window_service::WindowService; use rand::thread_rng; use rand::Rng; -use solana_client::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; -use solana_client::thin_client::create_client; -use solana_client::thin_client::{retry_get_balance, ThinClient}; +use solana_client::rpc_request::{RpcClient, RpcRequest}; +use solana_client::thin_client::{create_client, retry_get_balance, ThinClient}; use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; @@ -357,11 +356,11 @@ impl Replicator { RpcClient::new_socket(rpc_peers[node_idx].rpc) }; let storage_blockhash = rpc_client - .make_rpc_request(RpcRequest::GetStorageBlockhash, None) + .retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0) .expect("rpc request") .to_string(); let storage_entry_height = rpc_client - .make_rpc_request(RpcRequest::GetStorageEntryHeight, None) + .retry_make_rpc_request(&RpcRequest::GetStorageEntryHeight, None, 0) .expect("rpc request") .as_u64() .unwrap(); diff --git a/wallet/src/wallet.rs b/wallet/src/wallet.rs index 805d61a4bb..4f58fc6bb1 100644 --- a/wallet/src/wallet.rs +++ b/wallet/src/wallet.rs @@ -7,11 +7,7 @@ use serde_json; use serde_json::json; use solana_budget_api; use solana_budget_api::budget_transaction::BudgetTransaction; -#[cfg(test)] -use solana_client::rpc_mock::MockRpcClient as RpcClient; -#[cfg(not(test))] -use solana_client::rpc_request::RpcClient; -use solana_client::rpc_request::{get_rpc_request_str, RpcRequest}; +use solana_client::rpc_request::{get_rpc_request_str, RpcClient, RpcRequest}; use solana_client::rpc_signature_status::RpcSignatureStatus; #[cfg(not(test))] use solana_drone::drone::request_airdrop_transaction; @@ -693,12 +689,15 @@ pub fn process_command(config: &WalletConfig) -> ProcessResult { } let drone_addr = config.drone_addr(); + + let mut _rpc_client; let rpc_client = if config.rpc_client.is_none() { let rpc_addr = config.rpc_addr(); - RpcClient::new(rpc_addr) + _rpc_client = RpcClient::new(rpc_addr); + &_rpc_client } else { // Primarily for testing - config.rpc_client.clone().unwrap() + config.rpc_client.as_ref().unwrap() }; match config.command { @@ -1038,7 +1037,7 @@ mod tests { use clap::{App, Arg, ArgGroup, SubCommand}; use serde_json::Value; use solana::socketaddr; - use solana_client::rpc_mock::{PUBKEY, SIGNATURE}; + use solana_client::mock_rpc_client_request::{PUBKEY, SIGNATURE}; use solana_sdk::signature::{gen_keypair_file, read_keypair, read_pkcs8, Keypair, KeypairUtil}; use std::fs; use std::net::{Ipv4Addr, SocketAddr}; @@ -1494,7 +1493,7 @@ mod tests { fn test_wallet_process_command() { // Success cases let mut config = WalletConfig::default(); - config.rpc_client = Some(RpcClient::new("succeeds".to_string())); + config.rpc_client = Some(RpcClient::new_mock("succeeds".to_string())); let keypair = Keypair::new(); let pubkey = keypair.pubkey().to_string(); @@ -1589,7 +1588,7 @@ mod tests { config.command = WalletCommand::Airdrop(50); assert!(process_command(&config).is_err()); - config.rpc_client = Some(RpcClient::new("airdrop".to_string())); + config.rpc_client = Some(RpcClient::new_mock("airdrop".to_string())); config.command = WalletCommand::TimeElapsed(bob_pubkey, process_id, dt); let signature = process_command(&config); assert_eq!(signature.unwrap(), SIGNATURE.to_string()); @@ -1600,7 +1599,7 @@ mod tests { assert_eq!(signature.unwrap(), SIGNATURE.to_string()); // Failture cases - config.rpc_client = Some(RpcClient::new("fails".to_string())); + config.rpc_client = Some(RpcClient::new_mock("fails".to_string())); config.command = WalletCommand::Airdrop(50); assert!(process_command(&config).is_err()); @@ -1659,7 +1658,7 @@ mod tests { // Success case let mut config = WalletConfig::default(); - config.rpc_client = Some(RpcClient::new("succeeds".to_string())); + config.rpc_client = Some(RpcClient::new_mock("succeeds".to_string())); config.command = WalletCommand::Deploy(pathbuf.to_str().unwrap().to_string()); let result = process_command(&config); @@ -1675,7 +1674,7 @@ mod tests { assert_eq!(program_id_vec.len(), mem::size_of::()); // Failure cases - config.rpc_client = Some(RpcClient::new("airdrop".to_string())); + config.rpc_client = Some(RpcClient::new_mock("airdrop".to_string())); assert!(process_command(&config).is_err()); config.command = WalletCommand::Deploy("bad/file/location.so".to_string()); @@ -1708,7 +1707,7 @@ mod tests { #[test] fn test_wallet_get_recent_blockhash() { - let rpc_client = RpcClient::new("succeeds".to_string()); + let rpc_client = RpcClient::new_mock("succeeds".to_string()); let vec = bs58::decode(PUBKEY).into_vec().unwrap(); let expected_blockhash = Hash::new(&vec); @@ -1716,7 +1715,7 @@ mod tests { let blockhash = get_recent_blockhash(&rpc_client); assert_eq!(blockhash.unwrap(), expected_blockhash); - let rpc_client = RpcClient::new("fails".to_string()); + let rpc_client = RpcClient::new_mock("fails".to_string()); let blockhash = get_recent_blockhash(&rpc_client); assert!(blockhash.is_err()); @@ -1724,7 +1723,7 @@ mod tests { #[test] fn test_wallet_send_transaction() { - let rpc_client = RpcClient::new("succeeds".to_string()); + let rpc_client = RpcClient::new_mock("succeeds".to_string()); let key = Keypair::new(); let to = Keypair::new().pubkey(); @@ -1734,7 +1733,7 @@ mod tests { let signature = send_transaction(&rpc_client, &tx); assert_eq!(signature.unwrap(), SIGNATURE.to_string()); - let rpc_client = RpcClient::new("fails".to_string()); + let rpc_client = RpcClient::new_mock("fails".to_string()); let signature = send_transaction(&rpc_client, &tx); assert!(signature.is_err()); @@ -1742,17 +1741,17 @@ mod tests { #[test] fn test_wallet_confirm_transaction() { - let rpc_client = RpcClient::new("succeeds".to_string()); + let rpc_client = RpcClient::new_mock("succeeds".to_string()); let signature = "good_signature"; let status = confirm_transaction(&rpc_client, &signature); assert_eq!(status.unwrap(), RpcSignatureStatus::Confirmed); - let rpc_client = RpcClient::new("bad_sig_status".to_string()); + let rpc_client = RpcClient::new_mock("bad_sig_status".to_string()); let signature = "bad_status"; let status = confirm_transaction(&rpc_client, &signature); assert!(status.is_err()); - let rpc_client = RpcClient::new("fails".to_string()); + let rpc_client = RpcClient::new_mock("fails".to_string()); let signature = "bad_status_fmt"; let status = confirm_transaction(&rpc_client, &signature); assert!(status.is_err()); @@ -1760,7 +1759,7 @@ mod tests { #[test] fn test_wallet_send_and_confirm_transaction() { - let rpc_client = RpcClient::new("succeeds".to_string()); + let rpc_client = RpcClient::new_mock("succeeds".to_string()); let key = Keypair::new(); let to = Keypair::new().pubkey(); @@ -1770,18 +1769,18 @@ mod tests { let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key); result.unwrap(); - let rpc_client = RpcClient::new("account_in_use".to_string()); + let rpc_client = RpcClient::new_mock("account_in_use".to_string()); let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key); assert!(result.is_err()); - let rpc_client = RpcClient::new("fails".to_string()); + let rpc_client = RpcClient::new_mock("fails".to_string()); let result = send_and_confirm_transaction(&rpc_client, &mut tx, &key); assert!(result.is_err()); } #[test] fn test_wallet_resign_transaction() { - let rpc_client = RpcClient::new("succeeds".to_string()); + let rpc_client = RpcClient::new_mock("succeeds".to_string()); let key = Keypair::new(); let to = Keypair::new().pubkey(); @@ -1804,7 +1803,7 @@ mod tests { #[test] fn test_request_and_confirm_airdrop() { - let rpc_client = RpcClient::new("succeeds".to_string()); + let rpc_client = RpcClient::new_mock("succeeds".to_string()); let drone_addr = socketaddr!(0, 0); let pubkey = Keypair::new().pubkey(); let lamports = 50; diff --git a/wallet/tests/deploy.rs b/wallet/tests/deploy.rs index 21336e0e35..d0c5d37228 100644 --- a/wallet/tests/deploy.rs +++ b/wallet/tests/deploy.rs @@ -1,6 +1,6 @@ use serde_json::{json, Value}; use solana::fullnode::new_fullnode_for_tests; -use solana_client::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; +use solana_client::rpc_request::{RpcClient, RpcRequest}; use solana_drone::drone::run_local_drone; use solana_sdk::bpf_loader; use solana_wallet::wallet::{process_command, WalletCommand, WalletConfig}; @@ -47,7 +47,7 @@ fn test_wallet_deploy_program() { let params = json!([program_id_str]); let account_info = rpc_client - .make_rpc_request(RpcRequest::GetAccountInfo, Some(params)) + .retry_make_rpc_request(&RpcRequest::GetAccountInfo, Some(params), 0) .unwrap(); let account_info_obj = account_info.as_object().unwrap(); assert_eq!(