diff --git a/benches/db_ledger.rs b/benches/db_ledger.rs
index f265277d70..07aa46606b 100644
--- a/benches/db_ledger.rs
+++ b/benches/db_ledger.rs
@@ -154,18 +154,17 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) {
         DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
     let num_entries = 32 * 1024;
     let entries = make_tiny_test_entries(num_entries);
-    let shared_blobs = entries.to_blobs();
-    let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
-    let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
-    blobs.shuffle(&mut thread_rng());
-    let slot = 0;
+    let mut shared_blobs = entries.to_blobs();
+    shared_blobs.shuffle(&mut thread_rng());

     bench.iter(move || {
-        for blob in blobs.iter_mut() {
-            let index = blob.index().unwrap();
-            let key = DataCf::key(slot, index);
-            db_ledger.insert_data_blob(&key, blob).unwrap();
-            blob.set_index(index + num_entries as u64).unwrap();
+        for blob in shared_blobs.iter_mut() {
+            let index = blob.read().unwrap().index().unwrap();
+            db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
+            blob.write()
+                .unwrap()
+                .set_index(index + num_entries as u64)
+                .unwrap();
         }
     });

@@ -181,18 +180,17 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) {
         DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
     let num_entries = 32 * 1024;
     let entries = make_large_test_entries(num_entries);
-    let shared_blobs = entries.to_blobs();
-    let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
-    let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
-    blobs.shuffle(&mut thread_rng());
-    let slot = 0;
+    let mut shared_blobs = entries.to_blobs();
+    shared_blobs.shuffle(&mut thread_rng());

     bench.iter(move || {
-        for blob in blobs.iter_mut() {
-            let index = blob.index().unwrap();
-            let key = DataCf::key(slot, index);
-            db_ledger.insert_data_blob(&key, blob).unwrap();
-            blob.set_index(index + num_entries as u64).unwrap();
+        for blob in shared_blobs.iter_mut() {
+            let index = blob.read().unwrap().index().unwrap();
+            db_ledger.write_shared_blobs(vec![blob.clone()]).unwrap();
+            blob.write()
+                .unwrap()
+                .set_index(index + num_entries as u64)
+                .unwrap();
         }
     });

diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs
index 6b3020bab3..1fe27e0984 100644
--- a/src/broadcast_service.rs
+++ b/src/broadcast_service.rs
@@ -100,7 +100,8 @@ fn broadcast(
     {
         let mut win = window.write().unwrap();
         assert!(blobs.len() <= win.len());
-        for (b, _) in &blobs {
+        let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect();
+        for b in &blobs {
             let ix = b.read().unwrap().index().expect("blob index");
             let pos = (ix % window_size) as usize;
             if let Some(x) = win[pos].data.take() {
@@ -122,7 +123,7 @@
             trace!("{} null {}", id, pos);
         }

-        for (b, _) in &blobs {
+        for b in &blobs {
             {
                 let ix = b.read().unwrap().index().expect("blob index");
                 let pos = (ix % window_size) as usize;
@@ -130,8 +131,11 @@
                 assert!(win[pos].data.is_none());
                 win[pos].data = Some(b.clone());
             }
-            db_ledger.write_shared_blobs(vec![b])?;
         }
+
+        db_ledger
+            .write_consecutive_blobs(&blobs)
+            .expect("Unrecoverable failure to write to database");
     }

     // Fill in the coding blob data from the window data blobs
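A note spanning the two files above: both the benches and the broadcast loop now hand `SharedBlob` handles to the ledger instead of pre-acquiring a write lock on every blob up front. In this codebase a `SharedBlob` is a reference-counted lock around a `Blob` (an `Arc<RwLock<Blob>>`), so each iteration clones the handle and takes short-lived read and write locks. A minimal sketch of that pattern, using an illustrative one-field `Blob` stub:

```rust
use std::sync::{Arc, RwLock};

// Stub standing in for packet::Blob; only the field this sketch needs.
struct Blob {
    index: u64,
}
type SharedBlob = Arc<RwLock<Blob>>;

// Mirrors the bench loop above: read the index under a read lock, then
// re-acquire a write lock to bump it, so no lock is held across other work.
fn bump_indexes(blobs: &[SharedBlob], num_entries: u64) {
    for blob in blobs {
        let index = blob.read().unwrap().index;
        blob.write().unwrap().index = index + num_entries;
    }
}
```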
diff --git a/src/db_ledger.rs b/src/db_ledger.rs
index bfd267e222..7a44637a0c 100644
--- a/src/db_ledger.rs
+++ b/src/db_ledger.rs
@@ -7,15 +7,18 @@
 use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use crate::result::{Error, Result};
 use bincode::{deserialize, serialize};
 use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
-use rocksdb::{ColumnFamily, DBRawIterator, Options, WriteBatch, DB};
+use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, Options, WriteBatch, DB};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use std::borrow::Borrow;
+use std::cmp::max;
 use std::io;
 use std::path::Path;

 pub const DB_LEDGER_DIRECTORY: &str = "rocksdb";
+// A good value for this is the number of cores on the machine
+pub const TOTAL_THREADS: i32 = 8;

 #[derive(Debug, PartialEq, Eq)]
 pub enum DbLedgerError {
@@ -252,15 +255,20 @@ impl DbLedger {
         let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY);

         // Use default database options
-        let mut options = Options::default();
-        options.create_if_missing(true);
-        options.create_missing_column_families(true);
+        let db_options = Self::get_db_options();

         // Column family names
-        let cfs = vec![META_CF, DATA_CF, ERASURE_CF];
+        let meta_cf_descriptor = ColumnFamilyDescriptor::new(META_CF, Self::get_cf_options());
+        let data_cf_descriptor = ColumnFamilyDescriptor::new(DATA_CF, Self::get_cf_options());
+        let erasure_cf_descriptor = ColumnFamilyDescriptor::new(ERASURE_CF, Self::get_cf_options());
+        let cfs = vec![
+            meta_cf_descriptor,
+            data_cf_descriptor,
+            erasure_cf_descriptor,
+        ];

         // Open the database
-        let db = DB::open_cf(&options, ledger_path, &cfs)?;
+        let db = DB::open_cf_descriptors(&db_options, ledger_path, cfs)?;

         // Create the metadata column family
         let meta_cf = MetaCf::default();
@@ -290,30 +298,26 @@ impl DbLedger {
         I: IntoIterator,
         I::Item: Borrow<SharedBlob>,
     {
-        let mut entries = vec![];
-        for b in shared_blobs {
-            let bl = b.borrow().read().unwrap();
-            let index = bl.index()?;
-            let slot = bl.slot()?;
-            let key = DataCf::key(slot, index);
-            let new_entries = self.insert_data_blob(&key, &*bl)?;
-            entries.extend(new_entries);
-        }
-        Ok(entries)
+        let c_blobs: Vec<_> = shared_blobs
+            .into_iter()
+            .map(move |s| s.borrow().clone())
+            .collect();
+
+        let r_blobs: Vec<_> = c_blobs.iter().map(move |b| b.read().unwrap()).collect();
+
+        let blobs = r_blobs.iter().map(|s| &**s);
+
+        let new_entries = self.insert_data_blobs(blobs)?;
+        Ok(new_entries)
     }

     pub fn write_blobs<'a, I>(&self, blobs: I) -> Result<Vec<Entry>>
     where
         I: IntoIterator<Item = &'a Blob>,
     {
-        let mut entries = vec![];
-        for blob in blobs.into_iter() {
-            let index = blob.index()?;
-            let key = DataCf::key(blob.slot()?, index);
-            let new_entries = self.insert_data_blob(&key, blob)?;
-            entries.extend(new_entries);
-        }
-        Ok(entries)
+        let blobs = blobs.into_iter().cloned();
+        let new_entries = self.insert_data_blobs(blobs)?;
+        Ok(new_entries)
     }

     pub fn write_entries<I>(&self, slot: u64, entries: I) -> Result<Vec<Entry>>
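For context on the `open` change above: `DB::open_cf` applies one `Options` value to every column family, while `DB::open_cf_descriptors` lets each family carry its own tuning. A standalone sketch against the `rocksdb` crate (the path handling and the single "data" family here are illustrative):

```rust
use rocksdb::{ColumnFamilyDescriptor, Options, DB};

fn open_with_tuned_cfs(path: &str) -> Result<DB, rocksdb::Error> {
    // Database-wide options: create the DB and any missing families.
    let mut db_options = Options::default();
    db_options.create_if_missing(true);
    db_options.create_missing_column_families(true);

    // Per-column-family options travel inside each descriptor.
    let mut cf_options = Options::default();
    cf_options.set_max_write_buffer_number(32);

    let cfs = vec![ColumnFamilyDescriptor::new("data", cf_options)];
    DB::open_cf_descriptors(&db_options, path, cfs)
}
```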
@@ -334,8 +338,24 @@ impl DbLedger {
         self.write_shared_blobs(shared_blobs)
     }

-    pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> {
-        let slot_height = DataCf::slot_height_from_key(key)?;
+    pub fn insert_data_blobs<I>(&self, new_blobs: I) -> Result<Vec<Entry>>
+    where
+        I: IntoIterator,
+        I::Item: Borrow<Blob>,
+    {
+        let mut new_blobs: Vec<_> = new_blobs.into_iter().collect();
+
+        if new_blobs.is_empty() {
+            return Ok(vec![]);
+        }
+
+        new_blobs.sort_unstable_by(|b1, b2| {
+            b1.borrow()
+                .index()
+                .unwrap()
+                .cmp(&b2.borrow().index().unwrap())
+        });
+
         let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);

         let mut should_write_meta = false;
@@ -349,63 +369,80 @@ impl DbLedger {
             }
         };

-        let mut index = DataCf::index_from_key(key)?;
-        // TODO: Handle if leader sends different blob for same index when the index > consumed
         // The old window implementation would just replace that index.
-        if index < meta.consumed {
+        let lowest_index = new_blobs[0].borrow().index()?;
+        let lowest_slot = new_blobs[0].borrow().slot()?;
+        let highest_index = new_blobs.last().unwrap().borrow().index()?;
+        let highest_slot = new_blobs.last().unwrap().borrow().slot()?;
+        if lowest_index < meta.consumed {
             return Err(Error::DbLedgerError(DbLedgerError::BlobForIndexExists));
         }

         // Index is zero-indexed, while the "received" height starts from 1,
         // so received = index + 1 for the same blob.
-        if index >= meta.received {
-            meta.received = index + 1;
-            meta.received_slot = slot_height;
+        if highest_index >= meta.received {
+            meta.received = highest_index + 1;
+            meta.received_slot = highest_slot;
             should_write_meta = true;
         }

         let mut consumed_queue = vec![];
-        if meta.consumed == index {
-            // Add the new blob to the consumed queue
-            let serialized_entry_data =
-                &new_blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + new_blob.size()?];
-            // Verify entries can actually be reconstructed
-            let entry: Entry = deserialize(serialized_entry_data)
-                .expect("Blob made it past validation, so must be deserializable at this point");
-            should_write_meta = true;
-            meta.consumed += 1;
-            consumed_queue.push(entry);
+
+        if meta.consumed == lowest_index {
             // Find the next consecutive block of blobs.
             // TODO: account for consecutive blocks that
             // span multiple slots
-
-            let mut current_slot = slot_height;
-            loop {
-                index += 1;
-                let key = DataCf::key(current_slot, index);
-                let blob_data = {
-                    if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
-                        blob_data
-                    } else if meta.consumed < meta.received {
-                        let key = DataCf::key(current_slot + 1, index);
-                        if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
-                            current_slot += 1;
-                            meta.consumed_slot = current_slot;
-                            blob_data
+            should_write_meta = true;
+            let mut index_into_blob = 0;
+            let mut current_index = lowest_index;
+            let mut current_slot = lowest_slot;
+            'outer: loop {
+                let entry: Entry = {
+                    let (next_new_blob, new_blob_index) = {
+                        if index_into_blob < new_blobs.len() {
+                            let blob = new_blobs[index_into_blob].borrow();
+                            (Some(blob), Some(blob.index()?))
                         } else {
-                            break;
+                            (None, None)
                         }
+                    };
+
+                    if new_blob_index == Some(current_index) {
+                        index_into_blob += 1;
+                        let next_new_blob = next_new_blob.unwrap();
+                        current_slot = next_new_blob.slot()?;
+                        let serialized_entry_data = &next_new_blob.data
+                            [BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_new_blob.size()?];
+                        // Verify entries can actually be reconstructed
+                        deserialize(serialized_entry_data).expect(
+                            "Blob made it past validation, so must be deserializable at this point",
+                        )
                     } else {
-                        break;
+                        let key = DataCf::key(current_slot, current_index);
+                        let blob_data = {
+                            if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
+                                blob_data
+                            } else if meta.consumed < meta.received {
+                                let key = DataCf::key(current_slot + 1, current_index);
+                                if let Some(blob_data) = self.data_cf.get(&self.db, &key)? {
+                                    current_slot += 1;
+                                    meta.consumed_slot = current_slot;
+                                    blob_data
+                                } else {
+                                    break 'outer;
+                                }
+                            } else {
+                                break 'outer;
+                            }
+                        };
+                        deserialize(&blob_data[BLOB_HEADER_SIZE..])
+                            .expect("Blobs in database must be deserializable")
                     }
                 };

-                let serialized_entry_data = &blob_data[BLOB_HEADER_SIZE..];
-                let entry: Entry = deserialize(serialized_entry_data)
-                    .expect("Ledger should only contain well formed data");
                 consumed_queue.push(entry);
+                current_index += 1;
                 meta.consumed += 1;
             }
         }
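The `'outer` loop above merges two sources while extending the consecutive run: the sorted incoming batch (`new_blobs`) and blobs already persisted in the data column family. Stripped of deserialization and slot tracking, the control flow reduces to the following model (a reading aid under those simplifications, not code from the patch):

```rust
/// Starting from `consumed`, advance while the next index is supplied either
/// by the sorted incoming batch or by a blob already in the database.
fn advance_consumed(mut consumed: u64, incoming: &[u64], in_db: impl Fn(u64) -> bool) -> u64 {
    let mut i = 0;
    loop {
        if incoming.get(i) == Some(&consumed) {
            i += 1; // next entry comes from the incoming batch
        } else if !in_db(consumed) {
            break; // hole: neither the batch nor the database has this index
        }
        consumed += 1;
    }
    consumed
}

#[test]
fn merges_batch_and_database() {
    // consumed = 0, batch supplies {0, 2}, database already holds {1}:
    // the run extends through index 2 and stops at the missing index 3.
    assert_eq!(advance_consumed(0, &[0, 2], |ix| ix == 1), 3);
}
```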
@@ -416,12 +453,53 @@ impl DbLedger {
             batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
         }

-        let serialized_blob_data = &new_blob.data[..BLOB_HEADER_SIZE + new_blob.size()?];
-        batch.put_cf(self.data_cf.handle(&self.db), key, serialized_blob_data)?;
+        for blob in new_blobs {
+            let blob = blob.borrow();
+            let key = DataCf::key(blob.slot()?, blob.index()?);
+            let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
+            batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
+        }
+
         self.db.write(batch)?;
         Ok(consumed_queue)
     }

+    // Writes a list of sorted, consecutive broadcast blobs to the db_ledger
+    pub fn write_consecutive_blobs(&self, blobs: &[SharedBlob]) -> Result<()> {
+        assert!(!blobs.is_empty());
+
+        let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
+
+        let mut meta = {
+            if let Some(meta) = self.meta_cf.get(&self.db, &meta_key)? {
+                let first = blobs[0].read().unwrap();
+                assert_eq!(meta.consumed, first.index()?);
+                meta
+            } else {
+                SlotMeta::new()
+            }
+        };
+
+        {
+            let last = blobs.last().unwrap().read().unwrap();
+            meta.consumed = last.index()? + 1;
+            meta.consumed_slot = last.slot()?;
+            meta.received = max(meta.received, last.index()? + 1);
+            meta.received_slot = max(meta.received_slot, last.index()?);
+        }
+
+        let mut batch = WriteBatch::default();
+        batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?;
+        for blob in blobs {
+            let blob = blob.read().unwrap();
+            let key = DataCf::key(blob.slot()?, blob.index()?);
+            let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?];
+            batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?;
+        }
+        self.db.write(batch)?;
+        Ok(())
+    }
+
     // Fill 'buf' with num_blobs or most number of consecutive
     // whole blobs that fit into buf.len()
     //
@@ -492,6 +570,25 @@ impl DbLedger {
         db_iterator.seek_to_first();
         Ok(EntryIterator { db_iterator })
     }
+
+    fn get_cf_options() -> Options {
+        let mut options = Options::default();
+        options.set_max_write_buffer_number(32);
+        options.set_write_buffer_size(512 * 1024 * 1024);
+        options
+    }
+
+    fn get_db_options() -> Options {
+        let mut options = Options::default();
+        options.create_if_missing(true);
+        options.create_missing_column_families(true);
+        options.increase_parallelism(TOTAL_THREADS);
+        options.set_max_background_flushes(4);
+        options.set_max_background_compactions(4);
+        options.set_max_write_buffer_number(32);
+        options.set_write_buffer_size(512 * 1024 * 1024);
+        options
+    }
 }

 struct EntryIterator {
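One observation worth making explicit about the tuning above: `set_write_buffer_size` is per memtable and `set_max_write_buffer_number` is per column family, and the ledger opens three families with these settings. If flushes fall behind, the worst-case memtable footprint is substantial; the arithmetic under that assumption:

```rust
fn main() {
    let write_buffer_size: u64 = 512 * 1024 * 1024; // bytes per memtable
    let max_write_buffer_number: u64 = 32; // memtables per column family
    let column_families: u64 = 3; // META_CF, DATA_CF, ERASURE_CF

    let worst_case = write_buffer_size * max_write_buffer_number * column_families;
    println!("worst-case memtable memory: {} GiB", worst_case >> 30); // 48 GiB
}
```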
@@ -690,6 +787,11 @@ mod tests {
     fn test_insert_data_blobs_basic() {
         let entries = make_tiny_test_entries(2);
         let shared_blobs = entries.to_blobs();
+
+        for (i, b) in shared_blobs.iter().enumerate() {
+            b.write().unwrap().set_index(i as u64).unwrap();
+        }
+
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

@@ -697,9 +799,7 @@ mod tests {
         let ledger = DbLedger::open(&ledger_path).unwrap();

         // Insert second blob, we're missing the first blob, so should return nothing
-        let result = ledger
-            .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, 1), blobs[1])
-            .unwrap();
+        let result = ledger.insert_data_blobs(vec![blobs[1]]).unwrap();
         assert!(result.len() == 0);

         let meta = ledger
@@ -710,9 +810,7 @@ mod tests {
         assert!(meta.consumed == 0 && meta.received == 2);

         // Insert first blob, check for consecutive returned entries
-        let result = ledger
-            .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, 0), blobs[0])
-            .unwrap();
+        let result = ledger.insert_data_blobs(vec![blobs[0]]).unwrap();

         assert_eq!(result, entries);

@@ -733,6 +831,9 @@ mod tests {
         let num_blobs = 10;
         let entries = make_tiny_test_entries(num_blobs);
         let shared_blobs = entries.to_blobs();
+        for (i, b) in shared_blobs.iter().enumerate() {
+            b.write().unwrap().set_index(i as u64).unwrap();
+        }
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

@@ -741,9 +842,7 @@ mod tests {

         // Insert blobs in reverse, check for consecutive returned blobs
         for i in (0..num_blobs).rev() {
-            let result = ledger
-                .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i])
-                .unwrap();
+            let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();

             let meta = ledger
@@ -769,6 +868,9 @@ mod tests {
         let num_blobs = 10;
         let entries = make_tiny_test_entries(num_blobs);
         let shared_blobs = entries.to_blobs();
+        for (i, b) in shared_blobs.iter().enumerate() {
+            b.write().unwrap().set_index(i as u64).unwrap();
+        }
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

@@ -777,18 +879,13 @@ mod tests {

         // Insert last blob into next slot
         let result = ledger
-            .insert_data_blob(
-                &DataCf::key(DEFAULT_SLOT_HEIGHT + 1, (num_blobs - 1) as u64),
-                blobs.last().unwrap(),
-            )
+            .insert_data_blobs(vec![*blobs.last().unwrap()])
             .unwrap();
         assert_eq!(result.len(), 0);

         // Insert blobs into first slot, check for consecutive blobs
         for i in (0..num_blobs - 1).rev() {
-            let result = ledger
-                .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i])
-                .unwrap();
+            let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap();
             let meta = ledger
                 .meta_cf
                 .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
@@ -826,9 +923,12 @@ mod tests {
             w_b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
         }

-        db_ledger
-            .write_shared_blobs(&shared_blobs)
-            .expect("Expected successful write of blobs");
+        assert_eq!(
+            db_ledger
+                .write_shared_blobs(&shared_blobs)
+                .expect("Expected successful write of blobs"),
+            vec![]
+        );
         let mut db_iterator = db_ledger
             .db
             .raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db))
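The two tests added below pin down the return-value contract of the new bulk path. A reviewer's restatement of the expected behavior, with illustrative indexes (not code from the patch):

```rust
// Contract exercised by test_insert_data_blobs_bulk (indexes 0..20):
//
//   1. write_shared_blobs(blobs at indexes 1, 3, 5, ...) -> Ok(vec![])
//      index 0 is still missing, so no consecutive run exists yet
//   2. write_shared_blobs(blobs at indexes 0, 2, 4, ...) -> Ok(all 20 entries)
//      every hole is filled, so the run 0..20 comes back in one call
//
// Afterwards meta.consumed == meta.received == 20, and both slot fields
// point at the last blob's slot (19), matching the asserts in the test.
```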
@@ -849,6 +949,106 @@ mod tests {
         DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
     }

+    #[test]
+    pub fn test_insert_data_blobs_bulk() {
+        // Create RocksDb ledger
+        let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_bulk");
+        {
+            let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
+
+            // Write entries
+            let num_entries = 20 as u64;
+            let original_entries = make_tiny_test_entries(num_entries as usize);
+            let shared_blobs = original_entries.clone().to_blobs();
+            for (i, b) in shared_blobs.iter().enumerate() {
+                let mut w_b = b.write().unwrap();
+                w_b.set_index(i as u64).unwrap();
+                w_b.set_slot(i as u64).unwrap();
+            }
+
+            assert_eq!(
+                db_ledger
+                    .write_shared_blobs(shared_blobs.iter().skip(1).step_by(2))
+                    .unwrap(),
+                vec![]
+            );
+
+            assert_eq!(
+                db_ledger
+                    .write_shared_blobs(shared_blobs.iter().step_by(2))
+                    .unwrap(),
+                original_entries
+            );
+
+            let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
+            let meta = db_ledger
+                .meta_cf
+                .get(&db_ledger.db, &meta_key)
+                .unwrap()
+                .unwrap();
+            assert_eq!(meta.consumed, num_entries);
+            assert_eq!(meta.received, num_entries);
+            assert_eq!(meta.consumed_slot, num_entries - 1);
+            assert_eq!(meta.received_slot, num_entries - 1);
+        }
+        DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
+    }
+
+    #[test]
+    pub fn test_write_consecutive_blobs() {
+        // Create RocksDb ledger
+        let db_ledger_path = get_tmp_ledger_path("test_write_consecutive_blobs");
+        {
+            let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
+
+            // Write entries
+            let num_entries = 20 as u64;
+            let original_entries = make_tiny_test_entries(num_entries as usize);
+            let shared_blobs = original_entries.to_blobs();
+            for (i, b) in shared_blobs.iter().enumerate() {
+                let mut w_b = b.write().unwrap();
+                w_b.set_index(i as u64).unwrap();
+                w_b.set_slot(i as u64).unwrap();
+            }
+
+            db_ledger
+                .write_consecutive_blobs(&shared_blobs)
+                .expect("Expect successful blob writes");
+
+            let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT);
+            let meta = db_ledger
+                .meta_cf
+                .get(&db_ledger.db, &meta_key)
+                .unwrap()
+                .unwrap();
+            assert_eq!(meta.consumed, num_entries);
+            assert_eq!(meta.received, num_entries);
+            assert_eq!(meta.consumed_slot, num_entries - 1);
+            assert_eq!(meta.received_slot, num_entries - 1);
+
+            for (i, b) in shared_blobs.iter().enumerate() {
+                let mut w_b = b.write().unwrap();
+                w_b.set_index(num_entries + i as u64).unwrap();
+                w_b.set_slot(num_entries + i as u64).unwrap();
+            }
+
+            db_ledger
+                .write_consecutive_blobs(&shared_blobs)
+                .expect("Expect successful blob writes");
+
+            let meta = db_ledger
+                .meta_cf
+                .get(&db_ledger.db, &meta_key)
+                .unwrap()
+                .unwrap();
+            assert_eq!(meta.consumed, 2 * num_entries);
+            assert_eq!(meta.received, 2 * num_entries);
+            assert_eq!(meta.consumed_slot, 2 * num_entries - 1);
+            assert_eq!(meta.received_slot, 2 * num_entries - 1);
+        }
+        DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     pub fn test_genesis_and_entry_iterator() {
         // Create RocksDb ledger
diff --git a/src/db_window.rs b/src/db_window.rs
index 0e4c9e733d..a168eef9d3 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -310,8 +310,7 @@ pub fn process_blob(
         )?;
         vec![]
     } else {
-        let data_key = DataCf::key(slot, pix);
-        db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
+        db_ledger.insert_data_blobs(vec![&*blob.read().unwrap()])?
     };

     #[cfg(feature = "erasure")]
diff --git a/src/thin_client.rs b/src/thin_client.rs
index caf0ab0395..594814cbd4 100644
--- a/src/thin_client.rs
+++ b/src/thin_client.rs
@@ -450,6 +450,7 @@ mod tests {
         let mut bank = Bank::new(&alice);
         let bob_pubkey = Keypair::new().pubkey();
         let ledger_path = create_tmp_ledger_with_mint("thin_client", &alice);
+        let entry_height = alice.create_entries().len() as u64;

         let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
             leader_data.id,
@@ -461,7 +462,7 @@ mod tests {
             leader_keypair,
             vote_account_keypair,
             bank,
-            0,
+            entry_height,
             &last_id,
             leader,
             None,
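A closing note on the `thin_client` fix: `create_tmp_ledger_with_mint` seeds the ledger with the mint's genesis entries, so the fullnode under test must be told it starts at that entry height rather than at 0, or its entry count disagrees with what is already on disk. A minimal sketch of the idea with a stub `Mint` (`create_entries` is the API the diff itself uses; the helper name is invented):

```rust
// Stub standing in for the codebase's Mint; only what the sketch needs.
struct Entry;
struct Mint;

impl Mint {
    // The real Mint::create_entries produces the ledger's genesis entries.
    fn create_entries(&self) -> Vec<Entry> {
        vec![Entry, Entry]
    }
}

// Invented helper: a node booted from a mint-seeded ledger starts at the
// number of genesis entries already on disk, not at zero.
fn starting_entry_height(mint: &Mint) -> u64 {
    mint.create_entries().len() as u64
}
```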