From 26899359d196e33992f34d83138872c3a9154ab9 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 13 Apr 2022 14:17:10 -0400 Subject: [PATCH] Support quic in bench-tps (#24295) * Update comment * Use connection_cache in tpu_client * Add --tpu-use-quic to bench-tps * Use connection_cache async send --- bench-tps/src/cli.rs | 13 +++++++++++++ bench-tps/src/main.rs | 8 ++++++++ client/src/tpu_client.rs | 28 ++++++++++++---------------- validator/src/main.rs | 2 +- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index b14bbe4a44..0ed3c1300b 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -49,6 +49,7 @@ pub struct Config { pub target_slots_per_epoch: u64, pub target_node: Option, pub external_client_type: ExternalClientType, + pub use_quic: bool, } impl Default for Config { @@ -74,6 +75,7 @@ impl Default for Config { target_slots_per_epoch: 0, target_node: None, external_client_type: ExternalClientType::default(), + use_quic: false, } } } @@ -254,6 +256,13 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .takes_value(false) .help("Submit transactions with a TpuClient") ) + .arg( + Arg::with_name("tpu_use_quic") + .long("tpu-use-quic") + .takes_value(false) + .help("Submit transactions via QUIC; only affects ThinClient (default) \ + or TpuClient sends"), + ) } /// Parses a clap `ArgMatches` structure into a `Config` @@ -293,6 +302,10 @@ pub fn extract_args(matches: &ArgMatches) -> Config { args.external_client_type = ExternalClientType::TpuClient; } + if matches.is_present("tpu_use_quic") { + args.use_quic = true; + } + if let Some(addr) = matches.value_of("entrypoint") { args.entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| { eprintln!("failed to parse entrypoint address: {}", e); diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 701f0967ad..8667e8e701 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -7,6 +7,7 @@ use { keypairs::get_keypairs, }, solana_client::{ + connection_cache, rpc_client::RpcClient, tpu_client::{TpuClient, TpuClientConfig}, }, @@ -45,6 +46,7 @@ fn main() { num_lamports_per_account, target_node, external_client_type, + use_quic, .. } = &cli_config; @@ -88,6 +90,9 @@ fn main() { eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); exit(1); }); + if *use_quic { + connection_cache::set_use_quic(true); + } let client = if *multi_client { let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified); if nodes.len() < num_clients { @@ -130,6 +135,9 @@ fn main() { json_rpc_url.to_string(), CommitmentConfig::confirmed(), )); + if *use_quic { + connection_cache::set_use_quic(true); + } let client = Arc::new( TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default()) .unwrap_or_else(|err| { diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 29efc97228..abb87244c1 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,6 +1,7 @@ use { crate::{ client_error::ClientError, + connection_cache::send_wire_transaction_async, pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, rpc_client::RpcClient, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, @@ -17,6 +18,7 @@ use { signature::SignerError, signers::Signers, transaction::{Transaction, TransactionError}, + transport::{Result as TransportResult, TransportError}, }, std::{ collections::{HashMap, HashSet, VecDeque}, @@ -73,7 +75,7 @@ impl Default for TpuClientConfig { /// Client which sends transactions directly to the current leader's TPU port over UDP. /// The client uses RPC to determine the current leader and fetch node contact info pub struct TpuClient { - send_socket: UdpSocket, + _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket fanout_slots: u64, leader_tpu_service: LeaderTpuService, exit: Arc, @@ -85,39 +87,33 @@ impl TpuClient { /// size pub fn send_transaction(&self, transaction: &Transaction) -> bool { let wire_transaction = serialize(transaction).expect("serialization should succeed"); - self.send_wire_transaction(&wire_transaction) + self.send_wire_transaction(wire_transaction) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size - pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool { + pub fn send_wire_transaction(&self, wire_transaction: Vec) -> bool { self.try_send_wire_transaction(wire_transaction).is_ok() } /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size /// Returns the last error if all sends fail - pub fn try_send_transaction( - &self, - transaction: &Transaction, - ) -> std::result::Result<(), std::io::Error> { + pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> { let wire_transaction = serialize(transaction).expect("serialization should succeed"); - self.try_send_wire_transaction(&wire_transaction) + self.try_send_wire_transaction(wire_transaction) } /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Returns the last error if all sends fail - fn try_send_wire_transaction( - &self, - wire_transaction: &[u8], - ) -> std::result::Result<(), std::io::Error> { - let mut last_error: Option = None; + fn try_send_wire_transaction(&self, wire_transaction: Vec) -> TransportResult<()> { + let mut last_error: Option = None; let mut some_success = false; for tpu_address in self .leader_tpu_service .leader_tpu_sockets(self.fanout_slots) { - let result = self.send_socket.send_to(wire_transaction, tpu_address); + let result = send_wire_transaction_async(wire_transaction.clone(), &tpu_address); if let Err(err) = result { last_error = Some(err); } else { @@ -128,7 +124,7 @@ impl TpuClient { Err(if let Some(err) = last_error { err } else { - std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted") + std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into() }) } else { Ok(()) @@ -146,7 +142,7 @@ impl TpuClient { LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?; Ok(Self { - send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1), leader_tpu_service, exit, diff --git a/validator/src/main.rs b/validator/src/main.rs index 3f523792bd..cbb071390e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1171,7 +1171,7 @@ pub fn main() { Arg::with_name("tpu_use_quic") .long("tpu-use-quic") .takes_value(false) - .help("When this is set to true, the system will use QUIC to send transactions."), + .help("Use QUIC to send transactions."), ) .arg( Arg::with_name("rocksdb_max_compaction_jitter")