From 8fe8a5717eae97b143e73ac54e243a26016c279b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 25 May 2020 11:41:46 -0700 Subject: [PATCH] Clean up RPCClient retry handling: only retry on 429, after a little sleep (#10182) (#10184) automerge --- archiver-lib/src/archiver.rs | 7 +- client/src/generic_rpc_client_request.rs | 7 +- client/src/mock_rpc_client_request.rs | 7 +- client/src/rpc_client.rs | 124 ++++++----------------- client/src/rpc_client_request.rs | 37 ++++--- 5 files changed, 51 insertions(+), 131 deletions(-) diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index 56471fc556..9812c092f2 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -699,7 +699,6 @@ impl Archiver { .send::( RpcRequest::GetSlotsPerSegment, serde_json::json!([client_commitment]), - 0, ) .unwrap()) } else { @@ -746,11 +745,7 @@ impl Archiver { let RpcStorageTurn { blockhash: storage_blockhash, slot: turn_slot, - } = rpc_client.send( - RpcRequest::GetStorageTurn, - serde_json::value::Value::Null, - 0, - )?; + } = rpc_client.send(RpcRequest::GetStorageTurn, serde_json::value::Value::Null)?; let turn_blockhash = storage_blockhash.parse().map_err(|err| { io::Error::new( io::ErrorKind::Other, diff --git a/client/src/generic_rpc_client_request.rs b/client/src/generic_rpc_client_request.rs index 18f49c450e..8ab71990c2 100644 --- a/client/src/generic_rpc_client_request.rs +++ b/client/src/generic_rpc_client_request.rs @@ -1,10 +1,5 @@ use crate::{client_error::Result, rpc_request::RpcRequest}; pub(crate) trait GenericRpcClientRequest { - fn send( - &self, - request: RpcRequest, - params: serde_json::Value, - retries: usize, - ) -> Result; + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result; } diff --git a/client/src/mock_rpc_client_request.rs b/client/src/mock_rpc_client_request.rs index 6fbd52f128..8eeb75c40b 100644 --- a/client/src/mock_rpc_client_request.rs +++ b/client/src/mock_rpc_client_request.rs @@ -38,12 +38,7 @@ impl MockRpcClientRequest { } impl GenericRpcClientRequest for MockRpcClientRequest { - fn send( - &self, - request: RpcRequest, - params: serde_json::Value, - _retries: usize, - ) -> Result { + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result { if let Some(value) = self.mocks.write().unwrap().remove(&request) { return Ok(value); } diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 49a9e4084f..cc6976986d 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -99,7 +99,7 @@ impl RpcClient { let serialized_encoded = bs58::encode(serialize(transaction).unwrap()).into_string(); let signature_base58_str: String = - self.send(RpcRequest::SendTransaction, json!([serialized_encoded]), 5)?; + self.send(RpcRequest::SendTransaction, json!([serialized_encoded]))?; let signature = signature_base58_str .parse::() @@ -128,7 +128,6 @@ impl RpcClient { self.send( RpcRequest::SimulateTransaction, json!([serialized_encoded, { "sigVerify": sig_verify }]), - 0, ) } @@ -144,7 +143,7 @@ impl RpcClient { signatures: &[Signature], ) -> RpcResult>> { let signatures: Vec<_> = signatures.iter().map(|s| s.to_string()).collect(); - self.send(RpcRequest::GetSignatureStatuses, json!([signatures]), 5) + self.send(RpcRequest::GetSignatureStatuses, json!([signatures])) } pub fn get_signature_status_with_commitment( @@ -155,7 +154,6 @@ impl RpcClient { let result: Response>> = self.send( RpcRequest::GetSignatureStatuses, json!([[signature.to_string()]]), - 5, )?; Ok(result.value[0] .clone() @@ -174,7 +172,6 @@ impl RpcClient { json!([[signature.to_string()], { "searchTransactionHistory": search_transaction_history }]), - 5, )?; Ok(result.value[0] .clone() @@ -190,14 +187,14 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetSlot, json!([commitment_config]), 0) + self.send(RpcRequest::GetSlot, json!([commitment_config])) } pub fn supply_with_commitment( &self, commitment_config: CommitmentConfig, ) -> RpcResult { - self.send(RpcRequest::GetSupply, json!([commitment_config]), 0) + self.send(RpcRequest::GetSupply, json!([commitment_config])) } pub fn total_supply(&self) -> ClientResult { @@ -208,14 +205,14 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetTotalSupply, json!([commitment_config]), 0) + self.send(RpcRequest::GetTotalSupply, json!([commitment_config])) } pub fn get_largest_accounts_with_config( &self, config: RpcLargestAccountsConfig, ) -> RpcResult> { - self.send(RpcRequest::GetLargestAccounts, json!([config]), 0) + self.send(RpcRequest::GetLargestAccounts, json!([config])) } pub fn get_vote_accounts(&self) -> ClientResult { @@ -226,11 +223,11 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetVoteAccounts, json!([commitment_config]), 0) + self.send(RpcRequest::GetVoteAccounts, json!([commitment_config])) } pub fn get_cluster_nodes(&self) -> ClientResult> { - self.send(RpcRequest::GetClusterNodes, Value::Null, 0) + self.send(RpcRequest::GetClusterNodes, Value::Null) } pub fn get_confirmed_block(&self, slot: Slot) -> ClientResult { @@ -242,7 +239,7 @@ impl RpcClient { slot: Slot, encoding: TransactionEncoding, ) -> ClientResult { - self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding]), 0) + self.send(RpcRequest::GetConfirmedBlock, json!([slot, encoding])) } pub fn get_confirmed_blocks( @@ -253,7 +250,6 @@ impl RpcClient { self.send( RpcRequest::GetConfirmedBlocks, json!([start_slot, end_slot]), - 0, ) } @@ -266,7 +262,6 @@ impl RpcClient { let signatures_base58_str: Vec = self.send( RpcRequest::GetConfirmedSignaturesForAddress, json!([address.to_string(), start_slot, end_slot]), - 0, )?; let mut signatures = vec![]; @@ -288,13 +283,12 @@ impl RpcClient { self.send( RpcRequest::GetConfirmedTransaction, json!([signature.to_string(), encoding]), - 0, ) } pub fn get_block_time(&self, slot: Slot) -> ClientResult { let request = RpcRequest::GetBlockTime; - let response = self.client.send(request, json!([slot]), 0); + let response = self.client.send(request, json!([slot])); response .map(|result_json| { @@ -317,7 +311,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send(RpcRequest::GetEpochInfo, json!([commitment_config]), 0) + self.send(RpcRequest::GetEpochInfo, json!([commitment_config])) } pub fn get_leader_schedule( @@ -335,16 +329,15 @@ impl RpcClient { self.send( RpcRequest::GetLeaderSchedule, json!([slot, commitment_config]), - 0, ) } pub fn get_epoch_schedule(&self) -> ClientResult { - self.send(RpcRequest::GetEpochSchedule, Value::Null, 0) + self.send(RpcRequest::GetEpochSchedule, Value::Null) } pub fn get_identity(&self) -> ClientResult { - let rpc_identity: RpcIdentity = self.send(RpcRequest::GetIdentity, Value::Null, 0)?; + let rpc_identity: RpcIdentity = self.send(RpcRequest::GetIdentity, Value::Null)?; rpc_identity.identity.parse::().map_err(|_| { ClientError::new_with_request( @@ -355,15 +348,15 @@ impl RpcClient { } pub fn get_inflation(&self) -> ClientResult { - self.send(RpcRequest::GetInflation, Value::Null, 0) + self.send(RpcRequest::GetInflation, Value::Null) } pub fn get_version(&self) -> ClientResult { - self.send(RpcRequest::GetVersion, Value::Null, 0) + self.send(RpcRequest::GetVersion, Value::Null) } pub fn minimum_ledger_slot(&self) -> ClientResult { - self.send(RpcRequest::MinimumLedgerSlot, Value::Null, 0) + self.send(RpcRequest::MinimumLedgerSlot, Value::Null) } pub fn send_and_confirm_transaction( @@ -493,12 +486,12 @@ impl RpcClient { pub fn retry_get_balance( &self, pubkey: &Pubkey, - retries: usize, + _retries: usize, ) -> Result, Box> { let request = RpcRequest::GetBalance; let balance_json = self .client - .send(request, json!([pubkey.to_string()]), retries) + .send(request, json!([pubkey.to_string()])) .map_err(|err| err.into_with_request(request))?; Ok(Some( @@ -522,7 +515,6 @@ impl RpcClient { let response = self.client.send( RpcRequest::GetAccountInfo, json!([pubkey.to_string(), commitment_config]), - 0, ); response @@ -559,7 +551,7 @@ impl RpcClient { let request = RpcRequest::GetMinimumBalanceForRentExemption; let minimum_balance_json = self .client - .send(request, json!([data_len]), 0) + .send(request, json!([data_len])) .map_err(|err| err.into_with_request(request))?; let minimum_balance: u64 = serde_json::from_value(minimum_balance_json) @@ -587,16 +579,12 @@ impl RpcClient { self.send( RpcRequest::GetBalance, json!([pubkey.to_string(), commitment_config]), - 0, ) } pub fn get_program_accounts(&self, pubkey: &Pubkey) -> ClientResult> { - let accounts: Vec = self.send( - RpcRequest::GetProgramAccounts, - json!([pubkey.to_string()]), - 0, - )?; + let accounts: Vec = + self.send(RpcRequest::GetProgramAccounts, json!([pubkey.to_string()]))?; let mut pubkey_accounts: Vec<(Pubkey, Account)> = Vec::new(); for RpcKeyedAccount { pubkey, account } in accounts.into_iter() { let pubkey = pubkey.parse().map_err(|_| { @@ -619,11 +607,7 @@ impl RpcClient { &self, commitment_config: CommitmentConfig, ) -> ClientResult { - self.send( - RpcRequest::GetTransactionCount, - json!([commitment_config]), - 0, - ) + self.send(RpcRequest::GetTransactionCount, json!([commitment_config])) } pub fn get_recent_blockhash(&self) -> ClientResult<(Hash, FeeCalculator)> { @@ -646,7 +630,6 @@ impl RpcClient { } = self.send::>( RpcRequest::GetRecentBlockhash, json!([commitment_config]), - 0, )?; let blockhash = blockhash.parse().map_err(|_| { @@ -668,7 +651,6 @@ impl RpcClient { let Response { value, .. } = self.send::>>( RpcRequest::GetFeeCalculatorForBlockhash, json!([blockhash.to_string()]), - 0, )?; Ok(value.map(|rf| rf.fee_calculator)) @@ -678,11 +660,8 @@ impl RpcClient { let Response { context, value: RpcFeeRateGovernor { fee_rate_governor }, - } = self.send::>( - RpcRequest::GetFeeRateGovernor, - Value::Null, - 0, - )?; + } = + self.send::>(RpcRequest::GetFeeRateGovernor, Value::Null)?; Ok(Response { context, @@ -717,7 +696,7 @@ impl RpcClient { } pub fn get_genesis_hash(&self) -> ClientResult { - let hash_str: String = self.send(RpcRequest::GetGenesisHash, Value::Null, 0)?; + let hash_str: String = self.send(RpcRequest::GetGenesisHash, Value::Null)?; let hash = hash_str.parse().map_err(|_| { ClientError::new_with_request( RpcError::ParseError("Hash".to_string()).into(), @@ -913,7 +892,6 @@ impl RpcClient { let result: Response>> = self.send( RpcRequest::GetSignatureStatuses, json!([[signature.to_string()]]), - 5, )?; let confirmations = result.value[0] @@ -1025,17 +1003,17 @@ impl RpcClient { } pub fn validator_exit(&self) -> ClientResult { - self.send(RpcRequest::ValidatorExit, Value::Null, 0) + self.send(RpcRequest::ValidatorExit, Value::Null) } - pub fn send(&self, request: RpcRequest, params: Value, retries: usize) -> ClientResult + pub fn send(&self, request: RpcRequest, params: Value) -> ClientResult where T: serde::de::DeserializeOwned, { assert!(params.is_array() || params.is_null()); let response = self .client - .send(request, params, retries) + .send(request, params) .map_err(|err| err.into_with_request(request))?; serde_json::from_value(response) .map_err(|err| ClientError::new_with_request(err.into(), request)) @@ -1066,7 +1044,6 @@ mod tests { use jsonrpc_core::{Error, IoHandler, Params}; use jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; use serde_json::Number; - use solana_logger; use solana_sdk::{ instruction::InstructionError, signature::Keypair, system_transaction, transaction::TransactionError, @@ -1112,62 +1089,21 @@ mod tests { .send( RpcRequest::GetBalance, json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"]), - 0, ) .unwrap(); assert_eq!(balance, 50); let blockhash: String = rpc_client - .send(RpcRequest::GetRecentBlockhash, Value::Null, 0) + .send(RpcRequest::GetRecentBlockhash, Value::Null) .unwrap(); assert_eq!(blockhash, "deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhx"); // Send erroneous parameter let blockhash: ClientResult = - rpc_client.send(RpcRequest::GetRecentBlockhash, json!(["parameter"]), 0); + rpc_client.send(RpcRequest::GetRecentBlockhash, json!(["parameter"])); assert_eq!(blockhash.is_err(), true); } - #[test] - fn test_retry_send() { - solana_logger::setup(); - let (sender, receiver) = channel(); - thread::spawn(move || { - // 1. Pick a random port - // 2. Tell the client to start using it - // 3. Delay for 1.5 seconds before starting the server to ensure the client will fail - // and need to retry - let rpc_addr: SocketAddr = "0.0.0.0:4242".parse().unwrap(); - sender.send(rpc_addr.clone()).unwrap(); - sleep(Duration::from_millis(1500)); - - let mut io = IoHandler::default(); - io.add_method("getBalance", move |_params: Params| { - Ok(Value::Number(Number::from(5))) - }); - let server = ServerBuilder::new(io) - .threads(1) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Any, - ])) - .start_http(&rpc_addr) - .expect("Unable to start RPC server"); - server.wait(); - }); - - let rpc_addr = receiver.recv().unwrap(); - let rpc_client = RpcClient::new_socket(rpc_addr); - - let balance: u64 = rpc_client - .send( - RpcRequest::GetBalance, - json!(["deadbeefXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNHhw"]), - 10, - ) - .unwrap(); - assert_eq!(balance, 5); - } - #[test] fn test_send_transaction() { let rpc_client = RpcClient::new_mock("succeeds".to_string()); diff --git a/client/src/rpc_client_request.rs b/client/src/rpc_client_request.rs index 22d6cf452e..5bc3178da5 100644 --- a/client/src/rpc_client_request.rs +++ b/client/src/rpc_client_request.rs @@ -4,8 +4,7 @@ use crate::{ rpc_request::{RpcError, RpcRequest}, }; use log::*; -use reqwest::{self, header::CONTENT_TYPE}; -use solana_sdk::clock::{DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT}; +use reqwest::{self, header::CONTENT_TYPE, StatusCode}; use std::{thread::sleep, time::Duration}; pub struct RpcClientRequest { @@ -29,17 +28,13 @@ impl RpcClientRequest { } impl GenericRpcClientRequest for RpcClientRequest { - fn send( - &self, - request: RpcRequest, - params: serde_json::Value, - mut retries: usize, - ) -> Result { + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result { // Concurrent requests are not supported so reuse the same request id for all requests let request_id = 1; let request_json = request.build_request_json(request_id, params); + let mut too_many_requests_retries = 5; loop { match self .client @@ -50,6 +45,19 @@ impl GenericRpcClientRequest for RpcClientRequest { { Ok(response) => { if !response.status().is_success() { + if response.status() == StatusCode::TOO_MANY_REQUESTS + && too_many_requests_retries > 0 + { + too_many_requests_retries -= 1; + debug!( + "Server responded with {:?}, {} retries left", + response, too_many_requests_retries + ); + + // Sleep for 500ms to give the server a break + sleep(Duration::from_millis(500)); + continue; + } return Err(response.error_for_status().unwrap_err().into()); } @@ -63,17 +71,8 @@ impl GenericRpcClientRequest for RpcClientRequest { } return Ok(json["result"].clone()); } - Err(e) => { - info!("{:?} failed, {} retries left: {:?}", request, retries, e); - if retries == 0 { - return Err(e.into()); - } - retries -= 1; - - // Sleep for approximately half a slot - sleep(Duration::from_millis( - 500 * DEFAULT_TICKS_PER_SLOT / DEFAULT_TICKS_PER_SECOND, - )); + Err(err) => { + return Err(err.into()); } } }