Implement forwarding via TpuConnection (#23817) (#23936)

(cherry picked from commit 6b85c2104c)

Co-authored-by: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com>
This commit is contained in:
mergify[bot]
2022-03-28 16:38:44 +02:00
committed by GitHub
parent c66d086db1
commit 5eb085fcaf

View File

@ -14,6 +14,7 @@ use {
histogram::Histogram,
itertools::Itertools,
retain_mut::RetainMut,
solana_client::connection_cache::send_wire_transaction_batch,
solana_entry::entry::hash_transactions,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::blockstore_processor::TransactionStatusSender,
@ -51,8 +52,8 @@ use {
transaction::{
self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
},
transport::TransportError,
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
solana_transaction_status::token_balances::{
collect_token_balances, TransactionTokenBalancesSet,
},
@ -60,7 +61,7 @@ use {
cmp,
collections::HashMap,
env,
net::{SocketAddr, UdpSocket},
net::SocketAddr,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
@ -482,11 +483,10 @@ impl BankingStage {
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
tpu_forwards: &std::net::SocketAddr,
packets: Vec<&Packet>,
data_budget: &DataBudget,
) -> (std::io::Result<()>, usize) {
) -> (std::result::Result<(), TransportError>, usize) {
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
@ -502,18 +502,35 @@ impl BankingStage {
.iter()
.filter_map(|p| {
if !p.meta.forwarded() && data_budget.take(p.meta.size) {
Some((&p.data[..p.meta.size], tpu_forwards))
Some(&p.data[..p.meta.size])
} else {
None
}
})
.collect();
// TODO: see https://github.com/solana-labs/solana/issues/23819
// fix this so returns the correct number of succeeded packets
// when there's an error sending the batch. This was left as-is for now
// in favor of shipping Quic support, which was considered higher-priority
if !packet_vec.is_empty() {
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len());
if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(socket, &packet_vec)
{
return (Err(ioerr), packet_vec.len().saturating_sub(num_failed));
let mut measure = Measure::start("banking_stage-forward-us");
let res = send_wire_transaction_batch(&packet_vec, tpu_forwards);
measure.stop();
inc_new_counter_info!(
"banking_stage-forward-us",
measure.as_us() as usize,
1000,
1000
);
if let Err(err) = res {
inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
return (Err(err), 0);
}
}
@ -766,7 +783,6 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
fn process_buffered_packets(
my_pubkey: &Pubkey,
socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
@ -846,7 +862,6 @@ impl BankingStage {
cluster_info,
buffered_packet_batches,
poh_recorder,
socket,
false,
data_budget,
slot_metrics_tracker,
@ -865,7 +880,6 @@ impl BankingStage {
cluster_info,
buffered_packet_batches,
poh_recorder,
socket,
true,
data_budget,
slot_metrics_tracker,
@ -887,7 +901,6 @@ impl BankingStage {
cluster_info: &ClusterInfo,
buffered_packet_batches: &mut UnprocessedPacketBatches,
poh_recorder: &Arc<Mutex<PohRecorder>>,
socket: &UdpSocket,
hold: bool,
data_budget: &DataBudget,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
@ -913,7 +926,7 @@ impl BankingStage {
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
let forwardable_packets_len = forwardable_packets.len();
let (_forward_result, sucessful_forwarded_packets_count) =
Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget);
Self::forward_buffered_packets(&addr, forwardable_packets, data_budget);
let failed_forwarded_packets_count =
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);
@ -958,7 +971,6 @@ impl BankingStage {
cost_model: Arc<RwLock<CostModel>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id);
let qos_service = QosService::new(cost_model, id);
@ -970,7 +982,6 @@ impl BankingStage {
|_| {
Self::process_buffered_packets(
&my_pubkey,
&socket,
poh_recorder,
cluster_info,
&mut buffered_packet_batches,
@ -3835,7 +3846,6 @@ mod tests {
let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];
let test_cases = vec![
@ -3857,7 +3867,6 @@ mod tests {
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
true,
&data_budget,
&mut LeaderSlotMetricsTracker::new(0),
@ -3935,7 +3944,6 @@ mod tests {
let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];
let test_cases = vec![
@ -3969,7 +3977,6 @@ mod tests {
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
hold,
&DataBudget::default(),
&mut LeaderSlotMetricsTracker::new(0),