Remove Backend trait (#6407)

TristanDebrunner
2019-10-17 15:19:27 -06:00
committed by GitHub
parent d865f1f0c5
commit 9e52d11ad0
7 changed files with 527 additions and 624 deletions

View File

@@ -36,28 +36,19 @@ mod db;
 mod meta;
 mod rooted_slot_iterator;
 
-macro_rules! db_imports {
-    { $mod:ident, $db:ident, $db_path:expr } => {
-        mod $mod;
-
-        use $mod::$db;
-        use db::{columns as cf, IteratorMode, IteratorDirection};
-        pub use db::columns;
-
-        pub type Database = db::Database<$db>;
-        pub type Cursor<C> = db::Cursor<$db, C>;
-        pub type LedgerColumn<C> = db::LedgerColumn<$db, C>;
-        pub type WriteBatch = db::WriteBatch<$db>;
-        type BatchProcessor = db::BatchProcessor<$db>;
-
-        pub trait Column: db::Column<$db> {}
-        impl<C: db::Column<$db>> Column for C {}
-
-        pub const BLOCKTREE_DIRECTORY: &str = $db_path;
-    };
-}
-
-db_imports! {rocks, Rocks, "rocksdb"}
+pub use db::columns;
+use db::{columns as cf, IteratorDirection, IteratorMode};
+
+pub type Database = db::Database;
+pub type Cursor<C> = db::Cursor<C>;
+pub type LedgerColumn<C> = db::LedgerColumn<C>;
+pub type WriteBatch = db::WriteBatch;
+type BatchProcessor = db::BatchProcessor;
+
+pub trait Column: db::Column {}
+impl<C: db::Column> Column for C {}
+
+pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
 
 pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
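With the `db_imports!` macro gone, the aliases above are ordinary items, so existing blocktree call sites compile unchanged. A minimal sketch of an unaffected caller (hypothetical helper, not part of this commit):

use crate::blocktree::{columns as cf, Database, LedgerColumn, Result};

// Hypothetical helper: the alias names (Database, LedgerColumn) are the same
// before and after this commit; only their definitions lost the Rocks parameter.
fn open_meta_column(path: &std::path::Path) -> Result<LedgerColumn<cf::SlotMeta>> {
    let db = Database::open(path)?; // previously db::Database<Rocks> behind db_imports!
    Ok(db.column::<cf::SlotMeta>()) // column() appears later in this diff
}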

View File

@@ -1,6 +1,7 @@
 use crate::blocktree::{BlocktreeError, Result};
 use bincode::{deserialize, serialize};
+use byteorder::{BigEndian, ByteOrder};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
@@ -8,10 +9,21 @@ use serde::Serialize;
 use solana_sdk::clock::Slot;
 use std::borrow::Borrow;
 use std::collections::HashMap;
+use std::fs;
 use std::marker::PhantomData;
 use std::path::Path;
 use std::sync::Arc;
 
+use rocksdb::{
+    self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction,
+    IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
+};
+
+// A good value for this is the number of cores on the machine
+const TOTAL_THREADS: i32 = 8;
+const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
+const MIN_WRITE_BUFFER_SIZE: u64 = 64 * 1024; // 64KB
+
 pub enum IteratorMode<Index> {
     Start,
     End,
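These modes drive column iteration throughout the module. A hedged sketch of a caller, written as if inside the blocktree module where these names are in scope; the method name `iter` is an assumption (its signature appears in a later hunk, its name does not):

// Assumed context: code inside the blocktree module. For SlotMeta, C::Index
// is the u64 slot itself, so IteratorMode::From starts the walk at a slot.
fn slots_at_or_after(db: &Database, slot: Slot) -> Result<Vec<Slot>> {
    // `iter` (assumed name) yields (C::Index, Box<[u8]>) pairs per the
    // signature shown in a later hunk of this diff.
    let iter = db.iter::<cf::SlotMeta>(IteratorMode::From(slot, IteratorDirection::Forward))?;
    Ok(iter.map(|(slot, _meta_bytes)| slot).collect())
}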
@@ -57,153 +69,434 @@ pub mod columns {
     pub struct ShredCode;
 }
 
-pub trait Backend: Sized + Send + Sync {
-    type Key: ?Sized + ToOwned<Owned = Self::OwnedKey>;
-    type OwnedKey: Borrow<Self::Key>;
-    type ColumnFamily: Clone;
-    type Cursor: DbCursor<Self>;
-    type Iter: Iterator<Item = (Box<Self::Key>, Box<[u8]>)>;
-    type WriteBatch: IWriteBatch<Self>;
-    type Error: Into<BlocktreeError>;
+#[derive(Debug)]
+pub struct Rocks(rocksdb::DB);
 
-    fn open(path: &Path) -> Result<Self>;
+impl Rocks {
+    fn open(path: &Path) -> Result<Rocks> {
+        use crate::blocktree::db::columns::{
+            DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
+        };
 
-    fn columns(&self) -> Vec<&'static str>;
+        fs::create_dir_all(&path)?;
 
-    fn destroy(path: &Path) -> Result<()>;
+        // Use default database options
+        let db_options = get_db_options();
 
-    fn cf_handle(&self, cf: &str) -> Self::ColumnFamily;
+        // Column family names
+        let meta_cf_descriptor =
+            ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(SlotMeta::NAME));
+        let dead_slots_cf_descriptor =
+            ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(DeadSlots::NAME));
+        let erasure_meta_cf_descriptor =
+            ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(ErasureMeta::NAME));
+        let orphans_cf_descriptor =
+            ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(Orphans::NAME));
+        let root_cf_descriptor =
+            ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(Root::NAME));
+        let index_cf_descriptor =
+            ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(Index::NAME));
+        let shred_data_cf_descriptor =
+            ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(ShredData::NAME));
+        let shred_code_cf_descriptor =
+            ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(ShredCode::NAME));
 
-    fn get_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<Option<Vec<u8>>>;
+        let cfs = vec![
+            meta_cf_descriptor,
+            dead_slots_cf_descriptor,
+            erasure_meta_cf_descriptor,
+            orphans_cf_descriptor,
+            root_cf_descriptor,
+            index_cf_descriptor,
+            shred_data_cf_descriptor,
+            shred_code_cf_descriptor,
+        ];
 
-    fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::Key, value: &[u8]) -> Result<()>;
+        // Open the database
+        let db = Rocks(DB::open_cf_descriptors(&db_options, path, cfs)?);
 
-    fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>;
+        Ok(db)
+    }
+
+    fn columns(&self) -> Vec<&'static str> {
+        use crate::blocktree::db::columns::{
+            DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
+        };
+
+        vec![
+            ErasureMeta::NAME,
+            DeadSlots::NAME,
+            Index::NAME,
+            Orphans::NAME,
+            Root::NAME,
+            SlotMeta::NAME,
+            ShredData::NAME,
+            ShredCode::NAME,
+        ]
+    }
+
+    fn destroy(path: &Path) -> Result<()> {
+        DB::destroy(&Options::default(), path)?;
+
+        Ok(())
+    }
+
+    fn cf_handle(&self, cf: &str) -> ColumnFamily {
+        self.0
+            .cf_handle(cf)
+            .expect("should never get an unknown column")
+    }
+
+    fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
+        let opt = self.0.get_cf(cf, key)?.map(|db_vec| db_vec.to_vec());
+        Ok(opt)
+    }
+
+    fn put_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
+        self.0.put_cf(cf, key, value)?;
+        Ok(())
+    }
+
+    fn delete_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<()> {
+        self.0.delete_cf(cf, key)?;
+        Ok(())
+    }
 
     fn iterator_cf(
         &self,
-        cf: Self::ColumnFamily,
-        iterator_mode: IteratorMode<&Self::Key>,
-    ) -> Result<Self::Iter>;
+        cf: ColumnFamily,
+        iterator_mode: IteratorMode<&[u8]>,
+    ) -> Result<DBIterator> {
+        let iter = {
+            match iterator_mode {
+                IteratorMode::Start => self.0.iterator_cf(cf, RocksIteratorMode::Start)?,
+                IteratorMode::End => self.0.iterator_cf(cf, RocksIteratorMode::End)?,
+                IteratorMode::From(start_from, direction) => {
+                    let rocks_direction = match direction {
+                        IteratorDirection::Forward => Direction::Forward,
+                        IteratorDirection::Reverse => Direction::Reverse,
+                    };
+                    self.0
+                        .iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
+                }
+            }
+        };
 
-    fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result<Self::Cursor>;
+        Ok(iter)
+    }
 
-    fn write(&self, batch: Self::WriteBatch) -> Result<()>;
+    fn raw_iterator_cf(&self, cf: ColumnFamily) -> Result<DBRawIterator> {
+        let raw_iter = self.0.raw_iterator_cf(cf)?;
 
-    fn batch(&self) -> Result<Self::WriteBatch>;
+        Ok(raw_iter)
+    }
+
+    fn batch(&self) -> Result<RWriteBatch> {
+        Ok(RWriteBatch::default())
+    }
+
+    fn write(&self, batch: RWriteBatch) -> Result<()> {
+        self.0.write(batch)?;
+        Ok(())
+    }
 }
 
-pub trait Column<B>
-where
-    B: Backend,
-{
+pub trait Column {
     const NAME: &'static str;
     type Index;
 
-    fn key(index: Self::Index) -> B::OwnedKey;
-    fn index(key: &B::Key) -> Self::Index;
+    fn key(index: Self::Index) -> Vec<u8>;
+    fn index(key: &[u8]) -> Self::Index;
     fn slot(index: Self::Index) -> Slot;
     fn as_index(slot: Slot) -> Self::Index;
 }
 
-pub trait DbCursor<B>
-where
-    B: Backend,
-{
+pub trait DbCursor {
     fn valid(&self) -> bool;
-    fn seek(&mut self, key: &B::Key);
+    fn seek(&mut self, key: &[u8]);
     fn seek_to_first(&mut self);
     fn next(&mut self);
-    fn key(&self) -> Option<B::OwnedKey>;
+    fn key(&self) -> Option<Vec<u8>>;
     fn value(&self) -> Option<Vec<u8>>;
 }
 
-pub trait IWriteBatch<B>
-where
-    B: Backend,
-{
-    fn put_cf(&mut self, cf: B::ColumnFamily, key: &B::Key, value: &[u8]) -> Result<()>;
-    fn delete_cf(&mut self, cf: B::ColumnFamily, key: &B::Key) -> Result<()>;
+pub trait IWriteBatch {
+    fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()>;
+    fn delete_cf(&mut self, cf: ColumnFamily, key: &[u8]) -> Result<()>;
 }
 
-pub trait TypedColumn<B>: Column<B>
-where
-    B: Backend,
-{
+pub trait TypedColumn: Column {
     type Type: Serialize + DeserializeOwned;
 }
 
+impl Column for columns::ShredCode {
+    const NAME: &'static str = super::CODE_SHRED_CF;
+    type Index = (u64, u64);
+
+    fn key(index: (u64, u64)) -> Vec<u8> {
+        columns::ShredData::key(index)
+    }
+
+    fn index(key: &[u8]) -> (u64, u64) {
+        columns::ShredData::index(key)
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index.0
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        (slot, 0)
+    }
+}
+
+impl Column for columns::ShredData {
+    const NAME: &'static str = super::DATA_SHRED_CF;
+    type Index = (u64, u64);
+
+    fn key((slot, index): (u64, u64)) -> Vec<u8> {
+        let mut key = vec![0; 16];
+        BigEndian::write_u64(&mut key[..8], slot);
+        BigEndian::write_u64(&mut key[8..16], index);
+        key
+    }
+
+    fn index(key: &[u8]) -> (u64, u64) {
+        let slot = BigEndian::read_u64(&key[..8]);
+        let index = BigEndian::read_u64(&key[8..16]);
+        (slot, index)
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index.0
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        (slot, 0)
+    }
+}
+
+impl Column for columns::Index {
+    const NAME: &'static str = super::INDEX_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
+    }
+
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
+}
+
+impl TypedColumn for columns::Index {
+    type Type = crate::blocktree::meta::Index;
+}
+
+impl Column for columns::DeadSlots {
+    const NAME: &'static str = super::DEAD_SLOTS_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
+    }
+
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
+}
+
+impl TypedColumn for columns::DeadSlots {
+    type Type = bool;
+}
+
+impl Column for columns::Orphans {
+    const NAME: &'static str = super::ORPHANS_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
+    }
+
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
+}
+
+impl TypedColumn for columns::Orphans {
+    type Type = bool;
+}
+
+impl Column for columns::Root {
+    const NAME: &'static str = super::ROOT_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
+    }
+
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
+}
+
+impl TypedColumn for columns::Root {
+    type Type = bool;
+}
+
+impl Column for columns::SlotMeta {
+    const NAME: &'static str = super::META_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
+    }
+
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
+}
+
+impl TypedColumn for columns::SlotMeta {
+    type Type = super::SlotMeta;
+}
+
+impl Column for columns::ErasureMeta {
+    const NAME: &'static str = super::ERASURE_META_CF;
+    type Index = (u64, u64);
+
+    fn index(key: &[u8]) -> (u64, u64) {
+        let slot = BigEndian::read_u64(&key[..8]);
+        let set_index = BigEndian::read_u64(&key[8..]);
+
+        (slot, set_index)
+    }
+
+    fn key((slot, set_index): (u64, u64)) -> Vec<u8> {
+        let mut key = vec![0; 16];
+        BigEndian::write_u64(&mut key[..8], slot);
+        BigEndian::write_u64(&mut key[8..], set_index);
+        key
+    }
+
+    fn slot(index: Self::Index) -> Slot {
+        index.0
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        (slot, 0)
+    }
+}
+
+impl TypedColumn for columns::ErasureMeta {
+    type Type = super::ErasureMeta;
+}
+
 #[derive(Debug, Clone)]
-pub struct Database<B>
-where
-    B: Backend,
-{
-    backend: Arc<B>,
+pub struct Database {
+    backend: Arc<Rocks>,
 }
 
 #[derive(Debug, Clone)]
-pub struct BatchProcessor<B>
-where
-    B: Backend,
-{
-    backend: Arc<B>,
+pub struct BatchProcessor {
+    backend: Arc<Rocks>,
 }
 
-#[derive(Debug, Clone)]
-pub struct Cursor<B, C>
+pub struct Cursor<C>
 where
-    B: Backend,
-    C: Column<B>,
+    C: Column,
 {
-    db_cursor: B::Cursor,
+    db_cursor: DBRawIterator,
     column: PhantomData<C>,
-    backend: PhantomData<B>,
+    backend: PhantomData<Rocks>,
 }
 
 #[derive(Debug, Clone)]
-pub struct LedgerColumn<B, C>
+pub struct LedgerColumn<C>
 where
-    B: Backend,
-    C: Column<B>,
+    C: Column,
 {
-    backend: Arc<B>,
+    backend: Arc<Rocks>,
     column: PhantomData<C>,
 }
 
-#[derive(Debug)]
-pub struct WriteBatch<B>
-where
-    B: Backend,
-{
-    write_batch: B::WriteBatch,
-    backend: PhantomData<B>,
-    map: HashMap<&'static str, B::ColumnFamily>,
+pub struct WriteBatch {
+    write_batch: RWriteBatch,
+    backend: PhantomData<Rocks>,
+    map: HashMap<&'static str, ColumnFamily>,
 }
 
-impl<B> Database<B>
-where
-    B: Backend,
-{
+impl Database {
     pub fn open(path: &Path) -> Result<Self> {
-        let backend = Arc::new(B::open(path)?);
+        let backend = Arc::new(Rocks::open(path)?);
 
         Ok(Database { backend })
     }
 
     pub fn destroy(path: &Path) -> Result<()> {
-        B::destroy(path)?;
+        Rocks::destroy(path)?;
 
         Ok(())
     }
 
     pub fn get_bytes<C>(&self, key: C::Index) -> Result<Option<Vec<u8>>>
     where
-        C: Column<B>,
+        C: Column,
     {
         self.backend
             .get_cf(self.cf_handle::<C>(), C::key(key).borrow())
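The Column impls above all build keys with BigEndian, and that choice is load-bearing: RocksDB orders keys by raw byte comparison, and big-endian is the fixed-width encoding in which byte order equals numeric order, so sorted iteration visits slots (and shred indexes within a slot) in ascending order. A standalone sketch of that property, mirroring columns::ShredData::{key, index} from the diff above:

use byteorder::{BigEndian, ByteOrder};

// Mirrors columns::ShredData::key from the diff above.
fn key((slot, index): (u64, u64)) -> Vec<u8> {
    let mut key = vec![0; 16];
    BigEndian::write_u64(&mut key[..8], slot);
    BigEndian::write_u64(&mut key[8..16], index);
    key
}

fn main() {
    // Round trip: reading the bytes back recovers the (slot, index) pair.
    let k = key((7, 42));
    assert_eq!((BigEndian::read_u64(&k[..8]), BigEndian::read_u64(&k[8..16])), (7, 42));

    // Lexicographic byte order agrees with numeric order, so every index of
    // slot 1 sorts before any key of slot 2.
    assert!(key((1, u64::max_value())) < key((2, 0)));
}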
@@ -211,7 +504,7 @@ where
 
     pub fn put_bytes<C>(&self, key: C::Index, data: &[u8]) -> Result<()>
     where
-        C: Column<B>,
+        C: Column,
     {
         self.backend
             .put_cf(self.cf_handle::<C>(), C::key(key).borrow(), data)
@@ -219,7 +512,7 @@ where
 
     pub fn delete<C>(&self, key: C::Index) -> Result<()>
     where
-        C: Column<B>,
+        C: Column,
     {
         self.backend
             .delete_cf(self.cf_handle::<C>(), C::key(key).borrow())
@@ -227,7 +520,7 @@ where
 
     pub fn get<C>(&self, key: C::Index) -> Result<Option<C::Type>>
     where
-        C: TypedColumn<B>,
+        C: TypedColumn,
     {
         if let Some(serialized_value) = self
             .backend
@@ -243,7 +536,7 @@ where
 
     pub fn put<C>(&self, key: C::Index, value: &C::Type) -> Result<()>
     where
-        C: TypedColumn<B>,
+        C: TypedColumn,
     {
         let serialized_value = serialize(value)?;
 
@@ -254,9 +547,9 @@ where
         )
     }
 
-    pub fn cursor<C>(&self) -> Result<Cursor<B, C>>
+    pub fn cursor<C>(&self) -> Result<Cursor<C>>
     where
-        C: Column<B>,
+        C: Column,
     {
         let db_cursor = self.backend.raw_iterator_cf(self.cf_handle::<C>())?;
 
@@ -272,7 +565,7 @@ where
         iterator_mode: IteratorMode<C::Index>,
     ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)>>
     where
-        C: Column<B>,
+        C: Column,
     {
         let iter = {
             match iterator_mode {
@@ -296,16 +589,16 @@ where
     }
 
     #[inline]
-    pub fn cf_handle<C>(&self) -> B::ColumnFamily
+    pub fn cf_handle<C>(&self) -> ColumnFamily
     where
-        C: Column<B>,
+        C: Column,
     {
-        self.backend.cf_handle(C::NAME).clone()
+        self.backend.cf_handle(C::NAME)
     }
 
-    pub fn column<C>(&self) -> LedgerColumn<B, C>
+    pub fn column<C>(&self) -> LedgerColumn<C>
     where
-        C: Column<B>,
+        C: Column,
     {
         LedgerColumn {
             backend: Arc::clone(&self.backend),
@@ -318,18 +611,15 @@ where
     // blocktree.batch_processor, so this API should only be used if the caller is sure they
     // are writing to data in columns that will not be corrupted by any simultaneous blocktree
     // operations.
-    pub unsafe fn batch_processor(&self) -> BatchProcessor<B> {
+    pub unsafe fn batch_processor(&self) -> BatchProcessor {
         BatchProcessor {
             backend: Arc::clone(&self.backend),
         }
     }
 }
 
-impl<B> BatchProcessor<B>
-where
-    B: Backend,
-{
-    pub fn batch(&mut self) -> Result<WriteBatch<B>> {
+impl BatchProcessor {
+    pub fn batch(&mut self) -> Result<WriteBatch> {
         let db_write_batch = self.backend.batch()?;
         let map = self
             .backend
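The unsafe constructor above hands out a second write path into the same backend, hence the warning in its comment. A hedged usage sketch (the helper and its arguments are hypothetical; batch_processor/batch/put/delete/write are the API shown in this diff):

// Hypothetical helper inside this module: persist a SlotMeta and clear the
// orphan marker for a slot in one atomic RocksDB write batch.
fn commit_slot(db: &Database, slot: u64, meta: &super::SlotMeta) -> Result<()> {
    // Safety: per the comment above, only sound if no concurrent blocktree
    // operation is writing these columns.
    let mut batch_processor = unsafe { db.batch_processor() };
    let mut batch = batch_processor.batch()?;
    batch.put::<columns::SlotMeta>(slot, meta)?; // bincode-serialized TypedColumn write
    batch.delete::<columns::Orphans>(slot)?;     // buffered in the same WriteBatch
    batch_processor.write(batch)                 // one atomic write to the database
}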
@@ -345,15 +635,14 @@ where
         })
     }
 
-    pub fn write(&mut self, batch: WriteBatch<B>) -> Result<()> {
+    pub fn write(&mut self, batch: WriteBatch) -> Result<()> {
         self.backend.write(batch.write_batch)
     }
 }
 
-impl<B, C> Cursor<B, C>
+impl<C> Cursor<C>
 where
-    B: Backend,
-    C: Column<B>,
+    C: Column,
 {
     pub fn valid(&self) -> bool {
         self.db_cursor.valid()
@@ -384,10 +673,9 @@ where
     }
 }
 
-impl<B, C> Cursor<B, C>
+impl<C> Cursor<C>
 where
-    B: Backend,
-    C: TypedColumn<B>,
+    C: TypedColumn,
 {
     pub fn value(&self) -> Option<C::Type> {
         if let Some(bytes) = self.db_cursor.value() {
@@ -399,16 +687,15 @@ where
     }
 }
 
-impl<B, C> LedgerColumn<B, C>
+impl<C> LedgerColumn<C>
 where
-    B: Backend,
-    C: Column<B>,
+    C: Column,
 {
     pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
         self.backend.get_cf(self.handle(), C::key(key).borrow())
     }
 
-    pub fn cursor(&self) -> Result<Cursor<B, C>> {
+    pub fn cursor(&self) -> Result<Cursor<C>> {
         let db_cursor = self.backend.raw_iterator_cf(self.handle())?;
 
         Ok(Cursor {
@@ -441,7 +728,7 @@ where
 
     pub fn delete_slot(
         &self,
-        batch: &mut WriteBatch<B>,
+        batch: &mut WriteBatch,
         from: Option<Slot>,
         to: Option<Slot>,
     ) -> Result<bool>
@@ -474,8 +761,8 @@ where
     }
 
     #[inline]
-    pub fn handle(&self) -> B::ColumnFamily {
-        self.backend.cf_handle(C::NAME).clone()
+    pub fn handle(&self) -> ColumnFamily {
+        self.backend.cf_handle(C::NAME)
     }
 
     pub fn is_empty(&self) -> Result<bool> {
@@ -494,10 +781,9 @@ where
     }
 }
 
-impl<B, C> LedgerColumn<B, C>
+impl<C> LedgerColumn<C>
 where
-    B: Backend,
-    C: TypedColumn<B>,
+    C: TypedColumn,
 {
     pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
         if let Some(serialized_value) = self.backend.get_cf(self.handle(), C::key(key).borrow())? {
@@ -517,28 +803,106 @@ where
     }
 }
 
-impl<B> WriteBatch<B>
-where
-    B: Backend,
-{
-    pub fn put_bytes<C: Column<B>>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
+impl WriteBatch {
+    pub fn put_bytes<C: Column>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
         self.write_batch
             .put_cf(self.get_cf::<C>(), C::key(key).borrow(), bytes)
             .map_err(|e| e.into())
     }
 
-    pub fn delete<C: Column<B>>(&mut self, key: C::Index) -> Result<()> {
+    pub fn delete<C: Column>(&mut self, key: C::Index) -> Result<()> {
         self.write_batch
             .delete_cf(self.get_cf::<C>(), C::key(key).borrow())
             .map_err(|e| e.into())
     }
 
-    pub fn put<C: TypedColumn<B>>(&mut self, key: C::Index, value: &C::Type) -> Result<()> {
+    pub fn put<C: TypedColumn>(&mut self, key: C::Index, value: &C::Type) -> Result<()> {
         let serialized_value = serialize(&value)?;
         self.write_batch
             .put_cf(self.get_cf::<C>(), C::key(key).borrow(), &serialized_value)
             .map_err(|e| e.into())
     }
 
     #[inline]
-    fn get_cf<C: Column<B>>(&self) -> B::ColumnFamily {
-        self.map[C::NAME].clone()
+    fn get_cf<C: Column>(&self) -> ColumnFamily {
+        self.map[C::NAME]
     }
 }
+
+impl DbCursor for DBRawIterator {
+    fn valid(&self) -> bool {
+        DBRawIterator::valid(self)
+    }
+
+    fn seek(&mut self, key: &[u8]) {
+        DBRawIterator::seek(self, key);
+    }
+
+    fn seek_to_first(&mut self) {
+        DBRawIterator::seek_to_first(self);
+    }
+
+    fn next(&mut self) {
+        DBRawIterator::next(self);
+    }
+
+    fn key(&self) -> Option<Vec<u8>> {
+        DBRawIterator::key(self)
+    }
+
+    fn value(&self) -> Option<Vec<u8>> {
+        DBRawIterator::value(self)
+    }
+}
+
+impl IWriteBatch for RWriteBatch {
+    fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
+        RWriteBatch::put_cf(self, cf, key, value)?;
+        Ok(())
+    }
+
+    fn delete_cf(&mut self, cf: ColumnFamily, key: &[u8]) -> Result<()> {
+        RWriteBatch::delete_cf(self, cf, key)?;
+        Ok(())
+    }
+}
+
+impl std::convert::From<rocksdb::Error> for BlocktreeError {
+    fn from(e: rocksdb::Error) -> BlocktreeError {
+        BlocktreeError::RocksDb(e)
+    }
+}
+
+fn get_cf_options(name: &'static str) -> Options {
+    use crate::blocktree::db::columns::{ErasureMeta, Index, ShredCode, ShredData};
+
+    let mut options = Options::default();
+    match name {
+        ShredCode::NAME | ShredData::NAME | Index::NAME | ErasureMeta::NAME => {
+            // 256MB * 8 = 2GB. 4 of these columns should take no more than 8GB of RAM
+            options.set_max_write_buffer_number(8);
+            options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
+            options.set_target_file_size_base(MAX_WRITE_BUFFER_SIZE / 10);
+            options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE);
+        }
+        _ => {
+            // We want smaller CFs to flush faster. This results in more WAL files but lowers
+            // overall WAL space utilization and increases flush frequency
+            options.set_write_buffer_size(MIN_WRITE_BUFFER_SIZE as usize);
+            options.set_target_file_size_base(MIN_WRITE_BUFFER_SIZE);
+            options.set_max_bytes_for_level_base(MIN_WRITE_BUFFER_SIZE);
+            options.set_level_zero_file_num_compaction_trigger(1);
+        }
+    }
+    options
+}
+
+fn get_db_options() -> Options {
+    let mut options = Options::default();
+    options.create_if_missing(true);
+    options.create_missing_column_families(true);
+    options.increase_parallelism(TOTAL_THREADS);
+    options.set_max_background_flushes(4);
+    options.set_max_background_compactions(4);
+    options
+}
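The constants behind these options imply a concrete memory ceiling: each of the four shred/index columns may hold up to 8 write buffers of 256MB, i.e. 2GB of memtables per column family and roughly 8GB across the four. A back-of-envelope check, under that reading of the options (not an upstream test):

// Assumed reading of the options above.
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // one memtable
const WRITE_BUFFER_NUMBER: u64 = 8; // set_max_write_buffer_number(8)
const BIG_CFS: u64 = 4; // ShredCode, ShredData, Index, ErasureMeta

fn main() {
    let per_cf = MAX_WRITE_BUFFER_SIZE * WRITE_BUFFER_NUMBER;
    assert_eq!(per_cf, 2 * 1024 * 1024 * 1024); // 2GB per column family
    assert_eq!(per_cf * BIG_CFS, 8 * 1024 * 1024 * 1024); // ~8GB ceiling overall
}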

View File

@ -1,452 +0,0 @@
use crate::blocktree::db::columns as cf;
use crate::blocktree::db::{
Backend, Column, DbCursor, IWriteBatch, IteratorDirection, IteratorMode, TypedColumn,
};
use crate::blocktree::{BlocktreeError, Result};
use solana_sdk::clock::Slot;
use byteorder::{BigEndian, ByteOrder};
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use std::fs;
use std::path::Path;
// A good value for this is the number of cores on the machine
const TOTAL_THREADS: i32 = 8;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
const MIN_WRITE_BUFFER_SIZE: u64 = 64 * 1024; // 64KB
#[derive(Debug)]
pub struct Rocks(rocksdb::DB);
impl Backend for Rocks {
type Key = [u8];
type OwnedKey = Vec<u8>;
type ColumnFamily = ColumnFamily;
type Cursor = DBRawIterator;
type Iter = DBIterator;
type WriteBatch = RWriteBatch;
type Error = rocksdb::Error;
fn open(path: &Path) -> Result<Rocks> {
use crate::blocktree::db::columns::{
DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
};
fs::create_dir_all(&path)?;
// Use default database options
let db_options = get_db_options();
// Column family names
let meta_cf_descriptor =
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(SlotMeta::NAME));
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(DeadSlots::NAME));
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(ErasureMeta::NAME));
let orphans_cf_descriptor =
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(Orphans::NAME));
let root_cf_descriptor =
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(Root::NAME));
let index_cf_descriptor =
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(Index::NAME));
let shred_data_cf_descriptor =
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(ShredData::NAME));
let shred_code_cf_descriptor =
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(ShredCode::NAME));
let cfs = vec![
meta_cf_descriptor,
dead_slots_cf_descriptor,
erasure_meta_cf_descriptor,
orphans_cf_descriptor,
root_cf_descriptor,
index_cf_descriptor,
shred_data_cf_descriptor,
shred_code_cf_descriptor,
];
// Open the database
let db = Rocks(DB::open_cf_descriptors(&db_options, path, cfs)?);
Ok(db)
}
fn columns(&self) -> Vec<&'static str> {
use crate::blocktree::db::columns::{
DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
};
vec![
ErasureMeta::NAME,
DeadSlots::NAME,
Index::NAME,
Orphans::NAME,
Root::NAME,
SlotMeta::NAME,
ShredData::NAME,
ShredCode::NAME,
]
}
fn destroy(path: &Path) -> Result<()> {
DB::destroy(&Options::default(), path)?;
Ok(())
}
fn cf_handle(&self, cf: &str) -> ColumnFamily {
self.0
.cf_handle(cf)
.expect("should never get an unknown column")
}
fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
let opt = self.0.get_cf(cf, key)?.map(|db_vec| db_vec.to_vec());
Ok(opt)
}
fn put_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
self.0.put_cf(cf, key, value)?;
Ok(())
}
fn delete_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<()> {
self.0.delete_cf(cf, key)?;
Ok(())
}
fn iterator_cf(
&self,
cf: ColumnFamily,
iterator_mode: IteratorMode<&[u8]>,
) -> Result<DBIterator> {
let iter = {
match iterator_mode {
IteratorMode::Start => self.0.iterator_cf(cf, RocksIteratorMode::Start)?,
IteratorMode::End => self.0.iterator_cf(cf, RocksIteratorMode::End)?,
IteratorMode::From(start_from, direction) => {
let rocks_direction = match direction {
IteratorDirection::Forward => Direction::Forward,
IteratorDirection::Reverse => Direction::Reverse,
};
self.0
.iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
}
}
};
Ok(iter)
}
fn raw_iterator_cf(&self, cf: ColumnFamily) -> Result<DBRawIterator> {
let raw_iter = self.0.raw_iterator_cf(cf)?;
Ok(raw_iter)
}
fn batch(&self) -> Result<RWriteBatch> {
Ok(RWriteBatch::default())
}
fn write(&self, batch: RWriteBatch) -> Result<()> {
self.0.write(batch)?;
Ok(())
}
}
impl Column<Rocks> for cf::ShredCode {
const NAME: &'static str = super::CODE_SHRED_CF;
type Index = (u64, u64);
fn key(index: (u64, u64)) -> Vec<u8> {
cf::ShredData::key(index)
}
fn index(key: &[u8]) -> (u64, u64) {
cf::ShredData::index(key)
}
fn slot(index: Self::Index) -> Slot {
index.0
}
fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}
impl Column<Rocks> for cf::ShredData {
const NAME: &'static str = super::DATA_SHRED_CF;
type Index = (u64, u64);
fn key((slot, index): (u64, u64)) -> Vec<u8> {
let mut key = vec![0; 16];
BigEndian::write_u64(&mut key[..8], slot);
BigEndian::write_u64(&mut key[8..16], index);
key
}
fn index(key: &[u8]) -> (u64, u64) {
let slot = BigEndian::read_u64(&key[..8]);
let index = BigEndian::read_u64(&key[8..16]);
(slot, index)
}
fn slot(index: Self::Index) -> Slot {
index.0
}
fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}
impl Column<Rocks> for cf::Index {
const NAME: &'static str = super::INDEX_CF;
type Index = u64;
fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::Index {
type Type = crate::blocktree::meta::Index;
}
impl Column<Rocks> for cf::DeadSlots {
const NAME: &'static str = super::DEAD_SLOTS_CF;
type Index = u64;
fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::DeadSlots {
type Type = bool;
}
impl Column<Rocks> for cf::Orphans {
const NAME: &'static str = super::ORPHANS_CF;
type Index = u64;
fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::Orphans {
type Type = bool;
}
impl Column<Rocks> for cf::Root {
const NAME: &'static str = super::ROOT_CF;
type Index = u64;
fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::Root {
type Type = bool;
}
impl Column<Rocks> for cf::SlotMeta {
const NAME: &'static str = super::META_CF;
type Index = u64;
fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::SlotMeta {
type Type = super::SlotMeta;
}
impl Column<Rocks> for cf::ErasureMeta {
const NAME: &'static str = super::ERASURE_META_CF;
type Index = (u64, u64);
fn index(key: &[u8]) -> (u64, u64) {
let slot = BigEndian::read_u64(&key[..8]);
let set_index = BigEndian::read_u64(&key[8..]);
(slot, set_index)
}
fn key((slot, set_index): (u64, u64)) -> Vec<u8> {
let mut key = vec![0; 16];
BigEndian::write_u64(&mut key[..8], slot);
BigEndian::write_u64(&mut key[8..], set_index);
key
}
fn slot(index: Self::Index) -> Slot {
index.0
}
fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}
impl TypedColumn<Rocks> for cf::ErasureMeta {
type Type = super::ErasureMeta;
}
impl DbCursor<Rocks> for DBRawIterator {
fn valid(&self) -> bool {
DBRawIterator::valid(self)
}
fn seek(&mut self, key: &[u8]) {
DBRawIterator::seek(self, key);
}
fn seek_to_first(&mut self) {
DBRawIterator::seek_to_first(self);
}
fn next(&mut self) {
DBRawIterator::next(self);
}
fn key(&self) -> Option<Vec<u8>> {
DBRawIterator::key(self)
}
fn value(&self) -> Option<Vec<u8>> {
DBRawIterator::value(self)
}
}
impl IWriteBatch<Rocks> for RWriteBatch {
fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
RWriteBatch::put_cf(self, cf, key, value)?;
Ok(())
}
fn delete_cf(&mut self, cf: ColumnFamily, key: &[u8]) -> Result<()> {
RWriteBatch::delete_cf(self, cf, key)?;
Ok(())
}
}
impl std::convert::From<rocksdb::Error> for BlocktreeError {
fn from(e: rocksdb::Error) -> BlocktreeError {
BlocktreeError::RocksDb(e)
}
}
fn get_cf_options(name: &'static str) -> Options {
use crate::blocktree::db::columns::{ErasureMeta, Index, ShredCode, ShredData};
let mut options = Options::default();
match name {
ShredCode::NAME | ShredData::NAME | Index::NAME | ErasureMeta::NAME => {
// 512MB * 8 = 4GB. 2 of these columns should take no more than 8GB of RAM
options.set_max_write_buffer_number(8);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
options.set_target_file_size_base(MAX_WRITE_BUFFER_SIZE / 10);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE);
}
_ => {
// We want smaller CFs to flush faster. This results in more WAL files but lowers
// overall WAL space utilization and increases flush frequency
options.set_write_buffer_size(MIN_WRITE_BUFFER_SIZE as usize);
options.set_target_file_size_base(MIN_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MIN_WRITE_BUFFER_SIZE);
options.set_level_zero_file_num_compaction_trigger(1);
}
}
options
}
fn get_db_options() -> Options {
let mut options = Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
options.increase_parallelism(TOTAL_THREADS);
options.set_max_background_flushes(4);
options.set_max_background_compactions(4);
options
}

View File

@@ -157,7 +157,7 @@ impl Crds {
     }
 
     pub fn remove(&mut self, key: &CrdsValueLabel) {
-        self.table.remove(key);
+        self.table.swap_remove(key);
    }
 }

View File

@@ -270,7 +270,7 @@ impl CrdsGossipPush {
         keys.shuffle(&mut rand::thread_rng());
         let num = keys.len() / ratio;
         for k in &keys[..num] {
-            self.active_set.remove(k);
+            self.active_set.swap_remove(k);
         }
         for (k, v) in new_items {
             self.active_set.insert(k, v);
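Both of these tables are IndexMaps in this codebase, and the rename makes the removal strategy explicit: swap_remove is O(1) because the last entry is swapped into the vacated slot, at the cost of perturbing insertion order. A standalone illustration of that semantics (assuming the indexmap crate):

use indexmap::IndexMap;

fn main() {
    let mut m = IndexMap::new();
    for (i, k) in ["a", "b", "c", "d"].iter().enumerate() {
        m.insert(*k, i);
    }

    // O(1): the last entry ("d") is moved into the slot vacated by "b".
    m.swap_remove("b");

    let order: Vec<&str> = m.keys().copied().collect();
    assert_eq!(order, vec!["a", "d", "c"]); // insertion order is perturbed
}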