From 597c504c2748d7e1e5c95296455e2f7a17f90649 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Sat, 25 Sep 2021 19:09:49 +0000
Subject: [PATCH] generate deterministic seeds for shreds (backport #17950)
 (#20172)

* generate deterministic seeds for shreds (#17950)

* generate shred seed from leader pubkey

* clippy

* clippy

* review

* review 2

* fmt

* review

* check

* review

* cleanup

* fmt

(cherry picked from commit a86ced0bacc4611fba2931b82f6aaf38aa588798)

# Conflicts:
#	core/benches/cluster_info.rs
#	core/src/broadcast_stage.rs
#	core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
#	core/src/broadcast_stage/standard_broadcast_run.rs
#	ledger/src/shred.rs
#	sdk/src/feature_set.rs

* removes backport merge conflicts

Co-authored-by: jbiseda
Co-authored-by: behzad nouri
---
 core/benches/cluster_info.rs                  | 18 +++++-
 core/src/broadcast_stage.rs                   | 27 +++++++--
 .../broadcast_duplicates_run.rs               |  1 +
 .../broadcast_fake_shreds_run.rs              |  1 +
 .../fail_entry_verification_broadcast_run.rs  |  3 +
 .../broadcast_stage/standard_broadcast_run.rs | 58 +++++++++++++++----
 core/src/tpu.rs                               |  3 +-
 core/src/window_service.rs                    | 23 ++++++--
 ledger/src/shred.rs                           | 32 +++++++++-
 sdk/src/feature_set.rs                        |  5 ++
 10 files changed, 146 insertions(+), 25 deletions(-)

diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs
index c7a83021ef..83046369a9 100644
--- a/core/benches/cluster_info.rs
+++ b/core/benches/cluster_info.rs
@@ -11,14 +11,22 @@ use solana_gossip::{
     cluster_info::{ClusterInfo, Node},
     contact_info::ContactInfo,
 };
-use solana_ledger::shred::Shred;
+use solana_ledger::{
+    genesis_utils::{create_genesis_config, GenesisConfigInfo},
+    shred::Shred,
+};
+use solana_runtime::{bank::Bank, bank_forks::BankForks};
 use solana_sdk::{
     pubkey,
     signature::Keypair,
     timing::{timestamp, AtomicInterval},
 };
 use solana_streamer::socket::SocketAddrSpace;
-use std::{collections::HashMap, net::UdpSocket, sync::Arc};
+use std::{
+    collections::HashMap,
+    net::UdpSocket,
+    sync::{Arc, RwLock},
+};
 use test::Bencher;
 
 #[bench]
@@ -33,6 +41,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
     );
     let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
 
+    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+    let bank = Bank::new(&genesis_config);
+    let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
+
     const NUM_SHREDS: usize = 32;
     let shreds = vec![Shred::new_empty_data_shred(); NUM_SHREDS];
     let mut stakes = HashMap::new();
@@ -56,6 +68,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
             &last_datapoint,
             &mut TransmitShredsStats::default(),
             &SocketAddrSpace::Unspecified,
+            cluster_info.id(),
+            &bank_forks,
         )
         .unwrap();
     });
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index a57b02ccd2..212ab77fa8 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -19,7 +19,7 @@ use solana_ledger::{blockstore::Blockstore, shred::Shred};
 use solana_measure::measure::Measure;
 use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
 use solana_poh::poh_recorder::WorkingBankEntry;
-use solana_runtime::bank::Bank;
+use solana_runtime::{bank::Bank, bank_forks::BankForks};
 use solana_sdk::timing::{timestamp, AtomicInterval};
 use solana_sdk::{clock::Slot, pubkey::Pubkey};
 use solana_streamer::{sendmmsg::send_mmsg, socket::SocketAddrSpace};
@@ -28,7 +28,7 @@ use std::{
     net::UdpSocket,
     sync::atomic::{AtomicBool, Ordering},
     sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender},
-    sync::{Arc, Mutex},
+    sync::{Arc, Mutex, RwLock},
     thread::{self, Builder, JoinHandle},
     time::{Duration, Instant},
 };
@@ -68,6 +68,7 @@ pub enum BroadcastStageType {
 }
 
 impl BroadcastStageType {
+    #[allow(clippy::too_many_arguments)]
     pub fn new_broadcast_stage(
         &self,
         sock: Vec<UdpSocket>,
@@ -76,6 +77,7 @@ impl BroadcastStageType {
         retransmit_slots_receiver: RetransmitSlotsReceiver,
         exit_sender: &Arc<AtomicBool>,
         blockstore: &Arc<Blockstore>,
+        bank_forks: &Arc<RwLock<BankForks>>,
         shred_version: u16,
     ) -> BroadcastStage {
         let keypair = cluster_info.keypair.clone();
@@ -87,6 +89,7 @@ impl BroadcastStageType {
                 retransmit_slots_receiver,
                 exit_sender,
                 blockstore,
+                bank_forks,
                 StandardBroadcastRun::new(keypair, shred_version),
             ),
 
@@ -97,6 +100,7 @@ impl BroadcastStageType {
                 retransmit_slots_receiver,
                 exit_sender,
                 blockstore,
+                bank_forks,
                 FailEntryVerificationBroadcastRun::new(keypair, shred_version),
             ),
 
@@ -107,6 +111,7 @@ impl BroadcastStageType {
                 retransmit_slots_receiver,
                 exit_sender,
                 blockstore,
+                bank_forks,
                 BroadcastFakeShredsRun::new(keypair, 0, shred_version),
             ),
 
@@ -117,6 +122,7 @@ impl BroadcastStageType {
                 retransmit_slots_receiver,
                 exit_sender,
                 blockstore,
+                bank_forks,
                 BroadcastDuplicatesRun::new(keypair, shred_version, config.clone()),
             ),
         }
@@ -137,6 +143,7 @@ trait BroadcastRun {
        receiver: &Arc<Mutex<TransmitReceiver>>,
        cluster_info: &ClusterInfo,
        sock: &UdpSocket,
+       bank_forks: &Arc<RwLock<BankForks>>,
    ) -> Result<()>;
    fn record(
        &mut self,
@@ -230,6 +237,7 @@ impl BroadcastStage {
        retransmit_slots_receiver: RetransmitSlotsReceiver,
        exit_sender: &Arc<AtomicBool>,
        blockstore: &Arc<Blockstore>,
+       bank_forks: &Arc<RwLock<BankForks>>,
        broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
    ) -> Self {
        let btree = blockstore.clone();
@@ -258,10 +266,12 @@ impl BroadcastStage {
             let socket_receiver = socket_receiver.clone();
             let mut bs_transmit = broadcast_stage_run.clone();
             let cluster_info = cluster_info.clone();
+            let bank_forks = bank_forks.clone();
             let t = Builder::new()
                 .name("solana-broadcaster-transmit".to_string())
                 .spawn(move || loop {
-                    let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock);
+                    let res =
+                        bs_transmit.transmit(&socket_receiver, &cluster_info, &sock, &bank_forks);
                     let res = Self::handle_error(res, "solana-broadcaster-transmit");
                     if let Some(res) = res {
                         return res;
@@ -381,6 +391,8 @@ pub fn broadcast_shreds(
     last_datapoint_submit: &Arc<AtomicInterval>,
     transmit_stats: &mut TransmitShredsStats,
     socket_addr_space: &SocketAddrSpace,
+    self_pubkey: Pubkey,
+    bank_forks: &Arc<RwLock<BankForks>>,
 ) -> Result<()> {
     let broadcast_len = cluster_nodes.num_peers();
     if broadcast_len == 0 {
@@ -388,10 +400,12 @@ pub fn broadcast_shreds(
         return Ok(());
     }
     let mut shred_select = Measure::start("shred_select");
+    let root_bank = bank_forks.read().unwrap().root_bank();
     let packets: Vec<_> = shreds
         .iter()
         .filter_map(|shred| {
-            let node = cluster_nodes.get_broadcast_peer(shred.seed())?;
+            let seed = shred.seed(Some(self_pubkey), &root_bank);
+            let node = cluster_nodes.get_broadcast_peer(seed)?;
             if socket_addr_space.check(&node.tvu) {
                 Some((&shred.payload, &node.tvu))
             } else {
@@ -591,7 +605,9 @@ pub mod test {
         let exit_sender = Arc::new(AtomicBool::new(false));
 
         let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
-        let bank = Arc::new(Bank::new(&genesis_config));
+        let bank = Bank::new(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
+        let bank = bank_forks.read().unwrap().root_bank();
         let leader_keypair = cluster_info.keypair.clone();
 
         // Start up the broadcast stage
@@ -602,6 +618,7 @@ pub mod test {
             retransmit_slots_receiver,
             &exit_sender,
             &blockstore,
+            &bank_forks,
             StandardBroadcastRun::new(leader_keypair, 0),
         );
 
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index cbe7a3861e..68a9a4b7ce 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -288,6 +288,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &ClusterInfo,
         sock: &UdpSocket,
+        _bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         // Check the delay queue for shreds that are ready to be sent
         let (delayed_recipient, delayed_shreds) = {
diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
index 6461f350ac..0c5fd1a5c7 100644
--- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
+++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
@@ -105,6 +105,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &ClusterInfo,
         sock: &UdpSocket,
+        _bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
             let peers = cluster_info.tvu_peers();
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index 79a7637bd8..f26e4191bf 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -132,6 +132,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &ClusterInfo,
         sock: &UdpSocket,
+        bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
         // Broadcast data
@@ -146,6 +147,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             &Arc::new(AtomicInterval::default()),
             &mut TransmitShredsStats::default(),
             cluster_info.socket_addr_space(),
+            cluster_info.id(),
+            bank_forks,
         )?;
 
         Ok(())
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index ecafc69d75..c84cb81a29 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -156,17 +156,19 @@ impl StandardBroadcastRun {
         sock: &UdpSocket,
         blockstore: &Arc<Blockstore>,
         receive_results: ReceiveResults,
+        bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         let (bsend, brecv) = channel();
         let (ssend, srecv) = channel();
         self.process_receive_results(blockstore, &ssend, &bsend, receive_results)?;
         let srecv = Arc::new(Mutex::new(srecv));
         let brecv = Arc::new(Mutex::new(brecv));
+
         //data
-        let _ = self.transmit(&srecv, cluster_info, sock);
+        let _ = self.transmit(&srecv, cluster_info, sock, bank_forks);
         let _ = self.record(&brecv, blockstore);
         //coding
-        let _ = self.transmit(&srecv, cluster_info, sock);
+        let _ = self.transmit(&srecv, cluster_info, sock, bank_forks);
         let _ = self.record(&brecv, blockstore);
         Ok(())
     }
@@ -338,6 +340,7 @@ impl StandardBroadcastRun {
         stakes: Option<&HashMap<Pubkey, u64>>,
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
+        bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
         trace!("Broadcasting {:?} shreds", shreds.len());
@@ -358,6 +361,7 @@ impl StandardBroadcastRun {
         let mut transmit_stats = TransmitShredsStats::default();
         // Broadcast the shreds
         let mut transmit_time = Measure::start("broadcast_shreds");
+
         broadcast_shreds(
             sock,
             &shreds,
@@ -365,6 +369,8 @@ impl StandardBroadcastRun {
             &self.last_datapoint_submit,
             &mut transmit_stats,
             cluster_info.socket_addr_space(),
+            cluster_info.id(),
+            bank_forks,
         )?;
         drop(cluster_nodes);
         transmit_time.stop();
@@ -469,9 +475,17 @@ impl BroadcastRun for StandardBroadcastRun {
         receiver: &Arc<Mutex<TransmitReceiver>>,
         cluster_info: &ClusterInfo,
         sock: &UdpSocket,
+        bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
-        self.broadcast(sock, cluster_info, stakes.as_deref(), shreds, slot_start_ts)
+        self.broadcast(
+            sock,
+            cluster_info,
+            stakes.as_deref(),
+            shreds,
+            slot_start_ts,
+            bank_forks,
+        )
     }
     fn record(
         &mut self,
@@ -502,6 +516,7 @@ mod test {
     use std::sync::Arc;
     use std::time::Duration;
 
+    #[allow(clippy::type_complexity)]
     fn setup(
         num_shreds_per_slot: Slot,
     ) -> (
@@ -511,6 +526,7 @@ mod test {
         Arc<Bank>,
         Arc<Keypair>,
         UdpSocket,
+        Arc<RwLock<BankForks>>,
     ) {
         // Setup
         let ledger_path = get_tmp_ledger_path!();
@@ -528,7 +544,10 @@ mod test {
         let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut genesis_config = create_genesis_config(10_000).genesis_config;
         genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1;
-        let bank0 = Arc::new(Bank::new(&genesis_config));
+
+        let bank = Bank::new(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
+        let bank0 = bank_forks.read().unwrap().root_bank();
         (
             blockstore,
             genesis_config,
@@ -536,6 +555,7 @@ mod test {
             bank0,
             leader_keypair,
             socket,
+            bank_forks,
         )
     }
 
@@ -578,7 +598,7 @@ mod test {
     fn test_slot_interrupt() {
         // Setup
         let num_shreds_per_slot = 2;
-        let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket) =
+        let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
             setup(num_shreds_per_slot);
 
         // Insert 1 less than the number of ticks needed to finish the slot
@@ -593,7 +613,13 @@ mod test {
         // Step 1: Make an incomplete transmission for slot 0
         let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone(), 0);
         standard_broadcast_run
-            .test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results)
+            .test_process_receive_results(
+                &cluster_info,
+                &socket,
+                &blockstore,
+                receive_results,
+                &bank_forks,
+            )
             .unwrap();
         let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
         assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds_per_slot);
@@ -651,7 +677,13 @@ mod test {
             last_tick_height: (ticks1.len() - 1) as u64,
         };
         standard_broadcast_run
-            .test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results)
+            .test_process_receive_results(
+                &cluster_info,
+                &socket,
+                &blockstore,
+                receive_results,
+                &bank_forks,
+            )
             .unwrap();
         let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
@@ -692,7 +724,7 @@ mod test {
     #[test]
     fn test_buffer_data_shreds() {
         let num_shreds_per_slot = 2;
-        let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket) =
+        let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket, _bank_forks) =
             setup(num_shreds_per_slot);
         let (bsend, brecv) = channel();
         let (ssend, _srecv) = channel();
@@ -737,7 +769,7 @@ mod test {
     fn test_slot_finish() {
         // Setup
         let num_shreds_per_slot = 2;
-        let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket) =
+        let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
             setup(num_shreds_per_slot);
 
         // Insert complete slot of ticks needed to finish the slot
@@ -751,7 +783,13 @@ mod test {
 
         let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0);
         standard_broadcast_run
-            .test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results)
+            .test_process_receive_results(
+                &cluster_info,
+                &socket,
+                &blockstore,
+                receive_results,
+                &bank_forks,
+            )
             .unwrap();
         assert!(standard_broadcast_run.unfinished_slot.is_none())
     }
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index dc97a0f595..42da887e72 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -93,7 +93,7 @@ impl Tpu {
             verified_vote_packets_sender,
             poh_recorder,
             vote_tracker,
-            bank_forks,
+            bank_forks.clone(),
             subscriptions.clone(),
             verified_vote_sender,
             gossip_verified_vote_hash_sender,
@@ -119,6 +119,7 @@ impl Tpu {
             retransmit_slots_receiver,
             exit,
             blockstore,
+            &bank_forks,
             shred_version,
         );
 
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index c34238d500..5d358942c2 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -213,6 +213,8 @@ where
 fn recv_window<F>(
     blockstore: &Arc<Blockstore>,
+    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    bank_forks: &Arc<RwLock<BankForks>>,
     insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     my_pubkey: &Pubkey,
     verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
@@ -236,6 +238,7 @@ where
     let now = Instant::now();
     inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
 
+    let root_bank = bank_forks.read().unwrap().root_bank();
     let last_root = blockstore.last_root();
     let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
         packets
@@ -275,8 +278,10 @@ where
                     }
                 };
                 if shred_filter(&shred, last_root) {
+                    let leader_pubkey = leader_schedule_cache
+                        .slot_leader_at(shred.slot(), Some(&root_bank));
                     packet.meta.slot = shred.slot();
-                    packet.meta.seed = shred.seed();
+                    packet.meta.seed = shred.seed(leader_pubkey, &root_bank);
                     Some((shred, repair_info))
                 } else {
                     packet.meta.discard = true;
@@ -370,7 +375,7 @@ impl WindowService {
         let outstanding_requests: Arc<RwLock<OutstandingRepairs>> =
             Arc::new(RwLock::new(OutstandingRequests::default()));
 
-        let bank_forks = Some(repair_info.bank_forks.clone());
+        let bank_forks = repair_info.bank_forks.clone();
 
         let repair_service = RepairService::new(
             blockstore.clone(),
@@ -411,7 +416,8 @@ impl WindowService {
             insert_sender,
             verified_receiver,
             shred_filter,
-            bank_forks,
+            leader_schedule_cache,
+            &bank_forks,
             retransmit,
         );
 
@@ -509,6 +515,7 @@ impl WindowService {
         .unwrap()
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn start_recv_window_thread<F>(
         id: Pubkey,
         exit: &Arc<AtomicBool>,
@@ -516,7 +523,8 @@ impl WindowService {
         insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         shred_filter: F,
-        bank_forks: Option<Arc<RwLock<BankForks>>>,
+        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        bank_forks: &Arc<RwLock<BankForks>>,
         retransmit: PacketSender,
     ) -> JoinHandle<()>
     where
@@ -527,6 +535,9 @@ impl WindowService {
     {
         let exit = exit.clone();
         let blockstore = blockstore.clone();
+        let bank_forks = bank_forks.clone();
+        let bank_forks_opt = Some(bank_forks.clone());
+        let leader_schedule_cache = leader_schedule_cache.clone();
         Builder::new()
             .name("solana-window".to_string())
             .spawn(move || {
@@ -554,6 +565,8 @@ impl WindowService {
                     };
                     if let Err(e) = recv_window(
                         &blockstore,
+                        &leader_schedule_cache,
+                        &bank_forks,
                         &insert_sender,
                         &id,
                         &verified_receiver,
@@ -562,7 +575,7 @@ impl WindowService {
                             shred_filter(
                                 &id,
                                 shred,
-                                bank_forks
+                                bank_forks_opt
                                     .as_ref()
                                     .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
                                 last_root,
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 8549562eba..97bccff161 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -65,15 +65,17 @@ use serde::{Deserialize, Serialize};
 use solana_measure::measure::Measure;
 use solana_perf::packet::{limited_deserialize, Packet};
 use solana_rayon_threadlimit::get_thread_count;
+use solana_runtime::bank::Bank;
 use solana_sdk::{
     clock::Slot,
+    feature_set,
+    hash::hashv,
     hash::Hash,
     packet::PACKET_DATA_SIZE,
     pubkey::Pubkey,
     signature::{Keypair, Signature, Signer},
 };
 use std::{mem::size_of, ops::Deref, sync::Arc};
-
 use thiserror::Error;
 
 #[derive(Default, Clone)]
@@ -467,7 +469,18 @@ impl Shred {
         self.common_header.signature
     }
 
-    pub fn seed(&self) -> [u8; 32] {
+    pub fn seed(&self, leader_pubkey: Option<Pubkey>, root_bank: &Bank) -> [u8; 32] {
+        if let Some(leader_pubkey) = leader_pubkey {
+            if enable_deterministic_seed(self.slot(), root_bank) {
+                let h = hashv(&[
+                    &self.slot().to_le_bytes(),
+                    &self.index().to_le_bytes(),
+                    &leader_pubkey.to_bytes(),
+                ]);
+                return h.to_bytes();
+            }
+        }
+
         let mut seed = [0; 32];
         let seed_len = seed.len();
         let sig = self.common_header.signature.as_ref();
@@ -557,6 +570,21 @@ impl Shred {
     }
 }
 
+fn enable_deterministic_seed(shred_slot: Slot, bank: &Bank) -> bool {
+    let feature_slot = bank
+        .feature_set
+        .activated_slot(&feature_set::deterministic_shred_seed_enabled::id());
+    match feature_slot {
+        None => false,
+        Some(feature_slot) => {
+            let epoch_schedule = bank.epoch_schedule();
+            let feature_epoch = epoch_schedule.get_epoch(feature_slot);
+            let shred_epoch = epoch_schedule.get_epoch(shred_slot);
+            feature_epoch < shred_epoch
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct Shredder {
     pub slot: Slot,
diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs
index 446a57377a..bc9dad9058 100644
--- a/sdk/src/feature_set.rs
+++ b/sdk/src/feature_set.rs
@@ -135,6 +135,10 @@ pub mod dedupe_config_program_signers {
     solana_sdk::declare_id!("8kEuAshXLsgkUEdcFVLqrjCGGHVWFW99ZZpxvAzzMtBp");
 }
 
+pub mod deterministic_shred_seed_enabled {
+    solana_sdk::declare_id!("FjSRMpFe7mofQ3WrEMT7Smjk2sME1XdAoRxcv55V6M44");
+}
+
 pub mod verify_tx_signatures_len {
     solana_sdk::declare_id!("EVW9B5xD9FFK7vw1SBARwMA4s5eRo5eKJdKpsBikzKBz");
 }
@@ -246,6 +250,7 @@ lazy_static! {
         (system_transfer_zero_check::id(), "perform all checks for transfers of 0 lamports"),
         (dedupe_config_program_signers::id(), "dedupe config program signers"),
         (verify_tx_signatures_len::id(), "prohibit extra transaction signatures"),
+        (deterministic_shred_seed_enabled::id(), "deterministic shred seed"),
         (vote_stake_checked_instructions::id(), "vote/state program checked instructions #18345"),
         (updated_verify_policy::id(), "Update verify policy"),
         (neon_evm_compute_budget::id(), "bump neon_evm's compute budget"),
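
The core of the patch is the new derivation in Shred::seed() above: once the
deterministic_shred_seed_enabled feature has been active for a full epoch
(enable_deterministic_seed requires feature_epoch < shred_epoch), the broadcast
and retransmit peer for a shred is chosen from hash(slot || index || leader
pubkey) rather than from the last 32 bytes of the shred's signature, so the
seed is computable before the shred is even signed and no longer depends on
signature bytes the leader controls. Below is a minimal standalone sketch of
that derivation, assuming only the solana-sdk crate; the free function
deterministic_shred_seed and the main() harness are illustrative and not part
of the tree:

    use solana_sdk::{hash::hashv, pubkey::Pubkey};

    // Same bytes in the same order as the patched Shred::seed(): little-endian
    // slot, little-endian shred index, then the slot leader's pubkey.
    fn deterministic_shred_seed(slot: u64, index: u32, leader: &Pubkey) -> [u8; 32] {
        hashv(&[
            &slot.to_le_bytes(),
            &index.to_le_bytes(),
            &leader.to_bytes(),
        ])
        .to_bytes()
    }

    fn main() {
        // Any node that agrees on (slot, index, leader) derives the same seed,
        // without having to see the shred's signature first.
        let leader = Pubkey::new_unique();
        let seed = deterministic_shred_seed(42, 7, &leader);
        assert_eq!(seed, deterministic_shred_seed(42, 7, &leader));
        println!("seed prefix: {:?}", &seed[..8]);
    }

Comparing epochs rather than slots in enable_deterministic_seed appears to be a
deliberate design choice: the switch is deferred to the first epoch boundary
after activation, so every validator changes derivations for a slot's shreds in
lockstep instead of mid-epoch at the activation slot.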