Remove unnecessary serialize of shred data (#5967)

* Remove unnecessary serialize of shred data

* remove obsolete code

* fix golden hash
Author: Pankaj Garg
Committed by: GitHub
Date: 2019-09-18 20:08:27 -07:00
Parent: 0d16db2d1b
Commit: 0dbf7995b5
3 changed files with 93 additions and 198 deletions

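In broad strokes: before this change every data shred was a struct holding a header plus a payload Vec, and the whole struct went through bincode::serialize once per shred, allocating a fresh buffer and copying the payload a second time. Afterwards the Shredder keeps one PACKET_DATA_SIZE buffer per active shred, copies payload bytes straight into it, and only the small header is serialized, into the buffer's prefix. A minimal sketch of the two patterns, using illustrative stand-in types (Header, OldShred, the field names) rather than the crate's real ones:

use bincode::{serialize, serialize_into, serialized_size};
use serde::Serialize;
use std::io::Cursor;

const PACKET_DATA_SIZE: usize = 1232; // stand-in for solana_sdk::packet::PACKET_DATA_SIZE

#[derive(Default, Serialize)]
struct Header {
    slot: u64,
    index: u32,
    flags: u8,
}

// Old shape: header and payload bundled in one struct, serialized wholesale.
#[derive(Default, Serialize)]
struct OldShred {
    header: Header,
    payload: Vec<u8>,
}

fn main() {
    let header_size = serialized_size(&Header::default()).unwrap() as usize;
    let payload = vec![1u8; 512];

    // Before: finalizing a shred re-encoded header *and* payload into a new Vec.
    let old = OldShred { header: Header::default(), payload: payload.clone() };
    let _wire = serialize(&old).unwrap(); // second full copy of the payload

    // After: the payload already sits in a packet-sized buffer; only the
    // header is serialized, into the first header_size bytes.
    let mut shred_buf = vec![0u8; PACKET_DATA_SIZE];
    shred_buf[header_size..header_size + payload.len()].copy_from_slice(&payload);
    let mut wr = Cursor::new(&mut shred_buf[..header_size]);
    serialize_into(&mut wr, &Header::default()).expect("header fits in its prefix");
}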

@@ -12,19 +12,15 @@ use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use std::io;
use std::io::{Error as IOError, ErrorKind, Write};
use std::sync::Arc;
use std::{cmp, io};
lazy_static! {
static ref SIZE_OF_CODING_SHRED_HEADER: usize =
{ serialized_size(&CodingShredHeader::default()).unwrap() as usize };
static ref SIZE_OF_DATA_SHRED_HEADER: usize =
{ serialized_size(&DataShredHeader::default()).unwrap() as usize };
static ref SIZE_OF_EMPTY_CODING_SHRED: usize =
{ serialized_size(&CodingShred::empty_shred()).unwrap() as usize };
static ref SIZE_OF_EMPTY_DATA_SHRED: usize =
{ serialized_size(&DataShred::empty_shred()).unwrap() as usize };
static ref SIZE_OF_SIGNATURE: usize =
{ bincode::serialized_size(&Signature::default()).unwrap() as usize };
static ref SIZE_OF_EMPTY_VEC: usize =
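The constants above all follow the same idiom: bincode-encode a default value and take its length. That only yields a usable fixed offset for types whose fields are fixed-size, which is why the old CodingShred/DataShred wrappers (each carrying a payload Vec) needed separate SIZE_OF_EMPTY_* constants while plain headers do not. A small illustration of the idiom, assuming bincode 1.x's default fixed-width integer encoding and made-up field names:

use bincode::serialized_size;
use serde::Serialize;

#[derive(Default, Serialize)]
struct FixedHeader {
    slot: u64,  // 8 bytes
    index: u32, // 4 bytes
}

#[derive(Default, Serialize)]
struct WithPayload {
    header: FixedHeader,
    payload: Vec<u8>, // encoded as an 8-byte length prefix plus contents
}

fn main() {
    // Fixed-size fields encode to the same length for every value, so
    // serialized_size(&Default::default()) is a valid offset constant.
    assert_eq!(serialized_size(&FixedHeader::default()).unwrap(), 12);

    // A struct carrying a Vec is only "empty-sized" while the Vec is empty,
    // hence the old SIZE_OF_EMPTY_* constants this commit retires.
    let empty = serialized_size(&WithPayload::default()).unwrap();
    let full = serialized_size(&WithPayload {
        header: FixedHeader::default(),
        payload: vec![0; 10],
    })
    .unwrap();
    assert_eq!(full, empty + 10);
}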
@@ -60,7 +56,7 @@ impl Shred {
let header = if shred_type == CODING_SHRED {
let end = *SIZE_OF_CODING_SHRED_HEADER;
let mut header = DataShredHeader::default();
header.common_header.header = bincode::deserialize(&shred_buf[..end])?;
header.common_header = bincode::deserialize(&shred_buf[..end])?;
header
} else {
let end = *SIZE_OF_DATA_SHRED_HEADER;
@@ -80,7 +76,7 @@ impl Shred {
if self.is_data() {
&self.headers.data_header
} else {
&self.headers.common_header.header.coding_header
&self.headers.common_header.coding_header
}
}
@@ -88,7 +84,7 @@ impl Shred {
if self.is_data() {
&mut self.headers.data_header
} else {
&mut self.headers.common_header.header.coding_header
&mut self.headers.common_header.coding_header
}
}
@@ -133,7 +129,7 @@ impl Shred {
}
pub fn is_data(&self) -> bool {
self.headers.common_header.header.shred_type == DATA_SHRED
self.headers.common_header.shred_type == DATA_SHRED
}
pub fn last_in_slot(&self) -> bool {
@@ -162,7 +158,7 @@ impl Shred {
pub fn coding_params(&self) -> Option<(u16, u16, u16)> {
if !self.is_data() {
let header = &self.headers.common_header.header;
let header = &self.headers.common_header;
Some((
header.num_data_shreds,
header.num_coding_shreds,
@@ -175,7 +171,7 @@ impl Shred {
pub fn verify(&self, pubkey: &Pubkey) -> bool {
let signed_payload_offset = if self.is_data() {
CodingShred::overhead()
*SIZE_OF_CODING_SHRED_HEADER
} else {
*SIZE_OF_SHRED_TYPE
} + *SIZE_OF_SIGNATURE;
@@ -205,7 +201,7 @@ pub struct ShredCommonHeader {
/// A common header that is present at start of every data shred
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct DataShredHeader {
pub common_header: CodingShred,
pub common_header: CodingShredHeader,
pub data_header: ShredCommonHeader,
pub parent_offset: u16,
pub flags: u8,
@@ -221,27 +217,12 @@ pub struct CodingShredHeader {
pub position: u16,
}
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct DataShred {
pub header: DataShredHeader,
pub payload: Vec<u8>,
}
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct CodingShred {
pub header: CodingShredHeader,
pub payload: Vec<u8>,
}
impl Default for DataShredHeader {
fn default() -> Self {
DataShredHeader {
common_header: CodingShred {
header: CodingShredHeader {
shred_type: DATA_SHRED,
..CodingShredHeader::default()
},
payload: vec![],
common_header: CodingShredHeader {
shred_type: DATA_SHRED,
..CodingShredHeader::default()
},
data_header: ShredCommonHeader::default(),
parent_offset: 0,
@@ -262,84 +243,6 @@ impl Default for CodingShredHeader {
}
}
/// Default shred is sized correctly to meet MTU/Packet size requirements
impl Default for DataShred {
fn default() -> Self {
let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_DATA_SHRED;
DataShred {
header: DataShredHeader::default(),
payload: vec![0; size],
}
}
}
/// Default shred is sized correctly to meet MTU/Packet size requirements
impl Default for CodingShred {
fn default() -> Self {
let size = PACKET_DATA_SIZE - *SIZE_OF_EMPTY_CODING_SHRED;
CodingShred {
header: CodingShredHeader::default(),
payload: vec![0; size],
}
}
}
/// Common trait implemented by all types of shreds
pub trait ShredCommon {
/// Write at a particular offset in the shred. Returns amount written and leftover capacity
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize);
/// Overhead of shred enum and headers
fn overhead() -> usize;
/// Utility function to create an empty shred
fn empty_shred() -> Self;
}
impl ShredCommon for DataShred {
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) {
let mut capacity = self.payload.len().saturating_sub(offset);
let slice_len = cmp::min(capacity, buf.len());
capacity -= slice_len;
if slice_len > 0 {
self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
}
(slice_len, capacity)
}
fn overhead() -> usize {
*SIZE_OF_EMPTY_DATA_SHRED - *SIZE_OF_EMPTY_VEC
}
fn empty_shred() -> Self {
DataShred {
header: DataShredHeader::default(),
payload: vec![],
}
}
}
impl ShredCommon for CodingShred {
fn write_at(&mut self, offset: usize, buf: &[u8]) -> (usize, usize) {
let mut capacity = self.payload.len().saturating_sub(offset);
let slice_len = cmp::min(capacity, buf.len());
capacity -= slice_len;
if slice_len > 0 {
self.payload[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
}
(slice_len, capacity)
}
fn overhead() -> usize {
*SIZE_OF_EMPTY_CODING_SHRED
}
fn empty_shred() -> Self {
CodingShred {
header: CodingShredHeader::default(),
payload: vec![],
}
}
}
#[derive(Debug)]
pub struct Shredder {
slot: u64,
@@ -350,14 +253,17 @@ pub struct Shredder {
signer: Arc<Keypair>,
pub shreds: Vec<Shred>,
fec_set_shred_start: usize,
active_shred: DataShred,
active_shred: Vec<u8>,
active_shred_header: DataShredHeader,
active_offset: usize,
}
impl Write for Shredder {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let written = self.active_offset;
let (slice_len, capacity) = self.active_shred.write_at(written, buf);
let offset = self.active_offset + *SIZE_OF_DATA_SHRED_HEADER;
let slice_len = std::cmp::min(buf.len(), PACKET_DATA_SIZE - offset);
self.active_shred[offset..offset + slice_len].copy_from_slice(&buf[..slice_len]);
let capacity = PACKET_DATA_SIZE - offset - slice_len;
if buf.len() > slice_len || capacity == 0 {
self.finalize_data_shred();
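The rewritten Write implementation above copies caller bytes straight into the packet-sized active buffer, offset past the space reserved for the serialized header, instead of going through DataShred::write_at. Roughly this shape, as a standalone sketch (HEADER_SIZE and the struct are illustrative, not the crate's types):

use std::io::{self, Write};

const PACKET_DATA_SIZE: usize = 1232; // stand-in for the solana_sdk constant
const HEADER_SIZE: usize = 86;        // stand-in for *SIZE_OF_DATA_SHRED_HEADER

// Payload bytes land directly in a packet-sized buffer, past the region
// reserved for the serialized header.
struct ActiveShred {
    buf: Vec<u8>,       // always PACKET_DATA_SIZE bytes, like Shredder::active_shred
    payload_len: usize, // like Shredder::active_offset
}

impl Write for ActiveShred {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        let start = HEADER_SIZE + self.payload_len;
        let slice_len = std::cmp::min(data.len(), PACKET_DATA_SIZE - start);
        self.buf[start..start + slice_len].copy_from_slice(&data[..slice_len]);
        self.payload_len += slice_len;
        // The real Shredder finalizes the current shred at this point when
        // the buffer is full or the caller supplied more than would fit.
        Ok(slice_len)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut shred = ActiveShred { buf: vec![0; PACKET_DATA_SIZE], payload_len: 0 };
    let written = shred.write(&[7u8; 2000])?; // more than one packet's worth
    assert_eq!(written, PACKET_DATA_SIZE - HEADER_SIZE);
    Ok(())
}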
@@ -408,11 +314,11 @@ impl Shredder {
),
)))
} else {
let mut data_shred = DataShred::default();
data_shred.header.data_header.slot = slot;
data_shred.header.data_header.index = index;
data_shred.header.parent_offset = (slot - parent) as u16;
let active_shred = data_shred;
let mut header = DataShredHeader::default();
header.data_header.slot = slot;
header.data_header.index = index;
header.parent_offset = (slot - parent) as u16;
let active_shred = vec![0; PACKET_DATA_SIZE];
Ok(Shredder {
slot,
index,
@@ -423,6 +329,7 @@ impl Shredder {
shreds: vec![],
fec_set_shred_start: 0,
active_shred,
active_shred_header: header,
active_offset: 0,
})
}
@@ -439,7 +346,7 @@ impl Shredder {
}
fn sign_unsigned_shreds_and_generate_codes(&mut self) {
let signature_offset = CodingShred::overhead();
let signature_offset = *SIZE_OF_CODING_SHRED_HEADER;
let signer = self.signer.clone();
PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
@@ -465,41 +372,43 @@ impl Shredder {
/// Finalize a data shred. Update the shred index for the next shred
fn finalize_data_shred(&mut self) {
let mut data = Vec::with_capacity(PACKET_DATA_SIZE);
bincode::serialize_into(&mut data, &self.active_shred).expect("Failed to serialize shred");
self.active_offset = 0;
self.index += 1;
let mut shred = self.new_data_shred();
std::mem::swap(&mut shred, &mut self.active_shred);
let shred_info = Shred::new(shred.header, data);
self.shreds.push(shred_info);
// Swap header
let mut header = DataShredHeader::default();
header.data_header.slot = self.slot;
header.data_header.index = self.index;
header.parent_offset = self.parent_offset;
std::mem::swap(&mut header, &mut self.active_shred_header);
// Swap shred buffer
let mut shred_buf = vec![0; PACKET_DATA_SIZE];
std::mem::swap(&mut shred_buf, &mut self.active_shred);
let mut wr = io::Cursor::new(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER]);
bincode::serialize_into(&mut wr, &header)
.expect("Failed to write header into shred buffer");
let shred = Shred::new(header, shred_buf);
self.shreds.push(shred);
}
/// Creates a new data shred
fn new_data_shred(&self) -> DataShred {
let mut data_shred = DataShred::default();
data_shred.header.data_header.slot = self.slot;
data_shred.header.data_header.index = self.index;
data_shred.header.parent_offset = self.parent_offset;
data_shred
}
pub fn new_coding_shred(
pub fn new_coding_shred_header(
slot: u64,
index: u32,
num_data: usize,
num_code: usize,
position: usize,
) -> CodingShred {
let mut coding_shred = CodingShred::default();
coding_shred.header.coding_header.slot = slot;
coding_shred.header.coding_header.index = index;
coding_shred.header.num_data_shreds = num_data as u16;
coding_shred.header.num_coding_shreds = num_code as u16;
coding_shred.header.position = position as u16;
coding_shred
) -> DataShredHeader {
let mut header = DataShredHeader::default();
header.common_header.shred_type = CODING_SHRED;
header.common_header.coding_header.index = index;
header.common_header.coding_header.slot = slot;
header.common_header.num_coding_shreds = num_code as u16;
header.common_header.num_data_shreds = num_data as u16;
header.common_header.position = position as u16;
header
}
/// Generates coding shreds for the data shreds in the current FEC set
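The finalize_data_shred hunk above is the heart of the change: the filled buffer and its header are swapped out for fresh ones, and bincode only runs over the header, writing it into the first SIZE_OF_DATA_SHRED_HEADER bytes of the buffer that already holds the payload. A standalone sketch of that pattern, with illustrative types and sizes standing in for the crate's own:

use bincode::serialize_into;
use serde::Serialize;
use std::io::Cursor;

const PACKET_DATA_SIZE: usize = 1232; // stand-in for the solana_sdk constant
const HEADER_SIZE: usize = 86;        // stand-in for *SIZE_OF_DATA_SHRED_HEADER

#[derive(Default, Serialize)]
struct Header {
    slot: u64,
    index: u32,
}

struct Shredder {
    slot: u64,
    index: u32,
    active_shred: Vec<u8>, // always PACKET_DATA_SIZE bytes
    active_shred_header: Header,
    shreds: Vec<Vec<u8>>, // finished, header-stamped buffers
}

impl Shredder {
    fn finalize_data_shred(&mut self) {
        self.index += 1;

        // Swap in a header for the *next* shred; `header` now holds the
        // finished shred's header.
        let mut header = Header { slot: self.slot, index: self.index };
        std::mem::swap(&mut header, &mut self.active_shred_header);

        // Swap in a fresh packet-sized buffer; `shred_buf` now holds the
        // payload bytes that were written in place.
        let mut shred_buf = vec![0u8; PACKET_DATA_SIZE];
        std::mem::swap(&mut shred_buf, &mut self.active_shred);

        // Serialize only the header, into the buffer's prefix; the payload
        // is never re-encoded or copied again.
        let mut wr = Cursor::new(&mut shred_buf[..HEADER_SIZE]);
        serialize_into(&mut wr, &header).expect("header fits in its prefix");

        self.shreds.push(shred_buf);
    }
}

fn main() {
    let mut shredder = Shredder {
        slot: 7,
        index: 0,
        active_shred: vec![0; PACKET_DATA_SIZE],
        active_shred_header: Header { slot: 7, index: 0 },
        shreds: vec![],
    };
    shredder.finalize_data_shred();
    assert_eq!(shredder.shreds.len(), 1);
    assert_eq!(shredder.active_shred_header.index, 1);
}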
@@ -513,7 +422,7 @@ impl Shredder {
let start_index = self.index - num_data as u32;
// All information after coding shred field in a data shred is encoded
let coding_block_offset = CodingShred::overhead();
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER;
let data_ptrs: Vec<_> = self.shreds[self.fec_set_shred_start..]
.iter()
.map(|data| &data.payload[coding_block_offset..])
@@ -522,15 +431,15 @@ impl Shredder {
// Create empty coding shreds, with correctly populated headers
let mut coding_shreds = Vec::with_capacity(num_coding);
(0..num_coding).for_each(|i| {
let shred = bincode::serialize(&Self::new_coding_shred(
let header = Self::new_coding_shred_header(
self.slot,
start_index + i as u32,
num_data,
num_coding,
i,
))
.unwrap();
coding_shreds.push(shred);
);
let shred = Shred::new_empty_from_header(header);
coding_shreds.push(shred.payload);
});
// Grab pointers for the coding blocks
@@ -546,15 +455,14 @@ impl Shredder {
// append to the shred list
coding_shreds.into_iter().enumerate().for_each(|(i, code)| {
let mut header = DataShredHeader::default();
header.common_header.header.shred_type = CODING_SHRED;
header.common_header.header.coding_header.index = start_index + i as u32;
header.common_header.header.coding_header.slot = self.slot;
header.common_header.header.num_coding_shreds = num_coding as u16;
header.common_header.header.num_data_shreds = num_data as u16;
header.common_header.header.position = i as u16;
let shred_info = Shred::new(header, code);
self.shreds.push(shred_info);
let header = Self::new_coding_shred_header(
self.slot,
start_index + i as u32,
num_data,
num_coding,
i,
);
self.shreds.push(Shred::new(header, code));
});
self.fec_set_index = self.index;
}
@@ -564,12 +472,12 @@ impl Shredder {
/// If there's an active data shred, morph it into the final shred
/// If the current active data shred is first in slot, finalize it and create a new shred
fn make_final_data_shred(&mut self, last_in_slot: u8) {
if self.active_shred.header.data_header.index == 0 {
if self.active_shred_header.data_header.index == 0 {
self.finalize_data_shred();
}
self.active_shred.header.flags |= DATA_COMPLETE_SHRED;
self.active_shred_header.flags |= DATA_COMPLETE_SHRED;
if last_in_slot == LAST_SHRED_IN_SLOT {
self.active_shred.header.flags |= LAST_SHRED_IN_SLOT;
self.active_shred_header.flags |= LAST_SHRED_IN_SLOT;
}
self.finalize_data_shred();
self.sign_unsigned_shreds_and_generate_codes();
@@ -618,21 +526,22 @@ impl Shredder {
first_index: usize,
missing: usize,
) -> Vec<u8> {
if missing < first_index + num_data {
let mut data_shred = DataShred::default();
data_shred.header.data_header.slot = slot;
data_shred.header.data_header.index = missing as u32;
bincode::serialize(&data_shred).unwrap()
let header = if missing < first_index + num_data {
let mut header = DataShredHeader::default();
header.data_header.slot = slot;
header.data_header.index = missing as u32;
header
} else {
bincode::serialize(&Self::new_coding_shred(
Self::new_coding_shred_header(
slot,
missing.saturating_sub(num_data) as u32,
num_data,
num_coding,
missing - first_index - num_data,
))
.unwrap()
}
)
};
let shred = Shred::new_empty_from_header(header);
shred.payload
}
pub fn try_recovery(
@@ -646,7 +555,7 @@ impl Shredder {
let mut recovered_code = vec![];
let fec_set_size = num_data + num_coding;
if num_coding > 0 && shreds.len() < fec_set_size {
let coding_block_offset = CodingShred::overhead();
let coding_block_offset = *SIZE_OF_CODING_SHRED_HEADER;
// Let's try recovering missing shreds using erasure
let mut present = &mut vec![true; fec_set_size];
@@ -764,7 +673,7 @@ impl Shredder {
data_shred_bufs[..num_data]
.iter()
.flat_map(|data| {
let offset = *SIZE_OF_EMPTY_DATA_SHRED;
let offset = *SIZE_OF_DATA_SHRED_HEADER;
data[offset as usize..].iter()
})
.cloned()
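The reassembly hunk above now skips *SIZE_OF_DATA_SHRED_HEADER bytes of each recovered buffer rather than the size of an empty DataShred struct, since the buffers no longer carry the serialized Vec length prefix. A toy sketch of that flat_map-and-skip pattern (the 86-byte HEADER_SIZE is illustrative):

const HEADER_SIZE: usize = 86; // stand-in for *SIZE_OF_DATA_SHRED_HEADER

// Concatenate the payload regions of consecutive data shreds, skipping the
// header prefix of each buffer, as in the deshred loop above.
fn deshred(data_shred_bufs: &[Vec<u8>]) -> Vec<u8> {
    data_shred_bufs
        .iter()
        .flat_map(|data| data[HEADER_SIZE..].iter())
        .cloned()
        .collect()
}

fn main() {
    let a = [vec![0u8; HEADER_SIZE], b"hello ".to_vec()].concat();
    let b = [vec![0u8; HEADER_SIZE], b"world".to_vec()].concat();
    assert_eq!(deshred(&[a, b]), b"hello world".to_vec());
}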
@@ -819,9 +728,6 @@ mod tests {
assert!(shredder.shreds.is_empty());
assert_eq!(shredder.active_offset, 0);
assert!(DataShred::overhead() < PACKET_DATA_SIZE);
assert!(CodingShred::overhead() < PACKET_DATA_SIZE);
// Test0: Write some data to shred. Not enough to create a signed shred
let data: Vec<u8> = (0..25).collect();
assert_eq!(shredder.write(&data).unwrap(), data.len());
@@ -1372,7 +1278,7 @@ mod tests {
let mut index = 0;
while index < shredder.shreds.len() {
let num_data_shreds = cmp::min(
let num_data_shreds = std::cmp::min(
MAX_DATA_SHREDS_PER_FEC_BLOCK as usize,
(shredder.shreds.len() - index) / 2,
);