From 7148c141783e5a368070bab1f92a7c25f2abc9e8 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 20 Dec 2018 12:12:04 -0800 Subject: [PATCH] Debug broadcast (#2233) * Account for duplicate blobs in process_blobs * Increase max bytes for level base to match write buffer --- src/db_ledger.rs | 110 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 93 insertions(+), 17 deletions(-) diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 9455c50bfa..53001657dd 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -21,7 +21,8 @@ pub use rocksdb::DBRawIterator; pub const DB_LEDGER_DIRECTORY: &str = "rocksdb"; // A good value for this is the number of cores on the machine -pub const TOTAL_THREADS: i32 = 8; +const TOTAL_THREADS: i32 = 8; +const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024; #[derive(Debug)] pub enum DbLedgerError { @@ -409,21 +410,33 @@ impl DbLedger { let mut current_slot = lowest_slot; 'outer: loop { let entry: Entry = { - let (next_new_blob, new_blob_index) = { - if index_into_blob < new_blobs.len() { - let blob = new_blobs[index_into_blob].borrow(); - (Some(blob), Some(blob.index()?)) - } else { - (None, None) - } - }; + // Try to find the next blob we're looking for in the new_blobs + // vector + let mut found_blob = None; + while index_into_blob < new_blobs.len() { + let new_blob = new_blobs[index_into_blob].borrow(); + let index = new_blob.index()?; + + // Skip over duplicate blobs with the same index and continue + // until we either find the index we're looking for, or detect + // that the index doesn't exist in the new_blobs vector. + if index > current_index { + break; + } - if new_blob_index == Some(current_index) { index_into_blob += 1; - let next_new_blob = next_new_blob.unwrap(); - current_slot = next_new_blob.slot()?; - let serialized_entry_data = &next_new_blob.data - [BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_new_blob.size()?]; + + if index == current_index { + found_blob = Some(new_blob); + } + } + + // If we found the blob in the new_blobs vector, process it, otherwise, + // look for the blob in the database. + if let Some(next_blob) = found_blob { + current_slot = next_blob.slot()?; + let serialized_entry_data = &next_blob.data + [BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_blob.size()?]; // Verify entries can actually be reconstructed deserialize(serialized_entry_data).expect( "Blob made it past validation, so must be deserializable at this point", @@ -437,7 +450,6 @@ impl DbLedger { let key = DataCf::key(current_slot + 1, current_index); if let Some(blob_data) = self.data_cf.get(&self.db, &key)? { current_slot += 1; - meta.consumed_slot = current_slot; blob_data } else { break 'outer; @@ -454,6 +466,7 @@ impl DbLedger { consumed_queue.push(entry); current_index += 1; meta.consumed += 1; + meta.consumed_slot = current_slot; } } @@ -584,7 +597,8 @@ impl DbLedger { fn get_cf_options() -> Options { let mut options = Options::default(); options.set_max_write_buffer_number(32); - options.set_write_buffer_size(512 * 1024 * 1024); + options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); + options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); options } @@ -596,7 +610,8 @@ impl DbLedger { options.set_max_background_flushes(4); options.set_max_background_compactions(4); options.set_max_write_buffer_number(32); - options.set_write_buffer_size(512 * 1024 * 1024); + options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); + options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); options } } @@ -1004,6 +1019,67 @@ mod tests { DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); } + #[test] + pub fn test_insert_data_blobs_duplicate() { + // Create RocksDb ledger + let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_duplicate"); + { + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + + // Write entries + let num_entries = 10 as u64; + let num_duplicates = 2; + let original_entries: Vec = make_tiny_test_entries(num_entries as usize) + .into_iter() + .flat_map(|e| vec![e; num_duplicates]) + .collect(); + + let shared_blobs = original_entries.clone().to_blobs(); + for (i, b) in shared_blobs.iter().enumerate() { + let index = (i / 2) as u64; + let mut w_b = b.write().unwrap(); + w_b.set_index(index).unwrap(); + w_b.set_slot(index).unwrap(); + } + + assert_eq!( + db_ledger + .write_shared_blobs( + shared_blobs + .iter() + .skip(num_duplicates) + .step_by(num_duplicates * 2) + ) + .unwrap(), + vec![] + ); + + let expected: Vec<_> = original_entries + .into_iter() + .step_by(num_duplicates) + .collect(); + + assert_eq!( + db_ledger + .write_shared_blobs(shared_blobs.iter().step_by(num_duplicates * 2)) + .unwrap(), + expected, + ); + + let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); + let meta = db_ledger + .meta_cf + .get(&db_ledger.db, &meta_key) + .unwrap() + .unwrap(); + assert_eq!(meta.consumed, num_entries); + assert_eq!(meta.received, num_entries); + assert_eq!(meta.consumed_slot, num_entries - 1); + assert_eq!(meta.received_slot, num_entries - 1); + } + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } + #[test] pub fn test_write_consecutive_blobs() { // Create RocksDb ledger