|
|
|
@ -41,7 +41,7 @@ macro_rules! db_imports {
|
|
|
|
|
LedgerColumnFamilyRaw,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
pub use $mod::{$db, ErasureCf, MetaCf, DataCf};
|
|
|
|
|
pub use $mod::{$db, ErasureCf, MetaCf, DataCf, DetachedHeadsCf};
|
|
|
|
|
pub type BlocktreeRawIterator = <$db as Database>::Cursor;
|
|
|
|
|
pub type WriteBatch = <$db as Database>::WriteBatch;
|
|
|
|
|
pub type OwnedKey = <$db as Database>::OwnedKey;
|
|
|
|
@ -125,6 +125,7 @@ pub struct Blocktree {
|
|
|
|
|
meta_cf: MetaCf,
|
|
|
|
|
data_cf: DataCf,
|
|
|
|
|
erasure_cf: ErasureCf,
|
|
|
|
|
detached_heads_cf: DetachedHeadsCf,
|
|
|
|
|
pub new_blobs_signals: Vec<SyncSender<bool>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -134,6 +135,8 @@ pub const META_CF: &str = "meta";
|
|
|
|
|
pub const DATA_CF: &str = "data";
|
|
|
|
|
// Column family for erasure data
|
|
|
|
|
pub const ERASURE_CF: &str = "erasure";
|
|
|
|
|
// Column family for detached heads data
|
|
|
|
|
pub const DETACHED_HEADS_CF: &str = "detached_heads";
|
|
|
|
|
|
|
|
|
|
impl Blocktree {
|
|
|
|
|
pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver<bool>)> {
|
|
|
|
@ -148,6 +151,10 @@ impl Blocktree {
|
|
|
|
|
self.meta_cf.get(&MetaCf::key(&slot))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn detached_head(&self, slot: u64) -> Result<Option<bool>> {
|
|
|
|
|
self.detached_heads_cf.get(&DetachedHeadsCf::key(&slot))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> {
|
|
|
|
|
let meta_key = MetaCf::key(&slot);
|
|
|
|
|
if let Some(mut meta) = self.meta_cf.get(&meta_key)? {
|
|
|
|
@ -277,18 +284,16 @@ impl Blocktree {
|
|
|
|
|
.meta(blob_slot)
|
|
|
|
|
.expect("Expect database get to succeed")
|
|
|
|
|
{
|
|
|
|
|
// If parent_slot == std::u64::MAX, then this is one of the dummy metadatas inserted
|
|
|
|
|
let backup = Some(meta.clone());
|
|
|
|
|
// If parent_slot == std::u64::MAX, then this is one of the detached heads inserted
|
|
|
|
|
// during the chaining process, see the function find_slot_meta_in_cached_state()
|
|
|
|
|
// for details
|
|
|
|
|
if meta.parent_slot == std::u64::MAX {
|
|
|
|
|
// for details. Slots that are detached heads are missing a parent_slot, so we should
|
|
|
|
|
// fill in the parent now that we know it.
|
|
|
|
|
if Self::is_detached_head(&meta) {
|
|
|
|
|
meta.parent_slot = parent_slot;
|
|
|
|
|
// Set backup as None so that all the logic for inserting new slots
|
|
|
|
|
// still runs, as this placeholder slot is essentially equivalent to
|
|
|
|
|
// inserting a new slot
|
|
|
|
|
(Rc::new(RefCell::new(meta.clone())), None)
|
|
|
|
|
} else {
|
|
|
|
|
(Rc::new(RefCell::new(meta.clone())), Some(meta))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(Rc::new(RefCell::new(meta)), backup)
|
|
|
|
|
} else {
|
|
|
|
|
(
|
|
|
|
|
Rc::new(RefCell::new(SlotMeta::new(blob_slot, parent_slot))),
|
|
|
|
@ -318,8 +323,8 @@ impl Blocktree {
|
|
|
|
|
|
|
|
|
|
// Check if any metadata was changed, if so, insert the new version of the
|
|
|
|
|
// metadata into the write batch
|
|
|
|
|
for (slot, (meta_copy, meta_backup)) in slot_meta_working_set.iter() {
|
|
|
|
|
let meta: &SlotMeta = &RefCell::borrow(&*meta_copy);
|
|
|
|
|
for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() {
|
|
|
|
|
let meta: &SlotMeta = &RefCell::borrow(&*meta);
|
|
|
|
|
// Check if the working copy of the metadata has changed
|
|
|
|
|
if Some(meta) != meta_backup.as_ref() {
|
|
|
|
|
should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup);
|
|
|
|
@ -637,12 +642,12 @@ impl Blocktree {
|
|
|
|
|
let mut new_chained_slots = HashMap::new();
|
|
|
|
|
let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect();
|
|
|
|
|
for slot in working_set_slots {
|
|
|
|
|
self.handle_chaining_for_slot(working_set, &mut new_chained_slots, slot)?;
|
|
|
|
|
self.handle_chaining_for_slot(write_batch, working_set, &mut new_chained_slots, slot)?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Write all the newly changed slots in new_chained_slots to the write_batch
|
|
|
|
|
for (slot, meta_copy) in new_chained_slots.iter() {
|
|
|
|
|
let meta: &SlotMeta = &RefCell::borrow(&*meta_copy);
|
|
|
|
|
for (slot, meta) in new_chained_slots.iter() {
|
|
|
|
|
let meta: &SlotMeta = &RefCell::borrow(&*meta);
|
|
|
|
|
write_batch.put_cf(self.meta_cf.handle(), &MetaCf::key(slot), &serialize(meta)?)?;
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
@ -650,53 +655,101 @@ impl Blocktree {
|
|
|
|
|
|
|
|
|
|
fn handle_chaining_for_slot(
|
|
|
|
|
&self,
|
|
|
|
|
write_batch: &mut WriteBatch,
|
|
|
|
|
working_set: &HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
|
|
|
|
|
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
|
|
|
|
slot: u64,
|
|
|
|
|
) -> Result<()> {
|
|
|
|
|
let (meta_copy, meta_backup) = working_set
|
|
|
|
|
let (meta, meta_backup) = working_set
|
|
|
|
|
.get(&slot)
|
|
|
|
|
.expect("Slot must exist in the working_set hashmap");
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
let mut slot_meta = meta_copy.borrow_mut();
|
|
|
|
|
let is_detached_head =
|
|
|
|
|
meta_backup.is_some() && Self::is_detached_head(meta_backup.as_ref().unwrap());
|
|
|
|
|
|
|
|
|
|
let mut meta_mut = meta.borrow_mut();
|
|
|
|
|
|
|
|
|
|
// If:
|
|
|
|
|
// 1) This is a new slot
|
|
|
|
|
// 2) slot != 0
|
|
|
|
|
// then try to chain this slot to a previous slot
|
|
|
|
|
if slot != 0 {
|
|
|
|
|
let prev_slot = slot_meta.parent_slot;
|
|
|
|
|
let prev_slot = meta_mut.parent_slot;
|
|
|
|
|
|
|
|
|
|
// Check if slot_meta is a new slot
|
|
|
|
|
if meta_backup.is_none() {
|
|
|
|
|
let prev_slot =
|
|
|
|
|
// Check if the slot represented by meta_mut is either a new slot or a detached head.
|
|
|
|
|
// In both cases we need to run the chaining logic b/c the parent on the slot was
|
|
|
|
|
// previously unknown.
|
|
|
|
|
if meta_backup.is_none() || is_detached_head {
|
|
|
|
|
let prev_slot_meta =
|
|
|
|
|
self.find_slot_meta_else_create(working_set, new_chained_slots, prev_slot)?;
|
|
|
|
|
|
|
|
|
|
// This is a newly inserted slot so:
|
|
|
|
|
// 1) Chain to the previous slot, and also
|
|
|
|
|
// 2) Determine whether to set the is_connected flag
|
|
|
|
|
// This is a newly inserted slot so run the chaining logic
|
|
|
|
|
self.chain_new_slot_to_prev_slot(
|
|
|
|
|
&mut prev_slot.borrow_mut(),
|
|
|
|
|
&mut prev_slot_meta.borrow_mut(),
|
|
|
|
|
slot,
|
|
|
|
|
&mut slot_meta,
|
|
|
|
|
&mut meta_mut,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if Self::is_detached_head(&RefCell::borrow(&*prev_slot_meta)) {
|
|
|
|
|
write_batch.put_cf(
|
|
|
|
|
self.detached_heads_cf.handle(),
|
|
|
|
|
&DetachedHeadsCf::key(&prev_slot),
|
|
|
|
|
&serialize(&true)?,
|
|
|
|
|
)?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if self.is_newly_completed_slot(&RefCell::borrow(&*meta_copy), meta_backup)
|
|
|
|
|
&& RefCell::borrow(&*meta_copy).is_connected
|
|
|
|
|
// At this point this slot has received a parent, so no longer a detached head
|
|
|
|
|
if is_detached_head {
|
|
|
|
|
write_batch.delete_cf(
|
|
|
|
|
self.detached_heads_cf.handle(),
|
|
|
|
|
&DetachedHeadsCf::key(&slot),
|
|
|
|
|
)?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This is a newly inserted slot and slot.is_connected is true, so update all
|
|
|
|
|
// child slots so that their `is_connected` = true
|
|
|
|
|
let should_propagate_is_connected =
|
|
|
|
|
Self::is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup)
|
|
|
|
|
&& RefCell::borrow(&*meta).is_connected;
|
|
|
|
|
|
|
|
|
|
if should_propagate_is_connected {
|
|
|
|
|
// slot_function returns a boolean indicating whether to explore the children
|
|
|
|
|
// of the input slot
|
|
|
|
|
let slot_function = |slot: &mut SlotMeta| {
|
|
|
|
|
slot.is_connected = true;
|
|
|
|
|
|
|
|
|
|
// We don't want to set the is_connected flag on the children of non-full
|
|
|
|
|
// slots
|
|
|
|
|
slot.is_full()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.traverse_children_mut(slot, &meta, working_set, new_chained_slots, slot_function)?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn traverse_children_mut<F>(
|
|
|
|
|
&self,
|
|
|
|
|
slot: u64,
|
|
|
|
|
slot_meta: &Rc<RefCell<(SlotMeta)>>,
|
|
|
|
|
working_set: &HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
|
|
|
|
|
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
|
|
|
|
|
slot_function: F,
|
|
|
|
|
) -> Result<()>
|
|
|
|
|
where
|
|
|
|
|
F: Fn(&mut SlotMeta) -> bool,
|
|
|
|
|
{
|
|
|
|
|
// This is a newly inserted slot and slot.is_connected is true, so go through
|
|
|
|
|
// and update all child slots with is_connected if applicable
|
|
|
|
|
let mut next_slots: Vec<(u64, Rc<RefCell<(SlotMeta)>>)> =
|
|
|
|
|
vec![(slot, meta_copy.clone())];
|
|
|
|
|
let mut next_slots: Vec<(u64, Rc<RefCell<(SlotMeta)>>)> = vec![(slot, slot_meta.clone())];
|
|
|
|
|
while !next_slots.is_empty() {
|
|
|
|
|
let (_, current_slot) = next_slots.pop().unwrap();
|
|
|
|
|
current_slot.borrow_mut().is_connected = true;
|
|
|
|
|
|
|
|
|
|
// Check whether we should explore the children of this slot
|
|
|
|
|
if slot_function(&mut current_slot.borrow_mut()) {
|
|
|
|
|
let current_slot = &RefCell::borrow(&*current_slot);
|
|
|
|
|
if current_slot.is_full() {
|
|
|
|
|
for next_slot_index in current_slot.next_slots.iter() {
|
|
|
|
|
let next_slot = self.find_slot_meta_else_create(
|
|
|
|
|
working_set,
|
|
|
|
@ -707,11 +760,18 @@ impl Blocktree {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn is_detached_head(meta: &SlotMeta) -> bool {
|
|
|
|
|
// If we have children, but no parent, then this is the head of a detached chain of
|
|
|
|
|
// slots
|
|
|
|
|
meta.parent_slot == std::u64::MAX
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 1) Chain current_slot to the previous slot defined by prev_slot_meta
|
|
|
|
|
// 2) Determine whether to set the is_connected flag
|
|
|
|
|
fn chain_new_slot_to_prev_slot(
|
|
|
|
|
&self,
|
|
|
|
|
prev_slot_meta: &mut SlotMeta,
|
|
|
|
@ -722,20 +782,17 @@ impl Blocktree {
|
|
|
|
|
current_slot_meta.is_connected = prev_slot_meta.is_connected && prev_slot_meta.is_full();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn is_newly_completed_slot(
|
|
|
|
|
&self,
|
|
|
|
|
slot_meta: &SlotMeta,
|
|
|
|
|
backup_slot_meta: &Option<SlotMeta>,
|
|
|
|
|
) -> bool {
|
|
|
|
|
fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotMeta>) -> bool {
|
|
|
|
|
slot_meta.is_full()
|
|
|
|
|
&& (backup_slot_meta.is_none()
|
|
|
|
|
|| Self::is_detached_head(&backup_slot_meta.as_ref().unwrap())
|
|
|
|
|
|| slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 1) Find the slot metadata in the cache of dirty slot metadata we've previously touched,
|
|
|
|
|
// else:
|
|
|
|
|
// 2) Search the database for that slot metadata. If still no luck, then
|
|
|
|
|
// 3) Create a dummy placeholder slot in the database
|
|
|
|
|
// 2) Search the database for that slot metadata. If still no luck, then:
|
|
|
|
|
// 3) Create a dummy `detached head` slot in the database
|
|
|
|
|
fn find_slot_meta_else_create<'a>(
|
|
|
|
|
&self,
|
|
|
|
|
working_set: &'a HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
|
|
|
|
@ -751,7 +808,7 @@ impl Blocktree {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Search the database for that slot metadata. If still no luck, then
|
|
|
|
|
// create a dummy placeholder slot in the database
|
|
|
|
|
// create a dummy `detached head` slot in the database
|
|
|
|
|
fn find_slot_meta_in_db_else_create<'a>(
|
|
|
|
|
&self,
|
|
|
|
|
slot: u64,
|
|
|
|
@ -761,7 +818,7 @@ impl Blocktree {
|
|
|
|
|
insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
|
|
|
|
|
Ok(insert_map.get(&slot).unwrap().clone())
|
|
|
|
|
} else {
|
|
|
|
|
// If this slot doesn't exist, make a placeholder slot. This way we
|
|
|
|
|
// If this slot doesn't exist, make a `detached head` slot. This way we
|
|
|
|
|
// remember which slots chained to this one when we eventually get a real blob
|
|
|
|
|
// for this slot
|
|
|
|
|
insert_map.insert(
|
|
|
|
@ -1020,6 +1077,7 @@ pub fn tmp_copy_blocktree(from: &str, name: &str) -> String {
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
pub mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
use crate::blocktree::db::Database;
|
|
|
|
|
use crate::entry::{
|
|
|
|
|
create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice,
|
|
|
|
|
};
|
|
|
|
@ -1817,7 +1875,7 @@ pub mod tests {
|
|
|
|
|
// If "i" is the index of a slot we just inserted, then next_slots should be empty
|
|
|
|
|
// for slot "i" because no slots chain to that slot, because slot i + 1 is missing.
|
|
|
|
|
// However, if it's a slot we haven't inserted, aka one of the gaps, then one of the slots
|
|
|
|
|
// we just inserted will chain to that gap, so next_slots for that placeholder
|
|
|
|
|
// we just inserted will chain to that gap, so next_slots for that `detached head`
|
|
|
|
|
// slot won't be empty, but the parent slot is unknown so should equal std::u64::MAX.
|
|
|
|
|
let s = blocktree.meta(i as u64).unwrap().unwrap();
|
|
|
|
|
if i % 2 == 0 {
|
|
|
|
@ -1990,6 +2048,7 @@ pub mod tests {
|
|
|
|
|
let slot_meta = blocktree.meta(slot).unwrap().unwrap();
|
|
|
|
|
assert_eq!(slot_meta.consumed, entries_per_slot);
|
|
|
|
|
assert_eq!(slot_meta.received, entries_per_slot);
|
|
|
|
|
assert!(slot_meta.is_connected);
|
|
|
|
|
let slot_parent = {
|
|
|
|
|
if slot == 0 {
|
|
|
|
|
0
|
|
|
|
@ -2017,6 +2076,9 @@ pub mod tests {
|
|
|
|
|
}
|
|
|
|
|
assert_eq!(expected_children, result);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Detached heads is empty
|
|
|
|
|
assert!(blocktree.detached_heads_cf.is_empty().unwrap())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
|
|
|
@ -2065,6 +2127,63 @@ pub mod tests {
|
|
|
|
|
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_detached_head() {
|
|
|
|
|
let blocktree_path = get_tmp_ledger_path("test_is_detached_head");
|
|
|
|
|
{
|
|
|
|
|
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
|
|
|
|
|
|
|
|
|
// Create blobs and entries
|
|
|
|
|
let entries_per_slot = 1;
|
|
|
|
|
let (blobs, _) = make_many_slot_entries(0, 3, entries_per_slot);
|
|
|
|
|
|
|
|
|
|
// Write slot 2, which chains to slot 1. We're missing slot 0,
|
|
|
|
|
// so slot 1 is the detached head
|
|
|
|
|
blocktree.write_blobs(once(&blobs[2])).unwrap();
|
|
|
|
|
let meta = blocktree
|
|
|
|
|
.meta(1)
|
|
|
|
|
.expect("Expect database get to succeed")
|
|
|
|
|
.unwrap();
|
|
|
|
|
assert!(Blocktree::is_detached_head(&meta));
|
|
|
|
|
assert_eq!(get_detached_heads(&blocktree), vec![1]);
|
|
|
|
|
|
|
|
|
|
// Write slot 1 which chains to slot 0, so now slot 0 is the
|
|
|
|
|
// detached head, and slot 1 is no longer the detached head.
|
|
|
|
|
blocktree.write_blobs(once(&blobs[1])).unwrap();
|
|
|
|
|
let meta = blocktree
|
|
|
|
|
.meta(1)
|
|
|
|
|
.expect("Expect database get to succeed")
|
|
|
|
|
.unwrap();
|
|
|
|
|
assert!(!Blocktree::is_detached_head(&meta));
|
|
|
|
|
let meta = blocktree
|
|
|
|
|
.meta(0)
|
|
|
|
|
.expect("Expect database get to succeed")
|
|
|
|
|
.unwrap();
|
|
|
|
|
assert!(Blocktree::is_detached_head(&meta));
|
|
|
|
|
assert_eq!(get_detached_heads(&blocktree), vec![0]);
|
|
|
|
|
|
|
|
|
|
// Write some slot that also chains to existing slots and detached head,
|
|
|
|
|
// nothing should change
|
|
|
|
|
let blob4 = &make_slot_entries(4, 0, 1).0[0];
|
|
|
|
|
let blob5 = &make_slot_entries(5, 1, 1).0[0];
|
|
|
|
|
blocktree.write_blobs(vec![blob4, blob5]).unwrap();
|
|
|
|
|
assert_eq!(get_detached_heads(&blocktree), vec![0]);
|
|
|
|
|
|
|
|
|
|
// Write zeroth slot, no more detached heads
|
|
|
|
|
blocktree.write_blobs(once(&blobs[0])).unwrap();
|
|
|
|
|
for i in 0..3 {
|
|
|
|
|
let meta = blocktree
|
|
|
|
|
.meta(i)
|
|
|
|
|
.expect("Expect database get to succeed")
|
|
|
|
|
.unwrap();
|
|
|
|
|
assert!(!Blocktree::is_detached_head(&meta));
|
|
|
|
|
}
|
|
|
|
|
// Detached heads is empty
|
|
|
|
|
assert!(blocktree.detached_heads_cf.is_empty().unwrap())
|
|
|
|
|
}
|
|
|
|
|
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn test_insert_data_blobs_slots(name: &str, should_bulk_write: bool) {
|
|
|
|
|
let blocktree_path = get_tmp_ledger_path(name);
|
|
|
|
|
{
|
|
|
|
@ -2325,4 +2444,18 @@ pub mod tests {
|
|
|
|
|
|
|
|
|
|
(blobs, entries)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn get_detached_heads(blocktree: &Blocktree) -> Vec<u64> {
|
|
|
|
|
let mut results = vec![];
|
|
|
|
|
let mut iter = blocktree
|
|
|
|
|
.db
|
|
|
|
|
.raw_iterator_cf(blocktree.detached_heads_cf.handle())
|
|
|
|
|
.unwrap();
|
|
|
|
|
iter.seek_to_first();
|
|
|
|
|
while iter.valid() {
|
|
|
|
|
results.push(DetachedHeadsCf::index(&iter.key().unwrap()));
|
|
|
|
|
iter.next();
|
|
|
|
|
}
|
|
|
|
|
results
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|