diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index 5eec39902d..3cefb53112 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -45,6 +45,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun { keypair, latest_blob_index, bank.parent().unwrap().slot(), + None, ); // If the last blockhash is default, a new block is being created @@ -65,6 +66,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun { keypair, latest_blob_index, bank.parent().unwrap().slot(), + None, ); // If it's the last tick, reset the last block hash to default diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 63cdd6effd..e4876798db 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -15,6 +15,13 @@ pub(super) struct ReceiveResults { pub last_tick: u64, } +#[derive(Copy, Clone)] +pub struct UnfinishedSlotInfo { + pub next_index: u64, + pub slot: u64, + pub parent: u64, +} + /// Theis parameter tunes how many entries are received in one iteration of recv loop /// This will prevent broadcast stage from consuming more entries, that could have led /// to delays in shredding, and broadcasting shreds to peer validators @@ -72,7 +79,27 @@ pub(super) fn entries_to_shreds( keypair: &Arc, latest_shred_index: u64, parent_slot: u64, -) -> (Vec, u64) { + last_unfinished_slot: Option, +) -> (Vec, Option) { + let mut shreds = if let Some(unfinished_slot) = last_unfinished_slot { + if unfinished_slot.slot != slot { + let mut shredder = Shredder::new( + unfinished_slot.slot, + unfinished_slot.parent, + RECOMMENDED_FEC_RATE, + keypair, + unfinished_slot.next_index as u32, + ) + .expect("Expected to create a new shredder"); + shredder.finalize_slot(); + shredder.shreds.drain(..).collect() + } else { + vec![] + } + } else { + vec![] + }; + let mut shredder = Shredder::new( slot, parent_slot, @@ -85,17 +112,23 @@ pub(super) fn entries_to_shreds( bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); - if last_tick == bank_max_tick { + let unfinished_slot = if last_tick == bank_max_tick { shredder.finalize_slot(); + None } else { shredder.finalize_data(); - } + Some(UnfinishedSlotInfo { + next_index: u64::from(shredder.index), + slot, + parent: parent_slot, + }) + }; - let shred_infos: Vec = shredder.shreds.drain(..).collect(); + shreds.append(&mut shredder.shreds); - trace!("Inserting {:?} shreds in blocktree", shred_infos.len()); + trace!("Inserting {:?} shreds in blocktree", shreds.len()); - (shred_infos, u64::from(shredder.index)) + (shreds, unfinished_slot) } #[cfg(test)] diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 9cd8463c5c..6cf88319a7 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -44,6 +44,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { keypair, latest_blob_index, bank.parent().unwrap().slot(), + None, ); let seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect(); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index a51e7fc47e..818f34c078 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -1,6 +1,6 @@ use super::broadcast_utils; use super::*; -use crate::broadcast_stage::broadcast_utils::entries_to_shreds; +use crate::broadcast_stage::broadcast_utils::{entries_to_shreds, UnfinishedSlotInfo}; use solana_sdk::timing::duration_as_ms; #[derive(Default)] @@ -12,12 +12,14 @@ struct BroadcastStats { pub(super) struct StandardBroadcastRun { stats: BroadcastStats, + unfinished_slot: Option, } impl StandardBroadcastRun { pub(super) fn new() -> Self { Self { stats: BroadcastStats::default(), + unfinished_slot: None, } } @@ -91,7 +93,7 @@ impl BroadcastRun for StandardBroadcastRun { }; let to_shreds_start = Instant::now(); - let (shred_infos, latest_shred_index) = entries_to_shreds( + let (shred_infos, uninished_slot) = entries_to_shreds( receive_results.entries, last_tick, bank.slot(), @@ -99,8 +101,10 @@ impl BroadcastRun for StandardBroadcastRun { keypair, latest_shred_index, parent_slot, + self.unfinished_slot, ); let to_shreds_elapsed = to_shreds_start.elapsed(); + self.unfinished_slot = uninished_slot; let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect(); let num_shreds = shred_infos.len(); @@ -125,6 +129,13 @@ impl BroadcastRun for StandardBroadcastRun { )?; let broadcast_elapsed = broadcast_start.elapsed(); + let latest_shred_index = uninished_slot.map(|s| s.next_index).unwrap_or_else(|| { + blocktree + .meta(bank.slot()) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0) + }); self.update_broadcast_stats( duration_as_ms(&receive_elapsed), duration_as_ms(&to_shreds_elapsed),