From a86ced0bacc4611fba2931b82f6aaf38aa588798 Mon Sep 17 00:00:00 2001 From: jbiseda Date: Wed, 7 Jul 2021 08:21:12 -0700 Subject: [PATCH] generate deterministic seeds for shreds (#17950) * generate shred seed from leader pubkey * clippy * clippy * review * review 2 * fmt * review * check * review * cleanup * fmt --- core/benches/cluster_info.rs | 14 ++++++- core/src/broadcast_stage.rs | 27 +++++++++++--- .../broadcast_duplicates_run.rs | 1 + .../broadcast_fake_shreds_run.rs | 1 + .../fail_entry_verification_broadcast_run.rs | 3 ++ .../broadcast_stage/standard_broadcast_run.rs | 37 +++++++++++++++---- core/src/tpu.rs | 3 +- core/src/window_service.rs | 23 +++++++++--- ledger/src/shred.rs | 32 +++++++++++++++- sdk/src/feature_set.rs | 5 +++ 10 files changed, 124 insertions(+), 22 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 9f526eb59b..8c5ec3df2f 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -11,13 +11,17 @@ use solana_gossip::{ cluster_info::{ClusterInfo, Node}, contact_info::ContactInfo, }; -use solana_ledger::shred::Shred; +use solana_ledger::{ + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + shred::Shred, +}; +use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::pubkey; use solana_sdk::timing::timestamp; use std::{ collections::HashMap, net::UdpSocket, - sync::{atomic::AtomicU64, Arc}, + sync::{atomic::AtomicU64, Arc, RwLock}, }; use test::Bencher; @@ -29,6 +33,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { let cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + const NUM_SHREDS: usize = 32; let shreds = vec![Shred::new_empty_data_shred(); NUM_SHREDS]; let mut stakes = HashMap::new(); @@ -51,6 +59,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { &cluster_nodes, &last_datapoint, &mut TransmitShredsStats::default(), + cluster_info.id(), + &bank_forks, ) .unwrap(); }); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index d55d48484d..2846cec3bd 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -19,7 +19,7 @@ use solana_ledger::{blockstore::Blockstore, shred::Shred}; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_poh::poh_recorder::WorkingBankEntry; -use solana_runtime::bank::Bank; +use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::timing::timestamp; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}; use solana_streamer::sendmmsg::send_mmsg; @@ -29,7 +29,7 @@ use std::{ net::UdpSocket, sync::atomic::{AtomicBool, Ordering}, sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }; @@ -69,6 +69,7 @@ pub enum BroadcastStageType { } impl BroadcastStageType { + #[allow(clippy::too_many_arguments)] pub fn new_broadcast_stage( &self, sock: Vec, @@ -77,6 +78,7 @@ impl BroadcastStageType { retransmit_slots_receiver: RetransmitSlotsReceiver, exit_sender: &Arc, blockstore: &Arc, + bank_forks: &Arc>, shred_version: u16, ) -> BroadcastStage { match self { @@ -87,6 +89,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, + bank_forks, StandardBroadcastRun::new(shred_version), ), @@ -97,6 +100,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, + bank_forks, FailEntryVerificationBroadcastRun::new(shred_version), ), @@ -107,6 +111,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, + bank_forks, BroadcastFakeShredsRun::new(0, shred_version), ), @@ -117,6 +122,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, + bank_forks, BroadcastDuplicatesRun::new(shred_version, config.clone()), ), } @@ -138,6 +144,7 @@ trait BroadcastRun { receiver: &Arc>, cluster_info: &ClusterInfo, sock: &UdpSocket, + bank_forks: &Arc>, ) -> Result<()>; fn record( &mut self, @@ -237,6 +244,7 @@ impl BroadcastStage { retransmit_slots_receiver: RetransmitSlotsReceiver, exit_sender: &Arc, blockstore: &Arc, + bank_forks: &Arc>, broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, ) -> Self { let btree = blockstore.clone(); @@ -269,10 +277,12 @@ impl BroadcastStage { let socket_receiver = socket_receiver.clone(); let mut bs_transmit = broadcast_stage_run.clone(); let cluster_info = cluster_info.clone(); + let bank_forks = bank_forks.clone(); let t = Builder::new() .name("solana-broadcaster-transmit".to_string()) .spawn(move || loop { - let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock); + let res = + bs_transmit.transmit(&socket_receiver, &cluster_info, &sock, &bank_forks); let res = Self::handle_error(res, "solana-broadcaster-transmit"); if let Some(res) = res { return res; @@ -396,6 +406,8 @@ pub fn broadcast_shreds( cluster_nodes: &ClusterNodes, last_datapoint_submit: &Arc, transmit_stats: &mut TransmitShredsStats, + self_pubkey: Pubkey, + bank_forks: &Arc>, ) -> Result<()> { let broadcast_len = cluster_nodes.num_peers(); if broadcast_len == 0 { @@ -403,10 +415,12 @@ pub fn broadcast_shreds( return Ok(()); } let mut shred_select = Measure::start("shred_select"); + let root_bank = bank_forks.read().unwrap().root_bank(); let packets: Vec<_> = shreds .iter() .filter_map(|shred| { - let node = cluster_nodes.get_broadcast_peer(shred.seed())?; + let seed = shred.seed(Some(self_pubkey), &root_bank); + let node = cluster_nodes.get_broadcast_peer(seed)?; Some((&shred.payload, &node.tvu)) }) .collect(); @@ -598,7 +612,9 @@ pub mod test { let exit_sender = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); - let bank = Arc::new(Bank::new(&genesis_config)); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = bank_forks.read().unwrap().root_bank(); // Start up the broadcast stage let broadcast_service = BroadcastStage::new( @@ -608,6 +624,7 @@ pub mod test { retransmit_slots_receiver, &exit_sender, &blockstore, + &bank_forks, StandardBroadcastRun::new(0), ); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index b219652d31..16db6b5a07 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -284,6 +284,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { receiver: &Arc>, cluster_info: &ClusterInfo, sock: &UdpSocket, + _bank_forks: &Arc>, ) -> Result<()> { // Check the delay queue for shreds that are ready to be sent let (delayed_recipient, delayed_shreds) = { diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index c9b118068e..538faa5d22 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -105,6 +105,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { receiver: &Arc>, cluster_info: &ClusterInfo, sock: &UdpSocket, + _bank_forks: &Arc>, ) -> Result<()> { for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() { let peers = cluster_info.tvu_peers(); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 901dc3759a..79acbab1a0 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -131,6 +131,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { receiver: &Arc>, cluster_info: &ClusterInfo, sock: &UdpSocket, + bank_forks: &Arc>, ) -> Result<()> { let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; // Broadcast data @@ -144,6 +145,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { &cluster_nodes, &Arc::new(AtomicU64::new(0)), &mut TransmitShredsStats::default(), + cluster_info.id(), + bank_forks, )?; Ok(()) diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 76cb09ed03..7f08cbe81c 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -149,17 +149,19 @@ impl StandardBroadcastRun { sock: &UdpSocket, blockstore: &Arc, receive_results: ReceiveResults, + bank_forks: &Arc>, ) -> Result<()> { let (bsend, brecv) = channel(); let (ssend, srecv) = channel(); self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?; let srecv = Arc::new(Mutex::new(srecv)); let brecv = Arc::new(Mutex::new(brecv)); + //data - let _ = self.transmit(&srecv, cluster_info, sock); + let _ = self.transmit(&srecv, cluster_info, sock, bank_forks); let _ = self.record(&brecv, blockstore); //coding - let _ = self.transmit(&srecv, cluster_info, sock); + let _ = self.transmit(&srecv, cluster_info, sock, bank_forks); let _ = self.record(&brecv, blockstore); Ok(()) } @@ -333,6 +335,7 @@ impl StandardBroadcastRun { stakes: Option<&HashMap>, shreds: Arc>, broadcast_shred_batch_info: Option, + bank_forks: &Arc>, ) -> Result<()> { const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000; trace!("Broadcasting {:?} shreds", shreds.len()); @@ -358,12 +361,15 @@ impl StandardBroadcastRun { let mut transmit_stats = TransmitShredsStats::default(); // Broadcast the shreds let mut transmit_time = Measure::start("broadcast_shreds"); + broadcast_shreds( sock, &shreds, &cluster_nodes, &self.last_datapoint_submit, &mut transmit_stats, + cluster_info.id(), + bank_forks, )?; drop(cluster_nodes); transmit_time.stop(); @@ -470,9 +476,17 @@ impl BroadcastRun for StandardBroadcastRun { receiver: &Arc>, cluster_info: &ClusterInfo, sock: &UdpSocket, + bank_forks: &Arc>, ) -> Result<()> { let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?; - self.broadcast(sock, cluster_info, stakes.as_deref(), shreds, slot_start_ts) + self.broadcast( + sock, + cluster_info, + stakes.as_deref(), + shreds, + slot_start_ts, + bank_forks, + ) } fn record( &mut self, @@ -503,6 +517,7 @@ mod test { use std::sync::Arc; use std::time::Duration; + #[allow(clippy::type_complexity)] fn setup( num_shreds_per_slot: Slot, ) -> ( @@ -512,6 +527,7 @@ mod test { Arc, Arc, UdpSocket, + Arc>, ) { // Setup let ledger_path = get_tmp_ledger_path!(); @@ -525,7 +541,10 @@ mod test { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut genesis_config = create_genesis_config(10_000).genesis_config; genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1; - let bank0 = Arc::new(Bank::new(&genesis_config)); + + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank0 = bank_forks.read().unwrap().root_bank(); ( blockstore, genesis_config, @@ -533,6 +552,7 @@ mod test { bank0, leader_keypair, socket, + bank_forks, ) } @@ -575,7 +595,7 @@ mod test { fn test_slot_interrupt() { // Setup let num_shreds_per_slot = 2; - let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket) = + let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) = setup(num_shreds_per_slot); // Insert 1 less than the number of ticks needed to finish the slot @@ -596,6 +616,7 @@ mod test { &socket, &blockstore, receive_results, + &bank_forks, ) .unwrap(); let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap(); @@ -660,6 +681,7 @@ mod test { &socket, &blockstore, receive_results, + &bank_forks, ) .unwrap(); let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap(); @@ -701,7 +723,7 @@ mod test { #[test] fn test_buffer_data_shreds() { let num_shreds_per_slot = 2; - let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket) = + let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket, _bank_forks) = setup(num_shreds_per_slot); let (bsend, brecv) = channel(); let (ssend, _srecv) = channel(); @@ -752,7 +774,7 @@ mod test { fn test_slot_finish() { // Setup let num_shreds_per_slot = 2; - let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket) = + let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) = setup(num_shreds_per_slot); // Insert complete slot of ticks needed to finish the slot @@ -772,6 +794,7 @@ mod test { &socket, &blockstore, receive_results, + &bank_forks, ) .unwrap(); assert!(standard_broadcast_run.unfinished_slot.is_none()) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 0e741edb9b..4836f097da 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -96,7 +96,7 @@ impl Tpu { verified_vote_packets_sender, poh_recorder, vote_tracker, - bank_forks, + bank_forks.clone(), subscriptions.clone(), verified_vote_sender, gossip_verified_vote_hash_sender, @@ -124,6 +124,7 @@ impl Tpu { retransmit_slots_receiver, exit, blockstore, + &bank_forks, shred_version, ); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index ffa158d526..bd074a675c 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -213,6 +213,8 @@ where fn recv_window( blockstore: &Arc, + leader_schedule_cache: &Arc, + bank_forks: &Arc>, insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, my_pubkey: &Pubkey, verified_receiver: &CrossbeamReceiver>, @@ -236,6 +238,7 @@ where let now = Instant::now(); inc_new_counter_debug!("streamer-recv_window-recv", total_packets); + let root_bank = bank_forks.read().unwrap().root_bank(); let last_root = blockstore.last_root(); let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets @@ -275,8 +278,10 @@ where } }; if shred_filter(&shred, last_root) { + let leader_pubkey = leader_schedule_cache + .slot_leader_at(shred.slot(), Some(&root_bank)); packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(); + packet.meta.seed = shred.seed(leader_pubkey, &root_bank); Some((shred, repair_info)) } else { packet.meta.discard = true; @@ -370,7 +375,7 @@ impl WindowService { let outstanding_requests: Arc> = Arc::new(RwLock::new(OutstandingRequests::default())); - let bank_forks = Some(repair_info.bank_forks.clone()); + let bank_forks = repair_info.bank_forks.clone(); let repair_service = RepairService::new( blockstore.clone(), @@ -411,7 +416,8 @@ impl WindowService { insert_sender, verified_receiver, shred_filter, - bank_forks, + leader_schedule_cache, + &bank_forks, retransmit, ); @@ -509,6 +515,7 @@ impl WindowService { .unwrap() } + #[allow(clippy::too_many_arguments)] fn start_recv_window_thread( id: Pubkey, exit: &Arc, @@ -516,7 +523,8 @@ impl WindowService { insert_sender: CrossbeamSender<(Vec, Vec>)>, verified_receiver: CrossbeamReceiver>, shred_filter: F, - bank_forks: Option>>, + leader_schedule_cache: &Arc, + bank_forks: &Arc>, retransmit: PacketSender, ) -> JoinHandle<()> where @@ -527,6 +535,9 @@ impl WindowService { { let exit = exit.clone(); let blockstore = blockstore.clone(); + let bank_forks = bank_forks.clone(); + let bank_forks_opt = Some(bank_forks.clone()); + let leader_schedule_cache = leader_schedule_cache.clone(); Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -554,6 +565,8 @@ impl WindowService { }; if let Err(e) = recv_window( &blockstore, + &leader_schedule_cache, + &bank_forks, &insert_sender, &id, &verified_receiver, @@ -562,7 +575,7 @@ impl WindowService { shred_filter( &id, shred, - bank_forks + bank_forks_opt .as_ref() .map(|bank_forks| bank_forks.read().unwrap().working_bank()), last_root, diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 13b30bbab6..da636353fa 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -65,15 +65,17 @@ use serde::{Deserialize, Serialize}; use solana_measure::measure::Measure; use solana_perf::packet::{limited_deserialize, Packet}; use solana_rayon_threadlimit::get_thread_count; +use solana_runtime::bank::Bank; use solana_sdk::{ clock::Slot, + feature_set, + hash::hashv, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; use std::mem::size_of; - use thiserror::Error; #[derive(Default, Clone)] @@ -467,7 +469,18 @@ impl Shred { self.common_header.signature } - pub fn seed(&self) -> [u8; 32] { + pub fn seed(&self, leader_pubkey: Option, root_bank: &Bank) -> [u8; 32] { + if let Some(leader_pubkey) = leader_pubkey { + if enable_deterministic_seed(self.slot(), root_bank) { + let h = hashv(&[ + &self.slot().to_le_bytes(), + &self.index().to_le_bytes(), + &leader_pubkey.to_bytes(), + ]); + return h.to_bytes(); + } + } + let mut seed = [0; 32]; let seed_len = seed.len(); let sig = self.common_header.signature.as_ref(); @@ -557,6 +570,21 @@ impl Shred { } } +fn enable_deterministic_seed(shred_slot: Slot, bank: &Bank) -> bool { + let feature_slot = bank + .feature_set + .activated_slot(&feature_set::deterministic_shred_seed_enabled::id()); + match feature_slot { + None => false, + Some(feature_slot) => { + let epoch_schedule = bank.epoch_schedule(); + let feature_epoch = epoch_schedule.get_epoch(feature_slot); + let shred_epoch = epoch_schedule.get_epoch(shred_slot); + feature_epoch < shred_epoch + } + } +} + #[derive(Debug)] pub struct Shredder { pub slot: Slot, diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 2c9262a42a..de94de0a48 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -143,6 +143,10 @@ pub mod dedupe_config_program_signers { solana_sdk::declare_id!("8kEuAshXLsgkUEdcFVLqrjCGGHVWFW99ZZpxvAzzMtBp"); } +pub mod deterministic_shred_seed_enabled { + solana_sdk::declare_id!("FjSRMpFe7mofQ3WrEMT7Smjk2sME1XdAoRxcv55V6M44"); +} + pub mod verify_tx_signatures_len { solana_sdk::declare_id!("EVW9B5xD9FFK7vw1SBARwMA4s5eRo5eKJdKpsBikzKBz"); } @@ -187,6 +191,7 @@ lazy_static! { (system_transfer_zero_check::id(), "perform all checks for transfers of 0 lamports"), (blake3_syscall_enabled::id(), "blake3 syscall"), (dedupe_config_program_signers::id(), "dedupe config program signers"), + (deterministic_shred_seed_enabled::id(), "deterministic shred seed"), (vote_stake_checked_instructions::id(), "vote/state program checked instructions #18345"), /*************** ADD NEW FEATURES HERE ***************/ ]