From 6335be803c61eaab136b1fa756610d90e453ea9e Mon Sep 17 00:00:00 2001
From: carllin
Date: Tue, 13 Nov 2018 02:21:37 -0800
Subject: [PATCH] Broadcast last tick before leader rotation (#1766)

* Broadcast last tick before leader rotation to everybody on network

* Add test

* Refactor broadcast
---
 src/broadcast_stage.rs |  21 +++++
 src/cluster_info.rs    | 205 ++++++++++++++++++++++++++---------------
 src/fullnode.rs        |   4 +
 tests/multinode.rs     | 150 +++++++++++++++++++++++++++++-
 4 files changed, 306 insertions(+), 74 deletions(-)

diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs
index 45853ef66f..a5e4b01c08 100644
--- a/src/broadcast_stage.rs
+++ b/src/broadcast_stage.rs
@@ -28,7 +28,10 @@ pub enum BroadcastStageReturnType {
     ChannelDisconnected,
 }
 
+#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
 fn broadcast(
+    max_tick_height: Option<u64>,
+    tick_height: &mut u64,
     node_info: &NodeInfo,
     broadcast_table: &[NodeInfo],
     window: &SharedWindow,
@@ -52,6 +55,14 @@ fn broadcast(
     inc_new_counter_info!("broadcast_stage-entries_received", num_entries);
 
     let to_blobs_start = Instant::now();
+    let num_ticks: u64 = ventries
+        .iter()
+        .flatten()
+        .map(|entry| (entry.is_tick()) as u64)
+        .sum();
+
+    *tick_height += num_ticks;
+
     let dq: SharedBlobs = ventries
         .into_par_iter()
         .flat_map(|p| p.to_blobs())
@@ -128,6 +139,7 @@ fn broadcast(
 
     // Send blobs out from the window
     ClusterInfo::broadcast(
+        Some(*tick_height) == max_tick_height,
         &node_info,
         &broadcast_table,
         &window,
@@ -188,6 +200,8 @@ impl BroadcastStage {
         entry_height: u64,
         leader_slot: u64,
         receiver: &Receiver<Vec<Entry>>,
+        max_tick_height: Option<u64>,
+        tick_height: u64,
     ) -> BroadcastStageReturnType {
         let mut transmit_index = WindowIndex {
             data: entry_height,
@@ -195,9 +209,12 @@ impl BroadcastStage {
         };
         let mut receive_index = entry_height;
         let me = cluster_info.read().unwrap().my_data().clone();
+        let mut tick_height_ = tick_height;
         loop {
             let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
             if let Err(e) = broadcast(
+                max_tick_height,
+                &mut tick_height_,
                 &me,
                 &broadcast_table,
                 &window,
@@ -244,6 +261,8 @@ impl BroadcastStage {
         entry_height: u64,
         leader_slot: u64,
         receiver: Receiver<Vec<Entry>>,
+        max_tick_height: Option<u64>,
+        tick_height: u64,
         exit_sender: Arc<AtomicBool>,
     ) -> Self {
         let thread_hdl = Builder::new()
@@ -257,6 +276,8 @@ impl BroadcastStage {
                     entry_height,
                     leader_slot,
                     &receiver,
+                    max_tick_height,
+                    tick_height,
                 )
             }).unwrap();
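Taken together, the `broadcast_stage.rs` changes do one thing: keep a running tick count and flag the batch that carries the final tick before rotation. The sketch below restates that bookkeeping as a self-contained function; the `Entry` type and its field are illustrative stand-ins, not the crate's own definitions.

```rust
// Sketch of the tick bookkeeping added to `broadcast` above (illustrative types).
struct Entry {
    is_tick: bool, // stand-in for the real Entry::is_tick()
}

fn batch_contains_last_tick(
    ventries: &[Vec<Entry>],      // batches of entries drained from the channel
    tick_height: &mut u64,        // running tick count, updated in place
    max_tick_height: Option<u64>, // tick height at which leadership rotates
) -> bool {
    // Count how many of the freshly received entries are ticks.
    let num_ticks: u64 = ventries
        .iter()
        .flatten()
        .map(|entry| entry.is_tick as u64)
        .sum();
    *tick_height += num_ticks;

    // The batch carries the hand-off tick exactly when the running height
    // reaches the configured maximum; this boolean is what gets passed into
    // ClusterInfo::broadcast as `contains_last_tick`.
    Some(*tick_height) == max_tick_height
}

fn main() {
    let mut tick_height = 8;
    let batch = vec![vec![Entry { is_tick: true }, Entry { is_tick: true }]];
    // 8 + 2 ticks == 10, so this batch contains the rotation hand-off tick.
    assert!(batch_contains_last_tick(&batch, &mut tick_height, Some(10)));
}
```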
diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index cbdd85cad9..706732005b 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -28,6 +28,7 @@ use signature::{Keypair, KeypairUtil};
 use solana_sdk::pubkey::Pubkey;
 use std;
 use std::collections::HashMap;
+use std::io;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
@@ -461,6 +462,7 @@ impl ClusterInfo {
     /// # Remarks
     /// We need to avoid having obj locked while doing any io, such as the `send_to`
     pub fn broadcast(
+        contains_last_tick: bool,
         me: &NodeInfo,
         broadcast_table: &[NodeInfo],
         window: &SharedWindow,
@@ -483,81 +485,18 @@ impl ClusterInfo {
 
         let old_transmit_index = transmit_index.data;
 
-        // enumerate all the blobs in the window, those are the indices
-        // transmit them to nodes, starting from a different node. Add one
-        // to the capacity in case we want to send an extra blob notifying the
-        // next leader about the blob right before leader rotation
-        let mut orders = Vec::with_capacity((received_index - transmit_index.data + 1) as usize);
-        let window_l = window.read().unwrap();
-
-        let mut br_idx = transmit_index.data as usize % broadcast_table.len();
-
-        for idx in transmit_index.data..received_index {
-            let w_idx = idx as usize % window_l.len();
-
-            trace!(
-                "{} broadcast order data w_idx {} br_idx {}",
-                me.id,
-                w_idx,
-                br_idx
-            );
-
-            orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx]));
-            br_idx += 1;
-            br_idx %= broadcast_table.len();
-        }
-
-        for idx in transmit_index.coding..received_index {
-            let w_idx = idx as usize % window_l.len();
-
-            // skip over empty slots
-            if window_l[w_idx].coding.is_none() {
-                continue;
-            }
-
-            trace!(
-                "{} broadcast order coding w_idx: {} br_idx :{}",
-                me.id,
-                w_idx,
-                br_idx,
-            );
-
-            orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx]));
-            br_idx += 1;
-            br_idx %= broadcast_table.len();
-        }
-
+        let orders = Self::create_broadcast_orders(
+            contains_last_tick,
+            window,
+            broadcast_table,
+            transmit_index,
+            received_index,
+            me,
+        );
         trace!("broadcast orders table {}", orders.len());
 
-        let errs: Vec<_> = orders
-            .into_iter()
-            .map(|(b, v)| {
-                // only leader should be broadcasting
-                assert!(me.leader_id != v.id);
-                let bl = b.unwrap();
-                let blob = bl.read().unwrap();
-                //TODO profile this, may need multiple sockets for par_iter
-                trace!(
-                    "{}: BROADCAST idx: {} sz: {} to {},{} coding: {}",
-                    me.id,
-                    blob.index().unwrap(),
-                    blob.meta.size,
-                    v.id,
-                    v.contact_info.tvu,
-                    blob.is_coding()
-                );
-                assert!(blob.meta.size <= BLOB_SIZE);
-                let e = s.send_to(&blob.data[..blob.meta.size], &v.contact_info.tvu);
-                trace!(
-                    "{}: done broadcast {} to {} {}",
-                    me.id,
-                    blob.meta.size,
-                    v.id,
-                    v.contact_info.tvu
-                );
-                e
-            }).collect();
-        trace!("broadcast results {}", errs.len());
+        let errs = Self::send_orders(s, orders, me);
+
         for e in errs {
             if let Err(e) = &e {
                 trace!("broadcast result {:?}", e);
@@ -641,6 +580,126 @@ impl ClusterInfo {
         self.remote.values().fold(max, |a, b| std::cmp::min(a, *b))
     }
 
+    fn send_orders(
+        s: &UdpSocket,
+        orders: Vec<(Option<SharedBlob>, Vec<&NodeInfo>)>,
+        me: &NodeInfo,
+    ) -> Vec<io::Result<usize>> {
+        orders
+            .into_iter()
+            .flat_map(|(b, vs)| {
+                // only leader should be broadcasting
+                assert!(vs.iter().find(|info| info.id == me.leader_id).is_none());
+                let bl = b.unwrap();
+                let blob = bl.read().unwrap();
+                //TODO profile this, may need multiple sockets for par_iter
+                let ids_and_tvus = if log_enabled!(Level::Trace) {
+                    let v_ids = vs.iter().map(|v| v.id);
+                    let tvus = vs.iter().map(|v| v.contact_info.tvu);
+                    let ids_and_tvus = v_ids.zip(tvus).collect();
+
+                    trace!(
+                        "{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
+                        me.id,
+                        blob.index().unwrap(),
+                        blob.meta.size,
+                        ids_and_tvus,
+                        blob.is_coding()
+                    );
+
+                    ids_and_tvus
+                } else {
+                    vec![]
+                };
+
+                assert!(blob.meta.size <= BLOB_SIZE);
+                let send_errs_for_blob: Vec<_> = vs
+                    .iter()
+                    .map(move |v| {
+                        let e = s.send_to(&blob.data[..blob.meta.size], &v.contact_info.tvu);
+                        trace!(
+                            "{}: done broadcast {} to {:?}",
+                            me.id,
+                            blob.meta.size,
+                            ids_and_tvus
+                        );
+                        e
+                    }).collect();
+                send_errs_for_blob
+            }).collect()
+    }
+
+    fn create_broadcast_orders<'a>(
+        contains_last_tick: bool,
+        window: &SharedWindow,
+        broadcast_table: &'a [NodeInfo],
+        transmit_index: &mut WindowIndex,
+        received_index: u64,
+        me: &NodeInfo,
+    ) -> Vec<(Option<SharedBlob>, Vec<&'a NodeInfo>)> {
+        // enumerate all the blobs in the window, those are the indices
+        // transmit them to nodes, starting from a different node.
+        let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize);
+        let window_l = window.read().unwrap();
+        let mut br_idx = transmit_index.data as usize % broadcast_table.len();
+
+        for idx in transmit_index.data..received_index {
+            let w_idx = idx as usize % window_l.len();
+
+            trace!(
+                "{} broadcast order data w_idx {} br_idx {}",
+                me.id,
+                w_idx,
+                br_idx
+            );
+
+            // Broadcast the last tick to everyone on the network so it doesn't get dropped
+            // (Need to maximize probability the next leader in line sees this handoff tick
+            // despite packet drops)
+            let target = if idx == received_index - 1 && contains_last_tick {
+                // If we see a tick at max_tick_height, then we know it must be the last
+                // Blob in the window, at index == received_index. There cannot be an entry
+                // that got sent after the last tick, guaranteed by the PohService).
+                assert!(window_l[w_idx].data.is_some());
+                (
+                    window_l[w_idx].data.clone(),
+                    broadcast_table.iter().collect(),
+                )
+            } else {
+                (window_l[w_idx].data.clone(), vec![&broadcast_table[br_idx]])
+            };
+
+            orders.push(target);
+            br_idx += 1;
+            br_idx %= broadcast_table.len();
+        }
+
+        for idx in transmit_index.coding..received_index {
+            let w_idx = idx as usize % window_l.len();
+
+            // skip over empty slots
+            if window_l[w_idx].coding.is_none() {
+                continue;
+            }
+
+            trace!(
+                "{} broadcast order coding w_idx: {} br_idx :{}",
+                me.id,
+                w_idx,
+                br_idx,
+            );
+
+            orders.push((
+                window_l[w_idx].coding.clone(),
+                vec![&broadcast_table[br_idx]],
+            ));
+            br_idx += 1;
+            br_idx %= broadcast_table.len();
+        }
+
+        orders
+    }
+
     // TODO: fill in with real implmentation once staking is implemented
     fn get_stake(_id: Pubkey) -> f64 {
         1.0
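The refactor splits `broadcast` into `create_broadcast_orders` and `send_orders`, and the behavioral change is the fan-out rule: every blob is still assigned a single round-robin peer, except the blob carrying the hand-off tick, which is addressed to the entire broadcast table. The standalone sketch below restates just that selection rule, using a placeholder `Peer` type rather than the crate's `NodeInfo`.

```rust
// Sketch of the order-selection rule used above: round-robin per blob, but
// fan the final (last-tick) blob out to every peer. Types are placeholders.
#[derive(Debug, PartialEq)]
struct Peer(usize);

fn order_targets(num_blobs: usize, peers: &[Peer], contains_last_tick: bool) -> Vec<Vec<&Peer>> {
    let mut br_idx = 0;
    (0..num_blobs)
        .map(|idx| {
            let targets = if contains_last_tick && idx == num_blobs - 1 {
                // Hand-off blob: send to everyone so it survives packet loss.
                peers.iter().collect()
            } else {
                // Ordinary blob: one peer, chosen round-robin.
                vec![&peers[br_idx]]
            };
            br_idx = (br_idx + 1) % peers.len();
            targets
        }).collect()
}

fn main() {
    let peers = vec![Peer(0), Peer(1), Peer(2)];
    let orders = order_targets(4, &peers, true);
    assert_eq!(orders[0].len(), 1); // normal blobs get a single target
    assert_eq!(orders[3].len(), 3); // the last-tick blob goes to all peers
}
```

Returning a `Vec` of targets per blob is what lets the last-tick case and the normal case share one send path in `send_orders`.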
diff --git a/src/fullnode.rs b/src/fullnode.rs
index e1885bb39b..88e8662d00 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -298,6 +298,8 @@ impl Fullnode {
                 entry_height,
                 leader_slot,
                 entry_receiver,
+                max_tick_height,
+                bank.tick_height(),
                 tpu_exit,
             );
             let leader_state = LeaderServices::new(tpu, broadcast_stage);
@@ -449,6 +451,8 @@ impl Fullnode {
                 entry_height,
                 0, // TODO: get real leader slot from leader_scheduler
                 blob_receiver,
+                max_tick_height,
+                tick_height,
                 tpu_exit,
             );
             let leader_state = LeaderServices::new(tpu, broadcast_stage);
diff --git a/tests/multinode.rs b/tests/multinode.rs
index 4112aeaa57..acc21b0b4f 100644
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -6,17 +6,21 @@ extern crate serde_json;
 extern crate solana;
 extern crate solana_sdk;
 
+use solana::blob_fetch_stage::BlobFetchStage;
 use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
 use solana::entry::Entry;
 use solana::fullnode::{Fullnode, FullnodeReturnType};
 use solana::hash::Hash;
 use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
 use solana::ledger::{
-    create_tmp_genesis, create_tmp_sample_ledger, get_tmp_ledger_path, read_ledger, LedgerWriter,
+    create_tmp_genesis, create_tmp_sample_ledger, get_tmp_ledger_path, read_ledger,
+    reconstruct_entries_from_blobs, LedgerWindow, LedgerWriter,
 };
 use solana::logger;
 use solana::mint::Mint;
 use solana::ncp::Ncp;
+use solana::packet::SharedBlob;
+use solana::poh_service::NUM_TICKS_PER_SECOND;
 use solana::result;
 use solana::service::Service;
 use solana::signature::{Keypair, KeypairUtil};
@@ -58,6 +62,31 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
     (ncp, spy_cluster_info_ref, me)
 }
 
+fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Node, Pubkey) {
+    let exit = Arc::new(AtomicBool::new(false));
+    let new_node = Node::new_localhost();
+    let new_node_info = new_node.info.clone();
+    let me = new_node.info.id.clone();
+    let mut new_node_cluster_info = ClusterInfo::new(new_node_info).expect("ClusterInfo::new");
+    new_node_cluster_info.insert(&leader);
+    new_node_cluster_info.set_leader(leader.id);
+    let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
+    let new_node_window = Arc::new(RwLock::new(default_window()));
+    let ncp = Ncp::new(
+        &new_node_cluster_info_ref,
+        new_node_window,
+        None,
+        new_node
+            .sockets
+            .gossip
+            .try_clone()
+            .expect("Failed to clone gossip"),
+        exit.clone(),
+    );
+
+    (ncp, new_node_cluster_info_ref, new_node, me)
+}
+
 fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
     //lets spy on the network
     let (ncp, spy_ref, _) = make_spy_node(leader);
@@ -1470,6 +1499,125 @@ fn test_full_leader_validator_network() {
     }
 }
 
+#[test]
+fn test_broadcast_last_tick() {
+    logger::setup();
+    // The number of validators
+    const N: usize = 5;
+    logger::setup();
+
+    // Create the bootstrap leader node information
+    let bootstrap_leader_keypair = Keypair::new();
+    let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey());
+    let bootstrap_leader_info = bootstrap_leader_node.info.clone();
+
+    // Create leader ledger
+    let (_, bootstrap_leader_ledger_path, genesis_entries) = create_tmp_sample_ledger(
+        "test_broadcast_last_tick",
+        10_000,
+        0,
+        bootstrap_leader_info.id,
+        500,
+    );
+
+    let num_ending_ticks = genesis_entries
+        .iter()
+        .skip(2)
+        .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
+
+    let genesis_ledger_len = genesis_entries.len() as u64 - num_ending_ticks;
+    let blob_receiver_exit = Arc::new(AtomicBool::new(false));
+
+    // Create the listeners
+    let mut listening_nodes: Vec<_> = (0..N)
+        .map(|_| make_listening_node(&bootstrap_leader_info))
+        .collect();
+
+    let blob_fetch_stages: Vec<_> = listening_nodes
+        .iter_mut()
+        .map(|(_, _, node, _)| {
+            BlobFetchStage::new(
+                Arc::new(node.sockets.replicate.pop().unwrap()),
+                blob_receiver_exit.clone(),
+            )
+        }).collect();
+
+    // Create fullnode, should take 20 seconds to reach end of bootstrap period
+    let bootstrap_height = (NUM_TICKS_PER_SECOND * 20) as u64;
+    let leader_rotation_interval = 100;
+    let seed_rotation_interval = 200;
+    let leader_scheduler_config = LeaderSchedulerConfig::new(
+        Some(bootstrap_height),
+        Some(leader_rotation_interval),
+        Some(seed_rotation_interval),
+        Some(leader_rotation_interval),
+    );
+
+    // Start up the bootstrap leader fullnode
+    let mut bootstrap_leader = Fullnode::new(
+        bootstrap_leader_node,
+        &bootstrap_leader_ledger_path,
+        Arc::new(bootstrap_leader_keypair),
+        Arc::new(Keypair::new()),
+        Some(bootstrap_leader_info.contact_info.ncp),
+        false,
+        LeaderScheduler::new(&leader_scheduler_config),
+        None,
+    );
+
+    // Wait for convergence
+    let servers = converge(&bootstrap_leader_info, N + 1);
+    assert_eq!(servers.len(), N + 1);
+
+    // Wait for leader rotation
+    match bootstrap_leader.handle_role_transition().unwrap() {
+        Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
+        _ => panic!("Expected reason for exit to be leader rotation"),
+    }
+
+    // Shut down the leader
+    bootstrap_leader.close().unwrap();
+
+    let last_tick_entry_height = genesis_ledger_len as u64 + bootstrap_height;
+    let mut ledger_window = LedgerWindow::open(&bootstrap_leader_ledger_path)
+        .expect("Expected to be able to open ledger");
+
+    // get_entry() expects the index of the entry, so we have to subtract one from the actual entry height
+    let expected_last_tick = ledger_window
+        .get_entry(last_tick_entry_height - 1)
+        .expect("Expected last tick entry to exist");
+
+    // Check that the nodes got the last broadcasted blob
+    for (_, receiver) in blob_fetch_stages.iter() {
+        let mut last_tick_blob: SharedBlob = SharedBlob::default();
+        while let Ok(mut new_blobs) = receiver.try_recv() {
+            let last_blob = new_blobs.into_iter().find(|b| {
+                b.read().unwrap().index().expect("Expected index in blob")
+                    == last_tick_entry_height - 1
+            });
+            if let Some(last_blob) = last_blob {
+                last_tick_blob = last_blob;
+                break;
+            }
+        }
+        let actual_last_tick = &reconstruct_entries_from_blobs(vec![last_tick_blob])
+            .expect("Expected to be able to reconstruct entries from blob")[0];
+        assert_eq!(actual_last_tick, &expected_last_tick);
+    }
+
+    // Shut down blob fetch stages
+    blob_receiver_exit.store(true, Ordering::Relaxed);
+    for (bf, _) in blob_fetch_stages {
+        bf.join().unwrap();
+    }
+
+    // Shut down the listeners
+    for node in listening_nodes {
+        node.0.close().unwrap();
+    }
+
+    remove_dir_all(bootstrap_leader_ledger_path).unwrap();
+}
+
 fn mk_client(leader: &NodeInfo) -> ThinClient {
     let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
     assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu));
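A note on the test's index arithmetic: `last_tick_entry_height` is a one-based count of entries (the genesis entries minus their trailing ticks, plus `bootstrap_height` ticks), while ledger and blob indices are zero-based, which is why both the ledger lookup and the blob filter use `last_tick_entry_height - 1`. A tiny worked check of that relationship, with illustrative numbers rather than the test's real constants:

```rust
// Worked check of the index arithmetic in test_broadcast_last_tick.
// The concrete numbers here are illustrative, not the test's real values.
fn main() {
    let genesis_ledger_len: u64 = 4; // genesis entries minus the trailing ticks
    let ticks_per_second: u64 = 10;  // stand-in for NUM_TICKS_PER_SECOND
    let bootstrap_height = ticks_per_second * 20;

    // Height (a one-based count) of the last tick written before rotation.
    let last_tick_entry_height = genesis_ledger_len + bootstrap_height;

    // Ledger entries and blobs are indexed from zero, so the hand-off tick
    // lives one slot below its height.
    let last_tick_index = last_tick_entry_height - 1;
    assert_eq!(last_tick_index, 203);
}
```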