adds metrics for number of outgoing shreds in retransmit stage (#20882)

(cherry picked from commit 5e1cf39c74)

# Conflicts:
#	core/src/retransmit_stage.rs
Author:    behzad nouri
Date:      2021-10-24 13:12:27 +00:00
Committed: Michael Vines
Parent:    d20cccc26b
Commit:    55a1f03eee
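In substance, the change from 5e1cf39c74 makes `retransmit_shred` return the number of nodes each shred was actually sent to, folds those counts per slot into a `HashMap<Slot, RetransmitSlotStats>` across the rayon worker threads, and reports one `retransmit-stage-slot-stats` datapoint per slot whenever the aggregate stats are submitted. Because the cherry-pick conflicted with this branch's older, neighborhood-based retransmit code, the committed file retains literal merge-conflict markers; they are visible in the hunks below.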

diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs

@@ -28,9 +28,9 @@ use {
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
     std::{
-        collections::{BTreeSet, HashSet},
+        collections::{BTreeSet, HashMap, HashSet},
         net::UdpSocket,
-        ops::DerefMut,
+        ops::{AddAssign, DerefMut},
         sync::{
             atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             mpsc::{self, channel, RecvTimeoutError},
@@ -47,9 +47,25 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
 const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
 
+#[derive(Default)]
+struct RetransmitSlotStats {
+    num_shreds: usize,
+    num_nodes: usize,
+}
+
+impl AddAssign for RetransmitSlotStats {
+    fn add_assign(&mut self, other: Self) {
+        *self = Self {
+            num_shreds: self.num_shreds + other.num_shreds,
+            num_nodes: self.num_nodes + other.num_nodes,
+        }
+    }
+}
+
 #[derive(Default)]
 struct RetransmitStats {
     since: Option<Instant>,
+    num_nodes: AtomicUsize,
     num_shreds: usize,
     num_shreds_skipped: AtomicUsize,
     total_batches: usize,
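The `AddAssign` impl is what lets two per-slot entries be combined with `+=` in the merge step near the end of this diff. A minimal usage sketch, assuming the `RetransmitSlotStats` definition above (the counts here are made up):

    let mut total = RetransmitSlotStats::default();
    total += RetransmitSlotStats { num_shreds: 1, num_nodes: 200 };
    total += RetransmitSlotStats { num_shreds: 1, num_nodes: 180 };
    // total now holds num_shreds: 2, num_nodes: 380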
@@ -58,6 +74,7 @@ struct RetransmitStats {
     epoch_cache_update: u64,
     retransmit_total: AtomicU64,
     compute_turbine_peers_total: AtomicU64,
+    slot_stats: HashMap<Slot, RetransmitSlotStats>,
     unknown_shred_slot_leader: AtomicUsize,
 }
 
@@ -91,6 +108,7 @@ impl RetransmitStats {
             ("epoch_fetch", stats.epoch_fetch, i64),
             ("epoch_cache_update", stats.epoch_cache_update, i64),
             ("total_batches", stats.total_batches, i64),
+            ("num_nodes", stats.num_nodes.into_inner(), i64),
             ("num_shreds", stats.num_shreds, i64),
             (
                 "num_shreds_skipped",
@@ -109,6 +127,14 @@ impl RetransmitStats {
                 i64
             ),
         );
+        for (slot, stats) in stats.slot_stats {
+            datapoint_info!(
+                "retransmit-stage-slot-stats",
+                ("slot", slot, i64),
+                ("num_shreds", stats.num_shreds, i64),
+                ("num_nodes", stats.num_nodes, i64),
+            );
+        }
     }
 }
 
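Note that the loop takes `stats.slot_stats` by value, and the atomic counters above are read with `into_inner()`, so the submitting code evidently consumes the whole stats struct: per-slot counters cover one reporting interval rather than the process lifetime. Each slot becomes its own datapoint, which (assuming solana-metrics' usual influx-style rendering, with hypothetical numbers) would come out roughly as:

    retransmit-stage-slot-stats slot=105432000i num_shreds=57i num_nodes=11400i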
@@ -216,10 +242,10 @@ fn retransmit(
 
     let my_id = cluster_info.id();
     let socket_addr_space = cluster_info.socket_addr_space();
-    let retransmit_shred = |shred: Shred, socket: &UdpSocket| {
-        if should_skip_retransmit(&shred, shreds_received) {
+    let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
+        if should_skip_retransmit(shred, shreds_received) {
             stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
-            return;
+            return 0;
         }
         let shred_slot = shred.slot();
         max_slots
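Two signature changes here thread the new metric through: the closure now borrows the shred (`&Shred`), so the caller can still read `shred.slot()` after retransmitting, and every early exit returns an explicit node count of 0. A skipped shred therefore still yields a `(slot, 0)` observation for the per-slot fold at the end of this diff, counting toward that slot's `num_shreds` but not its `num_nodes`.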
@@ -247,21 +273,30 @@ fn retransmit(
                 stats
                     .unknown_shred_slot_leader
                     .fetch_add(1, Ordering::Relaxed);
-                return;
+                return 0;
             }
         };
         let cluster_nodes =
             cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
+<<<<<<< HEAD
         let shred_seed = shred.seed(slot_leader, &root_bank);
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader);
         let anchor_node = neighbors[0].id == my_id;
+=======
+        let addrs: Vec<_> = cluster_nodes
+            .get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT)
+            .into_iter()
+            .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
+            .collect();
+>>>>>>> 5e1cf39c7 (adds metrics for number of outgoing shreds in retransmit stage (#20882))
         compute_turbine_peers.stop();
         stats
             .compute_turbine_peers_total
             .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
 
         let mut retransmit_time = Measure::start("retransmit_to");
+<<<<<<< HEAD
         // If the node is on the critical path (i.e. the first node in each
         // neighborhood), it should send the packet to tvu socket of its
         // children and also tvu_forward socket of its neighbors. Otherwise it
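The hunk above and the one below contain the unresolved conflict regions this commit checked in. On the HEAD side, this branch still computes neighborhood-based turbine peers (`get_retransmit_peers` yielding `neighbors` and `children`, with the first neighbor acting as `anchor_node`); on the 5e1cf39c7 side, upstream computes a flat address list via `get_retransmit_addrs` and sends with `multi_target_send`. The `<<<<<<< HEAD` marker just above opens a region whose `=======` and `>>>>>>>` lines fall in the next hunk.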
@@ -283,18 +318,64 @@ fn retransmit(
             !anchor_node, // send to forward socket!
             socket_addr_space,
         );
+=======
+        let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
+            Ok(()) => addrs.len(),
+            Err(SendPktsError::IoError(ioerr, num_failed)) => {
+                inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1);
+                inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1);
+                error!(
+                    "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
+                    ioerr,
+                    num_failed,
+                    addrs.len(),
+                );
+                addrs.len() - num_failed
+            }
+        };
+>>>>>>> 5e1cf39c7 (adds metrics for number of outgoing shreds in retransmit stage (#20882))
         retransmit_time.stop();
+        stats.num_nodes.fetch_add(num_nodes, Ordering::Relaxed);
         stats
             .retransmit_total
             .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
+        num_nodes
     };
-    thread_pool.install(|| {
-        shreds.into_par_iter().with_min_len(4).for_each(|shred| {
-            let index = thread_pool.current_thread_index().unwrap();
-            let socket = &sockets[index % sockets.len()];
-            retransmit_shred(shred, socket);
-        });
+    fn merge<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
+    where
+        K: Eq + std::hash::Hash,
+        V: Default + AddAssign,
+    {
+        if acc.len() < other.len() {
+            return merge(other, acc);
+        }
+        for (key, value) in other {
+            *acc.entry(key).or_default() += value;
+        }
+        acc
+    }
+    let slot_stats = thread_pool.install(|| {
+        shreds
+            .into_par_iter()
+            .with_min_len(4)
+            .map(|shred| {
+                let index = thread_pool.current_thread_index().unwrap();
+                let socket = &sockets[index % sockets.len()];
+                let num_nodes = retransmit_shred(&shred, socket);
+                (shred.slot(), num_nodes)
+            })
+            .fold(
+                HashMap::<Slot, RetransmitSlotStats>::new,
+                |mut acc, (slot, num_nodes)| {
+                    let stats = acc.entry(slot).or_default();
+                    stats.num_nodes += num_nodes;
+                    stats.num_shreds += 1;
+                    acc
+                },
+            )
+            .reduce(HashMap::new, merge)
     });
+    stats.slot_stats = merge(std::mem::take(&mut stats.slot_stats), slot_stats);
     timer_start.stop();
     stats.total_time += timer_start.as_us();
     stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
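For reference, the fold/reduce aggregation can be read as a standalone program. The following is a minimal sketch assuming only rayon; `SlotStats` and the `(slot, num_nodes)` observations are hypothetical stand-ins for the types in the diff:

    use rayon::prelude::*;
    use std::{collections::HashMap, ops::AddAssign};

    #[derive(Debug, Default, PartialEq)]
    struct SlotStats {
        num_shreds: usize,
        num_nodes: usize,
    }

    impl AddAssign for SlotStats {
        fn add_assign(&mut self, other: Self) {
            self.num_shreds += other.num_shreds;
            self.num_nodes += other.num_nodes;
        }
    }

    // Same shape as merge() in the diff: fold the smaller map into the
    // larger one so the bigger allocation is the one that survives.
    fn merge(mut acc: HashMap<u64, SlotStats>, other: HashMap<u64, SlotStats>) -> HashMap<u64, SlotStats> {
        if acc.len() < other.len() {
            return merge(other, acc);
        }
        for (key, value) in other {
            *acc.entry(key).or_default() += value;
        }
        acc
    }

    fn main() {
        // One (slot, num_nodes) pair per shred, as retransmit_shred would
        // produce them; 0 marks a skipped shred.
        let observations = vec![(7u64, 200), (7, 180), (8, 150), (8, 0)];
        let slot_stats = observations
            .into_par_iter()
            // Each rayon worker folds its share into a private map...
            .fold(
                HashMap::<u64, SlotStats>::new,
                |mut acc, (slot, num_nodes)| {
                    let stats = acc.entry(slot).or_default();
                    stats.num_nodes += num_nodes;
                    stats.num_shreds += 1;
                    acc
                },
            )
            // ...and the per-worker maps are merged pairwise at the end.
            .reduce(HashMap::new, merge);
        assert_eq!(slot_stats[&7], SlotStats { num_shreds: 2, num_nodes: 380 });
        assert_eq!(slot_stats[&8], SlotStats { num_shreds: 2, num_nodes: 150 });
    }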