From 17b00ad3a4f5096ce832562a636aeca658209d23 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Wed, 9 Mar 2022 21:33:05 -0500 Subject: [PATCH] Add quic-client module (#23166) * Add quic-client module to send transactions via quic, abstracted behind the TpuConnection trait (along with a legacy UDP implementation of TpuConnection) and change thin-client to use TpuConnection --- Cargo.lock | 21 +++ bench-tps/src/bench.rs | 10 +- client/Cargo.toml | 6 + client/src/client_error.rs | 13 ++ client/src/lib.rs | 3 + client/src/quic_client.rs | 208 +++++++++++++++++++++++++++ client/src/thin_client.rs | 88 ++++++------ client/src/tpu_connection.rs | 19 +++ client/src/udp_client.rs | 42 ++++++ gossip/src/gossip_service.rs | 20 ++- local-cluster/src/cluster.rs | 4 +- local-cluster/src/local_cluster.rs | 11 +- local-cluster/tests/local_cluster.rs | 5 +- programs/bpf/Cargo.lock | 166 ++++++++++++++++++++- runtime/src/bank_client.rs | 7 + sdk/src/client.rs | 2 + sdk/src/quic.rs | 4 + streamer/src/quic.rs | 4 +- 18 files changed, 568 insertions(+), 65 deletions(-) create mode 100644 client/src/quic_client.rs create mode 100644 client/src/tpu_connection.rs create mode 100644 client/src/udp_client.rs diff --git a/Cargo.lock b/Cargo.lock index 5ce4d732dc..da6b800801 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.2" @@ -1312,6 +1321,12 @@ dependencies = [ "tower-service", ] +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + [[package]] name = "fake-simd" version = "0.1.2" @@ -4543,19 +4558,25 @@ name = "solana-client" version = "1.10.1" dependencies = [ "assert_matches", + "async-mutex", "async-trait", "base64 0.13.0", "bincode", "bs58 0.4.0", + "bytes", "clap 2.33.3", "crossbeam-channel", + "futures 0.3.21", "futures-util", "indicatif", + "itertools 0.10.3", "jsonrpc-core", "jsonrpc-http-server", "log", + "quinn", "rayon", "reqwest", + "rustls 0.20.4", "semver 1.0.6", "serde", "serde_derive", diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 16e32a1215..0ee62109e4 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -475,6 +475,7 @@ fn do_tx_transfers( let tx_len = txs0.len(); let transfer_start = Instant::now(); let mut old_transactions = false; + let mut transactions = Vec::<_>::new(); for tx in txs0 { let now = timestamp(); // Transactions that are too old will be rejected by the cluster Don't bother @@ -483,10 +484,13 @@ fn do_tx_transfers( old_transactions = true; continue; } - client - .async_send_transaction(tx.0) - .expect("async_send_transaction in do_tx_transfers"); + transactions.push(tx.0); } + + if let Err(error) = client.async_send_batch(transactions) { + warn!("send_batch_sync in do_tx_transfers failed: {}", error); + } + if old_transactions { let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers"); shared_txs_wl.clear(); diff --git a/client/Cargo.toml b/client/Cargo.toml index 8e3e7ce4c9..7886fa0157 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -10,18 +10,24 @@ license = "Apache-2.0" edition = "2021" [dependencies] +async-mutex = "1.4.0" async-trait = "0.1.52" base64 = "0.13.0" bincode = "1.3.3" bs58 = "0.4.0" +bytes = "1.1.0" clap = "2.33.0" crossbeam-channel = "0.5" +futures = "0.3" futures-util = "0.3.21" indicatif = "0.16.2" +itertools = "0.10.2" jsonrpc-core = "18.0.0" log = "0.4.14" +quinn = "0.8.0" rayon = "1.5.1" reqwest = { version = "0.11.9", default-features = false, features = ["blocking", "rustls-tls", "json"] } +rustls = { version = "0.20.2", features = ["dangerous_configuration"] } semver = "1.0.6" serde = "1.0.136" serde_derive = "1.0.103" diff --git a/client/src/client_error.rs b/client/src/client_error.rs index 64d1d5c2f6..783919d517 100644 --- a/client/src/client_error.rs +++ b/client/src/client_error.rs @@ -1,6 +1,7 @@ pub use reqwest; use { crate::{rpc_request, rpc_response}, + quinn::{ConnectError, WriteError}, solana_faucet::faucet::FaucetError, solana_sdk::{ signature::SignerError, transaction::TransactionError, transport::TransportError, @@ -72,6 +73,18 @@ impl From for TransportError { } } +impl From for ClientErrorKind { + fn from(write_error: WriteError) -> Self { + Self::Custom(format!("{:?}", write_error)) + } +} + +impl From for ClientErrorKind { + fn from(connect_error: ConnectError) -> Self { + Self::Custom(format!("{:?}", connect_error)) + } +} + #[derive(Error, Debug)] #[error("{kind}")] pub struct ClientError { diff --git a/client/src/lib.rs b/client/src/lib.rs index 5be24dee1b..6395ef286c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -10,6 +10,7 @@ pub mod nonblocking; pub mod nonce_utils; pub mod perf_utils; pub mod pubsub_client; +pub mod quic_client; pub mod rpc_cache; pub mod rpc_client; pub mod rpc_config; @@ -22,7 +23,9 @@ pub mod rpc_sender; pub mod spinner; pub mod thin_client; pub mod tpu_client; +pub mod tpu_connection; pub mod transaction_executor; +pub mod udp_client; pub mod mock_sender_for_cli { /// Magic `SIGNATURE` value used by `solana-cli` unit tests. diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs new file mode 100644 index 0000000000..8abb8ba2fd --- /dev/null +++ b/client/src/quic_client.rs @@ -0,0 +1,208 @@ +//! Simple client that connects to a given UDP port with the QUIC protocol and provides +//! an interface for sending transactions which is restricted by the server's flow control. + +use { + crate::{client_error::ClientErrorKind, tpu_connection::TpuConnection}, + async_mutex::Mutex, + futures::future::join_all, + itertools::Itertools, + quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError}, + rayon::iter::{IntoParallelIterator, ParallelIterator}, + solana_sdk::{ + quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET}, + transaction::Transaction, + transport::Result as TransportResult, + }, + std::{ + net::{SocketAddr, UdpSocket}, + sync::Arc, + }, + tokio::runtime::Runtime, +}; + +struct SkipServerVerification; + +impl SkipServerVerification { + pub fn new() -> Arc { + Arc::new(Self) + } +} + +impl rustls::client::ServerCertVerifier for SkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } +} + +struct QuicClient { + runtime: Runtime, + endpoint: Endpoint, + connection: Arc>>>, + addr: SocketAddr, +} + +pub struct QuicTpuConnection { + client: Arc, +} + +impl TpuConnection for QuicTpuConnection { + fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self { + let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET); + let client = Arc::new(QuicClient::new(client_socket, tpu_addr)); + + Self { client } + } + + fn tpu_addr(&self) -> &SocketAddr { + &self.client.addr + } + + fn send_wire_transaction(&self, data: Vec) -> TransportResult<()> { + let _guard = self.client.runtime.enter(); + let send_buffer = self.client.send_buffer(&data[..]); + self.client.runtime.block_on(send_buffer)?; + Ok(()) + } + + fn send_batch(&self, transactions: Vec) -> TransportResult<()> { + let buffers = transactions + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + + let _guard = self.client.runtime.enter(); + let send_batch = self.client.send_batch(&buffers[..]); + self.client.runtime.block_on(send_batch)?; + Ok(()) + } +} + +impl QuicClient { + pub fn new(client_socket: UdpSocket, addr: SocketAddr) -> Self { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let _guard = runtime.enter(); + + let crypto = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_no_client_auth(); + + let create_endpoint = QuicClient::create_endpoint(EndpointConfig::default(), client_socket); + + let mut endpoint = runtime.block_on(create_endpoint); + + endpoint.set_default_client_config(ClientConfig::new(Arc::new(crypto))); + + Self { + runtime, + endpoint, + connection: Arc::new(Mutex::new(None)), + addr, + } + } + + // If this function becomes public, it should be changed to + // not expose details of the specific Quic implementation we're using + async fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint { + quinn::Endpoint::new(config, None, client_socket).unwrap().0 + } + + async fn _send_buffer_using_conn( + data: &[u8], + connection: &NewConnection, + ) -> Result<(), WriteError> { + let mut send_stream = connection.connection.open_uni().await?; + send_stream.write_all(data).await?; + send_stream.finish().await?; + Ok(()) + } + + // Attempts to send data, connecting/reconnecting as necessary + // On success, returns the connection used to successfully send the data + async fn _send_buffer(&self, data: &[u8]) -> Result, WriteError> { + let connection = { + let mut conn_guard = self.connection.lock().await; + + let maybe_conn = (*conn_guard).clone(); + match maybe_conn { + Some(conn) => conn.clone(), + None => { + let connecting = self.endpoint.connect(self.addr, "connect").unwrap(); + let connection = Arc::new(connecting.await?); + *conn_guard = Some(connection.clone()); + connection + } + } + }; + match Self::_send_buffer_using_conn(data, &connection).await { + Ok(()) => Ok(connection), + _ => { + let connection = { + let connecting = self.endpoint.connect(self.addr, "connect").unwrap(); + let connection = Arc::new(connecting.await?); + let mut conn_guard = self.connection.lock().await; + *conn_guard = Some(connection.clone()); + connection + }; + Self::_send_buffer_using_conn(data, &connection).await?; + Ok(connection) + } + } + } + + pub async fn send_buffer(&self, data: &[u8]) -> Result<(), ClientErrorKind> { + self._send_buffer(data).await?; + Ok(()) + } + + pub async fn send_batch(&self, buffers: &[Vec]) -> Result<(), ClientErrorKind> { + // Start off by "testing" the connection by sending the first transaction + // This will also connect to the server if not already connected + // and reconnect and retry if the first send attempt failed + // (for example due to a timed out connection), returning an error + // or the connection that was used to successfully send the transaction. + // We will use the returned connection to send the rest of the transactions in the batch + // to avoid touching the mutex in self, and not bother reconnecting if we fail along the way + // since testing even in the ideal GCE environment has found no cases + // where reconnecting and retrying in the middle of a batch send + // (i.e. we encounter a connection error in the middle of a batch send, which presumably cannot + // be due to a timed out connection) has succeeded + if buffers.is_empty() { + return Ok(()); + } + let connection = self._send_buffer(&buffers[0][..]).await?; + + // Used to avoid dereferencing the Arc multiple times below + // by just getting a reference to the NewConnection once + let connection_ref: &NewConnection = &connection; + + let chunks = buffers[1..buffers.len()] + .iter() + .chunks(QUIC_MAX_CONCURRENT_STREAMS); + + let futures = chunks.into_iter().map(|buffs| { + join_all( + buffs + .into_iter() + .map(|buf| Self::_send_buffer_using_conn(&buf[..], connection_ref)), + ) + }); + + for f in futures { + f.await.into_iter().try_for_each(|res| res)?; + } + Ok(()) + } +} diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 12b50eb606..0321f3f733 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -4,8 +4,10 @@ //! unstable and may change in future releases. use { - crate::{rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response}, - bincode::{serialize_into, serialized_size}, + crate::{ + rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response, + tpu_connection::TpuConnection, udp_client::UdpTpuConnection, + }, log::*, solana_sdk::{ account::Account, @@ -17,7 +19,6 @@ use { hash::Hash, instruction::Instruction, message::Message, - packet::PACKET_DATA_SIZE, pubkey::Pubkey, signature::{Keypair, Signature, Signer}, signers::Signers, @@ -117,22 +118,20 @@ impl ClientOptimizer { } /// An object for querying and sending transactions to the network. -pub struct ThinClient { - transactions_socket: UdpSocket, - tpu_addrs: Vec, +pub struct ThinClient { rpc_clients: Vec, + tpu_connections: Vec, optimizer: ClientOptimizer, } -impl ThinClient { +impl ThinClient { /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP - /// and the Tpu at `tpu_addr` over `transactions_socket` using UDP. + /// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP + /// (currently hardcoded to UDP) pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr, transactions_socket: UdpSocket) -> Self { - Self::new_from_client( - tpu_addr, - transactions_socket, - RpcClient::new_socket(rpc_addr), - ) + let tpu_connection = C::new(transactions_socket, tpu_addr); + + Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_connection) } pub fn new_socket_with_timeout( @@ -142,18 +141,14 @@ impl ThinClient { timeout: Duration, ) -> Self { let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout); - Self::new_from_client(tpu_addr, transactions_socket, rpc_client) + let tpu_connection = C::new(transactions_socket, tpu_addr); + Self::new_from_client(rpc_client, tpu_connection) } - fn new_from_client( - tpu_addr: SocketAddr, - transactions_socket: UdpSocket, - rpc_client: RpcClient, - ) -> Self { + fn new_from_client(rpc_client: RpcClient, tpu_connection: C) -> Self { Self { - transactions_socket, - tpu_addrs: vec![tpu_addr], rpc_clients: vec![rpc_client], + tpu_connections: vec![tpu_connection], optimizer: ClientOptimizer::new(0), } } @@ -168,16 +163,19 @@ impl ThinClient { let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect(); let optimizer = ClientOptimizer::new(rpc_clients.len()); + let tpu_connections: Vec<_> = tpu_addrs + .into_iter() + .map(|tpu_addr| C::new(transactions_socket.try_clone().unwrap(), tpu_addr)) + .collect(); Self { - transactions_socket, - tpu_addrs, rpc_clients, + tpu_connections, optimizer, } } - fn tpu_addr(&self) -> &SocketAddr { - &self.tpu_addrs[self.optimizer.best()] + fn tpu_connection(&self) -> &C { + &self.tpu_connections[self.optimizer.best()] } fn rpc_client(&self) -> &RpcClient { @@ -205,7 +203,6 @@ impl ThinClient { self.send_and_confirm_transaction(&[keypair], transaction, tries, 0) } - /// Retry sending a signed Transaction to the server for processing pub fn send_and_confirm_transaction( &self, keypairs: &T, @@ -215,18 +212,13 @@ impl ThinClient { ) -> TransportResult { for x in 0..tries { let now = Instant::now(); - let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize]; - let mut wr = std::io::Cursor::new(&mut buf[..]); let mut num_confirmed = 0; let mut wait_time = MAX_PROCESSING_AGE; - serialize_into(&mut wr, &transaction) - .expect("serialize Transaction in pub fn transfer_signed"); // resend the same transaction until the transaction has no chance of succeeding while now.elapsed().as_secs() < wait_time as u64 { if num_confirmed == 0 { // Send the transaction if there has been no confirmation (e.g. the first time) - self.transactions_socket - .send_to(&buf[..], &self.tpu_addr())?; + self.tpu_connection().send_transaction(transaction)?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -321,13 +313,13 @@ impl ThinClient { } } -impl Client for ThinClient { +impl Client for ThinClient { fn tpu_addr(&self) -> String { - self.tpu_addr().to_string() + self.tpu_connection().tpu_addr().to_string() } } -impl SyncClient for ThinClient { +impl SyncClient for ThinClient { fn send_and_confirm_message( &self, keypairs: &T, @@ -607,17 +599,16 @@ impl SyncClient for ThinClient { } } -impl AsyncClient for ThinClient { +impl AsyncClient for ThinClient { fn async_send_transaction(&self, transaction: Transaction) -> TransportResult { - let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize]; - let mut wr = std::io::Cursor::new(&mut buf[..]); - serialize_into(&mut wr, &transaction) - .expect("serialize Transaction in pub fn transfer_signed"); - assert!(buf.len() < PACKET_DATA_SIZE); - self.transactions_socket - .send_to(&buf[..], &self.tpu_addr())?; + self.tpu_connection().send_transaction(&transaction)?; Ok(transaction.signatures[0]) } + + fn async_send_batch(&self, transactions: Vec) -> TransportResult<()> { + self.tpu_connection().send_batch(transactions) + } + fn async_send_message( &self, keypairs: &T, @@ -649,20 +640,23 @@ impl AsyncClient for ThinClient { } } -pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr), range: (u16, u16)) -> ThinClient { +pub fn create_client( + (rpc, tpu): (SocketAddr, SocketAddr), + range: (u16, u16), +) -> ThinClient { let (_, transactions_socket) = solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap(); - ThinClient::new(rpc, tpu, transactions_socket) + ThinClient::::new(rpc, tpu, transactions_socket) } pub fn create_client_with_timeout( (rpc, tpu): (SocketAddr, SocketAddr), range: (u16, u16), timeout: Duration, -) -> ThinClient { +) -> ThinClient { let (_, transactions_socket) = solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap(); - ThinClient::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout) + ThinClient::::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout) } #[cfg(test)] diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs new file mode 100644 index 0000000000..4b5d434d79 --- /dev/null +++ b/client/src/tpu_connection.rs @@ -0,0 +1,19 @@ +use { + solana_sdk::{transaction::Transaction, transport::Result as TransportResult}, + std::net::{SocketAddr, UdpSocket}, +}; + +pub trait TpuConnection { + fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self; + + fn tpu_addr(&self) -> &SocketAddr; + + fn send_transaction(&self, tx: &Transaction) -> TransportResult<()> { + let data = bincode::serialize(tx).expect("serialize Transaction in send_transaction"); + self.send_wire_transaction(data) + } + + fn send_wire_transaction(&self, data: Vec) -> TransportResult<()>; + + fn send_batch(&self, transactions: Vec) -> TransportResult<()>; +} diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs new file mode 100644 index 0000000000..ef3ebeb45e --- /dev/null +++ b/client/src/udp_client.rs @@ -0,0 +1,42 @@ +//! Simple TPU client that communicates with the given UDP port with UDP and provides +//! an interface for sending transactions + +use { + crate::tpu_connection::TpuConnection, + solana_sdk::{transaction::Transaction, transport::Result as TransportResult}, + std::net::{SocketAddr, UdpSocket}, +}; + +pub struct UdpTpuConnection { + socket: UdpSocket, + addr: SocketAddr, +} + +impl TpuConnection for UdpTpuConnection { + fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self { + Self { + socket: client_socket, + addr: tpu_addr, + } + } + + fn tpu_addr(&self) -> &SocketAddr { + &self.addr + } + + fn send_wire_transaction(&self, data: Vec) -> TransportResult<()> { + self.socket.send_to(&data[..], self.addr)?; + Ok(()) + } + + fn send_batch(&self, transactions: Vec) -> TransportResult<()> { + transactions + .into_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .try_for_each(|buff| -> TransportResult<()> { + self.socket.send_to(&buff[..], self.addr)?; + Ok(()) + })?; + Ok(()) + } +} diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 42b2ee2363..c81bc7de07 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -7,7 +7,10 @@ use { }, crossbeam_channel::{unbounded, Sender}, rand::{thread_rng, Rng}, - solana_client::thin_client::{create_client, ThinClient}, + solana_client::{ + thin_client::{create_client, ThinClient}, + udp_client::UdpTpuConnection, + }, solana_perf::recycler::Recycler, solana_runtime::bank_forks::BankForks, solana_sdk::{ @@ -194,7 +197,10 @@ pub fn discover( } /// Creates a ThinClient per valid node -pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> Vec { +pub fn get_clients( + nodes: &[ContactInfo], + socket_addr_space: &SocketAddrSpace, +) -> Vec> { nodes .iter() .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) @@ -203,7 +209,10 @@ pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) - } /// Creates a ThinClient by selecting a valid node at random -pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> ThinClient { +pub fn get_client( + nodes: &[ContactInfo], + socket_addr_space: &SocketAddrSpace, +) -> ThinClient { let nodes: Vec<_> = nodes .iter() .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) @@ -215,7 +224,7 @@ pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> pub fn get_multi_client( nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace, -) -> (ThinClient, usize) { +) -> (ThinClient, usize) { let addrs: Vec<_> = nodes .iter() .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) @@ -229,7 +238,8 @@ pub fn get_multi_client( .unwrap(); let num_nodes = tpu_addrs.len(); ( - ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, transactions_socket), + //TODO: make it configurable whether to use quic + ThinClient::::new_from_addrs(rpc_addrs, tpu_addrs, transactions_socket), num_nodes, ) } diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index b5bcf658fb..260ddaabaf 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -1,5 +1,5 @@ use { - solana_client::thin_client::ThinClient, + solana_client::{thin_client::ThinClient, udp_client::UdpTpuConnection}, solana_core::validator::{Validator, ValidatorConfig}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, solana_sdk::{pubkey::Pubkey, signature::Keypair}, @@ -36,7 +36,7 @@ impl ClusterValidatorInfo { pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; - fn get_validator_client(&self, pubkey: &Pubkey) -> Option; + fn get_validator_client(&self, pubkey: &Pubkey) -> Option>; fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>; fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo; fn restart_node( diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index c6fc673f0e..9c9f9958b4 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -6,7 +6,10 @@ use { }, itertools::izip, log::*, - solana_client::thin_client::{create_client, ThinClient}, + solana_client::{ + thin_client::{create_client, ThinClient}, + udp_client::UdpTpuConnection, + }, solana_core::{ tower_storage::FileTowerStorage, validator::{Validator, ValidatorConfig, ValidatorStartProgress}, @@ -535,7 +538,7 @@ impl LocalCluster { } fn transfer_with_client( - client: &ThinClient, + client: &ThinClient, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64, @@ -564,7 +567,7 @@ impl LocalCluster { } fn setup_vote_and_stake_accounts( - client: &ThinClient, + client: &ThinClient, vote_account: &Keypair, from_account: &Arc, amount: u64, @@ -701,7 +704,7 @@ impl Cluster for LocalCluster { self.validators.keys().cloned().collect() } - fn get_validator_client(&self, pubkey: &Pubkey) -> Option { + fn get_validator_client(&self, pubkey: &Pubkey) -> Option> { self.validators.get(pubkey).map(|f| { create_client( f.info.contact_info.client_facing_addr(), diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 01314eb600..55684c0a48 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -17,6 +17,7 @@ use { rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_response::RpcSignatureResult, thin_client::{create_client, ThinClient}, + udp_client::UdpTpuConnection, }, solana_core::{ broadcast_stage::BroadcastStageType, @@ -2646,8 +2647,8 @@ fn setup_transfer_scan_threads( num_starting_accounts: usize, exit: Arc, scan_commitment: CommitmentConfig, - update_client_receiver: Receiver, - scan_client_receiver: Receiver, + update_client_receiver: Receiver>, + scan_client_receiver: Receiver>, ) -> ( JoinHandle<()>, JoinHandle<()>, diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 8d77106f2a..dceb635fd4 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -125,6 +125,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + [[package]] name = "async-trait" version = "0.1.52" @@ -515,6 +524,22 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "cpufeatures" version = "0.2.1" @@ -918,6 +943,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + [[package]] name = "fastrand" version = "1.6.0" @@ -1062,6 +1093,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder 1.4.3", +] + [[package]] name = "generic-array" version = "0.12.3" @@ -1596,6 +1636,19 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + [[package]] name = "mio" version = "0.8.0" @@ -1763,6 +1816,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "opentelemetry" version = "0.16.0" @@ -1973,6 +2032,60 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "quinn" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "584865613896a1f644d757e52c45c573441c8b04cac38ac13990b0235203db66" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-util", + "fxhash", + "quinn-proto", + "quinn-udp", + "rustls", + "thiserror", + "tokio", + "tracing", + "webpki", +] + +[[package]] +name = "quinn-proto" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b1562bf4998b0c6d1841a4742b7103bb82cdde61374833de826bab9e8ad498" +dependencies = [ + "bytes 1.1.0", + "fxhash", + "rand 0.8.2", + "ring", + "rustls", + "rustls-native-certs", + "rustls-pemfile", + "slab", + "thiserror", + "tinyvec", + "tracing", + "webpki", +] + +[[package]] +name = "quinn-udp" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df185e5e5f7611fa6e628ed8f9633df10114b03bbaecab186ec55822c44ac727" +dependencies = [ + "futures-util", + "libc", + "mio 0.7.14", + "quinn-proto", + "socket2", + "tokio", + "tracing", +] + [[package]] name = "quote" version = "0.6.13" @@ -2263,6 +2376,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "0.2.1" @@ -2293,6 +2418,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -2329,6 +2464,29 @@ dependencies = [ "untrusted", ] +[[package]] +name = "security-framework" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "0.9.0" @@ -3056,18 +3214,24 @@ dependencies = [ name = "solana-client" version = "1.10.1" dependencies = [ + "async-mutex", "async-trait", "base64 0.13.0", "bincode", "bs58 0.4.0", + "bytes 1.1.0", "clap", "crossbeam-channel", + "futures", "futures-util", "indicatif", + "itertools 0.10.3", "jsonrpc-core", "log", + "quinn", "rayon", "reqwest", + "rustls", "semver 1.0.6", "serde", "serde_derive", @@ -3969,7 +4133,7 @@ dependencies = [ "bytes 1.1.0", "libc", "memchr", - "mio", + "mio 0.8.0", "num_cpus", "once_cell", "parking_lot", diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index dfe2de40e0..742fc98769 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -45,6 +45,13 @@ impl AsyncClient for BankClient { Ok(signature) } + fn async_send_batch(&self, transactions: Vec) -> Result<()> { + for t in transactions { + self.async_send_transaction(t)?; + } + Ok(()) + } + fn async_send_message( &self, keypairs: &T, diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 6d54ad590b..3a26974e02 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -175,6 +175,8 @@ pub trait AsyncClient { /// Send a signed transaction, but don't wait to see if the server accepted it. fn async_send_transaction(&self, transaction: transaction::Transaction) -> Result; + fn async_send_batch(&self, transactions: Vec) -> Result<()>; + /// Create a transaction from the given message, and send it to the /// server, but don't wait for to see if the server accepted it. fn async_send_message( diff --git a/sdk/src/quic.rs b/sdk/src/quic.rs index 43a46f91c9..2fe4cf080c 100644 --- a/sdk/src/quic.rs +++ b/sdk/src/quic.rs @@ -1 +1,5 @@ pub const QUIC_PORT_OFFSET: u16 = 6; +// Empirically found max number of concurrent streams +// that seems to maximize TPS on GCE (higher values don't seem to +// give significant improvement or seem to impact stability) +pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048; diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index f2f0a07386..8b5dec4712 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -8,6 +8,7 @@ use { solana_perf::packet::PacketBatch, solana_sdk::{ packet::{Packet, PACKET_DATA_SIZE}, + quic::QUIC_MAX_CONCURRENT_STREAMS, signature::Keypair, timing, }, @@ -49,7 +50,8 @@ fn configure_server( .map_err(|_e| QuicServerError::ConfigureFailed)?; let config = Arc::get_mut(&mut server_config.transport).unwrap(); - const MAX_CONCURRENT_UNI_STREAMS: u32 = 1; + // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability + const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_CONCURRENT_STREAMS * 2) as u32; config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into()); config.stream_receive_window((PACKET_DATA_SIZE as u32).into()); config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());