fix blocktree tests

This commit is contained in:
Carl
2019-02-12 19:54:18 -08:00
committed by Michael Vines
parent de6109c599
commit 34da362ee6
3 changed files with 255 additions and 226 deletions

View File

@ -51,10 +51,10 @@ slot index and blob index for an entry, and the value is the entry data. Note bl
* `next_slots` - A list of future slots this slot could chain to. Used when rebuilding * `next_slots` - A list of future slots this slot could chain to. Used when rebuilding
the ledger to find possible fork points. the ledger to find possible fork points.
* `consumed_ticks` - Tick height of the highest received blob (used to identify when a slot is full) * `consumed_ticks` - Tick height of the highest received blob (used to identify when a slot is full)
* `is_trunk` - True iff every block from 0...slot forms a full sequence without any holes. We can derive is_trunk for each slot with the following rules. Let slot(n) be the slot with index `n`, and slot(n).contains_all_ticks() is true if the slot with index `n` has all the ticks expected for that slot. Let is_trunk(n) be the statement that "the slot(n).is_trunk is true". Then: * `is_trunk` - True iff every block from 0...slot forms a full sequence without any holes. We can derive is_trunk for each slot with the following rules. Let slot(n) be the slot with index `n`, and slot(n).is_full() is true if the slot with index `n` has all the ticks expected for that slot. Let is_trunk(n) be the statement that "the slot(n).is_trunk is true". Then:
is_trunk(0) is_trunk(0)
is_trunk(n+1) iff (is_trunk(n) and slot(n).contains_all_ticks() is_trunk(n+1) iff (is_trunk(n) and slot(n).is_full()
3. Chaining - When a blob for a new slot `x` arrives, we check the number of blocks (`num_blocks`) for that new slot (this information is encoded in the blob). We then know that this new slot chains to slot `x - num_blocks`. 3. Chaining - When a blob for a new slot `x` arrives, we check the number of blocks (`num_blocks`) for that new slot (this information is encoded in the blob). We then know that this new slot chains to slot `x - num_blocks`.

View File

@ -142,7 +142,7 @@ pub struct SlotMeta {
} }
impl SlotMeta { impl SlotMeta {
pub fn contains_all_ticks(&self) -> bool { pub fn is_full(&self) -> bool {
self.consumed > self.last_index self.consumed > self.last_index
} }
@ -564,7 +564,7 @@ impl Blocktree {
let slot_meta = &mut entry.0.borrow_mut(); let slot_meta = &mut entry.0.borrow_mut();
// This slot is full, skip the bogus blob // This slot is full, skip the bogus blob
if slot_meta.contains_all_ticks() { if slot_meta.is_full() {
continue; continue;
} }
@ -998,7 +998,7 @@ impl Blocktree {
current_slot.borrow_mut().is_trunk = true; current_slot.borrow_mut().is_trunk = true;
let current_slot = &RefCell::borrow(&*current_slot); let current_slot = &RefCell::borrow(&*current_slot);
if current_slot.contains_all_ticks() { if current_slot.is_full() {
for next_slot_index in current_slot.next_slots.iter() { for next_slot_index in current_slot.next_slots.iter() {
let next_slot = self.find_slot_meta_else_create( let next_slot = self.find_slot_meta_else_create(
working_set, working_set,
@ -1021,7 +1021,7 @@ impl Blocktree {
current_slot: &mut SlotMeta, current_slot: &mut SlotMeta,
) { ) {
prev_slot.next_slots.push(current_slot_height); prev_slot.next_slots.push(current_slot_height);
current_slot.is_trunk = prev_slot.is_trunk && prev_slot.contains_all_ticks(); current_slot.is_trunk = prev_slot.is_trunk && prev_slot.is_full();
} }
fn is_newly_completed_slot( fn is_newly_completed_slot(
@ -1029,7 +1029,7 @@ impl Blocktree {
slot_meta: &SlotMeta, slot_meta: &SlotMeta,
backup_slot_meta: &Option<SlotMeta>, backup_slot_meta: &Option<SlotMeta>,
) -> bool { ) -> bool {
slot_meta.contains_all_ticks() slot_meta.is_full()
&& (backup_slot_meta.is_none() && (backup_slot_meta.is_none()
|| slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed) || slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
} }
@ -1120,7 +1120,6 @@ impl Blocktree {
None, None,
)?; )?;
let blob_to_insert = Cow::Borrowed(&blob_to_insert.data[..]);
// Add one because we skipped this current blob when calling // Add one because we skipped this current blob when calling
// get_slot_consecutive_blobs() earlier // get_slot_consecutive_blobs() earlier
slot_meta.consumed + blob_datas.len() as u64 + 1 slot_meta.consumed + blob_datas.len() as u64 + 1
@ -1145,7 +1144,7 @@ impl Blocktree {
// set it to this blob index // set it to this blob index
if slot_meta.last_index == std::u64::MAX { if slot_meta.last_index == std::u64::MAX {
if blob_to_insert.is_last_in_slot() { if blob_to_insert.is_last_in_slot() {
blob_slot blob_index
} else { } else {
std::u64::MAX std::u64::MAX
} }
@ -1197,11 +1196,7 @@ impl Blocktree {
let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
let mut bootstrap_meta = SlotMeta::new(0, 1); let mut bootstrap_meta = SlotMeta::new(0, 1);
let last = blobs.last().unwrap(); let last = blobs.last().unwrap();
let num_ending_ticks = blobs.iter().skip(2).fold(0, |tick_count, blob| {
let entry: Entry = deserialize(&blob.data[BLOB_HEADER_SIZE..])
.expect("Blob has to be deseriablizable");
tick_count + entry.is_tick() as u64
});
bootstrap_meta.consumed = last.index() + 1; bootstrap_meta.consumed = last.index() + 1;
bootstrap_meta.received = last.index() + 1; bootstrap_meta.received = last.index() + 1;
bootstrap_meta.is_trunk = true; bootstrap_meta.is_trunk = true;
@ -1377,10 +1372,11 @@ pub fn tmp_copy_ledger(from: &str, name: &str, config: &BlocktreeConfig) -> Stri
mod tests { mod tests {
use super::*; use super::*;
use crate::entry::{ use crate::entry::{
create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_id, EntrySlice, create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_id, Entry, EntrySlice,
}; };
use crate::packet::index_blobs; use crate::packet::index_blobs;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use std::iter::once;
use std::time::Duration; use std::time::Duration;
#[test] #[test]
@ -1498,49 +1494,45 @@ mod tests {
#[test] #[test]
fn test_insert_data_blobs_basic() { fn test_insert_data_blobs_basic() {
let entries = make_tiny_test_entries(2); let num_entries = 5;
let shared_blobs = entries.to_shared_blobs(); assert!(num_entries > 1);
let slot_height = 0;
for (i, b) in shared_blobs.iter().enumerate() { let (blobs, entries) = make_slot_entries(0, 0, num_entries);
b.write().unwrap().set_index(i as u64);
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_basic"); let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_basic");
let ledger = Blocktree::open(&ledger_path).unwrap(); let ledger = Blocktree::open(&ledger_path).unwrap();
// Insert second blob, we're missing the first blob, so no consecutive // Insert last blob, we're missing the other blobs, so no consecutive
// blobs starting from slot 0, index 0 should exist. // blobs starting from slot 0, index 0 should exist.
ledger.insert_data_blobs(vec![blobs[1]]).unwrap(); ledger
assert!(ledger .insert_data_blobs(once(&blobs[num_entries as usize - 1]))
.get_slot_entries(slot_height, 0, None) .unwrap();
.unwrap() assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty());
.is_empty());
let meta = ledger let meta = ledger
.meta_cf .meta_cf
.get(&MetaCf::key(slot_height)) .get(&MetaCf::key(0))
.unwrap() .unwrap()
.expect("Expected new metadata object to be created"); .expect("Expected new metadata object to be created");
assert!(meta.consumed == 0 && meta.received == 2); assert!(meta.consumed == 0 && meta.received == num_entries);
// Insert first blob, check for consecutive returned entries // Insert the other blobs, check for consecutive returned entries
ledger.insert_data_blobs(vec![blobs[0]]).unwrap(); ledger
let result = ledger.get_slot_entries(slot_height, 0, None).unwrap(); .insert_data_blobs(&blobs[0..(num_entries - 1) as usize])
.unwrap();
let result = ledger.get_slot_entries(0, 0, None).unwrap();
assert_eq!(result, entries); assert_eq!(result, entries);
let meta = ledger let meta = ledger
.meta_cf .meta_cf
.get(&MetaCf::key(slot_height)) .get(&MetaCf::key(0))
.unwrap() .unwrap()
.expect("Expected new metadata object to exist"); .expect("Expected new metadata object to exist");
assert_eq!(meta.consumed, 2); assert_eq!(meta.consumed, num_entries);
assert_eq!(meta.received, 2); assert_eq!(meta.received, num_entries);
assert_eq!(meta.consumed_ticks, 0); assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_entries - 1);
assert!(meta.next_slots.is_empty()); assert!(meta.next_slots.is_empty());
assert!(meta.is_trunk); assert!(meta.is_trunk);
@ -1550,36 +1542,31 @@ mod tests {
} }
#[test] #[test]
fn test_insert_data_blobs_multiple() { fn test_insert_data_blobs_reverse() {
let num_blobs = 10; let num_entries = 10;
let entries = make_tiny_test_entries(num_blobs); let (blobs, entries) = make_slot_entries(0, 0, num_entries);
let slot_height = 0;
let shared_blobs = entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64);
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_multiple"); let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_reverse");
let ledger = Blocktree::open(&ledger_path).unwrap(); let ledger = Blocktree::open(&ledger_path).unwrap();
// Insert blobs in reverse, check for consecutive returned blobs // Insert blobs in reverse, check for consecutive returned blobs
for i in (0..num_blobs).rev() { for i in (0..num_entries).rev() {
ledger.insert_data_blobs(vec![blobs[i]]).unwrap(); ledger.insert_data_blobs(once(&blobs[i as usize])).unwrap();
let result = ledger.get_slot_entries(slot_height, 0, None).unwrap(); let result = ledger.get_slot_entries(0, 0, None).unwrap();
let meta = ledger let meta = ledger
.meta_cf .meta_cf
.get(&MetaCf::key(slot_height)) .get(&MetaCf::key(0))
.unwrap() .unwrap()
.expect("Expected metadata object to exist"); .expect("Expected metadata object to exist");
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_entries - 1);
if i != 0 { if i != 0 {
assert_eq!(result.len(), 0); assert_eq!(result.len(), 0);
assert!(meta.consumed == 0 && meta.received == num_blobs as u64); assert!(meta.consumed == 0 && meta.received == num_entries as u64);
} else { } else {
assert_eq!(result, entries); assert_eq!(result, entries);
assert!(meta.consumed == num_blobs as u64 && meta.received == num_blobs as u64); assert!(meta.consumed == num_entries as u64 && meta.received == num_entries as u64);
} }
} }
@ -1706,32 +1693,33 @@ mod tests {
{ {
let blocktree = Blocktree::open(&blocktree_path).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
let slot = 0; let slot = 0;
let parent_slot = 0;
// Write entries // Write entries
let num_entries = 20 as u64; let num_entries = 21 as u64;
let original_entries = create_ticks(num_entries, Hash::default()); let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
let shared_blobs = original_entries.clone().to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(i as u64);
w_b.set_slot(slot);
}
blocktree blocktree
.write_shared_blobs(shared_blobs.iter().skip(1).step_by(2)) .write_blobs(blobs.iter().skip(1).step_by(2))
.unwrap(); .unwrap();
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
let meta_key = MetaCf::key(slot); let meta_key = MetaCf::key(slot);
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
if num_entries % 2 == 0 {
assert_eq!(meta.received, num_entries); assert_eq!(meta.received, num_entries);
} else {
assert_eq!(meta.received, num_entries - 1);
}
assert_eq!(meta.consumed, 0); assert_eq!(meta.consumed, 0);
assert_eq!(meta.consumed_ticks, 0); assert_eq!(meta.parent_slot, 0);
if num_entries % 2 == 0 {
assert_eq!(meta.last_index, num_entries - 1);
} else {
assert_eq!(meta.last_index, std::u64::MAX);
}
blocktree blocktree.write_blobs(blobs.iter().step_by(2)).unwrap();
.write_shared_blobs(shared_blobs.iter().step_by(2))
.unwrap();
assert_eq!( assert_eq!(
blocktree.get_slot_entries(0, 0, None).unwrap(), blocktree.get_slot_entries(0, 0, None).unwrap(),
@ -1742,7 +1730,8 @@ mod tests {
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
assert_eq!(meta.received, num_entries); assert_eq!(meta.received, num_entries);
assert_eq!(meta.consumed, num_entries); assert_eq!(meta.consumed, num_entries);
assert_eq!(meta.consumed_ticks, num_entries); assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_entries - 1);
} }
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@ -1755,47 +1744,47 @@ mod tests {
{ {
let blocktree = Blocktree::open(&blocktree_path).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Write entries // Make duplicate entries and blobs
let num_entries = 10 as u64;
let num_duplicates = 2; let num_duplicates = 2;
let original_entries: Vec<Entry> = make_tiny_test_entries(num_entries as usize) let num_unique_entries = 10;
let (original_entries, blobs) = {
let (blobs, entries) = make_slot_entries(0, 0, num_unique_entries);
let entries: Vec<_> = entries
.into_iter() .into_iter()
.flat_map(|e| vec![e; num_duplicates]) .flat_map(|e| vec![e.clone(), e])
.collect(); .collect();
let blobs: Vec<_> = blobs.into_iter().flat_map(|b| vec![b.clone(), b]).collect();
let shared_blobs = original_entries.clone().to_shared_blobs(); (entries, blobs)
for (i, b) in shared_blobs.iter().enumerate() { };
let index = (i / 2) as u64;
let mut w_b = b.write().unwrap();
w_b.set_index(index);
}
blocktree blocktree
.write_shared_blobs( .write_blobs(
shared_blobs blobs
.iter() .iter()
.skip(num_duplicates) .skip(num_duplicates as usize)
.step_by(num_duplicates * 2), .step_by(num_duplicates as usize * 2),
) )
.unwrap(); .unwrap();
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
blocktree blocktree
.write_shared_blobs(shared_blobs.iter().step_by(num_duplicates * 2)) .write_blobs(blobs.iter().step_by(num_duplicates as usize * 2))
.unwrap(); .unwrap();
let expected: Vec<_> = original_entries let expected: Vec<_> = original_entries
.into_iter() .into_iter()
.step_by(num_duplicates) .step_by(num_duplicates as usize)
.collect(); .collect();
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), expected,); assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), expected,);
let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
assert_eq!(meta.consumed, num_entries); assert_eq!(meta.consumed, num_unique_entries);
assert_eq!(meta.received, num_entries); assert_eq!(meta.received, num_unique_entries);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_unique_entries - 1);
} }
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
} }
@ -1856,32 +1845,26 @@ mod tests {
pub fn test_new_blobs_signal() { pub fn test_new_blobs_signal() {
// Initialize ledger // Initialize ledger
let ledger_path = get_tmp_ledger_path("test_new_blobs_signal"); let ledger_path = get_tmp_ledger_path("test_new_blobs_signal");
let ticks_per_slot = 10; let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
let config = BlocktreeConfig::new(ticks_per_slot);
let (ledger, _, recvr) = Blocktree::open_with_config_signal(&ledger_path, &config).unwrap();
let ledger = Arc::new(ledger); let ledger = Arc::new(ledger);
// Create ticks for slot 0 let entries_per_slot = 10;
let entries = create_ticks(ticks_per_slot, Hash::default()); // Create entries for slot 0
let mut blobs = entries.to_blobs(); let (blobs, _) = make_slot_entries(0, 0, entries_per_slot);
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64);
}
// Insert second blob, but we're missing the first blob, so no consecutive // Insert second blob, but we're missing the first blob, so no consecutive
// blobs starting from slot 0, index 0 should exist. // blobs starting from slot 0, index 0 should exist.
ledger.insert_data_blobs(&blobs[1..2]).unwrap(); ledger.insert_data_blobs(once(&blobs[1])).unwrap();
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
assert!(recvr.recv_timeout(timer).is_err()); assert!(recvr.recv_timeout(timer).is_err());
// Insert first blob, now we've made a consecutive block // Insert first blob, now we've made a consecutive block
ledger.insert_data_blobs(&blobs[0..1]).unwrap(); ledger.insert_data_blobs(once(&blobs[0])).unwrap();
// Wait to get notified of update, should only be one update // Wait to get notified of update, should only be one update
assert!(recvr.recv_timeout(timer).is_ok()); assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err()); assert!(recvr.try_recv().is_err());
// Insert the rest of the ticks // Insert the rest of the ticks
ledger ledger
.insert_data_blobs(&blobs[1..ticks_per_slot as usize]) .insert_data_blobs(&blobs[1..entries_per_slot as usize])
.unwrap(); .unwrap();
// Wait to get notified of update, should only be one update // Wait to get notified of update, should only be one update
assert!(recvr.recv_timeout(timer).is_ok()); assert!(recvr.recv_timeout(timer).is_ok());
@ -1890,68 +1873,48 @@ mod tests {
// Create some other slots, and send batches of ticks for each slot such that each slot // Create some other slots, and send batches of ticks for each slot such that each slot
// is missing the tick at blob index == slot index - 1. Thus, no consecutive blocks // is missing the tick at blob index == slot index - 1. Thus, no consecutive blocks
// will be formed // will be formed
let num_slots = ticks_per_slot; let num_slots = entries_per_slot;
let mut all_blobs = vec![]; let mut blobs: Vec<Blob> = vec![];
for slot_index in 1..num_slots + 1 { let mut missing_blobs = vec![];
let entries = create_ticks(num_slots, Hash::default()); for slot_height in 1..num_slots + 1 {
let mut blobs = entries.to_blobs(); let (mut slot_blobs, _) =
for (i, ref mut b) in blobs.iter_mut().enumerate() { make_slot_entries(slot_height, slot_height - 1, entries_per_slot);
if (i as u64) < slot_index - 1 { let missing_blob = slot_blobs.remove(slot_height as usize - 1);
b.set_index(i as u64); blobs.extend(slot_blobs);
} else { missing_blobs.push(missing_blob);
b.set_index(i as u64 + 1);
}
b.set_slot(slot_index as u64);
}
all_blobs.extend(blobs);
} }
// Should be no updates, since no new chains from block 0 were formed // Should be no updates, since no new chains from block 0 were formed
ledger.insert_data_blobs(all_blobs.iter()).unwrap(); ledger.insert_data_blobs(blobs.iter()).unwrap();
assert!(recvr.recv_timeout(timer).is_err()); assert!(recvr.recv_timeout(timer).is_err());
// Insert a blob for each slot that doesn't make a consecutive block, we // Insert a blob for each slot that doesn't make a consecutive block, we
// should get no updates // should get no updates
let all_blobs: Vec<_> = (1..num_slots + 1) let blobs: Vec<_> = (1..num_slots + 1)
.map(|slot_index| { .flat_map(|slot_height| {
let entries = create_ticks(1, Hash::default());; let (mut blob, _) = make_slot_entries(slot_height, slot_height - 1, 1);
let mut blob = entries[0].to_blob(); blob[0].set_index(2 * num_slots as u64);
blob.set_index(2 * num_slots as u64);
blob.set_slot(slot_index as u64);
blob blob
}) })
.collect(); .collect();
ledger.insert_data_blobs(all_blobs.iter()).unwrap(); ledger.insert_data_blobs(blobs.iter()).unwrap();
assert!(recvr.recv_timeout(timer).is_err()); assert!(recvr.recv_timeout(timer).is_err());
// For slots 1..num_slots/2, fill in the holes in one batch insertion, // For slots 1..num_slots/2, fill in the holes in one batch insertion,
// so we should only get one signal // so we should only get one signal
let all_blobs: Vec<_> = (1..num_slots / 2) ledger
.map(|slot_index| { .insert_data_blobs(&missing_blobs[..(num_slots / 2) as usize])
let entries = make_tiny_test_entries(1); .unwrap();
let mut blob = entries[0].to_blob();
blob.set_index(slot_index as u64 - 1);
blob.set_slot(slot_index as u64);
blob
})
.collect();
ledger.insert_data_blobs(all_blobs.iter()).unwrap();
assert!(recvr.recv_timeout(timer).is_ok()); assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err()); assert!(recvr.try_recv().is_err());
// Fill in the holes for each of the remaining slots, we should get a single update // Fill in the holes for each of the remaining slots, we should get a single update
// for each // for each
for slot_index in num_slots / 2..num_slots { for missing_blob in &missing_blobs[(num_slots / 2) as usize..] {
let entries = make_tiny_test_entries(1); ledger
let mut blob = entries[0].to_blob(); .insert_data_blobs(vec![missing_blob.clone()])
blob.set_index(slot_index as u64 - 1); .unwrap();
blob.set_slot(slot_index as u64);
ledger.insert_data_blobs(vec![blob]).unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
} }
// Destroying database without closing it first is undefined behavior // Destroying database without closing it first is undefined behavior
@ -1963,50 +1926,47 @@ mod tests {
pub fn test_handle_chaining_basic() { pub fn test_handle_chaining_basic() {
let blocktree_path = get_tmp_ledger_path("test_handle_chaining_basic"); let blocktree_path = get_tmp_ledger_path("test_handle_chaining_basic");
{ {
let ticks_per_slot = 2; let entries_per_slot = 2;
let config = BlocktreeConfig::new(ticks_per_slot); let num_slots = 3;
let blocktree = Blocktree::open_config(&blocktree_path, &config).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
let entries = create_ticks(6, Hash::default()); // Construct the blobs
let mut blobs = entries.to_blobs(); let (blobs, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
for (i, ref mut b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64 % ticks_per_slot);
b.set_slot(i as u64 / ticks_per_slot);
}
// 1) Write to the first slot // 1) Write to the first slot
blocktree blocktree
.write_blobs(&blobs[ticks_per_slot as usize..2 * ticks_per_slot as usize]) .write_blobs(&blobs[entries_per_slot as usize..2 * entries_per_slot as usize])
.unwrap(); .unwrap();
let s1 = blocktree.meta_cf.get_slot_meta(1).unwrap().unwrap(); let s1 = blocktree.meta_cf.get_slot_meta(1).unwrap().unwrap();
assert!(s1.next_slots.is_empty()); assert!(s1.next_slots.is_empty());
// Slot 1 is not trunk because slot 0 hasn't been inserted yet // Slot 1 is not trunk because slot 0 hasn't been inserted yet
assert!(!s1.is_trunk); assert!(!s1.is_trunk);
assert_eq!(s1.num_blocks, 1); assert_eq!(s1.parent_slot, 0);
assert_eq!(s1.consumed_ticks, ticks_per_slot); assert_eq!(s1.last_index, entries_per_slot - 1);
// 2) Write to the second slot // 2) Write to the second slot
blocktree blocktree
.write_blobs(&blobs[2 * ticks_per_slot as usize..3 * ticks_per_slot as usize]) .write_blobs(&blobs[2 * entries_per_slot as usize..3 * entries_per_slot as usize])
.unwrap(); .unwrap();
let s2 = blocktree.meta_cf.get_slot_meta(2).unwrap().unwrap(); let s2 = blocktree.meta_cf.get_slot_meta(2).unwrap().unwrap();
assert!(s2.next_slots.is_empty()); assert!(s2.next_slots.is_empty());
// Slot 2 is not trunk because slot 0 hasn't been inserted yet // Slot 2 is not trunk because slot 0 hasn't been inserted yet
assert!(!s2.is_trunk); assert!(!s2.is_trunk);
assert_eq!(s2.num_blocks, 1); assert_eq!(s2.parent_slot, 1);
assert_eq!(s2.consumed_ticks, ticks_per_slot); assert_eq!(s2.last_index, entries_per_slot - 1);
// Check the first slot again, it should chain to the second slot, // Check the first slot again, it should chain to the second slot,
// but still isn't part of the trunk // but still isn't part of the trunk
let s1 = blocktree.meta_cf.get_slot_meta(1).unwrap().unwrap(); let s1 = blocktree.meta_cf.get_slot_meta(1).unwrap().unwrap();
assert_eq!(s1.next_slots, vec![2]); assert_eq!(s1.next_slots, vec![2]);
assert!(!s1.is_trunk); assert!(!s1.is_trunk);
assert_eq!(s1.consumed_ticks, ticks_per_slot); assert_eq!(s1.parent_slot, 0);
assert_eq!(s1.last_index, entries_per_slot - 1);
// 3) Write to the zeroth slot, check that every slot // 3) Write to the zeroth slot, check that every slot
// is now part of the trunk // is now part of the trunk
blocktree blocktree
.write_blobs(&blobs[0..ticks_per_slot as usize]) .write_blobs(&blobs[0..entries_per_slot as usize])
.unwrap(); .unwrap();
for i in 0..3 { for i in 0..3 {
let s = blocktree.meta_cf.get_slot_meta(i).unwrap().unwrap(); let s = blocktree.meta_cf.get_slot_meta(i).unwrap().unwrap();
@ -2014,8 +1974,12 @@ mod tests {
if i != 2 { if i != 2 {
assert_eq!(s.next_slots, vec![i + 1]); assert_eq!(s.next_slots, vec![i + 1]);
} }
assert_eq!(s.num_blocks, 1); if i == 0 {
assert_eq!(s.consumed_ticks, ticks_per_slot); assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, entries_per_slot - 1);
assert!(s.is_trunk); assert!(s.is_trunk);
} }
} }
@ -2026,35 +1990,47 @@ mod tests {
pub fn test_handle_chaining_missing_slots() { pub fn test_handle_chaining_missing_slots() {
let blocktree_path = get_tmp_ledger_path("test_handle_chaining_missing_slots"); let blocktree_path = get_tmp_ledger_path("test_handle_chaining_missing_slots");
{ {
let mut blocktree = Blocktree::open(&blocktree_path).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 30; let num_slots = 30;
let ticks_per_slot = 2; let entries_per_slot = 2;
blocktree.ticks_per_slot = ticks_per_slot as u64;
let ticks = create_ticks((num_slots / 2) * ticks_per_slot, Hash::default()); // Separate every other slot into two separate vectors
let mut blobs = ticks.to_blobs(); let mut slots = vec![];
let mut missing_slots = vec![];
for slot_height in 0..num_slots {
let parent_slot = {
if slot_height == 0 {
0
} else {
slot_height - 1
}
};
let (slot_blobs, _) = make_slot_entries(slot_height, parent_slot, entries_per_slot);
// Leave a gap for every other slot if slot_height % 2 == 1 {
for (i, ref mut b) in blobs.iter_mut().enumerate() { slots.extend(slot_blobs);
b.set_index(i as u64 % ticks_per_slot); } else {
b.set_slot(((i / 2) * 2 + 1) as u64); missing_slots.extend(slot_blobs);
}
} }
blocktree.write_blobs(&blobs[..]).unwrap(); // Write the blobs for every other slot
blocktree.write_blobs(&slots).unwrap();
// Check metadata // Check metadata
for i in 0..num_slots { for i in 0..num_slots {
// If it's a slot we just inserted, then next_slots should be empty // If "i" is the index of a slot we just inserted, then next_slots should be empty
// because no slots chain to it yet because we left a gap. However, if it's // for slot "i" because no slots chain to that slot, because slot i + 1 is missing.
// a slot we haven't inserted, aka one of the gaps, then one of the slots // However, if it's a slot we haven't inserted, aka one of the gaps, then one of the slots
// we just inserted will chain to that gap // we just inserted will chain to that gap, so next_slots for that placeholder
// slot won't be empty, but the parent slot is unknown so should equal std::u64::MAX.
let s = blocktree.meta_cf.get_slot_meta(i as u64).unwrap().unwrap(); let s = blocktree.meta_cf.get_slot_meta(i as u64).unwrap().unwrap();
if i % 2 == 0 { if i % 2 == 0 {
assert_eq!(s.next_slots, vec![i as u64 + 1]); assert_eq!(s.next_slots, vec![i as u64 + 1]);
assert_eq!(s.consumed_ticks, 0); assert_eq!(s.parent_slot, std::u64::MAX);
} else { } else {
assert!(s.next_slots.is_empty()); assert!(s.next_slots.is_empty());
assert_eq!(s.consumed_ticks, ticks_per_slot as u64); assert_eq!(s.parent_slot, i - 1);
} }
if i == 0 { if i == 0 {
@ -2064,13 +2040,8 @@ mod tests {
} }
} }
// Fill in the gaps // Write the blobs for the other half of the slots that we didn't insert earlier
for (i, ref mut b) in blobs.iter_mut().enumerate() { blocktree.write_blobs(&missing_slots[..]).unwrap();
b.set_index(i as u64 % ticks_per_slot);
b.set_slot(((i / 2) * 2) as u64);
}
blocktree.write_blobs(&blobs[..]).unwrap();
for i in 0..num_slots { for i in 0..num_slots {
// Check that all the slots chain correctly once the missing slots // Check that all the slots chain correctly once the missing slots
@ -2081,7 +2052,13 @@ mod tests {
} else { } else {
assert!(s.next_slots.is_empty()); assert!(s.next_slots.is_empty());
} }
assert_eq!(s.consumed_ticks, ticks_per_slot);
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, entries_per_slot - 1);
assert!(s.is_trunk); assert!(s.is_trunk);
} }
} }
@ -2093,27 +2070,22 @@ mod tests {
pub fn test_forward_chaining_is_trunk() { pub fn test_forward_chaining_is_trunk() {
let blocktree_path = get_tmp_ledger_path("test_forward_chaining_is_trunk"); let blocktree_path = get_tmp_ledger_path("test_forward_chaining_is_trunk");
{ {
let mut blocktree = Blocktree::open(&blocktree_path).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 15; let num_slots = 15;
let ticks_per_slot = 2; let entries_per_slot = 2;
blocktree.ticks_per_slot = ticks_per_slot as u64; assert!(entries_per_slot > 1);
let entries = create_ticks(num_slots * ticks_per_slot, Hash::default()); let (blobs, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let mut blobs = entries.to_blobs();
for (i, ref mut b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64 % ticks_per_slot);
b.set_slot(i as u64 / ticks_per_slot);
}
// Write the blobs such that every 3rd block has a gap in the beginning // Write the blobs such that every 3rd slot has a gap in the beginning
for (slot_index, slot_ticks) in blobs.chunks(ticks_per_slot as usize).enumerate() { for (slot_height, slot_ticks) in blobs.chunks(entries_per_slot as usize).enumerate() {
if slot_index % 3 == 0 { if slot_height % 3 == 0 {
blocktree blocktree
.write_blobs(&slot_ticks[1..ticks_per_slot as usize]) .write_blobs(&slot_ticks[1..entries_per_slot as usize])
.unwrap(); .unwrap();
} else { } else {
blocktree blocktree
.write_blobs(&slot_ticks[..ticks_per_slot as usize]) .write_blobs(&slot_ticks[..entries_per_slot as usize])
.unwrap(); .unwrap();
} }
} }
@ -2127,13 +2099,15 @@ mod tests {
} else { } else {
assert!(s.next_slots.is_empty()); assert!(s.next_slots.is_empty());
} }
assert_eq!(s.num_blocks, 1);
if i % 3 == 0 { if i == 0 {
assert_eq!(s.consumed_ticks, 0); assert_eq!(s.parent_slot, 0);
} else { } else {
assert_eq!(s.consumed_ticks, ticks_per_slot as u64); assert_eq!(s.parent_slot, i - 1);
} }
assert_eq!(s.last_index, entries_per_slot - 1);
// Other than slot 0, no slots should be part of the trunk // Other than slot 0, no slots should be part of the trunk
if i != 0 { if i != 0 {
assert!(!s.is_trunk); assert!(!s.is_trunk);
@ -2144,7 +2118,7 @@ mod tests {
// Iteratively finish every 3rd slot, and check that all slots up to and including // Iteratively finish every 3rd slot, and check that all slots up to and including
// slot_index + 3 become part of the trunk // slot_index + 3 become part of the trunk
for (slot_index, slot_ticks) in blobs.chunks(ticks_per_slot as usize).enumerate() { for (slot_index, slot_ticks) in blobs.chunks(entries_per_slot as usize).enumerate() {
if slot_index % 3 == 0 { if slot_index % 3 == 0 {
blocktree.write_blobs(&slot_ticks[0..1]).unwrap(); blocktree.write_blobs(&slot_ticks[0..1]).unwrap();
@ -2160,6 +2134,14 @@ mod tests {
} else { } else {
assert!(!s.is_trunk); assert!(!s.is_trunk);
} }
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, entries_per_slot - 1);
} }
} }
} }
@ -2206,43 +2188,90 @@ mod tests {
{ {
let blocktree = Blocktree::open(&blocktree_path).unwrap(); let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Write entries // Create blobs and entries
let num_entries = 20 as u64; let num_entries = 20 as u64;
let original_entries = make_tiny_test_entries(num_entries as usize); let mut entries = vec![];
let shared_blobs = original_entries.clone().to_shared_blobs(); let mut blobs = vec![];
for (i, b) in shared_blobs.iter().enumerate() { for slot_height in 0..num_entries {
let mut w_b = b.write().unwrap(); let parent_slot = {
w_b.set_index(i as u64); if slot_height == 0 {
w_b.set_slot(i as u64); 0
} else {
slot_height - 1
}
};
let (mut blob, entry) = make_slot_entries(slot_height, parent_slot, 1);
blob[0].set_index(slot_height);
blobs.extend(blob);
entries.extend(entry);
} }
// Write blobs to the database
if should_bulk_write { if should_bulk_write {
blocktree.write_shared_blobs(shared_blobs.iter()).unwrap(); blocktree.write_blobs(blobs.iter()).unwrap();
} else { } else {
for i in 0..num_entries { for i in 0..num_entries {
let i = i as usize; let i = i as usize;
blocktree blocktree.write_blobs(&blobs[i..i + 1]).unwrap();
.write_shared_blobs(&shared_blobs[i..i + 1])
.unwrap();
} }
} }
for i in 0..num_entries - 1 { for i in 0..num_entries - 1 {
assert_eq!( assert_eq!(
blocktree.get_slot_entries(i, i, None).unwrap()[0], blocktree.get_slot_entries(i, i, None).unwrap()[0],
original_entries[i as usize] entries[i as usize]
); );
let meta_key = MetaCf::key(i); let meta_key = MetaCf::key(i);
let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap();
assert_eq!(meta.received, i + 1); assert_eq!(meta.received, i + 1);
assert_eq!(meta.last_index, i);
if i != 0 { if i != 0 {
assert_eq!(meta.parent_slot, i - 1);
assert!(meta.consumed == 0); assert!(meta.consumed == 0);
} else { } else {
assert_eq!(meta.parent_slot, 0);
assert!(meta.consumed == 1); assert!(meta.consumed == 1);
} }
} }
} }
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
} }
fn make_slot_entries(
slot_height: u64,
parent_slot: u64,
num_entries: u64,
) -> (Vec<Blob>, Vec<Entry>) {
let entries = make_tiny_test_entries(num_entries as usize);
let mut blobs = entries.clone().to_blobs();
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64);
b.set_slot(slot_height);
b.set_parent(parent_slot);
}
blobs.last_mut().unwrap().set_is_last_in_slot();
(blobs, entries)
}
fn make_many_slot_entries(
start_slot_height: u64,
num_slots: u64,
entries_per_slot: u64,
) -> (Vec<Blob>, Vec<Entry>) {
let mut blobs = vec![];
let mut entries = vec![];
for slot_height in start_slot_height..start_slot_height + num_slots {
let parent_slot = if slot_height == 0 { 0 } else { slot_height - 1 };
let (slot_blobs, slot_entries) =
make_slot_entries(slot_height, parent_slot, entries_per_slot);
blobs.extend(slot_blobs);
entries.extend(slot_entries);
}
(blobs, entries)
}
} }

View File

@ -118,10 +118,10 @@ impl RepairService {
slot: &SlotMeta, slot: &SlotMeta,
max_repairs: usize, max_repairs: usize,
) -> Result<Vec<(u64, u64)>> { ) -> Result<Vec<(u64, u64)>> {
if slot.contains_all_ticks() { if slot.is_full() {
Ok(vec![]) Ok(vec![])
} else { } else {
let num_unreceived_ticks = { /*let num_unreceived_ticks = {
if slot.consumed == slot.received { if slot.consumed == slot.received {
let num_expected_ticks = slot.num_expected_ticks(blocktree); let num_expected_ticks = slot.num_expected_ticks(blocktree);
if num_expected_ticks == 0 { if num_expected_ticks == 0 {
@ -141,9 +141,9 @@ impl RepairService {
} else { } else {
0 0
} }
}; };*/
let upper = slot.received + num_unreceived_ticks; let upper = slot.received;
let reqs = let reqs =
blocktree.find_missing_data_indexes(slot_height, slot.consumed, upper, max_repairs); blocktree.find_missing_data_indexes(slot_height, slot.consumed, upper, max_repairs);