Simplify storage interface in blocktree (#3522)
This commit is contained in:
@@ -31,45 +31,29 @@ use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
|
||||
use std::sync::Arc;
|
||||
|
||||
mod db;
|
||||
#[cfg(feature = "kvstore")]
|
||||
mod kvs;
|
||||
#[cfg(not(feature = "kvstore"))]
|
||||
mod rocks;
|
||||
|
||||
#[cfg(feature = "kvstore")]
|
||||
use self::kvs::{DataCf, ErasureCf, Kvs, MetaCf};
|
||||
#[cfg(not(feature = "kvstore"))]
|
||||
use self::rocks::{DataCf, ErasureCf, MetaCf, Rocks};
|
||||
macro_rules! db_imports {
|
||||
{ $mod:ident, $db:ident, $db_path:expr } => {
|
||||
mod $mod;
|
||||
|
||||
pub use db::{
|
||||
Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily,
|
||||
LedgerColumnFamilyRaw,
|
||||
};
|
||||
pub use db::{
|
||||
Cursor, Database, IndexColumn, IWriteBatch, LedgerColumnFamily,
|
||||
LedgerColumnFamilyRaw,
|
||||
};
|
||||
|
||||
pub use $mod::{$db, ErasureCf, MetaCf, DataCf};
|
||||
pub type BlocktreeRawIterator = <$db as Database>::Cursor;
|
||||
pub type WriteBatch = <$db as Database>::WriteBatch;
|
||||
pub type OwnedKey = <$db as Database>::OwnedKey;
|
||||
pub type Key = <$db as Database>::Key;
|
||||
pub const BLOCKTREE_DIRECTORY: &str = $db_path;
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "kvstore"))]
|
||||
pub type BlocktreeRawIterator = <Rocks as Database>::Cursor;
|
||||
db_imports! {rocks, Rocks, "rocksdb"}
|
||||
#[cfg(feature = "kvstore")]
|
||||
pub type BlocktreeRawIterator = <Kvs as Database>::Cursor;
|
||||
|
||||
#[cfg(not(feature = "kvstore"))]
|
||||
pub type WriteBatch = <Rocks as Database>::WriteBatch;
|
||||
#[cfg(feature = "kvstore")]
|
||||
pub type WriteBatch = <Kvs as Database>::WriteBatch;
|
||||
|
||||
#[cfg(not(feature = "kvstore"))]
|
||||
type KeyRef = <Rocks as Database>::KeyRef;
|
||||
#[cfg(feature = "kvstore")]
|
||||
type KeyRef = <Kvs as Database>::KeyRef;
|
||||
|
||||
#[cfg(not(feature = "kvstore"))]
|
||||
pub type Key = <Rocks as Database>::Key;
|
||||
#[cfg(feature = "kvstore")]
|
||||
pub type Key = <Kvs as Database>::Key;
|
||||
|
||||
#[cfg(not(feature = "kvstore"))]
|
||||
pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
|
||||
#[cfg(feature = "kvstore")]
|
||||
pub const BLOCKTREE_DIRECTORY: &str = "kvstore";
|
||||
db_imports! {kvs, Kvs, "kvstore"}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum BlocktreeError {
|
||||
@@ -161,14 +145,14 @@ impl Blocktree {
|
||||
}
|
||||
|
||||
pub fn meta(&self, slot: u64) -> Result<Option<SlotMeta>> {
|
||||
self.meta_cf.get(&MetaCf::key(slot))
|
||||
self.meta_cf.get(&MetaCf::key(&slot))
|
||||
}
|
||||
|
||||
pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> {
|
||||
let meta_key = MetaCf::key(slot);
|
||||
let meta_key = MetaCf::key(&slot);
|
||||
if let Some(mut meta) = self.meta_cf.get(&meta_key)? {
|
||||
for index in 0..meta.received {
|
||||
self.data_cf.delete_by_slot_index(slot, index)?;
|
||||
self.data_cf.delete_by_index(&(slot, index))?;
|
||||
}
|
||||
meta.consumed = 0;
|
||||
meta.received = 0;
|
||||
@@ -181,12 +165,12 @@ impl Blocktree {
|
||||
|
||||
pub fn get_next_slot(&self, slot: u64) -> Result<Option<u64>> {
|
||||
let mut db_iterator = self.db.raw_iterator_cf(self.meta_cf.handle())?;
|
||||
db_iterator.seek(&MetaCf::key(slot + 1));
|
||||
db_iterator.seek(&MetaCf::key(&(slot + 1)));
|
||||
if !db_iterator.valid() {
|
||||
Ok(None)
|
||||
} else {
|
||||
let key = &db_iterator.key().expect("Expected valid key");
|
||||
Ok(Some(MetaCf::index_from_key(&key)?))
|
||||
Ok(Some(MetaCf::index(&key)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,7 +325,7 @@ impl Blocktree {
|
||||
should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup);
|
||||
write_batch.put_cf(
|
||||
self.meta_cf.handle(),
|
||||
&MetaCf::key(*slot),
|
||||
&MetaCf::key(slot),
|
||||
&serialize(&meta)?,
|
||||
)?;
|
||||
}
|
||||
@@ -368,7 +352,7 @@ impl Blocktree {
|
||||
buf: &mut [u8],
|
||||
slot: u64,
|
||||
) -> Result<(u64, u64)> {
|
||||
let start_key = DataCf::key(slot, start_index);
|
||||
let start_key = DataCf::key(&(slot, start_index));
|
||||
let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?;
|
||||
db_iterator.seek(&start_key);
|
||||
let mut total_blobs = 0;
|
||||
@@ -388,7 +372,7 @@ impl Blocktree {
|
||||
// Check key is the next sequential key based on
|
||||
// blob index
|
||||
let key = &db_iterator.key().expect("Expected valid key");
|
||||
let index = DataCf::index_from_key(key)?;
|
||||
let index = DataCf::index(key).1;
|
||||
if index != expected_index {
|
||||
break;
|
||||
}
|
||||
@@ -421,24 +405,24 @@ impl Blocktree {
|
||||
}
|
||||
|
||||
pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
|
||||
self.erasure_cf.get_by_slot_index(slot, index)
|
||||
self.erasure_cf.get_by_index(&(slot, index))
|
||||
}
|
||||
pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
|
||||
self.erasure_cf.delete_by_slot_index(slot, index)
|
||||
self.erasure_cf.delete_by_index(&(slot, index))
|
||||
}
|
||||
pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
|
||||
self.data_cf.get_by_slot_index(slot, index)
|
||||
self.data_cf.get_by_index(&(slot, index))
|
||||
}
|
||||
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
|
||||
self.erasure_cf.put_by_slot_index(slot, index, bytes)
|
||||
self.erasure_cf.put_by_index(&(slot, index), bytes)
|
||||
}
|
||||
|
||||
pub fn put_data_raw(&self, key: &KeyRef, value: &[u8]) -> Result<()> {
|
||||
self.data_cf.put(key, value)
|
||||
pub fn put_data_raw(&self, key: &Key, value: &[u8]) -> Result<()> {
|
||||
self.data_cf.put_bytes(key, value)
|
||||
}
|
||||
|
||||
pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
|
||||
self.data_cf.put_by_slot_index(slot, index, bytes)
|
||||
self.data_cf.put_by_index(&(slot, index), bytes)
|
||||
}
|
||||
|
||||
pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result<Option<Blob>> {
|
||||
@@ -468,9 +452,9 @@ impl Blocktree {
|
||||
slot: u64,
|
||||
start_index: u64,
|
||||
end_index: u64,
|
||||
key: &dyn Fn(u64, u64) -> Key,
|
||||
slot_from_key: &dyn Fn(&KeyRef) -> Result<u64>,
|
||||
index_from_key: &dyn Fn(&KeyRef) -> Result<u64>,
|
||||
key: &dyn Fn(u64, u64) -> OwnedKey,
|
||||
slot_from_key: &dyn Fn(&Key) -> u64,
|
||||
index_from_key: &dyn Fn(&Key) -> u64,
|
||||
max_missing: usize,
|
||||
) -> Vec<u64> {
|
||||
if start_index >= end_index || max_missing == 0 {
|
||||
@@ -495,14 +479,12 @@ impl Blocktree {
|
||||
break;
|
||||
}
|
||||
let current_key = db_iterator.key().expect("Expect a valid key");
|
||||
let current_slot = slot_from_key(¤t_key)
|
||||
.expect("Expect to be able to parse slot from valid key");
|
||||
let current_slot = slot_from_key(¤t_key);
|
||||
let current_index = {
|
||||
if current_slot > slot {
|
||||
end_index
|
||||
} else {
|
||||
index_from_key(¤t_key)
|
||||
.expect("Expect to be able to parse index from valid key")
|
||||
}
|
||||
};
|
||||
let upper_index = cmp::min(current_index, end_index);
|
||||
@@ -543,9 +525,9 @@ impl Blocktree {
|
||||
slot,
|
||||
start_index,
|
||||
end_index,
|
||||
&DataCf::key,
|
||||
&DataCf::slot_from_key,
|
||||
&DataCf::index_from_key,
|
||||
&|slot, index| DataCf::key(&(slot, index)),
|
||||
&MetaCf::index,
|
||||
&|key| DataCf::index(key).1,
|
||||
max_missing,
|
||||
)
|
||||
}
|
||||
@@ -564,9 +546,9 @@ impl Blocktree {
|
||||
slot,
|
||||
start_index,
|
||||
end_index,
|
||||
&ErasureCf::key,
|
||||
&ErasureCf::slot_from_key,
|
||||
&ErasureCf::index_from_key,
|
||||
&|slot, index| ErasureCf::key(&(slot, index)),
|
||||
&MetaCf::index,
|
||||
&|key| ErasureCf::index(key).1,
|
||||
max_missing,
|
||||
)
|
||||
}
|
||||
@@ -661,11 +643,7 @@ impl Blocktree {
|
||||
// Write all the newly changed slots in new_chained_slots to the write_batch
|
||||
for (slot, meta_copy) in new_chained_slots.iter() {
|
||||
let meta: &SlotMeta = &RefCell::borrow(&*meta_copy);
|
||||
write_batch.put_cf(
|
||||
self.meta_cf.handle(),
|
||||
&MetaCf::key(*slot),
|
||||
&serialize(meta)?,
|
||||
)?;
|
||||
write_batch.put_cf(self.meta_cf.handle(), &MetaCf::key(slot), &serialize(meta)?)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -848,7 +826,7 @@ impl Blocktree {
|
||||
}
|
||||
};
|
||||
|
||||
let key = DataCf::key(blob_slot, blob_index);
|
||||
let key = DataCf::key(&(blob_slot, blob_index));
|
||||
let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size];
|
||||
|
||||
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
||||
@@ -892,7 +870,7 @@ impl Blocktree {
|
||||
// Try to find the next blob we're looking for in the prev_inserted_blob_datas
|
||||
if let Some(prev_blob_data) = prev_inserted_blob_datas.get(&(slot, current_index)) {
|
||||
blobs.push(Cow::Borrowed(*prev_blob_data));
|
||||
} else if let Some(blob_data) = self.data_cf.get_by_slot_index(slot, current_index)? {
|
||||
} else if let Some(blob_data) = self.data_cf.get_by_index(&(slot, current_index))? {
|
||||
// Try to find the next blob we're looking for in the database
|
||||
blobs.push(Cow::Owned(blob_data));
|
||||
} else {
|
||||
@@ -909,7 +887,7 @@ impl Blocktree {
|
||||
// don't count as ticks, even if they're empty entries
|
||||
fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> {
|
||||
// TODO: change bootstrap height to number of slots
|
||||
let meta_key = MetaCf::key(0);
|
||||
let meta_key = MetaCf::key(&0);
|
||||
let mut bootstrap_meta = SlotMeta::new(0, 1);
|
||||
let last = blobs.last().unwrap();
|
||||
|
||||
@@ -924,7 +902,7 @@ impl Blocktree {
|
||||
&serialize(&bootstrap_meta)?,
|
||||
)?;
|
||||
for blob in blobs {
|
||||
let key = DataCf::key(blob.slot(), blob.index());
|
||||
let key = DataCf::key(&(blob.slot(), blob.index()));
|
||||
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()];
|
||||
batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?;
|
||||
}
|
||||
@@ -1171,7 +1149,7 @@ pub mod tests {
|
||||
|
||||
// Test meta column family
|
||||
let meta = SlotMeta::new(0, 1);
|
||||
let meta_key = MetaCf::key(0);
|
||||
let meta_key = MetaCf::key(&0);
|
||||
ledger.meta_cf.put(&meta_key, &meta).unwrap();
|
||||
let result = ledger
|
||||
.meta_cf
|
||||
@@ -1183,12 +1161,15 @@ pub mod tests {
|
||||
|
||||
// Test erasure column family
|
||||
let erasure = vec![1u8; 16];
|
||||
let erasure_key = ErasureCf::key(0, 0);
|
||||
ledger.erasure_cf.put(&erasure_key, &erasure).unwrap();
|
||||
let erasure_key = ErasureCf::key(&(0, 0));
|
||||
ledger
|
||||
.erasure_cf
|
||||
.put_bytes(&erasure_key[..], &erasure)
|
||||
.unwrap();
|
||||
|
||||
let result = ledger
|
||||
.erasure_cf
|
||||
.get(&erasure_key)
|
||||
.get_bytes(&erasure_key[..])
|
||||
.unwrap()
|
||||
.expect("Expected erasure object to exist");
|
||||
|
||||
@@ -1196,12 +1177,12 @@ pub mod tests {
|
||||
|
||||
// Test data column family
|
||||
let data = vec![2u8; 16];
|
||||
let data_key = DataCf::key(0, 0);
|
||||
ledger.data_cf.put(&data_key, &data).unwrap();
|
||||
let data_key = DataCf::key(&(0, 0));
|
||||
ledger.data_cf.put_bytes(&data_key, &data).unwrap();
|
||||
|
||||
let result = ledger
|
||||
.data_cf
|
||||
.get(&data_key)
|
||||
.get_bytes(&data_key)
|
||||
.unwrap()
|
||||
.expect("Expected data object to exist");
|
||||
|
||||
@@ -1296,7 +1277,7 @@ pub mod tests {
|
||||
|
||||
let meta = ledger
|
||||
.meta_cf
|
||||
.get(&MetaCf::key(0))
|
||||
.get(&MetaCf::key(&0))
|
||||
.unwrap()
|
||||
.expect("Expected new metadata object to be created");
|
||||
assert!(meta.consumed == 0 && meta.received == num_entries);
|
||||
@@ -1311,7 +1292,7 @@ pub mod tests {
|
||||
|
||||
let meta = ledger
|
||||
.meta_cf
|
||||
.get(&MetaCf::key(0))
|
||||
.get(&MetaCf::key(&0))
|
||||
.unwrap()
|
||||
.expect("Expected new metadata object to exist");
|
||||
assert_eq!(meta.consumed, num_entries);
|
||||
@@ -1341,7 +1322,7 @@ pub mod tests {
|
||||
|
||||
let meta = ledger
|
||||
.meta_cf
|
||||
.get(&MetaCf::key(0))
|
||||
.get(&MetaCf::key(&0))
|
||||
.unwrap()
|
||||
.expect("Expected metadata object to exist");
|
||||
assert_eq!(meta.parent_slot, 0);
|
||||
@@ -1392,14 +1373,13 @@ pub mod tests {
|
||||
.raw_iterator_cf(blocktree.data_cf.handle())
|
||||
.expect("Expected to be able to open database iterator");
|
||||
|
||||
db_iterator.seek(&DataCf::key(slot, 1));
|
||||
db_iterator.seek(&DataCf::key(&(slot, 1)));
|
||||
|
||||
// Iterate through ledger
|
||||
for i in 0..num_entries {
|
||||
assert!(db_iterator.valid());
|
||||
let current_key = db_iterator.key().expect("Expected a valid key");
|
||||
let current_index = DataCf::index_from_key(¤t_key)
|
||||
.expect("Expect to be able to parse index from valid key");
|
||||
let current_index = DataCf::index(¤t_key).1;
|
||||
assert_eq!(current_index, (1 as u64) << (i * 8));
|
||||
db_iterator.next();
|
||||
}
|
||||
@@ -1519,7 +1499,7 @@ pub mod tests {
|
||||
|
||||
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
|
||||
|
||||
let meta_key = MetaCf::key(slot);
|
||||
let meta_key = MetaCf::key(&slot);
|
||||
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
|
||||
if num_entries % 2 == 0 {
|
||||
assert_eq!(meta.received, num_entries);
|
||||
@@ -1541,7 +1521,7 @@ pub mod tests {
|
||||
original_entries,
|
||||
);
|
||||
|
||||
let meta_key = MetaCf::key(slot);
|
||||
let meta_key = MetaCf::key(&slot);
|
||||
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
|
||||
assert_eq!(meta.received, num_entries);
|
||||
assert_eq!(meta.consumed, num_entries);
|
||||
@@ -1594,7 +1574,7 @@ pub mod tests {
|
||||
|
||||
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), expected,);
|
||||
|
||||
let meta_key = MetaCf::key(0);
|
||||
let meta_key = MetaCf::key(&0);
|
||||
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
|
||||
assert_eq!(meta.consumed, num_unique_entries);
|
||||
assert_eq!(meta.received, num_unique_entries);
|
||||
@@ -2053,14 +2033,17 @@ pub mod tests {
|
||||
assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty());
|
||||
|
||||
let mut meta0 = SlotMeta::new(0, 0);
|
||||
blocktree.meta_cf.put_slot_meta(0, &meta0).unwrap();
|
||||
blocktree
|
||||
.meta_cf
|
||||
.put_by_index(&0, &serialize(&meta0).unwrap())
|
||||
.unwrap();
|
||||
|
||||
// Slot exists, chains to nothing
|
||||
let expected: HashMap<u64, Vec<u64>> =
|
||||
HashMap::from_iter(vec![(0, vec![])].into_iter());
|
||||
assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected);
|
||||
meta0.next_slots = vec![1, 2];
|
||||
blocktree.meta_cf.put_slot_meta(0, &meta0).unwrap();
|
||||
blocktree.meta_cf.put(&MetaCf::key(&0), &meta0).unwrap();
|
||||
|
||||
// Slot exists, chains to some other slots
|
||||
let expected: HashMap<u64, Vec<u64>> =
|
||||
@@ -2070,7 +2053,10 @@ pub mod tests {
|
||||
|
||||
let mut meta3 = SlotMeta::new(3, 1);
|
||||
meta3.next_slots = vec![10, 5];
|
||||
blocktree.meta_cf.put_slot_meta(3, &meta3).unwrap();
|
||||
blocktree
|
||||
.meta_cf
|
||||
.put_by_index(&3, &serialize(&meta3).unwrap())
|
||||
.unwrap();
|
||||
let expected: HashMap<u64, Vec<u64>> =
|
||||
HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter());
|
||||
assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected);
|
||||
@@ -2119,7 +2105,7 @@ pub mod tests {
|
||||
entries[i as usize]
|
||||
);
|
||||
|
||||
let meta_key = MetaCf::key(i);
|
||||
let meta_key = MetaCf::key(&i);
|
||||
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
|
||||
assert_eq!(meta.received, i + 1);
|
||||
assert_eq!(meta.last_index, i);
|
||||
|
Reference in New Issue
Block a user