diff --git a/Cargo.lock b/Cargo.lock index 1cf348b19c..35f6debcb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4658,6 +4658,7 @@ dependencies = [ "log", "lru", "quinn", + "quinn-proto", "rand 0.7.3", "rand_chacha 0.2.2", "rayon", diff --git a/client/Cargo.toml b/client/Cargo.toml index ebe822557c..5b936757b0 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -27,6 +27,7 @@ lazy_static = "1.4.0" log = "0.4.14" lru = "0.7.5" quinn = "0.8.0" +quinn-proto = "0.8.0" rand = "0.7.0" rand_chacha = "0.2.2" rayon = "1.5.1" diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index fa4a1925ed..4d37ffe73a 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,14 +1,21 @@ use { crate::{ - quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection, + quic_client::QuicTpuConnection, + tpu_connection::{ClientStats, TpuConnection}, + udp_client::UdpTpuConnection, }, lazy_static::lazy_static, lru::LruCache, solana_net_utils::VALIDATOR_PORT_RANGE, - solana_sdk::{transaction::VersionedTransaction, transport::TransportError}, + solana_sdk::{ + timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError, + }, std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, + }, }, }; @@ -21,8 +28,99 @@ enum Connection { Quic(Arc), } +#[derive(Default)] +struct ConnectionCacheStats { + cache_hits: AtomicU64, + cache_misses: AtomicU64, + sent_packets: AtomicU64, + total_batches: AtomicU64, + batch_success: AtomicU64, + batch_failure: AtomicU64, + + // Need to track these separately per-connection + // because we need to track the base stat value from quinn + total_client_stats: ClientStats, +} + +const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000; + +impl ConnectionCacheStats { + fn add_client_stats(&self, client_stats: &ClientStats, num_packets: usize, is_success: bool) { + self.total_client_stats.total_connections.fetch_add( + client_stats.total_connections.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + self.total_client_stats.connection_reuse.fetch_add( + client_stats.connection_reuse.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + self.sent_packets + .fetch_add(num_packets as u64, Ordering::Relaxed); + self.total_batches.fetch_add(1, Ordering::Relaxed); + if is_success { + self.batch_success.fetch_add(1, Ordering::Relaxed); + } else { + self.batch_failure.fetch_add(1, Ordering::Relaxed); + } + } + + fn report(&self) { + datapoint_info!( + "quic-client-connection-stats", + ( + "cache_hits", + self.cache_hits.swap(0, Ordering::Relaxed), + i64 + ), + ( + "cache_misses", + self.cache_misses.swap(0, Ordering::Relaxed), + i64 + ), + ( + "total_connections", + self.total_client_stats + .total_connections + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "connection_reuse", + self.total_client_stats + .connection_reuse + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "congestion_events", + self.total_client_stats.congestion_events.load_and_reset(), + i64 + ), + ( + "tx_streams_blocked_uni", + self.total_client_stats + .tx_streams_blocked_uni + .load_and_reset(), + i64 + ), + ( + "tx_data_blocked", + self.total_client_stats.tx_data_blocked.load_and_reset(), + i64 + ), + ( + "tx_acks", + self.total_client_stats.tx_acks.load_and_reset(), + i64 + ), + ); + } +} + struct ConnMap { map: LruCache, + stats: Arc, + last_stats: AtomicInterval, use_quic: bool, } @@ -30,6 +128,8 @@ impl ConnMap { pub fn new() -> Self { Self { map: LruCache::new(MAX_CONNECTIONS), + stats: Arc::new(ConnectionCacheStats::default()), + last_stats: AtomicInterval::default(), use_quic: false, } } @@ -50,11 +150,25 @@ pub fn set_use_quic(use_quic: bool) { // TODO: see https://github.com/solana-labs/solana/issues/23661 // remove lazy_static and optimize and refactor this -fn get_connection(addr: &SocketAddr) -> Connection { +fn get_connection(addr: &SocketAddr) -> (Connection, Arc) { let mut map = (*CONNECTION_MAP).lock().unwrap(); - match map.map.get(addr) { - Some(connection) => connection.clone(), + if map + .last_stats + .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL) + { + map.stats.report(); + } + + let (connection, hit, maybe_stats) = match map.map.get(addr) { + Some(connection) => { + let mut stats = None; + // update connection stats + if let Connection::Quic(conn) = connection { + stats = conn.stats().map(|s| (conn.base_stats(), s)); + } + (connection.clone(), true, stats) + } None => { let (_, send_socket) = solana_net_utils::bind_in_range( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), @@ -68,9 +182,41 @@ fn get_connection(addr: &SocketAddr) -> Connection { }; map.map.put(*addr, connection.clone()); - connection + (connection, false, None) } + }; + + if let Some((connection_stats, new_stats)) = maybe_stats { + map.stats.total_client_stats.congestion_events.update_stat( + &connection_stats.congestion_events, + new_stats.path.congestion_events, + ); + + map.stats + .total_client_stats + .tx_streams_blocked_uni + .update_stat( + &connection_stats.tx_streams_blocked_uni, + new_stats.frame_tx.streams_blocked_uni, + ); + + map.stats.total_client_stats.tx_data_blocked.update_stat( + &connection_stats.tx_data_blocked, + new_stats.frame_tx.data_blocked, + ); + + map.stats + .total_client_stats + .tx_acks + .update_stat(&connection_stats.tx_acks, new_stats.frame_tx.acks); } + + if hit { + map.stats.cache_hits.fetch_add(1, Ordering::Relaxed); + } else { + map.stats.cache_misses.fetch_add(1, Ordering::Relaxed); + } + (connection, map.stats.clone()) } // TODO: see https://github.com/solana-labs/solana/issues/23851 @@ -86,55 +232,67 @@ pub fn send_wire_transaction_batch( packets: &[&[u8]], addr: &SocketAddr, ) -> Result<(), TransportError> { - let conn = get_connection(addr); - match conn { - Connection::Udp(conn) => conn.send_wire_transaction_batch(packets), - Connection::Quic(conn) => conn.send_wire_transaction_batch(packets), - } + let (conn, stats) = get_connection(addr); + let client_stats = ClientStats::default(); + let r = match conn { + Connection::Udp(conn) => conn.send_wire_transaction_batch(packets, &client_stats), + Connection::Quic(conn) => conn.send_wire_transaction_batch(packets, &client_stats), + }; + stats.add_client_stats(&client_stats, packets.len(), r.is_ok()); + r } pub fn send_wire_transaction_async( packets: Vec, addr: &SocketAddr, ) -> Result<(), TransportError> { - let conn = get_connection(addr); - match conn { - Connection::Udp(conn) => conn.send_wire_transaction_async(packets), - Connection::Quic(conn) => conn.send_wire_transaction_async(packets), - } + let (conn, stats) = get_connection(addr); + let client_stats = Arc::new(ClientStats::default()); + let r = match conn { + Connection::Udp(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()), + Connection::Quic(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()), + }; + stats.add_client_stats(&client_stats, 1, r.is_ok()); + r } pub fn send_wire_transaction( wire_transaction: &[u8], addr: &SocketAddr, ) -> Result<(), TransportError> { - let conn = get_connection(addr); - match conn { - Connection::Udp(conn) => conn.send_wire_transaction(wire_transaction), - Connection::Quic(conn) => conn.send_wire_transaction(wire_transaction), - } + send_wire_transaction_batch(&[wire_transaction], addr) } pub fn serialize_and_send_transaction( transaction: &VersionedTransaction, addr: &SocketAddr, ) -> Result<(), TransportError> { - let conn = get_connection(addr); - match conn { - Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction), - Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction), - } + let (conn, stats) = get_connection(addr); + let client_stats = ClientStats::default(); + let r = match conn { + Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction, &client_stats), + Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction, &client_stats), + }; + stats.add_client_stats(&client_stats, 1, r.is_ok()); + r } pub fn par_serialize_and_send_transaction_batch( transactions: &[VersionedTransaction], addr: &SocketAddr, ) -> Result<(), TransportError> { - let conn = get_connection(addr); - match conn { - Connection::Udp(conn) => conn.par_serialize_and_send_transaction_batch(transactions), - Connection::Quic(conn) => conn.par_serialize_and_send_transaction_batch(transactions), - } + let (conn, stats) = get_connection(addr); + let client_stats = ClientStats::default(); + let r = match conn { + Connection::Udp(conn) => { + conn.par_serialize_and_send_transaction_batch(transactions, &client_stats) + } + Connection::Quic(conn) => { + conn.par_serialize_and_send_transaction_batch(transactions, &client_stats) + } + }; + stats.add_client_stats(&client_stats, transactions.len(), r.is_ok()); + r } #[cfg(test)] @@ -182,7 +340,7 @@ mod tests { // be lazy and not connect until first use or handle connection errors somehow // (without crashing, as would be required in a real practical validator) let first_addr = get_addr(&mut rng); - assert!(ip(get_connection(&first_addr)) == first_addr.ip()); + assert!(ip(get_connection(&first_addr).0) == first_addr.ip()); let addrs = (0..MAX_CONNECTIONS) .into_iter() .map(|_| { diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 08d3af8a9b..0bcb97f4e1 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -2,20 +2,24 @@ //! an interface for sending transactions which is restricted by the server's flow control. use { - crate::{client_error::ClientErrorKind, tpu_connection::TpuConnection}, + crate::{ + client_error::ClientErrorKind, + tpu_connection::{ClientStats, TpuConnection}, + }, async_mutex::Mutex, futures::future::join_all, itertools::Itertools, lazy_static::lazy_static, log::*, quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError}, + quinn_proto::ConnectionStats, solana_sdk::{ quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET}, transport::Result as TransportResult, }, std::{ net::{SocketAddr, UdpSocket}, - sync::Arc, + sync::{atomic::Ordering, Arc}, }, tokio::runtime::Runtime, }; @@ -52,12 +56,23 @@ struct QuicClient { endpoint: Endpoint, connection: Arc>>>, addr: SocketAddr, + stats: Arc, } pub struct QuicTpuConnection { client: Arc, } +impl QuicTpuConnection { + pub fn stats(&self) -> Option { + self.client.stats() + } + + pub fn base_stats(&self) -> Arc { + self.client.stats.clone() + } +} + impl TpuConnection for QuicTpuConnection { fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self { let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET); @@ -70,33 +85,45 @@ impl TpuConnection for QuicTpuConnection { &self.client.addr } - fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> + fn send_wire_transaction( + &self, + wire_transaction: T, + stats: &ClientStats, + ) -> TransportResult<()> where T: AsRef<[u8]>, { let _guard = RUNTIME.enter(); - let send_buffer = self.client.send_buffer(wire_transaction); + let send_buffer = self.client.send_buffer(wire_transaction, stats); RUNTIME.block_on(send_buffer)?; Ok(()) } - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> + fn send_wire_transaction_batch( + &self, + buffers: &[T], + stats: &ClientStats, + ) -> TransportResult<()> where T: AsRef<[u8]>, { let _guard = RUNTIME.enter(); - let send_batch = self.client.send_batch(buffers); + let send_batch = self.client.send_batch(buffers, stats); RUNTIME.block_on(send_batch)?; Ok(()) } - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { + fn send_wire_transaction_async( + &self, + wire_transaction: Vec, + stats: Arc, + ) -> TransportResult<()> { let _guard = RUNTIME.enter(); //drop and detach the task let client = self.client.clone(); inc_new_counter_info!("send_wire_transaction_async", 1); let _ = RUNTIME.spawn(async move { - let send_buffer = client.send_buffer(wire_transaction); + let send_buffer = client.send_buffer(wire_transaction, &stats); if let Err(e) = send_buffer.await { inc_new_counter_warn!("send_wire_transaction_async_fail", 1); warn!("Failed to send transaction async to {:?}", e); @@ -127,9 +154,16 @@ impl QuicClient { endpoint, connection: Arc::new(Mutex::new(None)), addr, + stats: Arc::new(ClientStats::default()), } } + pub fn stats(&self) -> Option { + let conn_guard = self.connection.lock(); + let x = RUNTIME.block_on(conn_guard); + x.as_ref().map(|c| c.connection.stats()) + } + // If this function becomes public, it should be changed to // not expose details of the specific Quic implementation we're using async fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint { @@ -146,18 +180,35 @@ impl QuicClient { Ok(()) } + async fn make_connection(&self, stats: &ClientStats) -> Result, WriteError> { + let connecting = self.endpoint.connect(self.addr, "connect").unwrap(); + stats.total_connections.fetch_add(1, Ordering::Relaxed); + let connecting_result = connecting.await; + if connecting_result.is_err() { + stats.connection_errors.fetch_add(1, Ordering::Relaxed); + } + let connection = connecting_result?; + Ok(Arc::new(connection)) + } + // Attempts to send data, connecting/reconnecting as necessary // On success, returns the connection used to successfully send the data - async fn _send_buffer(&self, data: &[u8]) -> Result, WriteError> { + async fn _send_buffer( + &self, + data: &[u8], + stats: &ClientStats, + ) -> Result, WriteError> { let connection = { let mut conn_guard = self.connection.lock().await; let maybe_conn = (*conn_guard).clone(); match maybe_conn { - Some(conn) => conn.clone(), + Some(conn) => { + stats.connection_reuse.fetch_add(1, Ordering::Relaxed); + conn.clone() + } None => { - let connecting = self.endpoint.connect(self.addr, "connect").unwrap(); - let connection = Arc::new(connecting.await?); + let connection = self.make_connection(stats).await?; *conn_guard = Some(connection.clone()); connection } @@ -167,8 +218,7 @@ impl QuicClient { Ok(()) => Ok(connection), _ => { let connection = { - let connecting = self.endpoint.connect(self.addr, "connect").unwrap(); - let connection = Arc::new(connecting.await?); + let connection = self.make_connection(stats).await?; let mut conn_guard = self.connection.lock().await; *conn_guard = Some(connection.clone()); connection @@ -179,15 +229,19 @@ impl QuicClient { } } - pub async fn send_buffer(&self, data: T) -> Result<(), ClientErrorKind> + pub async fn send_buffer(&self, data: T, stats: &ClientStats) -> Result<(), ClientErrorKind> where T: AsRef<[u8]>, { - self._send_buffer(data.as_ref()).await?; + self._send_buffer(data.as_ref(), stats).await?; Ok(()) } - pub async fn send_batch(&self, buffers: &[T]) -> Result<(), ClientErrorKind> + pub async fn send_batch( + &self, + buffers: &[T], + stats: &ClientStats, + ) -> Result<(), ClientErrorKind> where T: AsRef<[u8]>, { @@ -205,7 +259,7 @@ impl QuicClient { if buffers.is_empty() { return Ok(()); } - let connection = self._send_buffer(buffers[0].as_ref()).await?; + let connection = self._send_buffer(buffers[0].as_ref(), stats).await?; // Used to avoid dereferencing the Arc multiple times below // by just getting a reference to the NewConnection once diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 0c11e204fe..eb76336c27 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -1,9 +1,26 @@ use { rayon::iter::{IntoParallelIterator, ParallelIterator}, + solana_metrics::MovingStat, solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - std::net::{SocketAddr, UdpSocket}, + std::{ + net::{SocketAddr, UdpSocket}, + sync::{atomic::AtomicU64, Arc}, + }, }; +#[derive(Default)] +pub struct ClientStats { + pub total_connections: AtomicU64, + pub connection_reuse: AtomicU64, + pub connection_errors: AtomicU64, + + // these will be the last values of these stats + pub congestion_events: MovingStat, + pub tx_streams_blocked_uni: MovingStat, + pub tx_data_blocked: MovingStat, + pub tx_acks: MovingStat, +} + pub trait TpuConnection { fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self; @@ -12,31 +29,45 @@ pub trait TpuConnection { fn serialize_and_send_transaction( &self, transaction: &VersionedTransaction, + stats: &ClientStats, ) -> TransportResult<()> { let wire_transaction = bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(&wire_transaction) + self.send_wire_transaction(&wire_transaction, stats) } - fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> + fn send_wire_transaction( + &self, + wire_transaction: T, + stats: &ClientStats, + ) -> TransportResult<()> where T: AsRef<[u8]>; - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()>; + fn send_wire_transaction_async( + &self, + wire_transaction: Vec, + stats: Arc, + ) -> TransportResult<()>; fn par_serialize_and_send_transaction_batch( &self, transactions: &[VersionedTransaction], + stats: &ClientStats, ) -> TransportResult<()> { let buffers = transactions .into_par_iter() .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) .collect::>(); - self.send_wire_transaction_batch(&buffers) + self.send_wire_transaction_batch(&buffers, stats) } - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> + fn send_wire_transaction_batch( + &self, + buffers: &[T], + stats: &ClientStats, + ) -> TransportResult<()> where T: AsRef<[u8]>; } diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index c9a2747bff..b13b899f8e 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -2,11 +2,14 @@ //! an interface for sending transactions use { - crate::tpu_connection::TpuConnection, + crate::tpu_connection::{ClientStats, TpuConnection}, core::iter::repeat, solana_sdk::transport::Result as TransportResult, solana_streamer::sendmmsg::batch_send, - std::net::{SocketAddr, UdpSocket}, + std::{ + net::{SocketAddr, UdpSocket}, + sync::Arc, + }, }; pub struct UdpTpuConnection { @@ -26,7 +29,11 @@ impl TpuConnection for UdpTpuConnection { &self.addr } - fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> + fn send_wire_transaction( + &self, + wire_transaction: T, + _stats: &ClientStats, + ) -> TransportResult<()> where T: AsRef<[u8]>, { @@ -34,12 +41,20 @@ impl TpuConnection for UdpTpuConnection { Ok(()) } - fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { + fn send_wire_transaction_async( + &self, + wire_transaction: Vec, + _stats: Arc, + ) -> TransportResult<()> { self.socket.send_to(wire_transaction.as_ref(), self.addr)?; Ok(()) } - fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> + fn send_wire_transaction_batch( + &self, + buffers: &[T], + _stats: &ClientStats, + ) -> TransportResult<()> where T: AsRef<[u8]>, { diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 80e999c755..379351d528 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -4,7 +4,28 @@ pub mod datapoint; mod metrics; pub mod poh_timing_point; pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit}; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +// To track an external counter which cannot be reset and is always increasing +#[derive(Default)] +pub struct MovingStat { + value: AtomicU64, +} + +impl MovingStat { + pub fn update_stat(&self, old_value: &MovingStat, new_value: u64) { + let old = old_value.value.swap(new_value, Ordering::Acquire); + self.value + .fetch_add(new_value.saturating_sub(old), Ordering::Release); + } + + pub fn load_and_reset(&self) -> u64 { + self.value.swap(0, Ordering::Acquire) + } +} /// A helper that sends the count of created tokens as a datapoint. #[allow(clippy::redundant_allocation)] diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 818d0b23b4..7c27e6cb50 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -3363,6 +3363,7 @@ dependencies = [ "log", "lru", "quinn", + "quinn-proto", "rand 0.7.3", "rand_chacha 0.2.2", "rayon",