buffers data shreds to make larger erasure coded sets (#15849)

Broadcast stage batches up to 8 entries:
https://github.com/solana-labs/solana/blob/79280b304/core/src/broadcast_stage/broadcast_utils.rs#L26-L29
which will be serialized into some number of shreds and chunked into FEC
sets of at most 32 shreds each:
https://github.com/solana-labs/solana/blob/79280b304/ledger/src/shred.rs#L576-L597
So, depending on the size of the entries, FEC sets can be small, which
can aggravate the loss rate.
For example, 16 FEC sets of 2:2 data/coding shreds each have a higher
loss rate than a single 32:32 set.
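
To make that concrete: if each shred is dropped independently with
probability p, a d:c FEC set becomes unrecoverable once more than c of
its d + c shreds are lost. The sketch below is not part of the commit;
the helper names and the loss rate p = 0.15 are illustrative. It
compares one 32:32 set against 16 separate 2:2 sets covering the same
32 data shreds:

// Binomial coefficient C(n, k), computed in floating point.
fn binomial(n: u64, k: u64) -> f64 {
    (0..k).fold(1.0, |acc, i| acc * (n - i) as f64 / (i + 1) as f64)
}

// P(more than `c` of the `d + c` shreds are lost), i.e. the d:c FEC set
// is unrecoverable, assuming i.i.d. per-shred loss at rate `p`.
fn set_loss_probability(d: u64, c: u64, p: f64) -> f64 {
    let n = d + c;
    (c + 1..=n)
        .map(|k| binomial(n, k) * p.powi(k as i32) * (1.0 - p).powi((n - k) as i32))
        .sum()
}

fn main() {
    let p = 0.15; // hypothetical per-shred loss rate
    let one_big = set_loss_probability(32, 32, p);
    let many_small = 1.0 - (1.0 - set_loss_probability(2, 2, p)).powi(16);
    println!("one 32:32 set: {:.3e}", one_big); // vanishingly small
    println!("16 x 2:2 sets: {:.3e}", many_small); // roughly 0.18
}

At p = 0.15, the 16 small sets lose some data roughly 18% of the time,
while the single large set practically never does.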

This commit broadcasts data shreds immediately, but also buffers them
until it has a batch of 32 data shreds, at which point 32 coding shreds
are generated and broadcast.
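
In outline, the new flushing policy reduces to the drain logic sketched
below. This is a simplified stand-in, not the commit's code: the shred
types, erasure_code, and flush_coding_shreds are placeholders that
mirror make_coding_shreds in standard_broadcast_run.rs further down.

const MAX_DATA_SHREDS_PER_FEC_BLOCK: usize = 32;

struct DataShred;
struct CodingShred;

// Placeholder for Shredder::data_shreds_to_coding_shreds, which at
// RECOMMENDED_FEC_RATE emits one coding shred per data shred.
fn erasure_code(data: &[DataShred]) -> Vec<CodingShred> {
    data.iter().map(|_| CodingShred).collect()
}

// Data shreds go out as soon as they are made, but are also pushed into
// `buffer`. Coding shreds are generated only over full batches of 32
// buffered data shreds; at slot end the remainder is flushed as a final
// (possibly smaller) FEC set.
fn flush_coding_shreds(buffer: &mut Vec<DataShred>, is_slot_end: bool) -> Vec<CodingShred> {
    let size = buffer.len();
    let offset = if is_slot_end {
        0
    } else {
        size % MAX_DATA_SHREDS_PER_FEC_BLOCK
    };
    let batch: Vec<DataShred> = buffer.drain(0..size - offset).collect();
    erasure_code(&batch)
}

fn main() {
    let mut buffer: Vec<DataShred> = (0..70).map(|_| DataShred).collect();
    // Mid-slot: only 64 of the 70 buffered shreds are erasure coded.
    assert_eq!(flush_coding_shreds(&mut buffer, false).len(), 64);
    // Slot end: the remaining 6 are flushed as one final small set.
    assert_eq!(flush_coding_shreds(&mut buffer, true).len(), 6);
}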
Author: behzad nouri
Date: 2021-03-23 14:52:38 +00:00
Committed by: GitHub
Parent: 57ba86c821
Commit: 4f82b897bc
7 changed files with 333 additions and 179 deletions

View File

@@ -472,12 +472,14 @@ pub mod test {
) {
let num_entries = max_ticks_per_n_shreds(num, None);
let (data_shreds, _) = make_slot_entries(slot, 0, num_entries);
-let keypair = Arc::new(Keypair::new());
-let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0)
-.expect("Expected to create a new shredder");
-let coding_shreds = shredder
-.data_shreds_to_coding_shreds(&data_shreds[0..], &mut ProcessShredsStats::default());
+let keypair = Keypair::new();
+let coding_shreds = Shredder::data_shreds_to_coding_shreds(
+&keypair,
+&data_shreds[0..],
+RECOMMENDED_FEC_RATE,
+&mut ProcessShredsStats::default(),
+)
+.unwrap();
(
data_shreds.clone(),
coding_shreds.clone(),

View File

@@ -1,6 +1,6 @@
use crate::poh_recorder::WorkingBankEntry;
use crate::result::Result;
-use solana_ledger::entry::Entry;
+use solana_ledger::{entry::Entry, shred::Shred};
use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot;
use std::{
@@ -16,11 +16,15 @@ pub(super) struct ReceiveResults {
pub last_tick_height: u64,
}
-#[derive(Copy, Clone)]
+#[derive(Clone)]
pub struct UnfinishedSlotInfo {
pub next_shred_index: u32,
pub slot: Slot,
pub parent: Slot,
+// Data shreds buffered to make a batch of size
+// MAX_DATA_SHREDS_PER_FEC_BLOCK.
+pub(crate) data_shreds_buffer: Vec<Shred>,
+pub(crate) fec_set_offset: u32, // See Shredder::fec_set_index.
}
/// This parameter tunes how many entries are received in one iteration of recv loop

View File

@@ -7,12 +7,13 @@ use super::{
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use solana_ledger::{
entry::Entry,
-shred::{ProcessShredsStats, Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK},
+shred::{
+ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE,
+SHRED_TICK_REFERENCE_MASK,
+},
};
use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
-use std::collections::HashMap;
-use std::sync::RwLock;
-use std::time::Duration;
+use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration};
#[derive(Clone)]
pub struct StandardBroadcastRun {
@@ -40,94 +41,114 @@ impl StandardBroadcastRun {
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
Self {
process_shreds_stats: ProcessShredsStats::default(),
-transmit_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())),
-insert_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())),
+transmit_shreds_stats: Arc::default(),
+insert_shreds_stats: Arc::default(),
unfinished_slot: None,
current_slot_and_parent: None,
slot_broadcast_start: None,
keypair,
shred_version,
-last_datapoint_submit: Arc::new(AtomicU64::new(0)),
+last_datapoint_submit: Arc::default(),
num_batches: 0,
-broadcast_peer_cache: Arc::new(RwLock::new(BroadcastPeerCache::default())),
-last_peer_update: Arc::new(AtomicU64::new(0)),
+broadcast_peer_cache: Arc::default(),
+last_peer_update: Arc::default(),
}
}
-fn check_for_interrupted_slot(&mut self, max_ticks_in_slot: u8) -> Option<Shred> {
-let (slot, _) = self.current_slot_and_parent.unwrap();
-let mut last_unfinished_slot_shred = self
-.unfinished_slot
-.map(|last_unfinished_slot| {
-if last_unfinished_slot.slot != slot {
-self.report_and_reset_stats();
-Some(Shred::new_from_data(
-last_unfinished_slot.slot,
-last_unfinished_slot.next_shred_index,
-(last_unfinished_slot.slot - last_unfinished_slot.parent) as u16,
-None,
-true,
-true,
-max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK,
-self.shred_version,
-last_unfinished_slot.next_shred_index,
-))
-} else {
-None
-}
-})
-.unwrap_or(None);
-// This shred should only be Some if the previous slot was interrupted
-if let Some(ref mut shred) = last_unfinished_slot_shred {
-Shredder::sign_shred(&self.keypair, shred);
-self.unfinished_slot = None;
+// If the current slot has changed, generates an empty shred indicating
+// last shred in the previous slot, along with coding shreds for the data
+// shreds buffered.
+fn finish_prev_slot(
+&mut self,
+max_ticks_in_slot: u8,
+stats: &mut ProcessShredsStats,
+) -> Vec<Shred> {
+let (current_slot, _) = self.current_slot_and_parent.unwrap();
+match self.unfinished_slot {
+None => Vec::default(),
+Some(ref state) if state.slot == current_slot => Vec::default(),
+Some(ref mut state) => {
+let parent_offset = state.slot - state.parent;
+let reference_tick = max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK;
+let fec_set_index =
+Shredder::fec_set_index(state.next_shred_index, state.fec_set_offset);
+let mut shred = Shred::new_from_data(
+state.slot,
+state.next_shred_index,
+parent_offset as u16,
+None, // data
+true, // is_last_in_fec_set
+true, // is_last_in_slot
+reference_tick,
+self.shred_version,
+fec_set_index.unwrap(),
+);
+Shredder::sign_shred(self.keypair.deref(), &mut shred);
+state.data_shreds_buffer.push(shred.clone());
+let mut shreds = make_coding_shreds(
+self.keypair.deref(),
+&mut self.unfinished_slot,
+true, // is_last_in_slot
+stats,
+);
+shreds.insert(0, shred);
+self.report_and_reset_stats();
+self.unfinished_slot = None;
+shreds
+}
+}
+}
-last_unfinished_slot_shred
-}
-fn init_shredder(&self, blockstore: &Blockstore, reference_tick: u8) -> (Shredder, u32) {
-let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
-let next_shred_index = self
-.unfinished_slot
-.map(|s| s.next_shred_index)
-.unwrap_or_else(|| {
-blockstore
-.meta(slot)
-.expect("Database error")
-.map(|meta| meta.consumed)
-.unwrap_or(0) as u32
-});
-(
-Shredder::new(
-slot,
-parent_slot,
-RECOMMENDED_FEC_RATE,
-self.keypair.clone(),
-reference_tick,
-self.shred_version,
-)
-.expect("Expected to create a new shredder"),
-next_shred_index,
-)
-}
fn entries_to_data_shreds(
&mut self,
-shredder: &Shredder,
-next_shred_index: u32,
entries: &[Entry],
+blockstore: &Blockstore,
+reference_tick: u8,
is_slot_end: bool,
process_stats: &mut ProcessShredsStats,
) -> Vec<Shred> {
-let (data_shreds, new_next_shred_index) =
-shredder.entries_to_data_shreds(entries, is_slot_end, next_shred_index, process_stats);
+let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
+let (next_shred_index, fec_set_offset) = match &self.unfinished_slot {
+Some(state) => (state.next_shred_index, state.fec_set_offset),
+None => match blockstore.meta(slot).unwrap() {
+Some(slot_meta) => {
+let shreds_consumed = slot_meta.consumed as u32;
+(shreds_consumed, shreds_consumed)
+}
+None => (0, 0),
+},
+};
+let (data_shreds, next_shred_index) = Shredder::new(
+slot,
+parent_slot,
+RECOMMENDED_FEC_RATE,
+self.keypair.clone(),
+reference_tick,
+self.shred_version,
+)
+.unwrap()
+.entries_to_data_shreds(
+entries,
+is_slot_end,
+next_shred_index,
+fec_set_offset,
+process_stats,
+);
+let mut data_shreds_buffer = match &mut self.unfinished_slot {
+Some(state) => {
+assert_eq!(state.slot, slot);
+std::mem::take(&mut state.data_shreds_buffer)
+}
+None => Vec::default(),
+};
+data_shreds_buffer.extend(data_shreds.clone());
self.unfinished_slot = Some(UnfinishedSlotInfo {
-next_shred_index: new_next_shred_index,
-slot: shredder.slot,
-parent: shredder.parent_slot,
+next_shred_index,
+slot,
+parent: parent_slot,
+data_shreds_buffer,
+fec_set_offset,
});
data_shreds
}
@@ -184,19 +205,16 @@ impl StandardBroadcastRun {
let mut to_shreds_time = Measure::start("broadcast_to_shreds");
// 1) Check if slot was interrupted
-let last_unfinished_slot_shred =
-self.check_for_interrupted_slot(bank.ticks_per_slot() as u8);
+let prev_slot_shreds =
+self.finish_prev_slot(bank.ticks_per_slot() as u8, &mut process_stats);
// 2) Convert entries to shreds and coding shreds
-let (shredder, next_shred_index) = self.init_shredder(
-blockstore,
-(bank.tick_height() % bank.ticks_per_slot()) as u8,
-);
let is_last_in_slot = last_tick_height == bank.max_tick_height();
+let reference_tick = bank.tick_height() % bank.ticks_per_slot();
let data_shreds = self.entries_to_data_shreds(
-&shredder,
-next_shred_index,
&receive_results.entries,
+blockstore,
+reference_tick as u8,
is_last_in_slot,
&mut process_stats,
);
@@ -208,27 +226,25 @@
.insert_shreds(first, None, true)
.expect("Failed to insert shreds in blockstore");
}
-let last_data_shred = data_shreds.len();
to_shreds_time.stop();
let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule");
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-let stakes = bank.epoch_staked_nodes(bank_epoch);
-let stakes = stakes.map(Arc::new);
+let stakes = bank.epoch_staked_nodes(bank_epoch).map(Arc::new);
// Broadcast the last shred of the interrupted slot if necessary
-if let Some(last_shred) = last_unfinished_slot_shred {
+if !prev_slot_shreds.is_empty() {
let batch_info = Some(BroadcastShredBatchInfo {
-slot: last_shred.slot(),
+slot: prev_slot_shreds[0].slot(),
num_expected_batches: Some(old_num_batches + 1),
slot_start_ts: old_broadcast_start.expect(
"Old broadcast start time for previous slot must exist if the previous slot
was interrupted",
),
});
-let last_shred = Arc::new(vec![last_shred]);
-socket_sender.send(((stakes.clone(), last_shred.clone()), batch_info.clone()))?;
-blockstore_sender.send((last_shred, batch_info))?;
+let shreds = Arc::new(prev_slot_shreds);
+socket_sender.send(((stakes.clone(), shreds.clone()), batch_info.clone()))?;
+blockstore_sender.send((shreds, batch_info))?;
}
// Increment by two batches, one for the data batch, one for the coding batch.
@@ -255,11 +271,15 @@
// Send data shreds
let data_shreds = Arc::new(data_shreds);
socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
-blockstore_sender.send((data_shreds.clone(), batch_info.clone()))?;
+blockstore_sender.send((data_shreds, batch_info.clone()))?;
// Create and send coding shreds
-let coding_shreds = shredder
-.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred], &mut process_stats);
+let coding_shreds = make_coding_shreds(
+self.keypair.deref(),
+&mut self.unfinished_slot,
+is_last_in_slot,
+&mut process_stats,
+);
let coding_shreds = Arc::new(coding_shreds);
socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
blockstore_sender.send((coding_shreds, batch_info))?;
@@ -378,15 +398,15 @@ impl StandardBroadcastRun {
fn report_and_reset_stats(&mut self) {
let stats = &self.process_shreds_stats;
assert!(self.unfinished_slot.is_some());
+let unfinished_slot = self.unfinished_slot.as_ref().unwrap();
datapoint_info!(
"broadcast-process-shreds-stats",
("slot", self.unfinished_slot.unwrap().slot as i64, i64),
("slot", unfinished_slot.slot as i64, i64),
("shredding_time", stats.shredding_elapsed, i64),
("receive_time", stats.receive_elapsed, i64),
(
"num_data_shreds",
-i64::from(self.unfinished_slot.unwrap().next_shred_index),
+unfinished_slot.next_shred_index as i64,
i64
),
(
@@ -409,6 +429,33 @@ impl StandardBroadcastRun {
}
}
+// Consumes data_shreds_buffer returning corresponding coding shreds.
+fn make_coding_shreds(
+keypair: &Keypair,
+unfinished_slot: &mut Option<UnfinishedSlotInfo>,
+is_slot_end: bool,
+stats: &mut ProcessShredsStats,
+) -> Vec<Shred> {
+let data_shreds = match unfinished_slot {
+None => Vec::default(),
+Some(unfinished_slot) => {
+let size = unfinished_slot.data_shreds_buffer.len();
+// Consume a multiple of 32, unless this is the slot end.
+let offset = if is_slot_end {
+0
+} else {
+size % MAX_DATA_SHREDS_PER_FEC_BLOCK as usize
+};
+unfinished_slot
+.data_shreds_buffer
+.drain(0..size - offset)
+.collect()
+}
+};
+Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, RECOMMENDED_FEC_RATE, stats)
+.unwrap()
+}
impl BroadcastRun for StandardBroadcastRun {
fn run(
&mut self,
@@ -418,6 +465,8 @@ impl BroadcastRun for StandardBroadcastRun {
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()> {
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
+// TODO: Confirm that last chunk of coding shreds
+// will not be lost or delayed for too long.
self.process_receive_results(
blockstore,
socket_sender,
@@ -508,6 +557,8 @@ mod test {
next_shred_index,
slot,
parent,
+data_shreds_buffer: Vec::default(),
+fec_set_offset: next_shred_index,
});
run.slot_broadcast_start = Some(Instant::now());
@@ -515,8 +566,9 @@
run.current_slot_and_parent = Some((4, 2));
// Slot 2 interrupted slot 1
-let shred = run
-.check_for_interrupted_slot(0)
+let shreds = run.finish_prev_slot(0, &mut ProcessShredsStats::default());
+let shred = shreds
+.get(0)
.expect("Expected a shred that signals an interrupt");
// Validate the shred
@@ -642,6 +694,50 @@
);
}
+#[test]
+fn test_buffer_data_shreds() {
+let num_shreds_per_slot = 2;
+let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket) =
+setup(num_shreds_per_slot);
+let (bsend, brecv) = channel();
+let (ssend, _srecv) = channel();
+let mut last_tick_height = 0;
+let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0);
+let mut process_ticks = |num_ticks| {
+let ticks = create_ticks(num_ticks, 0, genesis_config.hash());
+last_tick_height += (ticks.len() - 1) as u64;
+let receive_results = ReceiveResults {
+entries: ticks,
+time_elapsed: Duration::new(1, 0),
+bank: bank.clone(),
+last_tick_height,
+};
+standard_broadcast_run
+.process_receive_results(&blockstore, &ssend, &bsend, receive_results)
+.unwrap();
+};
+for i in 0..3 {
+process_ticks((i + 1) * 100);
+}
+let mut shreds = Vec::<Shred>::new();
+while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
+shreds.extend(recv_shreds.deref().clone());
+}
+assert!(shreds.len() < 32, "shreds.len(): {}", shreds.len());
+assert!(shreds.iter().all(|shred| shred.is_data()));
+process_ticks(75);
+while let Ok((recv_shreds, _)) = brecv.recv_timeout(Duration::from_secs(1)) {
+shreds.extend(recv_shreds.deref().clone());
+}
+assert!(shreds.len() > 64, "shreds.len(): {}", shreds.len());
+let num_coding_shreds = shreds.iter().filter(|shred| shred.is_code()).count();
+assert_eq!(
+num_coding_shreds, 32,
+"num coding shreds: {}",
+num_coding_shreds
+);
+}
#[test]
fn test_slot_finish() {
// Setup

View File

@@ -236,7 +236,16 @@ mod tests {
let mut stats = ShredFetchStats::default();
let slot = 1;
-let shred = Shred::new_from_data(slot, 3, 0, None, true, true, 0, 0, 0);
+let shred = Shred::new_from_data(
+slot, 3, // shred index
+0, // parent offset
+None, // data
+true, // is_last_in_fec_set
+true, // is_last_in_slot
+0, // reference_tick
+0, // version
+3, // fec_set_index
+);
shred.copy_to_packet(&mut packet);
let hasher = PacketHasher::default();
@@ -256,8 +265,7 @@
);
assert!(!packet.meta.discard);
-let coding =
-solana_ledger::shred::Shredder::generate_coding_shreds(slot, 1.0f32, &[shred], 10, 1);
+let coding = solana_ledger::shred::Shredder::generate_coding_shreds(1.0f32, &[shred], 1);
coding[0].copy_to_packet(&mut packet);
ShredFetchStage::process_packet(
&mut packet,