Optimize TpuConnection and its implementations and refactor connection-cache to not use dyn in order to enable those changes (#23877) (#23909)

Co-authored-by: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com>
This commit is contained in:
mergify[bot] 2022-03-25 19:09:26 +01:00 committed by GitHub
parent af79a86a72
commit c0d3cd145e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 267 additions and 48 deletions

1
Cargo.lock generated
View File

@ -4637,6 +4637,7 @@ dependencies = [
"solana-measure", "solana-measure",
"solana-net-utils", "solana-net-utils",
"solana-sdk", "solana-sdk",
"solana-streamer",
"solana-transaction-status", "solana-transaction-status",
"solana-version", "solana-version",
"solana-vote-program", "solana-vote-program",

View File

@ -41,6 +41,7 @@ solana-faucet = { path = "../faucet", version = "=1.10.4" }
solana-measure = { path = "../measure", version = "=1.10.4" } solana-measure = { path = "../measure", version = "=1.10.4" }
solana-net-utils = { path = "../net-utils", version = "=1.10.4" } solana-net-utils = { path = "../net-utils", version = "=1.10.4" }
solana-sdk = { path = "../sdk", version = "=1.10.4" } solana-sdk = { path = "../sdk", version = "=1.10.4" }
solana-streamer = { path = "../streamer", version = "=1.10.4" }
solana-transaction-status = { path = "../transaction-status", version = "=1.10.4" } solana-transaction-status = { path = "../transaction-status", version = "=1.10.4" }
solana-version = { path = "../version", version = "=1.10.4" } solana-version = { path = "../version", version = "=1.10.4" }
solana-vote-program = { path = "../programs/vote", version = "=1.10.4" } solana-vote-program = { path = "../programs/vote", version = "=1.10.4" }

View File

@ -3,6 +3,7 @@ use {
quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection, quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
}, },
lazy_static::lazy_static, lazy_static::lazy_static,
solana_sdk::{transaction::VersionedTransaction, transport::TransportError},
std::{ std::{
collections::{hash_map::Entry, BTreeMap, HashMap}, collections::{hash_map::Entry, BTreeMap, HashMap},
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
@ -13,9 +14,15 @@ use {
// Should be non-zero // Should be non-zero
static MAX_CONNECTIONS: usize = 64; static MAX_CONNECTIONS: usize = 64;
#[derive(Clone)]
enum Connection {
Udp(Arc<UdpTpuConnection>),
Quic(Arc<QuicTpuConnection>),
}
struct ConnMap { struct ConnMap {
// Keeps track of the connection associated with an addr and the last time it was used // Keeps track of the connection associated with an addr and the last time it was used
map: HashMap<SocketAddr, (Arc<dyn TpuConnection + 'static + Sync + Send>, u64)>, map: HashMap<SocketAddr, (Connection, u64)>,
// Helps to find the least recently used connection. The search and inserts are O(log(n)) // Helps to find the least recently used connection. The search and inserts are O(log(n))
// but since we're bounding the size of the collections, this should be constant // but since we're bounding the size of the collections, this should be constant
// (and hopefully negligible) time. In theory, we can do this in constant time // (and hopefully negligible) time. In theory, we can do this in constant time
@ -55,7 +62,7 @@ pub fn set_use_quic(use_quic: bool) {
#[allow(dead_code)] #[allow(dead_code)]
// TODO: see https://github.com/solana-labs/solana/issues/23661 // TODO: see https://github.com/solana-labs/solana/issues/23661
// remove lazy_static and optimize and refactor this // remove lazy_static and optimize and refactor this
pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> { fn get_connection(addr: &SocketAddr) -> Connection {
let mut map = (*CONNECTION_MAP).lock().unwrap(); let mut map = (*CONNECTION_MAP).lock().unwrap();
let ticks = map.ticks; let ticks = map.ticks;
let use_quic = map.use_quic; let use_quic = map.use_quic;
@ -71,10 +78,10 @@ pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sy
// TODO: see https://github.com/solana-labs/solana/issues/23659 // TODO: see https://github.com/solana-labs/solana/issues/23659
// make it configurable (e.g. via the command line) whether to use UDP or Quic // make it configurable (e.g. via the command line) whether to use UDP or Quic
let conn: Arc<dyn TpuConnection + 'static + Sync + Send> = if use_quic { let conn = if use_quic {
Arc::new(QuicTpuConnection::new(send_socket, *addr)) Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr)))
} else { } else {
Arc::new(UdpTpuConnection::new(send_socket, *addr)) Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr)))
}; };
entry.insert((conn.clone(), ticks)); entry.insert((conn.clone(), ticks));
@ -101,13 +108,69 @@ pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sy
conn conn
} }
// TODO: see https://github.com/solana-labs/solana/issues/23851
// use enum_dispatch and get rid of this tedious code.
// The main blocker to using enum_dispatch right now is that
// the it doesn't work with static methods like TpuConnection::new
// which is used by thin_client. This will be eliminated soon
// once thin_client is moved to using this connection cache.
// Once that is done, we will migrate to using enum_dispatch
// This will be done in a followup to
// https://github.com/solana-labs/solana/pull/23817
pub fn send_wire_transaction_batch(
packets: &[&[u8]],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.send_wire_transaction_batch(packets),
Connection::Quic(conn) => conn.send_wire_transaction_batch(packets),
}
}
pub fn send_wire_transaction(
wire_transaction: &[u8],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.send_wire_transaction(wire_transaction),
Connection::Quic(conn) => conn.send_wire_transaction(wire_transaction),
}
}
pub fn serialize_and_send_transaction(
transaction: &VersionedTransaction,
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction),
Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction),
}
}
pub fn par_serialize_and_send_transaction_batch(
transactions: &[VersionedTransaction],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.par_serialize_and_send_transaction_batch(transactions),
Connection::Quic(conn) => conn.par_serialize_and_send_transaction_batch(transactions),
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use { use {
crate::connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS}, crate::{
connection_cache::{get_connection, Connection, CONNECTION_MAP, MAX_CONNECTIONS},
tpu_connection::TpuConnection,
},
rand::{Rng, SeedableRng}, rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng, rand_chacha::ChaChaRng,
std::net::SocketAddr, std::net::{IpAddr, SocketAddr},
}; };
fn get_addr(rng: &mut ChaChaRng) -> SocketAddr { fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
@ -121,6 +184,13 @@ mod tests {
addr_str.parse().expect("Invalid address") addr_str.parse().expect("Invalid address")
} }
fn ip(conn: Connection) -> IpAddr {
match conn {
Connection::Udp(conn) => conn.tpu_addr().ip(),
Connection::Quic(conn) => conn.tpu_addr().ip(),
}
}
#[test] #[test]
fn test_connection_cache() { fn test_connection_cache() {
// Allow the test to run deterministically // Allow the test to run deterministically
@ -136,7 +206,7 @@ mod tests {
// be lazy and not connect until first use or handle connection errors somehow // be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator) // (without crashing, as would be required in a real practical validator)
let first_addr = get_addr(&mut rng); let first_addr = get_addr(&mut rng);
assert!(get_connection(&first_addr).tpu_addr().ip() == first_addr.ip()); assert!(ip(get_connection(&first_addr)) == first_addr.ip());
let addrs = (0..MAX_CONNECTIONS) let addrs = (0..MAX_CONNECTIONS)
.into_iter() .into_iter()
.map(|_| { .map(|_| {
@ -149,7 +219,7 @@ mod tests {
let map = (*CONNECTION_MAP).lock().unwrap(); let map = (*CONNECTION_MAP).lock().unwrap();
addrs.iter().for_each(|a| { addrs.iter().for_each(|a| {
let conn = map.map.get(a).expect("Address not found"); let conn = map.map.get(a).expect("Address not found");
assert!(a.ip() == conn.0.tpu_addr().ip()); assert!(a.ip() == ip(conn.0.clone()));
}); });
assert!(map.map.get(&first_addr).is_none()); assert!(map.map.get(&first_addr).is_none());

View File

@ -63,19 +63,22 @@ impl TpuConnection for QuicTpuConnection {
&self.client.addr &self.client.addr
} }
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> { fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
let _guard = self.client.runtime.enter(); let _guard = self.client.runtime.enter();
let send_buffer = self.client.send_buffer(wire_transaction); let send_buffer = self.client.send_buffer(wire_transaction);
self.client.runtime.block_on(send_buffer)?; self.client.runtime.block_on(send_buffer)?;
Ok(()) Ok(())
} }
fn send_wire_transaction_batch( fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
&self, where
wire_transaction_batch: &[Vec<u8>], T: AsRef<[u8]>,
) -> TransportResult<()> { {
let _guard = self.client.runtime.enter(); let _guard = self.client.runtime.enter();
let send_batch = self.client.send_batch(wire_transaction_batch); let send_batch = self.client.send_batch(buffers);
self.client.runtime.block_on(send_batch)?; self.client.runtime.block_on(send_batch)?;
Ok(()) Ok(())
} }
@ -158,12 +161,18 @@ impl QuicClient {
} }
} }
pub async fn send_buffer(&self, data: &[u8]) -> Result<(), ClientErrorKind> { pub async fn send_buffer<T>(&self, data: T) -> Result<(), ClientErrorKind>
self._send_buffer(data).await?; where
T: AsRef<[u8]>,
{
self._send_buffer(data.as_ref()).await?;
Ok(()) Ok(())
} }
pub async fn send_batch(&self, buffers: &[Vec<u8>]) -> Result<(), ClientErrorKind> { pub async fn send_batch<T>(&self, buffers: &[T]) -> Result<(), ClientErrorKind>
where
T: AsRef<[u8]>,
{
// Start off by "testing" the connection by sending the first transaction // Start off by "testing" the connection by sending the first transaction
// This will also connect to the server if not already connected // This will also connect to the server if not already connected
// and reconnect and retry if the first send attempt failed // and reconnect and retry if the first send attempt failed
@ -178,7 +187,7 @@ impl QuicClient {
if buffers.is_empty() { if buffers.is_empty() {
return Ok(()); return Ok(());
} }
let connection = self._send_buffer(&buffers[0]).await?; let connection = self._send_buffer(buffers[0].as_ref()).await?;
// Used to avoid dereferencing the Arc multiple times below // Used to avoid dereferencing the Arc multiple times below
// by just getting a reference to the NewConnection once // by just getting a reference to the NewConnection once
@ -192,7 +201,7 @@ impl QuicClient {
join_all( join_all(
buffs buffs
.into_iter() .into_iter()
.map(|buf| Self::_send_buffer_using_conn(buf, connection_ref)), .map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
) )
}); });

View File

@ -613,7 +613,7 @@ impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> { fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect(); let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect();
self.tpu_connection() self.tpu_connection()
.par_serialize_and_send_transaction_batch(&batch)?; .par_serialize_and_send_transaction_batch(&batch[..])?;
Ok(()) Ok(())
} }

View File

@ -1,13 +1,11 @@
use { use {
rayon::iter::{IntoParallelRefIterator, ParallelIterator}, rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::net::{SocketAddr, UdpSocket}, std::net::{SocketAddr, UdpSocket},
}; };
pub trait TpuConnection { pub trait TpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self;
where
Self: Sized;
fn tpu_addr(&self) -> &SocketAddr; fn tpu_addr(&self) -> &SocketAddr;
@ -20,26 +18,23 @@ pub trait TpuConnection {
self.send_wire_transaction(&wire_transaction) self.send_wire_transaction(&wire_transaction)
} }
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()>; fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]>;
fn par_serialize_and_send_transaction_batch( fn par_serialize_and_send_transaction_batch(
&self, &self,
transaction_batch: &[VersionedTransaction], transactions: &[VersionedTransaction],
) -> TransportResult<()> { ) -> TransportResult<()> {
let wire_transaction_batch: Vec<_> = transaction_batch let buffers = transactions
.par_iter() .into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect(); .collect::<Vec<_>>();
self.send_wire_transaction_batch(&wire_transaction_batch)
self.send_wire_transaction_batch(&buffers)
} }
fn send_wire_transaction_batch( fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
&self, where
wire_transaction_batch: &[Vec<u8>], T: AsRef<[u8]>;
) -> TransportResult<()> {
for wire_transaction in wire_transaction_batch {
self.send_wire_transaction(wire_transaction)?;
}
Ok(())
}
} }

View File

@ -3,7 +3,9 @@
use { use {
crate::tpu_connection::TpuConnection, crate::tpu_connection::TpuConnection,
core::iter::repeat,
solana_sdk::transport::Result as TransportResult, solana_sdk::transport::Result as TransportResult,
solana_streamer::sendmmsg::batch_send,
std::net::{SocketAddr, UdpSocket}, std::net::{SocketAddr, UdpSocket},
}; };
@ -24,8 +26,20 @@ impl TpuConnection for UdpTpuConnection {
&self.addr &self.addr
} }
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> { fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
self.socket.send_to(wire_transaction, self.addr)?; where
T: AsRef<[u8]>,
{
self.socket.send_to(wire_transaction.as_ref(), self.addr)?;
Ok(())
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
batch_send(&self.socket, &pkts)?;
Ok(()) Ok(())
} }
} }

View File

@ -1,7 +1,7 @@
use { use {
crate::tower_storage::{SavedTowerVersions, TowerStorage}, crate::tower_storage::{SavedTowerVersions, TowerStorage},
crossbeam_channel::Receiver, crossbeam_channel::Receiver,
solana_client::connection_cache::get_connection, solana_client::connection_cache,
solana_gossip::cluster_info::ClusterInfo, solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_poh::poh_recorder::PohRecorder, solana_poh::poh_recorder::PohRecorder,
@ -90,7 +90,8 @@ impl VotingService {
let mut measure = Measure::start("vote_tx_send-ms"); let mut measure = Measure::start("vote_tx_send-ms");
let target_address = target_address.unwrap_or_else(|| cluster_info.my_contact_info().tpu); let target_address = target_address.unwrap_or_else(|| cluster_info.my_contact_info().tpu);
let _ = get_connection(&target_address).send_transaction(vote_op.tx()); let wire_vote_tx = bincode::serialize(vote_op.tx()).expect("vote serialization failure");
let _ = connection_cache::send_wire_transaction(&wire_vote_tx, &target_address);
measure.stop(); measure.stop();
inc_new_counter_info!("vote_tx_send-ms", measure.as_ms() as usize); inc_new_counter_info!("vote_tx_send-ms", measure.as_ms() as usize);

126
programs/bpf/Cargo.lock generated
View File

@ -174,6 +174,12 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "base64ct"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71acf5509fc522cce1b100ac0121c635129bfd4d91cdf036bcc9b9935f97ccf5"
[[package]] [[package]]
name = "bincode" name = "bincode"
version = "1.3.3" version = "1.3.3"
@ -432,7 +438,7 @@ dependencies = [
"num-integer", "num-integer",
"num-traits", "num-traits",
"serde", "serde",
"time", "time 0.1.43",
"winapi", "winapi",
] ]
@ -527,6 +533,12 @@ dependencies = [
"web-sys", "web-sys",
] ]
[[package]]
name = "const-oid"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
[[package]] [[package]]
name = "constant_time_eq" name = "constant_time_eq"
version = "0.1.5" version = "0.1.5"
@ -684,6 +696,15 @@ dependencies = [
"rayon", "rayon",
] ]
[[package]]
name = "der"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c"
dependencies = [
"const-oid",
]
[[package]] [[package]]
name = "derivation-path" name = "derivation-path"
version = "0.2.0" version = "0.2.0"
@ -1226,6 +1247,12 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "histogram"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669"
[[package]] [[package]]
name = "hmac" name = "hmac"
version = "0.8.1" version = "0.8.1"
@ -1822,6 +1849,15 @@ dependencies = [
"syn 1.0.67", "syn 1.0.67",
] ]
[[package]]
name = "num_threads"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "number_prefix" name = "number_prefix"
version = "0.4.0" version = "0.4.0"
@ -1928,6 +1964,15 @@ dependencies = [
"digest 0.10.3", "digest 0.10.3",
] ]
[[package]]
name = "pem"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947"
dependencies = [
"base64 0.13.0",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.1.0" version = "2.1.0"
@ -1966,6 +2011,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs8"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0"
dependencies = [
"der",
"spki",
"zeroize",
]
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.17" version = "0.3.17"
@ -2253,6 +2309,18 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "rcgen"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7fa2d386df8533b02184941c76ae2e0d0c1d053f5d43339169d80f21275fc5e"
dependencies = [
"pem",
"ring",
"time 0.3.7",
"yasna",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.1.56" version = "0.1.56"
@ -3284,6 +3352,7 @@ dependencies = [
"solana-measure", "solana-measure",
"solana-net-utils", "solana-net-utils",
"solana-sdk", "solana-sdk",
"solana-streamer",
"solana-transaction-status", "solana-transaction-status",
"solana-version", "solana-version",
"solana-vote-program", "solana-vote-program",
@ -3789,6 +3858,30 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "solana-streamer"
version = "1.10.4"
dependencies = [
"crossbeam-channel",
"futures-util",
"histogram",
"itertools 0.10.3",
"libc",
"log",
"nix",
"pem",
"pkcs8",
"quinn",
"rand 0.7.3",
"rcgen",
"rustls",
"solana-metrics",
"solana-perf",
"solana-sdk",
"thiserror",
"tokio",
]
[[package]] [[package]]
name = "solana-transaction-status" name = "solana-transaction-status"
version = "1.10.4" version = "1.10.4"
@ -3903,7 +3996,7 @@ dependencies = [
"rustc-demangle", "rustc-demangle",
"scroll", "scroll",
"thiserror", "thiserror",
"time", "time 0.1.43",
] ]
[[package]] [[package]]
@ -3912,6 +4005,16 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spki"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27"
dependencies = [
"base64ct",
"der",
]
[[package]] [[package]]
name = "spl-associated-token-account" name = "spl-associated-token-account"
version = "1.0.3" version = "1.0.3"
@ -4136,6 +4239,16 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "time"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d"
dependencies = [
"libc",
"num_threads",
]
[[package]] [[package]]
name = "tiny-bip39" name = "tiny-bip39"
version = "0.8.2" version = "0.8.2"
@ -4711,6 +4824,15 @@ dependencies = [
"linked-hash-map", "linked-hash-map",
] ]
[[package]]
name = "yasna"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "346d34a236c9d3e5f3b9b74563f238f955bbd05fa0b8b4efa53c130c43982f4c"
dependencies = [
"time 0.3.7",
]
[[package]] [[package]]
name = "zeroize" name = "zeroize"
version = "1.3.0" version = "1.3.0"

View File

@ -310,9 +310,8 @@ impl SendTransactionService {
fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) { fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) {
let mut measure = Measure::start("send_transaction_service-us"); let mut measure = Measure::start("send_transaction_service-us");
let connection = connection_cache::get_connection(tpu_address);
if let Err(err) = connection.send_wire_transaction(wire_transaction) { if let Err(err) = connection_cache::send_wire_transaction(wire_transaction, tpu_address) {
warn!("Failed to send transaction to {}: {:?}", tpu_address, err); warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
} }
measure.stop(); measure.stop();

View File

@ -8,6 +8,7 @@ use {
std::os::unix::io::AsRawFd, std::os::unix::io::AsRawFd,
}; };
use { use {
solana_sdk::transport::TransportError,
std::{ std::{
borrow::Borrow, borrow::Borrow,
io, io,
@ -24,6 +25,12 @@ pub enum SendPktsError {
IoError(io::Error, usize), IoError(io::Error, usize),
} }
impl From<SendPktsError> for TransportError {
fn from(err: SendPktsError) -> Self {
Self::Custom(format!("{:?}", err))
}
}
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
where where