@ -3,10 +3,11 @@ use {
|
||||
quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
|
||||
},
|
||||
lazy_static::lazy_static,
|
||||
solana_net_utils::VALIDATOR_PORT_RANGE,
|
||||
solana_sdk::{transaction::VersionedTransaction, transport::TransportError},
|
||||
std::{
|
||||
collections::{hash_map::Entry, BTreeMap, HashMap},
|
||||
net::{SocketAddr, UdpSocket},
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
sync::{Arc, Mutex},
|
||||
},
|
||||
};
|
||||
@ -74,10 +75,11 @@ fn get_connection(addr: &SocketAddr) -> Connection {
|
||||
(pair.0.clone(), old_ticks)
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
// TODO: see https://github.com/solana-labs/solana/issues/23659
|
||||
// make it configurable (e.g. via the command line) whether to use UDP or Quic
|
||||
|
||||
let (_, send_socket) = solana_net_utils::bind_in_range(
|
||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
VALIDATOR_PORT_RANGE,
|
||||
)
|
||||
.unwrap();
|
||||
let conn = if use_quic {
|
||||
Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr)))
|
||||
} else {
|
||||
|
@ -5,8 +5,13 @@
|
||||
|
||||
use {
|
||||
crate::{
|
||||
rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response,
|
||||
tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
|
||||
connection_cache::{
|
||||
par_serialize_and_send_transaction_batch, send_wire_transaction,
|
||||
serialize_and_send_transaction,
|
||||
},
|
||||
rpc_client::RpcClient,
|
||||
rpc_config::RpcProgramAccountsConfig,
|
||||
rpc_response::Response,
|
||||
},
|
||||
log::*,
|
||||
solana_sdk::{
|
||||
@ -29,7 +34,7 @@ use {
|
||||
},
|
||||
std::{
|
||||
io,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
|
||||
net::SocketAddr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
RwLock,
|
||||
@ -118,64 +123,52 @@ impl ClientOptimizer {
|
||||
}
|
||||
|
||||
/// An object for querying and sending transactions to the network.
|
||||
pub struct ThinClient<C: 'static + TpuConnection> {
|
||||
pub struct ThinClient {
|
||||
rpc_clients: Vec<RpcClient>,
|
||||
tpu_connections: Vec<C>,
|
||||
tpu_addrs: Vec<SocketAddr>,
|
||||
optimizer: ClientOptimizer,
|
||||
}
|
||||
|
||||
impl<C: 'static + TpuConnection> ThinClient<C> {
|
||||
impl ThinClient {
|
||||
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
|
||||
/// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
|
||||
/// (currently hardcoded to UDP)
|
||||
pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr, transactions_socket: UdpSocket) -> Self {
|
||||
let tpu_connection = C::new(transactions_socket, tpu_addr);
|
||||
|
||||
Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_connection)
|
||||
pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr) -> Self {
|
||||
Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr)
|
||||
}
|
||||
|
||||
pub fn new_socket_with_timeout(
|
||||
rpc_addr: SocketAddr,
|
||||
tpu_addr: SocketAddr,
|
||||
transactions_socket: UdpSocket,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
|
||||
let tpu_connection = C::new(transactions_socket, tpu_addr);
|
||||
Self::new_from_client(rpc_client, tpu_connection)
|
||||
Self::new_from_client(rpc_client, tpu_addr)
|
||||
}
|
||||
|
||||
fn new_from_client(rpc_client: RpcClient, tpu_connection: C) -> Self {
|
||||
fn new_from_client(rpc_client: RpcClient, tpu_addr: SocketAddr) -> Self {
|
||||
Self {
|
||||
rpc_clients: vec![rpc_client],
|
||||
tpu_connections: vec![tpu_connection],
|
||||
tpu_addrs: vec![tpu_addr],
|
||||
optimizer: ClientOptimizer::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_from_addrs(
|
||||
rpc_addrs: Vec<SocketAddr>,
|
||||
tpu_addrs: Vec<SocketAddr>,
|
||||
transactions_socket: UdpSocket,
|
||||
) -> Self {
|
||||
pub fn new_from_addrs(rpc_addrs: Vec<SocketAddr>, tpu_addrs: Vec<SocketAddr>) -> Self {
|
||||
assert!(!rpc_addrs.is_empty());
|
||||
assert_eq!(rpc_addrs.len(), tpu_addrs.len());
|
||||
|
||||
let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
|
||||
let optimizer = ClientOptimizer::new(rpc_clients.len());
|
||||
let tpu_connections: Vec<_> = tpu_addrs
|
||||
.into_iter()
|
||||
.map(|tpu_addr| C::new(transactions_socket.try_clone().unwrap(), tpu_addr))
|
||||
.collect();
|
||||
Self {
|
||||
rpc_clients,
|
||||
tpu_connections,
|
||||
tpu_addrs,
|
||||
optimizer,
|
||||
}
|
||||
}
|
||||
|
||||
fn tpu_connection(&self) -> &C {
|
||||
&self.tpu_connections[self.optimizer.best()]
|
||||
fn tpu_addr(&self) -> &SocketAddr {
|
||||
&self.tpu_addrs[self.optimizer.best()]
|
||||
}
|
||||
|
||||
fn rpc_client(&self) -> &RpcClient {
|
||||
@ -220,8 +213,7 @@ impl<C: 'static + TpuConnection> ThinClient<C> {
|
||||
while now.elapsed().as_secs() < wait_time as u64 {
|
||||
if num_confirmed == 0 {
|
||||
// Send the transaction if there has been no confirmation (e.g. the first time)
|
||||
self.tpu_connection()
|
||||
.send_wire_transaction(&wire_transaction)?;
|
||||
send_wire_transaction(&wire_transaction, self.tpu_addr())?;
|
||||
}
|
||||
|
||||
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
|
||||
@ -316,13 +308,13 @@ impl<C: 'static + TpuConnection> ThinClient<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: 'static + TpuConnection> Client for ThinClient<C> {
|
||||
impl Client for ThinClient {
|
||||
fn tpu_addr(&self) -> String {
|
||||
self.tpu_connection().tpu_addr().to_string()
|
||||
self.tpu_addr().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: 'static + TpuConnection> SyncClient for ThinClient<C> {
|
||||
impl SyncClient for ThinClient {
|
||||
fn send_and_confirm_message<T: Signers>(
|
||||
&self,
|
||||
keypairs: &T,
|
||||
@ -602,18 +594,16 @@ impl<C: 'static + TpuConnection> SyncClient for ThinClient<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
|
||||
impl AsyncClient for ThinClient {
|
||||
fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> {
|
||||
let transaction = VersionedTransaction::from(transaction);
|
||||
self.tpu_connection()
|
||||
.serialize_and_send_transaction(&transaction)?;
|
||||
serialize_and_send_transaction(&transaction, self.tpu_addr())?;
|
||||
Ok(transaction.signatures[0])
|
||||
}
|
||||
|
||||
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
|
||||
let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect();
|
||||
self.tpu_connection()
|
||||
.par_serialize_and_send_transaction_batch(&batch[..])?;
|
||||
par_serialize_and_send_transaction_batch(&batch[..], self.tpu_addr())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -648,23 +638,15 @@ impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_client(
|
||||
(rpc, tpu): (SocketAddr, SocketAddr),
|
||||
range: (u16, u16),
|
||||
) -> ThinClient<UdpTpuConnection> {
|
||||
let (_, transactions_socket) =
|
||||
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
|
||||
ThinClient::<UdpTpuConnection>::new(rpc, tpu, transactions_socket)
|
||||
pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr)) -> ThinClient {
|
||||
ThinClient::new(rpc, tpu)
|
||||
}
|
||||
|
||||
pub fn create_client_with_timeout(
|
||||
(rpc, tpu): (SocketAddr, SocketAddr),
|
||||
range: (u16, u16),
|
||||
timeout: Duration,
|
||||
) -> ThinClient<UdpTpuConnection> {
|
||||
let (_, transactions_socket) =
|
||||
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
|
||||
ThinClient::<UdpTpuConnection>::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout)
|
||||
) -> ThinClient {
|
||||
ThinClient::new_socket_with_timeout(rpc, tpu, timeout)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
Reference in New Issue
Block a user