Don't grab keypair from cluster info on every iteration of broadccast (#6303)
This commit is contained in:
@ -45,14 +45,17 @@ impl BroadcastStageType {
|
|||||||
blocktree: &Arc<Blocktree>,
|
blocktree: &Arc<Blocktree>,
|
||||||
) -> BroadcastStage {
|
) -> BroadcastStage {
|
||||||
match self {
|
match self {
|
||||||
BroadcastStageType::Standard => BroadcastStage::new(
|
BroadcastStageType::Standard => {
|
||||||
|
let keypair = cluster_info.read().unwrap().keypair.clone();
|
||||||
|
BroadcastStage::new(
|
||||||
sock,
|
sock,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
receiver,
|
receiver,
|
||||||
exit_sender,
|
exit_sender,
|
||||||
blocktree,
|
blocktree,
|
||||||
StandardBroadcastRun::new(),
|
StandardBroadcastRun::new(keypair),
|
||||||
),
|
)
|
||||||
|
}
|
||||||
|
|
||||||
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
|
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
|
||||||
sock,
|
sock,
|
||||||
@ -235,6 +238,7 @@ mod test {
|
|||||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
|
|
||||||
|
let leader_keypair = cluster_info.read().unwrap().keypair.clone();
|
||||||
// Start up the broadcast stage
|
// Start up the broadcast stage
|
||||||
let broadcast_service = BroadcastStage::new(
|
let broadcast_service = BroadcastStage::new(
|
||||||
leader_info.sockets.broadcast,
|
leader_info.sockets.broadcast,
|
||||||
@ -242,7 +246,7 @@ mod test {
|
|||||||
entry_receiver,
|
entry_receiver,
|
||||||
&exit_sender,
|
&exit_sender,
|
||||||
&blocktree,
|
&blocktree,
|
||||||
StandardBroadcastRun::new(),
|
StandardBroadcastRun::new(leader_keypair),
|
||||||
);
|
);
|
||||||
|
|
||||||
MockBroadcastStage {
|
MockBroadcastStage {
|
||||||
|
@ -32,15 +32,17 @@ pub(super) struct StandardBroadcastRun {
|
|||||||
unfinished_slot: Option<UnfinishedSlotInfo>,
|
unfinished_slot: Option<UnfinishedSlotInfo>,
|
||||||
current_slot_and_parent: Option<(u64, u64)>,
|
current_slot_and_parent: Option<(u64, u64)>,
|
||||||
slot_broadcast_start: Option<Instant>,
|
slot_broadcast_start: Option<Instant>,
|
||||||
|
keypair: Arc<Keypair>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StandardBroadcastRun {
|
impl StandardBroadcastRun {
|
||||||
pub(super) fn new() -> Self {
|
pub(super) fn new(keypair: Arc<Keypair>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stats: BroadcastStats::default(),
|
stats: BroadcastStats::default(),
|
||||||
unfinished_slot: None,
|
unfinished_slot: None,
|
||||||
current_slot_and_parent: None,
|
current_slot_and_parent: None,
|
||||||
slot_broadcast_start: None,
|
slot_broadcast_start: None,
|
||||||
|
keypair,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,11 +100,15 @@ impl StandardBroadcastRun {
|
|||||||
&mut self,
|
&mut self,
|
||||||
blocktree: &Blocktree,
|
blocktree: &Blocktree,
|
||||||
entries: &[Entry],
|
entries: &[Entry],
|
||||||
keypair: Arc<Keypair>,
|
|
||||||
is_slot_end: bool,
|
is_slot_end: bool,
|
||||||
) -> (Vec<Shred>, Vec<Shred>) {
|
) -> (Vec<Shred>, Vec<Shred>) {
|
||||||
let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
|
let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
|
||||||
let shredder = Shredder::new(slot, parent_slot, RECOMMENDED_FEC_RATE, keypair)
|
let shredder = Shredder::new(
|
||||||
|
slot,
|
||||||
|
parent_slot,
|
||||||
|
RECOMMENDED_FEC_RATE,
|
||||||
|
self.keypair.clone(),
|
||||||
|
)
|
||||||
.expect("Expected to create a new shredder");
|
.expect("Expected to create a new shredder");
|
||||||
|
|
||||||
let next_shred_index = self
|
let next_shred_index = self
|
||||||
@ -158,8 +164,6 @@ impl StandardBroadcastRun {
|
|||||||
receive_elapsed = Duration::new(0, 0);
|
receive_elapsed = Duration::new(0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
let keypair = cluster_info.read().unwrap().keypair.clone();
|
|
||||||
|
|
||||||
let to_shreds_start = Instant::now();
|
let to_shreds_start = Instant::now();
|
||||||
|
|
||||||
// 1) Check if slot was interrupted
|
// 1) Check if slot was interrupted
|
||||||
@ -169,7 +173,6 @@ impl StandardBroadcastRun {
|
|||||||
let (data_shreds, coding_shreds) = self.entries_to_shreds(
|
let (data_shreds, coding_shreds) = self.entries_to_shreds(
|
||||||
blocktree,
|
blocktree,
|
||||||
&receive_results.entries,
|
&receive_results.entries,
|
||||||
keypair,
|
|
||||||
last_tick == bank.max_tick_height(),
|
last_tick == bank.max_tick_height(),
|
||||||
);
|
);
|
||||||
let to_shreds_elapsed = to_shreds_start.elapsed();
|
let to_shreds_elapsed = to_shreds_start.elapsed();
|
||||||
@ -309,7 +312,7 @@ mod test {
|
|||||||
GenesisBlock,
|
GenesisBlock,
|
||||||
Arc<RwLock<ClusterInfo>>,
|
Arc<RwLock<ClusterInfo>>,
|
||||||
Arc<Bank>,
|
Arc<Bank>,
|
||||||
Keypair,
|
Arc<Keypair>,
|
||||||
UdpSocket,
|
UdpSocket,
|
||||||
) {
|
) {
|
||||||
// Setup
|
// Setup
|
||||||
@ -317,7 +320,7 @@ mod test {
|
|||||||
let blocktree = Arc::new(
|
let blocktree = Arc::new(
|
||||||
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
|
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
|
||||||
);
|
);
|
||||||
let leader_keypair = Keypair::new();
|
let leader_keypair = Arc::new(Keypair::new());
|
||||||
let leader_pubkey = leader_keypair.pubkey();
|
let leader_pubkey = leader_keypair.pubkey();
|
||||||
let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey);
|
let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey);
|
||||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
||||||
@ -354,7 +357,7 @@ mod test {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Step 1: Make an incomplete transmission for slot 0
|
// Step 1: Make an incomplete transmission for slot 0
|
||||||
let mut standard_broadcast_run = StandardBroadcastRun::new();
|
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone());
|
||||||
standard_broadcast_run
|
standard_broadcast_run
|
||||||
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
|
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@ -408,7 +411,8 @@ mod test {
|
|||||||
fn test_slot_finish() {
|
fn test_slot_finish() {
|
||||||
// Setup
|
// Setup
|
||||||
let num_shreds_per_slot = 2;
|
let num_shreds_per_slot = 2;
|
||||||
let (blocktree, genesis_block, cluster_info, bank0, _, socket) = setup(num_shreds_per_slot);
|
let (blocktree, genesis_block, cluster_info, bank0, leader_keypair, socket) =
|
||||||
|
setup(num_shreds_per_slot);
|
||||||
|
|
||||||
// Insert complete slot of ticks needed to finish the slot
|
// Insert complete slot of ticks needed to finish the slot
|
||||||
let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
|
let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
|
||||||
@ -419,7 +423,7 @@ mod test {
|
|||||||
last_tick: (ticks.len() - 1) as u64,
|
last_tick: (ticks.len() - 1) as u64,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut standard_broadcast_run = StandardBroadcastRun::new();
|
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair);
|
||||||
standard_broadcast_run
|
standard_broadcast_run
|
||||||
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
|
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
Reference in New Issue
Block a user