Handle missed slots in storage stage (#4337)

* Handle missed slots in storage stage

* Fix test compile

* Make test use the new missed slot support
This commit is contained in:
Sagar Dhawan
2019-05-18 15:24:50 -07:00
committed by GitHub
parent 167890ca63
commit 06eb2364f2
2 changed files with 64 additions and 49 deletions

View File

@ -83,7 +83,7 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> (Self, Receiver<(u64, Pubkey)>, Receiver<u64>) ) -> (Self, Receiver<(u64, Pubkey)>, Receiver<Vec<u64>>)
where where
T: 'static + KeypairUtil + Send + Sync, T: 'static + KeypairUtil + Send + Sync,
{ {
@ -302,17 +302,23 @@ impl ReplayStage {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
root_slot_sender: &Sender<u64>, root_slot_sender: &Sender<Vec<u64>>,
) -> Result<()> ) -> Result<()>
where where
T: 'static + KeypairUtil + Send + Sync, T: 'static + KeypairUtil + Send + Sync,
{ {
if let Some(new_root) = locktower.record_vote(bank.slot()) { if let Some(new_root) = locktower.record_vote(bank.slot()) {
let mut rooted_slots = bank
.parents()
.into_iter()
.map(|bank| bank.slot())
.collect::<Vec<_>>();
rooted_slots.push(bank.slot());
bank_forks.write().unwrap().set_root(new_root); bank_forks.write().unwrap().set_root(new_root);
leader_schedule_cache.set_root(new_root); leader_schedule_cache.set_root(new_root);
blocktree.set_root(new_root)?; blocktree.set_root(new_root)?;
Self::handle_new_root(&bank_forks, progress); Self::handle_new_root(&bank_forks, progress);
root_slot_sender.send(new_root)?; root_slot_sender.send(rooted_slots)?;
} }
locktower.update_epoch(&bank); locktower.update_epoch(&bank);
if let Some(ref voting_keypair) = voting_keypair { if let Some(ref voting_keypair) = voting_keypair {

View File

@ -132,7 +132,7 @@ impl StorageStage {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
storage_state: &StorageState, storage_state: &StorageState,
slot_receiver: Receiver<u64>, slot_receiver: Receiver<Vec<u64>>,
blocktree: Option<Arc<Blocktree>>, blocktree: Option<Arc<Blocktree>>,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>, storage_keypair: &Arc<Keypair>,
@ -152,6 +152,7 @@ impl StorageStage {
.spawn(move || { .spawn(move || {
let mut current_key = 0; let mut current_key = 0;
let mut slot_count = 0; let mut slot_count = 0;
let mut last_root = 0;
loop { loop {
if let Some(ref some_blocktree) = blocktree { if let Some(ref some_blocktree) = blocktree {
if let Err(e) = Self::process_entries( if let Err(e) = Self::process_entries(
@ -160,6 +161,7 @@ impl StorageStage {
&slot_receiver, &slot_receiver,
&some_blocktree, &some_blocktree,
&mut slot_count, &mut slot_count,
&mut last_root,
&mut current_key, &mut current_key,
storage_rotate_count, storage_rotate_count,
&instruction_sender, &instruction_sender,
@ -391,53 +393,58 @@ impl StorageStage {
fn process_entries( fn process_entries(
storage_keypair: &Arc<Keypair>, storage_keypair: &Arc<Keypair>,
storage_state: &Arc<RwLock<StorageStateInner>>, storage_state: &Arc<RwLock<StorageStateInner>>,
slot_receiver: &Receiver<u64>, slot_receiver: &Receiver<Vec<u64>>,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
slot_count: &mut u64, slot_count: &mut u64,
last_root: &mut u64,
current_key_idx: &mut usize, current_key_idx: &mut usize,
storage_rotate_count: u64, storage_rotate_count: u64,
instruction_sender: &InstructionSender, instruction_sender: &InstructionSender,
) -> Result<()> { ) -> Result<()> {
let timeout = Duration::new(1, 0); let timeout = Duration::new(1, 0);
let slot: u64 = slot_receiver.recv_timeout(timeout)?; let slots: Vec<u64> = slot_receiver.recv_timeout(timeout)?;
*slot_count += 1; // check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root
// Todo check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root for slot in slots.into_iter().rev() {
// Update the advertised blockhash to the latest root directly. if slot > *last_root {
*slot_count += 1;
*last_root = slot;
if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) { if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) {
for entry in &entries { for entry in &entries {
// Go through the transactions, find proofs, and use them to update // Go through the transactions, find proofs, and use them to update
// the storage_keys with their signatures // the storage_keys with their signatures
for tx in &entry.transactions { for tx in &entry.transactions {
for (i, program_id) in tx.message.program_ids().iter().enumerate() { for (i, program_id) in tx.message.program_ids().iter().enumerate() {
if solana_storage_api::check_id(&program_id) { if solana_storage_api::check_id(&program_id) {
Self::process_storage_transaction( Self::process_storage_transaction(
&tx.message().instructions[i].data, &tx.message().instructions[i].data,
slot, slot,
storage_state, storage_state,
current_key_idx, current_key_idx,
tx.message.account_keys[0], tx.message.account_keys[0],
); );
}
}
} }
} }
if *slot_count % storage_rotate_count == 0 {
// assume the last entry in the slot is the blockhash for that slot
let entry_hash = entries.last().unwrap().hash;
debug!(
"crosses sending at root slot: {}! with last entry's hash {}",
slot_count, entry_hash
);
Self::process_entry_crossing(
&storage_keypair,
&storage_state,
&blocktree,
entries.last().unwrap().hash,
slot,
instruction_sender,
)?;
}
} }
} }
if *slot_count % storage_rotate_count == 0 {
// assume the last entry in the slot is the blockhash for that slot
let entry_hash = entries.last().unwrap().hash;
debug!(
"crosses sending at root slot: {}! with last entry's hash {}",
slot_count, entry_hash
);
Self::process_entry_crossing(
&storage_keypair,
&storage_state,
&blocktree,
entries.last().unwrap().hash,
slot,
instruction_sender,
)?;
}
} }
Ok(()) Ok(())
} }
@ -543,7 +550,7 @@ mod tests {
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
&cluster_info, &cluster_info,
); );
slot_sender.send(slot).unwrap(); slot_sender.send(vec![slot]).unwrap();
let keypair = Keypair::new(); let keypair = Keypair::new();
let hash = Hash::default(); let hash = Hash::default();
@ -551,13 +558,15 @@ mod tests {
let mut result = storage_state.get_mining_result(&signature); let mut result = storage_state.get_mining_result(&signature);
assert_eq!(result, Hash::default()); assert_eq!(result, Hash::default());
for i in slot..slot + SLOTS_PER_SEGMENT + 1 { let rooted_slots = (slot..slot + SLOTS_PER_SEGMENT + 1)
blocktree .map(|i| {
.write_entries(i, 0, 0, ticks_per_slot, &entries) blocktree
.unwrap(); .write_entries(i, 0, 0, ticks_per_slot, &entries)
.unwrap();
slot_sender.send(i).unwrap(); i
} })
.collect::<Vec<_>>();
slot_sender.send(rooted_slots).unwrap();
for _ in 0..5 { for _ in 0..5 {
result = storage_state.get_mining_result(&signature); result = storage_state.get_mining_result(&signature);
if result != Hash::default() { if result != Hash::default() {
@ -614,7 +623,7 @@ mod tests {
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
&cluster_info, &cluster_info,
); );
slot_sender.send(1).unwrap(); slot_sender.send(vec![1]).unwrap();
let mut reference_keys; let mut reference_keys;
{ {
@ -637,7 +646,7 @@ mod tests {
blocktree blocktree
.write_entries(2, 0, 0, ticks_per_slot, &proof_entries) .write_entries(2, 0, 0, ticks_per_slot, &proof_entries)
.unwrap(); .unwrap();
slot_sender.send(2).unwrap(); slot_sender.send(vec![2]).unwrap();
for _ in 0..5 { for _ in 0..5 {
{ {