Rationalize usage of sendmmsg(2). Skip packets which failed to send and track failures.
(cherry picked from commit ae5ad5cf9b)
Co-authored-by: Jeff Biseda <jbiseda@gmail.com>
This commit is contained in:
@@ -22,7 +22,10 @@ use solana_poh::poh_recorder::WorkingBankEntry;
|
||||
use solana_runtime::{bank::Bank, bank_forks::BankForks};
|
||||
use solana_sdk::timing::{timestamp, AtomicInterval};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||
use solana_streamer::{sendmmsg::send_mmsg, socket::SocketAddrSpace};
|
||||
use solana_streamer::{
|
||||
sendmmsg::{batch_send, SendPktsError},
|
||||
socket::SocketAddrSpace,
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::UdpSocket,
|
||||
@@ -394,10 +397,11 @@ pub fn broadcast_shreds(
|
||||
self_pubkey: Pubkey,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
) -> Result<()> {
|
||||
let mut result = Ok(());
|
||||
let broadcast_len = cluster_nodes.num_peers();
|
||||
if broadcast_len == 0 {
|
||||
update_peer_stats(1, 1, last_datapoint_submit);
|
||||
return Ok(());
|
||||
return result;
|
||||
}
|
||||
let mut shred_select = Measure::start("shred_select");
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
@@ -407,7 +411,7 @@ pub fn broadcast_shreds(
|
||||
let seed = shred.seed(Some(self_pubkey), &root_bank);
|
||||
let node = cluster_nodes.get_broadcast_peer(seed)?;
|
||||
if socket_addr_space.check(&node.tvu) {
|
||||
Some((&shred.payload, &node.tvu))
|
||||
Some((&shred.payload[..], &node.tvu))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@@ -416,18 +420,14 @@ pub fn broadcast_shreds(
|
||||
shred_select.stop();
|
||||
transmit_stats.shred_select += shred_select.as_us();
|
||||
|
||||
let mut sent = 0;
|
||||
let mut send_mmsg_time = Measure::start("send_mmsg");
|
||||
while sent < packets.len() {
|
||||
match send_mmsg(s, &packets[sent..]) {
|
||||
Ok(n) => sent += n,
|
||||
Err(e) => {
|
||||
return Err(Error::Io(e));
|
||||
}
|
||||
}
|
||||
if let Err(SendPktsError::IoError(ioerr, num_failed)) = batch_send(s, &packets[..]) {
|
||||
transmit_stats.dropped_packets += num_failed;
|
||||
result = Err(Error::Io(ioerr));
|
||||
}
|
||||
send_mmsg_time.stop();
|
||||
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
|
||||
transmit_stats.total_packets += packets.len();
|
||||
|
||||
let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64;
|
||||
update_peer_stats(
|
||||
@@ -435,7 +435,7 @@ pub fn broadcast_shreds(
|
||||
broadcast_len as i64 + 1,
|
||||
last_datapoint_submit,
|
||||
);
|
||||
Ok(())
|
||||
result
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@@ -19,6 +19,8 @@ pub struct TransmitShredsStats {
|
||||
pub get_peers_elapsed: u64,
|
||||
pub shred_select: u64,
|
||||
pub num_shreds: usize,
|
||||
pub total_packets: usize,
|
||||
pub dropped_packets: usize,
|
||||
}
|
||||
|
||||
impl BroadcastStats for TransmitShredsStats {
|
||||
@@ -28,6 +30,8 @@ impl BroadcastStats for TransmitShredsStats {
|
||||
self.get_peers_elapsed += new_stats.get_peers_elapsed;
|
||||
self.num_shreds += new_stats.num_shreds;
|
||||
self.shred_select += new_stats.shred_select;
|
||||
self.total_packets += new_stats.total_packets;
|
||||
self.dropped_packets += new_stats.dropped_packets;
|
||||
}
|
||||
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
|
||||
datapoint_info!(
|
||||
@@ -45,6 +49,8 @@ impl BroadcastStats for TransmitShredsStats {
|
||||
("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
|
||||
("num_shreds", self.num_shreds as i64, i64),
|
||||
("shred_select", self.shred_select as i64, i64),
|
||||
("total_packets", self.total_packets as i64, i64),
|
||||
("dropped_packets", self.dropped_packets as i64, i64),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -173,6 +179,8 @@ mod test {
|
||||
send_mmsg_elapsed: 3,
|
||||
shred_select: 4,
|
||||
num_shreds: 5,
|
||||
total_packets: 6,
|
||||
dropped_packets: 7,
|
||||
},
|
||||
&Some(BroadcastShredBatchInfo {
|
||||
slot: 0,
|
||||
@@ -190,14 +198,18 @@ mod test {
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
|
||||
|
||||
slot_broadcast_stats.update(
|
||||
&TransmitShredsStats {
|
||||
transmit_elapsed: 7,
|
||||
get_peers_elapsed: 8,
|
||||
send_mmsg_elapsed: 9,
|
||||
shred_select: 10,
|
||||
num_shreds: 11,
|
||||
transmit_elapsed: 11,
|
||||
get_peers_elapsed: 12,
|
||||
send_mmsg_elapsed: 13,
|
||||
shred_select: 14,
|
||||
num_shreds: 15,
|
||||
total_packets: 16,
|
||||
dropped_packets: 17,
|
||||
},
|
||||
&None,
|
||||
);
|
||||
@@ -211,6 +223,8 @@ mod test {
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.total_packets, 6);
|
||||
assert_eq!(slot_0_stats.broadcast_shred_stats.dropped_packets, 7);
|
||||
|
||||
// If another batch is given, then total number of batches == num_expected_batches == 2,
|
||||
// so the batch should be purged from the HashMap
|
||||
@@ -221,6 +235,8 @@ mod test {
|
||||
send_mmsg_elapsed: 1,
|
||||
shred_select: 1,
|
||||
num_shreds: 1,
|
||||
total_packets: 1,
|
||||
dropped_packets: 1,
|
||||
},
|
||||
&Some(BroadcastShredBatchInfo {
|
||||
slot: 0,
|
||||
|
Reference in New Issue
Block a user