From 304cd65ecb21aff7ea58900195669232c8379135 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 13 Apr 2022 22:56:29 -0600 Subject: [PATCH] Support quic in bench-tps (#24295) (#24317) * Update comment * Use connection_cache in tpu_client * Add --tpu-use-quic to bench-tps * Use connection_cache async send (cherry picked from commit 26899359d196e33992f34d83138872c3a9154ab9) Co-authored-by: Tyera Eulberg --- bench-tps/src/cli.rs | 13 +++++++++++++ bench-tps/src/main.rs | 8 ++++++++ client/src/tpu_client.rs | 28 ++++++++++++---------------- validator/src/main.rs | 2 +- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index da22a0f445..d80567a560 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -51,6 +51,7 @@ pub struct Config { pub target_slots_per_epoch: u64, pub target_node: Option, pub external_client_type: ExternalClientType, + pub use_quic: bool, } impl Default for Config { @@ -76,6 +77,7 @@ impl Default for Config { target_slots_per_epoch: 0, target_node: None, external_client_type: ExternalClientType::default(), + use_quic: false, } } } @@ -264,6 +266,13 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .takes_value(false) .help("Submit transactions with a TpuClient") ) + .arg( + Arg::with_name("tpu_use_quic") + .long("tpu-use-quic") + .takes_value(false) + .help("Submit transactions via QUIC; only affects ThinClient (default) \ + or TpuClient sends"), + ) } /// Parses a clap `ArgMatches` structure into a `Config` @@ -305,6 +314,10 @@ pub fn extract_args(matches: &ArgMatches) -> Config { args.external_client_type = ExternalClientType::RpcClient; } + if matches.is_present("tpu_use_quic") { + args.use_quic = true; + } + if let Some(addr) = matches.value_of("entrypoint") { args.entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| { eprintln!("failed to parse entrypoint address: {}", e); diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index a7c4f524fa..0cf38e7e40 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -7,6 +7,7 @@ use { keypairs::get_keypairs, }, solana_client::{ + connection_cache, rpc_client::RpcClient, tpu_client::{TpuClient, TpuClientConfig}, }, @@ -45,6 +46,7 @@ fn main() { num_lamports_per_account, target_node, external_client_type, + use_quic, .. } = &cli_config; @@ -103,6 +105,9 @@ fn main() { eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); exit(1); }); + if *use_quic { + connection_cache::set_use_quic(true); + } let client = if *multi_client { let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified); if nodes.len() < num_clients { @@ -145,6 +150,9 @@ fn main() { json_rpc_url.to_string(), CommitmentConfig::confirmed(), )); + if *use_quic { + connection_cache::set_use_quic(true); + } let client = Arc::new( TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default()) .unwrap_or_else(|err| { diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 29efc97228..abb87244c1 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,6 +1,7 @@ use { crate::{ client_error::ClientError, + connection_cache::send_wire_transaction_async, pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, rpc_client::RpcClient, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, @@ -17,6 +18,7 @@ use { signature::SignerError, signers::Signers, transaction::{Transaction, TransactionError}, + transport::{Result as TransportResult, TransportError}, }, std::{ collections::{HashMap, HashSet, VecDeque}, @@ -73,7 +75,7 @@ impl Default for TpuClientConfig { /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info pub struct TpuClient { - send_socket: UdpSocket, + _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket fanout_slots: u64, leader_tpu_service: LeaderTpuService, exit: Arc, @@ -85,39 +87,33 @@ impl TpuClient { /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { let wire_transaction = serialize(transaction).expect("serialization should succeed"); - self.send_wire_transaction(&wire_transaction) + self.send_wire_transaction(wire_transaction) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size - pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool { + pub fn send_wire_transaction(&self, wire_transaction: Vec) -> bool { self.try_send_wire_transaction(wire_transaction).is_ok() } /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size /// Returns the last error if all sends fail - pub fn try_send_transaction( - &self, - transaction: &Transaction, - ) -> std::result::Result<(), std::io::Error> { + pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> { let wire_transaction = serialize(transaction).expect("serialization should succeed"); - self.try_send_wire_transaction(&wire_transaction) + self.try_send_wire_transaction(wire_transaction) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail - fn try_send_wire_transaction( - &self, - wire_transaction: &[u8], - ) -> std::result::Result<(), std::io::Error> { - let mut last_error: Option = None; + fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { + let mut last_error: Option = None; let mut some_success = false; for tpu_address in self .leader_tpu_service .leader_tpu_sockets(self.fanout_slots) { - let result = self.send_socket.send_to(wire_transaction, tpu_address); + let result = send_wire_transaction_async(wire_transaction.clone(), &tpu_address); if let Err(err) = result { last_error = Some(err); } else { @@ -128,7 +124,7 @@ impl TpuClient { Err(if let Some(err) = last_error { err } else { - std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted") + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() }) } else { Ok(()) @@ -146,7 +142,7 @@ impl TpuClient { LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?; Ok(Self { - send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1), leader_tpu_service, exit, diff --git a/validator/src/main.rs b/validator/src/main.rs index f0da3babe0..1793c99969 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1149,7 +1149,7 @@ pub fn main() { Arg::with_name("tpu_use_quic") .long("tpu-use-quic") .takes_value(false) - .help("When this is set to true, the system will use QUIC to send transactions."), + .help("Use QUIC to send transactions."), ) .arg( Arg::with_name("rocksdb_max_compaction_jitter")