adds metric for turbine retransmit tree mismatch

In order to remove port-based forwarding logic in turbine, we need to
first track how often the turbine retransmit/broadcast trees mismatch
across nodes.
One consistency condition is that if the node is on the critical path
(i.e. the first node in each neighborhood), then we expect that the
packet arrives at tvu socket as opposed to tvu-forwards.

This commit adds a metric to track how often above condition is not met.
This commit is contained in:
behzad nouri
2021-05-20 12:16:36 -04:00
parent 019bccab51
commit 71de021177
3 changed files with 25 additions and 6 deletions

View File

@ -1383,7 +1383,7 @@ impl ClusterInfo {
/// We need to avoid having obj locked while doing a io, such as the `send_to` /// We need to avoid having obj locked while doing a io, such as the `send_to`
pub fn retransmit_to( pub fn retransmit_to(
peers: &[&ContactInfo], peers: &[&ContactInfo],
packet: &mut Packet, packet: &Packet,
s: &UdpSocket, s: &UdpSocket,
forwarded: bool, forwarded: bool,
) -> Result<()> { ) -> Result<()> {

View File

@ -65,6 +65,7 @@ struct RetransmitStats {
retransmit_total: AtomicU64, retransmit_total: AtomicU64,
last_ts: AtomicU64, last_ts: AtomicU64,
compute_turbine_peers_total: AtomicU64, compute_turbine_peers_total: AtomicU64,
retransmit_tree_mismatch: AtomicU64,
packets_by_slot: Mutex<BTreeMap<Slot, usize>>, packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
packets_by_source: Mutex<BTreeMap<String, usize>>, packets_by_source: Mutex<BTreeMap<String, usize>>,
} }
@ -83,6 +84,7 @@ fn update_retransmit_stats(
packets_by_source: HashMap<String, usize>, packets_by_source: HashMap<String, usize>,
epoch_fetch: u64, epoch_fetch: u64,
epoch_cach_update: u64, epoch_cach_update: u64,
retransmit_tree_mismatch: u64,
) { ) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats.total_time.fetch_add(total_time, Ordering::Relaxed);
stats stats
@ -105,6 +107,9 @@ fn update_retransmit_stats(
stats stats
.epoch_cache_update .epoch_cache_update
.fetch_add(epoch_cach_update, Ordering::Relaxed); .fetch_add(epoch_cach_update, Ordering::Relaxed);
stats
.retransmit_tree_mismatch
.fetch_add(retransmit_tree_mismatch, Ordering::Relaxed);
{ {
let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
for (slot, count) in packets_by_slot { for (slot, count) in packets_by_slot {
@ -157,6 +162,11 @@ fn update_retransmit_stats(
stats.retransmit_total.swap(0, Ordering::Relaxed) as i64, stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"retransmit_tree_mismatch",
stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"compute_turbine", "compute_turbine",
stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
@ -315,9 +325,9 @@ fn retransmit(
first_shreds_received: &Mutex<BTreeSet<Slot>>, first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: &Option<Arc<RpcSubscriptions>>, rpc_subscriptions: &Option<Arc<RpcSubscriptions>>,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let r_lock = r.lock().unwrap(); let r_lock = r.lock().unwrap();
let packets = r_lock.recv_timeout(timer)?; let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
let mut timer_start = Measure::start("retransmit"); let mut timer_start = Measure::start("retransmit");
let mut total_packets = packets.packets.len(); let mut total_packets = packets.packets.len();
let mut packet_v = vec![packets]; let mut packet_v = vec![packets];
@ -368,11 +378,12 @@ fn retransmit(
let mut repair_total = 0; let mut repair_total = 0;
let mut retransmit_total = 0; let mut retransmit_total = 0;
let mut compute_turbine_peers_total = 0; let mut compute_turbine_peers_total = 0;
let mut retransmit_tree_mismatch = 0;
let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new(); let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
let mut packets_by_source: HashMap<String, usize> = HashMap::new(); let mut packets_by_source: HashMap<String, usize> = HashMap::new();
let mut max_slot = 0; let mut max_slot = 0;
for mut packets in packet_v { for packets in packet_v {
for packet in packets.packets.iter_mut() { for packet in packets.packets.iter() {
// skip discarded packets and repair packets // skip discarded packets and repair packets
if packet.meta.discard { if packet.meta.discard {
total_packets -= 1; total_packets -= 1;
@ -413,6 +424,13 @@ fn retransmit(
&stakes_and_index, &stakes_and_index,
packet.meta.seed, packet.meta.seed,
); );
// If the node is on the critical path (i.e. the first node in each
// neighborhood), then we expect that the packet arrives at tvu
// socket as opposed to tvu-forwards. If this is not the case, then
// the turbine broadcast/retransmit tree mismatch across nodes.
if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) {
retransmit_tree_mismatch += 1;
}
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
// split off the indexes, we don't need the stakes anymore // split off the indexes, we don't need the stakes anymore
let indexes: Vec<_> = shuffled_stakes_and_index let indexes: Vec<_> = shuffled_stakes_and_index
@ -476,6 +494,7 @@ fn retransmit(
packets_by_source, packets_by_source,
epoch_fetch.as_us(), epoch_fetch.as_us(),
epoch_cache_update.as_us(), epoch_cache_update.as_us(),
retransmit_tree_mismatch,
); );
Ok(()) Ok(())

View File

@ -204,7 +204,7 @@ pub fn cluster_info_retransmit() {
p.meta.size = 10; p.meta.size = 10;
let peers = c1.tvu_peers(); let peers = c1.tvu_peers();
let retransmit_peers: Vec<_> = peers.iter().collect(); let retransmit_peers: Vec<_> = peers.iter().collect();
ClusterInfo::retransmit_to(&retransmit_peers, &mut p, &tn1, false).unwrap(); ClusterInfo::retransmit_to(&retransmit_peers, &p, &tn1, false).unwrap();
let res: Vec<_> = [tn1, tn2, tn3] let res: Vec<_> = [tn1, tn2, tn3]
.into_par_iter() .into_par_iter()
.map(|s| { .map(|s| {