From 5eb085fcafe75bd41c3a7a2a6435dcce9407b2ec Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 28 Mar 2022 16:38:44 +0200 Subject: [PATCH] Implement forwarding via TpuConnection (#23817) (#23936) (cherry picked from commit 6b85c2104cb95881234546131a1086594844c0b1) Co-authored-by: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> --- core/src/banking_stage.rs | 45 ++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 94174821a2..528e022dfe 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -14,6 +14,7 @@ use { histogram::Histogram, itertools::Itertools, retain_mut::RetainMut, + solana_client::connection_cache::send_wire_transaction_batch, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::blockstore_processor::TransactionStatusSender, @@ -51,8 +52,8 @@ use { transaction::{ self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction, }, + transport::TransportError, }, - solana_streamer::sendmmsg::{batch_send, SendPktsError}, solana_transaction_status::token_balances::{ collect_token_balances, TransactionTokenBalancesSet, }, @@ -60,7 +61,7 @@ use { cmp, collections::HashMap, env, - net::{SocketAddr, UdpSocket}, + net::SocketAddr, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, RwLock, @@ -482,11 +483,10 @@ impl BankingStage { /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns /// the number of successfully forwarded packets in second part of tuple fn forward_buffered_packets( - socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, packets: Vec<&Packet>, data_budget: &DataBudget, - ) -> (std::io::Result<()>, usize) { + ) -> (std::result::Result<(), TransportError>, usize) { const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; @@ -502,18 +502,35 @@ impl BankingStage { .iter() .filter_map(|p| { if !p.meta.forwarded() && data_budget.take(p.meta.size) { - Some((&p.data[..p.meta.size], tpu_forwards)) + Some(&p.data[..p.meta.size]) } else { None } }) .collect(); + // TODO: see https://github.com/solana-labs/solana/issues/23819 + // fix this so returns the correct number of succeeded packets + // when there's an error sending the batch. This was left as-is for now + // in favor of shipping Quic support, which was considered higher-priority if !packet_vec.is_empty() { inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len()); - if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(socket, &packet_vec) - { - return (Err(ioerr), packet_vec.len().saturating_sub(num_failed)); + + let mut measure = Measure::start("banking_stage-forward-us"); + + let res = send_wire_transaction_batch(&packet_vec, tpu_forwards); + + measure.stop(); + inc_new_counter_info!( + "banking_stage-forward-us", + measure.as_us() as usize, + 1000, + 1000 + ); + + if let Err(err) = res { + inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1); + return (Err(err), 0); } } @@ -766,7 +783,6 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_buffered_packets( my_pubkey: &Pubkey, - socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, @@ -846,7 +862,6 @@ impl BankingStage { cluster_info, buffered_packet_batches, poh_recorder, - socket, false, data_budget, slot_metrics_tracker, @@ -865,7 +880,6 @@ impl BankingStage { cluster_info, buffered_packet_batches, poh_recorder, - socket, true, data_budget, slot_metrics_tracker, @@ -887,7 +901,6 @@ impl BankingStage { cluster_info: &ClusterInfo, buffered_packet_batches: &mut UnprocessedPacketBatches, poh_recorder: &Arc>, - socket: &UdpSocket, hold: bool, data_budget: &DataBudget, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, @@ -913,7 +926,7 @@ impl BankingStage { Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); let forwardable_packets_len = forwardable_packets.len(); let (_forward_result, sucessful_forwarded_packets_count) = - Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget); + Self::forward_buffered_packets(&addr, forwardable_packets, data_budget); let failed_forwarded_packets_count = forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count); @@ -958,7 +971,6 @@ impl BankingStage { cost_model: Arc>, ) { let recorder = poh_recorder.lock().unwrap().recorder(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); let qos_service = QosService::new(cost_model, id); @@ -970,7 +982,6 @@ impl BankingStage { |_| { Self::process_buffered_packets( &my_pubkey, - &socket, poh_recorder, cluster_info, &mut buffered_packet_batches, @@ -3835,7 +3846,6 @@ mod tests { let local_node = Node::new_localhost_with_pubkey(validator_pubkey); let cluster_info = new_test_cluster_info(local_node.info); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let recv_socket = &local_node.sockets.tpu_forwards[0]; let test_cases = vec![ @@ -3857,7 +3867,6 @@ mod tests { &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, - &send_socket, true, &data_budget, &mut LeaderSlotMetricsTracker::new(0), @@ -3935,7 +3944,6 @@ mod tests { let local_node = Node::new_localhost_with_pubkey(validator_pubkey); let cluster_info = new_test_cluster_info(local_node.info); - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let recv_socket = &local_node.sockets.tpu_forwards[0]; let test_cases = vec![ @@ -3969,7 +3977,6 @@ mod tests { &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, - &send_socket, hold, &DataBudget::default(), &mut LeaderSlotMetricsTracker::new(0),