From 0bea870b22a8bb8fe1e6324560543ed28f619204 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 2 Jan 2019 14:16:15 +0530 Subject: [PATCH] Dynamic N layer 'avalanche' broadcast and retransmit (#2058) * Dynamic N layer avalanche broadcast and retransmit --- src/broadcast_service.rs | 13 +- src/cluster_info.rs | 355 ++++++++++++++++++++++++++++++++++++++- src/contact_info.rs | 23 ++- src/fullnode.rs | 2 + src/retransmit_stage.rs | 63 ++++++- src/tvu.rs | 1 + 6 files changed, 440 insertions(+), 17 deletions(-) diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index f36d99a418..c3c43dec48 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -1,6 +1,7 @@ //! The `broadcast_service` broadcasts data from a leader node to validators //! -use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; +use crate::bank::Bank; +use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT}; use crate::counter::Counter; use crate::db_ledger::DbLedger; use crate::entry::Entry; @@ -239,8 +240,10 @@ pub struct BroadcastService { } impl BroadcastService { + #[allow(clippy::too_many_arguments)] fn run( db_ledger: &Arc, + bank: &Arc, sock: &UdpSocket, cluster_info: &Arc>, window: &SharedWindow, @@ -260,7 +263,9 @@ impl BroadcastService { if exit_signal.load(Ordering::Relaxed) { return BroadcastServiceReturnType::ExitSignal; } - let broadcast_table = cluster_info.read().unwrap().tvu_peers(); + let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank); + // Layer 1 nodes are limited to the fanout size. + broadcast_table.truncate(DATA_PLANE_FANOUT); inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); let leader_id = cluster_info.read().unwrap().leader_id(); if let Err(e) = broadcast( @@ -309,6 +314,7 @@ impl BroadcastService { #[allow(clippy::too_many_arguments, clippy::new_ret_no_self)] pub fn new( db_ledger: Arc, + bank: Arc, sock: UdpSocket, cluster_info: Arc>, window: SharedWindow, @@ -326,6 +332,7 @@ impl BroadcastService { let _exit = Finalizer::new(exit_sender); Self::run( &db_ledger, + &bank, &sock, &cluster_info, &window, @@ -401,10 +408,12 @@ mod test { let shared_window = Arc::new(RwLock::new(window)); let (entry_sender, entry_receiver) = channel(); let exit_sender = Arc::new(AtomicBool::new(false)); + let bank = Arc::new(Bank::default()); // Start up the broadcast stage let (broadcast_service, exit_signal) = BroadcastService::new( db_ledger.clone(), + bank.clone(), leader_info.sockets.broadcast, cluster_info, shared_window, diff --git a/src/cluster_info.rs b/src/cluster_info.rs index fad378deeb..791fe38551 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -12,6 +12,7 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight +use crate::bank::Bank; use crate::bloom::Bloom; use crate::contact_info::ContactInfo; use crate::counter::Counter; @@ -36,6 +37,7 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; +use std::cmp::min; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -47,6 +49,12 @@ pub type NodeInfo = ContactInfo; pub const FULLNODE_PORT_RANGE: (u16, u16) = (8000, 10_000); +/// The fanout for Ledger Replication +pub const DATA_PLANE_FANOUT: usize = 200; +pub const NEIGHBORHOOD_SIZE: usize = DATA_PLANE_FANOUT; +/// Set whether node capacity should grow as layers are added +pub const GROW_LAYER_CAPACITY: bool = false; + /// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; @@ -66,6 +74,20 @@ pub struct ClusterInfo { pub(crate) keypair: Arc, } +#[derive(Default, Clone)] +pub struct Locality { + /// The bounds of the neighborhood represented by this locality + pub neighbor_bounds: (usize, usize), + /// The `avalanche` layer this locality is in + pub layer_ix: usize, + /// The bounds of the current layer + pub layer_bounds: (usize, usize), + /// The bounds of the next layer + pub child_layer_bounds: Option<(usize, usize)>, + /// The indices of the nodes that should be contacted in next layer + pub child_layer_peers: Vec, +} + #[derive(Debug, Deserialize, Serialize)] pub struct PruneData { /// Pubkey of the node that sent this prune data @@ -297,6 +319,33 @@ impl ClusterInfo { .collect() } + fn sort_by_stake(peers: &[NodeInfo], bank: &Arc) -> Vec<(u64, NodeInfo)> { + let mut peers_with_stakes: Vec<_> = peers + .iter() + .map(|c| (bank.get_stake(&c.id), c.clone())) + .collect(); + peers_with_stakes.sort_unstable(); + peers_with_stakes + } + + pub fn sorted_retransmit_peers(&self, bank: &Arc) -> Vec { + let peers = self.retransmit_peers(); + let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank); + peers_with_stakes + .iter() + .map(|(_, peer)| (*peer).clone()) + .collect() + } + + pub fn sorted_tvu_peers(&self, bank: &Arc) -> Vec { + let peers = self.tvu_peers(); + let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank); + peers_with_stakes + .iter() + .map(|(_, peer)| (*peer).clone()) + .collect() + } + /// compute broadcast table pub fn tpu_peers(&self) -> Vec { let me = self.my_data().id; @@ -311,6 +360,137 @@ impl ClusterInfo { .collect() } + /// Given a node count, neighborhood size, and an initial fanout (leader -> layer 1), it + /// calculates how many layers are needed and at what index each layer begins. + /// The `grow` parameter is used to determine if the network should 'fanout' or keep + /// layer capacities constant. + pub fn describe_data_plane( + nodes: usize, + fanout: usize, + hood_size: usize, + grow: bool, + ) -> (usize, Vec) { + let mut layer_indices: Vec = vec![0]; + if nodes == 0 { + (0, vec![]) + } else if nodes <= fanout { + // single layer data plane + (1, layer_indices) + } else { + //layer 1 is going to be the first num fanout nodes, so exclude those + let mut remaining_nodes = nodes - fanout; + layer_indices.push(fanout); + let mut num_layers = 2; + let mut num_neighborhoods = fanout / 2; + let mut layer_capacity = hood_size * num_neighborhoods; + while remaining_nodes > 0 { + if remaining_nodes > layer_capacity { + // Needs more layers. + num_layers += 1; + remaining_nodes -= layer_capacity; + let end = *layer_indices.last().unwrap(); + layer_indices.push(layer_capacity + end); + + if grow { + // Next layer's capacity + num_neighborhoods *= num_neighborhoods; + layer_capacity = hood_size * num_neighborhoods; + } + } else { + //everything will now fit in the layers we have + let end = *layer_indices.last().unwrap(); + layer_indices.push(layer_capacity + end); + break; + } + } + assert_eq!(num_layers, layer_indices.len() - 1); + (num_layers, layer_indices) + } + } + + fn localize_item( + layer_indices: &[usize], + hood_size: usize, + select_index: usize, + curr_index: usize, + ) -> Option<(Locality)> { + let end = layer_indices.len() - 1; + let next = min(end, curr_index + 1); + let value = layer_indices[curr_index]; + let localized = select_index >= value && select_index < layer_indices[next]; + let mut locality = Locality::default(); + if localized { + match curr_index { + _ if curr_index == 0 => { + locality.layer_ix = 0; + locality.layer_bounds = (0, hood_size); + locality.neighbor_bounds = locality.layer_bounds; + if next == end { + locality.child_layer_bounds = None; + locality.child_layer_peers = vec![]; + } else { + locality.child_layer_bounds = + Some((layer_indices[next], layer_indices[next + 1])); + locality.child_layer_peers = ClusterInfo::lower_layer_peers( + select_index, + layer_indices[next], + layer_indices[next + 1], + hood_size, + ); + } + } + _ if curr_index == end => { + locality.layer_ix = end; + locality.layer_bounds = (end - hood_size, end); + locality.neighbor_bounds = locality.layer_bounds; + locality.child_layer_bounds = None; + locality.child_layer_peers = vec![]; + } + ix => { + let hood_ix = (select_index - value) / hood_size; + locality.layer_ix = ix; + locality.layer_bounds = (value, layer_indices[next]); + locality.neighbor_bounds = ( + ((hood_ix * hood_size) + value), + ((hood_ix + 1) * hood_size + value), + ); + if next == end { + locality.child_layer_bounds = None; + locality.child_layer_peers = vec![]; + } else { + locality.child_layer_bounds = + Some((layer_indices[next], layer_indices[next + 1])); + locality.child_layer_peers = ClusterInfo::lower_layer_peers( + select_index, + layer_indices[next], + layer_indices[next + 1], + hood_size, + ); + } + } + } + Some(locality) + } else { + None + } + } + + /// Given a array of layer indices and another index, returns (as a `Locality`) the layer, + /// layer-bounds and neighborhood-bounds in which the index resides + pub fn localize(layer_indices: &[usize], hood_size: usize, select_index: usize) -> Locality { + (0..layer_indices.len()) + .find_map(|i| ClusterInfo::localize_item(layer_indices, hood_size, select_index, i)) + .or_else(|| Some(Locality::default())) + .unwrap() + } + + fn lower_layer_peers(index: usize, start: usize, end: usize, hood_size: usize) -> Vec { + (start..end) + .step_by(hood_size) + .map(|x| x + index % hood_size) + .collect() + } + /// broadcast messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` @@ -369,14 +549,19 @@ impl ClusterInfo { Ok(()) } - /// retransmit messages from the leader to layer 1 nodes + /// retransmit messages to a list of nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` - pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { - let (me, orders): (NodeInfo, Vec) = { + pub fn retransmit_to( + obj: &Arc>, + peers: &[NodeInfo], + blob: &SharedBlob, + s: &UdpSocket, + ) -> Result<()> { + let (me, orders): (NodeInfo, &[NodeInfo]) = { // copy to avoid locking during IO - let s = obj.read().expect("'obj' read lock in pub fn retransmit"); - (s.my_data().clone(), s.retransmit_peers()) + let s = obj.read().unwrap(); + (s.my_data().clone(), peers) }; blob.write() .unwrap() @@ -409,6 +594,14 @@ impl ClusterInfo { Ok(()) } + /// retransmit messages from the leader to layer 1 nodes + /// # Remarks + /// We need to avoid having obj locked while doing any io, such as the `send_to` + pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { + let me = obj.read().unwrap(); + ClusterInfo::retransmit_to(obj, &me.retransmit_peers(), blob, s) + } + fn send_orders( s: &UdpSocket, orders: Vec<(Option, Vec<&NodeInfo>)>, @@ -1119,10 +1312,10 @@ mod tests { use crate::crds_value::CrdsValueLabel; use crate::db_ledger::DbLedger; use crate::ledger::get_tmp_ledger_path; - use crate::packet::BLOB_HEADER_SIZE; use crate::result::Error; use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::collections::HashSet; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::{Arc, RwLock}; @@ -1361,4 +1554,154 @@ mod tests { .unwrap(); assert!(val.verify()); } + + fn num_layers(nodes: usize, fanout: usize, hood_size: usize, grow: bool) -> usize { + ClusterInfo::describe_data_plane(nodes, fanout, hood_size, grow).0 + } + + #[test] + fn test_describe_data_plane() { + // no nodes + assert_eq!(num_layers(0, 200, 200, false), 0); + + // 1 node + assert_eq!(num_layers(1, 200, 200, false), 1); + + // 10 nodes with fanout of 2 and hood size of 2 + assert_eq!(num_layers(10, 2, 2, false), 5); + + // fanout + 1 nodes with fanout of 2 and hood size of 2 + assert_eq!(num_layers(3, 2, 2, false), 2); + + // 10 nodes with fanout of 4 and hood size of 2 while growing + assert_eq!(num_layers(10, 4, 2, true), 3); + + // A little more realistic + assert_eq!(num_layers(100, 10, 10, false), 3); + + // A little more realistic with odd numbers + assert_eq!(num_layers(103, 13, 13, false), 3); + + // larger + let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(10_000, 10, 10, false); + assert_eq!(layer_cnt, 201); + // distances between index values should be the same since we aren't growing. + let capacity = 10 / 2 * 10; + assert_eq!(layer_indices[1], 10); + layer_indices[1..layer_indices.len()] + .chunks(2) + .for_each(|x| { + if x.len() == 2 { + assert_eq!(x[1] - x[0], capacity); + } + }); + + // massive + let (layer_cnt, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false); + let capacity = 200 / 2 * 200; + let cnt = 500_000 / capacity + 1; + assert_eq!(layer_cnt, cnt); + // distances between index values should be the same since we aren't growing. + assert_eq!(layer_indices[1], 200); + layer_indices[1..layer_indices.len()] + .chunks(2) + .for_each(|x| { + if x.len() == 2 { + assert_eq!(x[1] - x[0], capacity); + } + }); + let total_capacity: usize = *layer_indices.last().unwrap(); + assert!(total_capacity >= 500_000); + + // massive with growth + assert_eq!(num_layers(500_000, 200, 200, true), 3); + } + + #[test] + fn test_localize() { + // go for gold + let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false); + let mut me = 0; + let mut layer_ix = 0; + let locality = ClusterInfo::localize(&layer_indices, 200, me); + assert_eq!(locality.layer_ix, layer_ix); + assert_eq!( + locality.child_layer_bounds, + Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2])) + ); + me = 201; + layer_ix = 1; + let locality = ClusterInfo::localize(&layer_indices, 200, me); + assert_eq!( + locality.layer_ix, layer_ix, + "layer_indices[layer_ix] is actually {}", + layer_indices[layer_ix] + ); + assert_eq!( + locality.child_layer_bounds, + Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2])) + ); + me = 20_201; + layer_ix = 2; + let locality = ClusterInfo::localize(&layer_indices, 200, me); + assert_eq!( + locality.layer_ix, layer_ix, + "layer_indices[layer_ix] is actually {}", + layer_indices[layer_ix] + ); + assert_eq!( + locality.child_layer_bounds, + Some((layer_indices[layer_ix + 1], layer_indices[layer_ix + 2])) + ); + + // test no child layer since last layer should have massive capacity + let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, true); + me = 20_201; + layer_ix = 2; + let locality = ClusterInfo::localize(&layer_indices, 200, me); + assert_eq!( + locality.layer_ix, layer_ix, + "layer_indices[layer_ix] is actually {}", + layer_indices[layer_ix] + ); + assert_eq!(locality.child_layer_bounds, None); + } + + #[test] + fn test_localize_child_peer_overlap() { + let (_, layer_indices) = ClusterInfo::describe_data_plane(500_000, 200, 200, false); + let last_ix = layer_indices.len() - 1; + // sample every 33 pairs to reduce test time + for x in (0..*layer_indices.get(last_ix - 2).unwrap()).step_by(33) { + let me_locality = ClusterInfo::localize(&layer_indices, 200, x); + let buddy_locality = ClusterInfo::localize(&layer_indices, 200, x + 1); + assert!(!me_locality.child_layer_peers.is_empty()); + assert!(!buddy_locality.child_layer_peers.is_empty()); + me_locality + .child_layer_peers + .iter() + .zip(buddy_locality.child_layer_peers.iter()) + .for_each(|(x, y)| assert_ne!(x, y)); + } + } + + #[test] + fn test_network_coverage() { + // pretend to be each node in a scaled down network and make sure the set of all the broadcast peers + // includes every node in the network. + let (_, layer_indices) = ClusterInfo::describe_data_plane(25_000, 10, 10, false); + let mut broadcast_set = HashSet::new(); + for my_index in 0..25_000 { + let my_locality = ClusterInfo::localize(&layer_indices, 10, my_index); + broadcast_set.extend(my_locality.neighbor_bounds.0..my_locality.neighbor_bounds.1); + broadcast_set.extend(my_locality.child_layer_peers); + } + + for i in 0..25_000 { + assert!(broadcast_set.contains(&(i as usize))); + } + assert!(broadcast_set.contains(&(layer_indices.last().unwrap() - 1))); + //sanity check for past total capacity. + assert!(!broadcast_set.contains(&(layer_indices.last().unwrap()))); + } } diff --git a/src/contact_info.rs b/src/contact_info.rs index 6eb3da4bae..3991df82e5 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -3,10 +3,11 @@ use bincode::serialize; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::timestamp; +use std::cmp::{Ord, Ordering, PartialEq, PartialOrd}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; /// Structure representing a node on the network -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct ContactInfo { pub id: Pubkey, /// signature of this ContactInfo @@ -27,6 +28,26 @@ pub struct ContactInfo { pub wallclock: u64, } +impl Ord for ContactInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id) + } +} + +impl PartialOrd for ContactInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for ContactInfo { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for ContactInfo {} + #[macro_export] macro_rules! socketaddr { ($ip:expr, $port:expr) => { diff --git a/src/fullnode.rs b/src/fullnode.rs index e7eadd0fab..040ed63966 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -303,6 +303,7 @@ impl Fullnode { let (broadcast_service, _) = BroadcastService::new( db_ledger.clone(), + bank.clone(), node.sockets .broadcast .try_clone() @@ -476,6 +477,7 @@ impl Fullnode { let (broadcast_service, _) = BroadcastService::new( self.db_ledger.clone(), + self.bank.clone(), self.broadcast_socket .try_clone() .expect("Failed to clone broadcast socket"), diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index be58faf25f..161aaf4a2e 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -1,10 +1,10 @@ //! The `retransmit_stage` retransmits blobs between validators -use crate::cluster_info::ClusterInfo; +use crate::bank::Bank; +use crate::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE}; use crate::counter::Counter; use crate::db_ledger::DbLedger; use crate::entry::Entry; - use crate::leader_scheduler::LeaderScheduler; use crate::result::{Error, Result}; use crate::service::Service; @@ -21,6 +21,7 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; fn retransmit( + bank: &Arc, cluster_info: &Arc>, r: &BlobReceiver, sock: &UdpSocket, @@ -37,13 +38,53 @@ fn retransmit( .to_owned(), ); - for b in &mut dq { - ClusterInfo::retransmit(&cluster_info, b, sock)?; + // TODO layer 2 logic here + // 1 - find out if I am in layer 1 first + // 1.1 - If yes, then broadcast to all layer 1 nodes + // 1 - using my layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size + // 1.2 - If no, then figure out what layer I am in and who my neighbors are and only broadcast to them + // 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic + let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank); + let my_id = cluster_info.read().unwrap().id(); + //calc num_layers and num_neighborhoods using the total number of nodes + let (num_layers, layer_indices) = ClusterInfo::describe_data_plane( + peers.len(), + DATA_PLANE_FANOUT, + NEIGHBORHOOD_SIZE, + GROW_LAYER_CAPACITY, + ); + if num_layers <= 1 { + /* single layer data plane */ + for b in &mut dq { + ClusterInfo::retransmit(&cluster_info, b, sock)?; + } + } else { + //find my index (my ix is the same as the first node with smaller stake) + let my_index = peers + .iter() + .position(|ci| bank.get_stake(&ci.id) <= bank.get_stake(&my_id)); + //find my layer + let locality = ClusterInfo::localize( + &layer_indices, + NEIGHBORHOOD_SIZE, + my_index.unwrap_or(peers.len() - 1), + ); + let mut retransmit_peers = + peers[locality.neighbor_bounds.0..locality.neighbor_bounds.1].to_vec(); + locality.child_layer_peers.iter().for_each(|&ix| { + if let Some(peer) = peers.get(ix) { + retransmit_peers.push(peer.clone()); + } + }); + + for b in &mut dq { + ClusterInfo::retransmit_to(&cluster_info, &retransmit_peers, b, sock)?; + } } Ok(()) } -/// Service to retransmit messages from the leader to layer 1 nodes. +/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes. /// See `cluster_info` for network layer definitions. /// # Arguments /// * `sock` - Socket to read from. Read timeout is set to 1. @@ -53,6 +94,7 @@ fn retransmit( /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. fn retransmitter( sock: Arc, + bank: Arc, cluster_info: Arc>, r: BlobReceiver, ) -> JoinHandle<()> { @@ -61,7 +103,7 @@ fn retransmitter( .spawn(move || { trace!("retransmitter started"); loop { - if let Err(e) = retransmit(&cluster_info, &r, &sock) { + if let Err(e) = retransmit(&bank, &cluster_info, &r, &sock) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -83,6 +125,7 @@ pub struct RetransmitStage { impl RetransmitStage { #[allow(clippy::new_ret_no_self)] pub fn new( + bank: &Arc, db_ledger: Arc, cluster_info: &Arc>, tick_height: u64, @@ -94,8 +137,12 @@ impl RetransmitStage { ) -> (Self, Receiver>) { let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = - retransmitter(retransmit_socket, cluster_info.clone(), retransmit_receiver); + let t_retransmit = retransmitter( + retransmit_socket, + bank.clone(), + cluster_info.clone(), + retransmit_receiver, + ); let (entry_sender, entry_receiver) = channel(); let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( diff --git a/src/tvu.rs b/src/tvu.rs index 2391e105e4..47eec5e7cf 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -96,6 +96,7 @@ impl Tvu { //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( + bank, db_ledger, &cluster_info, bank.tick_height(),