diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index e0984b72c8..07f2b07bc7 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -88,6 +88,7 @@ pub struct Blocktree {
     erasure_cf: LedgerColumn<cf::Coding>,
     erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
     orphans_cf: LedgerColumn<cf::Orphans>,
+    index_cf: LedgerColumn<cf::Index>,
     batch_processor: Arc<RwLock<BatchProcessor>>,
     session: Arc<erasure::Session>,
     pub new_blobs_signals: Vec<SyncSender<bool>>,
@@ -107,6 +108,8 @@ pub const ERASURE_META_CF: &str = "erasure_meta";
 pub const ORPHANS_CF: &str = "orphans";
 // Column family for root data
 pub const ROOT_CF: &str = "root";
+/// Column family for indexes
+pub const INDEX_CF: &str = "index";
 
 impl Blocktree {
     /// Opens a Ledger in directory, provides "infinite" window of blobs
@@ -139,6 +142,7 @@ impl Blocktree {
         // the head of a detached chain of slots, i.e. a slot with no
         // known parent
         let orphans_cf = db.column();
+        let index_cf = db.column();
 
         // setup erasure
         let session = Arc::new(erasure::Session::default());
@@ -153,6 +157,7 @@ impl Blocktree {
             erasure_cf,
             erasure_meta_cf,
             orphans_cf,
+            index_cf,
             session,
             new_blobs_signals: vec![],
             batch_processor,
@@ -224,6 +229,7 @@ impl Blocktree {
         Ok(slot_iterator.take_while(move |((blob_slot, _), _)| *blob_slot == slot))
     }
 
+    /// Use this function to write data blobs to blocktree
     pub fn write_shared_blobs<I>(&self, shared_blobs: I) -> Result<()>
     where
         I: IntoIterator,
@@ -313,96 +319,84 @@ impl Blocktree {
         let mut write_batch = batch_processor.batch()?;
         let new_blobs: Vec<_> = new_blobs.into_iter().collect();
-        let mut recovered_data = vec![];
         let mut prev_inserted_blob_datas = HashMap::new();
+        let mut prev_inserted_coding = HashMap::new();
+
         // A map from slot to a 2-tuple of metadata: (working copy, backup copy),
         // so we can detect changes to the slot metadata later
         let mut slot_meta_working_set = HashMap::new();
         let mut erasure_meta_working_set = HashMap::new();
+        let mut index_working_set = HashMap::new();
 
         for blob in new_blobs.iter() {
             let blob = blob.borrow();
+            assert!(!blob.is_coding());
+
             let blob_slot = blob.slot();
+
+            let _ = index_working_set.entry(blob_slot).or_insert_with(|| {
+                self.index_cf
+                    .get(blob_slot)
+                    .unwrap()
+                    .unwrap_or_else(|| Index::new(blob_slot))
+            });
+
             let set_index = ErasureMeta::set_index_for(blob.index());
-            erasure_meta_working_set
-                .entry((blob_slot, set_index))
-                .or_insert_with(|| {
-                    self.erasure_meta_cf
-                        .get((blob_slot, set_index))
-                        .expect("Expect database get to succeed")
-                        .unwrap_or_else(|| ErasureMeta::new(set_index))
-                });
-        }
-
-        insert_data_blob_batch(
-            new_blobs.iter().map(Borrow::borrow),
-            &db,
-            &mut slot_meta_working_set,
-            &mut erasure_meta_working_set,
-            &mut prev_inserted_blob_datas,
-            &mut write_batch,
-        )?;
-
-        for (&(slot, _), erasure_meta) in erasure_meta_working_set.iter_mut() {
-            if let Some((data, coding)) = try_erasure_recover(
-                &db,
-                &self.session,
-                &erasure_meta,
-                slot,
-                &prev_inserted_blob_datas,
-                None,
-            )? {
-                for data_blob in data {
-                    recovered_data.push(data_blob);
-                }
-
-                for coding_blob in coding {
-                    erasure_meta.set_coding_present(coding_blob.index(), true);
-
-                    write_batch.put_bytes::<cf::Coding>(
-                        (coding_blob.slot(), coding_blob.index()),
-                        &coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()],
-                    )?;
-                }
+            if let Some(erasure_meta) = self.erasure_meta_cf.get((blob_slot, set_index))?
{ + erasure_meta_working_set.insert((blob_slot, set_index), erasure_meta); } } - insert_data_blob_batch( - recovered_data.iter(), - &db, - &mut slot_meta_working_set, - &mut erasure_meta_working_set, - &mut prev_inserted_blob_datas, + let recovered_data_opt = handle_recovery( + &self.db, + &self.session, + &erasure_meta_working_set, + &mut index_working_set, + &prev_inserted_blob_datas, + &mut prev_inserted_coding, &mut write_batch, )?; + if let Some(recovered_data) = recovered_data_opt { + insert_data_blob_batch( + recovered_data + .iter() + .chain(new_blobs.iter().map(Borrow::borrow)), + &self.db, + &mut slot_meta_working_set, + &mut index_working_set, + &mut prev_inserted_blob_datas, + &mut write_batch, + )?; + } else { + insert_data_blob_batch( + new_blobs.iter().map(Borrow::borrow), + &db, + &mut slot_meta_working_set, + &mut index_working_set, + &mut prev_inserted_blob_datas, + &mut write_batch, + )?; + } + // Handle chaining for the working set handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?; - let mut should_signal = false; - let mut newly_completed_slots = vec![]; - // Check if any metadata was changed, if so, insert the new version of the - // metadata into the write batch - for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() { - let meta: &SlotMeta = &RefCell::borrow(&*meta); - if !self.completed_slots_senders.is_empty() - && is_newly_completed_slot(meta, meta_backup) - { - newly_completed_slots.push(*slot); - } - // Check if the working copy of the metadata has changed - if Some(meta) != meta_backup.as_ref() { - should_signal = should_signal || slot_has_updates(meta, &meta_backup); - write_batch.put::(*slot, &meta)?; - } - } + let (should_signal, newly_completed_slots) = prepare_signals( + &slot_meta_working_set, + &self.completed_slots_senders, + &mut write_batch, + )?; for ((slot, set_index), erasure_meta) in erasure_meta_working_set { write_batch.put::((slot, set_index), &erasure_meta)?; } + for (&slot, index) in index_working_set.iter() { + write_batch.put::(slot, index)?; + } + batch_processor.write(write_batch)?; if should_signal { @@ -411,28 +405,12 @@ impl Blocktree { } } - if !self.completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() { - let mut slots: Vec<_> = (0..self.completed_slots_senders.len() - 1) - .map(|_| newly_completed_slots.clone()) - .collect(); - - slots.push(newly_completed_slots); - - for (signal, slots) in self.completed_slots_senders.iter().zip(slots.into_iter()) { - let res = signal.try_send(slots); - if let Err(TrySendError::Full(_)) = res { - datapoint_error!( - "blocktree_error", - ( - "error", - "Unable to send newly completed slot because channel is full" - .to_string(), - String - ), - ); - } - } - } + send_signals( + &self.new_blobs_signals, + &self.completed_slots_senders, + should_signal, + newly_completed_slots, + )?; Ok(()) } @@ -499,25 +477,25 @@ impl Blocktree { Ok((total_blobs, total_current_size as u64)) } + pub fn get_index(&self, slot: u64) -> Result> { + self.index_cf.get(slot) + } + pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.erasure_cf.get_bytes((slot, index)) } - pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { - let set_index = ErasureMeta::set_index_for(index); + pub fn delete_coding_blob(&self, slot: u64, blob_index: u64) -> Result<()> { let mut batch_processor = self.batch_processor.write().unwrap(); - let mut erasure_meta = self - .erasure_meta_cf - .get((slot, set_index))? 
- .unwrap_or_else(|| ErasureMeta::new(set_index)); + let mut index = self.index_cf.get(slot)?.unwrap_or_else(|| Index::new(slot)); - erasure_meta.set_coding_present(index, false); + index.coding_mut().set_present(blob_index, false); let mut batch = batch_processor.batch()?; - batch.delete::((slot, index))?; - batch.put::((slot, set_index), &erasure_meta)?; + batch.delete::((slot, blob_index))?; + batch.put::(slot, &index)?; batch_processor.write(batch)?; Ok(()) @@ -541,67 +519,139 @@ impl Blocktree { /// this function will insert coding blobs and also automatically track erasure-related /// metadata. If recovery is available it will be done - pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { - let set_index = ErasureMeta::set_index_for(index); + pub fn put_coding_blob(&self, blob: &Blob) -> Result<()> { + self.put_many_coding_blobs(vec![blob]) + } + + /// this function will insert coding blobs and also automatically track erasure-related + /// metadata. If recovery is available it will be done + pub fn put_many_coding_blobs(&self, blobs: I) -> Result<()> + where + I: IntoIterator, + I::Item: Borrow, + { let mut batch_processor = self.batch_processor.write().unwrap(); - - let mut erasure_meta = self - .erasure_meta_cf - .get((slot, set_index))? - .unwrap_or_else(|| ErasureMeta::new(set_index)); - - erasure_meta.set_coding_present(index, true); - erasure_meta.set_size(bytes.len() - BLOB_HEADER_SIZE); - let mut writebatch = batch_processor.batch()?; - writebatch.put_bytes::((slot, index), bytes)?; + let mut erasure_metas = HashMap::new(); + let mut slot_meta_working_set = HashMap::new(); + let mut index_working_set = HashMap::new(); - let recovered_data = { - if let Some((data, coding)) = try_erasure_recover( - &self.db, - &self.session, - &erasure_meta, - slot, - &HashMap::new(), - Some((index, bytes)), - )? 
{ - let mut erasure_meta_working_set = HashMap::new(); - erasure_meta_working_set.insert((slot, set_index), erasure_meta); - erasure_meta = *erasure_meta_working_set.values().next().unwrap(); + let mut prev_inserted_coding = HashMap::new(); + let mut prev_inserted_blob_datas = HashMap::new(); - for coding_blob in coding { - erasure_meta.set_coding_present(coding_blob.index(), true); + for blob_item in blobs { + let blob = blob_item.borrow(); + assert!(blob.is_coding()); - writebatch.put_bytes::( - (coding_blob.slot(), coding_blob.index()), - &coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()], - )?; - } - Some(data) - } else { - None - } - }; + let (blob_slot, blob_index, blob_size) = + (blob.slot(), blob.index(), blob.size() as usize); + let set_index = blob_index / crate::erasure::NUM_CODING as u64; - writebatch.put::((slot, set_index), &erasure_meta)?; - batch_processor.write(writebatch)?; - drop(batch_processor); - if let Some(data) = recovered_data { - if !data.is_empty() { - self.insert_data_blobs(&data)?; - } + writebatch.put_bytes::( + (blob_slot, blob_index), + &blob.data[..BLOB_HEADER_SIZE + blob_size], + )?; + + let index = index_working_set.entry(blob_slot).or_insert_with(|| { + self.index_cf + .get(blob_slot) + .unwrap() + .unwrap_or_else(|| Index::new(blob_slot)) + }); + + let erasure_meta = erasure_metas + .entry((blob_slot, set_index)) + .or_insert_with(|| { + self.erasure_meta_cf + .get((blob_slot, set_index)) + .expect("Expect database get to succeed") + .unwrap_or_else(|| ErasureMeta::new(set_index)) + }); + + // size should be the same for all coding blobs, else there's a bug + erasure_meta.set_size(blob_size); + index.coding_mut().set_present(blob_index, true); + + // `or_insert_with` used to prevent stack overflow + prev_inserted_coding + .entry((blob_slot, blob_index)) + .or_insert_with(|| blob.clone()); } + let recovered_data_opt = handle_recovery( + &self.db, + &self.session, + &erasure_metas, + &mut index_working_set, + &prev_inserted_blob_datas, + &mut prev_inserted_coding, + &mut writebatch, + )?; + + if let Some(recovered_data) = recovered_data_opt { + insert_data_blob_batch( + recovered_data.iter(), + &self.db, + &mut slot_meta_working_set, + &mut index_working_set, + &mut prev_inserted_blob_datas, + &mut writebatch, + )?; + + // Handle chaining for the working set + handle_chaining(&self.db, &mut writebatch, &slot_meta_working_set)?; + } + + let (should_signal, newly_completed_slots) = prepare_signals( + &slot_meta_working_set, + &self.completed_slots_senders, + &mut writebatch, + )?; + + for ((slot, set_index), erasure_meta) in erasure_metas { + writebatch.put::((slot, set_index), &erasure_meta)?; + } + + for (&slot, index) in index_working_set.iter() { + writebatch.put::(slot, index)?; + } + + batch_processor.write(writebatch)?; + + send_signals( + &self.new_blobs_signals, + &self.completed_slots_senders, + should_signal, + newly_completed_slots, + )?; + Ok(()) } - pub fn put_many_coding_blob_bytes(&self, coding_blobs: &[SharedBlob]) -> Result<()> { + pub fn put_shared_coding_blobs(&self, shared_blobs: I) -> Result<()> + where + I: IntoIterator, + I::Item: Borrow, + { + let blobs: Vec<_> = shared_blobs + .into_iter() + .map(move |s| s.borrow().clone()) + .collect(); + + let locks: Vec<_> = blobs.iter().map(move |b| b.read().unwrap()).collect(); + + let blob_refs = locks.iter().map(|s| &**s); + + self.put_many_coding_blobs(blob_refs) + } + + pub fn put_many_coding_blob_bytes_raw(&self, coding_blobs: &[SharedBlob]) -> Result<()> { for 
shared_coding_blob in coding_blobs { let blob = shared_coding_blob.read().unwrap(); assert!(blob.is_coding()); let size = blob.size() + BLOB_HEADER_SIZE; - self.put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])? + self.put_coding_blob_bytes_raw(blob.slot(), blob.index(), &blob.data[..size])? } Ok(()) @@ -920,7 +970,7 @@ fn insert_data_blob_batch<'a, I>( new_blobs: I, db: &Database, slot_meta_working_set: &mut HashMap>, Option)>, - erasure_meta_working_set: &mut HashMap<(u64, u64), ErasureMeta>, + index_working_set: &mut HashMap, prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, write_batch: &mut WriteBatch, ) -> Result<()> @@ -937,10 +987,11 @@ where ); if inserted { - erasure_meta_working_set - .get_mut(&(blob.slot(), ErasureMeta::set_index_for(blob.index()))) - .unwrap() - .set_data_present(blob.index(), true); + index_working_set + .get_mut(&blob.slot()) + .expect("Index must be present for all data blobs") + .data_mut() + .set_present(blob.index(), true); } } @@ -1115,6 +1166,72 @@ fn should_insert_blob( true } +fn send_signals( + new_blobs_signals: &[SyncSender], + completed_slots_senders: &[SyncSender>], + should_signal: bool, + newly_completed_slots: Vec, +) -> Result<()> { + if should_signal { + for signal in new_blobs_signals { + let _ = signal.try_send(true); + } + } + + if !completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() { + let mut slots: Vec<_> = (0..completed_slots_senders.len() - 1) + .map(|_| newly_completed_slots.clone()) + .collect(); + + slots.push(newly_completed_slots); + + for (signal, slots) in completed_slots_senders.iter().zip(slots.into_iter()) { + let res = signal.try_send(slots); + if let Err(TrySendError::Full(_)) = res { + solana_metrics::submit( + solana_metrics::influxdb::Point::new("blocktree_error") + .add_field( + "error", + solana_metrics::influxdb::Value::String( + "Unable to send newly completed slot because channel is full" + .to_string(), + ), + ) + .to_owned(), + log::Level::Error, + ); + } + } + } + + Ok(()) +} + +fn prepare_signals( + slot_meta_working_set: &HashMap>, Option)>, + completed_slots_senders: &[SyncSender>], + write_batch: &mut WriteBatch, +) -> Result<(bool, Vec)> { + let mut should_signal = false; + let mut newly_completed_slots = vec![]; + + // Check if any metadata was changed, if so, insert the new version of the + // metadata into the write batch + for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() { + let meta: &SlotMeta = &RefCell::borrow(&*meta); + if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) { + newly_completed_slots.push(*slot); + } + // Check if the working copy of the metadata has changed + if Some(meta) != meta_backup.as_ref() { + should_signal = should_signal || slot_has_updates(meta, &meta_backup); + write_batch.put::(*slot, &meta)?; + } + } + + Ok((should_signal, newly_completed_slots)) +} + // 1) Find the slot metadata in the cache of dirty slot metadata we've previously touched, // else: // 2) Search the database for that slot metadata. 
If still no luck, then: @@ -1356,20 +1473,132 @@ fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option, + index_working_set: &mut HashMap, + prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, + prev_inserted_coding: &mut HashMap<(u64, u64), Blob>, + writebatch: &mut WriteBatch, +) -> Result>> { + use solana_sdk::signature::Signable; + + let (mut recovered_data, mut recovered_coding) = (vec![], vec![]); + + for (&(slot, _), erasure_meta) in erasure_metas.iter() { + let index = index_working_set.get_mut(&slot).expect("Index"); + + if let Some((mut data, coding)) = try_erasure_recover( + db, + session, + &erasure_meta, + index, + slot, + &prev_inserted_blob_datas, + &prev_inserted_coding, + )? { + for blob in data.iter() { + debug!( + "[handle_recovery] recovered blob at ({}, {})", + blob.slot(), + blob.index() + ); + + let blob_index = blob.index(); + let blob_slot = blob.slot(); + + assert_eq!(blob_slot, slot); + + assert!( + blob_index >= erasure_meta.start_index() + && blob_index < erasure_meta.end_indexes().0 + ); + } + + recovered_data.append(&mut data); + + for coding_blob in coding { + if !index.coding().is_present(coding_blob.index()) { + recovered_coding.push(coding_blob); + } + } + } + } + + if !recovered_coding.is_empty() { + info!( + "[handle_recovery] recovered {} coding blobs", + recovered_coding.len() + ); + + for coding_blob in recovered_coding { + let (blob_slot, blob_index) = (coding_blob.slot(), coding_blob.index()); + let index = index_working_set.get_mut(&blob_slot).expect("Index"); + + index.coding_mut().set_present(coding_blob.index(), true); + + writebatch.put_bytes::( + (blob_slot, blob_index), + &coding_blob.data[..coding_blob.data_size() as usize], + )?; + + prev_inserted_coding.insert((blob_slot, blob_index), coding_blob); + } + } + + if !recovered_data.is_empty() { + let mut new_data = vec![]; + + for blob in recovered_data { + let index = index_working_set + .get_mut(&blob.slot()) + .expect("Index must have been present if blob was recovered"); + + let (blob_slot, blob_index) = (blob.slot(), blob.index()); + + if !index.data().is_present(blob_index) { + if blob.verify() { + trace!( + "[handle_recovery] successful verification at slot = {}, index={}", + blob_slot, + blob_index + ); + + new_data.push(blob); + } else { + warn!( + "[handle_recovery] failed verification at slot={}, index={}, discarding", + blob.slot(), + blob.index() + ); + } + } + } + + info!("[handle_recovery] recovered {} data blobs", new_data.len()); + + Ok(Some(new_data)) + } else { + Ok(None) + } +} + /// Attempts recovery using erasure coding fn try_erasure_recover( db: &Database, session: &Session, erasure_meta: &ErasureMeta, + index: &Index, slot: u64, prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, - new_coding_blob: Option<(u64, &[u8])>, + prev_inserted_coding: &HashMap<(u64, u64), Blob>, ) -> Result, Vec)>> { use crate::erasure::ERASURE_SET_SIZE; let set_index = erasure_meta.set_index; let start_index = erasure_meta.start_index(); - let (data_end_index, _) = erasure_meta.end_indexes(); + let (data_end_index, coding_end_idx) = erasure_meta.end_indexes(); let submit_metrics = |attempted: bool, status: String| { datapoint_info!( @@ -1382,15 +1611,16 @@ fn try_erasure_recover( ); }; - let blobs = match erasure_meta.status() { + let blobs = match erasure_meta.status(index) { ErasureMetaStatus::CanRecover => { let erasure_result = recover( db, session, slot, erasure_meta, + index, prev_inserted_blob_datas, - new_coding_blob, + prev_inserted_coding, ); match 
erasure_result { @@ -1399,7 +1629,11 @@ fn try_erasure_recover( assert_eq!( ERASURE_SET_SIZE, - recovered + (erasure_meta.num_coding() + erasure_meta.num_data()) as usize, + recovered + + index.data().present_in_bounds(start_index..data_end_index) + + index + .coding() + .present_in_bounds(start_index..coding_end_idx), "Recovery should always complete a set" ); @@ -1439,9 +1673,10 @@ fn try_erasure_recover( ErasureMetaStatus::DataFull => { submit_metrics(false, "complete".into()); - debug!( + trace!( "[try_erasure] slot: {}, set_index: {}, set full", - slot, set_index, + slot, + set_index, ); None @@ -1456,44 +1691,31 @@ fn recover( session: &Session, slot: u64, erasure_meta: &ErasureMeta, + index: &Index, prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, - new_coding: Option<(u64, &[u8])>, + prev_inserted_coding: &HashMap<(u64, u64), Blob>, ) -> Result<(Vec, Vec)> { - use crate::erasure::ERASURE_SET_SIZE; + use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA}; let start_idx = erasure_meta.start_index(); let size = erasure_meta.size(); let data_cf = db.column::(); let erasure_cf = db.column::(); + debug!( + "[recover] Attempting recovery: slot = {}, start_idx = {}, size = {}, erasure_meta = {:?}", + slot, start_idx, size, erasure_meta + ); + let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); let present = &mut [true; ERASURE_SET_SIZE]; let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); - for i in start_idx..coding_end_idx { - if erasure_meta.is_coding_present(i) { - let mut blob_bytes = match new_coding { - Some((new_coding_index, bytes)) if new_coding_index == i => bytes.to_vec(), - _ => erasure_cf - .get_bytes((slot, i))? - .expect("ErasureMeta must have no false positives"), - }; - - blob_bytes.drain(..BLOB_HEADER_SIZE); - - blobs.push(blob_bytes); - } else { - let set_relative_idx = erasure_meta.coding_index_in_set(i).unwrap() as usize; - blobs.push(vec![0; size]); - present[set_relative_idx] = false; - } - } - for i in start_idx..data_end_idx { - let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize; + if index.data().is_present(i) { + trace!("[recover] present data blob at {}", i); - if erasure_meta.is_data_present(i) { let mut blob_bytes = match prev_inserted_blob_datas.get(&(slot, i)) { Some(bytes) => bytes.to_vec(), None => data_cf @@ -1504,10 +1726,35 @@ fn recover( // If data is too short, extend it with zeroes blob_bytes.resize(size, 0u8); - blobs.insert(set_relative_idx, blob_bytes); + blobs.push(blob_bytes); } else { - blobs.insert(set_relative_idx, vec![0u8; size]); - // data erasures must come before any coding erasures if present + trace!("[recover] absent data blob at {}", i); + + let set_relative_idx = (i - start_idx) as usize; + blobs.push(vec![0u8; size]); + present[set_relative_idx] = false; + } + } + + for i in start_idx..coding_end_idx { + if index.coding().is_present(i) { + trace!("[recover] present coding blob at {}", i); + let blob = match prev_inserted_coding.get(&(slot, i)) { + Some(blob) => (*blob).clone(), + _ => { + let bytes = erasure_cf + .get_bytes((slot, i))? 
+ .expect("ErasureMeta must have no false positives"); + + Blob::new(&bytes) + } + }; + + blobs.push(blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size].to_vec()); + } else { + trace!("[recover] absent coding blob at {}", i); + let set_relative_idx = (i - start_idx) as usize + NUM_DATA; + blobs.push(vec![0; size]); present[set_relative_idx] = false; } } @@ -1515,11 +1762,9 @@ fn recover( let (recovered_data, recovered_coding) = session.reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?; - trace!( + debug!( "[recover] reconstruction OK slot: {}, indexes: [{},{})", - slot, - start_idx, - data_end_idx + slot, start_idx, data_end_idx ); Ok((recovered_data, recovered_coding)) @@ -2703,9 +2948,9 @@ pub mod tests { // Randomly pick whether to insert erasure or coding blobs first if rng.gen_bool(0.5) { blocktree.write_blobs(slot_blobs).unwrap(); - blocktree.put_many_coding_blob_bytes(&coding_blobs).unwrap(); + blocktree.put_shared_coding_blobs(&coding_blobs).unwrap(); } else { - blocktree.put_many_coding_blob_bytes(&coding_blobs).unwrap(); + blocktree.put_shared_coding_blobs(&coding_blobs).unwrap(); blocktree.write_blobs(slot_blobs).unwrap(); } } @@ -3194,6 +3439,7 @@ pub mod tests { use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA}; use rand::{thread_rng, Rng}; + use solana_sdk::signature::Signable; use std::sync::RwLock; impl Into for Blob { @@ -3204,8 +3450,9 @@ pub mod tests { #[test] fn test_erasure_meta_accuracy() { - use crate::erasure::ERASURE_SET_SIZE; - use ErasureMetaStatus::{DataFull, StillNeed}; + use ErasureMetaStatus::DataFull; + + solana_logger::setup(); let path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&path).unwrap(); @@ -3214,7 +3461,12 @@ pub mod tests { let num_blobs = NUM_DATA as u64 * 2; let slot = 0; - let (blobs, _) = make_slot_entries(slot, 0, num_blobs); + let (mut blobs, _) = make_slot_entries(slot, 0, num_blobs); + let keypair = Keypair::new(); + blobs.iter_mut().for_each(|blob| { + blob.set_id(&keypair.pubkey()); + blob.sign(&keypair); + }); let shared_blobs: Vec<_> = blobs .iter() .cloned() @@ -3227,81 +3479,44 @@ pub mod tests { .erasure_meta(slot, 0) .expect("DB get must succeed"); - assert!(erasure_meta_opt.is_some()); - let erasure_meta = erasure_meta_opt.unwrap(); - - let should_need = ERASURE_SET_SIZE - NUM_CODING - 2; - match erasure_meta.status() { - StillNeed(n) => assert_eq!(n, should_need), - _ => panic!("Should still need more blobs"), - }; + assert!(erasure_meta_opt.is_none()); blocktree.write_blobs(&blobs[2..NUM_DATA]).unwrap(); - let erasure_meta = blocktree - .erasure_meta(slot, 0) - .expect("DB get must succeed") - .unwrap(); - - assert_eq!(erasure_meta.status(), DataFull); - // insert all coding blobs in first set let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); - for shared_coding_blob in coding_blobs { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) - .unwrap(); - } + blocktree + .put_shared_coding_blobs(coding_blobs.iter()) + .unwrap(); let erasure_meta = blocktree .erasure_meta(slot, 0) .expect("DB get must succeed") .unwrap(); + let index = blocktree.get_index(slot).unwrap().unwrap(); - assert_eq!(erasure_meta.status(), DataFull); + assert_eq!(erasure_meta.status(&index), DataFull); - // insert blobs in 
the 2nd set until recovery should be possible given all coding blobs + // insert blob in the 2nd set so that recovery should be possible given all coding blobs let set2 = &blobs[NUM_DATA..]; - let mut end = 1; - let blobs_needed = ERASURE_SET_SIZE - NUM_CODING; - while end < blobs_needed { - blocktree.write_blobs(&set2[end - 1..end]).unwrap(); - - let erasure_meta = blocktree - .erasure_meta(slot, 1) - .expect("DB get must succeed") - .unwrap(); - - match erasure_meta.status() { - StillNeed(n) => assert_eq!(n, blobs_needed - end), - _ => panic!("Should still need more blobs"), - }; - - end += 1; - } + blocktree.write_blobs(&set2[..1]).unwrap(); // insert all coding blobs in 2nd set. Should trigger recovery - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); let coding_blobs = coding_generator.next(&shared_blobs[NUM_DATA..]); - for shared_coding_blob in coding_blobs { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size]) - .unwrap(); - } + blocktree + .put_shared_coding_blobs(coding_blobs.iter()) + .unwrap(); let erasure_meta = blocktree .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); + let index = blocktree.get_index(slot).unwrap().unwrap(); - assert_eq!(erasure_meta.status(), DataFull); + assert_eq!(erasure_meta.status(&index), DataFull); // remove coding blobs, erasure meta should still report being full let (start_idx, coding_end_idx) = @@ -3315,8 +3530,9 @@ pub mod tests { .erasure_meta(slot, 1) .expect("DB get must succeed") .unwrap(); + let index = blocktree.get_index(slot).unwrap().unwrap(); - assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); + assert_eq!(erasure_meta.status(&index), DataFull); } #[test] @@ -3334,31 +3550,30 @@ pub mod tests { .into_iter() .map(Blob::into) .collect::>(); + let keypair = Keypair::new(); + data_blobs.iter().for_each(|blob: &Arc>| { + let mut b = blob.write().unwrap(); + b.set_id(&keypair.pubkey()); + b.sign(&keypair); + }); let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { - let focused_index = (set_index + 1) * NUM_DATA - 1; let coding_blobs = coding_generator.next(&data_blobs); assert_eq!(coding_blobs.len(), NUM_CODING); - let deleted_data = data_blobs[NUM_DATA - 1].clone(); + let deleted_data = data_blobs[0].clone(); blocktree - .write_shared_blobs(&data_blobs[..NUM_DATA - 1]) + .write_shared_blobs(data_blobs.iter().skip(1)) .unwrap(); // This should trigger recovery of the missing data blob - for shared_coding_blob in coding_blobs { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - - blocktree - .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size]) - .expect("Inserting coding blobs must succeed"); - (slot, blob.index()); - } + blocktree + .put_shared_coding_blobs(coding_blobs.iter()) + .unwrap(); // Verify the slot meta let slot_meta = blocktree.meta(slot).unwrap().unwrap(); @@ -3379,12 +3594,14 @@ pub mod tests { .get((slot, set_index as u64)) .expect("Erasure Meta should be present") .unwrap(); + let index = blocktree.get_index(slot).unwrap().unwrap(); + let status = erasure_meta.status(&index); - assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); + assert_eq!(status, ErasureMetaStatus::DataFull); let retrieved_data = blocktree .data_cf - .get_bytes((slot, focused_index as u64)) + 
.get_bytes((slot, erasure_meta.start_index())) .unwrap(); assert!(retrieved_data.is_some()); @@ -3392,6 +3609,10 @@ pub mod tests { let data_blob = Blob::new(&retrieved_data.unwrap()); assert_eq!(&data_blob, &*deleted_data.read().unwrap()); + //assert_eq!( + //&retrieved_data.unwrap()[..], + //deleted_data.read().unwrap().data() + //); } drop(blocktree); @@ -3399,6 +3620,74 @@ pub mod tests { Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction"); } + #[test] + fn test_recovery_is_accurate() { + const SLOT: u64 = 0; + const SET_INDEX: u64 = 0; + + solana_logger::setup(); + + let ledger_path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&ledger_path).unwrap(); + let data_blobs = make_slot_entries(SLOT, 0, NUM_DATA as u64) + .0 + .into_iter() + .map(Blob::into) + .collect::>(); + + let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + + let shared_coding_blobs = coding_generator.next(&data_blobs); + assert_eq!(shared_coding_blobs.len(), NUM_CODING); + + let mut prev_coding = HashMap::new(); + let prev_data = HashMap::new(); + let mut index = Index::new(SLOT); + let mut erasure_meta = ErasureMeta::new(SET_INDEX); + erasure_meta.size = shared_coding_blobs[0].read().unwrap().size(); + + for shared_blob in shared_coding_blobs.iter() { + let blob = shared_blob.read().unwrap(); + + prev_coding.insert((blob.slot(), blob.index()), blob.clone()); + } + + index + .coding_mut() + .set_many_present((0..NUM_CODING as u64).zip(std::iter::repeat(true))); + + let (recovered_data, recovered_coding) = recover( + &blocktree.db, + &blocktree.session, + SLOT, + &erasure_meta, + &index, + &prev_data, + &prev_coding, + ) + .expect("Successful recovery"); + + for (original, recovered) in data_blobs.iter().zip(recovered_data.iter()) { + let original = original.read().unwrap(); + + assert_eq!(original.slot(), recovered.slot()); + assert_eq!(original.index(), recovered.index()); + + assert_eq!(original.data(), recovered.data()); + assert_eq!(&*original, recovered); + } + + for (original, recovered) in shared_coding_blobs.iter().zip(recovered_coding.iter()) { + let original = original.read().unwrap(); + + assert_eq!(original.slot(), recovered.slot()); + assert_eq!(original.index(), recovered.index()); + + assert_eq!(original.data(), recovered.data()); + assert_eq!(&*original, recovered); + } + } + #[test] fn test_recovery_fails_safely() { const SLOT: u64 = 0; @@ -3419,14 +3708,9 @@ pub mod tests { assert_eq!(shared_coding_blobs.len(), NUM_CODING); // Insert coding blobs except 1 and no data. 
Not enough to do recovery - for shared_blob in shared_coding_blobs.iter().skip(1) { - let blob = shared_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - - blocktree - .put_coding_blob_bytes(SLOT, blob.index(), &blob.data[..size]) - .expect("Inserting coding blobs must succeed"); - } + blocktree + .put_shared_coding_blobs(shared_coding_blobs.iter().skip(1)) + .unwrap(); // try recovery even though there aren't enough blobs let erasure_meta = blocktree @@ -3435,17 +3719,21 @@ pub mod tests { .unwrap() .unwrap(); - assert_eq!(erasure_meta.status(), ErasureMetaStatus::StillNeed(1)); + let index = blocktree.index_cf.get(SLOT).unwrap().unwrap(); + + assert_eq!(erasure_meta.status(&index), ErasureMetaStatus::StillNeed(1)); let prev_inserted_blob_datas = HashMap::new(); + let prev_inserted_coding = HashMap::new(); let attempt_result = try_erasure_recover( &blocktree.db, &blocktree.session, &erasure_meta, + &index, SLOT, &prev_inserted_blob_datas, - None, + &prev_inserted_coding, ); assert!(attempt_result.is_ok()); @@ -3528,12 +3816,14 @@ pub mod tests { // will try to write each erasure set in a random order. Within each erasure set, there // is a 50/50 chance of attempting to write the coding blobs first or the data blobs // first. - // The goal is to be as racey as possible and cover a wide range of situations + // The goal is to be as contentious as possible and cover a wide range of situations for thread_id in 0..N_THREADS { let blocktree = Arc::clone(&blocktree); - let mut rng = SmallRng::from_rng(&mut rng).unwrap(); + let model = model.clone(); - let handle = thread::spawn(move || { + let handle = thread::Builder::new().stack_size(32* 1024 * 1024).spawn(move || { + let mut rng = SmallRng::from_rng(&mut thread_rng()).unwrap(); + for slot_model in model { let slot = slot_model.slot; let num_erasure_sets = slot_model.chunks.len(); @@ -3548,50 +3838,40 @@ pub mod tests { blocktree .write_shared_blobs(&erasure_set.data) .expect("Writing data blobs must succeed"); - debug!( + trace!( "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index + slot, + erasure_set.set_index ); - for shared_coding_blob in &erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes( - slot, - blob.index(), - &blob.data[..size], - ) - .expect("Writing coding blobs must succeed"); - } - debug!( + blocktree + .put_shared_coding_blobs(erasure_set.coding.iter()) + .unwrap(); + + trace!( "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index + slot, + erasure_set.set_index ); } else { // write coding blobs first, then write the data blobs. 
- for shared_coding_blob in &erasure_set.coding { - let blob = shared_coding_blob.read().unwrap(); - let size = blob.size() + BLOB_HEADER_SIZE; - blocktree - .put_coding_blob_bytes( - slot, - blob.index(), - &blob.data[..size], - ) - .expect("Writing coding blobs must succeed"); - } - debug!( + blocktree + .put_shared_coding_blobs(erasure_set.coding.iter()) + .unwrap(); + + trace!( "multislot: wrote coding: slot: {}, erasure_set: {}", - slot, erasure_set.set_index + slot, + erasure_set.set_index ); blocktree .write_shared_blobs(&erasure_set.data) .expect("Writing data blobs must succeed"); - debug!( + trace!( "multislot: wrote data: slot: {}, erasure_set: {}", - slot, erasure_set.set_index + slot, + erasure_set.set_index ); } @@ -3604,20 +3884,37 @@ pub mod tests { .unwrap() .unwrap(); - let status = erasure_meta.status(); + let index = blocktree.index_cf.get(slot).unwrap().unwrap(); + + let status = erasure_meta.status(&index); attempt += 1; debug!( "[multi_slot] thread_id: {}, attempt: {}, slot: {}, set_index: {}, status: {:?}", thread_id, attempt, slot, erasure_set.set_index, status ); - if status == ErasureMetaStatus::DataFull { - break; + + match status { + ErasureMetaStatus::DataFull => break, + ErasureMetaStatus::CanRecover => { + debug!("[test_multi_slot] can recover"); + + if !rng.gen::() { + continue; + } else { + break; + } + } + ErasureMetaStatus::StillNeed(_) => { + if attempt > N_THREADS + thread_id { + break; + } + } } } } } - }); + }).unwrap(); handles.push(handle); } @@ -3638,17 +3935,27 @@ pub mod tests { .expect("DB get must succeed") .expect("ErasureMeta must be present for each erasure set"); + let index = blocktree + .index_cf + .get(slot) + .expect("DB read") + .expect("Erasure meta for each set"); + debug!( "multislot: got erasure_meta: slot: {}, set_index: {}, erasure_meta: {:?}", slot, set_index, erasure_meta ); + let start_index = erasure_meta.start_index(); + let (data_end_idx, _) = erasure_meta.end_indexes(); + // all possibility for recovery should be exhausted - assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull); + assert_eq!(erasure_meta.status(&index), ErasureMetaStatus::DataFull); // Should have all data - assert_eq!(erasure_meta.num_data(), NUM_DATA); - // Should have all coding - assert_eq!(erasure_meta.num_coding(), NUM_CODING); + assert_eq!( + index.data().present_in_bounds(start_index..data_end_idx), + NUM_DATA + ); } } diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 412f8bd727..e76d7a7d91 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -39,6 +39,10 @@ pub mod columns { #[derive(Debug)] /// The root column pub struct Root; + + #[derive(Debug)] + /// The index column + pub struct Index; } pub trait Backend: Sized + Send + Sync { diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index db2768da07..407ece2a44 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -100,6 +100,25 @@ impl Column for cf::Data { } } +impl Column for cf::Index { + const NAME: &'static str = super::INDEX_CF; + type Index = u64; + + fn key(slot: u64) -> Key { + let mut key = Key::default(); + BigEndian::write_u64(&mut key.0[8..16], slot); + key + } + + fn index(key: &Key) -> u64 { + BigEndian::read_u64(&key.0[8..16]) + } +} + +impl TypedColumn for cf::Index { + type Type = crate::blocktree::meta::Index; +} + impl Column for cf::DeadSlots { const NAME: &'static str = super::DEAD_SLOTS; type Index = u64; diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index 
599f04cdd9..7f84afae66 100644
--- a/core/src/blocktree/meta.rs
+++ b/core/src/blocktree/meta.rs
@@ -1,6 +1,6 @@
 use crate::erasure::{NUM_CODING, NUM_DATA};
 use solana_metrics::datapoint;
-use std::borrow::Borrow;
+use std::{collections::BTreeSet, ops::RangeBounds};
 #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
 // The Meta column family
 pub struct SlotMeta {
@@ -27,6 +27,116 @@ pub struct SlotMeta {
     pub is_connected: bool,
 }
 
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+/// Index recording presence/absence of blobs
+pub struct Index {
+    pub slot: u64,
+    data: DataIndex,
+    coding: CodingIndex,
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+pub struct DataIndex {
+    /// Set recording presence/absence of data blobs
+    index: BTreeSet<u64>,
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+/// Erasure coding information
+pub struct CodingIndex {
+    /// Set recording presence/absence of coding blobs
+    index: BTreeSet<u64>,
+}
+
+#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
+/// Erasure coding information
+pub struct ErasureMeta {
+    /// Which erasure set in the slot this is
+    pub set_index: u64,
+    /// Size of shards in this erasure set
+    pub size: usize,
+}
+
+#[derive(Debug, PartialEq)]
+pub enum ErasureMetaStatus {
+    CanRecover,
+    DataFull,
+    StillNeed(usize),
+}
+
+impl Index {
+    pub(in crate::blocktree) fn new(slot: u64) -> Self {
+        Index {
+            slot,
+            data: DataIndex::default(),
+            coding: CodingIndex::default(),
+        }
+    }
+
+    pub fn data(&self) -> &DataIndex {
+        &self.data
+    }
+    pub fn coding(&self) -> &CodingIndex {
+        &self.coding
+    }
+
+    pub fn data_mut(&mut self) -> &mut DataIndex {
+        &mut self.data
+    }
+    pub fn coding_mut(&mut self) -> &mut CodingIndex {
+        &mut self.coding
+    }
+}
+
+/// TODO: Mark: Change this when coding
+impl CodingIndex {
+    pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
+        self.index.range(bounds).count()
+    }
+
+    pub fn is_present(&self, index: u64) -> bool {
+        self.index.contains(&index)
+    }
+
+    pub fn set_present(&mut self, index: u64, presence: bool) {
+        if presence {
+            self.index.insert(index);
+        } else {
+            self.index.remove(&index);
+        }
+    }
+
+    pub fn set_many_present(&mut self, presence: impl IntoIterator<Item = (u64, bool)>) {
+        for (idx, present) in presence.into_iter() {
+            self.set_present(idx, present);
+        }
+    }
+}
+
+impl DataIndex {
+    pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
+        self.index.range(bounds).count()
+    }
+
+    pub fn is_present(&self, index: u64) -> bool {
+        self.index.contains(&index)
+    }
+
+    pub fn set_present(&mut self, index: u64, presence: bool) {
+        if presence {
+            self.index.insert(index);
+        } else {
+            self.index.remove(&index);
+        }
+    }
+
+    pub fn set_many_present(&mut self, presence: impl IntoIterator<Item = (u64, bool)>) {
+        for (idx, present) in presence.into_iter() {
+            self.set_present(idx, present);
+        }
+    }
+}
+
 impl SlotMeta {
     pub fn is_full(&self) -> bool {
         // last_index is std::u64::MAX when it has no information about how
@@ -72,62 +182,30 @@ impl SlotMeta {
     }
 }
 
-#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
-/// Erasure coding information
-pub struct ErasureMeta {
-    /// Which erasure set in the slot this is
-    pub set_index: u64,
-    /// Size of shards in this erasure set
-    size: usize,
-    /// Bitfield representing presence/absence of data blobs
-    data: u64,
-    /// Bitfield representing presence/absence of coding blobs
-    coding: u64,
-}
-
-#[derive(Debug, PartialEq)]
-pub enum ErasureMetaStatus {
-    CanRecover,
-    DataFull,
-    StillNeed(usize),
-}
 
 impl ErasureMeta {
     pub fn new(set_index: u64) -> ErasureMeta {
-        ErasureMeta {
-            set_index,
-            size: 0,
-            data: 0,
-            coding: 0,
-        }
+        ErasureMeta { set_index, size: 0 }
     }
 
-    pub fn status(&self) -> ErasureMetaStatus {
-        let (data_missing, coding_missing) =
-            (NUM_DATA - self.num_data(), NUM_CODING - self.num_coding());
-        if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
-            //assert!(self.size != 0);
-            ErasureMetaStatus::CanRecover
+    pub fn status(&self, index: &Index) -> ErasureMetaStatus {
+        use ErasureMetaStatus::*;
+
+        let start_idx = self.start_index();
+        let (data_end_idx, coding_end_idx) = self.end_indexes();
+
+        let num_coding = index.coding().present_in_bounds(start_idx..coding_end_idx);
+        let num_data = index.data().present_in_bounds(start_idx..data_end_idx);
+
+        let (data_missing, coding_missing) = (NUM_DATA - num_data, NUM_CODING - num_coding);
+
+        let total_missing = data_missing + coding_missing;
+
+        if data_missing > 0 && total_missing <= NUM_CODING {
+            CanRecover
         } else if data_missing == 0 {
-            ErasureMetaStatus::DataFull
+            DataFull
         } else {
-            ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING)
-        }
-    }
-
-    pub fn num_coding(&self) -> usize {
-        self.coding.count_ones() as usize
-    }
-
-    pub fn num_data(&self) -> usize {
-        self.data.count_ones() as usize
-    }
-
-    pub fn is_coding_present(&self, index: u64) -> bool {
-        if let Some(position) = self.data_index_in_set(index) {
-            self.coding & (1 << position) != 0
-        } else {
-            false
+            StillNeed(total_missing - NUM_CODING)
         }
     }
 
@@ -139,72 +217,10 @@ impl ErasureMeta {
         self.size
     }
 
-    pub fn set_coding_present(&mut self, index: u64, present: bool) {
-        if let Some(position) = self.data_index_in_set(index) {
-            if present {
-                self.coding |= 1 << position;
-            } else {
-                self.coding &= !(1 << position);
-            }
-        }
-    }
-
-    pub fn is_data_present(&self, index: u64) -> bool {
-        if let Some(position) = self.data_index_in_set(index) {
-            self.data & (1 << position) != 0
-        } else {
-            false
-        }
-    }
-
-    pub fn set_data_present(&mut self, index: u64, present: bool) {
-        if let Some(position) = self.data_index_in_set(index) {
-            if present {
-                self.data |= 1 << position;
-            } else {
-                self.data &= !(1 << position);
-            }
-        }
-    }
-
-    pub fn set_data_multi<I, Idx>(&mut self, indexes: I, present: bool)
-    where
-        I: IntoIterator<Item = Idx>,
-        Idx: Borrow<u64>,
-    {
-        for index in indexes.into_iter() {
-            self.set_data_present(*index.borrow(), present);
-        }
-    }
-
-    pub fn set_coding_multi<I, Idx>(&mut self, indexes: I, present: bool)
-    where
-        I: IntoIterator<Item = Idx>,
-        Idx: Borrow<u64>,
-    {
-        for index in indexes.into_iter() {
-            self.set_coding_present(*index.borrow(), present);
-        }
-    }
-
     pub fn set_index_for(index: u64) -> u64 {
         index / NUM_DATA as u64
     }
 
-    pub fn data_index_in_set(&self, index: u64) -> Option<u64> {
-        let set_index = Self::set_index_for(index);
-
-        if set_index == self.set_index {
-            Some(index - self.start_index())
-        } else {
-            None
-        }
-    }
-
-    pub fn coding_index_in_set(&self, index: u64) -> Option<u64> {
-        self.data_index_in_set(index).map(|i| i + NUM_DATA as u64)
-    }
-
     pub fn start_index(&self) -> u64 {
         self.set_index * NUM_DATA as u64
     }
@@ -216,130 +232,59 @@ impl ErasureMeta {
     }
 }
 
-#[test]
-fn test_meta_indexes() {
-    use rand::{thread_rng, Rng};
-    // to avoid casts everywhere
-    const NUM_DATA: u64 = crate::erasure::NUM_DATA as u64;
-
-    let mut rng = thread_rng();
-
-    for _ in 0..100 {
-        let set_index = rng.gen_range(0, 1_000);
-        let blob_index = (set_index * NUM_DATA) + rng.gen_range(0,
NUM_DATA); - - assert_eq!(set_index, ErasureMeta::set_index_for(blob_index)); - let e_meta = ErasureMeta::new(set_index); - - assert_eq!(e_meta.start_index(), set_index * NUM_DATA); - let (data_end_idx, coding_end_idx) = e_meta.end_indexes(); - assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA); - assert_eq!(coding_end_idx, set_index * NUM_DATA + NUM_CODING as u64); - } - - let mut e_meta = ErasureMeta::new(0); - - assert_eq!(e_meta.data_index_in_set(0), Some(0)); - assert_eq!(e_meta.data_index_in_set(NUM_DATA / 2), Some(NUM_DATA / 2)); - assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), Some(NUM_DATA - 1)); - assert_eq!(e_meta.data_index_in_set(NUM_DATA), None); - assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None); - - e_meta.set_index = 1; - - assert_eq!(e_meta.data_index_in_set(0), None); - assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), None); - assert_eq!(e_meta.data_index_in_set(NUM_DATA), Some(0)); - assert_eq!( - e_meta.data_index_in_set(NUM_DATA * 2 - 1), - Some(NUM_DATA - 1) - ); - assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None); -} - -#[test] -fn test_meta_coding_present() { - let mut e_meta = ErasureMeta::default(); - - e_meta.set_coding_multi(0..NUM_CODING as u64, true); - for i in 0..NUM_CODING as u64 { - assert_eq!(e_meta.is_coding_present(i), true); - } - for i in NUM_CODING as u64..NUM_DATA as u64 { - assert_eq!(e_meta.is_coding_present(i), false); - } - - e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64); - let start_idx = e_meta.start_index(); - e_meta.set_coding_multi(start_idx..start_idx + NUM_CODING as u64, true); - - for i in start_idx..start_idx + NUM_CODING as u64 { - e_meta.set_coding_present(i, true); - assert_eq!(e_meta.is_coding_present(i), true); - } - for i in start_idx + NUM_CODING as u64..start_idx + NUM_DATA as u64 { - assert_eq!(e_meta.is_coding_present(i), false); - } -} - -#[test] -fn test_erasure_meta_status() { +#[cfg(test)] +mod test { + use super::*; use rand::{seq::SliceRandom, thread_rng}; - // Local constansts just used to avoid repetitive casts - const N_DATA: u64 = crate::erasure::NUM_DATA as u64; - const N_CODING: u64 = crate::erasure::NUM_CODING as u64; + use std::iter::repeat; - let mut e_meta = ErasureMeta::default(); - let mut rng = thread_rng(); - let data_indexes: Vec = (0..N_DATA).collect(); - let coding_indexes: Vec = (0..N_CODING).collect(); + #[test] + fn test_erasure_meta_status() { + use ErasureMetaStatus::*; - assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA)); + let set_index = 0; - e_meta.set_data_multi(0..N_DATA, true); + let mut e_meta = ErasureMeta::new(set_index); + let mut rng = thread_rng(); + let mut index = Index::new(0); + e_meta.size = 1; - assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); + let data_indexes = 0..NUM_DATA as u64; + let coding_indexes = 0..NUM_CODING as u64; - e_meta.size = 1; - e_meta.set_coding_multi(0..N_CODING, true); + assert_eq!(e_meta.status(&index), StillNeed(NUM_DATA)); - assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); + index + .data_mut() + .set_many_present(data_indexes.clone().zip(repeat(true))); - for &idx in data_indexes.choose_multiple(&mut rng, NUM_CODING) { - e_meta.set_data_present(idx, false); + assert_eq!(e_meta.status(&index), DataFull); - assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover); - } + index + .coding_mut() + .set_many_present(coding_indexes.clone().zip(repeat(true))); - e_meta.set_data_multi(0..N_DATA, true); + for &idx in data_indexes + .clone() + .collect::>() + .choose_multiple(&mut 
rng, NUM_DATA) + { + index.data_mut().set_present(idx, false); - for &idx in coding_indexes.choose_multiple(&mut rng, NUM_CODING) { - e_meta.set_coding_present(idx, false); + assert_eq!(e_meta.status(&index), CanRecover); + } - assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull); - } -} - -#[test] -fn test_meta_data_present() { - let mut e_meta = ErasureMeta::default(); - - e_meta.set_data_multi(0..NUM_DATA as u64, true); - for i in 0..NUM_DATA as u64 { - assert_eq!(e_meta.is_data_present(i), true); - } - for i in NUM_DATA as u64..2 * NUM_DATA as u64 { - assert_eq!(e_meta.is_data_present(i), false); - } - - e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64); - let start_idx = e_meta.start_index(); - e_meta.set_data_multi(start_idx..start_idx + NUM_DATA as u64, true); - - for i in start_idx..start_idx + NUM_DATA as u64 { - assert_eq!(e_meta.is_data_present(i), true); - } - for i in start_idx - NUM_DATA as u64..start_idx { - assert_eq!(e_meta.is_data_present(i), false); + index + .data_mut() + .set_many_present(data_indexes.zip(repeat(true))); + + for &idx in coding_indexes + .collect::>() + .choose_multiple(&mut rng, NUM_CODING) + { + index.coding_mut().set_present(idx, false); + + assert_eq!(e_meta.status(&index), DataFull); + } } } diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index ef696d7737..4863f9e850 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -30,7 +30,9 @@ impl Backend for Rocks { type Error = rocksdb::Error; fn open(path: &Path) -> Result { - use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta}; + use crate::blocktree::db::columns::{ + Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta, + }; fs::create_dir_all(&path)?; @@ -40,12 +42,14 @@ impl Backend for Rocks { // Column family names let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); - let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options()); + let dead_slots_cf_descriptor = + ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options()); let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options()); + let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options()); let cfs = vec![ meta_cf_descriptor, @@ -55,6 +59,7 @@ impl Backend for Rocks { erasure_meta_cf_descriptor, orphans_cf_descriptor, root_cf_descriptor, + index_cf_descriptor, ]; // Open the database @@ -64,13 +69,16 @@ impl Backend for Rocks { } fn columns(&self) -> Vec<&'static str> { - use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta}; + use crate::blocktree::db::columns::{ + Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta, + }; vec![ Coding::NAME, ErasureMeta::NAME, DeadSlots::NAME, Data::NAME, + Index::NAME, Orphans::NAME, Root::NAME, SlotMeta::NAME, @@ -164,6 +172,25 @@ impl Column for cf::Data { } } +impl Column for cf::Index { + const NAME: &'static str = super::INDEX_CF; + type Index = u64; + + fn key(slot: u64) -> Vec { + let mut key = vec![0; 8]; + BigEndian::write_u64(&mut 
key[..], slot); + key + } + + fn index(key: &[u8]) -> u64 { + BigEndian::read_u64(&key[..8]) + } +} + +impl TypedColumn for cf::Index { + type Type = crate::blocktree::meta::Index; +} + impl Column for cf::DeadSlots { const NAME: &'static str = super::DEAD_SLOTS_CF; type Index = u64; diff --git a/core/src/broadcast_stage/broadcast_bad_blob_sizes.rs b/core/src/broadcast_stage/broadcast_bad_blob_sizes.rs index 0bfeccc59f..1737650d9b 100644 --- a/core/src/broadcast_stage/broadcast_bad_blob_sizes.rs +++ b/core/src/broadcast_stage/broadcast_bad_blob_sizes.rs @@ -65,7 +65,8 @@ impl BroadcastRun for BroadcastBadBlobSizes { w_b.meta.size = real_size; } - blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?; + blocktree.write_shared_blobs(data_blobs.iter())?; + blocktree.put_shared_coding_blobs(coding_blobs.iter())?; // 3) Start broadcast step let bank_epoch = bank.get_stakers_epoch(bank.slot()); diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index c3617b3cf9..75297d9e2e 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -73,7 +73,8 @@ impl BroadcastRun for BroadcastFakeBlobsRun { self.last_blockhash = Hash::default(); } - blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?; + blocktree.write_shared_blobs(data_blobs.iter())?; + blocktree.put_shared_coding_blobs(coding_blobs.iter())?; // Set the forwarded flag to true, so that the blobs won't be forwarded to peers data_blobs diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 2e26fed504..7417145e12 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -52,7 +52,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { &mut broadcast.coding_generator, ); - blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?; + blocktree.write_shared_blobs(data_blobs.iter())?; + blocktree.put_shared_coding_blobs(coding_blobs.iter())?; // 3) Start broadcast step let bank_epoch = bank.get_stakers_epoch(bank.slot()); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 90a2263a92..2b6641dae0 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -82,7 +82,9 @@ impl BroadcastRun for StandardBroadcastRun { &mut broadcast.coding_generator, ); - blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?; + blocktree.write_shared_blobs(data_blobs.iter())?; + blocktree.put_shared_coding_blobs(coding_blobs.iter())?; + let to_blobs_elapsed = to_blobs_start.elapsed(); // 3) Start broadcast step diff --git a/core/src/erasure.rs b/core/src/erasure.rs index 99784fa053..f1216ccbfe 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -138,23 +138,25 @@ impl Session { if n < NUM_DATA { let mut blob = Blob::new(&blocks[n]); + blob.meta.size = blob.data_size() as usize; - data_size = blob.data_size() as usize - BLOB_HEADER_SIZE; + data_size = blob.data_size() as usize; idx = n as u64 + block_start_idx; first_byte = blob.data[0]; - blob.set_size(data_size); recovered_data.push(blob); } else { let mut blob = Blob::default(); - blob.data_mut()[..size].copy_from_slice(&blocks[n]); + 
blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size].copy_from_slice(&blocks[n]); + blob.meta.size = size; + data_size = size; - idx = (n as u64 + block_start_idx) - NUM_DATA as u64; + idx = n as u64 + block_start_idx - NUM_DATA as u64; first_byte = blob.data[0]; blob.set_slot(slot); blob.set_index(idx); - blob.set_size(data_size); + blob.set_coding(); recovered_coding.push(blob); } @@ -240,7 +242,7 @@ impl CodingGenerator { if { let mut coding_ptrs: Vec<_> = coding_blobs .iter_mut() - .map(|blob| &mut blob.data_mut()[..max_data_size]) + .map(|blob| &mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + max_data_size]) .collect(); self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice()) @@ -427,7 +429,7 @@ pub mod test { } #[test] - fn test_erasure_generate_coding() { + fn test_generate_coding() { solana_logger::setup(); // trivial case @@ -449,10 +451,10 @@ pub mod test { assert_eq!(coding_blobs.len(), NUM_CODING); for j in 0..NUM_CODING { - assert_eq!( - coding_blobs[j].read().unwrap().index(), - ((i / NUM_DATA) * NUM_DATA + j) as u64 - ); + let coding_blob = coding_blobs[j].read().unwrap(); + + //assert_eq!(coding_blob.index(), (i * NUM_DATA + j % NUM_CODING) as u64); + assert!(coding_blob.is_coding()); } test_toss_and_recover( &coding_generator.session, @@ -654,6 +656,8 @@ pub mod test { S: Borrow, { let mut coding_generator = CodingGenerator::default(); + let keypair = Keypair::new(); + let bytes = keypair.to_bytes(); specs.into_iter().map(move |spec| { let spec = spec.borrow(); @@ -666,14 +670,14 @@ pub mod test { let set_index = erasure_spec.set_index as usize; let start_index = set_index * NUM_DATA; - let mut blobs = generate_test_blobs(0, NUM_DATA); - index_blobs( - &blobs, - &Keypair::new().pubkey(), - start_index as u64, - slot, - 0, - ); + let mut blobs = generate_test_blobs(start_index, NUM_DATA); + let keypair = Keypair::from_bytes(&bytes).unwrap(); + index_blobs(&blobs, &keypair.pubkey(), start_index as u64, slot, 0); + + // Signing has to be deferred until all data/header fields are set correctly + blobs.iter().for_each(|blob| { + blob.write().unwrap().sign(&keypair); + }); let mut coding_blobs = coding_generator.next(&blobs); @@ -738,9 +742,8 @@ pub mod test { .into_iter() .map(|_| { let mut blob = Blob::default(); - blob.data_mut()[..data.len()].copy_from_slice(&data); - blob.set_size(data.len()); - blob.sign(&Keypair::new()); + blob.data_mut()[..].copy_from_slice(&data); + blob.set_size(BLOB_DATA_SIZE); Arc::new(RwLock::new(blob)) }) .collect(); @@ -771,7 +774,7 @@ pub mod test { if i < NUM_DATA { &mut blob.data[..size] } else { - &mut blob.data_mut()[..size] + &mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size] } }) .collect(); diff --git a/core/src/packet.rs b/core/src/packet.rs index ffd70df022..fd1ffcf9cd 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -515,10 +515,10 @@ impl Blob { } pub fn data(&self) -> &[u8] { - &self.data[BLOB_HEADER_SIZE..] + &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] } pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[BLOB_HEADER_SIZE..] 
+ &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] } pub fn size(&self) -> usize { let size = self.data_size() as usize; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 5e85fcc470..39796482e6 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -4,7 +4,7 @@ use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::leader_schedule_cache::LeaderScheduleCache; -use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; +use crate::packet::{Blob, SharedBlob}; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; @@ -28,11 +28,12 @@ pub const NUM_THREADS: u32 = 10; fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> { let mut retransmit_queue: Vec = Vec::new(); for blob in blobs { + let mut blob_guard = blob.write().unwrap(); // Don't add blobs generated by this node to the retransmit queue - if blob.read().unwrap().id() != *id { - let mut w_blob = blob.write().unwrap(); - w_blob.meta.forward = w_blob.should_forward(); - w_blob.set_forwarded(false); + if blob_guard.id() != *id && !blob_guard.is_coding() { + //let mut w_blob = blob.write().unwrap(); + blob_guard.meta.forward = blob_guard.should_forward(); + blob_guard.set_forwarded(false); retransmit_queue.push(blob.clone()); } } @@ -52,29 +53,17 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) /// Process a blob: Add blob to the ledger window. pub fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc) -> Result<()> { // make an iterator for insert_data_blobs() - let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect(); + //let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect(); - blocktree.insert_data_blobs(blobs.iter().filter_map(|blob| { - if !blob.is_coding() { - Some(&(**blob)) - } else { - None - } - }))?; + blocktree.write_shared_blobs( + blobs + .iter() + .filter(|blob| !blob.read().unwrap().is_coding()), + )?; - for blob in blobs { - // TODO: Once the original leader signature is added to the blob, make sure that - // the blob was originally generated by the expected leader for this slot + blocktree + .put_shared_coding_blobs(blobs.iter().filter(|blob| blob.read().unwrap().is_coding()))?; - // Insert the new blob into block tree - if blob.is_coding() { - blocktree.put_coding_blob_bytes( - blob.slot(), - blob.index(), - &blob.data[..BLOB_HEADER_SIZE + blob.size()], - )?; - } - } Ok(()) } @@ -215,6 +204,8 @@ impl WindowService { let bank_forks = bank_forks.clone(); let t_window = Builder::new() .name("solana-window".to_string()) + // TODO: Mark: Why is it overflowing + .stack_size(8 * 1024 * 1024) .spawn(move || { let _exit = Finalizer::new(exit.clone()); let id = cluster_info.read().unwrap().id(); diff --git a/scripts/coverage.sh b/scripts/coverage.sh index ae10c220eb..dfcadae2e3 100755 --- a/scripts/coverage.sh +++ b/scripts/coverage.sh @@ -27,6 +27,7 @@ coverageFlags+=("-Coverflow-checks=off") # Disable overflow checks, which create export RUSTFLAGS="${coverageFlags[*]}" export CARGO_INCREMENTAL=0 export RUST_BACKTRACE=1 +export RUST_MIN_STACK=8388608 echo "--- remove old coverage results" if [[ -d target/cov ]]; then
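A minimal, self-contained sketch of the presence-tracking model this patch introduces: the new Index column records which data and coding blobs of a slot have landed in two BTreeSet<u64>s, and ErasureMeta::status is derived by counting present entries inside the erasure set's index bounds instead of reading bitfields. Names mirror the patch, but the Presence type and the NUM_DATA/NUM_CODING values here are illustrative stand-ins, not the crate's real definitions.

use std::collections::BTreeSet;
use std::ops::RangeBounds;

// Illustrative only; the real constants live in core/src/erasure.rs.
const NUM_DATA: usize = 16;
const NUM_CODING: usize = 4;

// Stand-in for the patch's DataIndex/CodingIndex pair.
#[derive(Default)]
struct Presence {
    index: BTreeSet<u64>,
}

impl Presence {
    fn set_present(&mut self, idx: u64, present: bool) {
        if present {
            self.index.insert(idx);
        } else {
            self.index.remove(&idx);
        }
    }

    // A range scan over the set; the old u64 bitfield capped a set at 64 blobs.
    fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
        self.index.range(bounds).count()
    }
}

#[derive(Debug, PartialEq)]
enum Status {
    CanRecover,
    DataFull,
    StillNeed(usize),
}

// Mirrors ErasureMeta::status: purely a function of the presence index.
fn status(set_index: u64, data: &Presence, coding: &Presence) -> Status {
    let start = set_index * NUM_DATA as u64;
    let data_end = start + NUM_DATA as u64;
    let coding_end = start + NUM_CODING as u64;

    let data_missing = NUM_DATA - data.present_in_bounds(start..data_end);
    let coding_missing = NUM_CODING - coding.present_in_bounds(start..coding_end);
    let total_missing = data_missing + coding_missing;

    if data_missing > 0 && total_missing <= NUM_CODING {
        Status::CanRecover
    } else if data_missing == 0 {
        Status::DataFull
    } else {
        Status::StillNeed(total_missing - NUM_CODING)
    }
}

fn main() {
    let (mut data, mut coding) = (Presence::default(), Presence::default());

    // All coding blobs and all but one data blob present: one erasure, recoverable.
    for i in 1..NUM_DATA as u64 {
        data.set_present(i, true);
    }
    for i in 0..NUM_CODING as u64 {
        coding.set_present(i, true);
    }
    assert_eq!(status(0, &data, &coding), Status::CanRecover);

    // Once the missing data blob arrives (e.g. via recovery), the set is full.
    data.set_present(0, true);
    assert_eq!(status(0, &data, &coding), Status::DataFull);
}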
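recover() above hands the decoder one fixed-size array of ERASURE_SET_SIZE shard buffers, the NUM_DATA data shards first and the NUM_CODING coding shards after them, with absences flagged in a parallel present array. A short sketch of that set-relative index arithmetic, using the same illustrative constants as above:

// Illustrative only; mirrors ErasureMeta::set_index_for/start_index and the
// set_relative_idx math in recover().
const NUM_DATA: u64 = 16;
const NUM_CODING: u64 = 4;
const ERASURE_SET_SIZE: usize = (NUM_DATA + NUM_CODING) as usize;

fn set_index_for(blob_index: u64) -> u64 {
    blob_index / NUM_DATA
}

fn start_index(set_index: u64) -> u64 {
    set_index * NUM_DATA
}

// Position of a data blob in the decoder's array.
fn data_slot(set_index: u64, blob_index: u64) -> usize {
    (blob_index - start_index(set_index)) as usize
}

// Coding blobs follow all data blobs in the same array.
fn coding_slot(set_index: u64, blob_index: u64) -> usize {
    (blob_index - start_index(set_index)) as usize + NUM_DATA as usize
}

fn main() {
    let set = set_index_for(35);
    assert_eq!(set, 2); // 35 / 16
    assert_eq!(start_index(set), 32);
    assert_eq!(data_slot(set, 35), 3);
    assert_eq!(coding_slot(set, 33), 17);
    assert!(coding_slot(set, 33) < ERASURE_SET_SIZE);
}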