KvStore - A data-store to support BlockTree (#2897)

* Mostly implement key-value store and add integration points

The essential key-value store functionality is implemented; more work is needed to integrate, test, and activate it.

Behind the `kvstore` feature.
Author: Mark
Date: 2019-03-11 17:53:14 -05:00
Committed by: GitHub
Parent: 3073ebb20d
Commit: 56b0ba2601
22 changed files with 3366 additions and 370 deletions
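
In practice, "behind the `kvstore` feature" means every divergence in the diff below is resolved at compile time with cfg attributes, so exactly one backend is compiled into any given build (enabled with something like `cargo build --features kvstore`, assuming the feature is declared in the crate's Cargo.toml, whose diff is not shown here). The recurring shape, sketched with a hypothetical unified `Backend` alias (the diff instead aliases each item separately):

// Both cfg arms bind the same name, so code using the name compiles
// unchanged under either backend. `Backend` here is illustrative only.
#[cfg(feature = "kvstore")]
pub type Backend = Kvs;
#[cfg(not(feature = "kvstore"))]
pub type Backend = Rocks;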


@@ -3,121 +3,81 @@
//! access read to a persistent file-based ledger.
use crate::entry::Entry;
#[cfg(feature = "kvstore")]
use crate::kvstore;
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::result::{Error, Result};
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use hashbrown::HashMap;
use rocksdb::{
ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, IteratorMode, Options, WriteBatch, DB,
};
use serde::de::DeserializeOwned;
#[cfg(not(feature = "kvstore"))]
use rocksdb;
use serde::Serialize;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use std::borrow::{Borrow, Cow};
use std::cell::RefCell;
use std::cmp;
use std::fs;
use std::io;
use std::path::Path;
use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
pub type BlocktreeRawIterator = rocksdb::DBRawIterator;
mod db;
#[cfg(feature = "kvstore")]
mod kvs;
#[cfg(not(feature = "kvstore"))]
mod rocks;
#[cfg(feature = "kvstore")]
use self::kvs::{DataCf, ErasureCf, Kvs, MetaCf};
#[cfg(not(feature = "kvstore"))]
use self::rocks::{DataCf, ErasureCf, MetaCf, Rocks};
pub use db::{
Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily,
LedgerColumnFamilyRaw,
};
#[cfg(not(feature = "kvstore"))]
pub type BlocktreeRawIterator = <Rocks as Database>::Cursor;
#[cfg(feature = "kvstore")]
pub type BlocktreeRawIterator = <Kvs as Database>::Cursor;
#[cfg(not(feature = "kvstore"))]
pub type WriteBatch = <Rocks as Database>::WriteBatch;
#[cfg(feature = "kvstore")]
pub type WriteBatch = <Kvs as Database>::WriteBatch;
#[cfg(not(feature = "kvstore"))]
type KeyRef = <Rocks as Database>::KeyRef;
#[cfg(feature = "kvstore")]
type KeyRef = <Kvs as Database>::KeyRef;
#[cfg(not(feature = "kvstore"))]
pub type Key = <Rocks as Database>::Key;
#[cfg(feature = "kvstore")]
pub type Key = <Kvs as Database>::Key;
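The aliases above project associated types out of a new `Database` trait re-exported from the `db` module. The trait's definition isn't part of this hunk; judging from how it is used here (`<Rocks as Database>::Cursor`, and `self.db.batch()?` later in the file), its shape is plausibly something like this sketch:

// Sketch only: reconstructed from usage in this file, not the actual
// definition in the `db` module.
pub trait Database: Sized {
    type Key;            // owned key, aliased above as `Key`
    type KeyRef: ?Sized; // borrowed key, aliased above as `KeyRef`
    type Cursor;         // raw iterator, aliased above as `BlocktreeRawIterator`
    type WriteBatch;     // atomic batch of writes, aliased above as `WriteBatch`

    fn batch(&self) -> Result<Self::WriteBatch>; // used later as `self.db.batch()?`
}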
#[cfg(not(feature = "kvstore"))]
pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
// A good value for this is the number of cores on the machine
const TOTAL_THREADS: i32 = 8;
const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024;
#[cfg(feature = "kvstore")]
pub const BLOCKTREE_DIRECTORY: &str = "kvstore";
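As the comment above says, TOTAL_THREADS should track the machine's core count, yet it is hardcoded to 8. A hedged alternative using the `num_cpus` crate (not a dependency this commit adds) would derive it at startup:

// Hypothetical: derive parallelism from the machine instead of hardcoding.
// Requires the external `num_cpus` crate.
fn total_threads() -> i32 {
    num_cpus::get() as i32
}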
#[derive(Debug)]
pub enum BlocktreeError {
BlobForIndexExists,
InvalidBlobData,
RocksDb(rocksdb::Error),
}
impl std::convert::From<rocksdb::Error> for Error {
fn from(e: rocksdb::Error) -> Error {
Error::BlocktreeError(BlocktreeError::RocksDb(e))
}
}
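The `KvsDb(kvstore::Error)` variant added to `BlocktreeError` below implies a matching conversion so that `?` also works over kvstore calls; presumably the feature-gated code includes the mirror image of the rocksdb impl above:

#[cfg(feature = "kvstore")]
impl std::convert::From<kvstore::Error> for Error {
    fn from(e: kvstore::Error) -> Error {
        Error::BlocktreeError(BlocktreeError::KvsDb(e))
    }
}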
pub trait LedgerColumnFamily {
type ValueType: DeserializeOwned + Serialize;
fn get(&self, key: &[u8]) -> Result<Option<Self::ValueType>> {
let db = self.db();
let data_bytes = db.get_cf(self.handle(), key)?;
if let Some(raw) = data_bytes {
let result: Self::ValueType = deserialize(&raw)?;
Ok(Some(result))
} else {
Ok(None)
}
}
fn get_bytes(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let db = self.db();
let data_bytes = db.get_cf(self.handle(), key)?;
Ok(data_bytes.map(|x| x.to_vec()))
}
fn put_bytes(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> {
let db = self.db();
db.put_cf(self.handle(), &key, &serialized_value)?;
Ok(())
}
fn put(&self, key: &[u8], value: &Self::ValueType) -> Result<()> {
let db = self.db();
let serialized = serialize(value)?;
db.put_cf(self.handle(), &key, &serialized)?;
Ok(())
}
fn delete(&self, key: &[u8]) -> Result<()> {
let db = self.db();
db.delete_cf(self.handle(), &key)?;
Ok(())
}
fn db(&self) -> &Arc<DB>;
fn handle(&self) -> ColumnFamily;
}
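Implementors get typed access for free: `get` fetches raw bytes and deserializes them with bincode, `put` does the reverse. A minimal round-trip sketch against the `MetaCf` defined below (the helper name is hypothetical):

// Hypothetical helper: store a SlotMeta and read it back through the
// typed column-family interface.
fn roundtrip(meta_cf: &MetaCf, meta: &SlotMeta) -> Result<()> {
    let key = MetaCf::key(0);
    meta_cf.put(&key, meta)?;          // bincode-serializes the SlotMeta
    let restored = meta_cf.get(&key)?; // Option<SlotMeta>, None if absent
    assert_eq!(restored.as_ref(), Some(meta));
    Ok(())
}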
pub trait LedgerColumnFamilyRaw {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let db = self.db();
let data_bytes = db.get_cf(self.handle(), key)?;
Ok(data_bytes.map(|x| x.to_vec()))
}
fn put(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> {
let db = self.db();
db.put_cf(self.handle(), &key, &serialized_value)?;
Ok(())
}
fn delete(&self, key: &[u8]) -> Result<()> {
let db = self.db();
db.delete_cf(self.handle(), &key)?;
Ok(())
}
fn raw_iterator(&self) -> BlocktreeRawIterator {
let db = self.db();
db.raw_iterator_cf(self.handle())
.expect("Expected to be able to open database iterator")
}
fn handle(&self) -> ColumnFamily;
fn db(&self) -> &Arc<DB>;
#[cfg(feature = "kvstore")]
KvsDb(kvstore::Error),
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
@@ -171,156 +131,13 @@ impl SlotMeta {
}
}
pub struct MetaCf {
db: Arc<DB>,
}
impl MetaCf {
pub fn new(db: Arc<DB>) -> Self {
MetaCf { db }
}
pub fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0u8; 8];
BigEndian::write_u64(&mut key[0..8], slot);
key
}
pub fn get_slot_meta(&self, slot: u64) -> Result<Option<SlotMeta>> {
let key = Self::key(slot);
self.get(&key)
}
pub fn put_slot_meta(&self, slot: u64, slot_meta: &SlotMeta) -> Result<()> {
let key = Self::key(slot);
self.put(&key, slot_meta)
}
pub fn index_from_key(key: &[u8]) -> Result<u64> {
let mut rdr = io::Cursor::new(&key[..]);
let index = rdr.read_u64::<BigEndian>()?;
Ok(index)
}
}
impl LedgerColumnFamily for MetaCf {
type ValueType = SlotMeta;
fn db(&self) -> &Arc<DB> {
&self.db
}
fn handle(&self) -> ColumnFamily {
self.db.cf_handle(META_CF).unwrap()
}
}
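A `MetaCf` key is just the slot number as eight big-endian bytes, so `index_from_key` is the exact inverse of `key`. A worked example:

// Round-trip: slot 300 encodes to big-endian bytes and decodes back.
let key = MetaCf::key(300);
assert_eq!(key, vec![0, 0, 0, 0, 0, 0, 1, 44]); // 300 == 0x012C
assert_eq!(MetaCf::index_from_key(&key).unwrap(), 300);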
// The data column family
pub struct DataCf {
db: Arc<DB>,
}
impl DataCf {
pub fn new(db: Arc<DB>) -> Self {
DataCf { db }
}
pub fn get_by_slot_index(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
let key = Self::key(slot, index);
self.get(&key)
}
pub fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> {
let key = Self::key(slot, index);
self.delete(&key)
}
pub fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> {
let key = Self::key(slot, index);
self.put(&key, serialized_value)
}
pub fn key(slot: u64, index: u64) -> Vec<u8> {
let mut key = vec![0u8; 16];
BigEndian::write_u64(&mut key[0..8], slot);
BigEndian::write_u64(&mut key[8..16], index);
key
}
pub fn slot_from_key(key: &[u8]) -> Result<u64> {
let mut rdr = io::Cursor::new(&key[0..8]);
let height = rdr.read_u64::<BigEndian>()?;
Ok(height)
}
pub fn index_from_key(key: &[u8]) -> Result<u64> {
let mut rdr = io::Cursor::new(&key[8..16]);
let index = rdr.read_u64::<BigEndian>()?;
Ok(index)
}
}
impl LedgerColumnFamilyRaw for DataCf {
fn db(&self) -> &Arc<DB> {
&self.db
}
fn handle(&self) -> ColumnFamily {
self.db.cf_handle(DATA_CF).unwrap()
}
}
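The 16-byte `(slot, index)` key is big-endian by design: lexicographic byte order then coincides with numeric order, so a raw iterator visits blobs slot by slot and, within a slot, index by index. A quick check:

// Byte order matches numeric order across the u64 boundary:
let a = DataCf::key(1, 255); // [..., 1, 0, 0, 0, 0, 0, 0, 0, 255]
let b = DataCf::key(1, 256); // [..., 1, 0, 0, 0, 0, 0, 0, 1, 0]
let c = DataCf::key(2, 0);
assert!(a < b && b < c); // Vec<u8> compares lexicographically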
// The erasure column family
pub struct ErasureCf {
db: Arc<DB>,
}
impl ErasureCf {
pub fn new(db: Arc<DB>) -> Self {
ErasureCf { db }
}
pub fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> {
let key = Self::key(slot, index);
self.delete(&key)
}
pub fn get_by_slot_index(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
let key = Self::key(slot, index);
self.get(&key)
}
pub fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> {
let key = Self::key(slot, index);
self.put(&key, serialized_value)
}
pub fn key(slot: u64, index: u64) -> Vec<u8> {
DataCf::key(slot, index)
}
pub fn slot_from_key(key: &[u8]) -> Result<u64> {
DataCf::slot_from_key(key)
}
pub fn index_from_key(key: &[u8]) -> Result<u64> {
DataCf::index_from_key(key)
}
}
impl LedgerColumnFamilyRaw for ErasureCf {
fn db(&self) -> &Arc<DB> {
&self.db
}
fn handle(&self) -> ColumnFamily {
self.db.cf_handle(ERASURE_CF).unwrap()
}
}
// ledger window
pub struct Blocktree {
// Underlying database is automatically closed in the Drop implementation of DB
db: Arc<DB>,
#[cfg(not(feature = "kvstore"))]
db: Arc<Rocks>,
#[cfg(feature = "kvstore")]
db: Arc<Kvs>,
meta_cf: MetaCf,
data_cf: DataCf,
erasure_cf: ErasureCf,
@@ -336,47 +153,6 @@ pub const DATA_CF: &str = "data";
pub const ERASURE_CF: &str = "erasure";
impl Blocktree {
// Opens a Ledger in directory, provides "infinite" window of blobs
pub fn open(ledger_path: &str) -> Result<Self> {
fs::create_dir_all(&ledger_path)?;
let ledger_path = Path::new(ledger_path).join(BLOCKTREE_DIRECTORY);
// Use default database options
let db_options = Self::get_db_options();
// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(META_CF, Self::get_cf_options());
let data_cf_descriptor = ColumnFamilyDescriptor::new(DATA_CF, Self::get_cf_options());
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(ERASURE_CF, Self::get_cf_options());
let cfs = vec![
meta_cf_descriptor,
data_cf_descriptor,
erasure_cf_descriptor,
];
// Open the database
let db = Arc::new(DB::open_cf_descriptors(&db_options, ledger_path, cfs)?);
// Create the metadata column family
let meta_cf = MetaCf::new(db.clone());
// Create the data column family
let data_cf = DataCf::new(db.clone());
// Create the erasure column family
let erasure_cf = ErasureCf::new(db.clone());
let ticks_per_slot = DEFAULT_TICKS_PER_SLOT;
Ok(Blocktree {
db,
meta_cf,
data_cf,
erasure_cf,
new_blobs_signals: vec![],
ticks_per_slot,
})
}
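Callers stay insulated from the backend: the same `open` call produces a RocksDB- or KvStore-backed tree depending only on the compile-time feature. A usage sketch (function name and path are illustrative):

// Hypothetical caller; BLOCKTREE_DIRECTORY is appended inside open(),
// so only the ledger root is passed in.
fn example() -> Result<()> {
    let blocktree = Blocktree::open("/tmp/my-ledger")?;
    let _next = blocktree.get_next_slot(0)?; // first slot after 0, if any
    Ok(())
}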
pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver<bool>)> {
let mut blocktree = Self::open(ledger_path)?;
let (signal_sender, signal_receiver) = sync_channel(1);
@@ -422,14 +198,6 @@ impl Blocktree {
Ok(())
}
pub fn destroy(ledger_path: &str) -> Result<()> {
// DB::destroy() fails if `ledger_path` doesn't exist
fs::create_dir_all(&ledger_path)?;
let ledger_path = Path::new(ledger_path).join(BLOCKTREE_DIRECTORY);
DB::destroy(&Options::default(), &ledger_path)?;
Ok(())
}
pub fn get_next_slot(&self, slot: u64) -> Result<Option<u64>> {
let mut db_iterator = self.db.raw_iterator_cf(self.meta_cf.handle())?;
db_iterator.seek(&MetaCf::key(slot + 1));
@@ -526,7 +294,7 @@ impl Blocktree {
I: IntoIterator,
I::Item: Borrow<Blob>,
{
let mut write_batch = WriteBatch::default();
let mut write_batch = self.db.batch()?;
// A map from slot to a 2-tuple of metadata: (working copy, backup copy),
// so we can detect changes to the slot metadata later
let mut slot_meta_working_set = HashMap::new();
@@ -672,24 +440,6 @@ impl Blocktree {
Ok((total_blobs, total_current_size as u64))
}
/// Return an iterator for all the entries in the given file.
pub fn read_ledger(&self) -> Result<impl Iterator<Item = Entry>> {
let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?;
db_iterator.seek_to_first();
Ok(EntryIterator {
db_iterator,
blockhash: None,
})
}
pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> {
self.db
.iterator_cf(self.data_cf.handle(), IteratorMode::Start)
.unwrap()
.map(|(_, blob_data)| Blob::new(&blob_data))
}
pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
self.erasure_cf.get_by_slot_index(slot, index)
}
@@ -703,7 +453,7 @@ impl Blocktree {
self.erasure_cf.put_by_slot_index(slot, index, bytes)
}
pub fn put_data_raw(&self, key: &[u8], value: &[u8]) -> Result<()> {
pub fn put_data_raw(&self, key: &KeyRef, value: &[u8]) -> Result<()> {
self.data_cf.put(key, value)
}
@@ -738,9 +488,9 @@ impl Blocktree {
slot: u64,
start_index: u64,
end_index: u64,
key: &dyn Fn(u64, u64) -> Vec<u8>,
slot_from_key: &dyn Fn(&[u8]) -> Result<u64>,
index_from_key: &dyn Fn(&[u8]) -> Result<u64>,
key: &dyn Fn(u64, u64) -> Key,
slot_from_key: &dyn Fn(&KeyRef) -> Result<u64>,
index_from_key: &dyn Fn(&KeyRef) -> Result<u64>,
max_missing: usize,
) -> Vec<u64> {
if start_index >= end_index || max_missing == 0 {
@@ -897,27 +647,6 @@ impl Blocktree {
.collect()
}
fn get_cf_options() -> Options {
let mut options = Options::default();
options.set_max_write_buffer_number(32);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
options
}
fn get_db_options() -> Options {
let mut options = Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
options.increase_parallelism(TOTAL_THREADS);
options.set_max_background_flushes(4);
options.set_max_background_compactions(4);
options.set_max_write_buffer_number(32);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
options
}
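These tuning knobs are RocksDB-specific, so they no longer belong in the shared module; presumably they move into the `rocks` backend alongside the `Rocks` type (the exact destination isn't visible in this hunk). A sketch of that arrangement:

// Assumed location after this commit: the backend-private `rocks` module,
// which is only compiled when the `kvstore` feature is off.
impl Rocks {
    fn get_db_options() -> Options {
        let mut options = Options::default();
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.increase_parallelism(TOTAL_THREADS);
        options.set_max_write_buffer_number(32);
        options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
        options
    }
}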
fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option<SlotMeta>) -> bool {
// We should signal that there are updates if we extended the chain of consecutive blocks starting
// from block 0, which is true iff:
@@ -1204,7 +933,7 @@ impl Blocktree {
bootstrap_meta.received = last.index() + 1;
bootstrap_meta.is_rooted = true;
let mut batch = WriteBatch::default();
let mut batch = self.db.batch()?;
batch.put_cf(
self.meta_cf.handle(),
&meta_key,
@@ -1220,45 +949,6 @@ impl Blocktree {
}
}
// TODO: all this goes away with Blocktree
struct EntryIterator {
db_iterator: DBRawIterator,
// TODO: remove me when replay_stage is iterating by block (Blocktree)
// this verification is duplicating that of replay_stage, which
// can do this in parallel
blockhash: Option<Hash>,
// https://github.com/rust-rocksdb/rust-rocksdb/issues/234
// rocksdb issue: the _blocktree member must be lower in the struct to prevent a crash
// when the db_iterator member above is dropped.
// _blocktree is unused, but dropping _blocktree results in a broken db_iterator;
// you have to hold the database open in order to iterate over it, and in order
// for db_iterator to be able to run Drop.
// _blocktree: Blocktree,
}
impl Iterator for EntryIterator {
type Item = Entry;
fn next(&mut self) -> Option<Entry> {
if self.db_iterator.valid() {
if let Some(value) = self.db_iterator.value() {
if let Ok(entry) = deserialize::<Entry>(&value[BLOB_HEADER_SIZE..]) {
if let Some(blockhash) = self.blockhash {
if !entry.verify(&blockhash) {
return None;
}
}
self.db_iterator.next();
self.blockhash = Some(entry.hash);
return Some(entry);
}
}
}
None
}
}
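Note the failure mode of the iterator being removed here: because each entry is verified against the previous entry's hash, a single corrupt entry ends iteration (`next` returns `None`) rather than yielding unverified data. A sketch of how it was consumed through the also-removed `read_ledger` above (helper name hypothetical):

// Hypothetical helper: walk every verified entry in the ledger; stops
// silently at the first entry that fails hash verification.
fn dump_entries(blocktree: &Blocktree) -> Result<()> {
    for entry in blocktree.read_ledger()? {
        println!("entry hash: {:?}", entry.hash);
    }
    Ok(())
}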
// Creates a new ledger with slot 0 full of ticks (and only ticks).
//
// Returns the blockhash that can be used to append entries with.