* Update comment
* Use connection_cache in tpu_client
* Add --tpu-use-quic to bench-tps
* Use connection_cache async send
(cherry picked from commit 26899359d1
)
Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
use {
|
||||
crate::{
|
||||
client_error::ClientError,
|
||||
connection_cache::send_wire_transaction_async,
|
||||
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
|
||||
rpc_client::RpcClient,
|
||||
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
|
||||
@@ -17,6 +18,7 @@ use {
|
||||
signature::SignerError,
|
||||
signers::Signers,
|
||||
transaction::{Transaction, TransactionError},
|
||||
transport::{Result as TransportResult, TransportError},
|
||||
},
|
||||
std::{
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
@@ -73,7 +75,7 @@ impl Default for TpuClientConfig {
|
||||
/// Client which sends transactions directly to the current leader's TPU port over UDP.
|
||||
/// The client uses RPC to determine the current leader and fetch node contact info
|
||||
pub struct TpuClient {
|
||||
send_socket: UdpSocket,
|
||||
_deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
|
||||
fanout_slots: u64,
|
||||
leader_tpu_service: LeaderTpuService,
|
||||
exit: Arc<AtomicBool>,
|
||||
@@ -85,39 +87,33 @@ impl TpuClient {
|
||||
/// size
|
||||
pub fn send_transaction(&self, transaction: &Transaction) -> bool {
|
||||
let wire_transaction = serialize(transaction).expect("serialization should succeed");
|
||||
self.send_wire_transaction(&wire_transaction)
|
||||
self.send_wire_transaction(wire_transaction)
|
||||
}
|
||||
|
||||
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
|
||||
pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool {
|
||||
pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
|
||||
self.try_send_wire_transaction(wire_transaction).is_ok()
|
||||
}
|
||||
|
||||
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
|
||||
/// size
|
||||
/// Returns the last error if all sends fail
|
||||
pub fn try_send_transaction(
|
||||
&self,
|
||||
transaction: &Transaction,
|
||||
) -> std::result::Result<(), std::io::Error> {
|
||||
pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
|
||||
let wire_transaction = serialize(transaction).expect("serialization should succeed");
|
||||
self.try_send_wire_transaction(&wire_transaction)
|
||||
self.try_send_wire_transaction(wire_transaction)
|
||||
}
|
||||
|
||||
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
|
||||
/// Returns the last error if all sends fail
|
||||
fn try_send_wire_transaction(
|
||||
&self,
|
||||
wire_transaction: &[u8],
|
||||
) -> std::result::Result<(), std::io::Error> {
|
||||
let mut last_error: Option<std::io::Error> = None;
|
||||
fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
|
||||
let mut last_error: Option<TransportError> = None;
|
||||
let mut some_success = false;
|
||||
|
||||
for tpu_address in self
|
||||
.leader_tpu_service
|
||||
.leader_tpu_sockets(self.fanout_slots)
|
||||
{
|
||||
let result = self.send_socket.send_to(wire_transaction, tpu_address);
|
||||
let result = send_wire_transaction_async(wire_transaction.clone(), &tpu_address);
|
||||
if let Err(err) = result {
|
||||
last_error = Some(err);
|
||||
} else {
|
||||
@@ -128,7 +124,7 @@ impl TpuClient {
|
||||
Err(if let Some(err) = last_error {
|
||||
err
|
||||
} else {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted")
|
||||
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
@@ -146,7 +142,7 @@ impl TpuClient {
|
||||
LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?;
|
||||
|
||||
Ok(Self {
|
||||
send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||
_deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||
fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
|
||||
leader_tpu_service,
|
||||
exit,
|
||||
|
Reference in New Issue
Block a user