From 982afa87a64e02e7dc02486d65ae3e9100280745 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Tue, 14 Aug 2018 21:51:37 -0700
Subject: [PATCH] Retransmit blobs from leader from window (#975)

- Some nodes don't have leader information while the leader is
  broadcasting blobs to those nodes, so such blobs are not
  retransmitted. This change retransmits the blobs once the leader's
  identity is known.

---
 src/erasure.rs     |   3 +-
 src/window.rs      | 258 ++++++++++++++++++++++++++++++++++++++++-----
 tests/multinode.rs |   2 +-
 3 files changed, 234 insertions(+), 29 deletions(-)
 mode change 100755 => 100644 tests/multinode.rs

diff --git a/src/erasure.rs b/src/erasure.rs
index 4a953faf61..9fd78d1e8c 100644
--- a/src/erasure.rs
+++ b/src/erasure.rs
@@ -713,7 +713,8 @@ mod test {
         let mut window = vec![
             WindowSlot {
                 data: None,
-                coding: None
+                coding: None,
+                leader_unknown: false,
             };
             WINDOW_SIZE
         ];
diff --git a/src/window.rs b/src/window.rs
index 69834ce918..fbf5f0d837 100644
--- a/src/window.rs
+++ b/src/window.rs
@@ -9,6 +9,7 @@ use ledger::Block;
 use log::Level;
 use packet::{BlobRecycler, SharedBlob, SharedBlobs, BLOB_SIZE};
 use result::{Error, Result};
+use signature::Pubkey;
 use std::cmp;
 use std::collections::VecDeque;
 use std::mem;
@@ -27,6 +28,7 @@ pub const WINDOW_SIZE: u64 = 2 * 1024;
 pub struct WindowSlot {
     pub data: Option<SharedBlob>,
     pub coding: Option<SharedBlob>,
+    pub leader_unknown: bool,
 }

 pub type SharedWindow = Arc<RwLock<Vec<WindowSlot>>>;
@@ -143,6 +145,49 @@ fn repair_window(
     Ok(())
 }

+fn add_block_to_retransmit_queue(
+    b: &SharedBlob,
+    leader_id: Pubkey,
+    recycler: &BlobRecycler,
+    retransmit_queue: &mut VecDeque<SharedBlob>,
+) {
+    let p = b
+        .read()
+        .expect("'b' read lock in fn add_block_to_retransmit_queue");
+    //TODO this check isn't safe against adversarial packets
+    //we need to maintain a sequence window
+    trace!(
+        "idx: {} addr: {:?} id: {:?} leader: {:?}",
+        p.get_index()
+            .expect("get_index in fn add_block_to_retransmit_queue"),
+        p.meta.addr(),
+        p.get_id()
+            .expect("get_id in trace! fn add_block_to_retransmit_queue"),
+        leader_id
+    );
+    if p.get_id()
+        .expect("get_id in fn add_block_to_retransmit_queue") == leader_id
+    {
+        //TODO
+        //need to copy the retransmitted blob
+        //otherwise we get into races with which thread
+        //should do the recycling
+        //
+        //a better abstraction would be to recycle when the blob
+        //is dropped via a weakref to the recycler
+        let nv = recycler.allocate();
+        {
+            let mut mnv = nv
+                .write()
+                .expect("recycler write lock in fn add_block_to_retransmit_queue");
+            let sz = p.meta.size;
+            mnv.meta.size = sz;
+            mnv.data[..sz].copy_from_slice(&p.data[..sz]);
+        }
+        retransmit_queue.push_back(nv);
+    }
+}
+
 fn retransmit_all_leader_blocks(
     maybe_leader: Option<NodeInfo>,
     dq: &mut SharedBlobs,
@@ -151,37 +196,34 @@ fn retransmit_all_leader_blocks(
     consumed: u64,
     received: u64,
     retransmit: &BlobSender,
+    window: &SharedWindow,
+    pending_retransmits: &mut bool,
 ) -> Result<()> {
-    let mut retransmit_queue = VecDeque::new();
+    let mut retransmit_queue: VecDeque<SharedBlob> = VecDeque::new();
     if let Some(leader) = maybe_leader {
+        let leader_id = leader.id;
         for b in dq {
-            let p = b.read().expect("'b' read lock in fn recv_window");
-            //TODO this check isn't safe against adverserial packets
-            //we need to maintain a sequence window
-            let leader_id = leader.id;
-            trace!(
-                "idx: {} addr: {:?} id: {:?} leader: {:?}",
-                p.get_index().expect("get_index in fn recv_window"),
-                p.get_id().expect("get_id in trace! fn recv_window"),
-                p.meta.addr(),
-                leader_id
-            );
-            if p.get_id().expect("get_id in fn recv_window") == leader_id {
-                //TODO
-                //need to copy the retransmitted blob
-                //otherwise we get into races with which thread
-                //should do the recycling
-                //
-                //a better abstraction would be to recycle when the blob
-                //is dropped via a weakref to the recycler
-                let nv = recycler.allocate();
-                {
-                    let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
-                    let sz = p.meta.size;
-                    mnv.meta.size = sz;
-                    mnv.data[..sz].copy_from_slice(&p.data[..sz]);
+            add_block_to_retransmit_queue(b, leader_id, recycler, &mut retransmit_queue);
+        }
+
+        if *pending_retransmits {
+            for w in window
+                .write()
+                .expect("Window write failed in retransmit_all_leader_blocks")
+                .iter_mut()
+            {
+                *pending_retransmits = false;
+                if w.leader_unknown {
+                    if let Some(b) = w.clone().data {
+                        add_block_to_retransmit_queue(
+                            &b,
+                            leader_id,
+                            recycler,
+                            &mut retransmit_queue,
+                        );
+                        w.leader_unknown = false;
+                    }
                 }
-                retransmit_queue.push_back(nv);
             }
         }
     } else {
@@ -223,6 +265,8 @@ fn process_blob(
     window: &SharedWindow,
     recycler: &BlobRecycler,
     consumed: &mut u64,
+    leader_unknown: bool,
+    pending_retransmits: &mut bool,
 ) {
     let mut window = window.write().unwrap();
     let w = (pix % WINDOW_SIZE) as usize;
@@ -280,6 +324,9 @@ fn process_blob(
         return;
     }

+    window[w].leader_unknown = leader_unknown;
+    *pending_retransmits = true;
+
     #[cfg(feature = "erasure")]
     {
         if erasure::recover(
@@ -346,6 +393,7 @@ fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64
     }
 }

+#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
 fn recv_window(
     debug_id: u64,
     window: &SharedWindow,
@@ -356,6 +404,7 @@ fn recv_window(
     r: &BlobReceiver,
     s: &BlobSender,
     retransmit: &BlobSender,
+    pending_retransmits: &mut bool,
 ) -> Result<()> {
     let timer = Duration::from_millis(200);
     let mut dq = r.recv_timeout(timer)?;
@@ -364,6 +413,7 @@ fn recv_window(
         .expect("'crdt' read lock in fn recv_window")
         .leader_data()
         .cloned();
+    let leader_unknown = maybe_leader.is_none();
     while let Ok(mut nq) = r.try_recv() {
         dq.append(&mut nq)
     }
@@ -385,6 +435,8 @@ fn recv_window(
         *consumed,
         *received,
         retransmit,
+        window,
+        pending_retransmits,
     )?;

     let mut pixs = Vec::new();
@@ -412,6 +464,8 @@ fn recv_window(
                 window,
                 recycler,
                 consumed,
+                leader_unknown,
+                pending_retransmits,
             );
         }
         if log_enabled!(Level::Trace) {
@@ -588,6 +642,7 @@ pub fn window(
         let mut last = entry_height;
         let mut times = 0;
         let debug_id = crdt.read().unwrap().debug_id();
+        let mut pending_retransmits = false;
         trace!("{:x}: RECV_WINDOW started", debug_id);
         loop {
             if let Err(e) = recv_window(
@@ -600,6 +655,7 @@ pub fn window(
                 &r,
                 &s,
                 &retransmit,
+                &mut pending_retransmits,
             ) {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@@ -785,6 +841,154 @@ mod test {
         t_window.join().expect("join");
     }

+    #[test]
+    pub fn window_send_no_leader_test() {
+        logger::setup();
+        let tn = TestNode::new_localhost();
+        let exit = Arc::new(AtomicBool::new(false));
+        let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
+        let me_id = crdt_me.my_data().id;
+        let subs = Arc::new(RwLock::new(crdt_me));
+
+        let resp_recycler = BlobRecycler::default();
+        let (s_reader, r_reader) = channel();
+        let t_receiver = blob_receiver(
+            exit.clone(),
+            resp_recycler.clone(),
+            tn.sockets.gossip,
+            s_reader,
+        ).unwrap();
+        let (s_window, _r_window) = channel();
+        let (s_retransmit, r_retransmit) = channel();
+        let win = default_window();
+        let t_window = window(
+            subs.clone(),
+            win,
+            0,
+            resp_recycler.clone(),
+            r_reader,
+            s_window,
+            s_retransmit,
+        );
+        let t_responder = {
+            let (s_responder, r_responder) = channel();
+            let t_responder = responder(
+                "window_send_test",
+                tn.sockets.replicate,
+                resp_recycler.clone(),
+                r_responder,
+            );
+            let mut msgs = VecDeque::new();
+            for v in 0..10 {
+                let i = 9 - v;
+                let b = resp_recycler.allocate();
+                {
+                    let mut w = b.write().unwrap();
+                    w.set_index(i).unwrap();
+                    w.set_id(me_id).unwrap();
+                    assert_eq!(i, w.get_index().unwrap());
+                    w.meta.size = PACKET_DATA_SIZE;
+                    w.meta.set_addr(&tn.data.contact_info.ncp);
+                }
+                msgs.push_back(b);
+            }
+            s_responder.send(msgs).expect("send");
+            t_responder
+        };
+
+        assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
+        exit.store(true, Ordering::Relaxed);
+        t_receiver.join().expect("join");
+        t_responder.join().expect("join");
+        t_window.join().expect("join");
+    }
+
+    #[test]
+    pub fn window_send_late_leader_test() {
+        logger::setup();
+        let tn = TestNode::new_localhost();
+        let exit = Arc::new(AtomicBool::new(false));
+        let crdt_me = Crdt::new(tn.data.clone()).expect("Crdt::new");
+        let me_id = crdt_me.my_data().id;
+        let subs = Arc::new(RwLock::new(crdt_me));
+
+        let resp_recycler = BlobRecycler::default();
+        let (s_reader, r_reader) = channel();
+        let t_receiver = blob_receiver(
+            exit.clone(),
+            resp_recycler.clone(),
+            tn.sockets.gossip,
+            s_reader,
+        ).unwrap();
+        let (s_window, _r_window) = channel();
+        let (s_retransmit, r_retransmit) = channel();
+        let win = default_window();
+        let t_window = window(
+            subs.clone(),
+            win,
+            0,
+            resp_recycler.clone(),
+            r_reader,
+            s_window,
+            s_retransmit,
+        );
+        let t_responder = {
+            let (s_responder, r_responder) = channel();
+            let t_responder = responder(
+                "window_send_test",
+                tn.sockets.replicate,
+                resp_recycler.clone(),
+                r_responder,
+            );
+            let mut msgs = VecDeque::new();
+            for v in 0..10 {
+                let i = 9 - v;
+                let b = resp_recycler.allocate();
+                {
+                    let mut w = b.write().unwrap();
+                    w.set_index(i).unwrap();
+                    w.set_id(me_id).unwrap();
+                    assert_eq!(i, w.get_index().unwrap());
+                    w.meta.size = PACKET_DATA_SIZE;
+                    w.meta.set_addr(&tn.data.contact_info.ncp);
+                }
+                msgs.push_back(b);
+            }
+            s_responder.send(msgs).expect("send");
+
+            assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
+
+            subs.write().unwrap().set_leader(me_id);
+
+            let mut msgs1 = VecDeque::new();
+            for v in 1..5 {
+                let i = 9 + v;
+                let b = resp_recycler.allocate();
+                {
+                    let mut w = b.write().unwrap();
+                    w.set_index(i).unwrap();
+                    w.set_id(me_id).unwrap();
+                    assert_eq!(i, w.get_index().unwrap());
+                    w.meta.size = PACKET_DATA_SIZE;
+                    w.meta.set_addr(&tn.data.contact_info.ncp);
+                }
+                msgs1.push_back(b);
+            }
+            s_responder.send(msgs1).expect("send");
+            t_responder
+        };
+
+        let mut q = r_retransmit.recv().unwrap();
+        while let Ok(mut nq) = r_retransmit.try_recv() {
+            q.append(&mut nq);
+        }
+        assert!(q.len() > 10);
+        exit.store(true, Ordering::Relaxed);
+        t_receiver.join().expect("join");
+        t_responder.join().expect("join");
+        t_window.join().expect("join");
+    }
+
     #[test]
     pub fn calculate_highest_lost_blob_index_test() {
         assert_eq!(calculate_highest_lost_blob_index(0, 10, 90), 90);
diff --git a/tests/multinode.rs b/tests/multinode.rs
old mode 100755
new mode 100644
index 1ed62208c4..fee853bd55
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -524,7 +524,7 @@ fn test_multi_node_dynamic_network() {
         Ok(val) => val
             .parse()
            .expect(&format!("env var {} is not parse-able as usize", key)),
-        Err(_) => 100,
+        Err(_) => 170,
     };

     let purge_key = "SOLANA_DYNAMIC_NODES_PURGE_LAG";
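
For readers skimming the diff, the bookkeeping it introduces is small enough to sketch in isolation. The following is a minimal, self-contained Rust sketch of the same two-phase idea: hold on to blobs that arrive before the leader is known, then queue them for retransmission once the leader's identity shows up. Note that `Blob`, `Window`, `insert`, and `flush_pending` here are simplified stand-ins invented for illustration, not the crate's actual `SharedBlob`/`SharedWindow`/`retransmit_all_leader_blocks` API; the real code additionally copies each queued blob into a freshly allocated one to avoid recycler races (see the TODO in the patch).

use std::collections::VecDeque;

type Pubkey = u64; // stand-in for the real signature::Pubkey

#[derive(Clone)]
struct Blob {
    id: Pubkey,    // originator of the blob
    data: Vec<u8>, // payload
}

#[derive(Clone, Default)]
struct WindowSlot {
    data: Option<Blob>,
    leader_unknown: bool, // blob arrived before the leader was known
}

struct Window {
    slots: Vec<WindowSlot>,
    pending_retransmits: bool,
}

impl Window {
    // On receive: store the blob and record whether the leader was known yet.
    fn insert(&mut self, ix: usize, blob: Blob, leader_known: bool) {
        let w = ix % self.slots.len();
        self.slots[w] = WindowSlot {
            data: Some(blob),
            leader_unknown: !leader_known,
        };
        self.pending_retransmits |= !leader_known;
    }

    // Once the leader is known: drain every held-back blob that actually came
    // from the leader, clearing the per-slot and window-wide flags as we go.
    fn flush_pending(&mut self, leader: Pubkey) -> VecDeque<Blob> {
        let mut queue = VecDeque::new();
        if self.pending_retransmits {
            self.pending_retransmits = false;
            for slot in self.slots.iter_mut() {
                if slot.leader_unknown {
                    if let Some(b) = &slot.data {
                        if b.id == leader {
                            queue.push_back(b.clone());
                        }
                    }
                    slot.leader_unknown = false;
                }
            }
        }
        queue
    }
}

fn main() {
    let mut win = Window {
        slots: vec![WindowSlot::default(); 8],
        pending_retransmits: false,
    };
    // A blob from node 7 arrives while the leader is still unknown: held back.
    win.insert(3, Blob { id: 7, data: vec![1, 2, 3] }, false);
    // Later we learn that node 7 is the leader: the held blob gets queued.
    let q = win.flush_pending(7);
    assert_eq!(q.len(), 1);
    println!("retransmitting {} blob(s), {} bytes", q.len(), q[0].data.len());
}

This mirrors what the two new tests exercise end to end: window_send_no_leader_test checks that nothing reaches the retransmit channel while the leader is unknown, and window_send_late_leader_test checks that the held blobs (plus newly arriving ones) are all retransmitted after set_leader is called.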