Reduce locking in Blocktree (#4075)

* Reduce lock contention in blocktree

* Store root slot in separate column
Mark E. Sinclair
2019-05-03 16:46:02 -05:00
committed by GitHub
parent f91627a230
commit ed48d8323c
6 changed files with 217 additions and 172 deletions
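The diff below drops the Arc<RwLock<Database>> wrapper around the ledger database: each LedgerColumn now holds a shared Arc<Database> and reads directly, while multi-column writes are funneled through a BatchProcessor behind its own RwLock, and the root slot moves out of SlotMeta into a dedicated ROOT_CF column. What follows is only a minimal, self-contained sketch of that pattern using toy stand-in types (Db, Column, BatchProcessor here are illustrative, not the real solana-ledger API):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Toy key/value store. The RwLock inside it stands in for RocksDB's own
// thread safety; the point of the change is that callers no longer
// serialize every read through an outer Arc<RwLock<Database>>.
#[derive(Default)]
struct Db {
    columns: RwLock<HashMap<&'static str, HashMap<Vec<u8>, Vec<u8>>>>,
}

impl Db {
    fn get(&self, cf: &'static str, key: &[u8]) -> Option<Vec<u8>> {
        self.columns.read().unwrap().get(cf).and_then(|m| m.get(key).cloned())
    }
    fn put(&self, cf: &'static str, key: Vec<u8>, value: Vec<u8>) {
        self.columns.write().unwrap().entry(cf).or_default().insert(key, value);
    }
}

// Column handle owning an Arc to the database, mirroring the new
// LedgerColumn<C>: get/put no longer take a locked &Database from the caller.
struct Column {
    db: Arc<Db>,
    cf: &'static str,
}

impl Column {
    fn get_bytes(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.db.get(self.cf, key)
    }
    fn put_bytes(&self, key: &[u8], value: &[u8]) {
        self.db.put(self.cf, key.to_vec(), value.to_vec());
    }
}

// Writes that must land together still go through a single processor behind
// a lock, roughly the role of batch_processor: Arc<RwLock<BatchProcessor>>.
struct BatchProcessor {
    db: Arc<Db>,
}

impl BatchProcessor {
    fn write(&mut self, batch: Vec<(&'static str, Vec<u8>, Vec<u8>)>) {
        for (cf, key, value) in batch {
            self.db.put(cf, key, value);
        }
    }
}

fn main() {
    let db = Arc::new(Db::default());

    // Root slot lives in its own column under a fixed key, as in the new
    // set_root()/get_root(): one small put/get, no SlotMeta rewrite.
    let root_cf = Column { db: Arc::clone(&db), cf: "root" };
    root_cf.put_bytes(b"", &42u64.to_le_bytes());
    let root = root_cf
        .get_bytes(b"")
        .map(|bytes| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(&bytes[..8]);
            u64::from_le_bytes(buf)
        })
        .unwrap_or(0);
    assert_eq!(root, 42);

    // Only batched writes take the processor lock; the reads above did not.
    let batch_processor = Arc::new(RwLock::new(BatchProcessor { db: Arc::clone(&db) }));
    let mut processor = batch_processor.write().unwrap();
    processor.write(vec![("meta", b"slot_0".to_vec(), b"meta bytes".to_vec())]);
    drop(processor);
    assert!(db.get("meta", b"slot_0").is_some());
}

RocksDB handles are already safe for concurrent point reads, so the outer lock presumably only bought write-batch atomicity, which the dedicated batch processor now provides without blocking readers.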

@@ -49,6 +49,7 @@ macro_rules! db_imports {
pub type Cursor<C> = db::Cursor<$db, C>;
pub type LedgerColumn<C> = db::LedgerColumn<$db, C>;
pub type WriteBatch = db::WriteBatch<$db>;
type BatchProcessor = db::BatchProcessor<$db>;
pub trait Column: db::Column<$db> {}
impl<C: db::Column<$db>> Column for C {}
@@ -73,15 +74,15 @@ pub enum BlocktreeError {
// ledger window
pub struct Blocktree {
db: Arc<RwLock<Database>>,
db: Arc<Database>,
meta_cf: LedgerColumn<cf::SlotMeta>,
data_cf: LedgerColumn<cf::Data>,
erasure_cf: LedgerColumn<cf::Coding>,
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
orphans_cf: LedgerColumn<cf::Orphans>,
batch_processor: Arc<RwLock<BatchProcessor>>,
session: Arc<erasure::Session>,
pub new_blobs_signals: Vec<SyncSender<bool>>,
pub root_slot: RwLock<u64>,
}
// Column family for metadata about a leader slot
@@ -93,6 +94,8 @@ pub const ERASURE_CF: &str = "erasure";
pub const ERASURE_META_CF: &str = "erasure_meta";
// Column family for orphans data
pub const ORPHANS_CF: &str = "orphans";
// Column family for root data
pub const ROOT_CF: &str = "root";
impl Blocktree {
/// Opens a Ledger in directory, provides "infinite" window of blobs
@@ -105,6 +108,8 @@ impl Blocktree {
// Open the database
let db = Database::open(&ledger_path)?;
let batch_processor = Arc::new(RwLock::new(db.batch_processor()));
// Create the metadata column family
let meta_cf = db.column();
@@ -124,7 +129,7 @@ impl Blocktree {
// setup erasure
let session = Arc::new(erasure::Session::default());
let db = Arc::new(RwLock::new(db));
let db = Arc::new(db);
Ok(Blocktree {
db,
@@ -135,7 +140,7 @@ impl Blocktree {
orphans_cf,
session,
new_blobs_signals: vec![],
root_slot: RwLock::new(0),
batch_processor,
})
}
@@ -155,21 +160,19 @@ impl Blocktree {
}
pub fn meta(&self, slot: u64) -> Result<Option<SlotMeta>> {
self.meta_cf.get(&*self.db.read().unwrap(), slot)
self.meta_cf.get(slot)
}
pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result<Option<ErasureMeta>> {
self.erasure_meta_cf
.get(&*self.db.read().unwrap(), (slot, set_index))
self.erasure_meta_cf.get((slot, set_index))
}
pub fn orphan(&self, slot: u64) -> Result<Option<bool>> {
self.orphans_cf.get(&*self.db.read().unwrap(), slot)
self.orphans_cf.get(slot)
}
pub fn get_next_slot(&self, slot: u64) -> Result<Option<u64>> {
let db = self.db.read().unwrap();
let mut db_iterator = db.cursor::<cf::SlotMeta>()?;
let mut db_iterator = self.db.cursor::<cf::SlotMeta>()?;
db_iterator.seek(slot + 1);
if !db_iterator.valid() {
@@ -264,8 +267,9 @@ impl Blocktree {
I: IntoIterator,
I::Item: Borrow<Blob>,
{
let mut db = self.db.write().unwrap();
let mut write_batch = db.batch()?;
let db = &*self.db;
let mut batch_processor = self.batch_processor.write().unwrap();
let mut write_batch = batch_processor.batch()?;
let new_blobs: Vec<_> = new_blobs.into_iter().collect();
let mut recovered_data = vec![];
@@ -285,7 +289,7 @@ impl Blocktree {
.entry((blob_slot, set_index))
.or_insert_with(|| {
self.erasure_meta_cf
.get(&db, (blob_slot, set_index))
.get((blob_slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index))
});
@@ -358,7 +362,7 @@ impl Blocktree {
}
}
db.write(write_batch)?;
batch_processor.write(write_batch)?;
Ok(())
}
@@ -374,8 +378,7 @@ impl Blocktree {
buf: &mut [u8],
slot: u64,
) -> Result<(u64, u64)> {
let db = self.db.read().unwrap();
let mut db_iterator = db.cursor::<cf::Data>()?;
let mut db_iterator = self.db.cursor::<cf::Data>()?;
db_iterator.seek((slot, start_index));
let mut total_blobs = 0;
@@ -427,69 +430,65 @@ impl Blocktree {
}
pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
let db = self.db.read().unwrap();
self.erasure_cf.get_bytes(&db, (slot, index))
self.erasure_cf.get_bytes((slot, index))
}
pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
let set_index = ErasureMeta::set_index_for(index);
let mut db = self.db.write().unwrap();
let mut batch_processor = self.batch_processor.write().unwrap();
let mut erasure_meta = self
.erasure_meta_cf
.get(&db, (slot, set_index))?
.get((slot, set_index))?
.unwrap_or_else(|| ErasureMeta::new(set_index));
erasure_meta.set_coding_present(index, false);
let mut batch = db.batch()?;
let mut batch = batch_processor.batch()?;
batch.delete::<cf::Coding>((slot, index))?;
batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
db.write(batch)?;
batch_processor.write(batch)?;
Ok(())
}
pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
let db = self.db.read().unwrap();
self.data_cf.get_bytes(&db, (slot, index))
self.data_cf.get_bytes((slot, index))
}
/// For benchmarks, testing, and setup.
/// Does no metadata tracking. Use with care.
pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
let mut db = self.db.write().unwrap();
self.data_cf.put_bytes(&mut db, (slot, index), bytes)
self.data_cf.put_bytes((slot, index), bytes)
}
/// For benchmarks, testing, and setup.
/// Does no metadata tracking. Use with care.
pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
let mut db = self.db.write().unwrap();
self.erasure_cf.put_bytes(&mut db, (slot, index), bytes)
self.erasure_cf.put_bytes((slot, index), bytes)
}
/// this function will insert coding blobs and also automatically track erasure-related
/// metadata. If recovery is available it will be done
pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
let set_index = ErasureMeta::set_index_for(index);
let mut db = self.db.write().unwrap();
let mut batch_processor = self.batch_processor.write().unwrap();
let mut erasure_meta = self
.erasure_meta_cf
.get(&db, (slot, set_index))?
.get((slot, set_index))?
.unwrap_or_else(|| ErasureMeta::new(set_index));
erasure_meta.set_coding_present(index, true);
erasure_meta.set_size(bytes.len() - BLOB_HEADER_SIZE);
let mut writebatch = db.batch()?;
let mut writebatch = batch_processor.batch()?;
writebatch.put_bytes::<cf::Coding>((slot, index), bytes)?;
if let Some((data, coding)) = try_erasure_recover(
&db,
&self.db,
&self.session,
&erasure_meta,
slot,
@@ -501,7 +500,7 @@ impl Blocktree {
insert_data_blob_batch(
&data[..],
&db,
&self.db,
&mut HashMap::new(),
&mut erasure_meta_working_set,
&mut HashMap::new(),
@@ -522,7 +521,7 @@ impl Blocktree {
writebatch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
db.write(writebatch)?;
batch_processor.write(writebatch)?;
Ok(())
}
@@ -619,8 +618,7 @@ impl Blocktree {
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
let db = self.db.read().unwrap();
if let Ok(mut db_iterator) = db.cursor::<cf::Data>() {
if let Ok(mut db_iterator) = self.db.cursor::<cf::Data>() {
Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
} else {
vec![]
@@ -639,9 +637,7 @@ impl Blocktree {
}
pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> + '_ {
let db = self.db.read().unwrap();
let iter = db.iter::<cf::Data>().unwrap();
let iter = self.db.iter::<cf::Data>().unwrap();
iter.map(|(_, blob_data)| Blob::new(&blob_data))
}
@@ -699,7 +695,7 @@ impl Blocktree {
None
}
}
let mut db_iterator = self.db.read().unwrap().cursor::<cf::Data>()?;
let mut db_iterator = self.db.cursor::<cf::Data>()?;
db_iterator.seek_to_first();
Ok(EntryIterator {
@@ -718,7 +714,7 @@ impl Blocktree {
// Find the next consecutive block of blobs.
let consecutive_blobs = get_slot_consecutive_blobs(
slot,
&self.db.read().unwrap(),
&self.db,
&HashMap::new(),
blob_start_index,
max_entries,
@@ -750,30 +746,28 @@ impl Blocktree {
}
pub fn is_root(&self, slot: u64) -> bool {
if let Ok(Some(meta)) = self.meta(slot) {
meta.is_root
if let Ok(Some(root_slot)) = self.db.get::<cf::Root>(()) {
root_slot == slot
} else {
false
}
}
pub fn set_root(&self, slot: u64) -> Result<()> {
let mut root_slot = self.root_slot.write().unwrap();
let mut db = self.db.write().unwrap();
*root_slot = slot;
if let Some(mut meta) = self.meta_cf.get(&db, slot)? {
meta.is_root = true;
self.meta_cf.put(&mut db, slot, &meta)?;
}
self.db.put::<cf::Root>((), &slot)?;
Ok(())
}
pub fn get_root(&self) -> Result<u64> {
let root_opt = self.db.get::<cf::Root>(())?;
Ok(root_opt.unwrap_or(0))
}
pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
let mut results = vec![];
let db = self.db.read().unwrap();
let mut iter = db.cursor::<cf::Orphans>().unwrap();
let mut iter = self.db.cursor::<cf::Orphans>().unwrap();
iter.seek_to_first();
while iter.valid() {
if let Some(max) = max {
@@ -794,19 +788,19 @@ impl Blocktree {
let mut bootstrap_meta = SlotMeta::new(0, 1);
let last = blobs.last().unwrap();
let mut db = self.db.write().unwrap();
let mut batch_processor = self.batch_processor.write().unwrap();
bootstrap_meta.consumed = last.index() + 1;
bootstrap_meta.received = last.index() + 1;
bootstrap_meta.is_connected = true;
let mut batch = db.batch()?;
let mut batch = batch_processor.batch()?;
batch.put::<cf::SlotMeta>(0, &bootstrap_meta)?;
for blob in blobs {
let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()];
batch.put_bytes::<cf::Data>((blob.slot(), blob.index()), serialized_blob_datas)?;
}
db.write(batch)?;
batch_processor.write(batch)?;
Ok(())
}
}
@@ -918,7 +912,7 @@ fn check_insert_data_blob<'a>(
let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| {
// Store a 2-tuple of the metadata (working copy, backup copy)
if let Some(mut meta) = meta_cf
.get(db, blob_slot)
.get(blob_slot)
.expect("Expect database get to succeed")
{
let backup = Some(meta.clone());
@@ -965,7 +959,7 @@ fn should_insert_blob(
if blob_index < slot.consumed
|| prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index))
|| data_cf
.get_bytes(db, (blob_slot, blob_index))
.get_bytes((blob_slot, blob_index))
.map(|opt| opt.is_some())
.unwrap_or(false)
{
@@ -1035,7 +1029,7 @@ fn find_slot_meta_in_db_else_create<'a>(
slot: u64,
insert_map: &'a mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
) -> Result<Rc<RefCell<SlotMeta>>> {
if let Some(slot_meta) = db.column::<cf::SlotMeta>().get(db, slot)? {
if let Some(slot_meta) = db.column::<cf::SlotMeta>().get(slot)? {
insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
Ok(insert_map.get(&slot).unwrap().clone())
} else {
@@ -1084,7 +1078,7 @@ fn get_slot_consecutive_blobs<'a>(
// Try to find the next blob we're looking for in the prev_inserted_blob_datas
if let Some(prev_blob_data) = prev_inserted_blob_datas.get(&(slot, current_index)) {
blobs.push(Cow::Borrowed(*prev_blob_data));
} else if let Some(blob_data) = data_cf.get_bytes(db, (slot, current_index))? {
} else if let Some(blob_data) = data_cf.get_bytes((slot, current_index))? {
// Try to find the next blob we're looking for in the database
blobs.push(Cow::Owned(blob_data));
} else {
@@ -1345,7 +1339,7 @@ fn recover(
let mut blob_bytes = match new_coding {
Some((new_coding_index, bytes)) if new_coding_index == i => bytes.to_vec(),
_ => erasure_cf
.get_bytes(db, (slot, i))?
.get_bytes((slot, i))?
.expect("ErasureMeta must have no false positives"),
};
@@ -1368,7 +1362,7 @@ fn recover(
let mut blob_bytes = match prev_inserted_blob_datas.get(&(slot, i)) {
Some(bytes) => bytes.to_vec(),
None => data_cf
.get_bytes(db, (slot, i))?
.get_bytes((slot, i))?
.expect("erasure_meta must have no false positives"),
};
@@ -1631,14 +1625,13 @@ pub mod tests {
fn test_put_get_simple() {
let ledger_path = get_tmp_ledger_path("test_put_get_simple");
let ledger = Blocktree::open(&ledger_path).unwrap();
let mut db = ledger.db.write().unwrap();
// Test meta column family
let meta = SlotMeta::new(0, 1);
ledger.meta_cf.put(&mut db, 0, &meta).unwrap();
ledger.meta_cf.put(0, &meta).unwrap();
let result = ledger
.meta_cf
.get(&db, 0)
.get(0)
.unwrap()
.expect("Expected meta object to exist");
@@ -1647,14 +1640,11 @@ pub mod tests {
// Test erasure column family
let erasure = vec![1u8; 16];
let erasure_key = (0, 0);
ledger
.erasure_cf
.put_bytes(&mut db, erasure_key, &erasure)
.unwrap();
ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap();
let result = ledger
.erasure_cf
.get_bytes(&db, erasure_key)
.get_bytes(erasure_key)
.unwrap()
.expect("Expected erasure object to exist");
@@ -1663,18 +1653,17 @@ pub mod tests {
// Test data column family
let data = vec![2u8; 16];
let data_key = (0, 0);
ledger.data_cf.put_bytes(&mut db, data_key, &data).unwrap();
ledger.data_cf.put_bytes(data_key, &data).unwrap();
let result = ledger
.data_cf
.get_bytes(&db, data_key)
.get_bytes(data_key)
.unwrap()
.expect("Expected data object to exist");
assert_eq!(result, data);
// Destroying database without closing it first is undefined behavior
drop(db);
drop(ledger);
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
@@ -1853,8 +1842,6 @@ pub mod tests {
let mut db_iterator = blocktree
.db
.read()
.unwrap()
.cursor::<cf::Data>()
.expect("Expected to be able to open database iterator");
@@ -2504,10 +2491,7 @@ pub mod tests {
}
// No orphan slots should exist
assert!(blocktree
.orphans_cf
.is_empty(&blocktree.db.read().unwrap())
.unwrap())
assert!(blocktree.orphans_cf.is_empty().unwrap())
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@@ -2524,20 +2508,14 @@ pub mod tests {
assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty());
let mut meta0 = SlotMeta::new(0, 0);
blocktree
.meta_cf
.put(&mut blocktree.db.write().unwrap(), 0, &meta0)
.unwrap();
blocktree.meta_cf.put(0, &meta0).unwrap();
// Slot exists, chains to nothing
let expected: HashMap<u64, Vec<u64>> =
HashMap::from_iter(vec![(0, vec![])].into_iter());
assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected);
meta0.next_slots = vec![1, 2];
blocktree
.meta_cf
.put(&mut blocktree.db.write().unwrap(), 0, &meta0)
.unwrap();
blocktree.meta_cf.put(0, &meta0).unwrap();
// Slot exists, chains to some other slots
let expected: HashMap<u64, Vec<u64>> =
@@ -2547,10 +2525,7 @@ pub mod tests {
let mut meta3 = SlotMeta::new(3, 1);
meta3.next_slots = vec![10, 5];
blocktree
.meta_cf
.put(&mut blocktree.db.write().unwrap(), 3, &meta3)
.unwrap();
blocktree.meta_cf.put(3, &meta3).unwrap();
let expected: HashMap<u64, Vec<u64>> =
HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter());
assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected);
@@ -2611,10 +2586,7 @@ pub mod tests {
assert!(!is_orphan(&meta));
}
// Orphans cf is empty
assert!(blocktree
.orphans_cf
.is_empty(&blocktree.db.read().unwrap())
.unwrap())
assert!(blocktree.orphans_cf.is_empty().unwrap())
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
@@ -2847,7 +2819,7 @@ pub mod tests {
assert_eq!(slot_meta.consumed, 5);
assert!(!should_insert_blob(
&slot_meta,
&blocktree.db.read().unwrap(),
&blocktree.db,
&HashMap::new(),
&blobs[4].clone()
));
@@ -2857,7 +2829,7 @@ pub mod tests {
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert!(!should_insert_blob(
&slot_meta,
&blocktree.db.read().unwrap(),
&blocktree.db,
&HashMap::new(),
&blobs[7].clone()
));
@@ -2870,7 +2842,7 @@ pub mod tests {
blobs[8].set_is_last_in_slot();
assert!(!should_insert_blob(
&slot_meta,
&blocktree.db.read().unwrap(),
&blocktree.db,
&HashMap::new(),
&blobs[8].clone()
));
@@ -2883,7 +2855,7 @@ pub mod tests {
// Trying to insert a blob with index > the "is_last" blob should fail
assert!(!should_insert_blob(
&slot_meta,
&blocktree.db.read().unwrap(),
&blocktree.db,
&HashMap::new(),
&blobs[10].clone()
));
@@ -3092,11 +3064,9 @@ pub mod tests {
(slot, blob.index());
}
let db = blocktree.db.read().unwrap();
let erasure_meta = blocktree
.erasure_meta_cf
.get(&db, (slot, set_index as u64))
.get((slot, set_index as u64))
.expect("Erasure Meta should be present")
.unwrap();
@@ -3104,7 +3074,7 @@ pub mod tests {
let retrieved_data = blocktree
.data_cf
.get_bytes(&db, (slot, focused_index as u64))
.get_bytes((slot, focused_index as u64))
.unwrap();
assert!(retrieved_data.is_some());
@@ -3151,7 +3121,7 @@ pub mod tests {
// try recovery even though there aren't enough blobs
let erasure_meta = blocktree
.erasure_meta_cf
.get(&blocktree.db.read().unwrap(), (SLOT, SET_INDEX))
.get((SLOT, SET_INDEX))
.unwrap()
.unwrap();
@@ -3160,7 +3130,7 @@ pub mod tests {
let prev_inserted_blob_datas = HashMap::new();
let attempt_result = try_erasure_recover(
&blocktree.db.read().unwrap(),
&blocktree.db,
&blocktree.session,
&erasure_meta,
SLOT,
@@ -3303,10 +3273,7 @@ pub mod tests {
// triggering recovery.
let erasure_meta = blocktree
.erasure_meta_cf
.get(
&blocktree.db.read().unwrap(),
(slot, erasure_set.set_index),
)
.get((slot, erasure_set.set_index))
.unwrap()
.unwrap();
@@ -3340,7 +3307,7 @@ pub mod tests {
let erasure_meta = blocktree
.erasure_meta_cf
.get(&blocktree.db.read().unwrap(), (slot, set_index))
.get((slot, set_index))
.expect("DB get must succeed")
.expect("ErasureMeta must be present for each erasure set");