Add solana_client::nonblocking::RpcClient

This commit is contained in:
Michael Vines
2022-01-24 17:01:07 -08:00
parent db481e1799
commit 85e8bece2e
13 changed files with 5579 additions and 1419 deletions

2
Cargo.lock generated
View File

@ -4713,6 +4713,7 @@ name = "solana-client"
version = "1.10.0" version = "1.10.0"
dependencies = [ dependencies = [
"assert_matches", "assert_matches",
"async-trait",
"base64 0.13.0", "base64 0.13.0",
"bincode", "bincode",
"bs58 0.4.0", "bs58 0.4.0",
@ -5985,6 +5986,7 @@ dependencies = [
"solana-runtime", "solana-runtime",
"solana-sdk", "solana-sdk",
"solana-streamer", "solana-streamer",
"tokio",
] ]
[[package]] [[package]]

View File

@ -1706,7 +1706,7 @@ mod tests {
serde_json::{json, Value}, serde_json::{json, Value},
solana_client::{ solana_client::{
blockhash_query, blockhash_query,
mock_sender::SIGNATURE, mock_sender_for_cli::SIGNATURE,
rpc_request::RpcRequest, rpc_request::RpcRequest,
rpc_response::{Response, RpcResponseContext}, rpc_response::{Response, RpcResponseContext},
}, },

View File

@ -10,6 +10,7 @@ license = "Apache-2.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
async-trait = "0.1.52"
base64 = "0.13.0" base64 = "0.13.0"
bincode = "1.3.3" bincode = "1.3.3"
bs58 = "0.4.0" bs58 = "0.4.0"

View File

@ -1,4 +1,4 @@
//! The standard [`RpcSender`] over HTTP. //! Nonblocking [`RpcSender`] over HTTP.
use { use {
crate::{ crate::{
@ -8,6 +8,7 @@ use {
rpc_response::RpcSimulateTransactionResult, rpc_response::RpcSimulateTransactionResult,
rpc_sender::*, rpc_sender::*,
}, },
async_trait::async_trait,
log::*, log::*,
reqwest::{ reqwest::{
self, self,
@ -25,13 +26,13 @@ use {
}; };
pub struct HttpSender { pub struct HttpSender {
client: Arc<reqwest::blocking::Client>, client: Arc<reqwest::Client>,
url: String, url: String,
request_id: AtomicU64, request_id: AtomicU64,
stats: RwLock<RpcTransportStats>, stats: RwLock<RpcTransportStats>,
} }
/// The standard [`RpcSender`] over HTTP. /// Nonblocking [`RpcSender`] over HTTP.
impl HttpSender { impl HttpSender {
/// Create an HTTP RPC sender. /// Create an HTTP RPC sender.
/// ///
@ -45,15 +46,11 @@ impl HttpSender {
/// ///
/// The URL is an HTTP URL, usually for port 8899. /// The URL is an HTTP URL, usually for port 8899.
pub fn new_with_timeout(url: String, timeout: Duration) -> Self { pub fn new_with_timeout(url: String, timeout: Duration) -> Self {
// `reqwest::blocking::Client` panics if run in a tokio async context. Shuttle the
// request to a different tokio thread to avoid this
let client = Arc::new( let client = Arc::new(
tokio::task::block_in_place(move || { reqwest::Client::builder()
reqwest::blocking::Client::builder() .timeout(timeout)
.timeout(timeout) .build()
.build() .expect("build rpc client"),
})
.expect("build rpc client"),
); );
Self { Self {
@ -100,12 +97,17 @@ impl<'a> Drop for StatsUpdater<'a> {
} }
} }
#[async_trait]
impl RpcSender for HttpSender { impl RpcSender for HttpSender {
fn get_transport_stats(&self) -> RpcTransportStats { fn get_transport_stats(&self) -> RpcTransportStats {
self.stats.read().unwrap().clone() self.stats.read().unwrap().clone()
} }
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> { async fn send(
&self,
request: RpcRequest,
params: serde_json::Value,
) -> Result<serde_json::Value> {
let mut stats_updater = StatsUpdater::new(&self.stats); let mut stats_updater = StatsUpdater::new(&self.stats);
let request_id = self.request_id.fetch_add(1, Ordering::Relaxed); let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
@ -113,18 +115,15 @@ impl RpcSender for HttpSender {
let mut too_many_requests_retries = 5; let mut too_many_requests_retries = 5;
loop { loop {
// `reqwest::blocking::Client` panics if run in a tokio async context. Shuttle the
// request to a different tokio thread to avoid this
let response = { let response = {
let client = self.client.clone(); let client = self.client.clone();
let request_json = request_json.clone(); let request_json = request_json.clone();
tokio::task::block_in_place(move || { client
client .post(&self.url)
.post(&self.url) .header(CONTENT_TYPE, "application/json")
.header(CONTENT_TYPE, "application/json") .body(request_json)
.body(request_json) .send()
.send() .await
})
}?; }?;
if !response.status().is_success() { if !response.status().is_success() {
@ -155,8 +154,7 @@ impl RpcSender for HttpSender {
return Err(response.error_for_status().unwrap_err().into()); return Err(response.error_for_status().unwrap_err().into());
} }
let mut json = let mut json = response.json::<serde_json::Value>().await?;
tokio::task::block_in_place(move || response.json::<serde_json::Value>())?;
if json["error"].is_object() { if json["error"].is_object() {
return match serde_json::from_value::<RpcErrorObject>(json["error"].clone()) { return match serde_json::from_value::<RpcErrorObject>(json["error"].clone()) {
Ok(rpc_error_object) => { Ok(rpc_error_object) => {
@ -208,14 +206,16 @@ mod tests {
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]
async fn http_sender_on_tokio_multi_thread() { async fn http_sender_on_tokio_multi_thread() {
let http_sender = HttpSender::new("http://localhost:1234".to_string()); let http_sender = HttpSender::new("http://localhost:1234".to_string());
let _ = http_sender.send(RpcRequest::GetVersion, serde_json::Value::Null); let _ = http_sender
.send(RpcRequest::GetVersion, serde_json::Value::Null)
.await;
} }
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
#[should_panic(expected = "can call blocking only when running on the multi-threaded runtime")] async fn http_sender_on_tokio_current_thread() {
async fn http_sender_ontokio_current_thread_should_panic() { let http_sender = HttpSender::new("http://localhost:1234".to_string());
// RpcClient::new() will panic in the tokio current-thread runtime due to `tokio::task::block_in_place()` usage, and there let _ = http_sender
// doesn't seem to be a way to detect whether the tokio runtime is multi_thread or current_thread... .send(RpcRequest::GetVersion, serde_json::Value::Null)
let _ = HttpSender::new("http://localhost:1234".to_string()); .await;
} }
} }

View File

@ -4,8 +4,9 @@ extern crate serde_derive;
pub mod blockhash_query; pub mod blockhash_query;
pub mod client_error; pub mod client_error;
pub mod http_sender; pub(crate) mod http_sender;
pub mod mock_sender; pub(crate) mod mock_sender;
pub mod nonblocking;
pub mod nonce_utils; pub mod nonce_utils;
pub mod perf_utils; pub mod perf_utils;
pub mod pubsub_client; pub mod pubsub_client;
@ -17,8 +18,15 @@ pub mod rpc_deprecated_config;
pub mod rpc_filter; pub mod rpc_filter;
pub mod rpc_request; pub mod rpc_request;
pub mod rpc_response; pub mod rpc_response;
pub mod rpc_sender; pub(crate) mod rpc_sender;
pub mod spinner; pub mod spinner;
pub mod thin_client; pub mod thin_client;
pub mod tpu_client; pub mod tpu_client;
pub mod transaction_executor; pub mod transaction_executor;
pub mod mock_sender_for_cli {
/// Magic `SIGNATURE` value used by `solana-cli` unit tests.
/// Please don't use this constant.
pub const SIGNATURE: &str =
"43yNSFC6fYTuPgTNFFhF4axw7AfWxB2BPdurme8yrsWEYwm8299xh8n6TAHjGymiSub1XtyxTNyd9GBfY2hxoBw8";
}

View File

@ -1,4 +1,4 @@
//! An [`RpcSender`] used for unit testing [`RpcClient`](crate::rpc_client::RpcClient). //! A nonblocking [`RpcSender`] used for unit testing [`RpcClient`](crate::rpc_client::RpcClient).
use { use {
crate::{ crate::{
@ -15,6 +15,7 @@ use {
}, },
rpc_sender::*, rpc_sender::*,
}, },
async_trait::async_trait,
serde_json::{json, Number, Value}, serde_json::{json, Number, Value},
solana_account_decoder::{UiAccount, UiAccountEncoding}, solana_account_decoder::{UiAccount, UiAccountEncoding},
solana_sdk::{ solana_sdk::{
@ -40,8 +41,6 @@ use {
}; };
pub const PUBKEY: &str = "7RoSF9fUmdphVCpabEoefH81WwrW7orsWonXWqTXkKV8"; pub const PUBKEY: &str = "7RoSF9fUmdphVCpabEoefH81WwrW7orsWonXWqTXkKV8";
pub const SIGNATURE: &str =
"43yNSFC6fYTuPgTNFFhF4axw7AfWxB2BPdurme8yrsWEYwm8299xh8n6TAHjGymiSub1XtyxTNyd9GBfY2hxoBw8";
pub type Mocks = HashMap<RpcRequest, Value>; pub type Mocks = HashMap<RpcRequest, Value>;
pub struct MockSender { pub struct MockSender {
@ -87,12 +86,17 @@ impl MockSender {
} }
} }
#[async_trait]
impl RpcSender for MockSender { impl RpcSender for MockSender {
fn get_transport_stats(&self) -> RpcTransportStats { fn get_transport_stats(&self) -> RpcTransportStats {
RpcTransportStats::default() RpcTransportStats::default()
} }
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value> { async fn send(
&self,
request: RpcRequest,
params: serde_json::Value,
) -> Result<serde_json::Value> {
if let Some(value) = self.mocks.write().unwrap().remove(&request) { if let Some(value) = self.mocks.write().unwrap().remove(&request) {
return Ok(value); return Ok(value);
} }
@ -386,7 +390,7 @@ impl RpcSender for MockSender {
"getBlocksWithLimit" => serde_json::to_value(vec![1, 2, 3])?, "getBlocksWithLimit" => serde_json::to_value(vec![1, 2, 3])?,
"getSignaturesForAddress" => { "getSignaturesForAddress" => {
serde_json::to_value(vec![RpcConfirmedTransactionStatusWithSignature { serde_json::to_value(vec![RpcConfirmedTransactionStatusWithSignature {
signature: SIGNATURE.to_string(), signature: crate::mock_sender_for_cli::SIGNATURE.to_string(),
slot: 123, slot: 123,
err: None, err: None,
memo: None, memo: None,

View File

@ -0,0 +1 @@
pub mod rpc_client;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
//! A transport for RPC calls. //! A transport for RPC calls.
use { use {
crate::{client_error::Result, rpc_request::RpcRequest}, crate::{client_error::Result, rpc_request::RpcRequest},
async_trait::async_trait,
std::time::Duration, std::time::Duration,
}; };
@ -26,10 +26,14 @@ pub struct RpcTransportStats {
/// It is typically implemented by [`HttpSender`] in production, and /// It is typically implemented by [`HttpSender`] in production, and
/// [`MockSender`] in unit tests. /// [`MockSender`] in unit tests.
/// ///
/// [`RpcClient`]: crate::rpc_client::RpcClient
/// [`HttpSender`]: crate::http_sender::HttpSender /// [`HttpSender`]: crate::http_sender::HttpSender
/// [`MockSender`]: crate::mock_sender::MockSender /// [`MockSender`]: crate::mock_sender::MockSender
pub trait RpcSender { #[async_trait]
fn send(&self, request: RpcRequest, params: serde_json::Value) -> Result<serde_json::Value>; pub(crate) trait RpcSender {
async fn send(
&self,
request: RpcRequest,
params: serde_json::Value,
) -> Result<serde_json::Value>;
fn get_transport_stats(&self) -> RpcTransportStats; fn get_transport_stats(&self) -> RpcTransportStats;
} }

View File

@ -3142,6 +3142,7 @@ dependencies = [
name = "solana-client" name = "solana-client"
version = "1.10.0" version = "1.10.0"
dependencies = [ dependencies = [
"async-trait",
"base64 0.13.0", "base64 0.13.0",
"bincode", "bincode",
"bs58 0.4.0", "bs58 0.4.0",

View File

@ -27,6 +27,7 @@ solana-rpc = { path = "../rpc", version = "=1.10.0" }
solana-runtime = { path = "../runtime", version = "=1.10.0" } solana-runtime = { path = "../runtime", version = "=1.10.0" }
solana-sdk = { path = "../sdk", version = "=1.10.0" } solana-sdk = { path = "../sdk", version = "=1.10.0" }
solana-streamer = { path = "../streamer", version = "=1.10.0" } solana-streamer = { path = "../streamer", version = "=1.10.0" }
tokio = { version = "1", features = ["full"] }
[package.metadata.docs.rs] [package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"] targets = ["x86_64-unknown-linux-gnu"]

View File

@ -2,7 +2,7 @@
use { use {
log::*, log::*,
solana_cli_output::CliAccount, solana_cli_output::CliAccount,
solana_client::rpc_client::RpcClient, solana_client::{nonblocking, rpc_client::RpcClient},
solana_core::{ solana_core::{
tower_storage::TowerStorage, tower_storage::TowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress}, validator::{Validator, ValidatorConfig, ValidatorStartProgress},
@ -43,9 +43,9 @@ use {
path::{Path, PathBuf}, path::{Path, PathBuf},
str::FromStr, str::FromStr,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
thread::sleep,
time::Duration, time::Duration,
}, },
tokio::time::sleep,
}; };
#[derive(Clone)] #[derive(Clone)]
@ -358,9 +358,39 @@ impl TestValidatorGenesis {
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
) -> (TestValidator, Keypair) { ) -> (TestValidator, Keypair) {
let mint_keypair = Keypair::new(); let mint_keypair = Keypair::new();
TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space) match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space) {
.map(|test_validator| (test_validator, mint_keypair)) Ok(test_validator) => {
.expect("Test validator failed to start") let runtime = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
runtime.block_on(test_validator.wait_for_nonzero_fees());
(test_validator, mint_keypair)
}
Err(err) => panic!("Test validator failed to start: {}", err),
}
}
pub async fn start_async(&self) -> (TestValidator, Keypair) {
self.start_async_with_socket_addr_space(SocketAddrSpace::new(
/*allow_private_addr=*/ true,
))
.await
}
pub async fn start_async_with_socket_addr_space(
&self,
socket_addr_space: SocketAddrSpace,
) -> (TestValidator, Keypair) {
let mint_keypair = Keypair::new();
match TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space) {
Ok(test_validator) => {
test_validator.wait_for_nonzero_fees().await;
(test_validator, mint_keypair)
}
Err(err) => panic!("Test validator failed to start: {}", err),
}
} }
} }
@ -617,53 +647,7 @@ impl TestValidator {
discover_cluster(&gossip, 1, socket_addr_space) discover_cluster(&gossip, 1, socket_addr_space)
.map_err(|err| format!("TestValidator startup failed: {:?}", err))?; .map_err(|err| format!("TestValidator startup failed: {:?}", err))?;
// This is a hack to delay until the fees are non-zero for test consistency let test_validator = TestValidator {
// (fees from genesis are zero until the first block with a transaction in it is completed
// due to a bug in the Bank)
{
let rpc_client =
RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::processed());
let mut message = Message::new(
&[Instruction::new_with_bytes(
Pubkey::new_unique(),
&[],
vec![AccountMeta::new(Pubkey::new_unique(), true)],
)],
None,
);
const MAX_TRIES: u64 = 10;
let mut num_tries = 0;
loop {
num_tries += 1;
if num_tries > MAX_TRIES {
break;
}
println!("Waiting for fees to stabilize {:?}...", num_tries);
match rpc_client.get_latest_blockhash() {
Ok(blockhash) => {
message.recent_blockhash = blockhash;
match rpc_client.get_fee_for_message(&message) {
Ok(fee) => {
if fee != 0 {
break;
}
}
Err(err) => {
warn!("get_fee_for_message() failed: {:?}", err);
break;
}
}
}
Err(err) => {
warn!("get_latest_blockhash() failed: {:?}", err);
break;
}
}
sleep(Duration::from_millis(DEFAULT_MS_PER_SLOT));
}
}
Ok(TestValidator {
ledger_path, ledger_path,
preserve_ledger, preserve_ledger,
rpc_pubsub_url, rpc_pubsub_url,
@ -672,7 +656,56 @@ impl TestValidator {
gossip, gossip,
validator, validator,
vote_account_address, vote_account_address,
}) };
Ok(test_validator)
}
/// This is a hack to delay until the fees are non-zero for test consistency
/// (fees from genesis are zero until the first block with a transaction in it is completed
/// due to a bug in the Bank)
async fn wait_for_nonzero_fees(&self) {
let rpc_client = nonblocking::rpc_client::RpcClient::new_with_commitment(
self.rpc_url.clone(),
CommitmentConfig::processed(),
);
let mut message = Message::new(
&[Instruction::new_with_bytes(
Pubkey::new_unique(),
&[],
vec![AccountMeta::new(Pubkey::new_unique(), true)],
)],
None,
);
const MAX_TRIES: u64 = 10;
let mut num_tries = 0;
loop {
num_tries += 1;
if num_tries > MAX_TRIES {
break;
}
println!("Waiting for fees to stabilize {:?}...", num_tries);
match rpc_client.get_latest_blockhash().await {
Ok(blockhash) => {
message.recent_blockhash = blockhash;
match rpc_client.get_fee_for_message(&message).await {
Ok(fee) => {
if fee != 0 {
break;
}
}
Err(err) => {
warn!("get_fee_for_message() failed: {:?}", err);
break;
}
}
}
Err(err) => {
warn!("get_latest_blockhash() failed: {:?}", err);
break;
}
}
sleep(Duration::from_millis(DEFAULT_MS_PER_SLOT)).await;
}
} }
/// Return the validator's TPU address /// Return the validator's TPU address
@ -719,6 +752,14 @@ impl TestValidator {
RpcClient::new_with_commitment(self.rpc_url.clone(), CommitmentConfig::processed()) RpcClient::new_with_commitment(self.rpc_url.clone(), CommitmentConfig::processed())
} }
/// Return a nonblocking RpcClient for the validator.
pub fn get_async_rpc_client(&self) -> nonblocking::rpc_client::RpcClient {
nonblocking::rpc_client::RpcClient::new_with_commitment(
self.rpc_url.clone(),
CommitmentConfig::processed(),
)
}
pub fn join(mut self) { pub fn join(mut self) {
if let Some(validator) = self.validator.take() { if let Some(validator) = self.validator.take() {
validator.join(); validator.join();
@ -746,3 +787,29 @@ impl Drop for TestValidator {
} }
} }
} }
#[cfg(test)]
mod test {
use super::*;
#[test]
fn get_health() {
let (test_validator, _payer) = TestValidatorGenesis::default().start();
let rpc_client = test_validator.get_rpc_client();
rpc_client.get_health().expect("health");
}
#[tokio::test]
async fn nonblocking_get_health() {
let (test_validator, _payer) = TestValidatorGenesis::default().start_async().await;
let rpc_client = test_validator.get_async_rpc_client();
rpc_client.get_health().await.expect("health");
}
#[tokio::test]
#[should_panic]
async fn document_tokio_panic() {
// `start()` blows up when run within tokio
let (_test_validator, _payer) = TestValidatorGenesis::default().start();
}
}