From 1700820583ae99af1fa2d55304e5116d73bbd165 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 12 Apr 2022 13:12:33 -0600 Subject: [PATCH] Add TpuClient support to bench-tps (backport #24227) (#24284) * Add TpuClient support to bench-tps (#24227) * Add fallible send methods, and rpc_client helper * Add helper to return RpcClient url * Implement BenchTpsClient for TpuClient * Add cli rpc and identity handling * Handle different kinds of clients in main, use TpuClient * Add tpu_client integration test (cherry picked from commit 8487030ea64abf39df388f0b395e899e6fbf6a82) # Conflicts: # bench-tps/Cargo.toml * Fix conflicts Co-authored-by: Tyera Eulberg --- Cargo.lock | 3 + bench-tps/Cargo.toml | 3 + bench-tps/src/bench_tps_client.rs | 1 + bench-tps/src/bench_tps_client/tpu_client.rs | 99 ++++++++++++ bench-tps/src/cli.rs | 99 +++++++++++- bench-tps/src/keypairs.rs | 72 +++++++++ bench-tps/src/lib.rs | 3 +- bench-tps/src/main.rs | 158 +++++++++---------- bench-tps/tests/bench_tps.rs | 61 ++++++- client/src/http_sender.rs | 4 + client/src/mock_sender.rs | 4 + client/src/nonblocking/rpc_client.rs | 5 + client/src/rpc_client.rs | 5 + client/src/rpc_sender.rs | 1 + client/src/tpu_client.rs | 49 +++++- 15 files changed, 466 insertions(+), 101 deletions(-) create mode 100644 bench-tps/src/bench_tps_client/tpu_client.rs create mode 100644 bench-tps/src/keypairs.rs diff --git a/Cargo.lock b/Cargo.lock index 67fe4cc94d..efbc4ff229 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4456,6 +4456,8 @@ dependencies = [ "serde_json", "serde_yaml", "serial_test", + "solana-clap-utils", + "solana-cli-config", "solana-client", "solana-core", "solana-faucet", @@ -4470,6 +4472,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-streamer", + "solana-test-validator", "solana-version", "thiserror", ] diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index b1b835dc0c..d1cdc724aa 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -15,6 +15,8 @@ log = "0.4.14" rayon = "1.5.1" serde_json = "1.0.79" serde_yaml = "0.8.23" +solana-clap-utils = { path = "../clap-utils", version = "=1.10.9" } +solana-cli-config = { path = "../cli-config", version = "=1.10.9" } solana-client = { path = "../client", version = "=1.10.9" } solana-core = { path = "../core", version = "=1.10.9" } solana-faucet = { path = "../faucet", version = "=1.10.9" } @@ -34,6 +36,7 @@ thiserror = "1.0" [dev-dependencies] serial_test = "0.6.0" solana-local-cluster = { path = "../local-cluster", version = "=1.10.9" } +solana-test-validator = { path = "../test-validator", version = "=1.10.9" } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/bench-tps/src/bench_tps_client.rs b/bench-tps/src/bench_tps_client.rs index 5f0428487f..fbf500f778 100644 --- a/bench-tps/src/bench_tps_client.rs +++ b/bench-tps/src/bench_tps_client.rs @@ -83,3 +83,4 @@ pub trait BenchTpsClient { mod bank_client; mod thin_client; +mod tpu_client; diff --git a/bench-tps/src/bench_tps_client/tpu_client.rs b/bench-tps/src/bench_tps_client/tpu_client.rs new file mode 100644 index 0000000000..d968338161 --- /dev/null +++ b/bench-tps/src/bench_tps_client/tpu_client.rs @@ -0,0 +1,99 @@ +use { + crate::bench_tps_client::{BenchTpsClient, Result}, + solana_client::tpu_client::TpuClient, + solana_sdk::{ + commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash, message::Message, + pubkey::Pubkey, signature::Signature, transaction::Transaction, + }, +}; + +impl BenchTpsClient for TpuClient { + fn send_transaction(&self, transaction: Transaction) -> Result { + let signature = transaction.signatures[0]; + self.try_send_transaction(&transaction)?; + Ok(signature) + } + fn send_batch(&self, transactions: Vec) -> Result<()> { + for transaction in transactions { + BenchTpsClient::send_transaction(self, transaction)?; + } + Ok(()) + } + fn get_latest_blockhash(&self) -> Result { + self.rpc_client() + .get_latest_blockhash() + .map_err(|err| err.into()) + } + + fn get_latest_blockhash_with_commitment( + &self, + commitment_config: CommitmentConfig, + ) -> Result<(Hash, u64)> { + self.rpc_client() + .get_latest_blockhash_with_commitment(commitment_config) + .map_err(|err| err.into()) + } + + fn get_transaction_count(&self) -> Result { + self.rpc_client() + .get_transaction_count() + .map_err(|err| err.into()) + } + + fn get_transaction_count_with_commitment( + &self, + commitment_config: CommitmentConfig, + ) -> Result { + self.rpc_client() + .get_transaction_count_with_commitment(commitment_config) + .map_err(|err| err.into()) + } + + fn get_epoch_info(&self) -> Result { + self.rpc_client().get_epoch_info().map_err(|err| err.into()) + } + + fn get_balance(&self, pubkey: &Pubkey) -> Result { + self.rpc_client() + .get_balance(pubkey) + .map_err(|err| err.into()) + } + + fn get_balance_with_commitment( + &self, + pubkey: &Pubkey, + commitment_config: CommitmentConfig, + ) -> Result { + self.rpc_client() + .get_balance_with_commitment(pubkey, commitment_config) + .map(|res| res.value) + .map_err(|err| err.into()) + } + + fn get_fee_for_message(&self, message: &Message) -> Result { + self.rpc_client() + .get_fee_for_message(message) + .map_err(|err| err.into()) + } + + fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> Result { + self.rpc_client() + .get_minimum_balance_for_rent_exemption(data_len) + .map_err(|err| err.into()) + } + + fn addr(&self) -> String { + self.rpc_client().url() + } + + fn request_airdrop_with_blockhash( + &self, + pubkey: &Pubkey, + lamports: u64, + recent_blockhash: &Hash, + ) -> Result { + self.rpc_client() + .request_airdrop_with_blockhash(pubkey, lamports, recent_blockhash) + .map_err(|err| err.into()) + } +} diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 1f289f0520..b14bbe4a44 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -1,5 +1,7 @@ use { clap::{crate_description, crate_name, App, Arg, ArgMatches}, + solana_clap_utils::input_validators::{is_url, is_url_or_moniker}, + solana_cli_config::{ConfigInput, CONFIG_FILE}, solana_sdk::{ fee_calculator::FeeRateGovernor, pubkey::Pubkey, @@ -10,9 +12,26 @@ use { const NUM_LAMPORTS_PER_ACCOUNT_DEFAULT: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL; +pub enum ExternalClientType { + // Submits transactions directly to leaders using a ThinClient, broadcasting to multiple + // leaders when num_nodes > 1 + ThinClient, + // Submits transactions directly to leaders using a TpuClient, broadcasting to upcoming leaders + // via TpuClient default configuration + TpuClient, +} + +impl Default for ExternalClientType { + fn default() -> Self { + Self::ThinClient + } +} + /// Holds the configuration for a single run of the benchmark pub struct Config { pub entrypoint_addr: SocketAddr, + pub json_rpc_url: String, + pub websocket_url: String, pub id: Keypair, pub threads: usize, pub num_nodes: usize, @@ -29,12 +48,15 @@ pub struct Config { pub num_lamports_per_account: u64, pub target_slots_per_epoch: u64, pub target_node: Option, + pub external_client_type: ExternalClientType, } impl Default for Config { fn default() -> Config { Config { entrypoint_addr: SocketAddr::from(([127, 0, 0, 1], 8001)), + json_rpc_url: ConfigInput::default().json_rpc_url, + websocket_url: ConfigInput::default().websocket_url, id: Keypair::new(), threads: 4, num_nodes: 1, @@ -51,6 +73,7 @@ impl Default for Config { num_lamports_per_account: NUM_LAMPORTS_PER_ACCOUNT_DEFAULT, target_slots_per_epoch: 0, target_node: None, + external_client_type: ExternalClientType::default(), } } } @@ -59,6 +82,42 @@ impl Default for Config { pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { App::new(crate_name!()).about(crate_description!()) .version(version) + .arg({ + let arg = Arg::with_name("config_file") + .short("C") + .long("config") + .value_name("FILEPATH") + .takes_value(true) + .global(true) + .help("Configuration file to use"); + if let Some(ref config_file) = *CONFIG_FILE { + arg.default_value(config_file) + } else { + arg + } + }) + .arg( + Arg::with_name("json_rpc_url") + .short("u") + .long("url") + .value_name("URL_OR_MONIKER") + .takes_value(true) + .global(true) + .validator(is_url_or_moniker) + .help( + "URL for Solana's JSON RPC or moniker (or their first letter): \ + [mainnet-beta, testnet, devnet, localhost]", + ), + ) + .arg( + Arg::with_name("websocket_url") + .long("ws") + .value_name("URL") + .takes_value(true) + .global(true) + .validator(is_url) + .help("WebSocket URL for the solana cluster"), + ) .arg( Arg::with_name("entrypoint") .short("n") @@ -189,6 +248,12 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { "Wait until epochs are this many slots long.", ), ) + .arg( + Arg::with_name("tpu_client") + .long("use-tpu-client") + .takes_value(false) + .help("Submit transactions with a TpuClient") + ) } /// Parses a clap `ArgMatches` structure into a `Config` @@ -199,6 +264,35 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { pub fn extract_args(matches: &ArgMatches) -> Config { let mut args = Config::default(); + let config = if let Some(config_file) = matches.value_of("config_file") { + solana_cli_config::Config::load(config_file).unwrap_or_default() + } else { + solana_cli_config::Config::default() + }; + let (_, json_rpc_url) = ConfigInput::compute_json_rpc_url_setting( + matches.value_of("json_rpc_url").unwrap_or(""), + &config.json_rpc_url, + ); + args.json_rpc_url = json_rpc_url; + + let (_, websocket_url) = ConfigInput::compute_websocket_url_setting( + matches.value_of("websocket_url").unwrap_or(""), + &config.websocket_url, + matches.value_of("json_rpc_url").unwrap_or(""), + &config.json_rpc_url, + ); + args.websocket_url = websocket_url; + + let (_, id_path) = ConfigInput::compute_keypair_path_setting( + matches.value_of("identity").unwrap_or(""), + &config.keypair_path, + ); + args.id = read_keypair_file(id_path).expect("could not parse identity path"); + + if matches.is_present("tpu_client") { + args.external_client_type = ExternalClientType::TpuClient; + } + if let Some(addr) = matches.value_of("entrypoint") { args.entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| { eprintln!("failed to parse entrypoint address: {}", e); @@ -206,11 +300,6 @@ pub fn extract_args(matches: &ArgMatches) -> Config { }); } - if matches.is_present("identity") { - args.id = read_keypair_file(matches.value_of("identity").unwrap()) - .expect("can't read client identity"); - } - if let Some(t) = matches.value_of("threads") { args.threads = t.to_string().parse().expect("can't parse threads"); } diff --git a/bench-tps/src/keypairs.rs b/bench-tps/src/keypairs.rs new file mode 100644 index 0000000000..e165e484d5 --- /dev/null +++ b/bench-tps/src/keypairs.rs @@ -0,0 +1,72 @@ +use { + crate::{ + bench::{fund_keypairs, generate_and_fund_keypairs}, + bench_tps_client::BenchTpsClient, + }, + log::*, + solana_genesis::Base64Account, + solana_sdk::signature::{Keypair, Signer}, + std::{collections::HashMap, fs::File, path::Path, process::exit, sync::Arc}, +}; + +pub fn get_keypairs( + client: Arc, + id: &Keypair, + keypair_count: usize, + num_lamports_per_account: u64, + client_ids_and_stake_file: &str, + read_from_client_file: bool, +) -> Vec +where + T: 'static + BenchTpsClient + Send + Sync, +{ + if read_from_client_file { + let path = Path::new(client_ids_and_stake_file); + let file = File::open(path).unwrap(); + + info!("Reading {}", client_ids_and_stake_file); + let accounts: HashMap = serde_yaml::from_reader(file).unwrap(); + let mut keypairs = vec![]; + let mut last_balance = 0; + + accounts + .into_iter() + .for_each(|(keypair, primordial_account)| { + let bytes: Vec = serde_json::from_str(keypair.as_str()).unwrap(); + keypairs.push(Keypair::from_bytes(&bytes).unwrap()); + last_balance = primordial_account.balance; + }); + + if keypairs.len() < keypair_count { + eprintln!( + "Expected {} accounts in {}, only received {} (--tx_count mismatch?)", + keypair_count, + client_ids_and_stake_file, + keypairs.len(), + ); + exit(1); + } + // Sort keypairs so that do_bench_tps() uses the same subset of accounts for each run. + // This prevents the amount of storage needed for bench-tps accounts from creeping up + // across multiple runs. + keypairs.sort_by_key(|x| x.pubkey().to_string()); + fund_keypairs( + client, + id, + &keypairs, + keypairs.len().saturating_sub(keypair_count) as u64, + last_balance, + ) + .unwrap_or_else(|e| { + eprintln!("Error could not fund keys: {:?}", e); + exit(1); + }); + keypairs + } else { + generate_and_fund_keypairs(client, id, keypair_count, num_lamports_per_account) + .unwrap_or_else(|e| { + eprintln!("Error could not fund keys: {:?}", e); + exit(1); + }) + } +} diff --git a/bench-tps/src/lib.rs b/bench-tps/src/lib.rs index 6897b54d26..06d5eaa1af 100644 --- a/bench-tps/src/lib.rs +++ b/bench-tps/src/lib.rs @@ -1,5 +1,6 @@ #![allow(clippy::integer_arithmetic)] pub mod bench; -mod bench_tps_client; +pub mod bench_tps_client; pub mod cli; +pub mod keypairs; mod perf_utils; diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 88cd75d943..701f0967ad 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -2,15 +2,18 @@ use { log::*, solana_bench_tps::{ - bench::{do_bench_tps, fund_keypairs, generate_and_fund_keypairs, generate_keypairs}, - cli, + bench::{do_bench_tps, generate_keypairs}, + cli::{self, ExternalClientType}, + keypairs::get_keypairs, + }, + solana_client::{ + rpc_client::RpcClient, + tpu_client::{TpuClient, TpuClientConfig}, }, solana_genesis::Base64Account, solana_gossip::gossip_service::{discover_cluster, get_client, get_multi_client}, solana_sdk::{ - fee_calculator::FeeRateGovernor, - signature::{Keypair, Signer}, - system_program, + commitment_config::CommitmentConfig, fee_calculator::FeeRateGovernor, system_program, }, solana_streamer::socket::SocketAddrSpace, std::{collections::HashMap, fs::File, io::prelude::*, path::Path, process::exit, sync::Arc}, @@ -28,6 +31,8 @@ fn main() { let cli::Config { entrypoint_addr, + json_rpc_url, + websocket_url, id, num_nodes, tx_count, @@ -39,6 +44,7 @@ fn main() { multi_client, num_lamports_per_account, target_node, + external_client_type, .. } = &cli_config; @@ -74,88 +80,72 @@ fn main() { } info!("Connecting to the cluster"); - let nodes = discover_cluster(entrypoint_addr, *num_nodes, SocketAddrSpace::Unspecified) - .unwrap_or_else(|err| { - eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); - exit(1); - }); - let client = if *multi_client { - let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified); - if nodes.len() < num_clients { - eprintln!( - "Error: Insufficient nodes discovered. Expecting {} or more", - num_nodes - ); - exit(1); - } - Arc::new(client) - } else if let Some(target_node) = target_node { - info!("Searching for target_node: {:?}", target_node); - let mut target_client = None; - for node in nodes { - if node.id == *target_node { - target_client = Some(Arc::new(get_client(&[node], &SocketAddrSpace::Unspecified))); - break; - } - } - target_client.unwrap_or_else(|| { - eprintln!("Target node {} not found", target_node); - exit(1); - }) - } else { - Arc::new(get_client(&nodes, &SocketAddrSpace::Unspecified)) - }; - - let keypairs = if *read_from_client_file { - let path = Path::new(&client_ids_and_stake_file); - let file = File::open(path).unwrap(); - - info!("Reading {}", client_ids_and_stake_file); - let accounts: HashMap = serde_yaml::from_reader(file).unwrap(); - let mut keypairs = vec![]; - let mut last_balance = 0; - - accounts - .into_iter() - .for_each(|(keypair, primordial_account)| { - let bytes: Vec = serde_json::from_str(keypair.as_str()).unwrap(); - keypairs.push(Keypair::from_bytes(&bytes).unwrap()); - last_balance = primordial_account.balance; - }); - - if keypairs.len() < keypair_count { - eprintln!( - "Expected {} accounts in {}, only received {} (--tx_count mismatch?)", + match external_client_type { + ExternalClientType::ThinClient => { + let nodes = discover_cluster(entrypoint_addr, *num_nodes, SocketAddrSpace::Unspecified) + .unwrap_or_else(|err| { + eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); + exit(1); + }); + let client = if *multi_client { + let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified); + if nodes.len() < num_clients { + eprintln!( + "Error: Insufficient nodes discovered. Expecting {} or more", + num_nodes + ); + exit(1); + } + Arc::new(client) + } else if let Some(target_node) = target_node { + info!("Searching for target_node: {:?}", target_node); + let mut target_client = None; + for node in nodes { + if node.id == *target_node { + target_client = + Some(Arc::new(get_client(&[node], &SocketAddrSpace::Unspecified))); + break; + } + } + target_client.unwrap_or_else(|| { + eprintln!("Target node {} not found", target_node); + exit(1); + }) + } else { + Arc::new(get_client(&nodes, &SocketAddrSpace::Unspecified)) + }; + let keypairs = get_keypairs( + client.clone(), + id, keypair_count, + *num_lamports_per_account, client_ids_and_stake_file, - keypairs.len(), + *read_from_client_file, ); - exit(1); + do_bench_tps(client, cli_config, keypairs); } - // Sort keypairs so that do_bench_tps() uses the same subset of accounts for each run. - // This prevents the amount of storage needed for bench-tps accounts from creeping up - // across multiple runs. - keypairs.sort_by_key(|x| x.pubkey().to_string()); - fund_keypairs( - client.clone(), - id, - &keypairs, - keypairs.len().saturating_sub(keypair_count) as u64, - last_balance, - ) - .unwrap_or_else(|e| { - eprintln!("Error could not fund keys: {:?}", e); - exit(1); - }); - keypairs - } else { - generate_and_fund_keypairs(client.clone(), id, keypair_count, *num_lamports_per_account) - .unwrap_or_else(|e| { - eprintln!("Error could not fund keys: {:?}", e); - exit(1); - }) - }; - - do_bench_tps(client, cli_config, keypairs); + ExternalClientType::TpuClient => { + let rpc_client = Arc::new(RpcClient::new_with_commitment( + json_rpc_url.to_string(), + CommitmentConfig::confirmed(), + )); + let client = Arc::new( + TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default()) + .unwrap_or_else(|err| { + eprintln!("Could not create TpuClient {:?}", err); + exit(1); + }), + ); + let keypairs = get_keypairs( + client.clone(), + id, + keypair_count, + *num_lamports_per_account, + client_ids_and_stake_file, + *read_from_client_file, + ); + do_bench_tps(client, cli_config, keypairs); + } + } } diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 24f1bb26a8..33a7dbc9e0 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -6,16 +6,24 @@ use { bench::{do_bench_tps, generate_and_fund_keypairs}, cli::Config, }, - solana_client::thin_client::create_client, + solana_client::{ + rpc_client::RpcClient, + thin_client::create_client, + tpu_client::{TpuClient, TpuClientConfig}, + }, solana_core::validator::ValidatorConfig, - solana_faucet::faucet::run_local_faucet_with_port, + solana_faucet::faucet::{run_local_faucet, run_local_faucet_with_port}, solana_local_cluster::{ local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, solana_rpc::rpc::JsonRpcConfig, - solana_sdk::signature::{Keypair, Signer}, + solana_sdk::{ + commitment_config::CommitmentConfig, + signature::{Keypair, Signer}, + }, solana_streamer::socket::SocketAddrSpace, + solana_test_validator::TestValidator, std::{sync::Arc, time::Duration}, }; @@ -83,6 +91,43 @@ fn test_bench_tps_local_cluster(config: Config) { assert!(_total > 100); } +fn test_bench_tps_test_validator(config: Config) { + solana_logger::setup(); + + let mint_keypair = Keypair::new(); + let mint_pubkey = mint_keypair.pubkey(); + + let faucet_addr = run_local_faucet(mint_keypair, None); + + let test_validator = + TestValidator::with_no_fees(mint_pubkey, Some(faucet_addr), SocketAddrSpace::Unspecified); + + let rpc_client = Arc::new(RpcClient::new_with_commitment( + test_validator.rpc_url(), + CommitmentConfig::processed(), + )); + let websocket_url = test_validator.rpc_pubsub_url(); + + let client = + Arc::new(TpuClient::new(rpc_client, &websocket_url, TpuClientConfig::default()).unwrap()); + + let lamports_per_account = 100; + + let keypair_count = config.tx_count * config.keypair_multiplier; + let keypairs = generate_and_fund_keypairs( + client.clone(), + &config.id, + keypair_count, + lamports_per_account, + ) + .unwrap(); + + let _total = do_bench_tps(client, config, keypairs); + + #[cfg(not(debug_assertions))] + assert!(_total > 100); +} + #[test] #[serial] fn test_bench_tps_local_cluster_solana() { @@ -92,3 +137,13 @@ fn test_bench_tps_local_cluster_solana() { ..Config::default() }); } + +#[test] +#[serial] +fn test_bench_tps_tpu_client() { + test_bench_tps_test_validator(Config { + tx_count: 100, + duration: Duration::from_secs(10), + ..Config::default() + }); +} diff --git a/client/src/http_sender.rs b/client/src/http_sender.rs index f1e3601b6c..19c710b241 100644 --- a/client/src/http_sender.rs +++ b/client/src/http_sender.rs @@ -197,6 +197,10 @@ impl RpcSender for HttpSender { return Ok(json["result"].take()); } } + + fn url(&self) -> String { + self.url.clone() + } } #[cfg(test)] diff --git a/client/src/mock_sender.rs b/client/src/mock_sender.rs index 3b6c0856df..37a29d99fe 100644 --- a/client/src/mock_sender.rs +++ b/client/src/mock_sender.rs @@ -468,4 +468,8 @@ impl RpcSender for MockSender { }; Ok(val) } + + fn url(&self) -> String { + format!("MockSender: {}", self.url) + } } diff --git a/client/src/nonblocking/rpc_client.rs b/client/src/nonblocking/rpc_client.rs index 29dee89aa3..1dc38a657d 100644 --- a/client/src/nonblocking/rpc_client.rs +++ b/client/src/nonblocking/rpc_client.rs @@ -502,6 +502,11 @@ impl RpcClient { Self::new_with_timeout(url, timeout) } + /// Get the configured url of the client's sender + pub fn url(&self) -> String { + self.sender.url() + } + async fn get_node_version(&self) -> Result { let r_node_version = self.node_version.read().await; if let Some(version) = &*r_node_version { diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 1f670b5d75..2985c0ca75 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -535,6 +535,11 @@ impl RpcClient { Self::new_with_timeout(url, timeout) } + /// Get the configured url of the client's sender + pub fn url(&self) -> String { + self.rpc_client.url() + } + /// Get the configured default [commitment level][cl]. /// /// [cl]: https://docs.solana.com/developing/clients/jsonrpc-api#configuring-state-commitment diff --git a/client/src/rpc_sender.rs b/client/src/rpc_sender.rs index 10b3dbe7bd..dded04dfc6 100644 --- a/client/src/rpc_sender.rs +++ b/client/src/rpc_sender.rs @@ -32,4 +32,5 @@ pub trait RpcSender { params: serde_json::Value, ) -> Result; fn get_transport_stats(&self) -> RpcTransportStats; + fn url(&self) -> String; } diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 10c649e3d1..29efc97228 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -90,20 +90,49 @@ impl TpuClient { /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool { - let mut sent = false; + self.try_send_wire_transaction(wire_transaction).is_ok() + } + + /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout + /// size + /// Returns the last error if all sends fail + pub fn try_send_transaction( + &self, + transaction: &Transaction, + ) -> std::result::Result<(), std::io::Error> { + let wire_transaction = serialize(transaction).expect("serialization should succeed"); + self.try_send_wire_transaction(&wire_transaction) + } + + /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size + /// Returns the last error if all sends fail + fn try_send_wire_transaction( + &self, + wire_transaction: &[u8], + ) -> std::result::Result<(), std::io::Error> { + let mut last_error: Option = None; + let mut some_success = false; + for tpu_address in self .leader_tpu_service .leader_tpu_sockets(self.fanout_slots) { - if self - .send_socket - .send_to(wire_transaction, tpu_address) - .is_ok() - { - sent = true; + let result = self.send_socket.send_to(wire_transaction, tpu_address); + if let Err(err) = result { + last_error = Some(err); + } else { + some_success = true; } } - sent + if !some_success { + Err(if let Some(err) = last_error { + err + } else { + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted") + }) + } else { + Ok(()) + } } /// Create a new client that disconnects when dropped @@ -266,6 +295,10 @@ impl TpuClient { } Err(TpuSenderError::Custom("Max retries exceeded".into())) } + + pub fn rpc_client(&self) -> &RpcClient { + &self.rpc_client + } } impl Drop for TpuClient {