diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index d27e0e6517..c69c6dec85 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -1,4 +1,5 @@ //! A stage to broadcast data from a leader node to validators +use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun; use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun; use self::standard_broadcast_run::StandardBroadcastRun; use crate::blocktree::Blocktree; @@ -20,6 +21,7 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; +mod broadcast_fake_blobs_run; mod broadcast_utils; mod fail_entry_verification_broadcast_run; mod standard_broadcast_run; @@ -35,6 +37,7 @@ pub enum BroadcastStageReturnType { pub enum BroadcastStageType { Standard, FailEntryVerification, + BroadcastFakeBlobs, } impl BroadcastStageType { @@ -64,6 +67,15 @@ impl BroadcastStageType { blocktree, FailEntryVerificationBroadcastRun::new(), ), + + BroadcastStageType::BroadcastFakeBlobs => BroadcastStage::new( + sock, + cluster_info, + receiver, + exit_sender, + blocktree, + BroadcastFakeBlobsRun::new(0), + ), } } } diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs new file mode 100644 index 0000000000..c3617b3cf9 --- /dev/null +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -0,0 +1,166 @@ +use super::*; +use crate::entry::Entry; +use solana_sdk::hash::Hash; + +pub(super) struct BroadcastFakeBlobsRun { + last_blockhash: Hash, + partition: usize, +} + +impl BroadcastFakeBlobsRun { + pub(super) fn new(partition: usize) -> Self { + Self { + last_blockhash: Hash::default(), + partition, + } + } +} + +impl BroadcastRun for BroadcastFakeBlobsRun { + fn run( + &mut self, + broadcast: &mut Broadcast, + cluster_info: &Arc>, + receiver: &Receiver, + sock: &UdpSocket, + blocktree: &Arc, + ) -> Result<()> { + // 1) Pull entries from banking stage + let receive_results = broadcast_utils::recv_slot_blobs(receiver)?; + let bank = receive_results.bank.clone(); + let last_tick = receive_results.last_tick; + + let keypair = &cluster_info.read().unwrap().keypair.clone(); + let latest_blob_index = blocktree + .meta(bank.slot()) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0); + + let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs( + receive_results.ventries, + &broadcast.thread_pool, + latest_blob_index, + last_tick, + &bank, + &keypair, + &mut broadcast.coding_generator, + ); + + // If the last blockhash is default, a new block is being created + // So grab the last blockhash from the parent bank + if self.last_blockhash == Hash::default() { + self.last_blockhash = bank.parent().unwrap().last_blockhash(); + } + + let fake_ventries: Vec<_> = (0..receive_results.num_entries) + .map(|_| vec![(Entry::new(&self.last_blockhash, 0, vec![]), 0)]) + .collect(); + + let (fake_data_blobs, fake_coding_blobs) = broadcast_utils::entries_to_blobs( + fake_ventries, + &broadcast.thread_pool, + latest_blob_index, + last_tick, + &bank, + &keypair, + &mut broadcast.coding_generator, + ); + + // If it's the last tick, reset the last block hash to default + // this will cause next run to grab last bank's blockhash + if last_tick == bank.max_tick_height() { + self.last_blockhash = Hash::default(); + } + + blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?; + + // Set the forwarded flag to true, so that the blobs won't be forwarded to peers + data_blobs + .iter() + .for_each(|blob| blob.write().unwrap().set_forwarded(true)); + coding_blobs + .iter() + .for_each(|blob| blob.write().unwrap().set_forwarded(true)); + fake_data_blobs + .iter() + .for_each(|blob| blob.write().unwrap().set_forwarded(true)); + fake_coding_blobs + .iter() + .for_each(|blob| blob.write().unwrap().set_forwarded(true)); + + // 3) Start broadcast step + let peers = cluster_info.read().unwrap().tvu_peers(); + peers.iter().enumerate().for_each(|(i, peer)| { + if i <= self.partition { + // Send fake blobs to the first N peers + fake_data_blobs.iter().for_each(|b| { + let blob = b.read().unwrap(); + sock.send_to(&blob.data[..blob.meta.size], &peer.tvu) + .unwrap(); + }); + fake_coding_blobs.iter().for_each(|b| { + let blob = b.read().unwrap(); + sock.send_to(&blob.data[..blob.meta.size], &peer.tvu) + .unwrap(); + }); + } else { + data_blobs.iter().for_each(|b| { + let blob = b.read().unwrap(); + sock.send_to(&blob.data[..blob.meta.size], &peer.tvu) + .unwrap(); + }); + coding_blobs.iter().for_each(|b| { + let blob = b.read().unwrap(); + sock.send_to(&blob.data[..blob.meta.size], &peer.tvu) + .unwrap(); + }); + } + }); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::contact_info::ContactInfo; + use solana_sdk::pubkey::Pubkey; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + #[test] + fn test_tvu_peers_ordering() { + let mut cluster = ClusterInfo::new_with_invalid_keypair(ContactInfo::new_localhost( + &Pubkey::new_rand(), + 0, + )); + cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), + 8080, + ))); + cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), + 8080, + ))); + cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), + 8080, + ))); + cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(192, 168, 1, 4)), + 8080, + ))); + + let tvu_peers1 = cluster.tvu_peers(); + (0..5).for_each(|_| { + cluster + .tvu_peers() + .iter() + .zip(tvu_peers1.iter()) + .for_each(|(v1, v2)| { + assert_eq!(v1, v2); + }); + }); + } +} diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs index 26ac8360b7..8e9a206f91 100644 --- a/core/tests/local_cluster.rs +++ b/core/tests/local_cluster.rs @@ -274,6 +274,60 @@ fn test_fail_entry_verification_leader() { ); } +#[test] +#[ignore] +fn test_fake_blobs_broadcast_leader() { + solana_logger::setup(); + let num_nodes = 4; + let validator_config = ValidatorConfig::default(); + let mut error_validator_config = ValidatorConfig::default(); + error_validator_config.broadcast_stage_type = BroadcastStageType::BroadcastFakeBlobs; + let mut validator_configs = vec![validator_config; num_nodes - 1]; + validator_configs.push(error_validator_config); + + let cluster_config = ClusterConfig { + cluster_lamports: 10_000, + node_stakes: vec![100; 4], + validator_configs: validator_configs, + slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2 as u64, + stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2 as u64, + ..ClusterConfig::default() + }; + + let cluster = LocalCluster::new(&cluster_config); + let epoch_schedule = EpochSchedule::new( + cluster_config.slots_per_epoch, + cluster_config.stakers_slot_offset, + true, + ); + let num_warmup_epochs = epoch_schedule.get_stakers_epoch(0) + 1; + + // Wait for the corrupted leader to be scheduled afer the warmup epochs expire + cluster_tests::sleep_n_epochs( + (num_warmup_epochs + 1) as f64, + &cluster.genesis_block.poh_config, + cluster_config.ticks_per_slot, + cluster_config.slots_per_epoch, + ); + + let corrupt_node = cluster + .fullnode_infos + .iter() + .find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::BroadcastFakeBlobs) + .unwrap() + .0; + let mut ignore = HashSet::new(); + ignore.insert(*corrupt_node); + + // Verify that we can still spend and verify even in the presence of corrupt nodes + cluster_tests::spend_and_verify_all_nodes( + &cluster.entry_point_info, + &cluster.funding_keypair, + num_nodes, + ignore, + ); +} + #[test] fn test_repairman_catchup() { run_repairman_catchup(3);