diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index c03be42060..2536aaca5f 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -2,7 +2,7 @@ //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. use crate::entry::Entry; -use crate::erasure; +use crate::erasure::{self, Session}; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; @@ -16,7 +16,7 @@ use std::collections::HashMap; #[cfg(not(feature = "kvstore"))] use rocksdb; -use solana_metrics::datapoint_error; +use solana_metrics::{datapoint_error, datapoint_info}; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; @@ -88,8 +88,8 @@ pub struct Blocktree { erasure_cf: LedgerColumn, erasure_meta_cf: LedgerColumn, orphans_cf: LedgerColumn, - index_cf: LedgerColumn, batch_processor: Arc>, + session: Arc, pub new_blobs_signals: Vec>, pub completed_slots_senders: Vec>>, } @@ -107,7 +107,6 @@ pub const ERASURE_META_CF: &str = "erasure_meta"; pub const ORPHANS_CF: &str = "orphans"; // Column family for root data pub const ROOT_CF: &str = "root"; -pub const INDEX_CF: &str = "index"; impl Blocktree { /// Opens a Ledger in directory, provides "infinite" window of blobs @@ -141,7 +140,8 @@ impl Blocktree { // known parent let orphans_cf = db.column(); - let index_cf = db.column(); + // setup erasure + let session = Arc::new(erasure::Session::default()); let db = Arc::new(db); @@ -153,7 +153,7 @@ impl Blocktree { erasure_cf, erasure_meta_cf, orphans_cf, - index_cf, + session, new_blobs_signals: vec![], batch_processor, completed_slots_senders: vec![], @@ -316,75 +316,55 @@ impl Blocktree { let mut recovered_data = vec![]; let mut prev_inserted_blob_datas = HashMap::new(); - let mut prev_inserted_coding = HashMap::new(); // A map from slot to a 2-tuple of metadata: (working copy, backup copy), // so we can detect changes to the slot metadata later let mut slot_meta_working_set = HashMap::new(); let mut erasure_meta_working_set = HashMap::new(); - let mut index_working_set = HashMap::new(); for blob in new_blobs.iter() { let blob = blob.borrow(); let blob_slot = blob.slot(); - let _ = index_working_set.entry(blob_slot).or_insert_with(|| { - self.index_cf - .get(blob_slot) - .unwrap() - .unwrap_or_else(|| Index::new(blob_slot)) - }); - - if let Some(erasure_info) = blob.get_coding_header() { - let set_index = erasure_info.set_index; - if !erasure_meta_working_set.contains_key(&(blob_slot, set_index)) { - if let Some(mut meta) = self.erasure_meta_cf.get((blob_slot, set_index))? { - meta.set_session_info(erasure_info); - erasure_meta_working_set.insert((blob_slot, set_index), meta); - } - } - } + let set_index = ErasureMeta::set_index_for(blob.index()); + erasure_meta_working_set + .entry((blob_slot, set_index)) + .or_insert_with(|| { + self.erasure_meta_cf + .get((blob_slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index)) + }); } insert_data_blob_batch( new_blobs.iter().map(Borrow::borrow), &db, &mut slot_meta_working_set, - &mut index_working_set, + &mut erasure_meta_working_set, &mut prev_inserted_blob_datas, &mut write_batch, )?; - for (&(slot, set_index), erasure_meta) in erasure_meta_working_set.iter_mut() { - let index = index_working_set.get_mut(&slot).expect("Index"); + for (&(slot, _), erasure_meta) in erasure_meta_working_set.iter_mut() { if let Some((data, coding)) = try_erasure_recover( &db, + &self.session, &erasure_meta, - index, slot, &prev_inserted_blob_datas, - &prev_inserted_coding, + None, )? { for data_blob in data { recovered_data.push(data_blob); } - let info = erasure_meta.session_info(); - - for mut coding_blob in coding { - index - .coding_mut() - .set_present(set_index, coding_blob.index(), true); - - coding_blob.set_coding_header(&info); + for coding_blob in coding { + erasure_meta.set_coding_present(coding_blob.index(), true); write_batch.put_bytes::( - (coding_blob.slot(), set_index, coding_blob.index()), + (coding_blob.slot(), coding_blob.index()), &coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()], )?; - - prev_inserted_coding - .entry((slot, info.set_index, coding_blob.index())) - .or_insert_with(|| coding_blob.clone()); } } } @@ -393,22 +373,30 @@ impl Blocktree { recovered_data.iter(), &db, &mut slot_meta_working_set, - &mut index_working_set, + &mut erasure_meta_working_set, &mut prev_inserted_blob_datas, &mut write_batch, )?; // Handle chaining for the working set - handle_chaining(db, &mut write_batch, &slot_meta_working_set)?; + handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?; + let mut should_signal = false; + let mut newly_completed_slots = vec![]; - let (should_signal, newly_completed_slots) = prepare_signals( - &slot_meta_working_set, - &self.completed_slots_senders, - &mut write_batch, - )?; - - for (&slot, index) in index_working_set.iter() { - write_batch.put::(slot, index)?; + // Check if any metadata was changed, if so, insert the new version of the + // metadata into the write batch + for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() { + let meta: &SlotMeta = &RefCell::borrow(&*meta); + if !self.completed_slots_senders.is_empty() + && is_newly_completed_slot(meta, meta_backup) + { + newly_completed_slots.push(*slot); + } + // Check if the working copy of the metadata has changed + if Some(meta) != meta_backup.as_ref() { + should_signal = should_signal || slot_has_updates(meta, &meta_backup); + write_batch.put::(*slot, &meta)?; + } } for ((slot, set_index), erasure_meta) in erasure_meta_working_set { @@ -417,12 +405,34 @@ impl Blocktree { batch_processor.write(write_batch)?; - send_signals( - &self.new_blobs_signals, - &self.completed_slots_senders, - should_signal, - newly_completed_slots, - )?; + if should_signal { + for signal in &self.new_blobs_signals { + let _ = signal.try_send(true); + } + } + + if !self.completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() { + let mut slots: Vec<_> = (0..self.completed_slots_senders.len() - 1) + .map(|_| newly_completed_slots.clone()) + .collect(); + + slots.push(newly_completed_slots); + + for (signal, slots) in self.completed_slots_senders.iter().zip(slots.into_iter()) { + let res = signal.try_send(slots); + if let Err(TrySendError::Full(_)) = res { + datapoint_error!( + "blocktree_error", + ( + "error", + "Unable to send newly completed slot because channel is full" + .to_string(), + String + ), + ); + } + } + } Ok(()) } @@ -489,26 +499,25 @@ impl Blocktree { Ok((total_blobs, total_current_size as u64)) } - pub fn get_coding_blob_bytes( - &self, - slot: u64, - set_index: u64, - index: u64, - ) -> Result>> { - self.erasure_cf.get_bytes((slot, set_index, index)) + pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { + self.erasure_cf.get_bytes((slot, index)) } - pub fn delete_coding_blob(&self, slot: u64, set_index: u64, blob_index: u64) -> Result<()> { + pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { + let set_index = ErasureMeta::set_index_for(index); let mut batch_processor = self.batch_processor.write().unwrap(); - let mut index = self.index_cf.get(slot)?.unwrap_or_else(|| Index::new(slot)); + let mut erasure_meta = self + .erasure_meta_cf + .get((slot, set_index))? + .unwrap_or_else(|| ErasureMeta::new(set_index)); - index.coding_mut().set_present(set_index, blob_index, false); + erasure_meta.set_coding_present(index, false); let mut batch = batch_processor.batch()?; - batch.delete::((slot, set_index, blob_index))?; - batch.put::(slot, &index)?; + batch.delete::((slot, index))?; + batch.put::((slot, set_index), &erasure_meta)?; batch_processor.write(batch)?; Ok(()) @@ -526,148 +535,75 @@ impl Blocktree { /// For benchmarks, testing, and setup. /// Does no metadata tracking. Use with care. - pub fn put_coding_blob_bytes_raw( - &self, - slot: u64, - set_index: u64, - index: u64, - bytes: &[u8], - ) -> Result<()> { - self.erasure_cf.put_bytes((slot, set_index, index), bytes) + pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + self.erasure_cf.put_bytes((slot, index), bytes) } /// this function will insert coding blobs and also automatically track erasure-related /// metadata. If recovery is available it will be done - pub fn put_coding_blob(&self, blob: &Blob) -> Result<()> { - self.put_many_coding_blobs(vec![blob]) - } - - /// this function will insert coding blobs and also automatically track erasure-related - /// metadata. If recovery is available it will be done - pub fn put_many_coding_blobs(&self, blobs: I) -> Result<()> - where - I: IntoIterator, - I::Item: Borrow, - { + pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { + let set_index = ErasureMeta::set_index_for(index); let mut batch_processor = self.batch_processor.write().unwrap(); + + let mut erasure_meta = self + .erasure_meta_cf + .get((slot, set_index))? + .unwrap_or_else(|| ErasureMeta::new(set_index)); + + erasure_meta.set_coding_present(index, true); + erasure_meta.set_size(bytes.len() - BLOB_HEADER_SIZE); + let mut writebatch = batch_processor.batch()?; - let mut new_data_blobs = vec![]; - let mut erasure_metas = HashMap::new(); - let mut slot_meta_working_set = HashMap::new(); - let mut index_working_set = HashMap::new(); - let mut prev_inserted_coding = HashMap::new(); - - for blob_item in blobs { - let blob = blob_item.borrow(); - let (blob_slot, blob_index) = (blob.slot(), blob.index()); - let info = blob - .get_coding_header() - .expect("coding blobs must have coding headers"); - let set_index = info.set_index; - - let index = index_working_set.entry(blob_slot).or_insert_with(|| { - self.index_cf - .get(blob_slot) - .unwrap() - .unwrap_or_else(|| Index::new(blob_slot)) - }); - - let erasure_meta = erasure_metas - .entry((blob_slot, set_index)) - .or_insert_with(|| { - self.erasure_meta_cf - .get((blob_slot, set_index)) - .expect("Expect database get to succeed") - .unwrap_or_else(|| ErasureMeta::new(set_index)) - }); - - erasure_meta.set_session_info(info); - index.coding_mut().set_present(set_index, blob_index, true); - - // `or_insert_with` used to prevent stack overflow - prev_inserted_coding - .entry((blob_slot, set_index, blob_index)) - .or_insert_with(|| blob.clone()); - - writebatch.put_bytes::( - (blob_slot, set_index, blob_index), - &blob.data[..BLOB_HEADER_SIZE + blob.size()], - )?; - } - - let mut prev_inserted_blob_datas = HashMap::new(); - - for (&(slot, set_index), erasure_meta) in erasure_metas.iter_mut() { - let index = index_working_set.get_mut(&slot).expect("Index"); + writebatch.put_bytes::((slot, index), bytes)?; + let recovered_data = { if let Some((data, coding)) = try_erasure_recover( &self.db, + &self.session, &erasure_meta, - index, slot, - &prev_inserted_blob_datas, - &prev_inserted_coding, + &HashMap::new(), + Some((index, bytes)), )? { - for data_blob in data { - new_data_blobs.push(data_blob); - } + let mut erasure_meta_working_set = HashMap::new(); + erasure_meta_working_set.insert((slot, set_index), erasure_meta); + erasure_meta = *erasure_meta_working_set.values().next().unwrap(); - let info = erasure_meta.session_info(); - - for mut coding_blob in coding { - index - .coding_mut() - .set_present(set_index, coding_blob.index(), true); - - coding_blob.set_coding_header(&info); + for coding_blob in coding { + erasure_meta.set_coding_present(coding_blob.index(), true); writebatch.put_bytes::( - (coding_blob.slot(), set_index, coding_blob.index()), + (coding_blob.slot(), coding_blob.index()), &coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()], )?; - - prev_inserted_coding - .insert((slot, info.set_index, coding_blob.index()), coding_blob); } + Some(data) + } else { + None + } + }; + + writebatch.put::((slot, set_index), &erasure_meta)?; + batch_processor.write(writebatch)?; + drop(batch_processor); + if let Some(data) = recovered_data { + if !data.is_empty() { + self.insert_data_blobs(&data)?; } } - insert_data_blob_batch( - new_data_blobs.iter(), - &self.db, - &mut slot_meta_working_set, - &mut index_working_set, - &mut prev_inserted_blob_datas, - &mut writebatch, - )?; + Ok(()) + } - // Handle chaining for the working set - handle_chaining(&self.db, &mut writebatch, &slot_meta_working_set)?; - - let (should_signal, newly_completed_slots) = prepare_signals( - &slot_meta_working_set, - &self.completed_slots_senders, - &mut writebatch, - )?; - - for ((slot, set_index), erasure_meta) in erasure_metas { - writebatch.put::((slot, set_index), &erasure_meta)?; + pub fn put_many_coding_blob_bytes(&self, coding_blobs: &[SharedBlob]) -> Result<()> { + for shared_coding_blob in coding_blobs { + let blob = shared_coding_blob.read().unwrap(); + assert!(blob.is_coding()); + let size = blob.size() + BLOB_HEADER_SIZE; + self.put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])? } - for (&slot, index) in index_working_set.iter() { - writebatch.put::(slot, index)?; - } - - batch_processor.write(writebatch)?; - - send_signals( - &self.new_blobs_signals, - &self.completed_slots_senders, - should_signal, - newly_completed_slots, - )?; - Ok(()) } @@ -681,21 +617,6 @@ impl Blocktree { })) } - pub fn get_coding_blob( - &self, - slot: u64, - set_index: u64, - blob_index: u64, - ) -> Result> { - let bytes = self.get_coding_blob_bytes(slot, set_index, blob_index)?; - Ok(bytes.map(|bytes| { - let blob = Blob::new(&bytes); - assert!(blob.slot() == slot); - assert!(blob.index() == blob_index); - blob - })) - } - pub fn get_entries_bytes( &self, _start_index: u64, @@ -997,7 +918,7 @@ fn insert_data_blob_batch<'a, I>( new_blobs: I, db: &Database, slot_meta_working_set: &mut HashMap>, Option)>, - index_working_set: &mut HashMap, + erasure_meta_working_set: &mut HashMap<(u64, u64), ErasureMeta>, prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, write_batch: &mut WriteBatch, ) -> Result<()> @@ -1014,9 +935,10 @@ where ); if inserted { - let index = index_working_set.get_mut(&blob.slot()).expect("index"); - - index.data_mut().set_present(blob.index(), true); + erasure_meta_working_set + .get_mut(&(blob.slot(), ErasureMeta::set_index_for(blob.index()))) + .unwrap() + .set_data_present(blob.index(), true); } } @@ -1082,72 +1004,6 @@ fn insert_data_blob<'a>( Ok(()) } -fn send_signals( - new_blobs_signals: &[SyncSender], - completed_slots_senders: &[SyncSender>], - should_signal: bool, - newly_completed_slots: Vec, -) -> Result<()> { - if should_signal { - for signal in new_blobs_signals { - let _ = signal.try_send(true); - } - } - - if !completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() { - let mut slots: Vec<_> = (0..completed_slots_senders.len() - 1) - .map(|_| newly_completed_slots.clone()) - .collect(); - - slots.push(newly_completed_slots); - - for (signal, slots) in completed_slots_senders.iter().zip(slots.into_iter()) { - let res = signal.try_send(slots); - if let Err(TrySendError::Full(_)) = res { - solana_metrics::submit( - solana_metrics::influxdb::Point::new("blocktree_error") - .add_field( - "error", - solana_metrics::influxdb::Value::String( - "Unable to send newly completed slot because channel is full" - .to_string(), - ), - ) - .to_owned(), - log::Level::Error, - ); - } - } - } - - Ok(()) -} - -fn prepare_signals( - slot_meta_working_set: &HashMap>, Option)>, - completed_slots_senders: &[SyncSender>], - write_batch: &mut WriteBatch, -) -> Result<(bool, Vec)> { - let mut should_signal = false; - let mut newly_completed_slots = vec![]; - - // Check if any metadata was changed, if so, insert the new version of the - // metadata into the write batch - for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() { - let meta: &SlotMeta = &RefCell::borrow(&*meta); - if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) { - newly_completed_slots.push(*slot); - } - // Check if the working copy of the metadata has changed - if Some(meta) != meta_backup.as_ref() { - should_signal = should_signal || slot_has_updates(meta, &meta_backup); - write_batch.put::(*slot, &meta)?; - } - } - - Ok((should_signal, newly_completed_slots)) -} - /// Checks to see if the data blob passes integrity checks for insertion. Proceeds with /// insertion if it does. fn check_insert_data_blob<'a>( @@ -1501,19 +1357,20 @@ fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option, - prev_inserted_coding: &HashMap<(u64, u64, u64), Blob>, + new_coding_blob: Option<(u64, &[u8])>, ) -> Result, Vec)>> { - let coding_info = erasure_meta.session_info(); - let set_index = coding_info.set_index; - let start_index = coding_info.start_index; - let (data_end_index, _) = coding_info.end_indexes(); + use crate::erasure::ERASURE_SET_SIZE; + + let set_index = erasure_meta.set_index; + let start_index = erasure_meta.start_index(); + let (data_end_index, _) = erasure_meta.end_indexes(); let submit_metrics = |attempted: bool, status: String| { - datapoint!( + datapoint_info!( "blocktree-erasure", ("slot", slot as i64, i64), ("start_index", start_index as i64, i64), @@ -1523,15 +1380,15 @@ fn try_erasure_recover( ); }; - let blobs = match erasure_meta.status(index) { + let blobs = match erasure_meta.status() { ErasureMetaStatus::CanRecover => { let erasure_result = recover( db, + session, slot, erasure_meta, - index, prev_inserted_blob_datas, - prev_inserted_coding, + new_coding_blob, ); match erasure_result { @@ -1539,10 +1396,8 @@ fn try_erasure_recover( let recovered = data.len() + coding.len(); assert_eq!( - coding_info.parity_count + coding_info.data_count, - recovered - + index.coding().present_in_set(set_index) - + index.data().present_in_bounds(start_index..data_end_index), + ERASURE_SET_SIZE, + recovered + (erasure_meta.num_coding() + erasure_meta.num_data()) as usize, "Recovery should always complete a set" ); @@ -1560,7 +1415,7 @@ fn try_erasure_recover( error!( "[try_erasure] slot: {}, set_index: {}, recovery failed: cause: {}", - slot, set_index, e + slot, erasure_meta.set_index, e ); None @@ -1596,72 +1451,69 @@ fn try_erasure_recover( fn recover( db: &Database, + session: &Session, slot: u64, erasure_meta: &ErasureMeta, - index: &Index, prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, - prev_inserted_coding: &HashMap<(u64, u64, u64), Blob>, + new_coding: Option<(u64, &[u8])>, ) -> Result<(Vec, Vec)> { - let info = erasure_meta.session_info(); - let start_idx = info.start_index; - let set_index = info.set_index; - let size = info.shard_size; + use crate::erasure::ERASURE_SET_SIZE; + let start_idx = erasure_meta.start_index(); + let size = erasure_meta.size(); let data_cf = db.column::(); let erasure_cf = db.column::(); - let data_index = index.data(); - let coding_index = index.coding(); - let data_end_idx = start_idx + info.data_count as u64; - let set_size = info.parity_count + info.data_count; + let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); - let mut present = vec![true; set_size]; - let mut blobs = Vec::with_capacity(set_size); + let present = &mut [true; ERASURE_SET_SIZE]; + let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); - for i in 0..info.data_count as u64 { - let idx = i + start_idx; - if data_index.is_present(idx) { - let blob_bytes = match prev_inserted_blob_datas.get(&(slot, idx)) { + for i in start_idx..coding_end_idx { + if erasure_meta.is_coding_present(i) { + let mut blob_bytes = match new_coding { + Some((new_coding_index, bytes)) if new_coding_index == i => bytes.to_vec(), + _ => erasure_cf + .get_bytes((slot, i))? + .expect("ErasureMeta must have no false positives"), + }; + + blob_bytes.drain(..BLOB_HEADER_SIZE); + + blobs.push(blob_bytes); + } else { + let set_relative_idx = erasure_meta.coding_index_in_set(i).unwrap() as usize; + blobs.push(vec![0; size]); + present[set_relative_idx] = false; + } + } + + assert_ne!(size, 0); + + for i in start_idx..data_end_idx { + let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize; + + if erasure_meta.is_data_present(i) { + let mut blob_bytes = match prev_inserted_blob_datas.get(&(slot, i)) { Some(bytes) => bytes.to_vec(), None => data_cf - .get_bytes((slot, idx))? + .get_bytes((slot, i))? .expect("erasure_meta must have no false positives"), }; - blobs.push(Blob::new(&blob_bytes)); + // If data is too short, extend it with zeroes + blob_bytes.resize(size, 0u8); + + blobs.insert(set_relative_idx, blob_bytes); } else { - present[i as usize] = false; - blobs.push(Blob::default()); + blobs.insert(set_relative_idx, vec![0u8; size]); + // data erasures must come before any coding erasures if present + present[set_relative_idx] = false; } } - for i in 0..info.parity_count as u64 { - if coding_index.is_present(set_index, i) { - let blob = match prev_inserted_coding.get(&(slot, set_index, i)) { - Some(blob) => (*blob).clone(), - _ => { - let bytes = erasure_cf - .get_bytes((slot, set_index, i))? - .expect("ErasureMeta must have no false positives"); - - Blob::new(&bytes) - } - }; - - assert_eq!(blob.get_coding_header(), Some(info)); - - blobs.push(blob); - } else { - blobs.push(Blob::default()); - present[i as usize + info.data_count] = false; - } - } - - debug_assert_ne!(size, 0); - debug_assert_eq!(blobs.len(), info.data_count + info.parity_count); - let (recovered_data, recovered_coding) = - erasure::decode(&info, slot, &mut blobs[..], &present)?; + session.reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; trace!( "[recover] reconstruction OK slot: {}, indexes: [{},{})", @@ -1813,6 +1665,7 @@ pub mod tests { use crate::entry::{ create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice, }; + use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA}; use crate::packet; use rand::seq::SliceRandom; use rand::thread_rng; @@ -1825,9 +1678,6 @@ pub mod tests { use std::iter::FromIterator; use std::time::Duration; - const NUM_CODING: usize = 8; - const NUM_DATA: usize = 8; - #[test] fn test_write_entries() { solana_logger::setup(); @@ -1926,7 +1776,7 @@ pub mod tests { // Test erasure column family let erasure = vec![1u8; 16]; - let erasure_key = (0, 0, 0); + let erasure_key = (0, 0); ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap(); let result = ledger @@ -2796,8 +2646,6 @@ pub mod tests { #[test] pub fn test_chaining_tree() { - use crate::erasure::encode_shared; - let blocktree_path = get_tmp_ledger_path("test_chaining_tree"); { let blocktree = Blocktree::open(&blocktree_path).unwrap(); @@ -2837,24 +2685,18 @@ pub mod tests { .cloned() .map(|blob| Arc::new(RwLock::new(blob))) .collect(); - - let coding_blobs = encode_shared(slot, 0, 0, &shared_blobs, NUM_CODING).unwrap(); + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let coding_blobs = coding_generator.next(&shared_blobs); assert_eq!(coding_blobs.len(), NUM_CODING); let mut rng = thread_rng(); - let coding_locks: Vec<_> = coding_blobs - .iter() - .map(|shared| shared.read().unwrap()) - .collect(); - let coding_iter = coding_locks.iter().map(|lock| &**lock); - - // Randomly pick whether to insert data or coding blobs first + // Randomly pick whether to insert erasure or coding blobs first if rng.gen_bool(0.5) { blocktree.write_blobs(slot_blobs).unwrap(); - blocktree.put_many_coding_blobs(coding_iter).unwrap(); + blocktree.put_many_coding_blob_bytes(&coding_blobs).unwrap(); } else { - blocktree.put_many_coding_blobs(coding_iter).unwrap(); + blocktree.put_many_coding_blob_bytes(&coding_blobs).unwrap(); blocktree.write_blobs(slot_blobs).unwrap(); } } @@ -3340,13 +3182,8 @@ pub mod tests { mod erasure { use super::*; use crate::blocktree::meta::ErasureMetaStatus; - use crate::erasure::{ - encode, encode_shared, - test::{ - generate_ledger_model, generate_shared_test_blobs, generate_test_blobs, - ErasureSpec, SlotSpec, - }, - }; + use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; + use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA}; use rand::{thread_rng, Rng}; use std::sync::RwLock; @@ -3358,86 +3195,119 @@ pub mod tests { #[test] fn test_erasure_meta_accuracy() { - use ErasureMetaStatus::DataFull; - - const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; + use crate::erasure::ERASURE_SET_SIZE; + use ErasureMetaStatus::{DataFull, StillNeed}; let path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&path).unwrap(); // two erasure sets - let num_blobs = NUM_DATA * 2; + let num_blobs = NUM_DATA as u64 * 2; let slot = 0; - let mut blobs = generate_test_blobs(0, num_blobs); - blocktree.write_blobs(&blobs[..2]).unwrap(); + let (blobs, _) = make_slot_entries(slot, 0, num_blobs); + let shared_blobs: Vec<_> = blobs + .iter() + .cloned() + .map(|blob| Arc::new(RwLock::new(blob))) + .collect(); - let index = blocktree.index_cf.get(slot).unwrap().expect("index"); - assert!(index.data().is_present(0)); - assert!(index.data().is_present(1)); + blocktree.write_blobs(&blobs[..2]).unwrap(); let erasure_meta_opt = blocktree .erasure_meta(slot, 0) .expect("DB get must succeed"); - assert!(erasure_meta_opt.is_none()); + assert!(erasure_meta_opt.is_some()); + let erasure_meta = erasure_meta_opt.unwrap(); + + let should_need = ERASURE_SET_SIZE - NUM_CODING - 2; + match erasure_meta.status() { + StillNeed(n) => assert_eq!(n, should_need), + _ => panic!("Should still need more blobs"), + }; blocktree.write_blobs(&blobs[2..NUM_DATA]).unwrap(); - // insert all coding blobs in first set - let coding_blobs = encode(slot, 0, 0, &mut blobs[..NUM_DATA], NUM_CODING).unwrap(); - - blocktree.put_many_coding_blobs(&coding_blobs).unwrap(); - - let index = blocktree.index_cf.get(slot).unwrap().expect("index"); let erasure_meta = blocktree .erasure_meta(slot, 0) .expect("DB get must succeed") .unwrap(); - assert_eq!(erasure_meta.status(&index), DataFull); + assert_eq!(erasure_meta.status(), DataFull); + + // insert all coding blobs in first set + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); + + for shared_coding_blob in coding_blobs { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) + .unwrap(); + } + + let erasure_meta = blocktree + .erasure_meta(slot, 0) + .expect("DB get must succeed") + .unwrap(); + + assert_eq!(erasure_meta.status(), DataFull); // insert blobs in the 2nd set until recovery should be possible given all coding blobs - let set2 = &mut blobs[NUM_DATA..]; + let set2 = &blobs[NUM_DATA..]; let mut end = 1; let blobs_needed = ERASURE_SET_SIZE - NUM_CODING; while end < blobs_needed { blocktree.write_blobs(&set2[end - 1..end]).unwrap(); + let erasure_meta = blocktree + .erasure_meta(slot, 1) + .expect("DB get must succeed") + .unwrap(); + + match erasure_meta.status() { + StillNeed(n) => assert_eq!(n, blobs_needed - end), + _ => panic!("Should still need more blobs"), + }; + end += 1; } // insert all coding blobs in 2nd set. Should trigger recovery - let coding_blobs = encode(slot, 1, 0, &mut set2[..], NUM_CODING).unwrap(); + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let coding_blobs = coding_generator.next(&shared_blobs[NUM_DATA..]); - for blob in coding_blobs { - //let blob = shared_coding_blob.read().unwrap(); - blocktree.put_coding_blob(&blob).unwrap(); + for shared_coding_blob in coding_blobs { + let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; + blocktree + .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) + .unwrap(); } - let index = blocktree.index_cf.get(slot).unwrap().expect("index"); let erasure_meta = blocktree .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); - assert_eq!(erasure_meta.status(&index), DataFull); - let coding_info = erasure_meta.session_info(); + assert_eq!(erasure_meta.status(), DataFull); + // remove coding blobs, erasure meta should still report being full let (start_idx, coding_end_idx) = - (coding_info.start_index, coding_info.end_indexes().1); + (erasure_meta.start_index(), erasure_meta.end_indexes().1); for idx in start_idx..coding_end_idx { - blocktree.delete_coding_blob(slot, 1, idx).unwrap(); + blocktree.delete_coding_blob(slot, idx).unwrap(); } - let index = blocktree.index_cf.get(slot).unwrap().expect("index"); let erasure_meta = blocktree .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); - assert_eq!(erasure_meta.status(&index), DataFull); + assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); } #[test] @@ -3450,19 +3320,17 @@ pub mod tests { let blocktree = Blocktree::open(&ledger_path).unwrap(); let num_sets = 3; - let data_blobs = generate_shared_test_blobs(0, num_sets * NUM_DATA); + let data_blobs = make_slot_entries(slot, 0, num_sets * NUM_DATA as u64) + .0 + .into_iter() + .map(Blob::into) + .collect::>(); + + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { let focused_index = (set_index + 1) * NUM_DATA - 1; - let start_index = (set_index * NUM_DATA) as u64; - let coding_blobs = encode_shared( - slot, - set_index as u64, - start_index, - &data_blobs[..], - NUM_CODING, - ) - .unwrap(); + let coding_blobs = coding_generator.next(&data_blobs); assert_eq!(coding_blobs.len(), NUM_CODING); @@ -3475,47 +3343,46 @@ pub mod tests { // This should trigger recovery of the missing data blob for shared_coding_blob in coding_blobs { let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; blocktree - .put_coding_blob(&blob) + .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) .expect("Inserting coding blobs must succeed"); (slot, blob.index()); } - // verify erasure meta - let index = blocktree.index_cf.get(slot).unwrap().expect("index"); + // Verify the slot meta + let slot_meta = blocktree.meta(slot).unwrap().unwrap(); + assert_eq!(slot_meta.consumed, (NUM_DATA * (set_index + 1)) as u64); + assert_eq!(slot_meta.received, (NUM_DATA * (set_index + 1)) as u64); + assert_eq!(slot_meta.parent_slot, 0); + assert!(slot_meta.next_slots.is_empty()); + assert_eq!(slot_meta.is_connected, true); + if set_index as u64 == num_sets - 1 { + assert_eq!( + slot_meta.last_index, + (NUM_DATA * (set_index + 1) - 1) as u64 + ); + } + let erasure_meta = blocktree .erasure_meta_cf .get((slot, set_index as u64)) .expect("Erasure Meta should be present") .unwrap(); - assert_eq!(erasure_meta.status(&index), ErasureMetaStatus::DataFull); - - // Verify the slot meta - let slot_meta = blocktree.meta(slot).unwrap().unwrap(); - - assert_eq!(slot_meta.consumed, (NUM_DATA * (set_index + 1)) as u64); - assert_eq!(slot_meta.received, (NUM_DATA * (set_index + 1)) as u64); - assert_eq!(slot_meta.parent_slot, 0); - assert!(slot_meta.next_slots.is_empty()); - assert_eq!(slot_meta.is_connected, true); + assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); let retrieved_data = blocktree .data_cf .get_bytes((slot, focused_index as u64)) - .unwrap() - .expect("recovered data"); + .unwrap(); - let data_blob = Blob::new(&retrieved_data); - let deleted_data = Blob::clone(&deleted_data.read().unwrap()); + assert!(retrieved_data.is_some()); - assert_eq!(&data_blob.meta, &deleted_data.meta); - assert_eq!( - &data_blob.get_coding_header(), - &deleted_data.get_coding_header() - ); - assert_eq!(&data_blob.data(), &deleted_data.data()); + let data_blob = Blob::new(&retrieved_data.unwrap()); + + assert_eq!(&data_blob, &*deleted_data.read().unwrap()); } drop(blocktree); @@ -3537,40 +3404,39 @@ pub mod tests { .map(Blob::into) .collect::>(); - let shared_coding_blobs = - crate::erasure::encode_shared(SLOT, SET_INDEX, 0, &data_blobs, NUM_CODING).unwrap(); + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let shared_coding_blobs = coding_generator.next(&data_blobs); assert_eq!(shared_coding_blobs.len(), NUM_CODING); // Insert coding blobs except 1 and no data. Not enough to do recovery for shared_blob in shared_coding_blobs.iter().skip(1) { let blob = shared_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; blocktree - .put_coding_blob(&blob) + .put_coding_blob_bytes(SLOT, blob.index(), &blob.data[..size]) .expect("Inserting coding blobs must succeed"); } // try recovery even though there aren't enough blobs - let index = blocktree.index_cf.get(SLOT).unwrap().expect("index"); let erasure_meta = blocktree .erasure_meta_cf .get((SLOT, SET_INDEX)) .unwrap() .unwrap(); - assert_eq!(erasure_meta.status(&index), ErasureMetaStatus::StillNeed(1)); + assert_eq!(erasure_meta.status(), ErasureMetaStatus::StillNeed(1)); let prev_inserted_blob_datas = HashMap::new(); - let prev_inserted_coding = HashMap::new(); let attempt_result = try_erasure_recover( &blocktree.db, + &blocktree.session, &erasure_meta, - &index, SLOT, &prev_inserted_blob_datas, - &prev_inserted_coding, + None, ); assert!(attempt_result.is_ok()); @@ -3616,8 +3482,6 @@ pub mod tests { set_index, num_data, num_coding, - data_count: NUM_DATA, - parity_count: NUM_CODING, } }) .collect(); @@ -3665,8 +3529,13 @@ pub mod tests { for shared_coding_blob in &erasure_set.coding { let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; blocktree - .put_coding_blob(&blob) + .put_coding_blob_bytes( + slot, + blob.index(), + &blob.data[..size], + ) .expect("Writing coding blobs must succeed"); } debug!( @@ -3677,8 +3546,13 @@ pub mod tests { // write coding blobs first, then write the data blobs. for shared_coding_blob in &erasure_set.coding { let blob = shared_coding_blob.read().unwrap(); + let size = blob.size() + BLOB_HEADER_SIZE; blocktree - .put_coding_blob(&blob) + .put_coding_blob_bytes( + slot, + blob.index(), + &blob.data[..size], + ) .expect("Writing coding blobs must succeed"); } debug!( @@ -3698,14 +3572,13 @@ pub mod tests { // due to racing, some blobs might not be inserted. don't stop // trying until *some* thread succeeds in writing everything and // triggering recovery. - let index = blocktree.index_cf.get(slot).unwrap().expect("index"); let erasure_meta = blocktree .erasure_meta_cf .get((slot, erasure_set.set_index)) .unwrap() .unwrap(); - let status = erasure_meta.status(&index); + let status = erasure_meta.status(); attempt += 1; debug!( @@ -3733,7 +3606,6 @@ pub mod tests { for erasure_set_model in slot_model.chunks { let set_index = erasure_set_model.set_index as u64; - let index = blocktree.index_cf.get(slot).unwrap().expect("index"); let erasure_meta = blocktree .erasure_meta_cf .get((slot, set_index)) @@ -3746,12 +3618,11 @@ pub mod tests { ); // all possibility for recovery should be exhausted - assert_eq!(erasure_meta.status(&index), ErasureMetaStatus::DataFull); + assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); + // Should have all data + assert_eq!(erasure_meta.num_data(), NUM_DATA); // Should have all coding - assert_eq!( - index.coding().present_in_set(set_index), - erasure_set_model.parity_count - ); + assert_eq!(erasure_meta.num_coding(), NUM_CODING); } } diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index e76d7a7d91..412f8bd727 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -39,10 +39,6 @@ pub mod columns { #[derive(Debug)] /// The root column pub struct Root; - - #[derive(Debug)] - /// The index column - pub struct Index; } pub trait Backend: Sized + Send + Sync { diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index 1d7db40cfe..db2768da07 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -71,21 +71,14 @@ impl Backend for Kvs { impl Column for cf::Coding { const NAME: &'static str = super::ERASURE_CF; - type Index = (u64, u64, u64); + type Index = (u64, u64); - fn key((slot, set_index, index): (u64, u64, u64)) -> Vec { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[..8], slot); - BigEndian::write_u64(&mut key.0[8..16], set_index); - BigEndian::write_u64(&mut key.0[16..], index); - key + fn key(index: (u64, u64)) -> Key { + cf::Data::key(index) } - fn index(key: &Key) -> (u64, u64, u64) { - let slot = BigEndian::read_u64(&key.0[..8]); - let set_index = BigEndian::read_u64(&key.0[8..16]); - let index = BigEndian::read_u64(&key.0[16..]); - (slot, set_index, index) + fn index(key: &Key) -> (u64, u64) { + cf::Data::index(key) } } @@ -179,12 +172,8 @@ impl Column for cf::SlotMeta { } } -impl TypedColumn for cf::SlotMeta { - type Type = super::SlotMeta; -} - -impl Column for cf::Index { - const NAME: &'static str = super::INDEX_CF; +impl Column for cf::SlotMeta { + const NAME: &'static str = super::META_CF; type Index = u64; fn key(slot: u64) -> Key { @@ -198,8 +187,8 @@ impl Column for cf::Index { } } -impl TypedColumn for cf::Index { - type Type = crate::blocktree::meta::Index; +impl TypedColumn for cf::SlotMeta { + type Type = super::SlotMeta; } impl Column for cf::ErasureMeta { diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index 51f8f5f826..b34576d141 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -1,6 +1,6 @@ -use crate::erasure::CodingHeader; +use crate::erasure::{NUM_CODING, NUM_DATA}; use solana_metrics::datapoint; -use std::{collections::BTreeMap, ops::RangeBounds}; +use std::borrow::Borrow; #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] // The Meta column family @@ -27,43 +27,6 @@ pub struct SlotMeta { pub is_connected: bool, } -#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] -/// Index recording presence/absence of blobs -pub struct Index { - pub slot: u64, - data: DataIndex, - coding: CodingIndex, -} - -#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] -pub struct DataIndex { - slot: u64, - /// Map representing presence/absence of data blobs - index: BTreeMap, -} - -#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] -/// Erasure coding information -pub struct CodingIndex { - slot: u64, - /// Map from set index, to hashmap from blob index to presence bool - index: BTreeMap>, -} - -#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq)] -/// Erasure coding information -pub struct ErasureMeta { - header: CodingHeader, - set_index: u64, -} - -#[derive(Debug, PartialEq)] -pub enum ErasureMetaStatus { - CanRecover, - DataFull, - StillNeed(usize), -} - impl SlotMeta { pub fn is_full(&self) -> bool { // last_index is std::u64::MAX when it has no information about how @@ -109,174 +72,274 @@ impl SlotMeta { } } -impl Index { - pub(in crate::blocktree) fn new(slot: u64) -> Self { - Index { - slot, - data: DataIndex::default(), - coding: CodingIndex::default(), - } - } - - pub fn data(&self) -> &DataIndex { - &self.data - } - pub fn coding(&self) -> &CodingIndex { - &self.coding - } - - pub fn data_mut(&mut self) -> &mut DataIndex { - &mut self.data - } - pub fn coding_mut(&mut self) -> &mut CodingIndex { - &mut self.coding - } +#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +/// Erasure coding information +pub struct ErasureMeta { + /// Which erasure set in the slot this is + pub set_index: u64, + /// Size of shards in this erasure set + pub size: usize, + /// Bitfield representing presence/absence of data blobs + data: u64, + /// Bitfield representing presence/absence of coding blobs + coding: u64, } -impl CodingIndex { - pub fn is_set_present(&self, set_index: u64) -> bool { - self.index.contains_key(&set_index) - } - - pub fn present_in_set(&self, set_index: u64) -> usize { - match self.index.get(&set_index) { - Some(map) => map.values().filter(|presence| **presence).count(), - None => 0, - } - } - - pub fn is_present(&self, set_index: u64, blob_index: u64) -> bool { - match self.index.get(&set_index) { - Some(map) => *map.get(&blob_index).unwrap_or(&false), - None => false, - } - } - - pub fn set_present(&mut self, set_index: u64, blob_index: u64, present: bool) { - let set_map = self - .index - .entry(set_index) - .or_insert_with(BTreeMap::default); - - set_map.insert(blob_index, present); - } -} - -impl DataIndex { - pub fn present_in_bounds(&self, bounds: impl RangeBounds) -> usize { - self.index - .range(bounds) - .filter(|(_, presence)| **presence) - .count() - } - - pub fn is_present(&self, index: u64) -> bool { - *self.index.get(&index).unwrap_or(&false) - } - - pub fn set_present(&mut self, index: u64, presence: bool) { - self.index.insert(index, presence); - } +#[derive(Debug, PartialEq)] +pub enum ErasureMetaStatus { + CanRecover, + DataFull, + StillNeed(usize), } impl ErasureMeta { - pub(in crate::blocktree) fn new(set_index: u64) -> ErasureMeta { + pub fn new(set_index: u64) -> ErasureMeta { ErasureMeta { - header: CodingHeader::default(), set_index, + size: 0, + data: 0, + coding: 0, } } - pub fn session_info(&self) -> CodingHeader { - self.header - } - - pub fn set_session_info(&mut self, header: CodingHeader) { - self.header = header; - } - - pub fn status(&self, index: &Index) -> ErasureMetaStatus { - use ErasureMetaStatus::*; - - let start_idx = self.header.start_index; - let end_idx = start_idx + self.header.data_count as u64; - - let num_coding = index.coding().present_in_set(self.header.set_index); - let num_data = index.data().present_in_bounds(start_idx..end_idx); - - assert!(self.header.shard_size != 0); - - let (data_missing, coding_missing) = ( - self.header.data_count - num_data, - self.header.parity_count - num_coding, - ); - - let total_missing = data_missing + coding_missing; - - if data_missing > 0 && total_missing <= self.header.parity_count { - CanRecover + pub fn status(&self) -> ErasureMetaStatus { + let (data_missing, coding_missing) = + (NUM_DATA - self.num_data(), NUM_CODING - self.num_coding()); + if data_missing > 0 && data_missing + coding_missing <= NUM_CODING { + assert!(self.size != 0); + ErasureMetaStatus::CanRecover } else if data_missing == 0 { - DataFull + ErasureMetaStatus::DataFull } else { - StillNeed(total_missing - self.header.parity_count) + ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING) } } + + pub fn num_coding(&self) -> usize { + self.coding.count_ones() as usize + } + + pub fn num_data(&self) -> usize { + self.data.count_ones() as usize + } + + pub fn is_coding_present(&self, index: u64) -> bool { + if let Some(position) = self.data_index_in_set(index) { + self.coding & (1 << position) != 0 + } else { + false + } + } + + pub fn set_size(&mut self, size: usize) { + self.size = size; + } + + pub fn size(&self) -> usize { + self.size + } + + pub fn set_coding_present(&mut self, index: u64, present: bool) { + if let Some(position) = self.data_index_in_set(index) { + if present { + self.coding |= 1 << position; + } else { + self.coding &= !(1 << position); + } + } + } + + pub fn is_data_present(&self, index: u64) -> bool { + if let Some(position) = self.data_index_in_set(index) { + self.data & (1 << position) != 0 + } else { + false + } + } + + pub fn set_data_present(&mut self, index: u64, present: bool) { + if let Some(position) = self.data_index_in_set(index) { + if present { + self.data |= 1 << position; + } else { + self.data &= !(1 << position); + } + } + } + + pub fn set_data_multi(&mut self, indexes: I, present: bool) + where + I: IntoIterator, + Idx: Borrow, + { + for index in indexes.into_iter() { + self.set_data_present(*index.borrow(), present); + } + } + + pub fn set_coding_multi(&mut self, indexes: I, present: bool) + where + I: IntoIterator, + Idx: Borrow, + { + for index in indexes.into_iter() { + self.set_coding_present(*index.borrow(), present); + } + } + + pub fn set_index_for(index: u64) -> u64 { + index / NUM_DATA as u64 + } + + pub fn data_index_in_set(&self, index: u64) -> Option { + let set_index = Self::set_index_for(index); + + if set_index == self.set_index { + Some(index - self.start_index()) + } else { + None + } + } + + pub fn coding_index_in_set(&self, index: u64) -> Option { + self.data_index_in_set(index).map(|i| i + NUM_DATA as u64) + } + + pub fn start_index(&self) -> u64 { + self.set_index * NUM_DATA as u64 + } + + /// returns a tuple of (data_end, coding_end) + pub fn end_indexes(&self) -> (u64, u64) { + let start = self.start_index(); + (start + NUM_DATA as u64, start + NUM_CODING as u64) + } } -#[cfg(test)] -mod test { - use super::*; +#[test] +fn test_meta_indexes() { + use rand::{thread_rng, Rng}; + // to avoid casts everywhere + const NUM_DATA: u64 = crate::erasure::NUM_DATA as u64; - const NUM_DATA: u64 = 7; - const NUM_CODING: u64 = 8; + let mut rng = thread_rng(); - fn sample_header() -> CodingHeader { - CodingHeader { - shard_size: 1, - data_count: NUM_DATA as usize, - parity_count: NUM_CODING as usize, - ..CodingHeader::default() - } + for _ in 0..100 { + let set_index = rng.gen_range(0, 1_000); + let blob_index = (set_index * NUM_DATA) + rng.gen_range(0, NUM_DATA); + + assert_eq!(set_index, ErasureMeta::set_index_for(blob_index)); + let e_meta = ErasureMeta::new(set_index); + + assert_eq!(e_meta.start_index(), set_index * NUM_DATA); + let (data_end_idx, coding_end_idx) = e_meta.end_indexes(); + assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA); + assert_eq!(coding_end_idx, set_index * NUM_DATA + NUM_CODING as u64); } - #[test] - fn test_erasure_meta_status() { - let set_index = 0; + let mut e_meta = ErasureMeta::new(0); - let header = sample_header(); - let mut e_meta = ErasureMeta::new(set_index); - e_meta.set_session_info(header); + assert_eq!(e_meta.data_index_in_set(0), Some(0)); + assert_eq!(e_meta.data_index_in_set(NUM_DATA / 2), Some(NUM_DATA / 2)); + assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), Some(NUM_DATA - 1)); + assert_eq!(e_meta.data_index_in_set(NUM_DATA), None); + assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None); - let mut index = Index::new(0); + e_meta.set_index = 1; - assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(7)); + assert_eq!(e_meta.data_index_in_set(0), None); + assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), None); + assert_eq!(e_meta.data_index_in_set(NUM_DATA), Some(0)); + assert_eq!( + e_meta.data_index_in_set(NUM_DATA * 2 - 1), + Some(NUM_DATA - 1) + ); + assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None); +} - for i in 0..NUM_DATA { - index.data_mut().set_present(i, true); - } +#[test] +fn test_meta_coding_present() { + let mut e_meta = ErasureMeta::default(); - assert_eq!(e_meta.status(&index), ErasureMetaStatus::DataFull); + e_meta.set_coding_multi(0..NUM_CODING as u64, true); + for i in 0..NUM_CODING as u64 { + assert_eq!(e_meta.is_coding_present(i), true); + } + for i in NUM_CODING as u64..NUM_DATA as u64 { + assert_eq!(e_meta.is_coding_present(i), false); + } - index.data_mut().set_present(NUM_DATA - 1, false); + e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64); + let start_idx = e_meta.start_index(); + e_meta.set_coding_multi(start_idx..start_idx + NUM_CODING as u64, true); - assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(1)); - - for i in 0..NUM_DATA - 2 { - index.data_mut().set_present(i, false); - } - - assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(6)); - - for i in 0..NUM_CODING { - index.coding_mut().set_present(set_index, i, true); - } - - index.data_mut().set_present(NUM_DATA - 1, false); - - for i in 0..NUM_DATA - 1 { - index.data_mut().set_present(i, true); - - assert_eq!(e_meta.status(&index), ErasureMetaStatus::CanRecover); - } + for i in start_idx..start_idx + NUM_CODING as u64 { + e_meta.set_coding_present(i, true); + assert_eq!(e_meta.is_coding_present(i), true); + } + for i in start_idx + NUM_CODING as u64..start_idx + NUM_DATA as u64 { + assert_eq!(e_meta.is_coding_present(i), false); + } +} + +#[test] +fn test_erasure_meta_status() { + use rand::{seq::SliceRandom, thread_rng}; + // Local constansts just used to avoid repetitive casts + const N_DATA: u64 = crate::erasure::NUM_DATA as u64; + const N_CODING: u64 = crate::erasure::NUM_CODING as u64; + + let mut e_meta = ErasureMeta::default(); + let mut rng = thread_rng(); + let data_indexes: Vec = (0..N_DATA).collect(); + let coding_indexes: Vec = (0..N_CODING).collect(); + + assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA)); + + e_meta.set_data_multi(0..N_DATA, true); + + assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); + + e_meta.size = 1; + e_meta.set_coding_multi(0..N_CODING, true); + + assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); + + for &idx in data_indexes.choose_multiple(&mut rng, NUM_CODING) { + e_meta.set_data_present(idx, false); + + assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); + } + + e_meta.set_data_multi(0..N_DATA, true); + + for &idx in coding_indexes.choose_multiple(&mut rng, NUM_CODING) { + e_meta.set_coding_present(idx, false); + + assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); + } +} + +#[test] +fn test_meta_data_present() { + let mut e_meta = ErasureMeta::default(); + + e_meta.set_data_multi(0..NUM_DATA as u64, true); + for i in 0..NUM_DATA as u64 { + assert_eq!(e_meta.is_data_present(i), true); + } + for i in NUM_DATA as u64..2 * NUM_DATA as u64 { + assert_eq!(e_meta.is_data_present(i), false); + } + + e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64); + let start_idx = e_meta.start_index(); + e_meta.set_data_multi(start_idx..start_idx + NUM_DATA as u64, true); + + for i in start_idx..start_idx + NUM_DATA as u64 { + assert_eq!(e_meta.is_data_present(i), true); + } + for i in start_idx - NUM_DATA as u64..start_idx { + assert_eq!(e_meta.is_data_present(i), false); } } diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index 9f9030d66e..ef696d7737 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -30,9 +30,7 @@ impl Backend for Rocks { type Error = rocksdb::Error; fn open(path: &Path) -> Result { - use crate::blocktree::db::columns::{ - Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta, - }; + use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta}; fs::create_dir_all(&path)?; @@ -42,14 +40,12 @@ impl Backend for Rocks { // Column family names let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); - let dead_slots_cf_descriptor = - ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options()); + let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options()); let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options()); - let index_desc = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options()); let cfs = vec![ meta_cf_descriptor, @@ -59,7 +55,6 @@ impl Backend for Rocks { erasure_meta_cf_descriptor, orphans_cf_descriptor, root_cf_descriptor, - index_desc, ]; // Open the database @@ -69,15 +64,12 @@ impl Backend for Rocks { } fn columns(&self) -> Vec<&'static str> { - use crate::blocktree::db::columns::{ - Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta, - }; + use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta}; vec![ Coding::NAME, ErasureMeta::NAME, DeadSlots::NAME, - Index::NAME, Data::NAME, Orphans::NAME, Root::NAME, @@ -143,21 +135,14 @@ impl Backend for Rocks { impl Column for cf::Coding { const NAME: &'static str = super::ERASURE_CF; - type Index = (u64, u64, u64); + type Index = (u64, u64); - fn key((slot, set_index, index): (u64, u64, u64)) -> Vec { - let mut key = vec![0; 24]; - BigEndian::write_u64(&mut key[..8], slot); - BigEndian::write_u64(&mut key[8..16], set_index); - BigEndian::write_u64(&mut key[16..], index); - key + fn key(index: (u64, u64)) -> Vec { + cf::Data::key(index) } - fn index(key: &[u8]) -> (u64, u64, u64) { - let slot = BigEndian::read_u64(&key[..8]); - let set_index = BigEndian::read_u64(&key[8..16]); - let index = BigEndian::read_u64(&key[16..]); - (slot, set_index, index) + fn index(key: &[u8]) -> (u64, u64) { + cf::Data::index(key) } } @@ -278,25 +263,6 @@ impl TypedColumn for cf::ErasureMeta { type Type = super::ErasureMeta; } -impl Column for cf::Index { - const NAME: &'static str = super::INDEX_CF; - type Index = u64; - - fn key(slot: u64) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], slot); - key - } - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } -} - -impl TypedColumn for cf::Index { - type Type = crate::blocktree::meta::Index; -} - impl DbCursor for DBRawIterator { fn valid(&self) -> bool { DBRawIterator::valid(self) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index c9c81e06a1..d27e0e6517 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -3,6 +3,7 @@ use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastR use self::standard_broadcast_run::StandardBroadcastRun; use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError}; +use crate::erasure::CodingGenerator; use crate::poh_recorder::WorkingBankEntries; use crate::result::{Error, Result}; use crate::service::Service; @@ -79,6 +80,7 @@ trait BroadcastRun { } struct Broadcast { + coding_generator: CodingGenerator, thread_pool: ThreadPool, } @@ -113,7 +115,10 @@ impl BroadcastStage { blocktree: &Arc, mut broadcast_stage_run: impl BroadcastRun, ) -> BroadcastStageReturnType { + let coding_generator = CodingGenerator::default(); + let mut broadcast = Broadcast { + coding_generator, thread_pool: rayon::ThreadPoolBuilder::new() .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) .build() diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index c4dd15f900..efad43196d 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -1,6 +1,6 @@ use crate::entry::Entry; use crate::entry::EntrySlice; -use crate::erasure; +use crate::erasure::CodingGenerator; use crate::packet::{self, SharedBlob}; use crate::poh_recorder::WorkingBankEntries; use crate::result::Result; @@ -81,7 +81,7 @@ pub(super) fn entries_to_blobs( last_tick: u64, bank: &Bank, keypair: &Keypair, - set_index: &mut u64, + coding_generator: &mut CodingGenerator, ) -> (Vec, Vec) { let blobs = generate_data_blobs( ventries, @@ -92,18 +92,7 @@ pub(super) fn entries_to_blobs( &keypair, ); - let start_index = blobs[0].read().unwrap().index(); - - let coding = generate_coding_blobs( - &blobs, - &thread_pool, - bank.slot(), - *set_index, - start_index, - &keypair, - ); - - *set_index += 1; + let coding = generate_coding_blobs(&blobs, &thread_pool, coding_generator, &keypair); (blobs, coding) } @@ -152,15 +141,10 @@ pub(super) fn generate_data_blobs( pub(super) fn generate_coding_blobs( blobs: &[SharedBlob], thread_pool: &ThreadPool, - slot: u64, - set_index: u64, - start_index: u64, + coding_generator: &mut CodingGenerator, keypair: &Keypair, ) -> Vec { - let set_len = blobs.len(); - - let coding = erasure::encode_shared(slot, set_index, start_index, blobs, set_len) - .expect("Erasure coding failed"); + let coding = coding_generator.next(&blobs); thread_pool.install(|| { coding.par_iter().for_each(|c| { diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 30ea71dcd3..2e26fed504 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,13 +1,11 @@ use super::*; use solana_sdk::hash::Hash; -pub(super) struct FailEntryVerificationBroadcastRun { - set_index: u64, -} +pub(super) struct FailEntryVerificationBroadcastRun {} impl FailEntryVerificationBroadcastRun { pub(super) fn new() -> Self { - Self { set_index: 0 } + Self {} } } @@ -51,7 +49,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { last_tick, &bank, &keypair, - &mut self.set_index, + &mut broadcast.coding_generator, ); blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 597ca2b7a0..90a2263a92 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -10,14 +10,12 @@ struct BroadcastStats { pub(super) struct StandardBroadcastRun { stats: BroadcastStats, - set_index: u64, } impl StandardBroadcastRun { pub(super) fn new() -> Self { Self { stats: BroadcastStats::default(), - set_index: 0, } } @@ -81,7 +79,7 @@ impl BroadcastRun for StandardBroadcastRun { last_tick, &bank, &keypair, - &mut self.set_index, + &mut broadcast.coding_generator, ); blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?; diff --git a/core/src/chacha.rs b/core/src/chacha.rs index f5ef7f8a76..6ad2a948d1 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -133,7 +133,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "AMKCEbK6txetPPEQ8JDVdrqpefgtGRzN6ng2gSmKy6Fv" + let golden: Hash = "E2HZjSC6VgH4nmEiTbMDATTeBcFjwSYz7QYvU7doGNhD" .parse() .unwrap(); diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index a6c7e76725..10a44e0eff 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -4,7 +4,8 @@ use crate::crds_value::EpochSlots; use crate::result::Result; use crate::service::Service; use byteorder::{ByteOrder, LittleEndian}; -use rand::{seq::SliceRandom, Rng, SeedableRng}; +use rand::seq::SliceRandom; +use rand::SeedableRng; use rand_chacha::ChaChaRng; use solana_metrics::datapoint; use solana_runtime::epoch_schedule::EpochSchedule; @@ -271,8 +272,6 @@ impl ClusterInfoRepairListener { let mut total_data_blobs_sent = 0; let mut total_coding_blobs_sent = 0; let mut num_slots_repaired = 0; - let mut rng = rand::thread_rng(); - let max_confirmed_repairee_epoch = epoch_schedule.get_stakers_epoch(repairee_epoch_slots.root); let max_confirmed_repairee_slot = @@ -306,38 +305,20 @@ impl ClusterInfoRepairListener { // a database iterator over the slots because by the time this node is // sending the blobs in this slot for repair, we expect these slots // to be full. - - if let Some(data_blob) = blocktree - .get_data_blob(slot, blob_index as u64) + if let Some(blob_data) = blocktree + .get_data_blob_bytes(slot, blob_index as u64) .expect("Failed to read data blob from blocktree") { - socket.send_to(&data_blob.data[..], repairee_tvu)?; + socket.send_to(&blob_data[..], repairee_tvu)?; total_data_blobs_sent += 1; + } - if let Some(coding_header) = data_blob.get_coding_header() { - let ratio = std::cmp::max( - 1, - coding_header.parity_count / coding_header.data_count, - ); - - for _ in 0..ratio { - let chosen_index = - rng.gen_range(0, coding_header.parity_count as u64); - - let coding_blob_opt = blocktree - .get_coding_blob( - slot, - coding_header.set_index, - chosen_index, - ) - .expect("Failed to read coding blob from blocktree"); - - if let Some(coding_blob) = coding_blob_opt { - socket.send_to(&coding_blob.data[..], repairee_tvu)?; - total_coding_blobs_sent += 1; - } - } - } + if let Some(coding_bytes) = blocktree + .get_coding_blob_bytes(slot, blob_index as u64) + .expect("Failed to read coding blob from blocktree") + { + socket.send_to(&coding_bytes[..], repairee_tvu)?; + total_coding_blobs_sent += 1; } } diff --git a/core/src/erasure.rs b/core/src/erasure.rs index 3daefa3bca..99784fa053 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -41,261 +41,237 @@ //! //! use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; -use std::{ - borrow::BorrowMut, - sync::{Arc, RwLock}, -}; +use std::cmp; +use std::convert::AsMut; +use std::sync::{Arc, RwLock}; use reed_solomon_erasure::ReedSolomon; -/// Max number of data blobs in an erasure set; Also max number of parity blobs. -pub const MAX_SET_SIZE: usize = 255; +//TODO(sakridge) pick these values +/// Number of data blobs +pub const NUM_DATA: usize = 8; +/// Number of coding blobs; also the maximum number that can go missing. +pub const NUM_CODING: usize = 8; +/// Total number of blobs in an erasure set; includes data and coding blobs +pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; type Result = std::result::Result; -/// This struct is stored in the header of any data blob that has been encoded -/// Every coding blob contains it. -#[derive(Clone, Copy, Default, Debug, PartialEq, Serialize, Deserialize)] -pub struct CodingHeader { - /// Index of the first data blob in the set - pub start_index: u64, - /// Index of the erasure set. Slot relative. - pub set_index: u64, - /// Number of data blobs in the set. - pub data_count: usize, - /// Number of parity blobs in the set. - pub parity_count: usize, - /// Size of the largest data blob in the set, including the header. - pub shard_size: usize, +/// Represents an erasure "session" with a particular configuration and number of data and coding +/// blobs +#[derive(Debug, Clone)] +pub struct Session(ReedSolomon); + +/// Generates coding blobs on demand given data blobs +#[derive(Debug, Clone)] +pub struct CodingGenerator { + /// SharedBlobs that couldn't be used in last call to next() + leftover: Vec, + session: Arc, } -impl CodingHeader { - /// returns the set-relative index of the blob with the given index. - pub fn data_index_in_set(&self, index: u64) -> u64 { - index - self.start_index +impl Session { + pub fn new(data_count: usize, coding_count: usize) -> Result { + let rs = ReedSolomon::new(data_count, coding_count)?; + + Ok(Session(rs)) } - /// returns the set-relative index of the coding blob with the given index. - /// in the context of erasure/recovery coding blobs come after data-blobs. - pub fn coding_index_in_set(&self, index: u64) -> u64 { - index + self.data_count as u64 + /// Create coding blocks by overwriting `parity` + pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> { + self.0.encode_sep(data, parity)?; + + Ok(()) } - /// returns the end boundary indexes of the data and coding blobs in this set, respectively. - pub fn end_indexes(&self) -> (u64, u64) { - let start = self.start_index; - (start + self.data_count as u64, self.parity_count as u64) + /// Recover data + coding blocks into data blocks + /// # Arguments + /// * `data` - array of data blocks to recover into + /// * `coding` - array of coding blocks + /// * `erasures` - list of indices in data where blocks should be recovered + pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> { + self.0.reconstruct(blocks, present)?; + + Ok(()) } -} -/// Erasure code data blobs. -/// -/// # Arguments -/// -/// * `slot` - slot all blobs belong to -/// * `set_index` - index of the erasure set being encoded -/// * `start_index` - index of the first data blob -/// * `blobs` - data blobs to be encoded. an amount greater than `MAX_SET_SIZE` causes errors. -/// * `parity` - number of parity blobs to create. values greater than `MAX_SET_SIZE` cause errors. -pub fn encode>( - slot: u64, - set_index: u64, - start_index: u64, - blobs: &mut [B], - parity: usize, -) -> Result> { - let data = blobs.len(); - // this would fail if there are too few or too many blobs - let rs = ReedSolomon::new(data, parity)?; - let mut header = CodingHeader { - data_count: data, - parity_count: parity, - start_index, - set_index, - shard_size: 0, - }; + /// Returns `(number_of_data_blobs, number_of_coding_blobs)` + pub fn dimensions(&self) -> (usize, usize) { + (self.0.data_shard_count(), self.0.parity_shard_count()) + } - let shard_size = blobs - .iter_mut() - .map(|blob| (*blob).borrow().data_size() as usize) - .max() - .expect("must be >=1 blobs"); + /// Reconstruct any missing blobs in this erasure set if possible + /// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata + /// Assumes that the user has sliced into the blobs appropriately already. else recovery will + /// return an error or garbage data + pub fn reconstruct_blobs( + &self, + blobs: &mut [B], + present: &[bool], + size: usize, + block_start_idx: u64, + slot: u64, + ) -> Result<(Vec, Vec)> + where + B: AsMut<[u8]>, + { + let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect(); - //header.shard_size = crate::packet::BLOB_DATA_SIZE; - header.shard_size = shard_size; + trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,); - let slices = blobs - .iter_mut() - .map(|b| { - let blob: &mut Blob = b.borrow_mut(); - blob.set_coding_header(&header); - &blob.data[..shard_size] - }) - .collect::>(); + // Decode the blocks + self.decode_blocks(blocks.as_mut_slice(), &present)?; - let mut parity_blocks = (0..parity).map(|_| vec![0; shard_size]).collect::>(); - let mut parity_slices = parity_blocks - .iter_mut() - .map(|v| &mut v[..]) - .collect::>(); + let mut recovered_data = vec![]; + let mut recovered_coding = vec![]; - rs.encode_sep(&slices[..], &mut parity_slices[..])?; + let erasures = present + .iter() + .enumerate() + .filter_map(|(i, present)| if *present { None } else { Some(i) }); - let parity = parity_blocks - .into_iter() - .enumerate() - .map(|(idx, block)| { - let mut blob = Blob::default(); - (&mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + shard_size]) - .copy_from_slice(&block); - blob.set_slot(slot); - blob.set_size(shard_size - BLOB_HEADER_SIZE); - //blob.set_data_size(shard_size as u64); - blob.set_coding(); - blob.set_coding_header(&header); - blob.set_index(idx as u64); + // Create the missing blobs from the reconstructed data + for n in erasures { + let data_size; + let idx; + let first_byte; - blob - }) - .collect(); + if n < NUM_DATA { + let mut blob = Blob::new(&blocks[n]); - Ok(parity) -} + data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; + idx = n as u64 + block_start_idx; + first_byte = blob.data[0]; -/// See `encode`. -/// Convenience function to encode and return `Arc>`s -pub fn encode_shared( - slot: u64, - set_index: u64, - start_index: u64, - blobs: &[SharedBlob], - parity: usize, -) -> Result> { - let mut locks = blobs - .iter() - .map(|shared_blob| shared_blob.write().unwrap()) - .collect::>(); + blob.set_size(data_size); + recovered_data.push(blob); + } else { + let mut blob = Blob::default(); + blob.data_mut()[..size].copy_from_slice(&blocks[n]); + data_size = size; + idx = (n as u64 + block_start_idx) - NUM_DATA as u64; + first_byte = blob.data[0]; - let mut blobs = locks.iter_mut().map(|lock| &mut **lock).collect::>(); + blob.set_slot(slot); + blob.set_index(idx); + blob.set_size(data_size); + recovered_coding.push(blob); + } - let parity_blobs = encode(slot, set_index, start_index, &mut blobs[..], parity)? - .into_iter() - .map(|blob| Arc::new(RwLock::new(blob))) - .collect(); - - Ok(parity_blobs) -} - -/// Attempt to recover missing blobs -/// # Arguments -/// * `info` - the encoding parameters for this erasure set -/// * `slot` - the slot that these blobs belong to -/// * `blobs` - data blobs, followed by parity blobs. blobs must be in order or the recovery will -/// succeed but return garbage. -/// * `present` - each element indicates the presence of the blob with the same set-relative index -pub fn decode( - info: &CodingHeader, - slot: u64, - blobs: &mut [B], - present: &[bool], -) -> Result<(Vec, Vec)> -where - B: BorrowMut, -{ - let rs = ReedSolomon::new(info.data_count as usize, info.parity_count as usize)?; - - let mut blocks = vec![]; - - for (idx, blob) in blobs.iter_mut().enumerate() { - if idx < info.data_count { - blocks.push(&mut blob.borrow_mut().data[..info.shard_size as usize]); - } else { - blocks.push( - &mut blob.borrow_mut().data - [BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + info.shard_size as usize], + trace!( + "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}", + n, + idx, + data_size, + first_byte ); } + + Ok((recovered_data, recovered_coding)) } - - assert_eq!( - blocks.len(), - rs.data_shard_count() + rs.parity_shard_count() - ); - rs.reconstruct(&mut blocks[..], present)?; - - let mut recovered_data = vec![]; - let mut recovered_coding = vec![]; - - let erasures = present - .iter() - .enumerate() - .filter_map(|(i, present)| if *present { None } else { Some(i) }); - - let shard_size = info.shard_size as usize; - - // Create the missing blobs from the reconstructed data - for n in erasures { - let data_size; - let idx; - let first_byte; - - if n < info.data_count { - let mut blob: Box = Box::default(); - (&mut blob.data[..shard_size]).copy_from_slice(&blocks[n]); - - data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; - idx = n as u64 + info.start_index; - first_byte = blob.data[0]; - - blob.set_slot(slot); - blob.set_index(idx); - blob.set_size(data_size); - blob.set_coding_header(info); - - recovered_data.push(*blob); - } else { - let mut blob = Blob::default(); - (&mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + shard_size]) - .copy_from_slice(&blocks[n]); - data_size = shard_size; - idx = (n - info.data_count) as u64; - first_byte = blob.data[0]; - - blob.set_slot(slot); - blob.set_index(idx); - blob.set_size(data_size); - blob.set_coding_header(info); - recovered_coding.push(blob); - } - - trace!( - "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}", - n, - idx, - data_size, - first_byte - ); - } - - Ok((recovered_data, recovered_coding)) } -/// See `decode` -/// Convenience function to accept and return `Arc>`s -pub fn decode_shared( - info: &CodingHeader, - slot: u64, - blobs: &[SharedBlob], - present: &[bool], -) -> Result<(Vec, Vec)> { - let mut locks = blobs - .iter() - .map(|shared_blob| shared_blob.write().unwrap()) - .collect::>(); +impl CodingGenerator { + pub fn new(session: Arc) -> Self { + CodingGenerator { + leftover: Vec::with_capacity(session.0.data_shard_count()), + session, + } + } - let mut blobs = locks.iter_mut().map(|lock| &mut **lock).collect::>(); + /// Yields next set of coding blobs, if any. + /// Must be called with consecutive data blobs within a slot. + /// + /// Passing in a slice with the first blob having a new slot will cause internal state to + /// reset, so the above concern does not apply to slot boundaries, only indexes within a slot + /// must be consecutive. + /// + /// If used improperly, it my return garbage coding blobs, but will not give an + /// error. + pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec { + let (num_data, num_coding) = self.session.dimensions(); + let mut next_coding = + Vec::with_capacity((self.leftover.len() + next_data.len()) / num_data * num_coding); - decode(info, slot, &mut blobs[..], present) + if !self.leftover.is_empty() + && !next_data.is_empty() + && self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() + { + self.leftover.clear(); + } + + let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect(); + + for data_blobs in next_data.chunks(num_data) { + if data_blobs.len() < num_data { + self.leftover = data_blobs.to_vec(); + break; + } + self.leftover.clear(); + + // find max_data_size for the erasure set + let max_data_size = data_blobs + .iter() + .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)); + + let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); + let data_ptrs: Vec<_> = data_locks + .iter() + .map(|l| &l.data[..max_data_size]) + .collect(); + + let mut coding_blobs = Vec::with_capacity(num_coding); + + for data_blob in &data_locks[..num_coding] { + let index = data_blob.index(); + let slot = data_blob.slot(); + let id = data_blob.id(); + + let mut coding_blob = Blob::default(); + coding_blob.set_index(index); + coding_blob.set_slot(slot); + coding_blob.set_id(&id); + coding_blob.set_size(max_data_size); + coding_blob.set_coding(); + + coding_blobs.push(coding_blob); + } + + if { + let mut coding_ptrs: Vec<_> = coding_blobs + .iter_mut() + .map(|blob| &mut blob.data_mut()[..max_data_size]) + .collect(); + + self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice()) + } + .is_ok() + { + next_coding.append(&mut coding_blobs); + } + } + + next_coding + .into_iter() + .map(|blob| Arc::new(RwLock::new(blob))) + .collect() + } +} + +impl Default for Session { + fn default() -> Session { + Session::new(NUM_DATA, NUM_CODING).unwrap() + } +} + +impl Default for CodingGenerator { + fn default() -> Self { + let session = Session::default(); + CodingGenerator { + leftover: Vec::with_capacity(session.0.data_shard_count()), + session: Arc::new(session), + } + } } #[cfg(test)] @@ -309,10 +285,6 @@ pub mod test { use solana_sdk::signature::{Keypair, KeypairUtil}; use std::borrow::Borrow; - const NUM_DATA: usize = 8; - const NUM_CODING: usize = 9; - const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; - /// Specifies the contents of a 16-data-blob and 4-coding-blob erasure set /// Exists to be passed to `generate_blocktree_with_coding` #[derive(Debug, Copy, Clone)] @@ -321,8 +293,6 @@ pub mod test { pub set_index: u64, pub num_data: usize, pub num_coding: usize, - pub data_count: usize, - pub parity_count: usize, } /// Specifies the contents of a slot @@ -348,54 +318,107 @@ pub mod test { pub start_index: u64, pub coding: Vec, pub data: Vec, - pub data_count: usize, - pub parity_count: usize, } - fn test_toss_and_recover(slot: u64, data_blobs: &[SharedBlob], coding_blobs: &[SharedBlob]) { + #[test] + fn test_coding() { + const N_DATA: usize = 4; + const N_CODING: usize = 2; + + let session = Session::new(N_DATA, N_CODING).unwrap(); + + let mut vs: Vec> = (0..N_DATA as u8).map(|i| (i..(16 + i)).collect()).collect(); + let v_orig: Vec = vs[0].clone(); + + let mut coding_blocks: Vec<_> = (0..N_CODING).map(|_| vec![0u8; 16]).collect(); + + let mut coding_blocks_slices: Vec<_> = + coding_blocks.iter_mut().map(Vec::as_mut_slice).collect(); + let v_slices: Vec<_> = vs.iter().map(Vec::as_slice).collect(); + + session + .encode(v_slices.as_slice(), coding_blocks_slices.as_mut_slice()) + .expect("encoding must succeed"); + + trace!("test_coding: coding blocks:"); + for b in &coding_blocks { + trace!("test_coding: {:?}", b); + } + + let erasure: usize = 1; + let present = &mut [true; N_DATA + N_CODING]; + present[erasure] = false; + let erased = vs[erasure].clone(); + + // clear an entry + vs[erasure as usize].copy_from_slice(&[0; 16]); + + let mut blocks: Vec<_> = vs + .iter_mut() + .chain(coding_blocks.iter_mut()) + .map(Vec::as_mut_slice) + .collect(); + + session + .decode_blocks(blocks.as_mut_slice(), present) + .expect("decoding must succeed"); + + trace!("test_coding: vs:"); + for v in &vs { + trace!("test_coding: {:?}", v); + } + assert_eq!(v_orig, vs[0]); + assert_eq!(erased, vs[erasure]); + } + + fn test_toss_and_recover( + session: &Session, + data_blobs: &[SharedBlob], + coding_blobs: &[SharedBlob], + block_start_idx: usize, + ) { + let size = coding_blobs[0].read().unwrap().size(); + let mut blobs: Vec = Vec::with_capacity(ERASURE_SET_SIZE); blobs.push(SharedBlob::default()); // empty data, erasure at zero - for blob in &data_blobs[1..] { + for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] { // skip first blob blobs.push(blob.clone()); } blobs.push(SharedBlob::default()); // empty coding, erasure at zero - for blob in &coding_blobs[1..] { + for blob in &coding_blobs[1..NUM_CODING] { blobs.push(blob.clone()); } - let info = coding_blobs[0] - .read() - .unwrap() - .get_coding_header() - .expect("coding info"); - // toss one data and one coding let mut present = vec![true; blobs.len()]; present[0] = false; - present[data_blobs.len()] = false; + present[NUM_DATA] = false; - let (recovered_data, recovered_coding) = - decode_shared(&info, slot, &mut blobs[..], &present) - .expect("reconstruction must succeed"); + let (recovered_data, recovered_coding) = session + .reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0) + .expect("reconstruction must succeed"); assert_eq!(recovered_data.len(), 1); assert_eq!(recovered_coding.len(), 1); assert_eq!( blobs[1].read().unwrap().meta, - data_blobs[1].read().unwrap().meta + data_blobs[block_start_idx + 1].read().unwrap().meta ); assert_eq!( blobs[1].read().unwrap().data(), - data_blobs[1].read().unwrap().data() + data_blobs[block_start_idx + 1].read().unwrap().data() + ); + assert_eq!( + recovered_data[0].meta, + data_blobs[block_start_idx].read().unwrap().meta ); - assert_eq!(recovered_data[0].meta, data_blobs[0].read().unwrap().meta); assert_eq!( recovered_data[0].data(), - data_blobs[0].read().unwrap().data() + data_blobs[block_start_idx].read().unwrap().data() ); assert_eq!( recovered_coding[0].data(), @@ -405,27 +428,72 @@ pub mod test { #[test] fn test_erasure_generate_coding() { + solana_logger::setup(); + + // trivial case + let mut coding_generator = CodingGenerator::default(); + let blobs = Vec::new(); + for _ in 0..NUM_DATA * 2 { + let coding = coding_generator.next(&blobs); + assert!(coding.is_empty()); + } + // test coding by iterating one blob at a time - let test_blobs = generate_shared_test_blobs(0, NUM_DATA * 2); + let data_blobs = generate_test_blobs(0, NUM_DATA * 2); - for (idx, data_blobs) in test_blobs.chunks_exact(NUM_DATA).enumerate() { - let coding_blobs = encode_shared( - 0, - idx as u64, - (idx * NUM_DATA) as u64, - &data_blobs[..], - NUM_CODING, - ) - .unwrap(); + for (i, blob) in data_blobs.iter().cloned().enumerate() { + let coding_blobs = coding_generator.next(&[blob]); - test_toss_and_recover(0, &data_blobs, &coding_blobs); + if !coding_blobs.is_empty() { + assert_eq!(i % NUM_DATA, NUM_DATA - 1); + assert_eq!(coding_blobs.len(), NUM_CODING); + + for j in 0..NUM_CODING { + assert_eq!( + coding_blobs[j].read().unwrap().index(), + ((i / NUM_DATA) * NUM_DATA + j) as u64 + ); + } + test_toss_and_recover( + &coding_generator.session, + &data_blobs, + &coding_blobs, + i - (i % NUM_DATA), + ); + } } } #[test] - fn test_erasure_generate_blocktree_with_coding() { + fn test_erasure_generate_coding_reset_on_new_slot() { solana_logger::setup(); + let mut coding_generator = CodingGenerator::default(); + + // test coding by iterating one blob at a time + let data_blobs = generate_test_blobs(0, NUM_DATA * 2); + + for i in NUM_DATA..NUM_DATA * 2 { + data_blobs[i].write().unwrap().set_slot(1); + } + + let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]); + assert!(coding_blobs.is_empty()); + + let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]); + + assert_eq!(coding_blobs.len(), NUM_CODING); + + test_toss_and_recover( + &coding_generator.session, + &data_blobs, + &coding_blobs, + NUM_DATA, + ); + } + + #[test] + fn test_erasure_generate_blocktree_with_coding() { let cases = vec![ (NUM_DATA, NUM_CODING, 7, 5), (NUM_DATA - 6, NUM_CODING - 1, 5, 7), @@ -441,8 +509,6 @@ pub mod test { set_index, num_data, num_coding, - data_count: NUM_DATA, - parity_count: NUM_CODING, }) .collect(); @@ -457,17 +523,18 @@ pub mod test { for erasure_spec in spec.set_specs.iter() { let start_index = erasure_spec.set_index * NUM_DATA as u64; - let data_end = start_index + erasure_spec.num_data as u64; + let (data_end, coding_end) = ( + start_index + erasure_spec.num_data as u64, + start_index + erasure_spec.num_coding as u64, + ); for idx in start_index..data_end { let opt_bytes = blocktree.get_data_blob_bytes(slot, idx).unwrap(); assert!(opt_bytes.is_some()); } - for idx in 0..erasure_spec.num_coding { - let opt_bytes = blocktree - .get_coding_blob_bytes(slot, erasure_spec.set_index, idx as u64) - .unwrap(); + for idx in start_index..coding_end { + let opt_bytes = blocktree.get_coding_blob_bytes(slot, idx).unwrap(); assert!(opt_bytes.is_some()); } } @@ -480,7 +547,10 @@ pub mod test { #[test] fn test_recovery_with_model() { + use std::thread; + const MAX_ERASURE_SETS: u64 = 16; + const N_THREADS: usize = 2; const N_SLOTS: u64 = 10; solana_logger::setup(); @@ -493,66 +563,85 @@ pub mod test { set_index, num_data: NUM_DATA, num_coding: NUM_CODING, - parity_count: NUM_CODING, - data_count: NUM_DATA, }) .collect(); SlotSpec { slot, set_specs } }); - for slot_model in generate_ledger_model(specs) { - for erasure_set in slot_model.chunks { - let erased_coding = erasure_set.coding[0].clone(); - let erased_data = erasure_set.data[..3].to_vec(); - let info = erasure_set.coding[0] - .read() - .unwrap() - .get_coding_header() - .expect("coding info"); + let mut handles = vec![]; + let session = Arc::new(Session::default()); - let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); + for i in 0..N_THREADS { + let specs = specs.clone(); + let session = Arc::clone(&session); - blobs.push(SharedBlob::default()); - blobs.push(SharedBlob::default()); - blobs.push(SharedBlob::default()); - for blob in erasure_set.data.into_iter().skip(3) { - blobs.push(blob); - } + let handle = thread::Builder::new() + .name(i.to_string()) + .spawn(move || { + for slot_model in generate_ledger_model(specs) { + for erasure_set in slot_model.chunks { + let erased_coding = erasure_set.coding[0].clone(); + let erased_data = erasure_set.data[..3].to_vec(); - blobs.push(SharedBlob::default()); - for blob in erasure_set.coding.into_iter().skip(1) { - blobs.push(blob); - } + let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); - let mut present = vec![true; ERASURE_SET_SIZE]; - present[0] = false; - present[1] = false; - present[2] = false; - present[NUM_DATA] = false; + blobs.push(SharedBlob::default()); + blobs.push(SharedBlob::default()); + blobs.push(SharedBlob::default()); + for blob in erasure_set.data.into_iter().skip(3) { + blobs.push(blob); + } - decode_shared(&info, slot_model.slot, &mut blobs, &present) - .expect("reconstruction must succeed"); + blobs.push(SharedBlob::default()); + for blob in erasure_set.coding.into_iter().skip(1) { + blobs.push(blob); + } - for (expected, recovered) in erased_data.iter().zip(blobs.iter()) { - let expected = expected.read().unwrap(); - let mut recovered = recovered.write().unwrap(); - let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE; - recovered.set_size(data_size); - let corrupt = data_size > BLOB_DATA_SIZE; - assert!(!corrupt, "CORRUPTION {}", data_size); - assert_eq!(expected.data(), recovered.data()); - } + let size = erased_coding.read().unwrap().size() as usize; - assert_eq!( - erased_coding.read().unwrap().data(), - blobs[NUM_DATA].read().unwrap().data() - ); + let mut present = vec![true; ERASURE_SET_SIZE]; + present[0] = false; + present[1] = false; + present[2] = false; + present[NUM_DATA] = false; - debug!("passed set: {}", erasure_set.set_index); - } - debug!("passed slot: {}", slot_model.slot); + session + .reconstruct_shared_blobs( + &mut blobs, + &present, + size, + erasure_set.set_index * NUM_DATA as u64, + slot_model.slot, + ) + .expect("reconstruction must succeed"); + + for (expected, recovered) in erased_data.iter().zip(blobs.iter()) { + let expected = expected.read().unwrap(); + let mut recovered = recovered.write().unwrap(); + let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE; + recovered.set_size(data_size); + let corrupt = data_size > BLOB_DATA_SIZE; + assert!(!corrupt, "CORRUPTION"); + assert_eq!(&*expected, &*recovered); + } + + assert_eq!( + erased_coding.read().unwrap().data(), + blobs[NUM_DATA].read().unwrap().data() + ); + + debug!("passed set: {}", erasure_set.set_index); + } + debug!("passed slot: {}", slot_model.slot); + } + }) + .expect("thread build error"); + + handles.push(handle); } + + handles.into_iter().for_each(|h| h.join().unwrap()); } /// Generates a model of a ledger containing certain data and coding blobs according to a spec @@ -564,6 +653,8 @@ pub mod test { IntoIt: Iterator + Clone + 'a, S: Borrow, { + let mut coding_generator = CodingGenerator::default(); + specs.into_iter().map(move |spec| { let spec = spec.borrow(); let slot = spec.slot; @@ -574,10 +665,8 @@ pub mod test { .map(|erasure_spec| { let set_index = erasure_spec.set_index as usize; let start_index = set_index * NUM_DATA; - let (parity_count, data_count) = - (erasure_spec.parity_count, erasure_spec.data_count); - let mut blobs = generate_shared_test_blobs(0, data_count); + let mut blobs = generate_test_blobs(0, NUM_DATA); index_blobs( &blobs, &Keypair::new().pubkey(), @@ -586,14 +675,7 @@ pub mod test { 0, ); - let mut coding_blobs = encode_shared( - slot, - set_index as u64, - start_index as u64, - &blobs, - parity_count, - ) - .unwrap(); + let mut coding_blobs = coding_generator.next(&blobs); blobs.drain(erasure_spec.num_data..); coding_blobs.drain(erasure_spec.num_coding..); @@ -603,8 +685,6 @@ pub mod test { set_index: set_index as u64, data: blobs, coding: coding_blobs, - parity_count, - data_count, } }) .collect(); @@ -631,7 +711,6 @@ pub mod test { blocktree .put_coding_blob_bytes_raw( slot, - erasure_set.set_index, blob.index(), &blob.data[..blob.size() + BLOB_HEADER_SIZE], ) @@ -643,7 +722,16 @@ pub mod test { blocktree } - pub fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { + // fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool { + // let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect(); + // + // blobs.iter().enumerate().all(|(i, blob)| { + // let blob = blob.read().unwrap(); + // blob.index() as usize == i + offset && blob.data() == &data[..] + // }) + // } + // + fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect(); let blobs: Vec<_> = (0..num_blobs) @@ -660,26 +748,35 @@ pub mod test { index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0); blobs - .into_iter() - .map(|shared| shared.read().unwrap().clone()) - .collect() } - pub fn generate_shared_test_blobs(offset: usize, num_blobs: usize) -> Vec { - let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect(); + impl Session { + fn reconstruct_shared_blobs( + &self, + blobs: &mut [SharedBlob], + present: &[bool], + size: usize, + block_start_idx: u64, + slot: u64, + ) -> Result<(Vec, Vec)> { + let mut locks: Vec> = blobs + .iter() + .map(|shared_blob| shared_blob.write().unwrap()) + .collect(); - let blobs: Vec<_> = (0..num_blobs) - .into_iter() - .map(|_| { - let mut blob = Blob::default(); - blob.data_mut().copy_from_slice(&data); - blob.set_size(data.len()); - Arc::new(RwLock::new(blob)) - }) - .collect(); + let mut slices: Vec<_> = locks + .iter_mut() + .enumerate() + .map(|(i, blob)| { + if i < NUM_DATA { + &mut blob.data[..size] + } else { + &mut blob.data_mut()[..size] + } + }) + .collect(); - index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0); - - blobs + self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot) + } } } diff --git a/core/src/packet.rs b/core/src/packet.rs index 859ec9498d..8c14b97180 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -1,9 +1,6 @@ //! The `packet` module defines data structures and methods to pull data from the network. -use crate::{ - erasure::CodingHeader, - recvmmsg::{recv_mmsg, NUM_RCVMMSGS}, - result::{Error, Result}, -}; +use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; +use crate::result::{Error, Result}; use bincode; use byteorder::{ByteOrder, LittleEndian}; use serde::Serialize; @@ -336,7 +333,7 @@ pub fn packets_to_blobs>(packets: &[T]) -> Vec { } macro_rules! range { - ($prev:expr, $type:ty) => { + ($prev:expr, $type:ident) => { $prev..$prev + size_of::<$type>() }; } @@ -346,13 +343,17 @@ const FORWARDED_RANGE: std::ops::Range = range!(SIGNATURE_RANGE.end, bool const PARENT_RANGE: std::ops::Range = range!(FORWARDED_RANGE.end, u64); const SLOT_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); -const CODING_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Option); -const ID_RANGE: std::ops::Range = range!(CODING_RANGE.end, Pubkey); +const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); const FLAGS_RANGE: std::ops::Range = range!(ID_RANGE.end, u32); const SIZE_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, u64); -pub const BLOB_HEADER_SIZE: usize = SIZE_RANGE.end; +macro_rules! align { + ($x:expr, $align:expr) => { + $x + ($align - 1) & !($align - 1) + }; +} +pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure pub const SIGNABLE_START: usize = PARENT_RANGE.start; pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2; @@ -421,14 +422,6 @@ impl Blob { self.data[ID_RANGE].copy_from_slice(id.as_ref()) } - pub fn set_coding_header(&mut self, header: &CodingHeader) { - bincode::serialize_into(&mut self.data[CODING_RANGE], &Some(*header)).unwrap(); - } - - pub fn get_coding_header(&self) -> Option { - bincode::deserialize(&self.data[CODING_RANGE]).unwrap() - } - /// Used to determine whether or not this blob should be forwarded in retransmit /// A bool is used here instead of a flag because this item is not intended to be signed when /// blob signatures are introduced @@ -475,10 +468,10 @@ impl Blob { } pub fn data(&self) -> &[u8] { - &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] + &self.data[BLOB_HEADER_SIZE..] } pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] + &mut self.data[BLOB_HEADER_SIZE..] } pub fn size(&self) -> usize { let size = self.data_size() as usize; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 302762b9c6..0c34a5a5f8 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -4,7 +4,7 @@ use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::leader_schedule_cache::LeaderScheduleCache; -use crate::packet::{Blob, SharedBlob}; +use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; @@ -62,14 +62,19 @@ pub fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc) -> Result } }))?; - blocktree.put_many_coding_blobs(blobs.iter().filter_map(move |blob| { - if blob.is_coding() { - Some(&**blob) - } else { - None - } - }))?; + for blob in blobs { + // TODO: Once the original leader signature is added to the blob, make sure that + // the blob was originally generated by the expected leader for this slot + // Insert the new blob into block tree + if blob.is_coding() { + blocktree.put_coding_blob_bytes( + blob.slot(), + blob.index(), + &blob.data[..BLOB_HEADER_SIZE + blob.size()], + )?; + } + } Ok(()) }