Support quic in bench-tps (#24295)

* Update comment

* Use connection_cache in tpu_client

* Add --tpu-use-quic to bench-tps

* Use connection_cache async send
This commit is contained in:
Tyera Eulberg
2022-04-13 14:17:10 -04:00
committed by GitHub
parent 2e0bc89ec4
commit 26899359d1
4 changed files with 34 additions and 17 deletions

View File

@ -49,6 +49,7 @@ pub struct Config {
pub target_slots_per_epoch: u64, pub target_slots_per_epoch: u64,
pub target_node: Option<Pubkey>, pub target_node: Option<Pubkey>,
pub external_client_type: ExternalClientType, pub external_client_type: ExternalClientType,
pub use_quic: bool,
} }
impl Default for Config { impl Default for Config {
@ -74,6 +75,7 @@ impl Default for Config {
target_slots_per_epoch: 0, target_slots_per_epoch: 0,
target_node: None, target_node: None,
external_client_type: ExternalClientType::default(), external_client_type: ExternalClientType::default(),
use_quic: false,
} }
} }
} }
@ -254,6 +256,13 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.takes_value(false) .takes_value(false)
.help("Submit transactions with a TpuClient") .help("Submit transactions with a TpuClient")
) )
.arg(
Arg::with_name("tpu_use_quic")
.long("tpu-use-quic")
.takes_value(false)
.help("Submit transactions via QUIC; only affects ThinClient (default) \
or TpuClient sends"),
)
} }
/// Parses a clap `ArgMatches` structure into a `Config` /// Parses a clap `ArgMatches` structure into a `Config`
@ -293,6 +302,10 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
args.external_client_type = ExternalClientType::TpuClient; args.external_client_type = ExternalClientType::TpuClient;
} }
if matches.is_present("tpu_use_quic") {
args.use_quic = true;
}
if let Some(addr) = matches.value_of("entrypoint") { if let Some(addr) = matches.value_of("entrypoint") {
args.entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| { args.entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {}", e); eprintln!("failed to parse entrypoint address: {}", e);

View File

@ -7,6 +7,7 @@ use {
keypairs::get_keypairs, keypairs::get_keypairs,
}, },
solana_client::{ solana_client::{
connection_cache,
rpc_client::RpcClient, rpc_client::RpcClient,
tpu_client::{TpuClient, TpuClientConfig}, tpu_client::{TpuClient, TpuClientConfig},
}, },
@ -45,6 +46,7 @@ fn main() {
num_lamports_per_account, num_lamports_per_account,
target_node, target_node,
external_client_type, external_client_type,
use_quic,
.. ..
} = &cli_config; } = &cli_config;
@ -88,6 +90,9 @@ fn main() {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1); exit(1);
}); });
if *use_quic {
connection_cache::set_use_quic(true);
}
let client = if *multi_client { let client = if *multi_client {
let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified); let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified);
if nodes.len() < num_clients { if nodes.len() < num_clients {
@ -130,6 +135,9 @@ fn main() {
json_rpc_url.to_string(), json_rpc_url.to_string(),
CommitmentConfig::confirmed(), CommitmentConfig::confirmed(),
)); ));
if *use_quic {
connection_cache::set_use_quic(true);
}
let client = Arc::new( let client = Arc::new(
TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default()) TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default())
.unwrap_or_else(|err| { .unwrap_or_else(|err| {

View File

@ -1,6 +1,7 @@
use { use {
crate::{ crate::{
client_error::ClientError, client_error::ClientError,
connection_cache::send_wire_transaction_async,
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
rpc_client::RpcClient, rpc_client::RpcClient,
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
@ -17,6 +18,7 @@ use {
signature::SignerError, signature::SignerError,
signers::Signers, signers::Signers,
transaction::{Transaction, TransactionError}, transaction::{Transaction, TransactionError},
transport::{Result as TransportResult, TransportError},
}, },
std::{ std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
@ -73,7 +75,7 @@ impl Default for TpuClientConfig {
/// Client which sends transactions directly to the current leader's TPU port over UDP. /// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info /// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient { pub struct TpuClient {
send_socket: UdpSocket, _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
fanout_slots: u64, fanout_slots: u64,
leader_tpu_service: LeaderTpuService, leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -85,39 +87,33 @@ impl TpuClient {
/// size /// size
pub fn send_transaction(&self, transaction: &Transaction) -> bool { pub fn send_transaction(&self, transaction: &Transaction) -> bool {
let wire_transaction = serialize(transaction).expect("serialization should succeed"); let wire_transaction = serialize(transaction).expect("serialization should succeed");
self.send_wire_transaction(&wire_transaction) self.send_wire_transaction(wire_transaction)
} }
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool { pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
self.try_send_wire_transaction(wire_transaction).is_ok() self.try_send_wire_transaction(wire_transaction).is_ok()
} }
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size /// size
/// Returns the last error if all sends fail /// Returns the last error if all sends fail
pub fn try_send_transaction( pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
&self,
transaction: &Transaction,
) -> std::result::Result<(), std::io::Error> {
let wire_transaction = serialize(transaction).expect("serialization should succeed"); let wire_transaction = serialize(transaction).expect("serialization should succeed");
self.try_send_wire_transaction(&wire_transaction) self.try_send_wire_transaction(wire_transaction)
} }
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
/// Returns the last error if all sends fail /// Returns the last error if all sends fail
fn try_send_wire_transaction( fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
&self, let mut last_error: Option<TransportError> = None;
wire_transaction: &[u8],
) -> std::result::Result<(), std::io::Error> {
let mut last_error: Option<std::io::Error> = None;
let mut some_success = false; let mut some_success = false;
for tpu_address in self for tpu_address in self
.leader_tpu_service .leader_tpu_service
.leader_tpu_sockets(self.fanout_slots) .leader_tpu_sockets(self.fanout_slots)
{ {
let result = self.send_socket.send_to(wire_transaction, tpu_address); let result = send_wire_transaction_async(wire_transaction.clone(), &tpu_address);
if let Err(err) = result { if let Err(err) = result {
last_error = Some(err); last_error = Some(err);
} else { } else {
@ -128,7 +124,7 @@ impl TpuClient {
Err(if let Some(err) = last_error { Err(if let Some(err) = last_error {
err err
} else { } else {
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted") std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
}) })
} else { } else {
Ok(()) Ok(())
@ -146,7 +142,7 @@ impl TpuClient {
LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?; LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?;
Ok(Self { Ok(Self {
send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(), _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1), fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
leader_tpu_service, leader_tpu_service,
exit, exit,

View File

@ -1171,7 +1171,7 @@ pub fn main() {
Arg::with_name("tpu_use_quic") Arg::with_name("tpu_use_quic")
.long("tpu-use-quic") .long("tpu-use-quic")
.takes_value(false) .takes_value(false)
.help("When this is set to true, the system will use QUIC to send transactions."), .help("Use QUIC to send transactions."),
) )
.arg( .arg(
Arg::with_name("rocksdb_max_compaction_jitter") Arg::with_name("rocksdb_max_compaction_jitter")