Shredder no longer holds a keypair
This commit is contained in:
@ -23,7 +23,7 @@ use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
||||
use solana_poh::poh_recorder::WorkingBankEntry;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::timing::timestamp;
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
|
||||
use solana_streamer::sendmmsg::send_mmsg;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::{
|
||||
@ -81,7 +81,6 @@ impl BroadcastStageType {
|
||||
blockstore: &Arc<Blockstore>,
|
||||
shred_version: u16,
|
||||
) -> BroadcastStage {
|
||||
let keypair = cluster_info.keypair.clone();
|
||||
match self {
|
||||
BroadcastStageType::Standard => BroadcastStage::new(
|
||||
sock,
|
||||
@ -90,7 +89,7 @@ impl BroadcastStageType {
|
||||
retransmit_slots_receiver,
|
||||
exit_sender,
|
||||
blockstore,
|
||||
StandardBroadcastRun::new(keypair, shred_version),
|
||||
StandardBroadcastRun::new(shred_version),
|
||||
),
|
||||
|
||||
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
|
||||
@ -100,7 +99,7 @@ impl BroadcastStageType {
|
||||
retransmit_slots_receiver,
|
||||
exit_sender,
|
||||
blockstore,
|
||||
FailEntryVerificationBroadcastRun::new(keypair, shred_version),
|
||||
FailEntryVerificationBroadcastRun::new(shred_version),
|
||||
),
|
||||
|
||||
BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
|
||||
@ -110,7 +109,7 @@ impl BroadcastStageType {
|
||||
retransmit_slots_receiver,
|
||||
exit_sender,
|
||||
blockstore,
|
||||
BroadcastFakeShredsRun::new(keypair, 0, shred_version),
|
||||
BroadcastFakeShredsRun::new(0, shred_version),
|
||||
),
|
||||
|
||||
BroadcastStageType::BroadcastDuplicates(config) => BroadcastStage::new(
|
||||
@ -120,7 +119,7 @@ impl BroadcastStageType {
|
||||
retransmit_slots_receiver,
|
||||
exit_sender,
|
||||
blockstore,
|
||||
BroadcastDuplicatesRun::new(keypair, shred_version, config.clone()),
|
||||
BroadcastDuplicatesRun::new(shred_version, config.clone()),
|
||||
),
|
||||
}
|
||||
}
|
||||
@ -130,6 +129,7 @@ pub type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
|
||||
trait BroadcastRun {
|
||||
fn run(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
@ -173,6 +173,7 @@ pub struct BroadcastStage {
|
||||
impl BroadcastStage {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn run(
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
@ -180,8 +181,13 @@ impl BroadcastStage {
|
||||
mut broadcast_stage_run: impl BroadcastRun,
|
||||
) -> BroadcastStageReturnType {
|
||||
loop {
|
||||
let res =
|
||||
broadcast_stage_run.run(blockstore, receiver, socket_sender, blockstore_sender);
|
||||
let res = broadcast_stage_run.run(
|
||||
keypair,
|
||||
blockstore,
|
||||
receiver,
|
||||
socket_sender,
|
||||
blockstore_sender,
|
||||
);
|
||||
let res = Self::handle_error(res, "run");
|
||||
if let Some(res) = res {
|
||||
return res;
|
||||
@ -242,11 +248,13 @@ impl BroadcastStage {
|
||||
let bs_run = broadcast_stage_run.clone();
|
||||
|
||||
let socket_sender_ = socket_sender.clone();
|
||||
let keypair = cluster_info.keypair.clone();
|
||||
let thread_hdl = Builder::new()
|
||||
.name("solana-broadcaster".to_string())
|
||||
.spawn(move || {
|
||||
let _finalizer = Finalizer::new(exit);
|
||||
Self::run(
|
||||
&keypair,
|
||||
&btree,
|
||||
&receiver,
|
||||
&socket_sender_,
|
||||
@ -635,7 +643,6 @@ pub mod test {
|
||||
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
|
||||
let bank = Arc::new(Bank::new(&genesis_config));
|
||||
|
||||
let leader_keypair = cluster_info.keypair.clone();
|
||||
// Start up the broadcast stage
|
||||
let broadcast_service = BroadcastStage::new(
|
||||
leader_info.sockets.broadcast,
|
||||
@ -644,7 +651,7 @@ pub mod test {
|
||||
retransmit_slots_receiver,
|
||||
&exit_sender,
|
||||
&blockstore,
|
||||
StandardBroadcastRun::new(leader_keypair, 0),
|
||||
StandardBroadcastRun::new(0),
|
||||
);
|
||||
|
||||
MockBroadcastStage {
|
||||
|
@ -28,15 +28,10 @@ pub(super) struct BroadcastDuplicatesRun {
|
||||
last_broadcast_slot: Slot,
|
||||
next_shred_index: u32,
|
||||
shred_version: u16,
|
||||
keypair: Arc<Keypair>,
|
||||
}
|
||||
|
||||
impl BroadcastDuplicatesRun {
|
||||
pub(super) fn new(
|
||||
keypair: Arc<Keypair>,
|
||||
shred_version: u16,
|
||||
config: BroadcastDuplicatesConfig,
|
||||
) -> Self {
|
||||
pub(super) fn new(shred_version: u16, config: BroadcastDuplicatesConfig) -> Self {
|
||||
let mut delayed_queue = DelayedQueue::new();
|
||||
delayed_queue.resize(config.duplicate_send_delay, (None, None));
|
||||
Self {
|
||||
@ -48,7 +43,6 @@ impl BroadcastDuplicatesRun {
|
||||
last_broadcast_slot: 0,
|
||||
last_duplicate_entry_hash: Hash::default(),
|
||||
shred_version,
|
||||
keypair,
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,6 +133,7 @@ pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
|
||||
impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
fn run(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
@ -166,13 +161,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
let shredder = Shredder::new(
|
||||
bank.slot(),
|
||||
bank.parent().unwrap().slot(),
|
||||
self.keypair.clone(),
|
||||
(bank.tick_height() % bank.ticks_per_slot()) as u8,
|
||||
self.shred_version,
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds(
|
||||
keypair,
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
self.next_shred_index,
|
||||
@ -182,6 +177,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
self.queue_or_create_duplicate_entries(&bank, &receive_results);
|
||||
let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() {
|
||||
shredder.entries_to_shreds(
|
||||
keypair,
|
||||
&duplicate_entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_duplicate_shred_index,
|
||||
@ -208,7 +204,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
.epoch_staked_nodes(bank_epoch)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.filter(|(pubkey, _)| *pubkey != self.keypair.pubkey())
|
||||
.filter(|(pubkey, _)| *pubkey != keypair.pubkey())
|
||||
.collect();
|
||||
stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
|
||||
if r_stake == l_stake {
|
||||
@ -234,7 +230,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||
if bank.slot() > MINIMUM_DUPLICATE_SLOT && last_tick_height == bank.max_tick_height() {
|
||||
warn!(
|
||||
"{} sent duplicate slot {} to nodes: {:?}",
|
||||
self.keypair.pubkey(),
|
||||
keypair.pubkey(),
|
||||
bank.slot(),
|
||||
&duplicate_recipients,
|
||||
);
|
||||
|
@ -9,16 +9,14 @@ pub(super) struct BroadcastFakeShredsRun {
|
||||
last_blockhash: Hash,
|
||||
partition: usize,
|
||||
shred_version: u16,
|
||||
keypair: Arc<Keypair>,
|
||||
}
|
||||
|
||||
impl BroadcastFakeShredsRun {
|
||||
pub(super) fn new(keypair: Arc<Keypair>, partition: usize, shred_version: u16) -> Self {
|
||||
pub(super) fn new(partition: usize, shred_version: u16) -> Self {
|
||||
Self {
|
||||
last_blockhash: Hash::default(),
|
||||
partition,
|
||||
shred_version,
|
||||
keypair,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -26,6 +24,7 @@ impl BroadcastFakeShredsRun {
|
||||
impl BroadcastRun for BroadcastFakeShredsRun {
|
||||
fn run(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
@ -47,13 +46,13 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
||||
let shredder = Shredder::new(
|
||||
bank.slot(),
|
||||
bank.parent().unwrap().slot(),
|
||||
self.keypair.clone(),
|
||||
(bank.tick_height() % bank.ticks_per_slot()) as u8,
|
||||
self.shred_version,
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(
|
||||
keypair,
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_shred_index,
|
||||
@ -70,6 +69,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
||||
.collect();
|
||||
|
||||
let (fake_data_shreds, fake_coding_shreds, _) = shredder.entries_to_shreds(
|
||||
keypair,
|
||||
&fake_entries,
|
||||
last_tick_height == bank.max_tick_height(),
|
||||
next_shred_index,
|
||||
|
@ -10,17 +10,15 @@ pub const SLOT_TO_RESOLVE: u64 = 32;
|
||||
#[derive(Clone)]
|
||||
pub(super) struct FailEntryVerificationBroadcastRun {
|
||||
shred_version: u16,
|
||||
keypair: Arc<Keypair>,
|
||||
good_shreds: Vec<Shred>,
|
||||
current_slot: Slot,
|
||||
next_shred_index: u32,
|
||||
}
|
||||
|
||||
impl FailEntryVerificationBroadcastRun {
|
||||
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
|
||||
pub(super) fn new(shred_version: u16) -> Self {
|
||||
Self {
|
||||
shred_version,
|
||||
keypair,
|
||||
good_shreds: vec![],
|
||||
current_slot: 0,
|
||||
next_shred_index: 0,
|
||||
@ -31,6 +29,7 @@ impl FailEntryVerificationBroadcastRun {
|
||||
impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
fn run(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
@ -71,13 +70,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
let shredder = Shredder::new(
|
||||
bank.slot(),
|
||||
bank.parent().unwrap().slot(),
|
||||
self.keypair.clone(),
|
||||
(bank.tick_height() % bank.ticks_per_slot()) as u8,
|
||||
self.shred_version,
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
let (data_shreds, _, _) = shredder.entries_to_shreds(
|
||||
keypair,
|
||||
&receive_results.entries,
|
||||
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
|
||||
self.next_shred_index,
|
||||
@ -86,12 +85,12 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
self.next_shred_index += data_shreds.len() as u32;
|
||||
let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| {
|
||||
let (good_last_data_shred, _, _) =
|
||||
shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index);
|
||||
shredder.entries_to_shreds(keypair, &[good_last_entry], true, self.next_shred_index);
|
||||
|
||||
let (bad_last_data_shred, _, _) =
|
||||
// Don't mark the last shred as last so that validators won't know that
|
||||
// they've gotten all the shreds, and will continue trying to repair
|
||||
shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index);
|
||||
shredder.entries_to_shreds(keypair, &[bad_last_entry], false, self.next_shred_index);
|
||||
|
||||
self.next_shred_index += 1;
|
||||
(good_last_data_shred, bad_last_data_shred)
|
||||
|
@ -13,7 +13,7 @@ use solana_ledger::{
|
||||
},
|
||||
};
|
||||
use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
|
||||
use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration};
|
||||
use std::{collections::HashMap, sync::RwLock, time::Duration};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StandardBroadcastRun {
|
||||
@ -23,7 +23,6 @@ pub struct StandardBroadcastRun {
|
||||
unfinished_slot: Option<UnfinishedSlotInfo>,
|
||||
current_slot_and_parent: Option<(u64, u64)>,
|
||||
slot_broadcast_start: Option<Instant>,
|
||||
keypair: Arc<Keypair>,
|
||||
shred_version: u16,
|
||||
last_datapoint_submit: Arc<AtomicU64>,
|
||||
num_batches: usize,
|
||||
@ -38,7 +37,7 @@ struct BroadcastPeerCache {
|
||||
}
|
||||
|
||||
impl StandardBroadcastRun {
|
||||
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
|
||||
pub(super) fn new(shred_version: u16) -> Self {
|
||||
Self {
|
||||
process_shreds_stats: ProcessShredsStats::default(),
|
||||
transmit_shreds_stats: Arc::default(),
|
||||
@ -46,7 +45,6 @@ impl StandardBroadcastRun {
|
||||
unfinished_slot: None,
|
||||
current_slot_and_parent: None,
|
||||
slot_broadcast_start: None,
|
||||
keypair,
|
||||
shred_version,
|
||||
last_datapoint_submit: Arc::default(),
|
||||
num_batches: 0,
|
||||
@ -60,6 +58,7 @@ impl StandardBroadcastRun {
|
||||
// shreds buffered.
|
||||
fn finish_prev_slot(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
max_ticks_in_slot: u8,
|
||||
stats: &mut ProcessShredsStats,
|
||||
) -> Vec<Shred> {
|
||||
@ -83,10 +82,10 @@ impl StandardBroadcastRun {
|
||||
self.shred_version,
|
||||
fec_set_index.unwrap(),
|
||||
);
|
||||
Shredder::sign_shred(self.keypair.deref(), &mut shred);
|
||||
Shredder::sign_shred(keypair, &mut shred);
|
||||
state.data_shreds_buffer.push(shred.clone());
|
||||
let mut shreds = make_coding_shreds(
|
||||
self.keypair.deref(),
|
||||
keypair,
|
||||
&mut self.unfinished_slot,
|
||||
true, // is_last_in_slot
|
||||
stats,
|
||||
@ -101,6 +100,7 @@ impl StandardBroadcastRun {
|
||||
|
||||
fn entries_to_data_shreds(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
entries: &[Entry],
|
||||
blockstore: &Blockstore,
|
||||
reference_tick: u8,
|
||||
@ -118,21 +118,17 @@ impl StandardBroadcastRun {
|
||||
None => (0, 0),
|
||||
},
|
||||
};
|
||||
let (data_shreds, next_shred_index) = Shredder::new(
|
||||
slot,
|
||||
parent_slot,
|
||||
self.keypair.clone(),
|
||||
reference_tick,
|
||||
self.shred_version,
|
||||
)
|
||||
.unwrap()
|
||||
.entries_to_data_shreds(
|
||||
entries,
|
||||
is_slot_end,
|
||||
next_shred_index,
|
||||
fec_set_offset,
|
||||
process_stats,
|
||||
);
|
||||
let (data_shreds, next_shred_index) =
|
||||
Shredder::new(slot, parent_slot, reference_tick, self.shred_version)
|
||||
.unwrap()
|
||||
.entries_to_data_shreds(
|
||||
keypair,
|
||||
entries,
|
||||
is_slot_end,
|
||||
next_shred_index,
|
||||
fec_set_offset,
|
||||
process_stats,
|
||||
);
|
||||
let mut data_shreds_buffer = match &mut self.unfinished_slot {
|
||||
Some(state) => {
|
||||
assert_eq!(state.slot, slot);
|
||||
@ -154,6 +150,7 @@ impl StandardBroadcastRun {
|
||||
#[cfg(test)]
|
||||
fn test_process_receive_results(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
cluster_info: &ClusterInfo,
|
||||
sock: &UdpSocket,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
@ -161,7 +158,7 @@ impl StandardBroadcastRun {
|
||||
) -> Result<()> {
|
||||
let (bsend, brecv) = channel();
|
||||
let (ssend, srecv) = channel();
|
||||
self.process_receive_results(blockstore, &ssend, &bsend, receive_results)?;
|
||||
self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?;
|
||||
let srecv = Arc::new(Mutex::new(srecv));
|
||||
let brecv = Arc::new(Mutex::new(brecv));
|
||||
//data
|
||||
@ -175,6 +172,7 @@ impl StandardBroadcastRun {
|
||||
|
||||
fn process_receive_results(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
@ -205,12 +203,13 @@ impl StandardBroadcastRun {
|
||||
|
||||
// 1) Check if slot was interrupted
|
||||
let prev_slot_shreds =
|
||||
self.finish_prev_slot(bank.ticks_per_slot() as u8, &mut process_stats);
|
||||
self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats);
|
||||
|
||||
// 2) Convert entries to shreds and coding shreds
|
||||
let is_last_in_slot = last_tick_height == bank.max_tick_height();
|
||||
let reference_tick = bank.tick_height() % bank.ticks_per_slot();
|
||||
let data_shreds = self.entries_to_data_shreds(
|
||||
keypair,
|
||||
&receive_results.entries,
|
||||
blockstore,
|
||||
reference_tick as u8,
|
||||
@ -273,7 +272,7 @@ impl StandardBroadcastRun {
|
||||
|
||||
// Create and send coding shreds
|
||||
let coding_shreds = make_coding_shreds(
|
||||
self.keypair.deref(),
|
||||
keypair,
|
||||
&mut self.unfinished_slot,
|
||||
is_last_in_slot,
|
||||
&mut process_stats,
|
||||
@ -456,6 +455,7 @@ fn make_coding_shreds(
|
||||
impl BroadcastRun for StandardBroadcastRun {
|
||||
fn run(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
@ -465,6 +465,7 @@ impl BroadcastRun for StandardBroadcastRun {
|
||||
// TODO: Confirm that last chunk of coding shreds
|
||||
// will not be lost or delayed for too long.
|
||||
self.process_receive_results(
|
||||
keypair,
|
||||
blockstore,
|
||||
socket_sender,
|
||||
blockstore_sender,
|
||||
@ -505,6 +506,7 @@ mod test {
|
||||
genesis_config::GenesisConfig,
|
||||
signature::{Keypair, Signer},
|
||||
};
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -544,7 +546,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_interrupted_slot_last_shred() {
|
||||
let keypair = Arc::new(Keypair::new());
|
||||
let mut run = StandardBroadcastRun::new(keypair.clone(), 0);
|
||||
let mut run = StandardBroadcastRun::new(0);
|
||||
|
||||
// Set up the slot to be interrupted
|
||||
let next_shred_index = 10;
|
||||
@ -563,7 +565,7 @@ mod test {
|
||||
run.current_slot_and_parent = Some((4, 2));
|
||||
|
||||
// Slot 2 interrupted slot 1
|
||||
let shreds = run.finish_prev_slot(0, &mut ProcessShredsStats::default());
|
||||
let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default());
|
||||
let shred = shreds
|
||||
.get(0)
|
||||
.expect("Expected a shred that signals an interrupt");
|
||||
@ -593,9 +595,15 @@ mod test {
|
||||
};
|
||||
|
||||
// Step 1: Make an incomplete transmission for slot 0
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone(), 0);
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
|
||||
standard_broadcast_run
|
||||
.test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results)
|
||||
.test_process_receive_results(
|
||||
&leader_keypair,
|
||||
&cluster_info,
|
||||
&socket,
|
||||
&blockstore,
|
||||
receive_results,
|
||||
)
|
||||
.unwrap();
|
||||
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
|
||||
assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds_per_slot);
|
||||
@ -653,7 +661,13 @@ mod test {
|
||||
last_tick_height: (ticks1.len() - 1) as u64,
|
||||
};
|
||||
standard_broadcast_run
|
||||
.test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results)
|
||||
.test_process_receive_results(
|
||||
&leader_keypair,
|
||||
&cluster_info,
|
||||
&socket,
|
||||
&blockstore,
|
||||
receive_results,
|
||||
)
|
||||
.unwrap();
|
||||
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
|
||||
|
||||
@ -699,7 +713,7 @@ mod test {
|
||||
let (bsend, brecv) = channel();
|
||||
let (ssend, _srecv) = channel();
|
||||
let mut last_tick_height = 0;
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0);
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
|
||||
let mut process_ticks = |num_ticks| {
|
||||
let ticks = create_ticks(num_ticks, 0, genesis_config.hash());
|
||||
last_tick_height += (ticks.len() - 1) as u64;
|
||||
@ -710,7 +724,13 @@ mod test {
|
||||
last_tick_height,
|
||||
};
|
||||
standard_broadcast_run
|
||||
.process_receive_results(&blockstore, &ssend, &bsend, receive_results)
|
||||
.process_receive_results(
|
||||
&leader_keypair,
|
||||
&blockstore,
|
||||
&ssend,
|
||||
&bsend,
|
||||
receive_results,
|
||||
)
|
||||
.unwrap();
|
||||
};
|
||||
for i in 0..3 {
|
||||
@ -751,9 +771,15 @@ mod test {
|
||||
last_tick_height: ticks.len() as u64,
|
||||
};
|
||||
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0);
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(0);
|
||||
standard_broadcast_run
|
||||
.test_process_receive_results(&cluster_info, &socket, &blockstore, receive_results)
|
||||
.test_process_receive_results(
|
||||
&leader_keypair,
|
||||
&cluster_info,
|
||||
&socket,
|
||||
&blockstore,
|
||||
receive_results,
|
||||
)
|
||||
.unwrap();
|
||||
assert!(standard_broadcast_run.unfinished_slot.is_none())
|
||||
}
|
||||
|
@ -627,10 +627,10 @@ mod test {
|
||||
entries: &[Entry],
|
||||
slot: Slot,
|
||||
parent: Slot,
|
||||
keypair: &Arc<Keypair>,
|
||||
keypair: &Keypair,
|
||||
) -> Vec<Shred> {
|
||||
let shredder = Shredder::new(slot, parent, keypair.clone(), 0, 0).unwrap();
|
||||
shredder.entries_to_shreds(entries, true, 0).0
|
||||
let shredder = Shredder::new(slot, parent, 0, 0).unwrap();
|
||||
shredder.entries_to_shreds(keypair, entries, true, 0).0
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -639,7 +639,7 @@ mod test {
|
||||
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
|
||||
let num_entries = 10;
|
||||
let original_entries = create_ticks(num_entries, 0, Hash::default());
|
||||
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new()));
|
||||
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Keypair::new());
|
||||
shreds.reverse();
|
||||
blockstore
|
||||
.insert_shreds(shreds, None, false)
|
||||
|
Reference in New Issue
Block a user