db_ledger now fully encapsulates rocksdb

Michael Vines
2018-12-20 11:16:07 -08:00
parent 7148c14178
commit 034c5d0422
7 changed files with 137 additions and 155 deletions
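The change replaces the old pattern, where every column-family method took a borrowed &DB and callers reached through db_ledger.db, with column-family structs that each own an Arc<DB> cloned from a single handle opened in DbLedger::open(). Below is a minimal sketch of that ownership pattern; the struct and method names mirror the commit, but the in-memory Db type and the method bodies are hypothetical stand-ins, not the rocksdb API:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for rocksdb::DB, just to show the ownership shape.
type Db = RwLock<HashMap<Vec<u8>, Vec<u8>>>;

// Mirrors DataCf after this commit: the wrapper owns a clone of the shared handle.
struct DataCf {
    db: Arc<Db>,
}

impl DataCf {
    fn new(db: Arc<Db>) -> Self {
        DataCf { db }
    }

    // Callers no longer pass the database in; the wrapper uses its own handle.
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.db.read().unwrap().get(key).cloned()
    }

    fn put(&self, key: &[u8], value: &[u8]) {
        self.db.write().unwrap().insert(key.to_vec(), value.to_vec());
    }
}

struct DbLedger {
    db: Arc<Db>, // private after this commit; callers go through the column families
    data_cf: DataCf,
}

impl DbLedger {
    fn open() -> Self {
        let db = Arc::new(Db::default());
        let data_cf = DataCf::new(db.clone());
        DbLedger { db, data_cf }
    }
}

fn main() {
    let ledger = DbLedger::open();
    ledger.data_cf.put(b"key", b"value");
    assert_eq!(ledger.data_cf.get(b"key"), Some(b"value".to_vec()));
    let _ = &ledger.db; // retained so the ledger controls the database lifetime
}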

View File

@@ -23,7 +23,7 @@ fn bench_write_blobs(bench: &mut Bencher, blobs: &mut [&mut Blob], ledger_path:
         let size = blob.size().unwrap();
         db_ledger
             .data_cf
-            .put(&db_ledger.db, &key, &blob.data[..BLOB_HEADER_SIZE + size])
+            .put(&key, &blob.data[..BLOB_HEADER_SIZE + size])
             .unwrap();
         blob.set_index(index + num_blobs as u64).unwrap();
     }
@@ -99,10 +99,9 @@ fn bench_read_sequential(bench: &mut Bencher) {
         // Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially
         let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs);
         for i in start_index..start_index + num_reads {
-            let _ =
-                db_ledger
-                    .data_cf
-                    .get_by_slot_index(&db_ledger.db, slot, i as u64 % total_blobs);
+            let _ = db_ledger
+                .data_cf
+                .get_by_slot_index(slot, i as u64 % total_blobs);
         }
     });
@@ -133,9 +132,7 @@ fn bench_read_random(bench: &mut Bencher) {
         .collect();

     bench.iter(move || {
         for i in indexes.iter() {
-            let _ = db_ledger
-                .data_cf
-                .get_by_slot_index(&db_ledger.db, slot, *i as u64);
+            let _ = db_ledger.data_cf.get_by_slot_index(slot, *i as u64);
         }
     });

View File

@@ -472,7 +472,7 @@ mod test {
             .expect("Leader should exist");
         let result = db_ledger
             .data_cf
-            .get_by_slot_index(&db_ledger.db, slot, entry_height + i)
+            .get_by_slot_index(slot, entry_height + i)
             .unwrap();
         assert!(result.is_some());

View File

@@ -678,15 +678,13 @@ impl ClusterInfo {
         ix: u64,
     ) -> Vec<SharedBlob> {
         if let Some(db_ledger) = db_ledger {
-            let meta = db_ledger
-                .meta_cf
-                .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT));
+            let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT));

             if let Ok(Some(meta)) = meta {
                 let max_slot = meta.received_slot;
                 // Try to find the requested index in one of the slots
                 for i in 0..=max_slot {
-                    let get_result = db_ledger.data_cf.get_by_slot_index(&db_ledger.db, i, ix);
+                    let get_result = db_ledger.data_cf.get_by_slot_index(i, ix);

                     if let Ok(Some(blob_data)) = get_result {
                         inc_new_counter_info!("cluster_info-window-request-ledger", 1);

View File

@@ -7,7 +7,7 @@ use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use crate::result::{Error, Result};
 use bincode::{deserialize, serialize};
 use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
-use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options, WriteBatch, DB};
+use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, Options, WriteBatch, DB};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -15,9 +15,9 @@ use std::borrow::Borrow;
 use std::cmp::max;
 use std::io;
 use std::path::Path;
+use std::sync::Arc;

-// Re-export rocksdb::DBRawIterator until it can be encapsulated
-pub use rocksdb::DBRawIterator;
+pub type DbLedgerRawIterator = rocksdb::DBRawIterator;

 pub const DB_LEDGER_DIRECTORY: &str = "rocksdb";

 // A good value for this is the number of cores on the machine
@@ -40,8 +40,9 @@ impl std::convert::From<rocksdb::Error> for Error {
 pub trait LedgerColumnFamily {
     type ValueType: DeserializeOwned + Serialize;

-    fn get(&self, db: &DB, key: &[u8]) -> Result<Option<Self::ValueType>> {
-        let data_bytes = db.get_cf(self.handle(db), key)?;
+    fn get(&self, key: &[u8]) -> Result<Option<Self::ValueType>> {
+        let db = self.db();
+        let data_bytes = db.get_cf(self.handle(), key)?;

         if let Some(raw) = data_bytes {
             let result: Self::ValueType = deserialize(&raw)?;
@@ -51,47 +52,62 @@ pub trait LedgerColumnFamily {
         }
     }

-    fn get_bytes(&self, db: &DB, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        let data_bytes = db.get_cf(self.handle(db), key)?;
+    fn get_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
+        let db = self.db();
+        let data_bytes = db.get_cf(self.handle(), key)?;
         Ok(data_bytes.map(|x| x.to_vec()))
     }

-    fn put_bytes(&self, db: &DB, key: &[u8], serialized_value: &[u8]) -> Result<()> {
-        db.put_cf(self.handle(db), &key, &serialized_value)?;
+    fn put_bytes(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> {
+        let db = self.db();
+        db.put_cf(self.handle(), &key, &serialized_value)?;
         Ok(())
     }

-    fn put(&self, db: &DB, key: &[u8], value: &Self::ValueType) -> Result<()> {
+    fn put(&self, key: &[u8], value: &Self::ValueType) -> Result<()> {
+        let db = self.db();
         let serialized = serialize(value)?;
-        db.put_cf(self.handle(db), &key, &serialized)?;
+        db.put_cf(self.handle(), &key, &serialized)?;
         Ok(())
     }

-    fn delete(&self, db: &DB, key: &[u8]) -> Result<()> {
-        db.delete_cf(self.handle(db), &key)?;
+    fn delete(&self, key: &[u8]) -> Result<()> {
+        let db = self.db();
+        db.delete_cf(self.handle(), &key)?;
         Ok(())
     }

-    fn handle(&self, db: &DB) -> ColumnFamily;
+    fn db(&self) -> &Arc<DB>;
+
+    fn handle(&self) -> ColumnFamily;
 }

 pub trait LedgerColumnFamilyRaw {
-    fn get(&self, db: &DB, key: &[u8]) -> Result<Option<Vec<u8>>> {
-        let data_bytes = db.get_cf(self.handle(db), key)?;
+    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
+        let db = self.db();
+        let data_bytes = db.get_cf(self.handle(), key)?;
         Ok(data_bytes.map(|x| x.to_vec()))
     }

-    fn put(&self, db: &DB, key: &[u8], serialized_value: &[u8]) -> Result<()> {
-        db.put_cf(self.handle(db), &key, &serialized_value)?;
+    fn put(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> {
+        let db = self.db();
+        db.put_cf(self.handle(), &key, &serialized_value)?;
         Ok(())
     }

-    fn delete(&self, db: &DB, key: &[u8]) -> Result<()> {
-        db.delete_cf(self.handle(db), &key)?;
+    fn delete(&self, key: &[u8]) -> Result<()> {
+        let db = self.db();
+        db.delete_cf(self.handle(), &key)?;
         Ok(())
     }

-    fn handle(&self, db: &DB) -> ColumnFamily;
+    fn raw_iterator(&self) -> DbLedgerRawIterator {
+        let db = self.db();
+        db.raw_iterator_cf(self.handle())
+            .expect("Expected to be able to open database iterator")
+    }
+
+    fn handle(&self) -> ColumnFamily;
+
+    fn db(&self) -> &Arc<DB>;
 }

 #[derive(Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
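The reworked traits keep get/put/delete as default methods but now fetch the database from the implementor through db(), so each column family only has to supply db() and handle(). A compilable sketch of that trait shape in plain Rust follows; KvDb, its get_cf method, and the string handles are hypothetical stand-ins, not the rocksdb API:

use std::sync::Arc;

// Hypothetical stand-in for rocksdb::DB.
struct KvDb;
impl KvDb {
    fn get_cf(&self, _handle: &str, _key: &[u8]) -> Option<Vec<u8>> {
        None
    }
}

trait LedgerColumnFamilyRaw {
    // Default method: no more db parameter, it asks the implementor instead.
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.db().get_cf(self.handle(), key)
    }

    // The two methods each column family must still provide.
    fn handle(&self) -> &'static str;
    fn db(&self) -> &Arc<KvDb>;
}

struct ErasureCf {
    db: Arc<KvDb>,
}

impl LedgerColumnFamilyRaw for ErasureCf {
    fn handle(&self) -> &'static str {
        "erasure"
    }

    fn db(&self) -> &Arc<KvDb> {
        &self.db
    }
}

fn main() {
    let cf = ErasureCf { db: Arc::new(KvDb) };
    assert!(cf.get(b"missing").is_none());
}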
@@ -119,10 +135,15 @@ impl SlotMeta {
     }
 }

-#[derive(Default)]
-pub struct MetaCf {}
+pub struct MetaCf {
+    db: Arc<DB>,
+}

 impl MetaCf {
+    pub fn new(db: Arc<DB>) -> Self {
+        MetaCf { db }
+    }
+
     pub fn key(slot_height: u64) -> Vec<u8> {
         let mut key = vec![0u8; 8];
         BigEndian::write_u64(&mut key[0..8], slot_height);
@@ -133,35 +154,38 @@ impl MetaCf {
 impl LedgerColumnFamily for MetaCf {
     type ValueType = SlotMeta;

-    fn handle(&self, db: &DB) -> ColumnFamily {
-        db.cf_handle(META_CF).unwrap()
+    fn db(&self) -> &Arc<DB> {
+        &self.db
+    }
+
+    fn handle(&self) -> ColumnFamily {
+        self.db.cf_handle(META_CF).unwrap()
     }
 }

 // The data column family
-#[derive(Default)]
-pub struct DataCf {}
+pub struct DataCf {
+    db: Arc<DB>,
+}

 impl DataCf {
-    pub fn get_by_slot_index(
-        &self,
-        db: &DB,
-        slot_height: u64,
-        index: u64,
-    ) -> Result<Option<Vec<u8>>> {
+    pub fn new(db: Arc<DB>) -> Self {
+        DataCf { db }
+    }
+
+    pub fn get_by_slot_index(&self, slot_height: u64, index: u64) -> Result<Option<Vec<u8>>> {
         let key = Self::key(slot_height, index);
-        self.get(db, &key)
+        self.get(&key)
     }

     pub fn put_by_slot_index(
         &self,
-        db: &DB,
         slot_height: u64,
         index: u64,
         serialized_value: &[u8],
     ) -> Result<()> {
         let key = Self::key(slot_height, index);
-        self.put(db, &key, serialized_value)
+        self.put(&key, serialized_value)
     }

     pub fn key(slot_height: u64, index: u64) -> Vec<u8> {
@@ -185,40 +209,42 @@ impl DataCf {
 }

 impl LedgerColumnFamilyRaw for DataCf {
-    fn handle(&self, db: &DB) -> ColumnFamily {
-        db.cf_handle(DATA_CF).unwrap()
+    fn db(&self) -> &Arc<DB> {
+        &self.db
+    }
+
+    fn handle(&self) -> ColumnFamily {
+        self.db.cf_handle(DATA_CF).unwrap()
     }
 }

 // The erasure column family
-#[derive(Default)]
-pub struct ErasureCf {}
-
-impl ErasureCf {
-    pub fn delete_by_slot_index(&self, db: &DB, slot_height: u64, index: u64) -> Result<()> {
-        let key = Self::key(slot_height, index);
-        self.delete(db, &key)
+pub struct ErasureCf {
+    db: Arc<DB>,
 }

-    pub fn get_by_slot_index(
-        &self,
-        db: &DB,
-        slot_height: u64,
-        index: u64,
-    ) -> Result<Option<Vec<u8>>> {
+impl ErasureCf {
+    pub fn new(db: Arc<DB>) -> Self {
+        ErasureCf { db }
+    }
+
+    pub fn delete_by_slot_index(&self, slot_height: u64, index: u64) -> Result<()> {
         let key = Self::key(slot_height, index);
-        self.get(db, &key)
+        self.delete(&key)
+    }
+
+    pub fn get_by_slot_index(&self, slot_height: u64, index: u64) -> Result<Option<Vec<u8>>> {
+        let key = Self::key(slot_height, index);
+        self.get(&key)
     }

     pub fn put_by_slot_index(
         &self,
-        db: &DB,
         slot_height: u64,
         index: u64,
         serialized_value: &[u8],
     ) -> Result<()> {
         let key = Self::key(slot_height, index);
-        self.put(db, &key, serialized_value)
+        self.put(&key, serialized_value)
     }

     pub fn key(slot_height: u64, index: u64) -> Vec<u8> {
@@ -235,15 +261,19 @@ impl ErasureCf {
 }

 impl LedgerColumnFamilyRaw for ErasureCf {
-    fn handle(&self, db: &DB) -> ColumnFamily {
-        db.cf_handle(ERASURE_CF).unwrap()
+    fn db(&self) -> &Arc<DB> {
+        &self.db
+    }
+
+    fn handle(&self) -> ColumnFamily {
+        self.db.cf_handle(ERASURE_CF).unwrap()
     }
 }

 // ledger window
 pub struct DbLedger {
     // Underlying database is automatically closed in the Drop implementation of DB
-    pub db: DB,
+    db: Arc<DB>,
     pub meta_cf: MetaCf,
     pub data_cf: DataCf,
     pub erasure_cf: ErasureCf,
@@ -279,16 +309,16 @@ impl DbLedger {
         ];

         // Open the database
-        let db = DB::open_cf_descriptors(&db_options, ledger_path, cfs)?;
+        let db = Arc::new(DB::open_cf_descriptors(&db_options, ledger_path, cfs)?);

         // Create the metadata column family
-        let meta_cf = MetaCf::default();
+        let meta_cf = MetaCf::new(db.clone());

         // Create the data column family
-        let data_cf = DataCf::default();
+        let data_cf = DataCf::new(db.clone());

         // Create the erasure column family
-        let erasure_cf = ErasureCf::default();
+        let erasure_cf = ErasureCf::new(db.clone());

         Ok(DbLedger {
             db,
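db.clone() here is cheap: it copies an Arc pointer and bumps a reference count rather than reopening the database. A small self-contained illustration of those semantics, with a String standing in for the DB:

use std::sync::Arc;

fn main() {
    // One database handle, shared by DbLedger and each column family.
    let db = Arc::new(String::from("shared rocksdb handle"));
    let for_meta = db.clone();
    let for_data = db.clone();
    let for_erasure = db.clone();
    assert_eq!(Arc::strong_count(&db), 4);

    // The comment on DbLedger still holds: the database is closed by DB's
    // Drop implementation, which now runs when the last Arc clone is dropped.
    drop(for_meta);
    drop(for_data);
    drop(for_erasure);
    assert_eq!(Arc::strong_count(&db), 1);
}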
@@ -372,7 +402,7 @@ impl DbLedger {
         let mut should_write_meta = false;

         let mut meta = {
-            if let Some(meta) = self.db.get_cf(self.meta_cf.handle(&self.db), &meta_key)? {
+            if let Some(meta) = self.db.get_cf(self.meta_cf.handle(), &meta_key)? {
                 deserialize(&meta)?
             } else {
                 should_write_meta = true;
@@ -444,11 +474,11 @@ impl DbLedger {
             } else {
                 let key = DataCf::key(current_slot, current_index);
                 let blob_data = {
-                    if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
+                    if let Some(blob_data) = self.data_cf.get(&key)? {
                         blob_data
                     } else if meta.consumed < meta.received {
                         let key = DataCf::key(current_slot + 1, current_index);
-                        if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
+                        if let Some(blob_data) = self.data_cf.get(&key)? {
                             current_slot += 1;
                             blob_data
                         } else {
@@ -473,14 +503,14 @@ impl DbLedger {
         // Commit Step: Atomic write both the metadata and the data
         let mut batch = WriteBatch::default();
         if should_write_meta {
-            batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
+            batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?;
         }

         for blob in new_blobs {
             let blob = blob.borrow();
             let key = DataCf::key(blob.slot()?, blob.index()?);
             let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
-            batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
+            batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?;
         }

         self.db.write(batch)?;
@@ -494,7 +524,7 @@ impl DbLedger {
         let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
         let mut meta = {
-            if let Some(meta) = self.meta_cf.get(&self.db, &meta_key)? {
+            if let Some(meta) = self.meta_cf.get(&meta_key)? {
                 let first = blobs[0].read().unwrap();
                 assert_eq!(meta.consumed, first.index()?);
                 meta
@@ -512,12 +542,12 @@ impl DbLedger {
         }

         let mut batch = WriteBatch::default();
-        batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
+        batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?;
         for blob in blobs {
             let blob = blob.read().unwrap();
             let key = DataCf::key(blob.slot()?, blob.index()?);
             let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
-            batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
+            batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?;
         }
         self.db.write(batch)?;
         Ok(())
@@ -535,7 +565,7 @@ impl DbLedger {
         slot_height: u64,
     ) -> Result<(u64, u64)> {
         let start_key = DataCf::key(slot_height, start_index);
-        let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?;
+        let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?;
         db_iterator.seek(&start_key);
         let mut total_blobs = 0;
         let mut total_current_size = 0;
@@ -588,7 +618,7 @@ impl DbLedger {
     /// Return an iterator for all the entries in the given file.
     pub fn read_ledger(&self) -> Result<impl Iterator<Item = Entry>> {
-        let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?;
+        let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?;

         db_iterator.seek_to_first();
         Ok(EntryIterator { db_iterator })
@@ -696,10 +726,10 @@ mod tests {
         // Test meta column family
         let meta = SlotMeta::new();
         let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
-        ledger.meta_cf.put(&ledger.db, &meta_key, &meta).unwrap();
+        ledger.meta_cf.put(&meta_key, &meta).unwrap();

         let result = ledger
             .meta_cf
-            .get(&ledger.db, &meta_key)
+            .get(&meta_key)
             .unwrap()
             .expect("Expected meta object to exist");
@@ -708,14 +738,11 @@ mod tests {
         // Test erasure column family
         let erasure = vec![1u8; 16];
         let erasure_key = ErasureCf::key(DEFAULT_SLOT_HEIGHT, 0);
-        ledger
-            .erasure_cf
-            .put(&ledger.db, &erasure_key, &erasure)
-            .unwrap();
+        ledger.erasure_cf.put(&erasure_key, &erasure).unwrap();

         let result = ledger
             .erasure_cf
-            .get(&ledger.db, &erasure_key)
+            .get(&erasure_key)
             .unwrap()
             .expect("Expected erasure object to exist");
@@ -724,11 +751,11 @@ mod tests {
         // Test data column family
         let data = vec![2u8; 16];
         let data_key = DataCf::key(DEFAULT_SLOT_HEIGHT, 0);
-        ledger.data_cf.put(&ledger.db, &data_key, &data).unwrap();
+        ledger.data_cf.put(&data_key, &data).unwrap();

         let result = ledger
             .data_cf
-            .get(&ledger.db, &data_key)
+            .get(&data_key)
             .unwrap()
             .expect("Expected data object to exist");
@@ -829,7 +856,7 @@ mod tests {
         assert!(result.len() == 0);
         let meta = ledger
             .meta_cf
-            .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
+            .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
             .unwrap()
             .expect("Expected new metadata object to be created");
         assert!(meta.consumed == 0 && meta.received == 2);
@@ -841,7 +868,7 @@ mod tests {
         let meta = ledger
             .meta_cf
-            .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
+            .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
             .unwrap()
             .expect("Expected new metadata object to exist");
         assert!(meta.consumed == 2 && meta.received == 2);
@@ -871,7 +898,7 @@ mod tests {
             let meta = ledger
                 .meta_cf
-                .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
+                .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
                 .unwrap()
                 .expect("Expected metadata object to exist");
             if i != 0 {
@@ -913,7 +940,7 @@ mod tests {
             let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();
             let meta = ledger
                 .meta_cf
-                .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
+                .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
                 .unwrap()
                 .expect("Expected metadata object to exist");
             if i != 0 {
@@ -933,7 +960,6 @@ mod tests {
     #[test]
     pub fn test_iteration_order() {
         let slot = 0;
-        // Create RocksDb ledger
         let db_ledger_path = get_tmp_ledger_path("test_iteration_order");
         {
             let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
@@ -956,7 +982,7 @@ mod tests {
             );

             let mut db_iterator = db_ledger
                 .db
-                .raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db))
+                .raw_iterator_cf(db_ledger.data_cf.handle())
                 .expect("Expected to be able to open database iterator");

             db_iterator.seek(&DataCf::key(slot, 1));
@@ -976,7 +1002,6 @@ mod tests {
     #[test]
     pub fn test_insert_data_blobs_bulk() {
-        // Create RocksDb ledger
         let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_bulk");
         {
             let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
@@ -1006,11 +1031,7 @@ mod tests {
             );

             let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
-            let meta = db_ledger
-                .meta_cf
-                .get(&db_ledger.db, &meta_key)
-                .unwrap()
-                .unwrap();
+            let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap();
             assert_eq!(meta.consumed, num_entries);
             assert_eq!(meta.received, num_entries);
             assert_eq!(meta.consumed_slot, num_entries - 1);
@@ -1082,7 +1103,6 @@ mod tests {
     #[test]
     pub fn test_write_consecutive_blobs() {
-        // Create RocksDb ledger
         let db_ledger_path = get_tmp_ledger_path("test_write_consecutive_blobs");
         {
             let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
@@ -1102,11 +1122,7 @@ mod tests {
                 .expect("Expect successful blob writes");

             let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
-            let meta = db_ledger
-                .meta_cf
-                .get(&db_ledger.db, &meta_key)
-                .unwrap()
-                .unwrap();
+            let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap();
             assert_eq!(meta.consumed, num_entries);
             assert_eq!(meta.received, num_entries);
             assert_eq!(meta.consumed_slot, num_entries - 1);
@@ -1122,11 +1138,7 @@ mod tests {
                 .write_consecutive_blobs(&shared_blobs)
                 .expect("Expect successful blob writes");

-            let meta = db_ledger
-                .meta_cf
-                .get(&db_ledger.db, &meta_key)
-                .unwrap()
-                .unwrap();
+            let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap();
             assert_eq!(meta.consumed, 2 * num_entries);
             assert_eq!(meta.received, 2 * num_entries);
             assert_eq!(meta.consumed_slot, 2 * num_entries - 1);
@@ -1137,7 +1149,6 @@ mod tests {
     #[test]
     pub fn test_genesis_and_entry_iterator() {
-        // Create RocksDb ledger
         let entries = make_tiny_test_entries(100);

         let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
         {

View File

@@ -31,9 +31,7 @@ pub fn repair(
 ) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
     let rcluster_info = cluster_info.read().unwrap();
     let mut is_next_leader = false;
-    let meta = db_ledger
-        .meta_cf
-        .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?;
+    let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?;
     if meta.is_none() {
         return Ok(vec![]);
     }
@@ -121,7 +119,7 @@ pub fn repair(
 // Given a start and end entry index, find all the missing
 // indexes in the ledger in the range [start_index, end_index)
 pub fn find_missing_indexes(
-    db_iterator: &mut DBRawIterator,
+    db_iterator: &mut DbLedgerRawIterator,
     slot: u64,
     start_index: u64,
     end_index: u64,
@@ -178,10 +176,7 @@ pub fn find_missing_data_indexes(
     end_index: u64,
     max_missing: usize,
 ) -> Vec<u64> {
-    let mut db_iterator = db_ledger
-        .db
-        .raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db))
-        .expect("Expected to be able to open database iterator");
+    let mut db_iterator = db_ledger.data_cf.raw_iterator();

     find_missing_indexes(
         &mut db_iterator,
@@ -201,10 +196,7 @@ pub fn find_missing_coding_indexes(
     end_index: u64,
     max_missing: usize,
 ) -> Vec<u64> {
-    let mut db_iterator = db_ledger
-        .db
-        .raw_iterator_cf(db_ledger.erasure_cf.handle(&db_ledger.db))
-        .expect("Expected to be able to open database iterator");
+    let mut db_iterator = db_ledger.erasure_cf.raw_iterator();

     find_missing_indexes(
         &mut db_iterator,
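Both find_missing_* helpers shrink to the same one-liner because the expect() on iterator creation now lives in a single place, LedgerColumnFamilyRaw::raw_iterator(). Restating the call-site change from the two hunks above (lines quoted from the diff, not runnable on their own):

// Before: every caller rebuilt the iterator by hand and repeated the expect().
let mut db_iterator = db_ledger
    .db
    .raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db))
    .expect("Expected to be able to open database iterator");

// After: the column family hands out its own iterator.
let mut db_iterator = db_ledger.data_cf.raw_iterator();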
@@ -302,11 +294,9 @@ pub fn process_blob(
         let erasure_key = ErasureCf::key(slot, pix);
         let rblob = &blob.read().unwrap();
         let size = rblob.size()?;
-        db_ledger.erasure_cf.put(
-            &db_ledger.db,
-            &erasure_key,
-            &rblob.data[..BLOB_HEADER_SIZE + size],
-        )?;
+        db_ledger
+            .erasure_cf
+            .put(&erasure_key, &rblob.data[..BLOB_HEADER_SIZE + size])?;
         vec![]
     } else {
         db_ledger.insert_data_blobs(vec![&*blob.read().unwrap()])?
@@ -335,7 +325,7 @@ pub fn process_blob(
     if max_ix != 0 && !consumed_entries.is_empty() {
         let meta = db_ledger
             .meta_cf
-            .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?
+            .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?
             .expect("Expect metadata to exist if consumed entries is nonzero");

         let consumed = meta.consumed;
@@ -376,9 +366,7 @@ pub fn calculate_max_repair_entry_height(
 #[cfg(feature = "erasure")]
 fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Result<()> {
-    let meta = db_ledger
-        .meta_cf
-        .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?;
+    let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?;

     if let Some(meta) = meta {
         let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?;
@@ -389,11 +377,9 @@ fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Result<()> {
                 cl.index().expect("Recovered blob must set index"),
             );
             let size = cl.size().expect("Recovered blob must set size");
-            db_ledger.erasure_cf.put(
-                &db_ledger.db,
-                &erasure_key,
-                &cl.data[..BLOB_HEADER_SIZE + size],
-            )?;
+            db_ledger
+                .erasure_cf
+                .put(&erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?;
         }

         let entries = db_ledger.write_shared_blobs(data)?;
@@ -741,7 +727,7 @@ mod test {
         assert_eq!(
             &db_ledger
                 .erasure_cf
-                .get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64)
+                .get_by_slot_index(slot_height, erase_offset as u64)
                 .unwrap()
                 .unwrap()[BLOB_HEADER_SIZE..],
             &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],

View File

@@ -410,9 +410,7 @@ pub fn recover(
     // Add the data blobs we have into the recovery vector, mark the missing ones
     for i in block_start_idx..block_end_idx {
-        let result = db_ledger
-            .data_cf
-            .get_by_slot_index(&db_ledger.db, slot, i)?;
+        let result = db_ledger.data_cf.get_by_slot_index(slot, i)?;

         categorize_blob(
             &result,
@@ -425,9 +423,7 @@ pub fn recover(
     // Add the coding blobs we have into the recovery vector, mark the missing ones
     for i in coding_start_idx..block_end_idx {
-        let result = db_ledger
-            .erasure_cf
-            .get_by_slot_index(&db_ledger.db, slot, i)?;
+        let result = db_ledger.erasure_cf.get_by_slot_index(slot, i)?;

         categorize_blob(
             &result,
@@ -520,9 +516,7 @@ pub fn recover(
         // Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct
         // the blobs again
         for i in coding_start_idx..block_end_idx {
-            db_ledger
-                .erasure_cf
-                .delete_by_slot_index(&db_ledger.db, slot, i)?;
+            db_ledger.erasure_cf.delete_by_slot_index(slot, i)?;
         }
         return Ok((vec![], vec![]));
     }
@@ -638,7 +632,6 @@ pub mod test {
             db_ledger
                 .data_cf
                 .put_by_slot_index(
-                    &db_ledger.db,
                     data_l.slot().unwrap(),
                     data_l.index().unwrap(),
                     &data_l.data[..data_l.data_size().unwrap() as usize],
@@ -665,7 +658,6 @@ pub mod test {
             db_ledger
                 .erasure_cf
                 .put_by_slot_index(
-                    &db_ledger.db,
                     coding_lock.slot().unwrap(),
                     index,
                     &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],

View File

@@ -164,9 +164,7 @@ pub fn window_service(
             }
         }

-        let meta = db_ledger
-            .meta_cf
-            .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT));
+        let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT));

         if let Ok(Some(meta)) = meta {
             let received = meta.received;
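The same call shape now appears at every read site across window_service, db_window, and cluster_info; none of them touches the raw DB anymore. As used in the hunk above (quoted from the diff, assuming the surrounding function's imports of MetaCf and DEFAULT_SLOT_HEIGHT):

// Read the slot metadata without ever borrowing db_ledger.db.
let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT));

if let Ok(Some(meta)) = meta {
    let received = meta.received;
    // ...
}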