Replace Blob Ids with Forward property (#2734)

* Replace Blob Id with Blob forwarding

* Update simulation to properly propagate blobs
Sagar Dhawan
2019-02-12 10:56:48 -08:00
committed by GitHub
parent 1173cf7ed4
commit 8b39eb5e4e
12 changed files with 144 additions and 137 deletions
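
The gist of the change, as a minimal standalone sketch (simplified stand-in types, not the crate's real Blob): previously every blob carried its sender's Pubkey, and a receiver compared that id against the slot's scheduled leader before retransmitting; after this commit the sender sets a single forward bit and receivers consult only that.

// Simplified stand-ins; the real Blob keeps these fields in a serialized byte header.
#[derive(Clone, Copy, PartialEq)]
struct Pubkey([u8; 32]);

struct OldBlob {
    id: Pubkey, // removed by this commit
}

struct NewBlob {
    forward: bool, // added by this commit
}

// Old decision: retransmit only blobs that came from the slot's scheduled leader.
fn old_should_retransmit(b: &OldBlob, slot_leader: &Pubkey) -> bool {
    b.id == *slot_leader
}

// New decision: retransmit whatever the sender marked as forwardable.
fn new_should_retransmit(b: &NewBlob) -> bool {
    b.forward
}

fn main() {
    let leader = Pubkey([1; 32]);
    assert!(old_should_retransmit(&OldBlob { id: leader }, &leader));
    assert!(new_should_retransmit(&NewBlob { forward: true }));
}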

View File

@@ -1261,7 +1261,7 @@ pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Res
     Ok((1, entries[0].id))
 }
-pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()>
+pub fn genesis<'a, I>(ledger_path: &str, entries: I) -> Result<()>
 where
     I: IntoIterator<Item = &'a Entry>,
 {
@@ -1274,7 +1274,7 @@ where
         .map(|(idx, entry)| {
             let mut b = entry.borrow().to_blob();
             b.set_index(idx as u64);
-            b.set_id(&keypair.pubkey());
+            b.forward(true);
             b.set_slot(DEFAULT_SLOT_HEIGHT);
             b
         })
@@ -1408,7 +1408,7 @@ mod tests {
     fn test_read_blobs_bytes() {
         let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
         let slot = DEFAULT_SLOT_HEIGHT;
-        index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, &[slot; 10]);
+        index_blobs(&shared_blobs, &mut 0, &[slot; 10]);
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -1779,7 +1779,7 @@ mod tests {
     let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
     {
-        genesis(&ledger_path, &Keypair::new(), &entries).unwrap();
+        genesis(&ledger_path, &entries).unwrap();
         let ledger = Blocktree::open(&ledger_path).expect("open failed");
@@ -1797,7 +1797,7 @@ mod tests {
     let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
     {
         // put entries except last 2 into ledger
-        genesis(&ledger_path, &Keypair::new(), &entries[..entries.len() - 2]).unwrap();
+        genesis(&ledger_path, &entries[..entries.len() - 2]).unwrap();
         let ledger = Blocktree::open(&ledger_path).expect("open failed");

View File

@@ -82,7 +82,7 @@ impl Broadcast {
             .collect();
         // TODO: blob_index should be slot-relative...
-        index_blobs(&blobs, &self.id, &mut self.blob_index, &slots);
+        index_blobs(&blobs, &mut self.blob_index, &slots);
         let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

View File

@@ -166,11 +166,11 @@ mod tests {
     use bs58;
     // golden needs to be updated if blob stuff changes....
     let golden = Hash::new(
-        &bs58::decode("nzxMWDQVsftBZbMGA1ika8X6bAKy7vya1jfXnVZSErt")
+        &bs58::decode("8NMJBwpXoBoA7YrA5CemRtGtfAqoY15bvnCqVjh4LYpS")
             .into_vec()
             .unwrap(),
     );
-    assert_eq!(hasher.result(), golden,);
+    assert_eq!(hasher.result(), golden);
     remove_file(out_path).unwrap();
 }
 }

View File

@@ -583,7 +583,6 @@ impl ClusterInfo {
         let s = obj.read().unwrap();
         (s.my_data().clone(), peers)
     };
-    blob.write().unwrap().set_id(&me.id);
     let rblob = blob.read().unwrap();
     trace!("retransmit orders {}", orders.len());
     let errs: Vec<_> = orders

View File

@@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock};
 pub const MAX_REPAIR_LENGTH: usize = 128;
-pub fn retransmit_all_leader_blocks(
+pub fn retransmit_blobs(
     dq: &[SharedBlob],
     leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
     retransmit: &BlobSender,
@@ -24,16 +24,16 @@ pub fn retransmit_all_leader_blocks(
 ) -> Result<()> {
     let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
     for b in dq {
-        // Check if the blob is from the scheduled leader for its slot. If so,
-        // add to the retransmit_queue
+        // Don't add blobs generated by this node to the retransmit queue
         let slot = b.read().unwrap().slot();
         if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) {
             if leader_id != *id {
-                add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
+                retransmit_queue.push(b.clone());
             }
         }
     }
-    //todo maybe move this to where retransmit is actually happening
     submit(
         influxdb::Point::new("retransmit-queue")
             .add_field(
@@ -50,24 +50,6 @@ pub fn retransmit_all_leader_blocks(
     Ok(())
 }
-pub fn add_blob_to_retransmit_queue(
-    b: &SharedBlob,
-    leader_id: Pubkey,
-    retransmit_queue: &mut Vec<SharedBlob>,
-) {
-    let p = b.read().unwrap();
-    if p.id() == leader_id {
-        let nv = SharedBlob::default();
-        {
-            let mut mnv = nv.write().unwrap();
-            let sz = p.meta.size;
-            mnv.meta.size = sz;
-            mnv.data[..sz].copy_from_slice(&p.data[..sz]);
-        }
-        retransmit_queue.push(nv);
-    }
-}
 /// Process a blob: Add blob to the ledger window.
 pub fn process_blob(
     leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@@ -216,7 +198,7 @@ mod test {
 }
 #[test]
-pub fn test_retransmit() {
+pub fn test_send_to_retransmit_stage() {
     let leader = Keypair::new().pubkey();
     let nonleader = Keypair::new().pubkey();
     let mut leader_scheduler = LeaderScheduler::default();
@@ -226,9 +208,21 @@ mod test {
     let (blob_sender, blob_receiver) = channel();
-    // Expect blob from leader to be retransmitted
-    blob.write().unwrap().set_id(&leader);
-    retransmit_all_leader_blocks(
+    // Expect all blobs to be sent to retransmit_stage
+    blob.write().unwrap().forward(false);
+    retransmit_blobs(
+        &vec![blob.clone()],
+        &leader_scheduler,
+        &blob_sender,
+        &nonleader,
+    )
+    .expect("Expect successful retransmit");
+    let _ = blob_receiver
+        .try_recv()
+        .expect("Expect input blob to be retransmitted");
+    blob.write().unwrap().forward(true);
+    retransmit_blobs(
         &vec![blob.clone()],
         &leader_scheduler,
         &blob_sender,
@@ -239,26 +233,13 @@ mod test {
         .try_recv()
         .expect("Expect input blob to be retransmitted");
-    // Retransmitted blob should be missing the leader id
-    assert_ne!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
-    // Set the leader in the retransmitted blob, should now match the original
-    output_blob[0].write().unwrap().set_id(&leader);
+    // retransmit_blobs shouldn't be modifying the blob. That is retransmit stage's job now
     assert_eq!(*output_blob[0].read().unwrap(), *blob.read().unwrap());
-    // Expect blob from nonleader to not be retransmitted
-    blob.write().unwrap().set_id(&nonleader);
-    retransmit_all_leader_blocks(
-        &vec![blob.clone()],
-        &leader_scheduler,
-        &blob_sender,
-        &nonleader,
-    )
-    .expect("Expect successful retransmit");
-    assert!(blob_receiver.try_recv().is_err());
     // Expect blob from leader while currently leader to not be retransmitted
-    blob.write().unwrap().set_id(&leader);
-    retransmit_all_leader_blocks(&vec![blob], &leader_scheduler, &blob_sender, &leader)
+    // Even when forward is set
+    blob.write().unwrap().forward(true);
+    retransmit_blobs(&vec![blob], &leader_scheduler, &blob_sender, &leader)
         .expect("Expect successful retransmit");
     assert!(blob_receiver.try_recv().is_err());
 }
@@ -470,12 +451,7 @@ mod test {
     let num_entries = 10;
     let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
-    index_blobs(
-        &shared_blobs,
-        &Keypair::new().pubkey(),
-        &mut 0,
-        &vec![slot; num_entries],
-    );
+    index_blobs(&shared_blobs, &mut 0, &vec![slot; num_entries]);
     let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
     let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@@ -572,7 +548,6 @@ mod test {
     index_blobs(
         &shared_blobs,
-        &Keypair::new().pubkey(),
         &mut 0,
         &vec![DEFAULT_SLOT_HEIGHT; num_entries],
     );
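
A condensed sketch of the filter retransmit_blobs now applies (hypothetical simplified types; the real code consults a LeaderScheduler behind an RwLock): every received blob is queued for the retransmit stage except blobs from slots this node itself leads, so a node never re-sends its own output.

use std::collections::HashMap;

type Pubkey = u64; // stand-in for the real 32-byte Pubkey
struct Blob { slot: u64 }

// Mirrors the new retransmit_blobs filter: keep a blob unless this node is the
// scheduled leader for the blob's slot (i.e. unless the blob is our own).
fn filter_for_retransmit<'a>(
    dq: &'a [Blob],
    leader_for_slot: &HashMap<u64, Pubkey>,
    id: Pubkey,
) -> Vec<&'a Blob> {
    dq.iter()
        .filter(|b| leader_for_slot.get(&b.slot).map_or(false, |leader| *leader != id))
        .collect()
}

fn main() {
    let leaders: HashMap<u64, Pubkey> = [(0, 1), (1, 2)].into_iter().collect();
    let dq = [Blob { slot: 0 }, Blob { slot: 1 }];
    // Node 2 leads slot 1, so only the slot-0 blob survives the filter.
    assert_eq!(filter_for_retransmit(&dq, &leaders, 2).len(), 1);
}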

View File

@@ -447,7 +447,6 @@ pub fn make_large_test_entries(num_entries: usize) -> Vec<Entry> {
 #[cfg(test)]
 pub fn make_consecutive_blobs(
-    id: &Pubkey,
     num_blobs_to_make: u64,
     start_height: u64,
     start_hash: Hash,
@@ -460,7 +459,7 @@ pub fn make_consecutive_blobs(
     for blob in &blobs {
         let mut blob = blob.write().unwrap();
         blob.set_index(index);
-        blob.set_id(id);
+        blob.forward(true);
         blob.meta.set_addr(addr);
         index += 1;
     }

View File

@@ -329,14 +329,14 @@ impl CodingGenerator {
     for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
         let index = data_blob.index();
         let slot = data_blob.slot();
-        let id = data_blob.id();
+        let should_forward = data_blob.should_forward();
         let coding_blob = SharedBlob::default();
         {
             let mut coding_blob = coding_blob.write().unwrap();
             coding_blob.set_index(index);
             coding_blob.set_slot(slot);
-            coding_blob.set_id(&id);
+            coding_blob.forward(should_forward);
             coding_blob.set_size(max_data_size);
             coding_blob.set_coding();
         }
@@ -509,7 +509,6 @@ pub mod test {
     use crate::window::WindowSlot;
     use rand::{thread_rng, Rng};
     use solana_sdk::pubkey::Pubkey;
-    use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::sync::Arc;
     #[test]
@@ -756,23 +755,23 @@ pub mod test {
     for i in 0..max_data_size {
         coding_wl.data[i] = 0;
     }
-    // copy index and id from the data blob
+    // copy index and forward flag from the data blob
     if let Some(data) = &window[n].data {
         let data_rl = data.read().unwrap();
         let index = data_rl.index();
         let slot = data_rl.slot();
-        let id = data_rl.id();
+        let should_forward = data_rl.should_forward();
         trace!(
-            "{} copying index {} id {:?} from data to coding",
-            id,
+            "{} copying index {} should_forward {:?} from data to coding",
+            should_forward,
             index,
-            id
+            should_forward
         );
         coding_wl.set_index(index);
         coding_wl.set_slot(slot);
-        coding_wl.set_id(&id);
+        coding_wl.forward(should_forward);
     }
     coding_wl.set_size(max_data_size);
     coding_wl.set_coding();
@@ -890,12 +889,7 @@ pub mod test {
 }
     // Make some dummy slots
-    index_blobs(
-        &blobs,
-        &Keypair::new().pubkey(),
-        &mut (offset as u64),
-        &vec![slot; blobs.len()],
-    );
+    index_blobs(&blobs, &mut (offset as u64), &vec![slot; blobs.len()]);
     for b in blobs {
         let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
@@ -910,7 +904,6 @@ pub mod test {
     index_blobs(
         &blobs,
-        &Keypair::new().pubkey(),
         &mut (offset as u64),
         &vec![DEFAULT_SLOT_HEIGHT; blobs.len()],
     );
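
The erasure change is mechanical but worth stating: a coding blob now inherits the data blob's forward flag where it used to inherit the sender id, so parity traffic retransmits exactly like the data it protects. A tiny sketch (simplified Blob, not the crate's):

#[derive(Default)]
struct Blob { index: u64, slot: u64, forward: bool, coding: bool }

// Mirrors CodingGenerator: copy index, slot, and now the forward flag
// from the source data blob into the freshly built coding blob.
fn coding_from_data(data: &Blob) -> Blob {
    Blob { index: data.index, slot: data.slot, forward: data.forward, coding: true }
}

fn main() {
    let data = Blob { index: 5, slot: 2, forward: true, coding: false };
    let coding = coding_from_data(&data);
    assert!(coding.forward && coding.coding);
}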

View File

@@ -794,13 +794,8 @@ mod tests {
     let tvu_address = &validator_info.tvu;
-    let msgs = make_consecutive_blobs(
-        &leader_id,
-        blobs_to_send,
-        ledger_initial_len,
-        last_id,
-        &tvu_address,
-    )
+    let msgs =
+        make_consecutive_blobs(blobs_to_send, ledger_initial_len, last_id, &tvu_address)
         .into_iter()
         .rev()
         .collect();

View File

@@ -7,7 +7,6 @@ use byteorder::{ByteOrder, LittleEndian};
 use log::Level;
 use serde::Serialize;
 pub use solana_sdk::packet::PACKET_DATA_SIZE;
-use solana_sdk::pubkey::Pubkey;
 use std::cmp;
 use std::fmt;
 use std::io;
@@ -281,8 +280,8 @@ macro_rules! range {
 const PARENT_RANGE: std::ops::Range<usize> = range!(0, u64);
 const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
 const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
-const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
-const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32);
+const FORWARD_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, bool);
+const FLAGS_RANGE: std::ops::Range<usize> = range!(FORWARD_RANGE.end, u32);
 const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
 macro_rules! align {
@@ -324,14 +323,15 @@ impl Blob {
         LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix);
     }
-    /// sender id, we use this for identifying if its a blob from the leader that we should
-    /// retransmit. eventually blobs should have a signature that we can use for spam filtering
-    pub fn id(&self) -> Pubkey {
-        Pubkey::new(&self.data[ID_RANGE])
+    /// Used to determine whether or not this blob should be forwarded in retransmit
+    /// A bool is used here instead of a flag because this item is not intended to be signed when
+    /// blob signatures are introduced
+    pub fn should_forward(&self) -> bool {
+        self.data[FORWARD_RANGE][0] & 0x1 == 1
     }
-    pub fn set_id(&mut self, id: &Pubkey) {
-        self.data[ID_RANGE].copy_from_slice(id.as_ref())
+    pub fn forward(&mut self, forward: bool) {
+        self.data[FORWARD_RANGE][0] = u8::from(forward)
     }
     pub fn flags(&self) -> u32 {
@@ -442,14 +442,14 @@ impl Blob {
     }
 }
-pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slots: &[u64]) {
+pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slots: &[u64]) {
     // enumerate all the blobs, those are the indices
     for (blob, slot) in blobs.iter().zip(slots) {
         let mut blob = blob.write().unwrap();
         blob.set_index(*blob_index);
         blob.set_slot(*slot);
-        blob.set_id(id);
+        blob.forward(true);
         *blob_index += 1;
     }
 }
@@ -567,5 +567,12 @@ mod tests {
     assert_eq!(b.index(), <u64>::max_value());
     assert_eq!(b.meta, Meta::default());
 }
+#[test]
+fn test_blob_forward() {
+    let mut b = Blob::default();
+    assert!(!b.should_forward());
+    b.forward(true);
+    assert!(b.should_forward());
+}
 }
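
The header layout change in concrete bytes, as a self-contained sketch (offsets computed by hand from the range! chain above, which the real code derives with size_of): a 32-byte Pubkey field becomes a single bool byte, read with a mask and written with u8::from exactly as in the new should_forward/forward pair.

#![allow(dead_code)]
use std::ops::Range;

// Offsets follow the diff's range! chain: three u64 fields, then the new bool.
const PARENT_RANGE: Range<usize> = 0..8;    // u64
const SLOT_RANGE: Range<usize> = 8..16;     // u64
const INDEX_RANGE: Range<usize> = 16..24;   // u64
const FORWARD_RANGE: Range<usize> = 24..25; // bool; was the 32-byte Pubkey ID_RANGE
const FLAGS_RANGE: Range<usize> = 25..29;   // u32

struct Blob { data: [u8; 64] } // real blobs are PACKET_DATA_SIZE; shortened here

impl Blob {
    fn should_forward(&self) -> bool {
        self.data[FORWARD_RANGE][0] & 0x1 == 1
    }
    fn forward(&mut self, forward: bool) {
        self.data[FORWARD_RANGE][0] = u8::from(forward);
    }
}

fn main() {
    let mut b = Blob { data: [0u8; 64] };
    assert!(!b.should_forward());
    b.forward(true);
    assert!(b.should_forward());
}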

View File

@@ -7,6 +7,7 @@ use crate::cluster_info::{
 };
 use crate::counter::Counter;
 use crate::leader_scheduler::LeaderScheduler;
+use crate::packet::SharedBlob;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::streamer::BlobReceiver;
@@ -28,13 +29,15 @@ use std::time::Duration;
 /// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
 /// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them
 /// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
+/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
 fn compute_retransmit_peers(
     bank: &Arc<Bank>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     fanout: usize,
     hood_size: usize,
     grow: bool,
-) -> Vec<NodeInfo> {
+) -> (Vec<NodeInfo>, Vec<NodeInfo>) {
     let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
     let my_id = cluster_info.read().unwrap().id();
     //calc num_layers and num_neighborhoods using the total number of nodes
@@ -43,7 +46,7 @@ fn compute_retransmit_peers(
     if num_layers <= 1 {
         /* single layer data plane */
-        peers
+        (peers, vec![])
     } else {
         //find my index (my ix is the same as the first node with smaller stake)
         let my_index = peers
@@ -56,15 +59,16 @@ fn compute_retransmit_peers(
             my_index.unwrap_or(peers.len() - 1),
         );
         let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
-        let mut retransmit_peers = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
+        let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
+        let mut children = Vec::new();
         for ix in locality.child_layer_peers {
             if let Some(peer) = peers.get(ix) {
-                retransmit_peers.push(peer.clone());
+                children.push(peer.clone());
                 continue;
             }
             break;
         }
-        retransmit_peers
+        (neighbors, children)
     }
 }
@@ -85,19 +89,32 @@ fn retransmit(
             .add_field("count", influxdb::Value::Integer(dq.len() as i64))
             .to_owned(),
     );
-    let retransmit_peers = compute_retransmit_peers(
+    let (neighbors, children) = compute_retransmit_peers(
         &bank,
         cluster_info,
         DATA_PLANE_FANOUT,
         NEIGHBORHOOD_SIZE,
         GROW_LAYER_CAPACITY,
     );
-    for b in &mut dq {
-        ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?;
+    for b in &dq {
+        if b.read().unwrap().should_forward() {
+            ClusterInfo::retransmit_to(&cluster_info, &neighbors, &copy_for_neighbors(b), sock)?;
+        }
+        // Always send blobs to children
+        ClusterInfo::retransmit_to(&cluster_info, &children, b, sock)?;
     }
     Ok(())
 }
+/// Modifies a blob for neighbors nodes
+#[inline]
+fn copy_for_neighbors(b: &SharedBlob) -> SharedBlob {
+    let mut blob = b.read().unwrap().clone();
+    // Disable blob forwarding for neighbors
+    blob.forward(false);
+    Arc::new(RwLock::new(blob))
+}
 /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
 /// See `cluster_info` for network layer definitions.
 /// # Arguments
@@ -204,7 +221,7 @@ mod tests {
     use std::sync::Mutex;
     use std::time::Instant;
-    type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<i32>)>;
+    type Nodes = HashMap<Pubkey, (HashSet<i32>, Receiver<(i32, bool)>)>;
     fn num_threads() -> usize {
         sys_info::cpu_num().unwrap_or(10) as usize
@@ -240,7 +257,7 @@ mod tests {
     // setup accounts for all nodes (leader has 0 bal)
     let (s, r) = channel();
-    let senders: Arc<Mutex<HashMap<Pubkey, Sender<i32>>>> =
+    let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
         Arc::new(Mutex::new(HashMap::new()));
     senders.lock().unwrap().insert(leader_info.id, s);
     let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
@@ -273,7 +290,7 @@ mod tests {
     assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
     // create some "blobs".
-    let blobs: Vec<_> = (0..100).into_par_iter().map(|i| i as i32).collect();
+    let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
     // pretend to broadcast from leader - cluster_info::create_broadcast_orders
     let mut broadcast_table = cluster_info.sorted_tvu_peers(&bank);
@@ -283,7 +300,7 @@ mod tests {
     // send blobs to layer 1 nodes
     orders.iter().for_each(|(b, vc)| {
         vc.iter().for_each(|c| {
-            find_insert_blob(&c.id, *b, &mut batches);
+            find_insert_blob(&c.id, b.0, &mut batches);
         })
     });
     assert!(!batches.is_empty());
@@ -295,43 +312,62 @@ mod tests {
     let batch_size = batch.len();
     let mut remaining = batch_size;
     let senders: HashMap<_, _> = senders.lock().unwrap().clone();
-    let mut mapped_peers: HashMap<Pubkey, Vec<Sender<i32>>> = HashMap::new();
+    // A map that holds neighbors and children senders for a given node
+    let mut mapped_peers: HashMap<
+        Pubkey,
+        (Vec<Sender<(i32, bool)>>, Vec<Sender<(i32, bool)>>),
+    > = HashMap::new();
     while remaining > 0 {
         for (id, (recv, r)) in batch.iter_mut() {
             assert!(now.elapsed().as_secs() < timeout, "Timed out");
             cluster.gossip.set_self(*id);
             if !mapped_peers.contains_key(id) {
-                let peers = compute_retransmit_peers(
+                let (neighbors, children) = compute_retransmit_peers(
                     &bank,
                     &Arc::new(RwLock::new(cluster.clone())),
                     fanout,
                     hood_size,
                     GROW_LAYER_CAPACITY,
                 );
-                let vec_peers: Vec<_> = peers
+                let vec_children: Vec<_> = children
                     .iter()
                     .map(|p| {
                         let s = senders.get(&p.id).unwrap();
                         recv.iter().for_each(|i| {
-                            let _ = s.send(*i);
+                            let _ = s.send((*i, true));
                         });
                         s.clone()
                     })
                     .collect();
-                mapped_peers.insert(*id, vec_peers);
+                let vec_neighbors: Vec<_> = neighbors
+                    .iter()
+                    .map(|p| {
+                        let s = senders.get(&p.id).unwrap();
+                        recv.iter().for_each(|i| {
+                            let _ = s.send((*i, false));
+                        });
+                        s.clone()
+                    })
+                    .collect();
+                mapped_peers.insert(*id, (vec_neighbors, vec_children));
             }
-            let vec_peers = mapped_peers.get(id).unwrap().to_vec();
+            let (vec_neighbors, vec_children) = mapped_peers.get(id).unwrap();
             //send and recv
             if recv.len() < blobs.len() {
                 loop {
                     match r.try_recv() {
-                        Ok(i) => {
-                            if recv.insert(i) {
-                                vec_peers.iter().for_each(|s| {
-                                    let _ = s.send(i);
+                        Ok((data, retransmit)) => {
+                            if recv.insert(data) {
+                                vec_children.iter().for_each(|s| {
+                                    let _ = s.send((data, retransmit));
                                 });
+                                if retransmit {
+                                    vec_neighbors.iter().for_each(|s| {
+                                        let _ = s.send((data, false));
+                                    })
+                                }
                                 if recv.len() == blobs.len() {
                                     remaining -= 1;
                                     break;
@@ -348,6 +384,8 @@ mod tests {
         });
     }
+    //todo add tests with network failures
     // Run with a single layer
     #[test]
     fn test_retransmit_small() {
@@ -372,5 +410,13 @@ mod tests {
         run_simulation(num_nodes, DATA_PLANE_FANOUT / 10, NEIGHBORHOOD_SIZE / 10);
     }
-    //todo add tests with network failures
+    // Test that blobs always come out with forward unset for neighbors
+    #[test]
+    fn test_blob_for_neighbors() {
+        let blob = SharedBlob::default();
+        blob.write().unwrap().forward(true);
+        let for_hoodies = copy_for_neighbors(&blob);
+        assert!(!for_hoodies.read().unwrap().should_forward());
+    }
 }
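
Putting the retransmit-stage pieces together, a minimal sketch of the new fan-out rule (stand-in types; real sends go through ClusterInfo::retransmit_to over UDP): children always get the blob as-is, while neighbors get a forward-cleared copy and only when the blob was marked forwardable, which is what keeps a blob from bouncing around a neighborhood indefinitely.

use std::sync::{Arc, RwLock};

#[derive(Clone, Default)]
struct Blob { forward: bool } // stand-in for the crate's Blob
type SharedBlob = Arc<RwLock<Blob>>;

// Same shape as the new copy_for_neighbors: clone, then clear the flag.
fn copy_for_neighbors(b: &SharedBlob) -> SharedBlob {
    let mut blob = b.read().unwrap().clone();
    blob.forward = false; // neighbors must not forward again
    Arc::new(RwLock::new(blob))
}

// Mirrors the new retransmit loop, with Vec pushes standing in for sends.
fn retransmit(dq: &[SharedBlob], neighbors: &mut Vec<SharedBlob>, children: &mut Vec<SharedBlob>) {
    for b in dq {
        if b.read().unwrap().forward {
            neighbors.push(copy_for_neighbors(b));
        }
        // Always send blobs to children
        children.push(b.clone());
    }
}

fn main() {
    let blob = SharedBlob::default();
    blob.write().unwrap().forward = true;
    let (mut n, mut c) = (vec![], vec![]);
    retransmit(&[blob], &mut n, &mut c);
    assert!(!n[0].read().unwrap().forward && c[0].read().unwrap().forward);
}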

View File

@@ -303,7 +303,6 @@ pub mod tests {
     let mut cluster_info2 = ClusterInfo::new(target2.info.clone());
     cluster_info2.insert_info(leader.info.clone());
     cluster_info2.set_leader(leader.info.id);
-    let leader_id = leader.info.id;
     let cref2 = Arc::new(RwLock::new(cluster_info2));
     let dr_2 = new_gossip(cref2, target2.sockets.gossip, exit.clone());
@@ -402,7 +401,7 @@ pub mod tests {
     let mut w = b.write().unwrap();
     w.set_index(blob_idx);
     blob_idx += 1;
-    w.set_id(&leader_id);
+    w.forward(true);
     let serialized_entry = serialize(&entry).unwrap();

View File

@@ -50,7 +50,7 @@ fn recv_window(
             .to_owned(),
     );
-    retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit, id)?;
+    retransmit_blobs(&dq, leader_scheduler, retransmit, id)?;
     //send a contiguous set of blocks
     trace!("{} num blobs received: {}", id, dq.len());
@@ -215,13 +215,8 @@ mod test {
     let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
     let num_blobs_to_make = 10;
     let gossip_address = &leader_node.info.gossip;
-    let msgs = make_consecutive_blobs(
-        &me_id,
-        num_blobs_to_make,
-        0,
-        Hash::default(),
-        &gossip_address,
-    )
+    let msgs =
+        make_consecutive_blobs(num_blobs_to_make, 0, Hash::default(), &gossip_address)
         .into_iter()
         .rev()
         .collect();;
@@ -290,8 +285,7 @@ mod test {
     leader_node.sockets.tvu.into_iter().map(Arc::new).collect();
     let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
     let mut msgs = Vec::new();
-    let blobs =
-        make_consecutive_blobs(&me_id, 14u64, 0, Hash::default(), &leader_node.info.gossip);
+    let blobs = make_consecutive_blobs(14u64, 0, Hash::default(), &leader_node.info.gossip);
     for v in 0..10 {
         let i = 9 - v;