Add RpcClient::get_transport_stats()

(cherry picked from commit 21f4606212)
This commit is contained in:
Michael Vines
2021-09-08 18:44:35 -05:00
parent b9a0156a93
commit 0b1aadf446
4 changed files with 69 additions and 6 deletions

View File

@ -6,7 +6,7 @@ use {
rpc_custom_error, rpc_custom_error,
rpc_request::{RpcError, RpcRequest, RpcResponseErrorData}, rpc_request::{RpcError, RpcRequest, RpcResponseErrorData},
rpc_response::RpcSimulateTransactionResult, rpc_response::RpcSimulateTransactionResult,
rpc_sender::RpcSender, rpc_sender::*,
}, },
log::*, log::*,
reqwest::{ reqwest::{
@ -17,10 +17,10 @@ use {
std::{ std::{
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Arc, RwLock,
}, },
thread::sleep, thread::sleep,
time::Duration, time::{Duration, Instant},
}, },
}; };
@ -28,6 +28,7 @@ pub struct HttpSender {
client: Arc<reqwest::blocking::Client>, client: Arc<reqwest::blocking::Client>,
url: String, url: String,
request_id: AtomicU64, request_id: AtomicU64,
stats: RwLock<RpcTransportStats>,
} }
/// The standard [`RpcSender`] over HTTP. /// The standard [`RpcSender`] over HTTP.
@ -59,6 +60,7 @@ impl HttpSender {
client, client,
url, url,
request_id: AtomicU64::new(0), request_id: AtomicU64::new(0),
stats: RwLock::new(RpcTransportStats::default()),
} }
} }
} }
@ -69,8 +71,43 @@ struct RpcErrorObject {
message: String, message: String,
} }
struct StatsUpdater<'a> {
stats: &'a RwLock<RpcTransportStats>,
request_start_time: Instant,
rate_limited_time: Duration,
}
impl<'a> StatsUpdater<'a> {
fn new(stats: &'a RwLock<RpcTransportStats>) -> Self {
Self {
stats,
request_start_time: Instant::now(),
rate_limited_time: Duration::default(),
}
}
fn add_rate_limited_time(&mut self, duration: Duration) {
self.rate_limited_time += duration;
}
}
impl<'a> Drop for StatsUpdater<'a> {
fn drop(&mut self) {
let mut stats = self.stats.write().unwrap();
stats.request_count += 1;
stats.elapsed_time += Instant::now().duration_since(self.request_start_time);
stats.rate_limited_time += self.rate_limited_time;
}
}
impl RpcSender for HttpSender { impl RpcSender for HttpSender {
fn get_transport_stats(&self) -> RpcTransportStats {
self.stats.read().unwrap().clone()
}
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> { fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
let mut stats_updater = StatsUpdater::new(&self.stats);
let request_id = self.request_id.fetch_add(1, Ordering::Relaxed); let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
let request_json = request.build_request_json(request_id, params).to_string(); let request_json = request.build_request_json(request_id, params).to_string();
@ -114,6 +151,7 @@ impl RpcSender for HttpSender {
); );
sleep(duration); sleep(duration);
stats_updater.add_rate_limited_time(duration);
continue; continue;
} }
return Err(response.error_for_status().unwrap_err().into()); return Err(response.error_for_status().unwrap_err().into());

View File

@ -11,7 +11,7 @@ use {
RpcResponseContext, RpcSimulateTransactionResult, RpcStakeActivation, RpcSupply, RpcResponseContext, RpcSimulateTransactionResult, RpcStakeActivation, RpcSupply,
RpcVersionInfo, RpcVoteAccountInfo, RpcVoteAccountStatus, StakeActivationState, RpcVersionInfo, RpcVoteAccountInfo, RpcVoteAccountStatus, StakeActivationState,
}, },
rpc_sender::RpcSender, rpc_sender::*,
}, },
serde_json::{json, Number, Value}, serde_json::{json, Number, Value},
solana_sdk::{ solana_sdk::{
@ -83,6 +83,10 @@ impl MockSender {
} }
impl RpcSender for MockSender { impl RpcSender for MockSender {
fn get_transport_stats(&self) -> RpcTransportStats {
RpcTransportStats::default()
}
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> { fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> {
if let Some(value) = self.mocks.write().unwrap().remove(&request) { if let Some(value) = self.mocks.write().unwrap().remove(&request) {
return Ok(value); return Ok(value);

View File

@ -20,7 +20,7 @@ use {
rpc_config::*, rpc_config::*,
rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter}, rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter},
rpc_response::*, rpc_response::*,
rpc_sender::RpcSender, rpc_sender::*,
}, },
bincode::serialize, bincode::serialize,
indicatif::{ProgressBar, ProgressStyle}, indicatif::{ProgressBar, ProgressStyle},
@ -3820,6 +3820,10 @@ impl RpcClient {
serde_json::from_value(response) serde_json::from_value(response)
.map_err(|err| ClientError::new_with_request(err.into(), request)) .map_err(|err| ClientError::new_with_request(err.into(), request))
} }
pub fn get_transport_stats(&self) -> RpcTransportStats {
self.sender.get_transport_stats()
}
} }
fn serialize_encode_transaction( fn serialize_encode_transaction(

View File

@ -1,6 +1,22 @@
//! A transport for RPC calls. //! A transport for RPC calls.
use crate::{client_error::Result, rpc_request::RpcRequest}; use {
crate::{client_error::Result, rpc_request::RpcRequest},
std::time::Duration,
};
#[derive(Default, Clone)]
pub struct RpcTransportStats {
/// Number of RPC requests issued
pub request_count: usize,
/// Total amount of time spent transacting with the RPC server
pub elapsed_time: Duration,
/// Total amount of waiting time due to RPC server rate limiting
/// (a subset of `elapsed_time`)
pub rate_limited_time: Duration,
}
/// A transport for RPC calls. /// A transport for RPC calls.
/// ///
@ -15,4 +31,5 @@ use crate::{client_error::Result, rpc_request::RpcRequest};
/// [`MockSender`]: crate::mock_sender::MockSender /// [`MockSender`]: crate::mock_sender::MockSender
pub trait RpcSender { pub trait RpcSender {
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value>; fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value>;
fn get_transport_stats(&self) -> RpcTransportStats;
} }