implement erasure-based recovery inside blocktree (#3739)

* implement recover in blocktree

* erasures metric

* erasure metrics only

* fixup
This commit is contained in:
Rob Walker
2019-04-11 14:14:57 -07:00
committed by GitHub
parent d31989f878
commit efd19b07e7
6 changed files with 1121 additions and 409 deletions

View File

@@ -3,6 +3,8 @@
//! access read to a persistent file-based ledger.
use crate::entry::Entry;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::result::{Error, Result};
#[cfg(feature = "kvstore")]
@@ -15,7 +17,8 @@ use hashbrown::HashMap;
#[cfg(not(feature = "kvstore"))]
use rocksdb;
use serde::Serialize;
#[cfg(feature = "erasure")]
use solana_metrics::counter::Counter;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
@@ -30,7 +33,10 @@ use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, RwLock};
pub use self::meta::*;
mod db;
mod meta;
macro_rules! db_imports {
{ $mod:ident, $db:ident, $db_path:expr } => {
@@ -67,67 +73,14 @@ pub enum BlocktreeError {
KvsDb(kvstore::Error),
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
// The Meta column family: per-slot bookkeeping for blob ingestion progress.
pub struct SlotMeta {
    // The number of slots above the root (the genesis block). The first
    // slot has slot 0.
    pub slot: u64,
    // The total number of consecutive blobs starting from index 0
    // we have received for this slot.
    pub consumed: u64,
    // The index *plus one* of the highest blob received for this slot. Useful
    // for checking if the slot has received any blobs yet, and to calculate the
    // range where there is one or more holes: `(consumed..received)`.
    pub received: u64,
    // The index of the blob that is flagged as the last blob for this slot.
    // Remains std::u64::MAX until that blob arrives (see SlotMeta::new).
    pub last_index: u64,
    // The slot height of the block this one derives from.
    // std::u64::MAX means the parent is not yet known (see is_parent_set).
    pub parent_slot: u64,
    // The list of slot heights, each of which contains a block that derives
    // from this one.
    pub next_slots: Vec<u64>,
    // True if this slot is full (consumed == last_index + 1) and if every
    // slot that is a parent of this slot is also connected.
    pub is_connected: bool,
}
impl SlotMeta {
    /// Whether every blob belonging to this slot has been received.
    pub fn is_full(&self) -> bool {
        // A `last_index` of u64::MAX means we have not yet seen the blob
        // flagged as the slot's last one, so the total blob count for the
        // slot is unknown and it cannot be full.
        // Note: A full slot with zero blobs is not possible.
        if self.last_index == std::u64::MAX {
            false
        } else {
            // Invariant: we can never consume past the declared last blob.
            assert!(self.consumed <= self.last_index + 1);
            self.consumed == self.last_index + 1
        }
    }

    /// Whether this slot's parent is known.
    pub fn is_parent_set(&self) -> bool {
        self.parent_slot != std::u64::MAX
    }

    fn new(slot: u64, parent_slot: u64) -> Self {
        SlotMeta {
            slot,
            parent_slot,
            consumed: 0,
            received: 0,
            next_slots: vec![],
            // Slot 0 (the genesis slot) is connected by definition.
            is_connected: slot == 0,
            // Unknown until the blob flagged "last" for this slot arrives.
            last_index: std::u64::MAX,
        }
    }
}
// ledger window
pub struct Blocktree {
db: Arc<Database>,
meta_cf: LedgerColumn<cf::SlotMeta>,
data_cf: LedgerColumn<cf::Data>,
erasure_cf: LedgerColumn<cf::Coding>,
#[cfg(feature = "erasure")]
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
orphans_cf: LedgerColumn<cf::Orphans>,
pub new_blobs_signals: Vec<SyncSender<bool>>,
pub root_slot: RwLock<u64>,
@@ -139,6 +92,8 @@ pub const META_CF: &str = "meta";
pub const DATA_CF: &str = "data";
// Column family for erasure data
pub const ERASURE_CF: &str = "erasure";
#[cfg(feature = "erasure")]
pub const ERASURE_META_CF: &str = "erasure_meta";
// Column family for orphans data
pub const ORPHANS_CF: &str = "orphans";
@@ -161,6 +116,8 @@ impl Blocktree {
// Create the erasure column family
let erasure_cf = LedgerColumn::new(&db);
#[cfg(feature = "erasure")]
let erasure_meta_cf = LedgerColumn::new(&db);
// Create the orphans column family. An "orphan" is defined as
// the head of a detached chain of slots, i.e. a slot with no
@@ -172,6 +129,8 @@ impl Blocktree {
meta_cf,
data_cf,
erasure_cf,
#[cfg(feature = "erasure")]
erasure_meta_cf,
orphans_cf,
new_blobs_signals: vec![],
root_slot: RwLock::new(0),
@@ -314,6 +273,8 @@ impl Blocktree {
// A map from slot to a 2-tuple of metadata: (working copy, backup copy),
// so we can detect changes to the slot metadata later
let mut slot_meta_working_set = HashMap::new();
#[cfg(feature = "erasure")]
let mut erasure_meta_working_set = HashMap::new();
let new_blobs: Vec<_> = new_blobs.into_iter().collect();
let mut prev_inserted_blob_datas = HashMap::new();
@@ -354,6 +315,21 @@ impl Blocktree {
continue;
}
#[cfg(feature = "erasure")]
{
let set_index = ErasureMeta::set_index_for(blob.index());
let erasure_meta_entry = erasure_meta_working_set
.entry((blob_slot, set_index))
.or_insert_with(|| {
self.erasure_meta_cf
.get((blob_slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index))
});
erasure_meta_entry.set_data_present(blob.index());
}
let _ = self.insert_data_blob(
blob,
&mut prev_inserted_blob_datas,
@@ -377,13 +353,53 @@ impl Blocktree {
}
}
#[cfg(feature = "erasure")]
{
for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() {
write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?;
}
}
self.db.write(write_batch)?;
if should_signal {
for signal in self.new_blobs_signals.iter() {
let _ = signal.try_send(true);
}
}
#[cfg(feature = "erasure")]
for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() {
if erasure_meta.can_recover() {
match self.recover(slot, set_index) {
Ok(recovered) => {
inc_new_counter_info!("erasures-recovered", recovered);
}
Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
let mut erasure_meta = self
.erasure_meta_cf
.get((slot, set_index))?
.expect("erasure meta should exist");
let mut batch = self.db.batch()?;
let start_index = erasure_meta.start_index();
let (_, coding_end_idx) = erasure_meta.end_indexes();
erasure_meta.coding = 0;
batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
for idx in start_index..coding_end_idx {
batch.delete::<cf::Coding>((slot, idx))?;
}
self.db.write(batch)?;
}
Err(e) => return Err(e),
}
}
}
Ok(())
}
@@ -457,10 +473,66 @@ impl Blocktree {
/// Fetches the raw serialized bytes of the data blob stored at
/// `(slot, index)`, or `None` if no such blob exists.
pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
    let key = (slot, index);
    self.data_cf.get_bytes(key)
}
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
/// Stores raw coding-blob bytes at `(slot, index)` without updating any
/// erasure bookkeeping (compare with `put_coding_blob_bytes`).
pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
    let key = (slot, index);
    self.erasure_cf.put_bytes(key, bytes)
}
#[cfg(not(feature = "erasure"))]
#[inline]
// Without the "erasure" feature there is no erasure metadata to maintain,
// so inserting a coding blob is a plain passthrough to the raw insert.
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
    self.put_coding_blob_bytes_raw(slot, index, bytes)
}
/// Inserts a coding blob's bytes and automatically tracks the erasure
/// metadata of its erasure set. If this insert makes recovery possible,
/// recovery is attempted immediately.
///
/// If recovery reports `CorruptCoding`, the set's coding bitfield is
/// reset and all of its coding blobs are deleted so that fresh coding
/// blobs can be accepted later.
#[cfg(feature = "erasure")]
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
    let set_index = ErasureMeta::set_index_for(index);
    // Load (or create) the metadata for this blob's erasure set and mark
    // this coding index as present.
    let mut erasure_meta = self
        .erasure_meta_cf
        .get((slot, set_index))?
        .unwrap_or_else(|| ErasureMeta::new(set_index));

    erasure_meta.set_coding_present(index);

    // Write the blob bytes and the updated metadata atomically, so the
    // metadata's presence bits never claim a blob that isn't stored.
    let mut writebatch = self.db.batch()?;

    writebatch.put_bytes::<cf::Coding>((slot, index), bytes)?;

    writebatch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;

    self.db.write(writebatch)?;

    if erasure_meta.can_recover() {
        match self.recover(slot, set_index) {
            Ok(recovered) => {
                inc_new_counter_info!("erasures-recovered", recovered);
                return Ok(());
            }
            Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
                // The coding blobs of this set are unusable: clear the
                // coding presence bits and purge every coding blob in the
                // set's index range, again atomically with the metadata.
                let start_index = erasure_meta.start_index();
                let (_, coding_end_idx) = erasure_meta.end_indexes();
                let mut batch = self.db.batch()?;

                erasure_meta.coding = 0;
                batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;

                for idx in start_index..coding_end_idx {
                    batch.delete::<cf::Coding>((slot, idx as u64))?;
                }

                self.db.write(batch)?;

                return Ok(());
            }
            Err(e) => return Err(e),
        }
    }

    Ok(())
}
/// Stores raw serialized bytes for a data blob at `(slot, index)`,
/// bypassing slot-meta and erasure bookkeeping.
pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> {
    let key = (slot, index);
    self.data_cf.put_bytes(key, value)
}
@@ -1016,6 +1088,144 @@ impl Blocktree {
Ok(())
}
#[cfg(feature = "erasure")]
/// Attempts recovery using erasure coding.
///
/// Rebuilds the missing data and coding blobs of the erasure set at
/// `(slot, set_index)` from the members that are present, writes the
/// reconstructed blobs back into the ledger, and returns the number of
/// blobs recovered.
///
/// Returns `ErasureError::CorruptCoding` when a reconstructed blob's size
/// field is implausible, signalling the caller to purge the set's coding.
fn recover(&self, slot: u64, set_index: u64) -> Result<usize> {
    use crate::erasure::{ErasureError, NUM_CODING, NUM_DATA};
    use crate::packet::BLOB_DATA_SIZE;

    // Caller guarantees the meta exists (it observed can_recover() == true).
    let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap();

    // Blob-index ranges covered by this erasure set.
    let start_idx = erasure_meta.start_index();
    let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes();

    // `erasures` collects the set-relative indexes of the missing blobs;
    // the decoder expects the list to be terminated with -1 (pushed below).
    let mut erasures = Vec::with_capacity(NUM_CODING + 1);
    let (mut data, mut coding) = (vec![], vec![]);
    // Session size: payload length shared by all coding blobs of the set,
    // learned from the first coding blob found.
    let mut size = 0;

    // Gather coding blobs; substitute zeroed buffers for missing ones.
    for i in start_idx..coding_end_idx {
        if erasure_meta.is_coding_present(i) {
            let blob_bytes = self
                .erasure_cf
                .get_bytes((slot, i))?
                .expect("erasure_meta must have no false positives");

            if size == 0 {
                size = blob_bytes.len() - BLOB_HEADER_SIZE;
            }

            coding.push(blob_bytes);
        } else {
            // Coding erasures are indexed after the NUM_DATA data slots.
            let set_relative_idx = (i - start_idx) + NUM_DATA as u64;
            coding.push(vec![0; crate::packet::BLOB_SIZE]);
            erasures.push(set_relative_idx as i32);
        }
    }

    // can_recover() implies at least one coding blob is present, so the
    // session size must have been established above.
    assert_ne!(size, 0);

    // Gather data blobs; substitute zeroed buffers for missing ones.
    for i in start_idx..data_end_idx {
        if erasure_meta.is_data_present(i) {
            let mut blob_bytes = self
                .data_cf
                .get_bytes((slot, i))?
                .expect("erasure_meta must have no false positives");

            // If data is too short, extend it with zeroes
            if blob_bytes.len() < size {
                blob_bytes.resize(size, 0u8);
            }

            data.push(blob_bytes);
        } else {
            let set_relative_index = i - start_idx;
            data.push(vec![0; size]);
            // data erasures must come before any coding erasures if present
            erasures.insert(0, set_relative_index as i32);
        }
    }

    // Build the equal-length mutable views the decoder operates on.
    // Coding blobs carry their coded payload after the blob header; data
    // buffers are used from the start.
    let mut coding_ptrs: Vec<_> = coding
        .iter_mut()
        .map(|coding_bytes| &mut coding_bytes[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size])
        .collect();

    let mut data_ptrs: Vec<_> = data
        .iter_mut()
        .map(|data_bytes| &mut data_bytes[..size])
        .collect();

    // Marks the end
    erasures.push(-1);
    trace!("erasures: {:?}, size: {}", erasures, size);

    // Reconstruct the missing buffers in place.
    erasure::decode_blocks(
        data_ptrs.as_mut_slice(),
        coding_ptrs.as_mut_slice(),
        &erasures,
    )?;

    // Create the missing blobs from the reconstructed data
    let block_start_idx = erasure_meta.start_index();
    let (mut recovered_data, mut recovered_coding) = (vec![], vec![]);

    for i in &erasures[..erasures.len() - 1] {
        let n = *i as usize;
        let (data_size, idx, first_byte);

        if n < NUM_DATA {
            // Reconstructed data blob: its header was part of the coded
            // payload, so its size field can be read back out of it.
            let mut blob = Blob::new(&data_ptrs[n]);
            idx = n as u64 + block_start_idx;
            data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
            first_byte = blob.data[0];

            // A size beyond BLOB_DATA_SIZE means the decode produced
            // garbage, i.e. the set's coding blobs are corrupt.
            if data_size > BLOB_DATA_SIZE {
                error!("corrupt data blob[{}] data_size: {}", idx, data_size);
                return Err(Error::ErasureError(ErasureError::CorruptCoding));
            }

            blob.set_slot(slot);
            blob.set_index(idx);
            blob.set_size(data_size);
            recovered_data.push(blob);
        } else {
            // Reconstructed coding blob: sized by the shared session size.
            let mut blob = Blob::new(&coding_ptrs[n - NUM_DATA]);
            idx = (n - NUM_DATA) as u64 + block_start_idx;
            data_size = size;
            first_byte = blob.data[0];

            // NOTE(review): if `size` is ever smaller than BLOB_HEADER_SIZE
            // this subtraction underflows (panics in debug builds);
            // presumably coding blobs always carry at least a header's
            // worth of payload — confirm.
            if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
                error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
                return Err(Error::ErasureError(ErasureError::CorruptCoding));
            }

            blob.set_slot(slot);
            blob.set_index(idx);
            blob.set_data_size(data_size as u64);
            recovered_coding.push(blob);
        }

        trace!(
            "erasures[{}] ({}) size: {} data[0]: {}",
            *i,
            idx,
            data_size,
            first_byte,
        );
    }

    // Persist the results: data blobs go through the normal write path,
    // coding blobs are written raw so as not to re-trigger the erasure
    // bookkeeping in put_coding_blob_bytes.
    self.write_blobs(recovered_data)?;

    for blob in recovered_coding {
        self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?;
    }

    // Everything listed in `erasures` (minus the -1 terminator) was rebuilt.
    Ok(erasures.len() - 1)
}
/// Returns the next consumed index and the number of ticks in the new consumed
/// range
fn get_slot_consecutive_blobs<'a>(
@@ -2484,6 +2694,282 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[cfg(feature = "erasure")]
mod erasure {
use super::*;
use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec};
use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA};
use rand::{thread_rng, Rng};
use std::sync::RwLock;
/// Wraps a `Blob` for shared ownership in the tests.
///
/// Implemented as `From` rather than a manual `Into`, per the standard
/// library convention: the blanket `impl<T, U: From<T>> Into<U> for T`
/// still provides `Blob::into`, so existing call sites are unaffected.
impl From<Blob> for SharedBlob {
    fn from(blob: Blob) -> Self {
        Arc::new(RwLock::new(blob))
    }
}
#[test]
// Verifies that the data/coding presence bitfields in ErasureMeta track
// blob inserts exactly: each bit corresponds to one blob index within the
// erasure set.
fn test_erasure_meta_accuracy() {
    let path = get_tmp_ledger_path!();
    let blocktree = Blocktree::open(&path).unwrap();

    // one erasure set + half of the next
    let num_blobs = 24;
    let slot = 0;

    let (blobs, _) = make_slot_entries(slot, 0, num_blobs);
    let shared_blobs: Vec<_> = blobs
        .iter()
        .cloned()
        .map(|blob| Arc::new(RwLock::new(blob)))
        .collect();

    // Insert the second half of set 0 (indexes 8..16): bits 8-15 of the
    // data bitfield should come up, and no coding bits.
    blocktree.write_blobs(&blobs[8..16]).unwrap();

    let erasure_meta_opt = blocktree
        .erasure_meta_cf
        .get((slot, 0))
        .expect("DB get must succeed");

    assert!(erasure_meta_opt.is_some());
    let erasure_meta = erasure_meta_opt.unwrap();

    assert_eq!(erasure_meta.data, 0xFF00);
    assert_eq!(erasure_meta.coding, 0x0);

    // Fill in the first half (indexes 0..8): all 16 data bits set.
    blocktree.write_blobs(&blobs[..8]).unwrap();

    let erasure_meta = blocktree
        .erasure_meta_cf
        .get((slot, 0))
        .expect("DB get must succeed")
        .unwrap();

    assert_eq!(erasure_meta.data, 0xFFFF);
    assert_eq!(erasure_meta.coding, 0x0);

    // Indexes 16..24 belong to set 1 and fill its lower 8 data bits.
    blocktree.write_blobs(&blobs[16..]).unwrap();

    let erasure_meta = blocktree
        .erasure_meta_cf
        .get((slot, 1))
        .expect("DB get must succeed")
        .unwrap();

    assert_eq!(erasure_meta.data, 0x00FF);
    assert_eq!(erasure_meta.coding, 0x0);

    // Generate and insert the coding blobs for set 0: its four coding
    // bits (0x0F) should now be set as well.
    let mut coding_generator = CodingGenerator::new();
    let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]).unwrap();

    for shared_coding_blob in coding_blobs {
        let blob = shared_coding_blob.read().unwrap();
        let size = blob.size() + BLOB_HEADER_SIZE;
        blocktree
            .put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])
            .unwrap();
    }

    let erasure_meta = blocktree
        .erasure_meta_cf
        .get((slot, 0))
        .expect("DB get must succeed")
        .unwrap();

    assert_eq!(erasure_meta.data, 0xFFFF);
    assert_eq!(erasure_meta.coding, 0x0F);
}
#[test]
// Exercises recovery across three consecutive erasure sets in one slot:
// for each set, the last data blob is withheld and must be reconstructed
// from the remaining data blobs plus the coding blobs.
pub fn test_recovery_basic() {
    solana_logger::setup();

    let slot = 0;

    let ledger_path = get_tmp_ledger_path!();
    let blocktree = Blocktree::open(&ledger_path).unwrap();

    let data_blobs = make_slot_entries(slot, 0, 3 * NUM_DATA as u64)
        .0
        .into_iter()
        .map(Blob::into)
        .collect::<Vec<_>>();

    let mut coding_generator = CodingGenerator::new();

    for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() {
        // Index of the withheld (last) data blob of this set.
        let focused_index = (set_index + 1) * NUM_DATA - 1;
        let coding_blobs = coding_generator.next(&data_blobs).unwrap();
        assert_eq!(coding_blobs.len(), NUM_CODING);

        let deleted_data = data_blobs[NUM_DATA - 1].clone();
        debug!(
            "deleted: slot: {}, index: {}",
            deleted_data.read().unwrap().slot(),
            deleted_data.read().unwrap().index()
        );

        // Write all data blobs except the last one of the set.
        blocktree
            .write_shared_blobs(&data_blobs[..NUM_DATA - 1])
            .unwrap();

        // this should trigger recovery
        for shared_coding_blob in coding_blobs {
            let blob = shared_coding_blob.read().unwrap();
            let size = blob.size() + BLOB_HEADER_SIZE;

            blocktree
                .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
                .expect("Inserting coding blobs must succeed");
            // (dead `(slot, blob.index());` expression removed — it built
            // a tuple and discarded it, leftover debugging debris)
        }

        // Recovery must have left the set complete: all data and coding
        // presence bits set.
        let erasure_meta = blocktree
            .erasure_meta_cf
            .get((slot, set_index as u64))
            .expect("Erasure Meta should be present")
            .unwrap();

        assert_eq!(erasure_meta.data, 0xFFFF);
        assert_eq!(erasure_meta.coding, 0x0F);

        let retrieved_data = blocktree
            .data_cf
            .get_bytes((slot, focused_index as u64))
            .unwrap();

        assert!(retrieved_data.is_some());

        let data_blob = Blob::new(&retrieved_data.unwrap());

        // The reconstructed blob must equal the withheld original.
        assert_eq!(&data_blob, &*deleted_data.read().unwrap());
    }

    drop(blocktree);

    Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction");
}
/// FIXME: JERASURE Threading: see Issue
/// [#3725](https://github.com/solana-labs/solana/issues/3725)
#[test]
fn test_recovery_multi_slot_multi_thread() {
    use std::thread;

    // Debugging toggle: set to false to run all slot writers sequentially
    // on the current thread instead of spawning one thread per slot.
    const USE_THREADS: bool = true;
    let slots = vec![0, 3, 5, 50, 100];
    let max_erasure_sets = 16;
    solana_logger::setup();

    let path = get_tmp_ledger_path!();
    let mut rng = thread_rng();

    // Specification should generate a ledger where each slot has a random number of
    // erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones
    // will have between 1-4 data blobs missing and all coding blobs
    let specs = slots
        .iter()
        .map(|&slot| {
            let num_erasure_sets = rng.gen_range(0, max_erasure_sets);

            let set_specs = (0..num_erasure_sets)
                .map(|set_index| {
                    let (num_data, num_coding) = if set_index % 2 == 0 {
                        (NUM_DATA - rng.gen_range(1, 5), NUM_CODING)
                    } else {
                        (NUM_DATA, 0)
                    };
                    ErasureSpec {
                        set_index,
                        num_data,
                        num_coding,
                    }
                })
                .collect();

            SlotSpec { slot, set_specs }
        })
        .collect::<Vec<_>>();

    let model = generate_ledger_model(&specs);
    let blocktree = Arc::new(Blocktree::open(&path).unwrap());

    // Write to each slot in a different thread simultaneously.
    // These writes should trigger the recovery. Every erasure set should have all of its
    // data blobs
    let mut handles = vec![];

    for slot_model in model.clone() {
        let blocktree = Arc::clone(&blocktree);
        let slot = slot_model.slot;
        // Per-slot writer: inserts the spec's data blobs, then its coding
        // blobs (the coding inserts are what trigger recovery).
        let closure = move || {
            for erasure_set in slot_model.chunks {
                blocktree
                    .write_shared_blobs(erasure_set.data)
                    .expect("Writing data blobs must succeed");
                debug!(
                    "multislot: wrote data: slot: {}, erasure_set: {}",
                    slot, erasure_set.set_index
                );

                for shared_coding_blob in erasure_set.coding {
                    let blob = shared_coding_blob.read().unwrap();
                    let size = blob.size() + BLOB_HEADER_SIZE;
                    blocktree
                        .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
                        .expect("Writing coding blobs must succeed");
                }
                debug!(
                    "multislot: wrote coding: slot: {}, erasure_set: {}",
                    slot, erasure_set.set_index
                );
            }
        };

        if USE_THREADS {
            handles.push(thread::spawn(closure));
        } else {
            closure();
        }
    }

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());

    // After all writers finish, every erasure set must be fully recovered.
    for slot_model in model {
        let slot = slot_model.slot;
        for erasure_set_model in slot_model.chunks {
            let set_index = erasure_set_model.set_index as u64;

            let erasure_meta = blocktree
                .erasure_meta_cf
                .get((slot, set_index))
                .expect("DB get must succeed")
                .expect("ErasureMeta must be present for each erasure set");

            debug!(
                "multislot: got erasure_meta: slot: {}, set_index: {}, erasure_meta: {:?}",
                slot, set_index, erasure_meta
            );

            // all possibility for recovery should be exhausted
            assert!(!erasure_meta.can_recover());
            // Should have all data
            assert_eq!(erasure_meta.data, 0xFFFF);
            if set_index % 2 == 0 {
                // Even sets have all coding
                assert_eq!(erasure_meta.coding, 0x0F);
            }
        }
    }

    drop(blocktree);
    Blocktree::destroy(&path).expect("Blocktree destruction must succeed");
}
}
pub fn entries_to_blobs(
entries: &Vec<Entry>,
slot: u64,