enscapsulate data_cf (#2336)

* enscapsulate data_cf
This commit is contained in:
Rob Walker
2019-01-08 15:53:44 -08:00
committed by GitHub
parent a82a5ae184
commit a904e15ecc
7 changed files with 207 additions and 193 deletions

View File

@@ -12,7 +12,7 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::borrow::Borrow;
use std::cmp::max;
use std::cmp;
use std::fs::create_dir_all;
use std::io;
use std::path::Path;
@@ -276,7 +276,7 @@ pub struct DbLedger {
// Underlying database is automatically closed in the Drop implementation of DB
db: Arc<DB>,
meta_cf: MetaCf,
pub data_cf: DataCf,
data_cf: DataCf,
pub erasure_cf: ErasureCf,
}
@@ -547,8 +547,8 @@ impl DbLedger {
let last = blobs.last().unwrap().read().unwrap();
meta.consumed = last.index()? + 1;
meta.consumed_slot = last.slot()?;
meta.received = max(meta.received, last.index()? + 1);
meta.received_slot = max(meta.received_slot, last.index()?);
meta.received = cmp::max(meta.received, last.index()? + 1);
meta.received_slot = cmp::max(meta.received_slot, last.index()?);
}
let mut batch = WriteBatch::default();
@@ -634,6 +634,33 @@ impl DbLedger {
Ok(EntryIterator { db_iterator })
}
pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
self.erasure_cf.get_by_slot_index(slot, index)
}
pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
self.erasure_cf.delete_by_slot_index(slot, index)
}
pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
self.data_cf.get_by_slot_index(slot, index)
}
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
self.erasure_cf.put_by_slot_index(slot, index, bytes)
}
pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
self.data_cf.put_by_slot_index(slot, index, bytes)
}
pub fn get_data_blob(&self, slot: u64, index: u64) -> Result<Option<Blob>> {
let bytes = self.get_data_blob_bytes(slot, index)?;
Ok(bytes.map(|bytes| {
let blob = Blob::new(&bytes);
assert!(blob.slot().unwrap() == slot);
assert!(blob.index().unwrap() == index);
blob
}))
}
pub fn get_entries_bytes(
&self,
_start_index: u64,
@@ -643,6 +670,99 @@ impl DbLedger {
Err(io::Error::new(io::ErrorKind::Other, "TODO"))
}
// Given a start and end entry index, find all the missing
// indexes in the ledger in the range [start_index, end_index)
fn find_missing_indexes(
db_iterator: &mut DbLedgerRawIterator,
slot: u64,
start_index: u64,
end_index: u64,
key: &dyn Fn(u64, u64) -> Vec<u8>,
index_from_key: &dyn Fn(&[u8]) -> Result<u64>,
max_missing: usize,
) -> Vec<u64> {
if start_index >= end_index || max_missing == 0 {
return vec![];
}
let mut missing_indexes = vec![];
// Seek to the first blob with index >= start_index
db_iterator.seek(&key(slot, start_index));
// The index of the first missing blob in the slot
let mut prev_index = start_index;
'outer: loop {
if !db_iterator.valid() {
for i in prev_index..end_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break;
}
}
break;
}
let current_key = db_iterator.key().expect("Expect a valid key");
let current_index = index_from_key(&current_key)
.expect("Expect to be able to parse index from valid key");
let upper_index = cmp::min(current_index, end_index);
for i in prev_index..upper_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break 'outer;
}
}
if current_index >= end_index {
break;
}
prev_index = current_index + 1;
db_iterator.next();
}
missing_indexes
}
pub fn find_missing_data_indexes(
&self,
slot: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
let mut db_iterator = self.data_cf.raw_iterator();
Self::find_missing_indexes(
&mut db_iterator,
slot,
start_index,
end_index,
&DataCf::key,
&DataCf::index_from_key,
max_missing,
)
}
pub fn find_missing_coding_indexes(
&self,
slot: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
let mut db_iterator = self.erasure_cf.raw_iterator();
Self::find_missing_indexes(
&mut db_iterator,
slot,
start_index,
end_index,
&ErasureCf::key,
&ErasureCf::index_from_key,
max_missing,
)
}
fn get_cf_options() -> Options {
let mut options = Options::default();
options.set_max_write_buffer_number(32);