From 06eb2364f202f4c3a08687b072d7d236a84eb5d9 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Sat, 18 May 2019 15:24:50 -0700 Subject: [PATCH] Handle missed slots in storage stage (#4337) * Handle missed slots in storage stage * Fix test compile * Make test use the new missed slot support --- core/src/replay_stage.rs | 12 +++-- core/src/storage_stage.rs | 101 +++++++++++++++++++++----------------- 2 files changed, 64 insertions(+), 49 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5400c664ee..fa6893612b 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -83,7 +83,7 @@ impl ReplayStage { subscriptions: &Arc, poh_recorder: &Arc>, leader_schedule_cache: &Arc, - ) -> (Self, Receiver<(u64, Pubkey)>, Receiver) + ) -> (Self, Receiver<(u64, Pubkey)>, Receiver>) where T: 'static + KeypairUtil + Send + Sync, { @@ -302,17 +302,23 @@ impl ReplayStage { cluster_info: &Arc>, blocktree: &Arc, leader_schedule_cache: &Arc, - root_slot_sender: &Sender, + root_slot_sender: &Sender>, ) -> Result<()> where T: 'static + KeypairUtil + Send + Sync, { if let Some(new_root) = locktower.record_vote(bank.slot()) { + let mut rooted_slots = bank + .parents() + .into_iter() + .map(|bank| bank.slot()) + .collect::>(); + rooted_slots.push(bank.slot()); bank_forks.write().unwrap().set_root(new_root); leader_schedule_cache.set_root(new_root); blocktree.set_root(new_root)?; Self::handle_new_root(&bank_forks, progress); - root_slot_sender.send(new_root)?; + root_slot_sender.send(rooted_slots)?; } locktower.update_epoch(&bank); if let Some(ref voting_keypair) = voting_keypair { diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 19b8ecb4f1..22bcf69f3f 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -132,7 +132,7 @@ impl StorageStage { #[allow(clippy::too_many_arguments)] pub fn new( storage_state: &StorageState, - slot_receiver: Receiver, + slot_receiver: Receiver>, blocktree: Option>, keypair: &Arc, storage_keypair: &Arc, @@ -152,6 +152,7 @@ impl StorageStage { .spawn(move || { let mut current_key = 0; let mut slot_count = 0; + let mut last_root = 0; loop { if let Some(ref some_blocktree) = blocktree { if let Err(e) = Self::process_entries( @@ -160,6 +161,7 @@ impl StorageStage { &slot_receiver, &some_blocktree, &mut slot_count, + &mut last_root, &mut current_key, storage_rotate_count, &instruction_sender, @@ -391,53 +393,58 @@ impl StorageStage { fn process_entries( storage_keypair: &Arc, storage_state: &Arc>, - slot_receiver: &Receiver, + slot_receiver: &Receiver>, blocktree: &Arc, slot_count: &mut u64, + last_root: &mut u64, current_key_idx: &mut usize, storage_rotate_count: u64, instruction_sender: &InstructionSender, ) -> Result<()> { let timeout = Duration::new(1, 0); - let slot: u64 = slot_receiver.recv_timeout(timeout)?; - *slot_count += 1; - // Todo check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root - // Update the advertised blockhash to the latest root directly. + let slots: Vec = slot_receiver.recv_timeout(timeout)?; + // check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root + for slot in slots.into_iter().rev() { + if slot > *last_root { + *slot_count += 1; + *last_root = slot; - if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) { - for entry in &entries { - // Go through the transactions, find proofs, and use them to update - // the storage_keys with their signatures - for tx in &entry.transactions { - for (i, program_id) in tx.message.program_ids().iter().enumerate() { - if solana_storage_api::check_id(&program_id) { - Self::process_storage_transaction( - &tx.message().instructions[i].data, - slot, - storage_state, - current_key_idx, - tx.message.account_keys[0], - ); + if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) { + for entry in &entries { + // Go through the transactions, find proofs, and use them to update + // the storage_keys with their signatures + for tx in &entry.transactions { + for (i, program_id) in tx.message.program_ids().iter().enumerate() { + if solana_storage_api::check_id(&program_id) { + Self::process_storage_transaction( + &tx.message().instructions[i].data, + slot, + storage_state, + current_key_idx, + tx.message.account_keys[0], + ); + } + } } } + if *slot_count % storage_rotate_count == 0 { + // assume the last entry in the slot is the blockhash for that slot + let entry_hash = entries.last().unwrap().hash; + debug!( + "crosses sending at root slot: {}! with last entry's hash {}", + slot_count, entry_hash + ); + Self::process_entry_crossing( + &storage_keypair, + &storage_state, + &blocktree, + entries.last().unwrap().hash, + slot, + instruction_sender, + )?; + } } } - if *slot_count % storage_rotate_count == 0 { - // assume the last entry in the slot is the blockhash for that slot - let entry_hash = entries.last().unwrap().hash; - debug!( - "crosses sending at root slot: {}! with last entry's hash {}", - slot_count, entry_hash - ); - Self::process_entry_crossing( - &storage_keypair, - &storage_state, - &blocktree, - entries.last().unwrap().hash, - slot, - instruction_sender, - )?; - } } Ok(()) } @@ -543,7 +550,7 @@ mod tests { STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - slot_sender.send(slot).unwrap(); + slot_sender.send(vec![slot]).unwrap(); let keypair = Keypair::new(); let hash = Hash::default(); @@ -551,13 +558,15 @@ mod tests { let mut result = storage_state.get_mining_result(&signature); assert_eq!(result, Hash::default()); - for i in slot..slot + SLOTS_PER_SEGMENT + 1 { - blocktree - .write_entries(i, 0, 0, ticks_per_slot, &entries) - .unwrap(); - - slot_sender.send(i).unwrap(); - } + let rooted_slots = (slot..slot + SLOTS_PER_SEGMENT + 1) + .map(|i| { + blocktree + .write_entries(i, 0, 0, ticks_per_slot, &entries) + .unwrap(); + i + }) + .collect::>(); + slot_sender.send(rooted_slots).unwrap(); for _ in 0..5 { result = storage_state.get_mining_result(&signature); if result != Hash::default() { @@ -614,7 +623,7 @@ mod tests { STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - slot_sender.send(1).unwrap(); + slot_sender.send(vec![1]).unwrap(); let mut reference_keys; { @@ -637,7 +646,7 @@ mod tests { blocktree .write_entries(2, 0, 0, ticks_per_slot, &proof_entries) .unwrap(); - slot_sender.send(2).unwrap(); + slot_sender.send(vec![2]).unwrap(); for _ in 0..5 { {