diff --git a/client/src/http_sender.rs b/client/src/http_sender.rs index 79d5c4ed85..e8763ce478 100644 --- a/client/src/http_sender.rs +++ b/client/src/http_sender.rs @@ -6,7 +6,7 @@ use { rpc_custom_error, rpc_request::{RpcError, RpcRequest, RpcResponseErrorData}, rpc_response::RpcSimulateTransactionResult, - rpc_sender::RpcSender, + rpc_sender::*, }, log::*, reqwest::{ @@ -17,10 +17,10 @@ use { std::{ sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, RwLock, }, thread::sleep, - time::Duration, + time::{Duration, Instant}, }, }; @@ -28,6 +28,7 @@ pub struct HttpSender { client: Arc, url: String, request_id: AtomicU64, + stats: RwLock, } /// The standard [`RpcSender`] over HTTP. @@ -59,6 +60,7 @@ impl HttpSender { client, url, request_id: AtomicU64::new(0), + stats: RwLock::new(RpcTransportStats::default()), } } } @@ -69,8 +71,43 @@ struct RpcErrorObject { message: String, } +struct StatsUpdater<'a> { + stats: &'a RwLock, + request_start_time: Instant, + rate_limited_time: Duration, +} + +impl<'a> StatsUpdater<'a> { + fn new(stats: &'a RwLock) -> Self { + Self { + stats, + request_start_time: Instant::now(), + rate_limited_time: Duration::default(), + } + } + + fn add_rate_limited_time(&mut self, duration: Duration) { + self.rate_limited_time += duration; + } +} + +impl<'a> Drop for StatsUpdater<'a> { + fn drop(&mut self) { + let mut stats = self.stats.write().unwrap(); + stats.request_count += 1; + stats.elapsed_time += Instant::now().duration_since(self.request_start_time); + stats.rate_limited_time += self.rate_limited_time; + } +} + impl RpcSender for HttpSender { + fn get_transport_stats(&self) -> RpcTransportStats { + self.stats.read().unwrap().clone() + } + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result { + let mut stats_updater = StatsUpdater::new(&self.stats); + let request_id = self.request_id.fetch_add(1, Ordering::Relaxed); let request_json = request.build_request_json(request_id, params).to_string(); @@ -114,6 +151,7 @@ impl RpcSender for HttpSender { ); sleep(duration); + stats_updater.add_rate_limited_time(duration); continue; } return Err(response.error_for_status().unwrap_err().into()); diff --git a/client/src/mock_sender.rs b/client/src/mock_sender.rs index b8b456e14d..44b561d514 100644 --- a/client/src/mock_sender.rs +++ b/client/src/mock_sender.rs @@ -11,7 +11,7 @@ use { RpcResponseContext, RpcSimulateTransactionResult, RpcStakeActivation, RpcSupply, RpcVersionInfo, RpcVoteAccountInfo, RpcVoteAccountStatus, StakeActivationState, }, - rpc_sender::RpcSender, + rpc_sender::*, }, serde_json::{json, Number, Value}, solana_sdk::{ @@ -83,6 +83,10 @@ impl MockSender { } impl RpcSender for MockSender { + fn get_transport_stats(&self) -> RpcTransportStats { + RpcTransportStats::default() + } + fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result { if let Some(value) = self.mocks.write().unwrap().remove(&request) { return Ok(value); diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 25be7485d9..fa3f69ea90 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -20,7 +20,7 @@ use { rpc_config::*, rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter}, rpc_response::*, - rpc_sender::RpcSender, + rpc_sender::*, }, bincode::serialize, indicatif::{ProgressBar, ProgressStyle}, @@ -3820,6 +3820,10 @@ impl RpcClient { serde_json::from_value(response) .map_err(|err| ClientError::new_with_request(err.into(), request)) } + + pub fn get_transport_stats(&self) -> RpcTransportStats { + self.sender.get_transport_stats() + } } fn serialize_encode_transaction( diff --git a/client/src/rpc_sender.rs b/client/src/rpc_sender.rs index 75e5aab0e0..8bf45af522 100644 --- a/client/src/rpc_sender.rs +++ b/client/src/rpc_sender.rs @@ -1,6 +1,22 @@ //! A transport for RPC calls. -use crate::{client_error::Result, rpc_request::RpcRequest}; +use { + crate::{client_error::Result, rpc_request::RpcRequest}, + std::time::Duration, +}; + +#[derive(Default, Clone)] +pub struct RpcTransportStats { + /// Number of RPC requests issued + pub request_count: usize, + + /// Total amount of time spent transacting with the RPC server + pub elapsed_time: Duration, + + /// Total amount of waiting time due to RPC server rate limiting + /// (a subset of `elapsed_time`) + pub rate_limited_time: Duration, +} /// A transport for RPC calls. /// @@ -15,4 +31,5 @@ use crate::{client_error::Result, rpc_request::RpcRequest}; /// [`MockSender`]: crate::mock_sender::MockSender pub trait RpcSender { fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result; + fn get_transport_stats(&self) -> RpcTransportStats; }