From 71de02117752013a1c544cf194bd443dcd550c0f Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 20 May 2021 12:16:36 -0400 Subject: [PATCH] adds metric for turbine retransmit tree mismatch In order to remove port-based forwarding logic in turbine, we need to first track how often the turbine retransmit/broadcast trees mismatch across nodes. One consistency condition is that if the node is on the critical path (i.e. the first node in each neighborhood), then we expect that the packet arrives at tvu socket as opposed to tvu-forwards. This commit adds a metric to track how often above condition is not met. --- core/src/cluster_info.rs | 2 +- core/src/retransmit_stage.rs | 27 +++++++++++++++++++++++---- core/tests/gossip.rs | 2 +- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e47c81f75f..75d54a8f1d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1383,7 +1383,7 @@ impl ClusterInfo { /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( peers: &[&ContactInfo], - packet: &mut Packet, + packet: &Packet, s: &UdpSocket, forwarded: bool, ) -> Result<()> { diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index f7716c2370..8f09cbdbec 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -65,6 +65,7 @@ struct RetransmitStats { retransmit_total: AtomicU64, last_ts: AtomicU64, compute_turbine_peers_total: AtomicU64, + retransmit_tree_mismatch: AtomicU64, packets_by_slot: Mutex>, packets_by_source: Mutex>, } @@ -83,6 +84,7 @@ fn update_retransmit_stats( packets_by_source: HashMap, epoch_fetch: u64, epoch_cach_update: u64, + retransmit_tree_mismatch: u64, ) { stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats @@ -105,6 +107,9 @@ fn update_retransmit_stats( stats .epoch_cache_update .fetch_add(epoch_cach_update, Ordering::Relaxed); + stats + .retransmit_tree_mismatch + .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed); { let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); for (slot, count) in packets_by_slot { @@ -157,6 +162,11 @@ fn update_retransmit_stats( stats.retransmit_total.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "retransmit_tree_mismatch", + stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "compute_turbine", stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, @@ -315,9 +325,9 @@ fn retransmit( first_shreds_received: &Mutex>, rpc_subscriptions: &Option>, ) -> Result<()> { - let timer = Duration::new(1, 0); + const RECV_TIMEOUT: Duration = Duration::from_secs(1); let r_lock = r.lock().unwrap(); - let packets = r_lock.recv_timeout(timer)?; + let packets = r_lock.recv_timeout(RECV_TIMEOUT)?; let mut timer_start = Measure::start("retransmit"); let mut total_packets = packets.packets.len(); let mut packet_v = vec![packets]; @@ -368,11 +378,12 @@ fn retransmit( let mut repair_total = 0; let mut retransmit_total = 0; let mut compute_turbine_peers_total = 0; + let mut retransmit_tree_mismatch = 0; let mut packets_by_slot: HashMap = HashMap::new(); let mut packets_by_source: HashMap = HashMap::new(); let mut max_slot = 0; - for mut packets in packet_v { - for packet in packets.packets.iter_mut() { + for packets in packet_v { + for packet in packets.packets.iter() { // skip discarded packets and repair packets if packet.meta.discard { total_packets -= 1; @@ -413,6 +424,13 @@ fn retransmit( &stakes_and_index, packet.meta.seed, ); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), then we expect that the packet arrives at tvu + // socket as opposed to tvu-forwards. If this is not the case, then + // the turbine broadcast/retransmit tree mismatch across nodes. + if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) { + retransmit_tree_mismatch += 1; + } peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); // split off the indexes, we don't need the stakes anymore let indexes: Vec<_> = shuffled_stakes_and_index @@ -476,6 +494,7 @@ fn retransmit( packets_by_source, epoch_fetch.as_us(), epoch_cache_update.as_us(), + retransmit_tree_mismatch, ); Ok(()) diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index ad036109ad..b66b034cb8 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -204,7 +204,7 @@ pub fn cluster_info_retransmit() { p.meta.size = 10; let peers = c1.tvu_peers(); let retransmit_peers: Vec<_> = peers.iter().collect(); - ClusterInfo::retransmit_to(&retransmit_peers, &mut p, &tn1, false).unwrap(); + ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false).unwrap(); let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| {