diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs
index 0ea40d96a2..54326b024b 100644
--- a/bench-exchange/src/bench.rs
+++ b/bench-exchange/src/bench.rs
@@ -963,7 +963,6 @@ mod tests {
         solana_logger::setup();
         const NUM_NODES: usize = 1;
-        let validator_config = ValidatorConfig::default();
 
         let mut config = Config::default();
         config.identity = Keypair::new();
@@ -985,7 +984,7 @@ mod tests {
         let cluster = LocalCluster::new(&ClusterConfig {
            node_stakes: vec![100_000; NUM_NODES],
            cluster_lamports: 100_000_000_000_000,
-            validator_config,
+            validator_configs: vec![ValidatorConfig::default(); NUM_NODES],
            native_instruction_processors: [solana_exchange_program!()].to_vec(),
            ..ClusterConfig::default()
         });
diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs
index 9a4c6698e0..3da10b8455 100644
--- a/bench-tps/src/bench.rs
+++ b/bench-tps/src/bench.rs
@@ -668,12 +668,11 @@ mod tests {
     #[test]
     fn test_bench_tps_local_cluster() {
         solana_logger::setup();
-        let validator_config = ValidatorConfig::default();
         const NUM_NODES: usize = 1;
         let cluster = LocalCluster::new(&ClusterConfig {
            node_stakes: vec![999_990; NUM_NODES],
            cluster_lamports: 2_000_000,
-            validator_config,
+            validator_configs: vec![ValidatorConfig::default(); NUM_NODES],
            ..ClusterConfig::default()
         });
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 42eb9b70e3..d27e0e6517 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -1,28 +1,28 @@
 //! A stage to broadcast data from a leader node to validators
-//!
+use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
+use self::standard_broadcast_run::StandardBroadcastRun;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::{ClusterInfo, ClusterInfoError};
-use crate::entry::EntrySlice;
 use crate::erasure::CodingGenerator;
-use crate::packet::index_blobs;
 use crate::poh_recorder::WorkingBankEntries;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
-use rayon::prelude::*;
 use rayon::ThreadPool;
 use solana_metrics::{
     datapoint, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info,
 };
-use solana_sdk::pubkey::Pubkey;
-use solana_sdk::signature::Signable;
 use solana_sdk::timing::duration_as_ms;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
 use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
-use std::time::{Duration, Instant};
+use std::time::Instant;
+
+mod broadcast_utils;
+mod fail_entry_verification_broadcast_run;
+mod standard_broadcast_run;
 
 pub const NUM_THREADS: u32 = 10;
@@ -31,170 +31,57 @@ pub enum BroadcastStageReturnType {
     ChannelDisconnected,
 }
 
-#[derive(Default)]
-struct BroadcastStats {
-    num_entries: Vec<usize>,
-    run_elapsed: Vec<u64>,
-    to_blobs_elapsed: Vec<u64>,
+#[derive(PartialEq, Clone, Debug)]
+pub enum BroadcastStageType {
+    Standard,
+    FailEntryVerification,
 }
 
-struct Broadcast {
-    id: Pubkey,
-    coding_generator: CodingGenerator,
-    stats: BroadcastStats,
-    thread_pool: ThreadPool,
+impl BroadcastStageType {
+    pub fn new_broadcast_stage(
+        &self,
+        sock: UdpSocket,
+        cluster_info: Arc<RwLock<ClusterInfo>>,
+        receiver: Receiver<WorkingBankEntries>,
+        exit_sender: &Arc<AtomicBool>,
+        blocktree: &Arc<Blocktree>,
+    ) -> BroadcastStage {
+        match self {
+            BroadcastStageType::Standard => BroadcastStage::new(
+                sock,
+                cluster_info,
+                receiver,
+                exit_sender,
+                blocktree,
+                StandardBroadcastRun::new(),
+            ),
+
+            BroadcastStageType::FailEntryVerification => BroadcastStage::new(
+                sock,
+                cluster_info,
+                receiver,
+                exit_sender,
+                blocktree,
+                FailEntryVerificationBroadcastRun::new(),
+            ),
+        }
+    }
 }
 
-impl Broadcast {
+trait BroadcastRun {
     fn run(
         &mut self,
+        broadcast: &mut Broadcast,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         receiver: &Receiver<WorkingBankEntries>,
         sock: &UdpSocket,
         blocktree: &Arc<Blocktree>,
-    ) -> Result<()> {
-        let timer = Duration::new(1, 0);
-        let (mut bank, entries) = receiver.recv_timeout(timer)?;
-        let mut max_tick_height = bank.max_tick_height();
+    ) -> Result<()>;
+}
 
-        let run_start = Instant::now();
-        let mut num_entries = entries.len();
-        let mut ventries = Vec::new();
-        let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
-        ventries.push(entries);
-
-        assert!(last_tick <= max_tick_height);
-        if last_tick != max_tick_height {
-            while let Ok((same_bank, entries)) = receiver.try_recv() {
-                // If the bank changed, that implies the previous slot was interrupted and we do not have to
-                // broadcast its entries.
-                if same_bank.slot() != bank.slot() {
-                    num_entries = 0;
-                    ventries.clear();
-                    bank = same_bank.clone();
-                    max_tick_height = bank.max_tick_height();
-                }
-                num_entries += entries.len();
-                last_tick = entries.last().map(|v| v.1).unwrap_or(0);
-                ventries.push(entries);
-                assert!(last_tick <= max_tick_height,);
-                if last_tick == max_tick_height {
-                    break;
-                }
-            }
-        }
-
-        inc_new_counter_info!("broadcast_service-entries_received", num_entries);
-
-        let to_blobs_start = Instant::now();
-
-        let blobs: Vec<_> = self.thread_pool.install(|| {
-            ventries
-                .into_par_iter()
-                .map(|p| {
-                    let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
-                    entries.to_shared_blobs()
-                })
-                .flatten()
-                .collect()
-        });
-
-        let blob_index = blocktree
-            .meta(bank.slot())
-            .expect("Database error")
-            .map(|meta| meta.consumed)
-            .unwrap_or(0);
-
-        index_blobs(
-            &blobs,
-            &self.id,
-            blob_index,
-            bank.slot(),
-            bank.parent().map_or(0, |parent| parent.slot()),
-        );
-
-        if last_tick == max_tick_height {
-            blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
-        }
-
-        // Make sure not to modify the blob header or data after signing it here
-        self.thread_pool.install(|| {
-            blobs.par_iter().for_each(|b| {
-                b.write()
-                    .unwrap()
-                    .sign(&cluster_info.read().unwrap().keypair);
-            })
-        });
-
-        blocktree.write_shared_blobs(&blobs)?;
-
-        let coding = self.coding_generator.next(&blobs);
-
-        self.thread_pool.install(|| {
-            coding.par_iter().for_each(|c| {
-                c.write()
-                    .unwrap()
-                    .sign(&cluster_info.read().unwrap().keypair);
-            })
-        });
-
-        let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
-
-        let broadcast_start = Instant::now();
-
-        let bank_epoch = bank.get_stakers_epoch(bank.slot());
-        let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
-
-        // Send out data
-        cluster_info
-            .read()
-            .unwrap()
-            .broadcast(sock, &blobs, stakes.as_ref())?;
-
-        inc_new_counter_debug!("streamer-broadcast-sent", blobs.len());
-
-        // send out erasures
-        cluster_info
-            .read()
-            .unwrap()
-            .broadcast(sock, &coding, stakes.as_ref())?;
-
-        self.update_broadcast_stats(
-            duration_as_ms(&broadcast_start.elapsed()),
-            duration_as_ms(&run_start.elapsed()),
-            num_entries,
-            to_blobs_elapsed,
-            blob_index,
-        );
-
-        Ok(())
-    }
-
-    fn update_broadcast_stats(
-        &mut self,
-        broadcast_elapsed: u64,
-        run_elapsed: u64,
-        num_entries: usize,
-        to_blobs_elapsed: u64,
-        blob_index: u64,
-    ) {
-        inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
-
-        self.stats.num_entries.push(num_entries);
-        self.stats.to_blobs_elapsed.push(to_blobs_elapsed);
-        self.stats.run_elapsed.push(run_elapsed);
-        if self.stats.num_entries.len() >= 16 {
-            info!(
-                "broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?}",
-                self.stats.num_entries, self.stats.to_blobs_elapsed, self.stats.run_elapsed
-            );
-            self.stats.num_entries.clear();
-            self.stats.to_blobs_elapsed.clear();
-            self.stats.run_elapsed.clear();
-        }
-
-        datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
-    }
+struct Broadcast {
+    coding_generator: CodingGenerator,
+    thread_pool: ThreadPool,
 }
 
 // Implement a destructor for the BroadcastStage thread to signal it exited
@@ -226,14 +113,12 @@ impl BroadcastStage {
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         receiver: &Receiver<WorkingBankEntries>,
         blocktree: &Arc<Blocktree>,
+        mut broadcast_stage_run: impl BroadcastRun,
     ) -> BroadcastStageReturnType {
-        let me = cluster_info.read().unwrap().my_data().clone();
         let coding_generator = CodingGenerator::default();
 
         let mut broadcast = Broadcast {
-            id: me.id,
             coding_generator,
-            stats: BroadcastStats::default(),
             thread_pool: rayon::ThreadPoolBuilder::new()
                 .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
                 .build()
@@ -241,7 +126,9 @@ impl BroadcastStage {
         };
 
         loop {
-            if let Err(e) = broadcast.run(&cluster_info, receiver, sock, blocktree) {
+            if let Err(e) =
+                broadcast_stage_run.run(&mut broadcast, &cluster_info, receiver, sock, blocktree)
+            {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
                         return BroadcastStageReturnType::ChannelDisconnected;
@@ -273,12 +160,13 @@ impl BroadcastStage {
     /// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
     /// completing the cycle.
     #[allow(clippy::too_many_arguments)]
-    pub fn new(
+    fn new(
         sock: UdpSocket,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         receiver: Receiver<WorkingBankEntries>,
         exit_sender: &Arc<AtomicBool>,
         blocktree: &Arc<Blocktree>,
+        broadcast_stage_run: impl BroadcastRun + Send + 'static,
     ) -> Self {
         let blocktree = blocktree.clone();
         let exit_sender = exit_sender.clone();
@@ -286,7 +174,13 @@ impl BroadcastStage {
             .name("solana-broadcaster".to_string())
             .spawn(move || {
                 let _finalizer = Finalizer::new(exit_sender);
-                Self::run(&sock, &cluster_info, &receiver, &blocktree)
+                Self::run(
+                    &sock,
+                    &cluster_info,
+                    &receiver,
+                    &blocktree,
+                    broadcast_stage_run,
+                )
             })
             .unwrap();
@@ -312,6 +206,7 @@ mod test {
     use crate::service::Service;
     use solana_runtime::bank::Bank;
     use solana_sdk::hash::Hash;
+    use solana_sdk::pubkey::Pubkey;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::sync::atomic::AtomicBool;
     use std::sync::mpsc::channel;
@@ -357,6 +252,7 @@ mod test {
             entry_receiver,
             &exit_sender,
             &blocktree,
+            StandardBroadcastRun::new(),
         );
 
         MockBroadcastStage {
diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs
new file mode 100644
index 0000000000..efad43196d
--- /dev/null
+++ b/core/src/broadcast_stage/broadcast_utils.rs
@@ -0,0 +1,156 @@
+use crate::entry::Entry;
+use crate::entry::EntrySlice;
+use crate::erasure::CodingGenerator;
+use crate::packet::{self, SharedBlob};
+use crate::poh_recorder::WorkingBankEntries;
+use crate::result::Result;
+use rayon::prelude::*;
+use rayon::ThreadPool;
+use solana_runtime::bank::Bank;
+use solana_sdk::signature::{Keypair, KeypairUtil, Signable};
+use std::sync::mpsc::Receiver;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+pub(super) struct ReceiveResults {
+    pub ventries: Vec<Vec<(Entry, u64)>>,
+    pub num_entries: usize,
+    pub time_elapsed: Duration,
+    pub bank: Arc<Bank>,
+    pub last_tick: u64,
+}
+
+impl ReceiveResults {
+    pub fn new(
+        ventries: Vec<Vec<(Entry, u64)>>,
+        num_entries: usize,
+        time_elapsed: Duration,
+        bank: Arc<Bank>,
+        last_tick: u64,
+    ) -> Self {
+        Self {
+            ventries,
+            num_entries,
+            time_elapsed,
+            bank,
+            last_tick,
+        }
+    }
+}
+
+pub(super) fn recv_slot_blobs(receiver: &Receiver<WorkingBankEntries>) -> Result<ReceiveResults> {
+    let timer = Duration::new(1, 0);
+    let (mut bank, entries) = receiver.recv_timeout(timer)?;
+    let recv_start = Instant::now();
+    let mut max_tick_height = bank.max_tick_height();
+    let mut num_entries = entries.len();
+    let mut ventries = Vec::new();
+    let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
+    ventries.push(entries);
+
+    assert!(last_tick <= max_tick_height);
+    if last_tick != max_tick_height {
+        while let Ok((same_bank, entries)) = receiver.try_recv() {
+            // If the bank changed, that implies the previous slot was interrupted and we do not have to
+            // broadcast its entries.
+            if same_bank.slot() != bank.slot() {
+                num_entries = 0;
+                ventries.clear();
+                bank = same_bank.clone();
+                max_tick_height = bank.max_tick_height();
+            }
+            num_entries += entries.len();
+            last_tick = entries.last().map(|v| v.1).unwrap_or(0);
+            ventries.push(entries);
+            assert!(last_tick <= max_tick_height,);
+            if last_tick == max_tick_height {
+                break;
+            }
+        }
+    }
+
+    let recv_end = recv_start.elapsed();
+    let receive_results = ReceiveResults::new(ventries, num_entries, recv_end, bank, last_tick);
+    Ok(receive_results)
+}
+
+pub(super) fn entries_to_blobs(
+    ventries: Vec<Vec<(Entry, u64)>>,
+    thread_pool: &ThreadPool,
+    latest_blob_index: u64,
+    last_tick: u64,
+    bank: &Bank,
+    keypair: &Keypair,
+    coding_generator: &mut CodingGenerator,
+) -> (Vec<SharedBlob>, Vec<SharedBlob>) {
+    let blobs = generate_data_blobs(
+        ventries,
+        thread_pool,
+        latest_blob_index,
+        last_tick,
+        &bank,
+        &keypair,
+    );
+
+    let coding = generate_coding_blobs(&blobs, &thread_pool, coding_generator, &keypair);
+
+    (blobs, coding)
+}
+
+pub(super) fn generate_data_blobs(
+    ventries: Vec<Vec<(Entry, u64)>>,
+    thread_pool: &ThreadPool,
+    latest_blob_index: u64,
+    last_tick: u64,
+    bank: &Bank,
+    keypair: &Keypair,
+) -> Vec<SharedBlob> {
+    let blobs: Vec<SharedBlob> = thread_pool.install(|| {
+        ventries
+            .into_par_iter()
+            .map(|p| {
+                let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
+                entries.to_shared_blobs()
+            })
+            .flatten()
+            .collect()
+    });
+
+    packet::index_blobs(
+        &blobs,
+        &keypair.pubkey(),
+        latest_blob_index,
+        bank.slot(),
+        bank.parent().map_or(0, |parent| parent.slot()),
+    );
+
+    if last_tick == bank.max_tick_height() {
+        blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
+    }
+
+    // Make sure not to modify the blob header or data after signing it here
+    thread_pool.install(|| {
+        blobs.par_iter().for_each(|b| {
+            b.write().unwrap().sign(keypair);
+        })
+    });
+
+    blobs
+}
+
+pub(super) fn generate_coding_blobs(
+    blobs: &[SharedBlob],
+    thread_pool: &ThreadPool,
+    coding_generator: &mut CodingGenerator,
+    keypair: &Keypair,
+) -> Vec<SharedBlob> {
+    let coding = coding_generator.next(&blobs);
+
+    thread_pool.install(|| {
+        coding.par_iter().for_each(|c| {
+            c.write().unwrap().sign(keypair);
+        })
+    });
+
+    coding
+}
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
new file mode 100644
index 0000000000..2e26fed504
--- /dev/null
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -0,0 +1,70 @@
+use super::*;
+use solana_sdk::hash::Hash;
+
+pub(super) struct FailEntryVerificationBroadcastRun {}
+
+impl FailEntryVerificationBroadcastRun {
+    pub(super) fn new() -> Self {
+        Self {}
+    }
+}
+
+impl BroadcastRun for FailEntryVerificationBroadcastRun {
+    fn run(
+        &mut self,
+        broadcast: &mut Broadcast,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        receiver: &Receiver<WorkingBankEntries>,
+        sock: &UdpSocket,
+        blocktree: &Arc<Blocktree>,
+    ) -> Result<()> {
+        // 1) Pull entries from banking stage
+        let mut receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
+        let bank = receive_results.bank.clone();
+        let last_tick = receive_results.last_tick;
+
+        // 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry
+        // in the slot to make verification fail on validators
+        if last_tick == bank.max_tick_height() {
+            let mut last_entry = receive_results
+                .ventries
+                .last_mut()
+                .unwrap()
+                .last_mut()
+                .unwrap();
+            last_entry.0.hash = Hash::default();
+        }
+
+        let keypair = &cluster_info.read().unwrap().keypair.clone();
+        let latest_blob_index = blocktree
+            .meta(bank.slot())
+            .expect("Database error")
+            .map(|meta| meta.consumed)
+            .unwrap_or(0);
+
+        let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
+            receive_results.ventries,
+            &broadcast.thread_pool,
+            latest_blob_index,
+            last_tick,
+            &bank,
+            &keypair,
+            &mut broadcast.coding_generator,
+        );
+
+        blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
+
+        // 3) Start broadcast step
+        let bank_epoch = bank.get_stakers_epoch(bank.slot());
+        let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
+
+        // Broadcast data + erasures
+        cluster_info.read().unwrap().broadcast(
+            sock,
+            data_blobs.iter().chain(coding_blobs.iter()),
+            stakes.as_ref(),
+        )?;
+
+        Ok(())
+    }
+}
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
new file mode 100644
index 0000000000..90a2263a92
--- /dev/null
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -0,0 +1,116 @@
+use super::broadcast_utils;
+use super::*;
+
+#[derive(Default)]
+struct BroadcastStats {
+    num_entries: Vec<usize>,
+    run_elapsed: Vec<u64>,
+    to_blobs_elapsed: Vec<u64>,
+}
+
+pub(super) struct StandardBroadcastRun {
+    stats: BroadcastStats,
+}
+
+impl StandardBroadcastRun {
+    pub(super) fn new() -> Self {
+        Self {
+            stats: BroadcastStats::default(),
+        }
+    }
+
+    fn update_broadcast_stats(
+        &mut self,
+        broadcast_elapsed: u64,
+        run_elapsed: u64,
+        num_entries: usize,
+        to_blobs_elapsed: u64,
+        blob_index: u64,
+    ) {
+        inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
+
+        self.stats.num_entries.push(num_entries);
+        self.stats.to_blobs_elapsed.push(to_blobs_elapsed);
+        self.stats.run_elapsed.push(run_elapsed);
+        if self.stats.num_entries.len() >= 16 {
+            info!(
+                "broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?}",
+                self.stats.num_entries, self.stats.to_blobs_elapsed, self.stats.run_elapsed
+            );
+            self.stats.num_entries.clear();
+            self.stats.to_blobs_elapsed.clear();
+            self.stats.run_elapsed.clear();
+        }
+
+        datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
+    }
+}
+
+impl BroadcastRun for StandardBroadcastRun {
+    fn run(
+        &mut self,
+        broadcast: &mut Broadcast,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        receiver: &Receiver<WorkingBankEntries>,
+        sock: &UdpSocket,
+        blocktree: &Arc<Blocktree>,
+    ) -> Result<()> {
+        // 1) Pull entries from banking stage
+        let receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
+        let receive_elapsed = receive_results.time_elapsed;
+        let num_entries = receive_results.num_entries;
+        let bank = receive_results.bank.clone();
+        let last_tick = receive_results.last_tick;
+        inc_new_counter_info!("broadcast_service-entries_received", num_entries);
+
+        // 2) Convert entries to blobs + generate coding blobs
+        let to_blobs_start = Instant::now();
+        let keypair = &cluster_info.read().unwrap().keypair.clone();
+        let latest_blob_index = blocktree
+            .meta(bank.slot())
+            .expect("Database error")
+            .map(|meta| meta.consumed)
+            .unwrap_or(0);
+
+        let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
+            receive_results.ventries,
+            &broadcast.thread_pool,
+            latest_blob_index,
+            last_tick,
+            &bank,
+            &keypair,
+            &mut broadcast.coding_generator,
+        );
+
+        blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
+        let to_blobs_elapsed = to_blobs_start.elapsed();
+
+        // 3) Start broadcast step
+        let broadcast_start = Instant::now();
+        let bank_epoch = bank.get_stakers_epoch(bank.slot());
+        let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
+
+        // Broadcast data + erasures
+        cluster_info.read().unwrap().broadcast(
+            sock,
+            data_blobs.iter().chain(coding_blobs.iter()),
+            stakes.as_ref(),
+        )?;
+
+        inc_new_counter_debug!(
+            "streamer-broadcast-sent",
+            data_blobs.len() + coding_blobs.len()
+        );
+
+        let broadcast_elapsed = broadcast_start.elapsed();
+        self.update_broadcast_stats(
+            duration_as_ms(&broadcast_elapsed),
+            duration_as_ms(&(receive_elapsed + to_blobs_elapsed + broadcast_elapsed)),
+            num_entries,
+            duration_as_ms(&to_blobs_elapsed),
+            latest_blob_index,
+        );
+
+        Ok(())
+    }
+}
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index fc0cd63c91..a393511bac 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -44,6 +44,7 @@ use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
 use solana_sdk::timing::{duration_as_ms, timestamp};
 use solana_sdk::transaction::Transaction;
+use std::borrow::Borrow;
 use std::borrow::Cow;
 use std::cmp::min;
 use std::collections::{BTreeSet, HashMap};
@@ -709,16 +710,22 @@ impl ClusterInfo {
     /// broadcast messages from the leader to layer 1 nodes
     /// # Remarks
-    pub fn broadcast(
+    pub fn broadcast<I>(
         &self,
         s: &UdpSocket,
-        blobs: &[SharedBlob],
+        blobs: I,
         stakes: Option<&HashMap<Pubkey, u64>>,
-    ) -> Result<()> {
+    ) -> Result<()>
+    where
+        I: IntoIterator,
+        I::Item: Borrow<SharedBlob>,
+    {
         let mut last_err = Ok(());
         let mut broadcast_table_len = 0;
-        blobs.iter().for_each(|b| {
-            let blob = b.read().unwrap();
+        let mut blobs_len = 0;
+        blobs.into_iter().for_each(|b| {
+            blobs_len += 1;
+            let blob = b.borrow().read().unwrap();
             let broadcast_table = self.sorted_tvu_peers(stakes, ChaChaRng::from_seed(blob.seed()));
             broadcast_table_len = cmp::max(broadcast_table_len, broadcast_table.len());
@@ -732,7 +739,7 @@ impl ClusterInfo {
 
         last_err?;
 
-        inc_new_counter_debug!("cluster_info-broadcast-max_idx", blobs.len());
+        inc_new_counter_debug!("cluster_info-broadcast-max_idx", blobs_len);
         if broadcast_table_len != 0 {
             inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table_len + 1);
         }
diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs
index f40adec6eb..c80865540f 100644
--- a/core/src/cluster_tests.rs
+++ b/core/src/cluster_tests.rs
@@ -8,11 +8,13 @@ use crate::contact_info::ContactInfo;
 use crate::entry::{Entry, EntrySlice};
 use crate::gossip_service::discover_cluster;
 use crate::locktower::VOTE_THRESHOLD_DEPTH;
+use hashbrown::HashSet;
 use solana_client::thin_client::create_client;
 use solana_runtime::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH;
 use solana_sdk::client::SyncClient;
 use solana_sdk::hash::Hash;
 use solana_sdk::poh_config::PohConfig;
+use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
 use solana_sdk::system_transaction;
 use solana_sdk::timing::{
@@ -26,14 +28,18 @@ use std::time::Duration;
 const DEFAULT_SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_NUM_TICKS_PER_SECOND;
 
 /// Spend and verify from every node in the network
-pub fn spend_and_verify_all_nodes(
+pub fn spend_and_verify_all_nodes(
     entry_point_info: &ContactInfo,
     funding_keypair: &Keypair,
     nodes: usize,
+    ignore_nodes: HashSet<Pubkey>,
 ) {
     let (cluster_nodes, _) = discover_cluster(&entry_point_info.gossip, nodes).unwrap();
     assert!(cluster_nodes.len() >= nodes);
     for ingress_node in &cluster_nodes {
+        if ignore_nodes.contains(&ingress_node.id) {
+            continue;
+        }
         let random_keypair = Keypair::new();
         let client = create_client(ingress_node.client_facing_addr(), FULLNODE_PORT_RANGE);
         let bal = client
@@ -48,6 +54,9 @@ pub fn spend_and_verify_all_nodes(
         .retry_transfer_until_confirmed(&funding_keypair, &mut transaction, 5, confs)
         .unwrap();
     for validator in &cluster_nodes {
+        if ignore_nodes.contains(&validator.id) {
+            continue;
+        }
         let client = create_client(validator.client_facing_addr(), FULLNODE_PORT_RANGE);
         client.poll_for_signature_confirmation(&sig, confs).unwrap();
     }
diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs
index 5a65e94a38..a8b9baf046 100644
--- a/core/src/local_cluster.rs
+++ b/core/src/local_cluster.rs
@@ -51,10 +51,24 @@ impl ReplicatorInfo {
     }
 }
 
+pub struct ClusterValidatorInfo {
+    pub info: ValidatorInfo,
+    pub config: ValidatorConfig,
+}
+
+impl ClusterValidatorInfo {
+    pub fn new(validator_info: ValidatorInfo, config: ValidatorConfig) -> Self {
+        Self {
+            info: validator_info,
+            config,
+        }
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct ClusterConfig {
     /// The fullnode config that should be applied to every node in the cluster
-    pub validator_config: ValidatorConfig,
+    pub validator_configs: Vec<ValidatorConfig>,
     /// Number of replicators in the cluster
     /// Note- replicators will timeout if ticks_per_slot is much larger than the default 8
     pub num_replicators: usize,
@@ -74,7 +88,7 @@ pub struct ClusterConfig {
 impl Default for ClusterConfig {
     fn default() -> Self {
         ClusterConfig {
-            validator_config: ValidatorConfig::default(),
+            validator_configs: vec![],
             num_replicators: 0,
             num_listeners: 0,
             node_stakes: vec![],
@@ -91,11 +105,10 @@ pub struct LocalCluster {
     /// Keypair with funding to participate in the network
     pub funding_keypair: Keypair,
-    pub validator_config: ValidatorConfig,
     /// Entry point from which the rest of the network can be discovered
     pub entry_point_info: ContactInfo,
-    pub fullnode_infos: HashMap<Pubkey, ValidatorInfo>,
-    pub listener_infos: HashMap<Pubkey, ValidatorInfo>,
+    pub fullnode_infos: HashMap<Pubkey, ClusterValidatorInfo>,
+    pub listener_infos: HashMap<Pubkey, ClusterValidatorInfo>,
     fullnodes: HashMap<Pubkey, Validator>,
     genesis_ledger_path: String,
     pub genesis_block: GenesisBlock,
@@ -113,12 +126,14 @@ impl LocalCluster {
         let config = ClusterConfig {
             node_stakes: stakes,
             cluster_lamports,
+            validator_configs: vec![ValidatorConfig::default(); num_nodes],
             ..ClusterConfig::default()
         };
         Self::new(&config)
     }
 
     pub fn new(config: &ClusterConfig) -> Self {
+        assert_eq!(config.validator_configs.len(), config.node_stakes.len());
         let leader_keypair = Arc::new(Keypair::new());
         let leader_pubkey = leader_keypair.pubkey();
         let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
@@ -161,22 +176,24 @@ impl LocalCluster {
             &leader_voting_keypair,
             &leader_storage_keypair,
             None,
-            &config.validator_config,
+            &config.validator_configs[0],
         );
         let mut fullnodes = HashMap::new();
         let mut fullnode_infos = HashMap::new();
         fullnodes.insert(leader_pubkey, leader_server);
-        fullnode_infos.insert(
-            leader_pubkey,
-            ValidatorInfo {
-                keypair: leader_keypair,
-                voting_keypair: leader_voting_keypair,
-                storage_keypair: leader_storage_keypair,
-                ledger_path: leader_ledger_path,
-                contact_info: leader_contact_info.clone(),
-            },
-        );
+        let leader_info = ValidatorInfo {
+            keypair: leader_keypair,
+            voting_keypair: leader_voting_keypair,
+            storage_keypair: leader_storage_keypair,
+            ledger_path: leader_ledger_path,
+            contact_info: leader_contact_info.clone(),
+        };
+
+        let cluster_leader =
+            ClusterValidatorInfo::new(leader_info, config.validator_configs[0].clone());
+
+        fullnode_infos.insert(leader_pubkey, cluster_leader);
 
         let mut cluster = Self {
             funding_keypair: mint_keypair,
@@ -187,17 +204,19 @@ impl LocalCluster {
             genesis_block,
             fullnode_infos,
             replicator_infos: HashMap::new(),
-            validator_config: config.validator_config.clone(),
             listener_infos: HashMap::new(),
         };
 
-        for stake in &config.node_stakes[1..] {
-            cluster.add_validator(&config.validator_config, *stake);
+        for (stake, validator_config) in (&config.node_stakes[1..])
+            .iter()
+            .zip((&config.validator_configs[1..]).iter())
+        {
+            cluster.add_validator(validator_config, *stake);
         }
 
         let listener_config = ValidatorConfig {
             voting_disabled: true,
-            ..config.validator_config.clone()
+            ..config.validator_configs[0].clone()
         };
 
         (0..config.num_listeners).for_each(|_| cluster.add_validator(&listener_config, 0));
@@ -294,28 +313,22 @@ impl LocalCluster {
         self.fullnodes
             .insert(validator_keypair.pubkey(), validator_server);
 
+        let validator_pubkey = validator_keypair.pubkey();
+        let validator_info = ClusterValidatorInfo::new(
+            ValidatorInfo {
+                keypair: validator_keypair,
+                voting_keypair,
+                storage_keypair,
+                ledger_path,
+                contact_info,
+            },
+            validator_config.clone(),
+        );
+
         if validator_config.voting_disabled {
-            self.listener_infos.insert(
-                validator_keypair.pubkey(),
-                ValidatorInfo {
-                    keypair: validator_keypair,
-                    voting_keypair,
-                    storage_keypair,
-                    ledger_path,
-                    contact_info,
-                },
-            );
+            self.listener_infos.insert(validator_pubkey, validator_info);
         } else {
-            self.fullnode_infos.insert(
-                validator_keypair.pubkey(),
-                ValidatorInfo {
-                    keypair: validator_keypair,
-                    voting_keypair,
-                    storage_keypair,
-                    ledger_path,
-                    contact_info,
-                },
-            );
+            self.fullnode_infos.insert(validator_pubkey, validator_info);
         }
     }
@@ -362,7 +375,7 @@ impl LocalCluster {
         for ledger_path in self
             .fullnode_infos
             .values()
-            .map(|f| &f.ledger_path)
+            .map(|f| &f.info.ledger_path)
             .chain(self.replicator_infos.values().map(|info| &info.ledger_path))
         {
             remove_dir_all(&ledger_path)
@@ -519,9 +532,12 @@ impl Cluster for LocalCluster {
     }
 
     fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
-        self.fullnode_infos
-            .get(pubkey)
-            .map(|f| create_client(f.contact_info.client_facing_addr(), FULLNODE_PORT_RANGE))
+        self.fullnode_infos.get(pubkey).map(|f| {
+            create_client(
+                f.info.contact_info.client_facing_addr(),
+                FULLNODE_PORT_RANGE,
+            )
+        })
     }
 
     fn restart_node(&mut self, pubkey: Pubkey) {
@@ -531,7 +547,8 @@ fn restart_node(&mut self, pubkey: Pubkey) {
         node.join().unwrap();
         // Restart the node
-        let fullnode_info = &self.fullnode_infos[&pubkey];
+        let fullnode_info = &self.fullnode_infos[&pubkey].info;
+        let config = &self.fullnode_infos[&pubkey].config;
         let node = Node::new_localhost_with_pubkey(&fullnode_info.keypair.pubkey());
         if pubkey == self.entry_point_info.id {
             self.entry_point_info = node.info.clone();
         }
@@ -544,7 +561,7 @@
             &fullnode_info.voting_keypair,
             &fullnode_info.storage_keypair,
             None,
-            &self.validator_config,
+            config,
         );
         self.fullnodes.insert(pubkey, restarted_node);
@@ -581,7 +598,7 @@ mod test {
         const NUM_NODES: usize = 1;
         let num_replicators = 1;
         let config = ClusterConfig {
-            validator_config,
+            validator_configs: vec![ValidatorConfig::default(); NUM_NODES],
             num_replicators,
             node_stakes: vec![3; NUM_NODES],
             cluster_lamports: 100,
@@ -593,5 +610,4 @@ mod test {
         assert_eq!(cluster.fullnodes.len(), NUM_NODES);
         assert_eq!(cluster.replicators.len(), num_replicators);
     }
-
 }
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index 7c3dc4b9dd..24d25953b6 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -3,7 +3,7 @@
 use crate::banking_stage::BankingStage;
 use crate::blocktree::Blocktree;
-use crate::broadcast_stage::BroadcastStage;
+use crate::broadcast_stage::{BroadcastStage, BroadcastStageType};
 use crate::cluster_info::ClusterInfo;
 use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
 use crate::fetch_stage::FetchStage;
@@ -37,6 +37,7 @@ impl Tpu {
         broadcast_socket: UdpSocket,
         sigverify_disabled: bool,
         blocktree: &Arc<Blocktree>,
+        broadcast_type: &BroadcastStageType,
         exit: &Arc<AtomicBool>,
     ) -> Self {
         cluster_info.write().unwrap().set_leader(id);
@@ -70,7 +71,7 @@ impl Tpu {
             verified_vote_receiver,
         );
 
-        let broadcast_stage = BroadcastStage::new(
+        let broadcast_stage = broadcast_type.new_broadcast_stage(
             broadcast_socket,
             cluster_info.clone(),
             entry_receiver,
diff --git a/core/src/validator.rs b/core/src/validator.rs
index cd110ccfc7..ac15871871 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -3,6 +3,7 @@
 use crate::bank_forks::BankForks;
 use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
 use crate::blocktree_processor::{self, BankForksInfo};
+use crate::broadcast_stage::BroadcastStageType;
 use crate::cluster_info::{ClusterInfo, Node};
 use crate::contact_info::ContactInfo;
 use crate::gossip_service::{discover_cluster, GossipService};
@@ -39,7 +40,9 @@ pub struct ValidatorConfig {
     pub account_paths: Option<String>,
     pub rpc_config: JsonRpcConfig,
     pub snapshot_path: Option<String>,
+    pub broadcast_stage_type: BroadcastStageType,
 }
+
 impl Default for ValidatorConfig {
     fn default() -> Self {
         // TODO: remove this, temporary parameter to configure
@@ -54,6 +57,7 @@ impl Default for ValidatorConfig {
             account_paths: None,
             rpc_config: JsonRpcConfig::default(),
             snapshot_path: None,
+            broadcast_stage_type: BroadcastStageType::Standard,
         }
     }
 }
@@ -262,6 +266,7 @@ impl Validator {
             node.sockets.broadcast,
             config.sigverify_disabled,
             &blocktree,
+            &config.broadcast_stage_type,
             &exit,
         );
diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs
index 362d035ea9..330b8b0c2e 100644
--- a/core/tests/local_cluster.rs
+++ b/core/tests/local_cluster.rs
@@ -2,6 +2,7 @@ extern crate solana;
 use hashbrown::HashSet;
 use log::*;
+use solana::broadcast_stage::BroadcastStageType;
 use solana::cluster::Cluster;
 use solana::cluster_tests;
 use solana::gossip_service::discover_cluster;
@@ -23,6 +24,7 @@ fn test_spend_and_verify_all_nodes_1() {
         &local.entry_point_info,
         &local.funding_keypair,
         num_nodes,
+        HashSet::new(),
     );
 }
@@ -35,6 +37,7 @@ fn test_spend_and_verify_all_nodes_2() {
         &local.entry_point_info,
         &local.funding_keypair,
         num_nodes,
+        HashSet::new(),
     );
 }
@@ -47,6 +50,7 @@ fn test_spend_and_verify_all_nodes_3() {
         &local.entry_point_info,
         &local.funding_keypair,
         num_nodes,
+        HashSet::new(),
     );
 }
@@ -63,6 +67,7 @@ fn test_spend_and_verify_all_nodes_env_num_nodes() {
         &local.entry_point_info,
         &local.funding_keypair,
         num_nodes,
+        HashSet::new(),
     );
 }
@@ -83,8 +88,8 @@ fn test_fullnode_exit_2() {
     validator_config.rpc_config.enable_fullnode_exit = true;
     let config = ClusterConfig {
         cluster_lamports: 10_000,
-        node_stakes: vec![100; 2],
-        validator_config,
+        node_stakes: vec![100; num_nodes],
+        validator_configs: vec![validator_config.clone(); num_nodes],
         ..ClusterConfig::default()
     };
     let local = LocalCluster::new(&config);
@@ -101,7 +106,7 @@ fn test_leader_failure_4() {
     let config = ClusterConfig {
         cluster_lamports: 10_000,
         node_stakes: vec![100; 4],
-        validator_config: validator_config.clone(),
+        validator_configs: vec![validator_config.clone(); num_nodes],
         ..ClusterConfig::default()
     };
     let local = LocalCluster::new(&config);
@@ -124,7 +129,7 @@ fn test_two_unbalanced_stakes() {
     let mut cluster = LocalCluster::new(&ClusterConfig {
         node_stakes: vec![999_990, 3],
         cluster_lamports: 1_000_000,
-        validator_config: validator_config.clone(),
+        validator_configs: vec![validator_config.clone(); 2],
         ticks_per_slot: num_ticks_per_slot,
         slots_per_epoch: num_slots_per_epoch,
         poh_config: PohConfig::new_sleep(Duration::from_millis(1000 / num_ticks_per_second)),
@@ -139,7 +144,10 @@ fn test_two_unbalanced_stakes() {
     );
     cluster.close_preserve_ledgers();
     let leader_pubkey = cluster.entry_point_info.id;
-    let leader_ledger = cluster.fullnode_infos[&leader_pubkey].ledger_path.clone();
+    let leader_ledger = cluster.fullnode_infos[&leader_pubkey]
+        .info
+        .ledger_path
+        .clone();
     cluster_tests::verify_ledger_ticks(&leader_ledger, num_ticks_per_slot as usize);
 }
@@ -151,6 +159,7 @@ fn test_forwarding() {
     let config = ClusterConfig {
         node_stakes: vec![999_990, 3],
         cluster_lamports: 2_000_000,
+        validator_configs: vec![ValidatorConfig::default(); 3],
         ..ClusterConfig::default()
     };
     let cluster = LocalCluster::new(&config);
@@ -171,13 +180,12 @@
 #[test]
 fn test_restart_node() {
-    let validator_config = ValidatorConfig::default();
     let slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH as u64;
     let ticks_per_slot = 16;
     let mut cluster = LocalCluster::new(&ClusterConfig {
         node_stakes: vec![3],
         cluster_lamports: 100,
-        validator_config: validator_config.clone(),
+        validator_configs: vec![ValidatorConfig::default()],
         ticks_per_slot,
         slots_per_epoch,
         ..ClusterConfig::default()
     });
@@ -205,6 +213,7 @@ fn test_listener_startup() {
         node_stakes: vec![100; 1],
         cluster_lamports: 1_000,
         num_listeners: 3,
+        validator_configs: vec![ValidatorConfig::default(); 1],
         ..ClusterConfig::default()
     };
     let cluster = LocalCluster::new(&config);
@@ -212,6 +221,60 @@
     assert_eq!(cluster_nodes.len(), 4);
 }
 
+#[test]
+#[ignore]
+fn test_fail_entry_verification_leader() {
+    solana_logger::setup();
+    let num_nodes = 4;
+    let validator_config = ValidatorConfig::default();
+    let mut error_validator_config = ValidatorConfig::default();
+    error_validator_config.broadcast_stage_type = BroadcastStageType::FailEntryVerification;
+    let mut validator_configs = vec![validator_config; num_nodes - 1];
+    validator_configs.push(error_validator_config);
+
+    let cluster_config = ClusterConfig {
+        cluster_lamports: 10_000,
+        node_stakes: vec![100; 4],
+        validator_configs: validator_configs,
+        slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
+        stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
+        ..ClusterConfig::default()
+    };
+
+    let cluster = LocalCluster::new(&cluster_config);
+    let epoch_schedule = EpochSchedule::new(
+        cluster_config.slots_per_epoch,
+        cluster_config.stakers_slot_offset,
+        true,
+    );
+    let num_warmup_epochs = epoch_schedule.get_stakers_epoch(0) + 1;
+
+    // Wait for the corrupted leader to be scheduled after the warmup epochs expire
+    cluster_tests::sleep_n_epochs(
+        (num_warmup_epochs + 1) as f64,
+        &cluster.genesis_block.poh_config,
+        cluster_config.ticks_per_slot,
+        cluster_config.slots_per_epoch,
+    );
+
+    let corrupt_node = cluster
+        .fullnode_infos
+        .iter()
+        .find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::FailEntryVerification)
+        .unwrap()
+        .0;
+    let mut ignore = HashSet::new();
+    ignore.insert(*corrupt_node);
+
+    // Verify that we can still spend and verify even in the presence of corrupt nodes
+    cluster_tests::spend_and_verify_all_nodes(
+        &cluster.entry_point_info,
+        &cluster.funding_keypair,
+        num_nodes,
+        ignore,
+    );
+}
+
 #[test]
 fn test_repairman_catchup() {
     run_repairman_catchup(3);
 }
@@ -223,7 +286,7 @@ fn run_repairman_catchup(num_repairmen: u64) {
     let num_ticks_per_slot = 40;
     let num_slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH as u64;
     let num_root_buffer_slots = 10;
-    // Calculate the leader schedule num_root_buffer slots ahead. Otherwise, if stakers_slot_offset ==
+    // Calculate the leader schedule num_root_buffer_slots ahead. Otherwise, if stakers_slot_offset ==
     // num_slots_per_epoch, and num_slots_per_epoch == MINIMUM_SLOTS_PER_EPOCH, then repairmen
     // will stop sending repairs after the last slot in epoch 1 (0-indexed), because the root
     // is at most in the first epoch.
@@ -256,7 +319,7 @@ fn run_repairman_catchup(num_repairmen: u64) {
     let mut cluster = LocalCluster::new(&ClusterConfig {
         node_stakes,
         cluster_lamports,
-        validator_config: validator_config.clone(),
+        validator_configs: vec![validator_config.clone(); num_repairmen as usize],
         ticks_per_slot: num_ticks_per_slot,
         slots_per_epoch: num_slots_per_epoch,
         stakers_slot_offset,
diff --git a/core/tests/replicator.rs b/core/tests/replicator.rs
index 9cd4dcfd71..ad5ee4d173 100644
--- a/core/tests/replicator.rs
+++ b/core/tests/replicator.rs
@@ -27,7 +27,7 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
     let mut validator_config = ValidatorConfig::default();
     validator_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
     let config = ClusterConfig {
-        validator_config,
+        validator_configs: vec![ValidatorConfig::default(); num_nodes],
         num_replicators,
         node_stakes: vec![100; num_nodes],
         cluster_lamports: 10_000,
@@ -149,7 +149,7 @@ fn test_account_setup() {
     let mut validator_config = ValidatorConfig::default();
     validator_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
     let config = ClusterConfig {
-        validator_config,
+        validator_configs: vec![ValidatorConfig::default(); num_nodes],
         num_replicators,
         node_stakes: vec![100; num_nodes],
         cluster_lamports: 10_000,