diff --git a/Cargo.lock b/Cargo.lock index 836e3fc2b5..92d80fd4d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4637,6 +4637,7 @@ dependencies = [ "solana-measure", "solana-net-utils", "solana-sdk", + "solana-streamer", "solana-transaction-status", "solana-version", "solana-vote-program", diff --git a/client/Cargo.toml b/client/Cargo.toml index ed0b40c55a..a615b4ce43 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -41,6 +41,7 @@ solana-faucet = { path = "../faucet", version = "=1.10.4" } solana-measure = { path = "../measure", version = "=1.10.4" } solana-net-utils = { path = "../net-utils", version = "=1.10.4" } solana-sdk = { path = "../sdk", version = "=1.10.4" } +solana-streamer = { path = "../streamer", version = "=1.10.4" } solana-transaction-status = { path = "../transaction-status", version = "=1.10.4" } solana-version = { path = "../version", version = "=1.10.4" } solana-vote-program = { path = "../programs/vote", version = "=1.10.4" } diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index acc00f176e..399b502dc7 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -3,6 +3,7 @@ use { quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection, }, lazy_static::lazy_static, + solana_sdk::{transaction::VersionedTransaction, transport::TransportError}, std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, net::{SocketAddr, UdpSocket}, @@ -13,9 +14,15 @@ use { // Should be non-zero static MAX_CONNECTIONS: usize = 64; +#[derive(Clone)] +enum Connection { + Udp(Arc), + Quic(Arc), +} + struct ConnMap { // Keeps track of the connection associated with an addr and the last time it was used - map: HashMap, u64)>, + map: HashMap, // Helps to find the least recently used connection. The search and inserts are O(log(n)) // but since we're bounding the size of the collections, this should be constant // (and hopefully negligible) time. In theory, we can do this in constant time @@ -55,7 +62,7 @@ pub fn set_use_quic(use_quic: bool) { #[allow(dead_code)] // TODO: see https://github.com/solana-labs/solana/issues/23661 // remove lazy_static and optimize and refactor this -pub fn get_connection(addr: &SocketAddr) -> Arc { +fn get_connection(addr: &SocketAddr) -> Connection { let mut map = (*CONNECTION_MAP).lock().unwrap(); let ticks = map.ticks; let use_quic = map.use_quic; @@ -71,10 +78,10 @@ pub fn get_connection(addr: &SocketAddr) -> Arc = if use_quic { - Arc::new(QuicTpuConnection::new(send_socket, *addr)) + let conn = if use_quic { + Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr))) } else { - Arc::new(UdpTpuConnection::new(send_socket, *addr)) + Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr))) }; entry.insert((conn.clone(), ticks)); @@ -101,13 +108,69 @@ pub fn get_connection(addr: &SocketAddr) -> Arc Result<(), TransportError> { + let conn = get_connection(addr); + match conn { + Connection::Udp(conn) => conn.send_wire_transaction_batch(packets), + Connection::Quic(conn) => conn.send_wire_transaction_batch(packets), + } +} + +pub fn send_wire_transaction( + wire_transaction: &[u8], + addr: &SocketAddr, +) -> Result<(), TransportError> { + let conn = get_connection(addr); + match conn { + Connection::Udp(conn) => conn.send_wire_transaction(wire_transaction), + Connection::Quic(conn) => conn.send_wire_transaction(wire_transaction), + } +} + +pub fn serialize_and_send_transaction( + transaction: &VersionedTransaction, + addr: &SocketAddr, +) -> Result<(), TransportError> { + let conn = get_connection(addr); + match conn { + Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction), + Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction), + } +} + +pub fn par_serialize_and_send_transaction_batch( + transactions: &[VersionedTransaction], + addr: &SocketAddr, +) -> Result<(), TransportError> { + let conn = get_connection(addr); + match conn { + Connection::Udp(conn) => conn.par_serialize_and_send_transaction_batch(transactions), + Connection::Quic(conn) => conn.par_serialize_and_send_transaction_batch(transactions), + } +} + #[cfg(test)] mod tests { use { - crate::connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS}, + crate::{ + connection_cache::{get_connection, Connection, CONNECTION_MAP, MAX_CONNECTIONS}, + tpu_connection::TpuConnection, + }, rand::{Rng, SeedableRng}, rand_chacha::ChaChaRng, - std::net::SocketAddr, + std::net::{IpAddr, SocketAddr}, }; fn get_addr(rng: &mut ChaChaRng) -> SocketAddr { @@ -121,6 +184,13 @@ mod tests { addr_str.parse().expect("Invalid address") } + fn ip(conn: Connection) -> IpAddr { + match conn { + Connection::Udp(conn) => conn.tpu_addr().ip(), + Connection::Quic(conn) => conn.tpu_addr().ip(), + } + } + #[test] fn test_connection_cache() { // Allow the test to run deterministically @@ -136,7 +206,7 @@ mod tests { // be lazy and not connect until first use or handle connection errors somehow // (without crashing, as would be required in a real practical validator) let first_addr = get_addr(&mut rng); - assert!(get_connection(&first_addr).tpu_addr().ip() == first_addr.ip()); + assert!(ip(get_connection(&first_addr)) == first_addr.ip()); let addrs = (0..MAX_CONNECTIONS) .into_iter() .map(|_| { @@ -149,7 +219,7 @@ mod tests { let map = (*CONNECTION_MAP).lock().unwrap(); addrs.iter().for_each(|a| { let conn = map.map.get(a).expect("Address not found"); - assert!(a.ip() == conn.0.tpu_addr().ip()); + assert!(a.ip() == ip(conn.0.clone())); }); assert!(map.map.get(&first_addr).is_none()); diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 204b9d78f3..aa7135d44f 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -63,19 +63,22 @@ impl TpuConnection for QuicTpuConnection { &self.client.addr } - fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> { + fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> + where + T: AsRef<[u8]>, + { let _guard = self.client.runtime.enter(); let send_buffer = self.client.send_buffer(wire_transaction); self.client.runtime.block_on(send_buffer)?; Ok(()) } - fn send_wire_transaction_batch( - &self, - wire_transaction_batch: &[Vec], - ) -> TransportResult<()> { + fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> + where + T: AsRef<[u8]>, + { let _guard = self.client.runtime.enter(); - let send_batch = self.client.send_batch(wire_transaction_batch); + let send_batch = self.client.send_batch(buffers); self.client.runtime.block_on(send_batch)?; Ok(()) } @@ -158,12 +161,18 @@ impl QuicClient { } } - pub async fn send_buffer(&self, data: &[u8]) -> Result<(), ClientErrorKind> { - self._send_buffer(data).await?; + pub async fn send_buffer(&self, data: T) -> Result<(), ClientErrorKind> + where + T: AsRef<[u8]>, + { + self._send_buffer(data.as_ref()).await?; Ok(()) } - pub async fn send_batch(&self, buffers: &[Vec]) -> Result<(), ClientErrorKind> { + pub async fn send_batch(&self, buffers: &[T]) -> Result<(), ClientErrorKind> + where + T: AsRef<[u8]>, + { // Start off by "testing" the connection by sending the first transaction // This will also connect to the server if not already connected // and reconnect and retry if the first send attempt failed @@ -178,7 +187,7 @@ impl QuicClient { if buffers.is_empty() { return Ok(()); } - let connection = self._send_buffer(&buffers[0]).await?; + let connection = self._send_buffer(buffers[0].as_ref()).await?; // Used to avoid dereferencing the Arc multiple times below // by just getting a reference to the NewConnection once @@ -192,7 +201,7 @@ impl QuicClient { join_all( buffs .into_iter() - .map(|buf| Self::_send_buffer_using_conn(buf, connection_ref)), + .map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)), ) }); diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 0ebc4f02f6..c94e2c140a 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -613,7 +613,7 @@ impl AsyncClient for ThinClient { fn async_send_batch(&self, transactions: Vec) -> TransportResult<()> { let batch: Vec = transactions.into_iter().map(Into::into).collect(); self.tpu_connection() - .par_serialize_and_send_transaction_batch(&batch)?; + .par_serialize_and_send_transaction_batch(&batch[..])?; Ok(()) } diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 8f61c27d3a..5228a54536 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -1,13 +1,11 @@ use { - rayon::iter::{IntoParallelRefIterator, ParallelIterator}, + rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, std::net::{SocketAddr, UdpSocket}, }; pub trait TpuConnection { - fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self - where - Self: Sized; + fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self; fn tpu_addr(&self) -> &SocketAddr; @@ -20,26 +18,23 @@ pub trait TpuConnection { self.send_wire_transaction(&wire_transaction) } - fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()>; + fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> + where + T: AsRef<[u8]>; fn par_serialize_and_send_transaction_batch( &self, - transaction_batch: &[VersionedTransaction], + transactions: &[VersionedTransaction], ) -> TransportResult<()> { - let wire_transaction_batch: Vec<_> = transaction_batch - .par_iter() + let buffers = transactions + .into_par_iter() .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect(); - self.send_wire_transaction_batch(&wire_transaction_batch) + .collect::>(); + + self.send_wire_transaction_batch(&buffers) } - fn send_wire_transaction_batch( - &self, - wire_transaction_batch: &[Vec], - ) -> TransportResult<()> { - for wire_transaction in wire_transaction_batch { - self.send_wire_transaction(wire_transaction)?; - } - Ok(()) - } + fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> + where + T: AsRef<[u8]>; } diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index ce48c134bf..7838bc7c14 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -3,7 +3,9 @@ use { crate::tpu_connection::TpuConnection, + core::iter::repeat, solana_sdk::transport::Result as TransportResult, + solana_streamer::sendmmsg::batch_send, std::net::{SocketAddr, UdpSocket}, }; @@ -24,8 +26,20 @@ impl TpuConnection for UdpTpuConnection { &self.addr } - fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> { - self.socket.send_to(wire_transaction, self.addr)?; + fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> + where + T: AsRef<[u8]>, + { + self.socket.send_to(wire_transaction.as_ref(), self.addr)?; + Ok(()) + } + + fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> + where + T: AsRef<[u8]>, + { + let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect(); + batch_send(&self.socket, &pkts)?; Ok(()) } } diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 4d18c33bec..837a627b00 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,7 +1,7 @@ use { crate::tower_storage::{SavedTowerVersions, TowerStorage}, crossbeam_channel::Receiver, - solana_client::connection_cache::get_connection, + solana_client::connection_cache, solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, solana_poh::poh_recorder::PohRecorder, @@ -90,7 +90,8 @@ impl VotingService { let mut measure = Measure::start("vote_tx_send-ms"); let target_address = target_address.unwrap_or_else(|| cluster_info.my_contact_info().tpu); - let _ = get_connection(&target_address).send_transaction(vote_op.tx()); + let wire_vote_tx = bincode::serialize(vote_op.tx()).expect("vote serialization failure"); + let _ = connection_cache::send_wire_transaction(&wire_vote_tx, &target_address); measure.stop(); inc_new_counter_info!("vote_tx_send-ms", measure.as_ms() as usize); diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 7f24759645..4e8658fe3a 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -174,6 +174,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64ct" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71acf5509fc522cce1b100ac0121c635129bfd4d91cdf036bcc9b9935f97ccf5" + [[package]] name = "bincode" version = "1.3.3" @@ -432,7 +438,7 @@ dependencies = [ "num-integer", "num-traits", "serde", - "time", + "time 0.1.43", "winapi", ] @@ -527,6 +533,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "const-oid" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -684,6 +696,15 @@ dependencies = [ "rayon", ] +[[package]] +name = "der" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" +dependencies = [ + "const-oid", +] + [[package]] name = "derivation-path" version = "0.2.0" @@ -1226,6 +1247,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "histogram" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" + [[package]] name = "hmac" version = "0.8.1" @@ -1822,6 +1849,15 @@ dependencies = [ "syn 1.0.67", ] +[[package]] +name = "num_threads" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0" +dependencies = [ + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -1928,6 +1964,15 @@ dependencies = [ "digest 0.10.3", ] +[[package]] +name = "pem" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1966,6 +2011,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0" +dependencies = [ + "der", + "spki", + "zeroize", +] + [[package]] name = "pkg-config" version = "0.3.17" @@ -2253,6 +2309,18 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rcgen" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7fa2d386df8533b02184941c76ae2e0d0c1d053f5d43339169d80f21275fc5e" +dependencies = [ + "pem", + "ring", + "time 0.3.7", + "yasna", +] + [[package]] name = "redox_syscall" version = "0.1.56" @@ -3284,6 +3352,7 @@ dependencies = [ "solana-measure", "solana-net-utils", "solana-sdk", + "solana-streamer", "solana-transaction-status", "solana-version", "solana-vote-program", @@ -3789,6 +3858,30 @@ dependencies = [ "thiserror", ] +[[package]] +name = "solana-streamer" +version = "1.10.4" +dependencies = [ + "crossbeam-channel", + "futures-util", + "histogram", + "itertools 0.10.3", + "libc", + "log", + "nix", + "pem", + "pkcs8", + "quinn", + "rand 0.7.3", + "rcgen", + "rustls", + "solana-metrics", + "solana-perf", + "solana-sdk", + "thiserror", + "tokio", +] + [[package]] name = "solana-transaction-status" version = "1.10.4" @@ -3903,7 +3996,7 @@ dependencies = [ "rustc-demangle", "scroll", "thiserror", - "time", + "time 0.1.43", ] [[package]] @@ -3912,6 +4005,16 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spki" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "spl-associated-token-account" version = "1.0.3" @@ -4136,6 +4239,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d" +dependencies = [ + "libc", + "num_threads", +] + [[package]] name = "tiny-bip39" version = "0.8.2" @@ -4711,6 +4824,15 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yasna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346d34a236c9d3e5f3b9b74563f238f955bbd05fa0b8b4efa53c130c43982f4c" +dependencies = [ + "time 0.3.7", +] + [[package]] name = "zeroize" version = "1.3.0" diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 99b3f2fc85..ce467b5d2c 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -310,9 +310,8 @@ impl SendTransactionService { fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) { let mut measure = Measure::start("send_transaction_service-us"); - let connection = connection_cache::get_connection(tpu_address); - if let Err(err) = connection.send_wire_transaction(wire_transaction) { + if let Err(err) = connection_cache::send_wire_transaction(wire_transaction, tpu_address) { warn!("Failed to send transaction to {}: {:?}", tpu_address, err); } measure.stop(); diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index b236473b65..e50acd8614 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -8,6 +8,7 @@ use { std::os::unix::io::AsRawFd, }; use { + solana_sdk::transport::TransportError, std::{ borrow::Borrow, io, @@ -24,6 +25,12 @@ pub enum SendPktsError { IoError(io::Error, usize), } +impl From for TransportError { + fn from(err: SendPktsError) -> Self { + Self::Custom(format!("{:?}", err)) + } +} + #[cfg(not(target_os = "linux"))] pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> where