* adds metric for turbine retransmit tree mismatch

In order to remove port-based forwarding logic in turbine, we first need to track how often the turbine retransmit/broadcast trees mismatch across nodes. One consistency condition is that if a node is on the critical path (i.e. it is the first node in its neighborhood), then the packet should arrive at the tvu socket as opposed to tvu-forwards. This commit adds a metric tracking how often that condition is not met.

(cherry picked from commit 71de021177)

* removes the nested for loop from retransmit-stage

The code can be simplified by flattening the vector of packets.

(cherry picked from commit ff0e623d30)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
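The consistency condition described above can be expressed as a small standalone sketch. The function name, its parameters, and the FANOUT value here are illustrative stand-ins, not the crate's API; in the diff below the check is written inline against packet.meta.forward, my_index, and DATA_PLANE_FANOUT.

// Minimal sketch of the consistency check, under the assumptions stated above.
fn is_tree_mismatch(my_index: usize, fanout: usize, arrived_on_forwards_port: bool) -> bool {
    // The first node of each neighborhood sits at an index that is a multiple
    // of the fanout in the shuffled broadcast order.
    let on_critical_path = my_index % fanout == 0;
    // A critical-path node should receive shreds on the tvu socket, every
    // other node on tvu-forwards; anything else counts as a mismatch.
    arrived_on_forwards_port == on_critical_path
}

fn main() {
    const FANOUT: usize = 200; // stand-in for DATA_PLANE_FANOUT
    assert!(is_tree_mismatch(400, FANOUT, true)); // critical-path node, but packet was forwarded
    assert!(!is_tree_mismatch(401, FANOUT, true)); // non-critical node, forwarded as expected
    println!("checks passed");
}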
@@ -1383,7 +1383,7 @@ impl ClusterInfo {
     /// We need to avoid having obj locked while doing a io, such as the `send_to`
     pub fn retransmit_to(
         peers: &[&ContactInfo],
-        packet: &mut Packet,
+        packet: &Packet,
         s: &UdpSocket,
         forwarded: bool,
     ) -> Result<()> {
@@ -64,6 +64,7 @@ struct RetransmitStats {
     retransmit_total: AtomicU64,
     last_ts: AtomicU64,
     compute_turbine_peers_total: AtomicU64,
+    retransmit_tree_mismatch: AtomicU64,
     packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
     packets_by_source: Mutex<BTreeMap<String, usize>>,
 }
@@ -82,6 +83,7 @@ fn update_retransmit_stats(
     packets_by_source: HashMap<String, usize>,
     epoch_fetch: u64,
     epoch_cach_update: u64,
+    retransmit_tree_mismatch: u64,
 ) {
     stats.total_time.fetch_add(total_time, Ordering::Relaxed);
     stats
@@ -104,6 +106,9 @@ fn update_retransmit_stats(
     stats
         .epoch_cache_update
         .fetch_add(epoch_cach_update, Ordering::Relaxed);
+    stats
+        .retransmit_tree_mismatch
+        .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed);
     {
         let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
         for (slot, count) in packets_by_slot {
@@ -156,6 +161,11 @@ fn update_retransmit_stats(
                 stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
                 i64
             ),
+            (
+                "retransmit_tree_mismatch",
+                stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
             (
                 "compute_turbine",
                 stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
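The stats changes above follow the existing accumulate-then-drain pattern: counters are bumped with relaxed atomic adds on the hot path and drained with swap(0) when a datapoint is emitted. A simplified, self-contained sketch of that pattern (println! stands in for the datapoint_info! metrics macro, and the struct is not the real RetransmitStats):

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct Stats {
    retransmit_tree_mismatch: AtomicU64,
}

impl Stats {
    // Hot path: lock-free accumulation.
    fn record_mismatches(&self, n: u64) {
        self.retransmit_tree_mismatch.fetch_add(n, Ordering::Relaxed);
    }

    // Reporting path: swap(0) reads and resets atomically, so no counts are
    // lost or double-reported between reporting intervals.
    fn report(&self) {
        let n = self.retransmit_tree_mismatch.swap(0, Ordering::Relaxed);
        println!("retransmit_tree_mismatch={}", n);
    }
}

fn main() {
    let stats = Stats::default();
    stats.record_mismatches(3);
    stats.report(); // prints 3
    stats.report(); // prints 0
}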
@@ -314,15 +324,15 @@ fn retransmit(
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
     rpc_subscriptions: &Option<Arc<RpcSubscriptions>>,
 ) -> Result<()> {
-    let timer = Duration::new(1, 0);
+    const RECV_TIMEOUT: Duration = Duration::from_secs(1);
     let r_lock = r.lock().unwrap();
-    let packets = r_lock.recv_timeout(timer)?;
+    let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
     let mut timer_start = Measure::start("retransmit");
     let mut total_packets = packets.packets.len();
-    let mut packet_v = vec![packets];
+    let mut packets = vec![packets];
     while let Ok(nq) = r_lock.try_recv() {
         total_packets += nq.packets.len();
-        packet_v.push(nq);
+        packets.push(nq);
         if total_packets >= MAX_PACKET_BATCH_SIZE {
             break;
         }
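The receive loop above blocks for one batch with a one-second timeout and then drains whatever is already queued, up to MAX_PACKET_BATCH_SIZE. A self-contained sketch of that batching pattern over a plain mpsc channel (MAX_BATCH and the Vec<u8> payloads are stand-ins for the crate's packet batches):

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

const RECV_TIMEOUT: Duration = Duration::from_secs(1);
const MAX_BATCH: usize = 4; // stand-in for MAX_PACKET_BATCH_SIZE

// Block for the first batch, then opportunistically drain queued batches.
fn recv_batches(r: &Receiver<Vec<u8>>) -> Result<Vec<Vec<u8>>, RecvTimeoutError> {
    let first = r.recv_timeout(RECV_TIMEOUT)?;
    let mut total = first.len();
    let mut batches = vec![first];
    while let Ok(next) = r.try_recv() {
        total += next.len();
        batches.push(next);
        if total >= MAX_BATCH {
            break;
        }
    }
    Ok(batches)
}

fn main() {
    let (s, r) = channel();
    s.send(vec![1, 2]).unwrap();
    s.send(vec![3]).unwrap();
    println!("{:?}", recv_batches(&r).unwrap()); // [[1, 2], [3]]
}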
@@ -367,91 +377,97 @@ fn retransmit(
     let mut repair_total = 0;
     let mut retransmit_total = 0;
     let mut compute_turbine_peers_total = 0;
+    let mut retransmit_tree_mismatch = 0;
     let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
     let mut packets_by_source: HashMap<String, usize> = HashMap::new();
     let mut max_slot = 0;
-    for mut packets in packet_v {
-        for packet in packets.packets.iter_mut() {
-            // skip discarded packets and repair packets
-            if packet.meta.discard {
-                total_packets -= 1;
-                discard_total += 1;
-                continue;
-            }
-            if packet.meta.repair {
-                total_packets -= 1;
-                repair_total += 1;
-                continue;
-            }
-            let shred_slot = match check_if_already_received(packet, shreds_received) {
-                Some(slot) => slot,
-                None => continue,
-            };
-            max_slot = max_slot.max(shred_slot);
-
-            if let Some(rpc_subscriptions) = rpc_subscriptions {
-                if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
-                    rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
-                        slot: shred_slot,
-                        timestamp: timestamp(),
-                    });
-                }
-            }
-
-            let mut compute_turbine_peers = Measure::start("turbine_start");
-            let stakes_and_index = get_retransmit_peers(
-                my_id,
-                shred_slot,
-                leader_schedule_cache,
-                r_bank.deref(),
-                r_epoch_stakes_cache.deref(),
-            );
-            let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
-                &my_id,
-                &r_epoch_stakes_cache.peers,
-                &stakes_and_index,
-                packet.meta.seed,
-            );
-            peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
-            // split off the indexes, we don't need the stakes anymore
-            let indexes: Vec<_> = shuffled_stakes_and_index
-                .into_iter()
-                .map(|(_, index)| index)
-                .collect();
-
-            let (neighbors, children) =
-                compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
-            let neighbors: Vec<_> = neighbors
-                .into_iter()
-                .filter_map(|index| {
-                    let peer = &r_epoch_stakes_cache.peers[index];
-                    if peer.id == my_id {
-                        None
-                    } else {
-                        Some(peer)
-                    }
-                })
-                .collect();
-            let children: Vec<_> = children
-                .into_iter()
-                .map(|index| &r_epoch_stakes_cache.peers[index])
-                .collect();
-            compute_turbine_peers.stop();
-            compute_turbine_peers_total += compute_turbine_peers.as_us();
-
-            *packets_by_slot.entry(packet.meta.slot).or_insert(0) += 1;
-            *packets_by_source
-                .entry(packet.meta.addr().to_string())
-                .or_insert(0) += 1;
-
-            let mut retransmit_time = Measure::start("retransmit_to");
-            if !packet.meta.forward {
-                ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
-            }
-            ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
-            retransmit_time.stop();
-            retransmit_total += retransmit_time.as_us();
-        }
+    for packet in packets.iter().flat_map(|p| p.packets.iter()) {
+        // skip discarded packets and repair packets
+        if packet.meta.discard {
+            total_packets -= 1;
+            discard_total += 1;
+            continue;
+        }
+        if packet.meta.repair {
+            total_packets -= 1;
+            repair_total += 1;
+            continue;
+        }
+        let shred_slot = match check_if_already_received(packet, shreds_received) {
+            Some(slot) => slot,
+            None => continue,
+        };
+        max_slot = max_slot.max(shred_slot);
+
+        if let Some(rpc_subscriptions) = rpc_subscriptions {
+            if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
+                rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
+                    slot: shred_slot,
+                    timestamp: timestamp(),
+                });
+            }
+        }
+
+        let mut compute_turbine_peers = Measure::start("turbine_start");
+        let stakes_and_index = get_retransmit_peers(
+            my_id,
+            shred_slot,
+            leader_schedule_cache,
+            r_bank.deref(),
+            r_epoch_stakes_cache.deref(),
+        );
+        let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
+            &my_id,
+            &r_epoch_stakes_cache.peers,
+            &stakes_and_index,
+            packet.meta.seed,
+        );
+        // If the node is on the critical path (i.e. the first node in each
+        // neighborhood), then we expect that the packet arrives at tvu socket
+        // as opposed to tvu-forwards. If this is not the case, then the
+        // turbine broadcast/retransmit tree is mismatched across nodes.
+        if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) {
+            retransmit_tree_mismatch += 1;
+        }
+        peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
+        // split off the indexes, we don't need the stakes anymore
+        let indexes: Vec<_> = shuffled_stakes_and_index
+            .into_iter()
+            .map(|(_, index)| index)
+            .collect();
+        debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id);
+
+        let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
+        let neighbors: Vec<_> = neighbors
+            .into_iter()
+            .filter_map(|index| {
+                let peer = &r_epoch_stakes_cache.peers[index];
+                if peer.id == my_id {
+                    None
+                } else {
+                    Some(peer)
+                }
+            })
+            .collect();
+        let children: Vec<_> = children
+            .into_iter()
+            .map(|index| &r_epoch_stakes_cache.peers[index])
+            .collect();
+        compute_turbine_peers.stop();
+        compute_turbine_peers_total += compute_turbine_peers.as_us();
+
+        *packets_by_slot.entry(packet.meta.slot).or_default() += 1;
+        *packets_by_source
+            .entry(packet.meta.addr().to_string())
+            .or_default() += 1;
+
+        let mut retransmit_time = Measure::start("retransmit_to");
+        if !packet.meta.forward {
+            ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
+        }
+        ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
+        retransmit_time.stop();
+        retransmit_total += retransmit_time.as_us();
     }
     max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
     timer_start.stop();
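The flattening from the second commit replaces the nested loops over batches and packets with a single iterator pass, as seen in the hunk above. A standalone sketch of the same shape (the Batch and Packet types here are stand-ins for the crate's Packets and Packet structs):

struct Packet {
    forward: bool,
}

struct Batch {
    packets: Vec<Packet>,
}

fn main() {
    let batches = vec![
        Batch { packets: vec![Packet { forward: false }, Packet { forward: true }] },
        Batch { packets: vec![Packet { forward: false }] },
    ];
    // One pass over every packet, no nested for loops.
    for (i, packet) in batches.iter().flat_map(|b| b.packets.iter()).enumerate() {
        println!("packet {} forward={}", i, packet.forward);
    }
}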
@@ -475,6 +491,7 @@ fn retransmit(
         packets_by_source,
         epoch_fetch.as_us(),
         epoch_cache_update.as_us(),
+        retransmit_tree_mismatch,
     );
 
     Ok(())
@@ -204,7 +204,7 @@ pub fn cluster_info_retransmit() {
     p.meta.size = 10;
     let peers = c1.tvu_peers();
     let retransmit_peers: Vec<_> = peers.iter().collect();
-    ClusterInfo::retransmit_to(&retransmit_peers, &mut p, &tn1, false).unwrap();
+    ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false).unwrap();
    let res: Vec<_> = [tn1, tn2, tn3]
        .into_par_iter()
        .map(|s| {