Upgrade RocksDB (#6496)

* Upgrade rocksdb

* Delete BatchProcessor

Those methods don't need to be `&mut self`, and the `&mut` receivers were
causing compilation failures after the upgrade.
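
For reference, this is the receiver change the message describes, as it lands in the diff below: the batching methods move off the deleted `BatchProcessor` onto `Database` and drop `&mut self`.

```rust
// Before (on the now-deleted BatchProcessor):
//     pub fn batch(&mut self) -> Result<WriteBatch>
//     pub fn write(&mut self, batch: WriteBatch) -> Result<()>
//
// After (on Database; a shared borrow suffices, since RocksDB's own write
// path is internally synchronized):
//     pub fn batch(&self) -> Result<WriteBatch>
//     pub fn write(&self, batch: WriteBatch) -> Result<()>
```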
Author: Greg Fitzgerald
Date: 2019-10-24 11:30:53 -06:00
Committed by: GitHub
Parent: e9bdee3dc7
Commit: a2543e5a8d
4 changed files with 96 additions and 108 deletions


@@ -43,7 +43,7 @@ tempfile = "3.1.0"
 
 [dependencies.rocksdb]
 # Avoid the vendored bzip2 within rocksdb-sys that can cause linker conflicts
 # when also using the bzip2 crate
-version = "0.11.0"
+version = "0.12.4"
 default-features = false
 features = ["lz4"]

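As context for the version bump, a minimal, untested sketch of using the upgraded crate with the `lz4` feature enabled above; the path and options are illustrative, not taken from this commit.

```rust
use rocksdb::{DBCompressionType, Options, DB};

// Open (or create) a RocksDB database with LZ4 block compression, available
// because features = ["lz4"] compiles LZ4 support into librocksdb-sys.
fn open_lz4(path: &str) -> Result<DB, rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_compression_type(DBCompressionType::Lz4);
    DB::open(&opts, path)
}
```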

@@ -2,8 +2,7 @@
 //! Proof of History ledger as well as iterative read, append write, and random
 //! access read to a persistent file-based ledger.
 use crate::blocktree_db::{
-    columns as cf, BatchProcessor, Column, Database, IteratorDirection, IteratorMode, LedgerColumn,
-    WriteBatch,
+    columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, WriteBatch,
 };
 pub use crate::blocktree_db::{BlocktreeError, Result};
 pub use crate::blocktree_meta::SlotMeta;
@@ -55,7 +54,6 @@ pub struct Blocktree {
     index_cf: LedgerColumn<cf::Index>,
     data_shred_cf: LedgerColumn<cf::ShredData>,
     code_shred_cf: LedgerColumn<cf::ShredCode>,
-    batch_processor: Arc<RwLock<BatchProcessor>>,
     last_root: Arc<RwLock<u64>>,
     pub new_shreds_signals: Vec<SyncSender<bool>>,
     pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
@@ -70,8 +68,6 @@ impl Blocktree {
         // Open the database
         let db = Database::open(&blocktree_path)?;
 
-        let batch_processor = unsafe { Arc::new(RwLock::new(db.batch_processor())) };
-
         // Create the metadata column family
         let meta_cf = db.column();
@@ -109,7 +105,6 @@ impl Blocktree {
             data_shred_cf,
             code_shred_cf,
             new_shreds_signals: vec![],
-            batch_processor,
             completed_slots_senders: vec![],
             last_root,
         })
@@ -179,54 +174,52 @@ impl Blocktree {
         let from_slot = Some(from_slot);
         let batch_end = Some(batch_end);
 
-        unsafe {
-            let mut batch_processor = self.db.batch_processor();
-            let mut write_batch = batch_processor
-                .batch()
-                .expect("Database Error: Failed to get write batch");
-            let end = self
-                .meta_cf
-                .delete_slot(&mut write_batch, from_slot, batch_end)
-                .unwrap_or(false)
-                && self
-                    .erasure_meta_cf
-                    .delete_slot(&mut write_batch, from_slot, batch_end)
-                    .unwrap_or(false)
-                && self
-                    .data_shred_cf
-                    .delete_slot(&mut write_batch, from_slot, batch_end)
-                    .unwrap_or(false)
-                && self
-                    .code_shred_cf
-                    .delete_slot(&mut write_batch, from_slot, batch_end)
-                    .unwrap_or(false)
-                && self
-                    .orphans_cf
-                    .delete_slot(&mut write_batch, from_slot, batch_end)
-                    .unwrap_or(false)
-                && self
-                    .index_cf
-                    .delete_slot(&mut write_batch, from_slot, batch_end)
-                    .unwrap_or(false)
-                && self
-                    .dead_slots_cf
-                    .delete_slot(&mut write_batch, from_slot, batch_end)
-                    .unwrap_or(false)
-                && self
-                    .db
-                    .column::<cf::Root>()
-                    .delete_slot(&mut write_batch, from_slot, batch_end)
-                    .unwrap_or(false);
-
-            if let Err(e) = batch_processor.write(write_batch) {
-                error!(
-                    "Error: {:?} while submitting write batch for slot {:?} retrying...",
-                    e, from_slot
-                );
-                return Err(e);
-            }
-            Ok(end)
-        }
+        let mut write_batch = self
+            .db
+            .batch()
+            .expect("Database Error: Failed to get write batch");
+        let end = self
+            .meta_cf
+            .delete_slot(&mut write_batch, from_slot, batch_end)
+            .unwrap_or(false)
+            && self
+                .erasure_meta_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+                .unwrap_or(false)
+            && self
+                .data_shred_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+                .unwrap_or(false)
+            && self
+                .code_shred_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+                .unwrap_or(false)
+            && self
+                .orphans_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+                .unwrap_or(false)
+            && self
+                .index_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+                .unwrap_or(false)
+            && self
+                .dead_slots_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+                .unwrap_or(false)
+            && self
+                .db
+                .column::<cf::Root>()
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+                .unwrap_or(false);
+
+        if let Err(e) = self.db.write(write_batch) {
+            error!(
+                "Error: {:?} while submitting write batch for slot {:?} retrying...",
+                e, from_slot
+            );
+            return Err(e);
+        }
+        Ok(end)
     }
 
     pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result<Option<ErasureMeta>> {
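The rewritten body stages every column family's deletions in one `WriteBatch` and commits them with a single `Database::write` call, so a slot range is purged from all columns atomically. Below is a standalone sketch of the same pattern against the rocksdb 0.12 crate directly; the column-family names and the helper function are hypothetical, not code from this commit.

```rust
use rocksdb::{WriteBatch, DB};

// Stage deletes across two column families, then commit them in one atomic
// write: either every delete lands or none do.
fn purge_keys(db: &DB, keys: &[Vec<u8>]) -> Result<(), rocksdb::Error> {
    let mut batch = WriteBatch::default();
    let meta = db.cf_handle("meta").expect("missing cf: meta");
    let index = db.cf_handle("index").expect("missing cf: index");
    for key in keys {
        batch.delete_cf(meta, key)?;
        batch.delete_cf(index, key)?;
    }
    db.write(batch)
}
```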
@@ -367,8 +360,7 @@ impl Blocktree {
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
     ) -> Result<()> {
         let db = &*self.db;
-        let mut batch_processor = self.batch_processor.write().unwrap();
-        let mut write_batch = batch_processor.batch()?;
+        let mut write_batch = db.batch()?;
 
         let mut just_inserted_coding_shreds = HashMap::new();
         let mut just_inserted_data_shreds = HashMap::new();
@@ -437,7 +429,7 @@ impl Blocktree {
             write_batch.put::<cf::Index>(slot, index)?;
         }
 
-        batch_processor.write(write_batch)?;
+        self.db.write(write_batch)?;
 
         if should_signal {
             for signal in &self.new_shreds_signals {
@@ -1098,16 +1090,13 @@ impl Blocktree {
     }
 
     pub fn set_roots(&self, rooted_slots: &[u64]) -> Result<()> {
-        unsafe {
-            let mut batch_processor = self.db.batch_processor();
-            let mut write_batch = batch_processor.batch()?;
-            for slot in rooted_slots {
-                write_batch.put::<cf::Root>(*slot, &true)?;
-            }
-            batch_processor.write(write_batch)?;
-        }
+        let mut write_batch = self.db.batch()?;
+        for slot in rooted_slots {
+            write_batch.put::<cf::Root>(*slot, &true)?;
+        }
+        self.db.write(write_batch)?;
 
         let mut last_root = self.last_root.write().unwrap();
         if *last_root == std::u64::MAX {
             *last_root = 0;
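
With the `unsafe` block and the batch processor gone, `set_roots` is an ordinary `&self` method, so any thread holding a shared `Blocktree` can call it without extra locking. A hypothetical sketch, assuming the `Blocktree` from blocktree.rs above is `Send + Sync`:

```rust
use std::sync::Arc;
use std::thread;

// Spawn a background thread that marks a few slots as roots; the helper
// function and slot numbers are illustrative only.
fn root_in_background(blocktree: Arc<Blocktree>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // set_roots takes &self, so no RwLock around the call is needed.
        blocktree.set_roots(&[1, 2, 3]).expect("failed to set roots");
    })
}
```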


@@ -471,11 +471,6 @@ pub struct Database {
     backend: Arc<Rocks>,
 }
 
-#[derive(Debug, Clone)]
-pub struct BatchProcessor {
-    backend: Arc<Rocks>,
-}
-
 #[derive(Debug, Clone)]
 pub struct LedgerColumn<C>
 where
@@ -485,10 +480,9 @@ where
     column: PhantomData<C>,
 }
 
-pub struct WriteBatch {
+pub struct WriteBatch<'a> {
     write_batch: RWriteBatch,
-    backend: PhantomData<Rocks>,
-    map: HashMap<&'static str, ColumnFamily>,
+    map: HashMap<&'static str, ColumnFamily<'a>>,
 }
 
 impl Database {
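The new lifetime parameter follows from the upgraded crate: in rocksdb 0.12 a `ColumnFamily` handle borrows from the `DB` it was fetched from, so a struct that stores handles, as `WriteBatch` now does in `map`, must carry that borrow in its type. A minimal illustration against the raw crate; the `CfMap` type is hypothetical.

```rust
use rocksdb::{ColumnFamily, DB};
use std::collections::HashMap;

// Holding ColumnFamily<'a> handles ties this struct's lifetime to the DB
// they were borrowed from, exactly as with WriteBatch<'a> above.
struct CfMap<'a> {
    map: HashMap<&'static str, ColumnFamily<'a>>,
}

impl<'a> CfMap<'a> {
    fn new(db: &'a DB, names: &[&'static str]) -> Self {
        let map = names
            .iter()
            .map(|&name| (name, db.cf_handle(name).expect("missing column family")))
            .collect();
        CfMap { map }
    }
}
```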
@@ -552,21 +546,8 @@ impl Database {
         self.backend.raw_iterator_cf(cf)
     }
 
-    // Note this returns an object that can be used to directly write to multiple column families.
-    // This circumvents the synchronization around APIs in Blocktree that use
-    // blocktree.batch_processor, so this API should only be used if the caller is sure they
-    // are writing to data in columns that will not be corrupted by any simultaneous blocktree
-    // operations.
-    pub unsafe fn batch_processor(&self) -> BatchProcessor {
-        BatchProcessor {
-            backend: Arc::clone(&self.backend),
-        }
-    }
-}
-
-impl BatchProcessor {
-    pub fn batch(&mut self) -> Result<WriteBatch> {
-        let db_write_batch = self.backend.batch()?;
+    pub fn batch(&self) -> Result<WriteBatch> {
+        let write_batch = self.backend.batch()?;
         let map = self
             .backend
             .columns()
@@ -574,14 +555,10 @@ impl BatchProcessor {
             .map(|desc| (desc, self.backend.cf_handle(desc)))
             .collect();
 
-        Ok(WriteBatch {
-            write_batch: db_write_batch,
-            backend: PhantomData,
-            map,
-        })
+        Ok(WriteBatch { write_batch, map })
     }
 
-    pub fn write(&mut self, batch: WriteBatch) -> Result<()> {
+    pub fn write(&self, batch: WriteBatch) -> Result<()> {
         self.backend.write(batch.write_batch)
     }
 }
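Together, `Database::batch` and `Database::write` are now the entire write path. A short sketch of driving them, mirroring the simplified `set_roots` above; the helper name is hypothetical, while `Database`, `WriteBatch::put`, and `cf::Root` are the types defined in blocktree_db.

```rust
// Assumes: use crate::blocktree_db::{columns as cf, Database, Result};
fn mark_root(db: &Database, slot: u64) -> Result<()> {
    let mut write_batch = db.batch()?; // &self: no exclusive borrow required
    write_batch.put::<cf::Root>(slot, &true)?; // stage a typed put
    db.write(write_batch) // consume the batch in one atomic RocksDB write
}
```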
@@ -676,7 +653,7 @@ where
     }
 }
 
-impl WriteBatch {
+impl<'a> WriteBatch<'a> {
     pub fn put_bytes<C: Column>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
         self.write_batch
             .put_cf(self.get_cf::<C>(), &C::key(key), bytes)?;
@@ -697,7 +674,7 @@ impl WriteBatch {
     }
 
     #[inline]
-    fn get_cf<C: Column>(&self) -> ColumnFamily {
+    fn get_cf<C: Column>(&self) -> ColumnFamily<'a> {
         self.map[C::NAME]
     }
 }
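
One detail worth noting: `self.map[C::NAME]` returns the handle out of the map by value, which compiles only because `ColumnFamily<'a>` in rocksdb 0.12 is a small `Copy` handle; it is the lifetime, not ownership, that ties it back to the database.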