Added command-line argument tpu-use-quic argument.
Changed connection cache to return different connections based on the config.
(cherry picked from commit ae76fe2bd7
)
Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com>
This commit is contained in:
@ -24,7 +24,7 @@ use {
|
|||||||
transaction::{self, SanitizedTransaction, Transaction},
|
transaction::{self, SanitizedTransaction, Transaction},
|
||||||
},
|
},
|
||||||
solana_send_transaction_service::{
|
solana_send_transaction_service::{
|
||||||
send_transaction_service::{SendTransactionService, TransactionInfo},
|
send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC},
|
||||||
tpu_info::NullTpuInfo,
|
tpu_info::NullTpuInfo,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
@ -399,6 +399,7 @@ pub async fn start_tcp_server(
|
|||||||
receiver,
|
receiver,
|
||||||
5_000,
|
5_000,
|
||||||
0,
|
0,
|
||||||
|
DEFAULT_TPU_USE_QUIC,
|
||||||
);
|
);
|
||||||
|
|
||||||
let server = BanksServer::new(
|
let server = BanksServer::new(
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
use {
|
use {
|
||||||
crate::{tpu_connection::TpuConnection, udp_client::UdpTpuConnection},
|
crate::{
|
||||||
|
quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
|
||||||
|
},
|
||||||
lazy_static::lazy_static,
|
lazy_static::lazy_static,
|
||||||
std::{
|
std::{
|
||||||
collections::{hash_map::Entry, BTreeMap, HashMap},
|
collections::{hash_map::Entry, BTreeMap, HashMap},
|
||||||
@ -23,6 +25,7 @@ struct ConnMap {
|
|||||||
// that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though...
|
// that seems non-"Rust-y" and low bang/buck. This is still pretty terrible though...
|
||||||
last_used_times: BTreeMap<u64, SocketAddr>,
|
last_used_times: BTreeMap<u64, SocketAddr>,
|
||||||
ticks: u64,
|
ticks: u64,
|
||||||
|
use_quic: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnMap {
|
impl ConnMap {
|
||||||
@ -31,21 +34,31 @@ impl ConnMap {
|
|||||||
map: HashMap::new(),
|
map: HashMap::new(),
|
||||||
last_used_times: BTreeMap::new(),
|
last_used_times: BTreeMap::new(),
|
||||||
ticks: 0,
|
ticks: 0,
|
||||||
|
use_quic: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_use_quic(&mut self, use_quic: bool) {
|
||||||
|
self.use_quic = use_quic;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref CONNECTION_MAP: Mutex<ConnMap> = Mutex::new(ConnMap::new());
|
static ref CONNECTION_MAP: Mutex<ConnMap> = Mutex::new(ConnMap::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_use_quic(use_quic: bool) {
|
||||||
|
let mut map = (*CONNECTION_MAP).lock().unwrap();
|
||||||
|
map.set_use_quic(use_quic);
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
// TODO: see https://github.com/solana-labs/solana/issues/23661
|
// TODO: see https://github.com/solana-labs/solana/issues/23661
|
||||||
// remove lazy_static and optimize and refactor this
|
// remove lazy_static and optimize and refactor this
|
||||||
pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> {
|
pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> {
|
||||||
let mut map = (*CONNECTION_MAP).lock().unwrap();
|
let mut map = (*CONNECTION_MAP).lock().unwrap();
|
||||||
let ticks = map.ticks;
|
let ticks = map.ticks;
|
||||||
|
let use_quic = map.use_quic;
|
||||||
let (conn, target_ticks) = match map.map.entry(*addr) {
|
let (conn, target_ticks) = match map.map.entry(*addr) {
|
||||||
Entry::Occupied(mut entry) => {
|
Entry::Occupied(mut entry) => {
|
||||||
let mut pair = entry.get_mut();
|
let mut pair = entry.get_mut();
|
||||||
@ -57,12 +70,15 @@ pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sy
|
|||||||
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
// TODO: see https://github.com/solana-labs/solana/issues/23659
|
// TODO: see https://github.com/solana-labs/solana/issues/23659
|
||||||
// make it configurable (e.g. via the command line) whether to use UDP or Quic
|
// make it configurable (e.g. via the command line) whether to use UDP or Quic
|
||||||
let conn = Arc::new(UdpTpuConnection::new(send_socket, *addr));
|
|
||||||
|
let conn: Arc<dyn TpuConnection + 'static + Sync + Send> = if use_quic {
|
||||||
|
Arc::new(QuicTpuConnection::new(send_socket, *addr))
|
||||||
|
} else {
|
||||||
|
Arc::new(UdpTpuConnection::new(send_socket, *addr))
|
||||||
|
};
|
||||||
|
|
||||||
entry.insert((conn.clone(), ticks));
|
entry.insert((conn.clone(), ticks));
|
||||||
(
|
(conn, ticks)
|
||||||
conn as Arc<dyn TpuConnection + 'static + Sync + Send>,
|
|
||||||
ticks,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -75,7 +75,7 @@ use {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
solana_send_transaction_service::{
|
solana_send_transaction_service::{
|
||||||
send_transaction_service::{SendTransactionService, TransactionInfo},
|
send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC},
|
||||||
tpu_info::NullTpuInfo,
|
tpu_info::NullTpuInfo,
|
||||||
},
|
},
|
||||||
solana_storage_bigtable::Error as StorageError,
|
solana_storage_bigtable::Error as StorageError,
|
||||||
@ -323,6 +323,7 @@ impl JsonRpcRequestProcessor {
|
|||||||
receiver,
|
receiver,
|
||||||
1000,
|
1000,
|
||||||
1,
|
1,
|
||||||
|
DEFAULT_TPU_USE_QUIC,
|
||||||
);
|
);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
@ -6087,6 +6088,7 @@ pub mod tests {
|
|||||||
receiver,
|
receiver,
|
||||||
1000,
|
1000,
|
||||||
1,
|
1,
|
||||||
|
DEFAULT_TPU_USE_QUIC,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut bad_transaction = system_transaction::transfer(
|
let mut bad_transaction = system_transaction::transfer(
|
||||||
@ -6352,6 +6354,7 @@ pub mod tests {
|
|||||||
receiver,
|
receiver,
|
||||||
1000,
|
1000,
|
||||||
1,
|
1,
|
||||||
|
DEFAULT_TPU_USE_QUIC,
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
request_processor.get_block_commitment(0),
|
request_processor.get_block_commitment(0),
|
||||||
|
@ -65,12 +65,15 @@ struct ProcessTransactionsResult {
|
|||||||
retained: u64,
|
retained: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub const DEFAULT_TPU_USE_QUIC: bool = false;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub retry_rate_ms: u64,
|
pub retry_rate_ms: u64,
|
||||||
pub leader_forward_count: u64,
|
pub leader_forward_count: u64,
|
||||||
pub default_max_retries: Option<usize>,
|
pub default_max_retries: Option<usize>,
|
||||||
pub service_max_retries: usize,
|
pub service_max_retries: usize,
|
||||||
|
pub use_quic: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
@ -80,6 +83,7 @@ impl Default for Config {
|
|||||||
leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT,
|
leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT,
|
||||||
default_max_retries: None,
|
default_max_retries: None,
|
||||||
service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
|
service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
|
||||||
|
use_quic: DEFAULT_TPU_USE_QUIC,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -92,10 +96,12 @@ impl SendTransactionService {
|
|||||||
receiver: Receiver<TransactionInfo>,
|
receiver: Receiver<TransactionInfo>,
|
||||||
retry_rate_ms: u64,
|
retry_rate_ms: u64,
|
||||||
leader_forward_count: u64,
|
leader_forward_count: u64,
|
||||||
|
use_quic: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let config = Config {
|
let config = Config {
|
||||||
retry_rate_ms,
|
retry_rate_ms,
|
||||||
leader_forward_count,
|
leader_forward_count,
|
||||||
|
use_quic,
|
||||||
..Config::default()
|
..Config::default()
|
||||||
};
|
};
|
||||||
Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
|
Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
|
||||||
@ -352,6 +358,7 @@ mod test {
|
|||||||
receiver,
|
receiver,
|
||||||
1000,
|
1000,
|
||||||
1,
|
1,
|
||||||
|
DEFAULT_TPU_USE_QUIC,
|
||||||
);
|
);
|
||||||
|
|
||||||
drop(sender);
|
drop(sender);
|
||||||
|
@ -9,6 +9,7 @@ use {
|
|||||||
console::style,
|
console::style,
|
||||||
log::*,
|
log::*,
|
||||||
rand::{seq::SliceRandom, thread_rng},
|
rand::{seq::SliceRandom, thread_rng},
|
||||||
|
send_transaction_service::DEFAULT_TPU_USE_QUIC,
|
||||||
solana_clap_utils::{
|
solana_clap_utils::{
|
||||||
input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
|
input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of},
|
||||||
input_validators::{
|
input_validators::{
|
||||||
@ -456,6 +457,7 @@ pub fn main() {
|
|||||||
let default_accounts_shrink_ratio = &DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string();
|
let default_accounts_shrink_ratio = &DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string();
|
||||||
let default_rocksdb_fifo_shred_storage_size =
|
let default_rocksdb_fifo_shred_storage_size =
|
||||||
&DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES.to_string();
|
&DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES.to_string();
|
||||||
|
let default_tpu_use_quic = &DEFAULT_TPU_USE_QUIC.to_string();
|
||||||
|
|
||||||
let matches = App::new(crate_name!()).about(crate_description!())
|
let matches = App::new(crate_name!()).about(crate_description!())
|
||||||
.version(solana_version::version!())
|
.version(solana_version::version!())
|
||||||
@ -1144,6 +1146,14 @@ pub fn main() {
|
|||||||
.validator(is_parsable::<u64>)
|
.validator(is_parsable::<u64>)
|
||||||
.help("Milliseconds to wait in the TPU receiver for packet coalescing."),
|
.help("Milliseconds to wait in the TPU receiver for packet coalescing."),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("tpu_use_quic")
|
||||||
|
.long("tpu-use-quic")
|
||||||
|
.takes_value(true)
|
||||||
|
.value_name("BOOLEAN")
|
||||||
|
.default_value(default_tpu_use_quic)
|
||||||
|
.help("When this is set to true, the system will use QUIC to send transactions."),
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("rocksdb_max_compaction_jitter")
|
Arg::with_name("rocksdb_max_compaction_jitter")
|
||||||
.long("rocksdb-max-compaction-jitter-slots")
|
.long("rocksdb-max-compaction-jitter-slots")
|
||||||
@ -2095,6 +2105,8 @@ pub fn main() {
|
|||||||
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
|
let restricted_repair_only_mode = matches.is_present("restricted_repair_only_mode");
|
||||||
let accounts_shrink_optimize_total_space =
|
let accounts_shrink_optimize_total_space =
|
||||||
value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
|
value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
|
||||||
|
let tpu_use_quic = value_t_or_exit!(matches, "tpu_use_quic", bool);
|
||||||
|
|
||||||
let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
|
let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
|
||||||
if !(0.0..=1.0).contains(&shrink_ratio) {
|
if !(0.0..=1.0).contains(&shrink_ratio) {
|
||||||
eprintln!(
|
eprintln!(
|
||||||
@ -2366,6 +2378,7 @@ pub fn main() {
|
|||||||
"rpc_send_transaction_service_max_retries",
|
"rpc_send_transaction_service_max_retries",
|
||||||
usize
|
usize
|
||||||
),
|
),
|
||||||
|
use_quic: tpu_use_quic,
|
||||||
},
|
},
|
||||||
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
|
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
|
||||||
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
|
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
|
||||||
|
Reference in New Issue
Block a user