Made connection cache configurable. (#23783)
Added command-line argument tpu-use-quic argument. Changed connection cache to return different connections based on the config.
This commit is contained in:
		| @@ -24,7 +24,7 @@ use { | |||||||
|         transaction::{self, SanitizedTransaction, Transaction}, |         transaction::{self, SanitizedTransaction, Transaction}, | ||||||
|     }, |     }, | ||||||
|     solana_send_transaction_service::{ |     solana_send_transaction_service::{ | ||||||
|         send_transaction_service::{SendTransactionService, TransactionInfo}, |         send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC}, | ||||||
|         tpu_info::NullTpuInfo, |         tpu_info::NullTpuInfo, | ||||||
|     }, |     }, | ||||||
|     std::{ |     std::{ | ||||||
| @@ -399,6 +399,7 @@ pub async fn start_tcp_server( | |||||||
|                 receiver, |                 receiver, | ||||||
|                 5_000, |                 5_000, | ||||||
|                 0, |                 0, | ||||||
|  |                 DEFAULT_TPU_USE_QUIC, | ||||||
|             ); |             ); | ||||||
|  |  | ||||||
|             let server = BanksServer::new( |             let server = BanksServer::new( | ||||||
|   | |||||||
| @@ -1,5 +1,7 @@ | |||||||
| use { | use { | ||||||
|     crate::{tpu_connection::TpuConnection, udp_client::UdpTpuConnection}, |     crate::{ | ||||||
|  |         quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection, | ||||||
|  |     }, | ||||||
|     lazy_static::lazy_static, |     lazy_static::lazy_static, | ||||||
|     std::{ |     std::{ | ||||||
|         collections::{hash_map::Entry, BTreeMap, HashMap}, |         collections::{hash_map::Entry, BTreeMap, HashMap}, | ||||||
| @@ -23,6 +25,7 @@ struct ConnMap { | |||||||
|     // that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though... |     // that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though... | ||||||
|     last_used_times: BTreeMap<u64, SocketAddr>, |     last_used_times: BTreeMap<u64, SocketAddr>, | ||||||
|     ticks: u64, |     ticks: u64, | ||||||
|  |     use_quic: bool, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl ConnMap { | impl ConnMap { | ||||||
| @@ -31,21 +34,31 @@ impl ConnMap { | |||||||
|             map: HashMap::new(), |             map: HashMap::new(), | ||||||
|             last_used_times: BTreeMap::new(), |             last_used_times: BTreeMap::new(), | ||||||
|             ticks: 0, |             ticks: 0, | ||||||
|  |             use_quic: false, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn set_use_quic(&mut self, use_quic: bool) { | ||||||
|  |         self.use_quic = use_quic; | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| lazy_static! { | lazy_static! { | ||||||
|     static ref CONNECTION_MAP: Mutex<ConnMap> = Mutex::new(ConnMap::new()); |     static ref CONNECTION_MAP: Mutex<ConnMap> = Mutex::new(ConnMap::new()); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | pub fn set_use_quic(use_quic: bool) { | ||||||
|  |     let mut map = (*CONNECTION_MAP).lock().unwrap(); | ||||||
|  |     map.set_use_quic(use_quic); | ||||||
|  | } | ||||||
|  |  | ||||||
| #[allow(dead_code)] | #[allow(dead_code)] | ||||||
| // TODO: see https://github.com/solana-labs/solana/issues/23661 | // TODO: see https://github.com/solana-labs/solana/issues/23661 | ||||||
| // remove lazy_static and optimize and refactor this | // remove lazy_static and optimize and refactor this | ||||||
| pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> { | pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> { | ||||||
|     let mut map = (*CONNECTION_MAP).lock().unwrap(); |     let mut map = (*CONNECTION_MAP).lock().unwrap(); | ||||||
|     let ticks = map.ticks; |     let ticks = map.ticks; | ||||||
|  |     let use_quic = map.use_quic; | ||||||
|     let (conn, target_ticks) = match map.map.entry(*addr) { |     let (conn, target_ticks) = match map.map.entry(*addr) { | ||||||
|         Entry::Occupied(mut entry) => { |         Entry::Occupied(mut entry) => { | ||||||
|             let mut pair = entry.get_mut(); |             let mut pair = entry.get_mut(); | ||||||
| @@ -57,12 +70,15 @@ pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sy | |||||||
|             let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); |             let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); | ||||||
|             // TODO: see https://github.com/solana-labs/solana/issues/23659 |             // TODO: see https://github.com/solana-labs/solana/issues/23659 | ||||||
|             // make it configurable (e.g. via the command line) whether to use UDP or Quic |             // make it configurable (e.g. via the command line) whether to use UDP or Quic | ||||||
|             let conn = Arc::new(UdpTpuConnection::new(send_socket, *addr)); |  | ||||||
|  |             let conn: Arc<dyn TpuConnection + 'static + Sync + Send> = if use_quic { | ||||||
|  |                 Arc::new(QuicTpuConnection::new(send_socket, *addr)) | ||||||
|  |             } else { | ||||||
|  |                 Arc::new(UdpTpuConnection::new(send_socket, *addr)) | ||||||
|  |             }; | ||||||
|  |  | ||||||
|             entry.insert((conn.clone(), ticks)); |             entry.insert((conn.clone(), ticks)); | ||||||
|             ( |             (conn, ticks) | ||||||
|                 conn as Arc<dyn TpuConnection + 'static + Sync + Send>, |  | ||||||
|                 ticks, |  | ||||||
|             ) |  | ||||||
|         } |         } | ||||||
|     }; |     }; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -75,7 +75,7 @@ use { | |||||||
|         }, |         }, | ||||||
|     }, |     }, | ||||||
|     solana_send_transaction_service::{ |     solana_send_transaction_service::{ | ||||||
|         send_transaction_service::{SendTransactionService, TransactionInfo}, |         send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC}, | ||||||
|         tpu_info::NullTpuInfo, |         tpu_info::NullTpuInfo, | ||||||
|     }, |     }, | ||||||
|     solana_storage_bigtable::Error as StorageError, |     solana_storage_bigtable::Error as StorageError, | ||||||
| @@ -323,6 +323,7 @@ impl JsonRpcRequestProcessor { | |||||||
|             receiver, |             receiver, | ||||||
|             1000, |             1000, | ||||||
|             1, |             1, | ||||||
|  |             DEFAULT_TPU_USE_QUIC, | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         Self { |         Self { | ||||||
| @@ -6087,6 +6088,7 @@ pub mod tests { | |||||||
|             receiver, |             receiver, | ||||||
|             1000, |             1000, | ||||||
|             1, |             1, | ||||||
|  |             DEFAULT_TPU_USE_QUIC, | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         let mut bad_transaction = system_transaction::transfer( |         let mut bad_transaction = system_transaction::transfer( | ||||||
| @@ -6352,6 +6354,7 @@ pub mod tests { | |||||||
|             receiver, |             receiver, | ||||||
|             1000, |             1000, | ||||||
|             1, |             1, | ||||||
|  |             DEFAULT_TPU_USE_QUIC, | ||||||
|         ); |         ); | ||||||
|         assert_eq!( |         assert_eq!( | ||||||
|             request_processor.get_block_commitment(0), |             request_processor.get_block_commitment(0), | ||||||
|   | |||||||
| @@ -65,12 +65,15 @@ struct ProcessTransactionsResult { | |||||||
|     retained: u64, |     retained: u64, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | pub const DEFAULT_TPU_USE_QUIC: bool = false; | ||||||
|  |  | ||||||
| #[derive(Clone, Debug)] | #[derive(Clone, Debug)] | ||||||
| pub struct Config { | pub struct Config { | ||||||
|     pub retry_rate_ms: u64, |     pub retry_rate_ms: u64, | ||||||
|     pub leader_forward_count: u64, |     pub leader_forward_count: u64, | ||||||
|     pub default_max_retries: Option<usize>, |     pub default_max_retries: Option<usize>, | ||||||
|     pub service_max_retries: usize, |     pub service_max_retries: usize, | ||||||
|  |     pub use_quic: bool, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Default for Config { | impl Default for Config { | ||||||
| @@ -80,6 +83,7 @@ impl Default for Config { | |||||||
|             leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT, |             leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT, | ||||||
|             default_max_retries: None, |             default_max_retries: None, | ||||||
|             service_max_retries: DEFAULT_SERVICE_MAX_RETRIES, |             service_max_retries: DEFAULT_SERVICE_MAX_RETRIES, | ||||||
|  |             use_quic: DEFAULT_TPU_USE_QUIC, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -92,10 +96,12 @@ impl SendTransactionService { | |||||||
|         receiver: Receiver<TransactionInfo>, |         receiver: Receiver<TransactionInfo>, | ||||||
|         retry_rate_ms: u64, |         retry_rate_ms: u64, | ||||||
|         leader_forward_count: u64, |         leader_forward_count: u64, | ||||||
|  |         use_quic: bool, | ||||||
|     ) -> Self { |     ) -> Self { | ||||||
|         let config = Config { |         let config = Config { | ||||||
|             retry_rate_ms, |             retry_rate_ms, | ||||||
|             leader_forward_count, |             leader_forward_count, | ||||||
|  |             use_quic, | ||||||
|             ..Config::default() |             ..Config::default() | ||||||
|         }; |         }; | ||||||
|         Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config) |         Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config) | ||||||
| @@ -352,6 +358,7 @@ mod test { | |||||||
|             receiver, |             receiver, | ||||||
|             1000, |             1000, | ||||||
|             1, |             1, | ||||||
|  |             DEFAULT_TPU_USE_QUIC, | ||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         drop(sender); |         drop(sender); | ||||||
|   | |||||||
| @@ -9,6 +9,7 @@ use { | |||||||
|     console::style, |     console::style, | ||||||
|     log::*, |     log::*, | ||||||
|     rand::{seq::SliceRandom, thread_rng}, |     rand::{seq::SliceRandom, thread_rng}, | ||||||
|  |     send_transaction_service::DEFAULT_TPU_USE_QUIC, | ||||||
|     solana_clap_utils::{ |     solana_clap_utils::{ | ||||||
|         input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of}, |         input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of}, | ||||||
|         input_validators::{ |         input_validators::{ | ||||||
| @@ -456,6 +457,7 @@ pub fn main() { | |||||||
|     let default_accounts_shrink_ratio = &DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string(); |     let default_accounts_shrink_ratio = &DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string(); | ||||||
|     let default_rocksdb_fifo_shred_storage_size = |     let default_rocksdb_fifo_shred_storage_size = | ||||||
|         &DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES.to_string(); |         &DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES.to_string(); | ||||||
|  |     let default_tpu_use_quic = &DEFAULT_TPU_USE_QUIC.to_string(); | ||||||
|  |  | ||||||
|     let matches = App::new(crate_name!()).about(crate_description!()) |     let matches = App::new(crate_name!()).about(crate_description!()) | ||||||
|         .version(solana_version::version!()) |         .version(solana_version::version!()) | ||||||
| @@ -1144,6 +1146,14 @@ pub fn main() { | |||||||
|                 .validator(is_parsable::<u64>) |                 .validator(is_parsable::<u64>) | ||||||
|                 .help("Milliseconds to wait in the TPU receiver for packet coalescing."), |                 .help("Milliseconds to wait in the TPU receiver for packet coalescing."), | ||||||
|         ) |         ) | ||||||
|  |         .arg( | ||||||
|  |             Arg::with_name("tpu_use_quic") | ||||||
|  |                 .long("tpu-use-quic") | ||||||
|  |                 .takes_value(true) | ||||||
|  |                 .value_name("BOOLEAN") | ||||||
|  |                 .default_value(default_tpu_use_quic) | ||||||
|  |                 .help("When this is set to true, the system will use QUIC to send transactions."), | ||||||
|  |         ) | ||||||
|         .arg( |         .arg( | ||||||
|             Arg::with_name("rocksdb_max_compaction_jitter") |             Arg::with_name("rocksdb_max_compaction_jitter") | ||||||
|                 .long("rocksdb-max-compaction-jitter-slots") |                 .long("rocksdb-max-compaction-jitter-slots") | ||||||
| @@ -2095,6 +2105,8 @@ pub fn main() { | |||||||
|     let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode"); |     let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode"); | ||||||
|     let accounts_shrink_optimize_total_space = |     let accounts_shrink_optimize_total_space = | ||||||
|         value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool); |         value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool); | ||||||
|  |     let tpu_use_quic = value_t_or_exit!(matches, "tpu_use_quic", bool); | ||||||
|  |  | ||||||
|     let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64); |     let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64); | ||||||
|     if !(0.0..=1.0).contains(&shrink_ratio) { |     if !(0.0..=1.0).contains(&shrink_ratio) { | ||||||
|         eprintln!( |         eprintln!( | ||||||
| @@ -2366,6 +2378,7 @@ pub fn main() { | |||||||
|                 "rpc_send_transaction_service_max_retries", |                 "rpc_send_transaction_service_max_retries", | ||||||
|                 usize |                 usize | ||||||
|             ), |             ), | ||||||
|  |             use_quic: tpu_use_quic, | ||||||
|         }, |         }, | ||||||
|         no_poh_speed_test: matches.is_present("no_poh_speed_test"), |         no_poh_speed_test: matches.is_present("no_poh_speed_test"), | ||||||
|         no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"), |         no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"), | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user