From c63782f833e9a8c6bd4c4eabb33efc625470d79c Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 21 Mar 2022 21:42:53 +0000 Subject: [PATCH] Made connection cache configurable. (#23783) (#23812) Added command-line argument tpu-use-quic argument. Changed connection cache to return different connections based on the config. (cherry picked from commit ae76fe2bd74ab0b7ef579486f8034380ccdc7df2) Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> --- banks-server/src/banks_server.rs | 3 +- client/src/connection_cache.rs | 30 ++++++++++++++----- rpc/src/rpc.rs | 5 +++- .../src/send_transaction_service.rs | 7 +++++ validator/src/main.rs | 13 ++++++++ 5 files changed, 49 insertions(+), 9 deletions(-) diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index bb98125672..8664b7e61e 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -24,7 +24,7 @@ use { transaction::{self, SanitizedTransaction, Transaction}, }, solana_send_transaction_service::{ - send_transaction_service::{SendTransactionService, TransactionInfo}, + send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC}, tpu_info::NullTpuInfo, }, std::{ @@ -399,6 +399,7 @@ pub async fn start_tcp_server( receiver, 5_000, 0, + DEFAULT_TPU_USE_QUIC, ); let server = BanksServer::new( diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index cee5642306..acc00f176e 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,5 +1,7 @@ use { - crate::{tpu_connection::TpuConnection, udp_client::UdpTpuConnection}, + crate::{ + quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection, + }, lazy_static::lazy_static, std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, @@ -23,6 +25,7 @@ struct ConnMap { // that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though... last_used_times: BTreeMap, ticks: u64, + use_quic: bool, } impl ConnMap { @@ -31,21 +34,31 @@ impl ConnMap { map: HashMap::new(), last_used_times: BTreeMap::new(), ticks: 0, + use_quic: false, } } + + pub fn set_use_quic(&mut self, use_quic: bool) { + self.use_quic = use_quic; + } } lazy_static! { static ref CONNECTION_MAP: Mutex = Mutex::new(ConnMap::new()); } +pub fn set_use_quic(use_quic: bool) { + let mut map = (*CONNECTION_MAP).lock().unwrap(); + map.set_use_quic(use_quic); +} + #[allow(dead_code)] // TODO: see https://github.com/solana-labs/solana/issues/23661 // remove lazy_static and optimize and refactor this pub fn get_connection(addr: &SocketAddr) -> Arc { let mut map = (*CONNECTION_MAP).lock().unwrap(); let ticks = map.ticks; - + let use_quic = map.use_quic; let (conn, target_ticks) = match map.map.entry(*addr) { Entry::Occupied(mut entry) => { let mut pair = entry.get_mut(); @@ -57,12 +70,15 @@ pub fn get_connection(addr: &SocketAddr) -> Arc = if use_quic { + Arc::new(QuicTpuConnection::new(send_socket, *addr)) + } else { + Arc::new(UdpTpuConnection::new(send_socket, *addr)) + }; + entry.insert((conn.clone(), ticks)); - ( - conn as Arc, - ticks, - ) + (conn, ticks) } }; diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 71e91329ce..9d72a3a48c 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -75,7 +75,7 @@ use { }, }, solana_send_transaction_service::{ - send_transaction_service::{SendTransactionService, TransactionInfo}, + send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC}, tpu_info::NullTpuInfo, }, solana_storage_bigtable::Error as StorageError, @@ -323,6 +323,7 @@ impl JsonRpcRequestProcessor { receiver, 1000, 1, + DEFAULT_TPU_USE_QUIC, ); Self { @@ -6087,6 +6088,7 @@ pub mod tests { receiver, 1000, 1, + DEFAULT_TPU_USE_QUIC, ); let mut bad_transaction = system_transaction::transfer( @@ -6352,6 +6354,7 @@ pub mod tests { receiver, 1000, 1, + DEFAULT_TPU_USE_QUIC, ); assert_eq!( request_processor.get_block_commitment(0), diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 2ac8b617a0..36ad86e8b1 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -65,12 +65,15 @@ struct ProcessTransactionsResult { retained: u64, } +pub const DEFAULT_TPU_USE_QUIC: bool = false; + #[derive(Clone, Debug)] pub struct Config { pub retry_rate_ms: u64, pub leader_forward_count: u64, pub default_max_retries: Option, pub service_max_retries: usize, + pub use_quic: bool, } impl Default for Config { @@ -80,6 +83,7 @@ impl Default for Config { leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT, default_max_retries: None, service_max_retries: DEFAULT_SERVICE_MAX_RETRIES, + use_quic: DEFAULT_TPU_USE_QUIC, } } } @@ -92,10 +96,12 @@ impl SendTransactionService { receiver: Receiver, retry_rate_ms: u64, leader_forward_count: u64, + use_quic: bool, ) -> Self { let config = Config { retry_rate_ms, leader_forward_count, + use_quic, ..Config::default() }; Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config) @@ -352,6 +358,7 @@ mod test { receiver, 1000, 1, + DEFAULT_TPU_USE_QUIC, ); drop(sender); diff --git a/validator/src/main.rs b/validator/src/main.rs index c4a0e66639..d019c51f6a 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -9,6 +9,7 @@ use { console::style, log::*, rand::{seq::SliceRandom, thread_rng}, + send_transaction_service::DEFAULT_TPU_USE_QUIC, solana_clap_utils::{ input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of}, input_validators::{ @@ -456,6 +457,7 @@ pub fn main() { let default_accounts_shrink_ratio = &DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string(); let default_rocksdb_fifo_shred_storage_size = &DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES.to_string(); + let default_tpu_use_quic = &DEFAULT_TPU_USE_QUIC.to_string(); let matches = App::new(crate_name!()).about(crate_description!()) .version(solana_version::version!()) @@ -1144,6 +1146,14 @@ pub fn main() { .validator(is_parsable::) .help("Milliseconds to wait in the TPU receiver for packet coalescing."), ) + .arg( + Arg::with_name("tpu_use_quic") + .long("tpu-use-quic") + .takes_value(true) + .value_name("BOOLEAN") + .default_value(default_tpu_use_quic) + .help("When this is set to true, the system will use QUIC to send transactions."), + ) .arg( Arg::with_name("rocksdb_max_compaction_jitter") .long("rocksdb-max-compaction-jitter-slots") @@ -2095,6 +2105,8 @@ pub fn main() { let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode"); let accounts_shrink_optimize_total_space = value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool); + let tpu_use_quic = value_t_or_exit!(matches, "tpu_use_quic", bool); + let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64); if !(0.0..=1.0).contains(&shrink_ratio) { eprintln!( @@ -2366,6 +2378,7 @@ pub fn main() { "rpc_send_transaction_service_max_retries", usize ), + use_quic: tpu_use_quic, }, no_poh_speed_test: matches.is_present("no_poh_speed_test"), no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),