@ -9,7 +9,7 @@ use solana_ledger::entry::{create_ticks, Entry};
|
|||||||
use solana_ledger::shred::{
|
use solana_ledger::shred::{
|
||||||
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder,
|
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder,
|
||||||
MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, SHRED_PAYLOAD_SIZE,
|
MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, SHRED_PAYLOAD_SIZE,
|
||||||
SIZE_OF_DATA_SHRED_IGNORED_TAIL, SIZE_OF_DATA_SHRED_PAYLOAD,
|
SIZE_OF_CODING_SHRED_HEADERS, SIZE_OF_DATA_SHRED_PAYLOAD,
|
||||||
};
|
};
|
||||||
use solana_perf::test_tx;
|
use solana_perf::test_tx;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
@ -56,7 +56,7 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
|
|||||||
|
|
||||||
fn make_concatenated_shreds(num_shreds: usize) -> Vec<u8> {
|
fn make_concatenated_shreds(num_shreds: usize) -> Vec<u8> {
|
||||||
let data_shreds = make_shreds(num_shreds);
|
let data_shreds = make_shreds(num_shreds);
|
||||||
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL) as usize;
|
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
|
||||||
let mut data: Vec<u8> = vec![0; num_shreds * valid_shred_data_len];
|
let mut data: Vec<u8> = vec![0; num_shreds * valid_shred_data_len];
|
||||||
for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() {
|
for (i, shred) in (data_shreds[0..num_shreds]).iter().enumerate() {
|
||||||
data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len]
|
data[i * valid_shred_data_len..(i + 1) * valid_shred_data_len]
|
||||||
@ -167,7 +167,7 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
|
|||||||
fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
|
fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
|
||||||
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
|
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
|
||||||
let data = make_concatenated_shreds(symbol_count as usize);
|
let data = make_concatenated_shreds(symbol_count as usize);
|
||||||
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL) as usize;
|
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
|
||||||
bencher.iter(|| {
|
bencher.iter(|| {
|
||||||
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
|
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
|
||||||
encoder.get_encoded_packets(symbol_count);
|
encoder.get_encoded_packets(symbol_count);
|
||||||
@ -178,7 +178,7 @@ fn bench_shredder_coding_raptorq(bencher: &mut Bencher) {
|
|||||||
fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
|
fn bench_shredder_decoding_raptorq(bencher: &mut Bencher) {
|
||||||
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
|
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK;
|
||||||
let data = make_concatenated_shreds(symbol_count as usize);
|
let data = make_concatenated_shreds(symbol_count as usize);
|
||||||
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL) as usize;
|
let valid_shred_data_len = (SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS) as usize;
|
||||||
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
|
let encoder = Encoder::with_defaults(&data, valid_shred_data_len as u16);
|
||||||
let mut packets = encoder.get_encoded_packets(symbol_count as u32);
|
let mut packets = encoder.get_encoded_packets(symbol_count as u32);
|
||||||
packets.shuffle(&mut rand::thread_rng());
|
packets.shuffle(&mut rand::thread_rng());
|
||||||
|
@ -1,4 +1,54 @@
|
|||||||
//! The `shred` module defines data structures and methods to pull MTU sized data frames from the network.
|
//! The `shred` module defines data structures and methods to pull MTU sized data frames from the
|
||||||
|
//! network. There are two types of shreds: data and coding. Data shreds contain entry information
|
||||||
|
//! while coding shreds provide redundancy to protect against dropped network packets (erasures).
|
||||||
|
//!
|
||||||
|
//! +---------------------------------------------------------------------------------------------+
|
||||||
|
//! | Data Shred |
|
||||||
|
//! +---------------------------------------------------------------------------------------------+
|
||||||
|
//! | common | data | payload |
|
||||||
|
//! | header | header | |
|
||||||
|
//! |+---+---+--- |+---+---+---|+----------------------------------------------------------+----+|
|
||||||
|
//! || s | s | . || p | f | s || data (ie ledger entries) | r ||
|
||||||
|
//! || i | h | . || a | l | i || | e ||
|
||||||
|
//! || g | r | . || r | a | z || See notes immediately after shred diagrams for an | s ||
|
||||||
|
//! || n | e | || e | g | e || explanation of the "restricted" section in this payload | t ||
|
||||||
|
//! || a | d | || n | s | || | r ||
|
||||||
|
//! || t | | || t | | || | i ||
|
||||||
|
//! || u | t | || | | || | c ||
|
||||||
|
//! || r | y | || o | | || | t ||
|
||||||
|
//! || e | p | || f | | || | e ||
|
||||||
|
//! || | e | || f | | || | d ||
|
||||||
|
//! |+---+---+--- |+---+---+---+|----------------------------------------------------------+----+|
|
||||||
|
//! +---------------------------------------------------------------------------------------------+
|
||||||
|
//!
|
||||||
|
//! +---------------------------------------------------------------------------------------------+
|
||||||
|
//! | Coding Shred |
|
||||||
|
//! +---------------------------------------------------------------------------------------------+
|
||||||
|
//! | common | coding | payload |
|
||||||
|
//! | header | header | |
|
||||||
|
//! |+---+---+--- |+---+---+---+----------------------------------------------------------------+|
|
||||||
|
//! || s | s | . || n | n | p || data (encoded data shred data) ||
|
||||||
|
//! || i | h | . || u | u | o || ||
|
||||||
|
//! || g | r | . || m | m | s || ||
|
||||||
|
//! || n | e | || | | i || ||
|
||||||
|
//! || a | d | || d | c | t || ||
|
||||||
|
//! || t | | || | | i || ||
|
||||||
|
//! || u | t | || s | s | o || ||
|
||||||
|
//! || r | y | || h | h | n || ||
|
||||||
|
//! || e | p | || r | r | || ||
|
||||||
|
//! || | e | || e | e | || ||
|
||||||
|
//! || | | || d | d | || ||
|
||||||
|
//! |+---+---+--- |+---+---+---+|+--------------------------------------------------------------+|
|
||||||
|
//! +---------------------------------------------------------------------------------------------+
|
||||||
|
//!
|
||||||
|
//! Notes:
|
||||||
|
//! a) Coding shreds encode entire data shreds: both of the headers AND the payload.
|
||||||
|
//! b) Coding shreds require their own headers for identification and etc.
|
||||||
|
//! c) The erasure algorithm requires data shred and coding shred bytestreams to be equal in length.
|
||||||
|
//!
|
||||||
|
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
|
||||||
|
//! payload can fit into one coding shred / packet.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
blockstore::MAX_DATA_SHREDS_PER_SLOT,
|
blockstore::MAX_DATA_SHREDS_PER_SLOT,
|
||||||
entry::{create_ticks, Entry},
|
entry::{create_ticks, Entry},
|
||||||
@ -67,12 +117,12 @@ pub const SIZE_OF_SHRED_TYPE: usize = 1;
|
|||||||
pub const SIZE_OF_SHRED_SLOT: usize = 8;
|
pub const SIZE_OF_SHRED_SLOT: usize = 8;
|
||||||
pub const SIZE_OF_SHRED_INDEX: usize = 4;
|
pub const SIZE_OF_SHRED_INDEX: usize = 4;
|
||||||
pub const SIZE_OF_NONCE: usize = 4;
|
pub const SIZE_OF_NONCE: usize = 4;
|
||||||
pub const SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize =
|
pub const SIZE_OF_CODING_SHRED_HEADERS: usize =
|
||||||
SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
|
SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
|
||||||
pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
|
pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
|
||||||
- SIZE_OF_COMMON_SHRED_HEADER
|
- SIZE_OF_COMMON_SHRED_HEADER
|
||||||
- SIZE_OF_DATA_SHRED_HEADER
|
- SIZE_OF_DATA_SHRED_HEADER
|
||||||
- SIZE_OF_DATA_SHRED_IGNORED_TAIL
|
- SIZE_OF_CODING_SHRED_HEADERS
|
||||||
- SIZE_OF_NONCE;
|
- SIZE_OF_NONCE;
|
||||||
|
|
||||||
pub const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE;
|
pub const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE;
|
||||||
@ -241,11 +291,11 @@ impl Shred {
|
|||||||
&mut payload,
|
&mut payload,
|
||||||
&common_header,
|
&common_header,
|
||||||
)
|
)
|
||||||
.expect("Failed to write header into shred buffer");
|
.expect("Failed to write common header into shred buffer");
|
||||||
let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER;
|
|
||||||
Self::serialize_obj_into(
|
Self::serialize_obj_into(
|
||||||
&mut start,
|
&mut start,
|
||||||
size_of_data_shred_header,
|
SIZE_OF_DATA_SHRED_HEADER,
|
||||||
&mut payload,
|
&mut payload,
|
||||||
&data_header,
|
&data_header,
|
||||||
)
|
)
|
||||||
@ -288,9 +338,8 @@ impl Shred {
|
|||||||
payload,
|
payload,
|
||||||
}
|
}
|
||||||
} else if common_header.shred_type == ShredType(DATA_SHRED) {
|
} else if common_header.shred_type == ShredType(DATA_SHRED) {
|
||||||
let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER;
|
|
||||||
let data_header: DataShredHeader =
|
let data_header: DataShredHeader =
|
||||||
Self::deserialize_obj(&mut start, size_of_data_shred_header, &payload)?;
|
Self::deserialize_obj(&mut start, SIZE_OF_DATA_SHRED_HEADER, &payload)?;
|
||||||
if u64::from(data_header.parent_offset) > common_header.slot {
|
if u64::from(data_header.parent_offset) > common_header.slot {
|
||||||
return Err(ShredError::InvalidParentOffset {
|
return Err(ShredError::InvalidParentOffset {
|
||||||
slot,
|
slot,
|
||||||
@ -601,8 +650,9 @@ impl Shredder {
|
|||||||
serialize_time.stop();
|
serialize_time.stop();
|
||||||
|
|
||||||
let mut gen_data_time = Measure::start("shred_gen_data_time");
|
let mut gen_data_time = Measure::start("shred_gen_data_time");
|
||||||
let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
|
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD;
|
||||||
let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size;
|
// Integer division to ensure we have enough shreds to fit all the data
|
||||||
|
let num_shreds = (serialized_shreds.len() + payload_capacity - 1) / payload_capacity;
|
||||||
let last_shred_index = next_shred_index + num_shreds as u32 - 1;
|
let last_shred_index = next_shred_index + num_shreds as u32 - 1;
|
||||||
// 1) Generate data shreds
|
// 1) Generate data shreds
|
||||||
let make_data_shred = |shred_index: u32, data| {
|
let make_data_shred = |shred_index: u32, data| {
|
||||||
@ -627,7 +677,7 @@ impl Shredder {
|
|||||||
let data_shreds: Vec<Shred> = PAR_THREAD_POOL.with(|thread_pool| {
|
let data_shreds: Vec<Shred> = PAR_THREAD_POOL.with(|thread_pool| {
|
||||||
thread_pool.borrow().install(|| {
|
thread_pool.borrow().install(|| {
|
||||||
serialized_shreds
|
serialized_shreds
|
||||||
.par_chunks(no_header_size)
|
.par_chunks(payload_capacity)
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, shred_data)| {
|
.map(|(i, shred_data)| {
|
||||||
let shred_index = next_shred_index + i as u32;
|
let shred_index = next_shred_index + i as u32;
|
||||||
@ -657,7 +707,7 @@ impl Shredder {
|
|||||||
return Ok(Vec::default());
|
return Ok(Vec::default());
|
||||||
}
|
}
|
||||||
let mut gen_coding_time = Measure::start("gen_coding_shreds");
|
let mut gen_coding_time = Measure::start("gen_coding_shreds");
|
||||||
// 2) Generate coding shreds
|
// 1) Generate coding shreds
|
||||||
let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
|
let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
|
||||||
thread_pool.borrow().install(|| {
|
thread_pool.borrow().install(|| {
|
||||||
data_shreds
|
data_shreds
|
||||||
@ -675,7 +725,7 @@ impl Shredder {
|
|||||||
gen_coding_time.stop();
|
gen_coding_time.stop();
|
||||||
|
|
||||||
let mut sign_coding_time = Measure::start("sign_coding_shreds");
|
let mut sign_coding_time = Measure::start("sign_coding_shreds");
|
||||||
// 3) Sign coding shreds
|
// 2) Sign coding shreds
|
||||||
PAR_THREAD_POOL.with(|thread_pool| {
|
PAR_THREAD_POOL.with(|thread_pool| {
|
||||||
thread_pool.borrow().install(|| {
|
thread_pool.borrow().install(|| {
|
||||||
coding_shreds.par_iter_mut().for_each(|mut coding_shred| {
|
coding_shreds.par_iter_mut().for_each(|mut coding_shred| {
|
||||||
@ -751,8 +801,8 @@ impl Shredder {
|
|||||||
.all(|shred| shred.common_header.slot == slot
|
.all(|shred| shred.common_header.slot == slot
|
||||||
&& shred.common_header.version == version
|
&& shred.common_header.version == version
|
||||||
&& shred.common_header.fec_set_index == fec_set_index));
|
&& shred.common_header.fec_set_index == fec_set_index));
|
||||||
// All information after coding shred field in a data shred is encoded
|
// All information (excluding the restricted section) from a data shred is encoded
|
||||||
let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
|
let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
|
||||||
let data_ptrs: Vec<_> = data_shred_batch
|
let data_ptrs: Vec<_> = data_shred_batch
|
||||||
.iter()
|
.iter()
|
||||||
.map(|data| &data.payload[..valid_data_len])
|
.map(|data| &data.payload[..valid_data_len])
|
||||||
@ -774,8 +824,8 @@ impl Shredder {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Grab pointers for the coding blocks
|
// Grab pointers for the coding blocks; these come after the two headers
|
||||||
let coding_block_offset = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
|
let coding_block_offset = SIZE_OF_CODING_SHRED_HEADERS;
|
||||||
let mut coding_ptrs: Vec<_> = coding_shreds
|
let mut coding_ptrs: Vec<_> = coding_shreds
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.map(|buffer| &mut buffer[coding_block_offset..])
|
.map(|buffer| &mut buffer[coding_block_offset..])
|
||||||
@ -905,8 +955,9 @@ impl Shredder {
|
|||||||
|
|
||||||
let session = Session::new(num_data, num_coding)?;
|
let session = Session::new(num_data, num_coding)?;
|
||||||
|
|
||||||
let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
|
// All information (excluding the restricted section) from a data shred is encoded
|
||||||
let coding_block_offset = SIZE_OF_CODING_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER;
|
let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
|
||||||
|
let coding_block_offset = SIZE_OF_CODING_SHRED_HEADERS;
|
||||||
let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs
|
let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
@ -984,7 +1035,7 @@ impl Shredder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec<u8>>) -> Vec<u8> {
|
fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec<u8>>) -> Vec<u8> {
|
||||||
let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL;
|
let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_CODING_SHRED_HEADERS;
|
||||||
data_shred_bufs[..num_data]
|
data_shred_bufs[..num_data]
|
||||||
.iter()
|
.iter()
|
||||||
.flat_map(|data| {
|
.flat_map(|data| {
|
||||||
@ -1223,8 +1274,9 @@ pub mod tests {
|
|||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let size = serialized_size(&entries).unwrap();
|
let size = serialized_size(&entries).unwrap();
|
||||||
let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
|
// Integer division to ensure we have enough shreds to fit all the data
|
||||||
let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size;
|
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD as u64;
|
||||||
|
let num_expected_data_shreds = (size + payload_capacity - 1) / payload_capacity;
|
||||||
let num_expected_coding_shreds = Shredder::calculate_num_coding_shreds(
|
let num_expected_coding_shreds = Shredder::calculate_num_coding_shreds(
|
||||||
num_expected_data_shreds as usize,
|
num_expected_data_shreds as usize,
|
||||||
fec_rate,
|
fec_rate,
|
||||||
@ -1391,8 +1443,8 @@ pub mod tests {
|
|||||||
.expect("Failed in creating shredder");
|
.expect("Failed in creating shredder");
|
||||||
|
|
||||||
// Create enough entries to make > 1 shred
|
// Create enough entries to make > 1 shred
|
||||||
let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
|
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD;
|
||||||
let num_entries = max_ticks_per_n_shreds(1, Some(no_header_size)) + 1;
|
let num_entries = max_ticks_per_n_shreds(1, Some(payload_capacity)) + 1;
|
||||||
let entries: Vec<_> = (0..num_entries)
|
let entries: Vec<_> = (0..num_entries)
|
||||||
.map(|_| {
|
.map(|_| {
|
||||||
let keypair0 = Keypair::new();
|
let keypair0 = Keypair::new();
|
||||||
@ -1439,9 +1491,9 @@ pub mod tests {
|
|||||||
let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
|
let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
|
||||||
|
|
||||||
let num_data_shreds: usize = 5;
|
let num_data_shreds: usize = 5;
|
||||||
let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD;
|
let payload_capacity = SIZE_OF_DATA_SHRED_PAYLOAD;
|
||||||
let num_entries =
|
let num_entries =
|
||||||
max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size));
|
max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(payload_capacity));
|
||||||
let entries: Vec<_> = (0..num_entries)
|
let entries: Vec<_> = (0..num_entries)
|
||||||
.map(|_| {
|
.map(|_| {
|
||||||
let keypair0 = Keypair::new();
|
let keypair0 = Keypair::new();
|
||||||
@ -1456,8 +1508,10 @@ pub mod tests {
|
|||||||
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
|
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
|
||||||
let num_coding_shreds = coding_shreds.len();
|
let num_coding_shreds = coding_shreds.len();
|
||||||
|
|
||||||
// We should have 10 shreds now, an equal number of coding shreds
|
// We should have 5 data shreds now
|
||||||
assert_eq!(data_shreds.len(), num_data_shreds);
|
assert_eq!(data_shreds.len(), num_data_shreds);
|
||||||
|
// and an equal number of coding shreds
|
||||||
|
assert_eq!(num_data_shreds, num_coding_shreds);
|
||||||
|
|
||||||
let all_shreds = data_shreds
|
let all_shreds = data_shreds
|
||||||
.iter()
|
.iter()
|
||||||
|
Reference in New Issue
Block a user