diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 63f762a2fb..1f96626310 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -83,11 +83,11 @@ fn bench_retransmitter(bencher: &mut Bencher) { }) .collect(); - let keypair = Arc::new(Keypair::new()); + let keypair = Keypair::new(); let slot = 0; let parent = 0; - let shredder = Shredder::new(slot, parent, keypair, 0, 0).unwrap(); - let mut data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; + let shredder = Shredder::new(slot, parent, 0, 0).unwrap(); + let mut data_shreds = shredder.entries_to_shreds(&keypair, &entries, true, 0).0; let num_packets = data_shreds.len(); diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 5a99f15dcc..015ac85e29 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -14,7 +14,6 @@ use solana_ledger::shred::{ use solana_perf::test_tx; use solana_sdk::hash::Hash; use solana_sdk::signature::Keypair; -use std::sync::Arc; use test::Bencher; fn make_test_entry(txs_per_entry: u64) -> Entry { @@ -39,9 +38,10 @@ fn make_shreds(num_shreds: usize) -> Vec { Some(shred_size), ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); - let shredder = Shredder::new(1, 0, Arc::new(Keypair::new()), 0, 0).unwrap(); + let shredder = Shredder::new(1, 0, 0, 0).unwrap(); let data_shreds = shredder .entries_to_data_shreds( + &Keypair::new(), &entries, true, // is_last_in_slot 0, // next_shred_index @@ -67,21 +67,21 @@ fn make_concatenated_shreds(num_shreds: usize) -> Vec { #[bench] fn bench_shredder_ticks(bencher: &mut Bencher) { - let kp = Arc::new(Keypair::new()); + let kp = Keypair::new(); let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; // ~1Mb let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); bencher.iter(|| { - let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); - shredder.entries_to_shreds(&entries, true, 0); + let shredder = Shredder::new(1, 0, 0, 0).unwrap(); + shredder.entries_to_shreds(&kp, &entries, true, 0); }) } #[bench] fn bench_shredder_large_entries(bencher: &mut Bencher) { - let kp = Arc::new(Keypair::new()); + let kp = Keypair::new(); let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; let txs_per_entry = 128; @@ -93,21 +93,21 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { let entries = make_large_unchained_entries(txs_per_entry, num_entries); // 1Mb bencher.iter(|| { - let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap(); - shredder.entries_to_shreds(&entries, true, 0); + let shredder = Shredder::new(1, 0, 0, 0).unwrap(); + shredder.entries_to_shreds(&kp, &entries, true, 0); }) } #[bench] fn bench_deshredder(bencher: &mut Bencher) { - let kp = Arc::new(Keypair::new()); + let kp = Keypair::new(); let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; // ~10Mb let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); - let shredder = Shredder::new(1, 0, kp, 0, 0).unwrap(); - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; + let shredder = Shredder::new(1, 0, 0, 0).unwrap(); + let data_shreds = shredder.entries_to_shreds(&kp, &entries, true, 0).0; bencher.iter(|| { let raw = &mut Shredder::deshred(&data_shreds).unwrap(); assert_ne!(raw.len(), 0); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index a0953aee62..7e5f83b82d 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -23,7 +23,7 @@ use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_poh::poh_recorder::WorkingBankEntry; use solana_runtime::bank::Bank; use solana_sdk::timing::timestamp; -use solana_sdk::{clock::Slot, pubkey::Pubkey}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}; use solana_streamer::sendmmsg::send_mmsg; use std::sync::atomic::AtomicU64; use std::{ @@ -81,7 +81,6 @@ impl BroadcastStageType { blockstore: &Arc, shred_version: u16, ) -> BroadcastStage { - let keypair = cluster_info.keypair.clone(); match self { BroadcastStageType::Standard => BroadcastStage::new( sock, @@ -90,7 +89,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, - StandardBroadcastRun::new(keypair, shred_version), + StandardBroadcastRun::new(shred_version), ), BroadcastStageType::FailEntryVerification => BroadcastStage::new( @@ -100,7 +99,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, - FailEntryVerificationBroadcastRun::new(keypair, shred_version), + FailEntryVerificationBroadcastRun::new(shred_version), ), BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new( @@ -110,7 +109,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, - BroadcastFakeShredsRun::new(keypair, 0, shred_version), + BroadcastFakeShredsRun::new(0, shred_version), ), BroadcastStageType::BroadcastDuplicates(config) => BroadcastStage::new( @@ -120,7 +119,7 @@ impl BroadcastStageType { retransmit_slots_receiver, exit_sender, blockstore, - BroadcastDuplicatesRun::new(keypair, shred_version, config.clone()), + BroadcastDuplicatesRun::new(shred_version, config.clone()), ), } } @@ -130,6 +129,7 @@ pub type TransmitShreds = (Option>>, Arc>); trait BroadcastRun { fn run( &mut self, + keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, socket_sender: &Sender<(TransmitShreds, Option)>, @@ -173,6 +173,7 @@ pub struct BroadcastStage { impl BroadcastStage { #[allow(clippy::too_many_arguments)] fn run( + keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, socket_sender: &Sender<(TransmitShreds, Option)>, @@ -180,8 +181,13 @@ impl BroadcastStage { mut broadcast_stage_run: impl BroadcastRun, ) -> BroadcastStageReturnType { loop { - let res = - broadcast_stage_run.run(blockstore, receiver, socket_sender, blockstore_sender); + let res = broadcast_stage_run.run( + keypair, + blockstore, + receiver, + socket_sender, + blockstore_sender, + ); let res = Self::handle_error(res, "run"); if let Some(res) = res { return res; @@ -242,11 +248,13 @@ impl BroadcastStage { let bs_run = broadcast_stage_run.clone(); let socket_sender_ = socket_sender.clone(); + let keypair = cluster_info.keypair.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { let _finalizer = Finalizer::new(exit); Self::run( + &keypair, &btree, &receiver, &socket_sender_, @@ -635,7 +643,6 @@ pub mod test { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Arc::new(Bank::new(&genesis_config)); - let leader_keypair = cluster_info.keypair.clone(); // Start up the broadcast stage let broadcast_service = BroadcastStage::new( leader_info.sockets.broadcast, @@ -644,7 +651,7 @@ pub mod test { retransmit_slots_receiver, &exit_sender, &blockstore, - StandardBroadcastRun::new(leader_keypair, 0), + StandardBroadcastRun::new(0), ); MockBroadcastStage { diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index d9d738267e..b219652d31 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -28,15 +28,10 @@ pub(super) struct BroadcastDuplicatesRun { last_broadcast_slot: Slot, next_shred_index: u32, shred_version: u16, - keypair: Arc, } impl BroadcastDuplicatesRun { - pub(super) fn new( - keypair: Arc, - shred_version: u16, - config: BroadcastDuplicatesConfig, - ) -> Self { + pub(super) fn new(shred_version: u16, config: BroadcastDuplicatesConfig) -> Self { let mut delayed_queue = DelayedQueue::new(); delayed_queue.resize(config.duplicate_send_delay, (None, None)); Self { @@ -48,7 +43,6 @@ impl BroadcastDuplicatesRun { last_broadcast_slot: 0, last_duplicate_entry_hash: Hash::default(), shred_version, - keypair, } } @@ -139,6 +133,7 @@ pub const MINIMUM_DUPLICATE_SLOT: Slot = 20; impl BroadcastRun for BroadcastDuplicatesRun { fn run( &mut self, + keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, socket_sender: &Sender<(TransmitShreds, Option)>, @@ -166,13 +161,13 @@ impl BroadcastRun for BroadcastDuplicatesRun { let shredder = Shredder::new( bank.slot(), bank.parent().unwrap().slot(), - self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, ) .expect("Expected to create a new shredder"); let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds( + keypair, &receive_results.entries, last_tick_height == bank.max_tick_height(), self.next_shred_index, @@ -182,6 +177,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { self.queue_or_create_duplicate_entries(&bank, &receive_results); let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() { shredder.entries_to_shreds( + keypair, &duplicate_entries, last_tick_height == bank.max_tick_height(), next_duplicate_shred_index, @@ -208,7 +204,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { .epoch_staked_nodes(bank_epoch) .unwrap() .into_iter() - .filter(|(pubkey, _)| *pubkey != self.keypair.pubkey()) + .filter(|(pubkey, _)| *pubkey != keypair.pubkey()) .collect(); stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| { if r_stake == l_stake { @@ -234,7 +230,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { if bank.slot() > MINIMUM_DUPLICATE_SLOT && last_tick_height == bank.max_tick_height() { warn!( "{} sent duplicate slot {} to nodes: {:?}", - self.keypair.pubkey(), + keypair.pubkey(), bank.slot(), &duplicate_recipients, ); diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 55983fd35e..c9b118068e 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -9,16 +9,14 @@ pub(super) struct BroadcastFakeShredsRun { last_blockhash: Hash, partition: usize, shred_version: u16, - keypair: Arc, } impl BroadcastFakeShredsRun { - pub(super) fn new(keypair: Arc, partition: usize, shred_version: u16) -> Self { + pub(super) fn new(partition: usize, shred_version: u16) -> Self { Self { last_blockhash: Hash::default(), partition, shred_version, - keypair, } } } @@ -26,6 +24,7 @@ impl BroadcastFakeShredsRun { impl BroadcastRun for BroadcastFakeShredsRun { fn run( &mut self, + keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, socket_sender: &Sender<(TransmitShreds, Option)>, @@ -47,13 +46,13 @@ impl BroadcastRun for BroadcastFakeShredsRun { let shredder = Shredder::new( bank.slot(), bank.parent().unwrap().slot(), - self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, ) .expect("Expected to create a new shredder"); let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + keypair, &receive_results.entries, last_tick_height == bank.max_tick_height(), next_shred_index, @@ -70,6 +69,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { .collect(); let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds( + keypair, &fake_entries, last_tick_height == bank.max_tick_height(), next_shred_index, diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index b66681786d..6175db6209 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -10,17 +10,15 @@ pub const SLOT_TO_RESOLVE: u64 = 32; #[derive(Clone)] pub(super) struct FailEntryVerificationBroadcastRun { shred_version: u16, - keypair: Arc, good_shreds: Vec, current_slot: Slot, next_shred_index: u32, } impl FailEntryVerificationBroadcastRun { - pub(super) fn new(keypair: Arc, shred_version: u16) -> Self { + pub(super) fn new(shred_version: u16) -> Self { Self { shred_version, - keypair, good_shreds: vec![], current_slot: 0, next_shred_index: 0, @@ -31,6 +29,7 @@ impl FailEntryVerificationBroadcastRun { impl BroadcastRun for FailEntryVerificationBroadcastRun { fn run( &mut self, + keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, socket_sender: &Sender<(TransmitShreds, Option)>, @@ -71,13 +70,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let shredder = Shredder::new( bank.slot(), bank.parent().unwrap().slot(), - self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, ) .expect("Expected to create a new shredder"); let (data_shreds, _, _) = shredder.entries_to_shreds( + keypair, &receive_results.entries, last_tick_height == bank.max_tick_height() && last_entries.is_none(), self.next_shred_index, @@ -86,12 +85,12 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { self.next_shred_index += data_shreds.len() as u32; let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| { let (good_last_data_shred, _, _) = - shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index); + shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index); let (bad_last_data_shred, _, _) = // Don't mark the last shred as last so that validators won't know that // they've gotten all the shreds, and will continue trying to repair - shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index); + shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index); self.next_shred_index += 1; (good_last_data_shred, bad_last_data_shred) diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 8b9cf78e27..003076c20f 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -13,7 +13,7 @@ use solana_ledger::{ }, }; use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us}; -use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration}; +use std::{collections::HashMap, sync::RwLock, time::Duration}; #[derive(Clone)] pub struct StandardBroadcastRun { @@ -23,7 +23,6 @@ pub struct StandardBroadcastRun { unfinished_slot: Option, current_slot_and_parent: Option<(u64, u64)>, slot_broadcast_start: Option, - keypair: Arc, shred_version: u16, last_datapoint_submit: Arc, num_batches: usize, @@ -38,7 +37,7 @@ struct BroadcastPeerCache { } impl StandardBroadcastRun { - pub(super) fn new(keypair: Arc, shred_version: u16) -> Self { + pub(super) fn new(shred_version: u16) -> Self { Self { process_shreds_stats: ProcessShredsStats::default(), transmit_shreds_stats: Arc::default(), @@ -46,7 +45,6 @@ impl StandardBroadcastRun { unfinished_slot: None, current_slot_and_parent: None, slot_broadcast_start: None, - keypair, shred_version, last_datapoint_submit: Arc::default(), num_batches: 0, @@ -60,6 +58,7 @@ impl StandardBroadcastRun { // shreds buffered. fn finish_prev_slot( &mut self, + keypair: &Keypair, max_ticks_in_slot: u8, stats: &mut ProcessShredsStats, ) -> Vec { @@ -83,10 +82,10 @@ impl StandardBroadcastRun { self.shred_version, fec_set_index.unwrap(), ); - Shredder::sign_shred(self.keypair.deref(), &mut shred); + Shredder::sign_shred(keypair, &mut shred); state.data_shreds_buffer.push(shred.clone()); let mut shreds = make_coding_shreds( - self.keypair.deref(), + keypair, &mut self.unfinished_slot, true, // is_last_in_slot stats, @@ -101,6 +100,7 @@ impl StandardBroadcastRun { fn entries_to_data_shreds( &mut self, + keypair: &Keypair, entries: &[Entry], blockstore: &Blockstore, reference_tick: u8, @@ -118,21 +118,17 @@ impl StandardBroadcastRun { None => (0, 0), }, }; - let (data_shreds, next_shred_index) = Shredder::new( - slot, - parent_slot, - self.keypair.clone(), - reference_tick, - self.shred_version, - ) - .unwrap() - .entries_to_data_shreds( - entries, - is_slot_end, - next_shred_index, - fec_set_offset, - process_stats, - ); + let (data_shreds, next_shred_index) = + Shredder::new(slot, parent_slot, reference_tick, self.shred_version) + .unwrap() + .entries_to_data_shreds( + keypair, + entries, + is_slot_end, + next_shred_index, + fec_set_offset, + process_stats, + ); let mut data_shreds_buffer = match &mut self.unfinished_slot { Some(state) => { assert_eq!(state.slot, slot); @@ -154,6 +150,7 @@ impl StandardBroadcastRun { #[cfg(test)] fn test_process_receive_results( &mut self, + keypair: &Keypair, cluster_info: &ClusterInfo, sock: &UdpSocket, blockstore: &Arc, @@ -161,7 +158,7 @@ impl StandardBroadcastRun { ) -> Result<()> { let (bsend, brecv) = channel(); let (ssend, srecv) = channel(); - self.process_receive_results(blockstore, &ssend, &bsend, receive_results)?; + self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?; let srecv = Arc::new(Mutex::new(srecv)); let brecv = Arc::new(Mutex::new(brecv)); //data @@ -175,6 +172,7 @@ impl StandardBroadcastRun { fn process_receive_results( &mut self, + keypair: &Keypair, blockstore: &Arc, socket_sender: &Sender<(TransmitShreds, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, @@ -205,12 +203,13 @@ impl StandardBroadcastRun { // 1) Check if slot was interrupted let prev_slot_shreds = - self.finish_prev_slot(bank.ticks_per_slot() as u8, &mut process_stats); + self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats); // 2) Convert entries to shreds and coding shreds let is_last_in_slot = last_tick_height == bank.max_tick_height(); let reference_tick = bank.tick_height() % bank.ticks_per_slot(); let data_shreds = self.entries_to_data_shreds( + keypair, &receive_results.entries, blockstore, reference_tick as u8, @@ -273,7 +272,7 @@ impl StandardBroadcastRun { // Create and send coding shreds let coding_shreds = make_coding_shreds( - self.keypair.deref(), + keypair, &mut self.unfinished_slot, is_last_in_slot, &mut process_stats, @@ -456,6 +455,7 @@ fn make_coding_shreds( impl BroadcastRun for StandardBroadcastRun { fn run( &mut self, + keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, socket_sender: &Sender<(TransmitShreds, Option)>, @@ -465,6 +465,7 @@ impl BroadcastRun for StandardBroadcastRun { // TODO: Confirm that last chunk of coding shreds // will not be lost or delayed for too long. self.process_receive_results( + keypair, blockstore, socket_sender, blockstore_sender, @@ -505,6 +506,7 @@ mod test { genesis_config::GenesisConfig, signature::{Keypair, Signer}, }; + use std::ops::Deref; use std::sync::Arc; use std::time::Duration; @@ -544,7 +546,7 @@ mod test { #[test] fn test_interrupted_slot_last_shred() { let keypair = Arc::new(Keypair::new()); - let mut run = StandardBroadcastRun::new(keypair.clone(), 0); + let mut run = StandardBroadcastRun::new(0); // Set up the slot to be interrupted let next_shred_index = 10; @@ -563,7 +565,7 @@ mod test { run.current_slot_and_parent = Some((4, 2)); // Slot 2 interrupted slot 1 - let shreds = run.finish_prev_slot(0, &mut ProcessShredsStats::default()); + let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default()); let shred = shreds .get(0) .expect("Expected a shred that signals an interrupt"); @@ -593,9 +595,15 @@ mod test { }; // Step 1: Make an incomplete transmission for slot 0 - let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone(), 0); + let mut standard_broadcast_run = StandardBroadcastRun::new(0); standard_broadcast_run - .test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results) + .test_process_receive_results( + &leader_keypair, + &cluster_info, + &socket, + &blockstore, + receive_results, + ) .unwrap(); let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap(); assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds_per_slot); @@ -653,7 +661,13 @@ mod test { last_tick_height: (ticks1.len() - 1) as u64, }; standard_broadcast_run - .test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results) + .test_process_receive_results( + &leader_keypair, + &cluster_info, + &socket, + &blockstore, + receive_results, + ) .unwrap(); let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap(); @@ -699,7 +713,7 @@ mod test { let (bsend, brecv) = channel(); let (ssend, _srecv) = channel(); let mut last_tick_height = 0; - let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0); + let mut standard_broadcast_run = StandardBroadcastRun::new(0); let mut process_ticks = |num_ticks| { let ticks = create_ticks(num_ticks, 0, genesis_config.hash()); last_tick_height += (ticks.len() - 1) as u64; @@ -710,7 +724,13 @@ mod test { last_tick_height, }; standard_broadcast_run - .process_receive_results(&blockstore, &ssend, &bsend, receive_results) + .process_receive_results( + &leader_keypair, + &blockstore, + &ssend, + &bsend, + receive_results, + ) .unwrap(); }; for i in 0..3 { @@ -751,9 +771,15 @@ mod test { last_tick_height: ticks.len() as u64, }; - let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0); + let mut standard_broadcast_run = StandardBroadcastRun::new(0); standard_broadcast_run - .test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results) + .test_process_receive_results( + &leader_keypair, + &cluster_info, + &socket, + &blockstore, + receive_results, + ) .unwrap(); assert!(standard_broadcast_run.unfinished_slot.is_none()) } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index af8178ab28..42743f4006 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -627,10 +627,10 @@ mod test { entries: &[Entry], slot: Slot, parent: Slot, - keypair: &Arc, + keypair: &Keypair, ) -> Vec { - let shredder = Shredder::new(slot, parent, keypair.clone(), 0, 0).unwrap(); - shredder.entries_to_shreds(entries, true, 0).0 + let shredder = Shredder::new(slot, parent, 0, 0).unwrap(); + shredder.entries_to_shreds(keypair, entries, true, 0).0 } #[test] @@ -639,7 +639,7 @@ mod test { let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); let num_entries = 10; let original_entries = create_ticks(num_entries, 0, Hash::default()); - let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new())); + let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new()); shreds.reverse(); blockstore .insert_shreds(shreds, None, false) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 8a55d1f2a7..9971309993 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -3408,11 +3408,10 @@ mod tests { let leader = Arc::new(Keypair::new()); let keypair = Keypair::new(); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); - let shredder = - Shredder::new(slot, parent_slot, leader.clone(), reference_tick, version).unwrap(); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); let next_shred_index = rng.gen(); - let shred = new_rand_shred(&mut rng, next_shred_index, &shredder); - let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder).payload; + let shred = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); + let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader).payload; let leader_schedule = |s| { if s == slot { Some(leader.pubkey()) diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index ba7c330a59..59cd28836e 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -317,6 +317,7 @@ pub(crate) mod tests { rng: &mut R, next_shred_index: u32, shredder: &Shredder, + keypair: &Keypair, ) -> Shred { let entries: Vec<_> = std::iter::repeat_with(|| { let tx = system_transaction::transfer( @@ -334,6 +335,7 @@ pub(crate) mod tests { .take(5) .collect(); let (mut data_shreds, _coding_shreds, _last_shred_index) = shredder.entries_to_shreds( + keypair, &entries, true, // is_last_in_slot next_shred_index, @@ -346,11 +348,10 @@ pub(crate) mod tests { let mut rng = rand::thread_rng(); let leader = Arc::new(Keypair::new()); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); - let shredder = - Shredder::new(slot, parent_slot, leader.clone(), reference_tick, version).unwrap(); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); let next_shred_index = rng.gen(); - let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder); - let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder); + let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); + let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); let leader_schedule = |s| { if s == slot { Some(leader.pubkey()) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index e4bbef6c33..f4c867dff5 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1591,8 +1591,7 @@ impl Blockstore { let mut remaining_ticks_in_slot = num_slots * ticks_per_slot - num_ticks_in_start_slot; let mut current_slot = start_slot; - let mut shredder = - Shredder::new(current_slot, parent_slot, keypair.clone(), 0, version).unwrap(); + let mut shredder = Shredder::new(current_slot, parent_slot, 0, version).unwrap(); let mut all_shreds = vec![]; let mut slot_entries = vec![]; // Find all the entries for start_slot @@ -1611,13 +1610,12 @@ impl Blockstore { } }; let (mut data_shreds, mut coding_shreds, _) = - shredder.entries_to_shreds(¤t_entries, true, start_index); + shredder.entries_to_shreds(keypair, ¤t_entries, true, start_index); all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); shredder = Shredder::new( current_slot, parent_slot, - keypair.clone(), (ticks_per_slot - remaining_ticks_in_slot) as u8, version, ) @@ -1632,7 +1630,7 @@ impl Blockstore { if !slot_entries.is_empty() { let (mut data_shreds, mut coding_shreds, _) = - shredder.entries_to_shreds(&slot_entries, is_full_slot, 0); + shredder.entries_to_shreds(keypair, &slot_entries, is_full_slot, 0); all_shreds.append(&mut data_shreds); all_shreds.append(&mut coding_shreds); } @@ -3530,8 +3528,10 @@ pub fn create_new_ledger( let last_hash = entries.last().unwrap().hash; let version = solana_sdk::shred_version::version_from_hash(&last_hash); - let shredder = Shredder::new(0, 0, Arc::new(Keypair::new()), 0, version).unwrap(); - let shreds = shredder.entries_to_shreds(&entries, true, 0).0; + let shredder = Shredder::new(0, 0, 0, version).unwrap(); + let shreds = shredder + .entries_to_shreds(&Keypair::new(), &entries, true, 0) + .0; assert!(shreds.last().unwrap().last_in_slot()); blockstore.insert_shreds(shreds, None, false)?; @@ -3712,9 +3712,9 @@ pub fn entries_to_test_shreds( is_full_slot: bool, version: u16, ) -> Vec { - Shredder::new(slot, parent_slot, Arc::new(Keypair::new()), 0, version) + Shredder::new(slot, parent_slot, 0, version) .unwrap() - .entries_to_shreds(&entries, is_full_slot, 0) + .entries_to_shreds(&Keypair::new(), &entries, is_full_slot, 0) .0 } @@ -8007,8 +8007,9 @@ pub mod tests { ) -> (Vec, Vec, Arc) { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); - let shredder = Shredder::new(slot, parent_slot, leader_keypair.clone(), 0, 0).unwrap(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); + let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); + let (data_shreds, coding_shreds, _) = + shredder.entries_to_shreds(&leader_keypair, &entries, true, 0); let genesis_config = create_genesis_config(2).genesis_config; let bank = Arc::new(Bank::new(&genesis_config)); @@ -8062,9 +8063,10 @@ pub mod tests { let entries1 = make_slot_entries_with_transactions(1); let entries2 = make_slot_entries_with_transactions(1); let leader_keypair = Arc::new(Keypair::new()); - let shredder = Shredder::new(slot, 0, leader_keypair, 0, 0).unwrap(); - let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0); - let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0); + let shredder = Shredder::new(slot, 0, 0, 0).unwrap(); + let (shreds, _, _) = shredder.entries_to_shreds(&leader_keypair, &entries1, true, 0); + let (duplicate_shreds, _, _) = + shredder.entries_to_shreds(&leader_keypair, &entries2, true, 0); let shred = shreds[0].clone(); let duplicate_shred = duplicate_shreds[0].clone(); let non_duplicate_shred = shred.clone(); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 8549562eba..13b30bbab6 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -72,7 +72,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signature, Signer}, }; -use std::{mem::size_of, ops::Deref, sync::Arc}; +use std::mem::size_of; use thiserror::Error; @@ -562,26 +562,18 @@ pub struct Shredder { pub slot: Slot, pub parent_slot: Slot, version: u16, - keypair: Arc, pub signing_coding_time: u128, reference_tick: u8, } impl Shredder { - pub fn new( - slot: Slot, - parent_slot: Slot, - keypair: Arc, - reference_tick: u8, - version: u16, - ) -> Result { + pub fn new(slot: Slot, parent_slot: Slot, reference_tick: u8, version: u16) -> Result { if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) { Err(ShredError::SlotTooLow { slot, parent_slot }) } else { Ok(Self { slot, parent_slot, - keypair, signing_coding_time: 0, reference_tick, version, @@ -591,25 +583,23 @@ impl Shredder { pub fn entries_to_shreds( &self, + keypair: &Keypair, entries: &[Entry], is_last_in_slot: bool, next_shred_index: u32, ) -> (Vec, Vec, u32) { let mut stats = ProcessShredsStats::default(); let (data_shreds, last_shred_index) = self.entries_to_data_shreds( + keypair, entries, is_last_in_slot, next_shred_index, next_shred_index, // fec_set_offset &mut stats, ); - let coding_shreds = Self::data_shreds_to_coding_shreds( - self.keypair.deref(), - &data_shreds, - is_last_in_slot, - &mut stats, - ) - .unwrap(); + let coding_shreds = + Self::data_shreds_to_coding_shreds(keypair, &data_shreds, is_last_in_slot, &mut stats) + .unwrap(); (data_shreds, coding_shreds, last_shred_index) } @@ -625,6 +615,7 @@ impl Shredder { pub fn entries_to_data_shreds( &self, + keypair: &Keypair, entries: &[Entry], is_last_in_slot: bool, next_shred_index: u32, @@ -659,7 +650,7 @@ impl Shredder { self.version, fec_set_index.unwrap(), ); - Shredder::sign_shred(self.keypair.deref(), &mut shred); + Shredder::sign_shred(keypair, &mut shred); shred }; let data_shreds: Vec = PAR_THREAD_POOL.with(|thread_pool| { @@ -1109,7 +1100,7 @@ pub mod tests { hash::{self, hash}, shred_version, system_transaction, }; - use std::{collections::HashSet, convert::TryInto, iter::repeat_with}; + use std::{collections::HashSet, convert::TryInto, iter::repeat_with, sync::Arc}; #[test] fn test_shred_constants() { @@ -1164,7 +1155,7 @@ pub mod tests { // Test that parent cannot be > current slot assert_matches!( - Shredder::new(slot, slot + 1, keypair.clone(), 0, 0), + Shredder::new(slot, slot + 1, 0, 0), Err(ShredError::SlotTooLow { slot: _, parent_slot: _, @@ -1172,14 +1163,14 @@ pub mod tests { ); // Test that slot - parent cannot be > u16 MAX assert_matches!( - Shredder::new(slot, slot - 1 - 0xffff, keypair.clone(), 0, 0), + Shredder::new(slot, slot - 1 - 0xffff, 0, 0), Err(ShredError::SlotTooLow { slot: _, parent_slot: _, }) ); let parent_slot = slot - 5; - let shredder = Shredder::new(slot, parent_slot, keypair.clone(), 0, 0).unwrap(); + let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1199,7 +1190,7 @@ pub mod tests { .max(num_expected_data_shreds as usize); let start_index = 0; let (data_shreds, coding_shreds, next_index) = - shredder.entries_to_shreds(&entries, true, start_index); + shredder.entries_to_shreds(&keypair, &entries, true, start_index); assert_eq!(next_index as u64, num_expected_data_shreds); let mut data_shred_indexes = HashSet::new(); @@ -1257,7 +1248,7 @@ pub mod tests { let keypair = Arc::new(Keypair::new()); let slot = 1; let parent_slot = 0; - let shredder = Shredder::new(slot, parent_slot, keypair, 0, 0).unwrap(); + let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1268,7 +1259,7 @@ pub mod tests { }) .collect(); - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; + let data_shreds = shredder.entries_to_shreds(&keypair, &entries, true, 0).0; let deserialized_shred = Shred::new_from_serialized_shred(data_shreds.last().unwrap().payload.clone()).unwrap(); @@ -1280,7 +1271,7 @@ pub mod tests { let keypair = Arc::new(Keypair::new()); let slot = 1; let parent_slot = 0; - let shredder = Shredder::new(slot, parent_slot, keypair, 5, 0).unwrap(); + let shredder = Shredder::new(slot, parent_slot, 5, 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1291,7 +1282,7 @@ pub mod tests { }) .collect(); - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; + let data_shreds = shredder.entries_to_shreds(&keypair, &entries, true, 0).0; data_shreds.iter().for_each(|s| { assert_eq!(s.reference_tick(), 5); assert_eq!(Shred::reference_tick_from_data(&s.payload), 5); @@ -1307,7 +1298,7 @@ pub mod tests { let keypair = Arc::new(Keypair::new()); let slot = 1; let parent_slot = 0; - let shredder = Shredder::new(slot, parent_slot, keypair, u8::max_value(), 0).unwrap(); + let shredder = Shredder::new(slot, parent_slot, u8::max_value(), 0).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1318,7 +1309,7 @@ pub mod tests { }) .collect(); - let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; + let data_shreds = shredder.entries_to_shreds(&keypair, &entries, true, 0).0; data_shreds.iter().for_each(|s| { assert_eq!(s.reference_tick(), SHRED_TICK_REFERENCE_MASK); assert_eq!( @@ -1337,7 +1328,7 @@ pub mod tests { fn run_test_data_and_code_shredder(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let shredder = Shredder::new(slot, slot - 5, keypair.clone(), 0, 0).unwrap(); + let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap(); // Create enough entries to make > 1 shred let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD; let num_entries = max_ticks_per_n_shreds(1, Some(payload_capacity)) + 1; @@ -1351,7 +1342,8 @@ pub mod tests { }) .collect(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); + let (data_shreds, coding_shreds, _) = + shredder.entries_to_shreds(&keypair, &entries, true, 0); for (i, s) in data_shreds.iter().enumerate() { verify_test_data_shred( @@ -1378,7 +1370,7 @@ pub mod tests { fn run_test_recovery_and_reassembly(slot: Slot, is_last_in_slot: bool) { let keypair = Arc::new(Keypair::new()); - let shredder = Shredder::new(slot, slot - 5, keypair.clone(), 0, 0).unwrap(); + let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap(); let keypair0 = Keypair::new(); let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); @@ -1400,6 +1392,7 @@ pub mod tests { let serialized_entries = bincode::serialize(&entries).unwrap(); let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + &keypair, &entries, is_last_in_slot, 0, // next_shred_index @@ -1556,7 +1549,8 @@ pub mod tests { // Test5: Try recovery/reassembly with non zero index full slot with 3 missing data shreds // and 2 missing coding shreds. Hint: should work let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 25); + let (data_shreds, coding_shreds, _) = + shredder.entries_to_shreds(&keypair, &entries, true, 25); let num_coding_shreds = coding_shreds.len(); // We should have 10 shreds now assert_eq!(data_shreds.len(), num_data_shreds); @@ -1661,14 +1655,13 @@ pub mod tests { let shredder = Shredder::new( slot, slot - rng.gen_range(1, 27), // parent slot - keypair, - 0, // reference tick - rng.gen(), // version + 0, // reference tick + rng.gen(), // version ) .unwrap(); let next_shred_index = rng.gen_range(1, 1024); let (data_shreds, coding_shreds, _) = - shredder.entries_to_shreds(&[entry], is_last_in_slot, next_shred_index); + shredder.entries_to_shreds(&keypair, &[entry], is_last_in_slot, next_shred_index); let num_data_shreds = data_shreds.len(); let num_coding_shreds = coding_shreds.len(); let mut shreds = coding_shreds; @@ -1718,7 +1711,7 @@ pub mod tests { let hash = hash(Hash::default().as_ref()); let version = shred_version::version_from_hash(&hash); assert_ne!(version, 0); - let shredder = Shredder::new(0, 0, keypair, 0, version).unwrap(); + let shredder = Shredder::new(0, 0, 0, version).unwrap(); let entries: Vec<_> = (0..5) .map(|_| { let keypair0 = Keypair::new(); @@ -1730,7 +1723,7 @@ pub mod tests { .collect(); let (data_shreds, coding_shreds, _next_index) = - shredder.entries_to_shreds(&entries, true, 0); + shredder.entries_to_shreds(&keypair, &entries, true, 0); assert!(!data_shreds .iter() .chain(coding_shreds.iter()) @@ -1766,7 +1759,7 @@ pub mod tests { let hash = hash(Hash::default().as_ref()); let version = shred_version::version_from_hash(&hash); assert_ne!(version, 0); - let shredder = Shredder::new(0, 0, keypair, 0, version).unwrap(); + let shredder = Shredder::new(0, 0, 0, version).unwrap(); let entries: Vec<_> = (0..500) .map(|_| { let keypair0 = Keypair::new(); @@ -1779,7 +1772,7 @@ pub mod tests { let start_index = 0x12; let (data_shreds, coding_shreds, _next_index) = - shredder.entries_to_shreds(&entries, true, start_index); + shredder.entries_to_shreds(&keypair, &entries, true, start_index); let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize; data_shreds.iter().enumerate().for_each(|(i, s)| { @@ -1802,7 +1795,7 @@ pub mod tests { let hash = hash(Hash::default().as_ref()); let version = shred_version::version_from_hash(&hash); assert_ne!(version, 0); - let shredder = Shredder::new(0, 0, keypair, 0, version).unwrap(); + let shredder = Shredder::new(0, 0, 0, version).unwrap(); let entries: Vec<_> = (0..500) .map(|_| { let keypair0 = Keypair::new(); @@ -1816,6 +1809,7 @@ pub mod tests { let mut stats = ProcessShredsStats::default(); let start_index = 0x12; let (data_shreds, _next_index) = shredder.entries_to_data_shreds( + &keypair, &entries, true, // is_last_in_slot start_index, @@ -1827,7 +1821,7 @@ pub mod tests { (1..=MAX_DATA_SHREDS_PER_FEC_BLOCK as usize).for_each(|count| { let coding_shreds = Shredder::data_shreds_to_coding_shreds( - shredder.keypair.deref(), + &keypair, &data_shreds[..count], false, // is_last_in_slot &mut stats, @@ -1835,7 +1829,7 @@ pub mod tests { .unwrap(); assert_eq!(coding_shreds.len(), count); let coding_shreds = Shredder::data_shreds_to_coding_shreds( - shredder.keypair.deref(), + &keypair, &data_shreds[..count], true, // is_last_in_slot &mut stats, @@ -1848,7 +1842,7 @@ pub mod tests { }); let coding_shreds = Shredder::data_shreds_to_coding_shreds( - shredder.keypair.deref(), + &keypair, &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], false, // is_last_in_slot &mut stats, @@ -1859,7 +1853,7 @@ pub mod tests { MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1 ); let coding_shreds = Shredder::data_shreds_to_coding_shreds( - shredder.keypair.deref(), + &keypair, &data_shreds[..MAX_DATA_SHREDS_PER_FEC_BLOCK as usize + 1], true, // is_last_in_slot &mut stats, diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 8dc241de38..71eacd1050 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -22,7 +22,7 @@ type IndexShredsMap = BTreeMap>; fn test_multi_fec_block_coding() { let keypair = Arc::new(Keypair::new()); let slot = 0x1234_5678_9abc_def0; - let shredder = Shredder::new(slot, slot - 5, keypair.clone(), 0, 0).unwrap(); + let shredder = Shredder::new(slot, slot - 5, 0, 0).unwrap(); let num_fec_sets = 100; let num_data_shreds = (MAX_DATA_SHREDS_PER_FEC_BLOCK * num_fec_sets) as usize; let keypair0 = Keypair::new(); @@ -46,7 +46,8 @@ fn test_multi_fec_block_coding() { .collect(); let serialized_entries = bincode::serialize(&entries).unwrap(); - let (data_shreds, coding_shreds, next_index) = shredder.entries_to_shreds(&entries, true, 0); + let (data_shreds, coding_shreds, next_index) = + shredder.entries_to_shreds(&keypair, &entries, true, 0); assert_eq!(next_index as usize, num_data_shreds); assert_eq!(data_shreds.len(), num_data_shreds); assert_eq!(coding_shreds.len(), num_data_shreds); @@ -190,7 +191,7 @@ fn setup_different_sized_fec_blocks( parent_slot: Slot, keypair: Arc, ) -> (IndexShredsMap, IndexShredsMap, usize) { - let shredder = Shredder::new(slot, parent_slot, keypair, 0, 0).unwrap(); + let shredder = Shredder::new(slot, parent_slot, 0, 0).unwrap(); let keypair0 = Keypair::new(); let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); @@ -227,7 +228,7 @@ fn setup_different_sized_fec_blocks( for i in 0..2 { let is_last = i == 1; let (data_shreds, coding_shreds, new_next_index) = - shredder.entries_to_shreds(&entries, is_last, next_index); + shredder.entries_to_shreds(&keypair, &entries, is_last, next_index); for shred in &data_shreds { if (shred.index() as usize) == total_num_data_shreds - 1 { assert!(shred.data_complete());