Refactor blocktree storage abstraction (#3588)
This commit is contained in:
		| @@ -8,7 +8,7 @@ use crate::result::{Error, Result}; | ||||
| #[cfg(feature = "kvstore")] | ||||
| use solana_kvstore as kvstore; | ||||
|  | ||||
| use bincode::{deserialize, serialize}; | ||||
| use bincode::deserialize; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| @@ -36,16 +36,19 @@ macro_rules! db_imports { | ||||
|     { $mod:ident, $db:ident, $db_path:expr } => { | ||||
|         mod $mod; | ||||
|  | ||||
|         pub use db::{ | ||||
|             Cursor, Database, IndexColumn, IWriteBatch, LedgerColumnFamily, | ||||
|             LedgerColumnFamilyRaw, | ||||
|         }; | ||||
|         use $mod::$db; | ||||
|         use db::columns as cf; | ||||
|  | ||||
|         pub use db::columns; | ||||
|  | ||||
|         pub type Database = db::Database<$db>; | ||||
|         pub type Cursor<C>  = db::Cursor<$db, C>; | ||||
|         pub type LedgerColumn<C> = db::LedgerColumn<$db, C>; | ||||
|         pub type WriteBatch = db::WriteBatch<$db>; | ||||
|  | ||||
|         pub trait Column: db::Column<$db> {} | ||||
|         impl<C: db::Column<$db>> Column for C {} | ||||
|  | ||||
|         pub use $mod::{$db, ErasureCf, MetaCf, DataCf, DetachedHeadsCf}; | ||||
|         pub type BlocktreeRawIterator = <$db as Database>::Cursor; | ||||
|         pub type WriteBatch = <$db as Database>::WriteBatch; | ||||
|         pub type OwnedKey = <$db as Database>::OwnedKey; | ||||
|         pub type Key = <$db as Database>::Key; | ||||
|         pub const BLOCKTREE_DIRECTORY: &str = $db_path; | ||||
|     }; | ||||
| } | ||||
| @@ -117,15 +120,11 @@ impl SlotMeta { | ||||
|  | ||||
| // ledger window | ||||
| pub struct Blocktree { | ||||
|     // Underlying database is automatically closed in the Drop implementation of DB | ||||
|     #[cfg(not(feature = "kvstore"))] | ||||
|     db: Arc<Rocks>, | ||||
|     #[cfg(feature = "kvstore")] | ||||
|     db: Arc<Kvs>, | ||||
|     meta_cf: MetaCf, | ||||
|     data_cf: DataCf, | ||||
|     erasure_cf: ErasureCf, | ||||
|     detached_heads_cf: DetachedHeadsCf, | ||||
|     db: Arc<Database>, | ||||
|     meta_cf: LedgerColumn<cf::SlotMeta>, | ||||
|     data_cf: LedgerColumn<cf::Data>, | ||||
|     erasure_cf: LedgerColumn<cf::Coding>, | ||||
|     detached_heads_cf: LedgerColumn<cf::DetachedHeads>, | ||||
|     pub new_blobs_signals: Vec<SyncSender<bool>>, | ||||
| } | ||||
|  | ||||
| @@ -139,6 +138,38 @@ pub const ERASURE_CF: &str = "erasure"; | ||||
| pub const DETACHED_HEADS_CF: &str = "detached_heads"; | ||||
|  | ||||
| impl Blocktree { | ||||
|     /// Opens a Ledger in directory, provides "infinite" window of blobs | ||||
|     pub fn open(ledger_path: &str) -> Result<Blocktree> { | ||||
|         use std::path::Path; | ||||
|  | ||||
|         fs::create_dir_all(&ledger_path)?; | ||||
|         let ledger_path = Path::new(&ledger_path).join(BLOCKTREE_DIRECTORY); | ||||
|  | ||||
|         // Open the database | ||||
|         let db = Arc::new(Database::open(&ledger_path)?); | ||||
|  | ||||
|         // Create the metadata column family | ||||
|         let meta_cf = LedgerColumn::new(&db); | ||||
|  | ||||
|         // Create the data column family | ||||
|         let data_cf = LedgerColumn::new(&db); | ||||
|  | ||||
|         // Create the erasure column family | ||||
|         let erasure_cf = LedgerColumn::new(&db); | ||||
|  | ||||
|         // Create the detached heads column family | ||||
|         let detached_heads_cf = LedgerColumn::new(&db); | ||||
|  | ||||
|         Ok(Blocktree { | ||||
|             db, | ||||
|             meta_cf, | ||||
|             data_cf, | ||||
|             erasure_cf, | ||||
|             detached_heads_cf, | ||||
|             new_blobs_signals: vec![], | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver<bool>)> { | ||||
|         let mut blocktree = Self::open(ledger_path)?; | ||||
|         let (signal_sender, signal_receiver) = sync_channel(1); | ||||
| @@ -147,37 +178,43 @@ impl Blocktree { | ||||
|         Ok((blocktree, signal_receiver)) | ||||
|     } | ||||
|  | ||||
|     pub fn destroy(ledger_path: &str) -> Result<()> { | ||||
|         // Database::destroy() fails is the path doesn't exist | ||||
|         fs::create_dir_all(ledger_path)?; | ||||
|         let path = std::path::Path::new(ledger_path).join(BLOCKTREE_DIRECTORY); | ||||
|         Database::destroy(&path) | ||||
|     } | ||||
|  | ||||
|     pub fn meta(&self, slot: u64) -> Result<Option<SlotMeta>> { | ||||
|         self.meta_cf.get(&MetaCf::key(&slot)) | ||||
|         self.meta_cf.get(slot) | ||||
|     } | ||||
|  | ||||
|     pub fn detached_head(&self, slot: u64) -> Result<Option<bool>> { | ||||
|         self.detached_heads_cf.get(&DetachedHeadsCf::key(&slot)) | ||||
|         self.detached_heads_cf.get(slot) | ||||
|     } | ||||
|  | ||||
|     pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> { | ||||
|         let meta_key = MetaCf::key(&slot); | ||||
|         if let Some(mut meta) = self.meta_cf.get(&meta_key)? { | ||||
|         if let Some(mut meta) = self.meta_cf.get(slot)? { | ||||
|             for index in 0..meta.received { | ||||
|                 self.data_cf.delete_by_index(&(slot, index))?; | ||||
|                 self.data_cf.delete((slot, index))?; | ||||
|             } | ||||
|             meta.consumed = 0; | ||||
|             meta.received = 0; | ||||
|             meta.last_index = std::u64::MAX; | ||||
|             meta.next_slots = vec![]; | ||||
|             self.meta_cf.put(&meta_key, &meta)?; | ||||
|             self.meta_cf.put(0, &meta)?; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn get_next_slot(&self, slot: u64) -> Result<Option<u64>> { | ||||
|         let mut db_iterator = self.db.raw_iterator_cf(self.meta_cf.handle())?; | ||||
|         db_iterator.seek(&MetaCf::key(&(slot + 1))); | ||||
|         let mut db_iterator = self.db.cursor::<cf::SlotMeta>()?; | ||||
|         db_iterator.seek(slot + 1); | ||||
|         if !db_iterator.valid() { | ||||
|             Ok(None) | ||||
|         } else { | ||||
|             let key = &db_iterator.key().expect("Expected valid key"); | ||||
|             Ok(Some(MetaCf::index(&key))) | ||||
|             let next_slot = db_iterator.key().expect("Expected valid key"); | ||||
|             Ok(Some(next_slot)) | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -328,11 +365,7 @@ impl Blocktree { | ||||
|             // Check if the working copy of the metadata has changed | ||||
|             if Some(meta) != meta_backup.as_ref() { | ||||
|                 should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup); | ||||
|                 write_batch.put_cf( | ||||
|                     self.meta_cf.handle(), | ||||
|                     &MetaCf::key(slot), | ||||
|                     &serialize(&meta)?, | ||||
|                 )?; | ||||
|                 write_batch.put::<cf::SlotMeta>(*slot, &meta)?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
| @@ -357,9 +390,8 @@ impl Blocktree { | ||||
|         buf: &mut [u8], | ||||
|         slot: u64, | ||||
|     ) -> Result<(u64, u64)> { | ||||
|         let start_key = DataCf::key(&(slot, start_index)); | ||||
|         let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; | ||||
|         db_iterator.seek(&start_key); | ||||
|         let mut db_iterator = self.db.cursor::<cf::Data>()?; | ||||
|         db_iterator.seek((slot, start_index)); | ||||
|         let mut total_blobs = 0; | ||||
|         let mut total_current_size = 0; | ||||
|         for expected_index in start_index..start_index + num_blobs { | ||||
| @@ -376,14 +408,13 @@ impl Blocktree { | ||||
|  | ||||
|             // Check key is the next sequential key based on | ||||
|             // blob index | ||||
|             let key = &db_iterator.key().expect("Expected valid key"); | ||||
|             let index = DataCf::index(key).1; | ||||
|             let (_, index) = db_iterator.key().expect("Expected valid key"); | ||||
|             if index != expected_index { | ||||
|                 break; | ||||
|             } | ||||
|  | ||||
|             // Get the blob data | ||||
|             let value = &db_iterator.value(); | ||||
|             let value = &db_iterator.value_bytes(); | ||||
|  | ||||
|             if value.is_none() { | ||||
|                 break; | ||||
| @@ -410,24 +441,24 @@ impl Blocktree { | ||||
|     } | ||||
|  | ||||
|     pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> { | ||||
|         self.erasure_cf.get_by_index(&(slot, index)) | ||||
|         self.erasure_cf.get_bytes((slot, index)) | ||||
|     } | ||||
|     pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> { | ||||
|         self.erasure_cf.delete_by_index(&(slot, index)) | ||||
|         self.erasure_cf.delete((slot, index)) | ||||
|     } | ||||
|     pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> { | ||||
|         self.data_cf.get_by_index(&(slot, index)) | ||||
|         self.data_cf.get_bytes((slot, index)) | ||||
|     } | ||||
|     pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { | ||||
|         self.erasure_cf.put_by_index(&(slot, index), bytes) | ||||
|         self.erasure_cf.put_bytes((slot, index), bytes) | ||||
|     } | ||||
|  | ||||
|     pub fn put_data_raw(&self, key: &Key, value: &[u8]) -> Result<()> { | ||||
|         self.data_cf.put_bytes(key, value) | ||||
|     pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> { | ||||
|         self.data_cf.put_bytes((slot, index), value) | ||||
|     } | ||||
|  | ||||
|     pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { | ||||
|         self.data_cf.put_by_index(&(slot, index), bytes) | ||||
|         self.data_cf.put_bytes((slot, index), bytes) | ||||
|     } | ||||
|  | ||||
|     pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result<Option<Blob>> { | ||||
| @@ -452,16 +483,16 @@ impl Blocktree { | ||||
|     // Given a start and end entry index, find all the missing | ||||
|     // indexes in the ledger in the range [start_index, end_index) | ||||
|     // for the slot with the specified slot | ||||
|     fn find_missing_indexes( | ||||
|         db_iterator: &mut BlocktreeRawIterator, | ||||
|     fn find_missing_indexes<C>( | ||||
|         db_iterator: &mut Cursor<C>, | ||||
|         slot: u64, | ||||
|         start_index: u64, | ||||
|         end_index: u64, | ||||
|         key: &dyn Fn(u64, u64) -> OwnedKey, | ||||
|         slot_from_key: &dyn Fn(&Key) -> u64, | ||||
|         index_from_key: &dyn Fn(&Key) -> u64, | ||||
|         max_missing: usize, | ||||
|     ) -> Vec<u64> { | ||||
|     ) -> Vec<u64> | ||||
|     where | ||||
|         C: Column<Index = (u64, u64)>, | ||||
|     { | ||||
|         if start_index >= end_index || max_missing == 0 { | ||||
|             return vec![]; | ||||
|         } | ||||
| @@ -469,7 +500,7 @@ impl Blocktree { | ||||
|         let mut missing_indexes = vec![]; | ||||
|  | ||||
|         // Seek to the first blob with index >= start_index | ||||
|         db_iterator.seek(&key(slot, start_index)); | ||||
|         db_iterator.seek((slot, start_index)); | ||||
|  | ||||
|         // The index of the first missing blob in the slot | ||||
|         let mut prev_index = start_index; | ||||
| @@ -483,13 +514,12 @@ impl Blocktree { | ||||
|                 } | ||||
|                 break; | ||||
|             } | ||||
|             let current_key = db_iterator.key().expect("Expect a valid key"); | ||||
|             let current_slot = slot_from_key(¤t_key); | ||||
|             let (current_slot, index) = db_iterator.key().expect("Expect a valid key"); | ||||
|             let current_index = { | ||||
|                 if current_slot > slot { | ||||
|                     end_index | ||||
|                 } else { | ||||
|                     index_from_key(¤t_key) | ||||
|                     index | ||||
|                 } | ||||
|             }; | ||||
|             let upper_index = cmp::min(current_index, end_index); | ||||
| @@ -523,18 +553,11 @@ impl Blocktree { | ||||
|         end_index: u64, | ||||
|         max_missing: usize, | ||||
|     ) -> Vec<u64> { | ||||
|         let mut db_iterator = self.data_cf.raw_iterator(); | ||||
|  | ||||
|         Self::find_missing_indexes( | ||||
|             &mut db_iterator, | ||||
|             slot, | ||||
|             start_index, | ||||
|             end_index, | ||||
|             &|slot, index| DataCf::key(&(slot, index)), | ||||
|             &MetaCf::index, | ||||
|             &|key| DataCf::index(key).1, | ||||
|             max_missing, | ||||
|         ) | ||||
|         if let Ok(mut db_iterator) = self.data_cf.cursor() { | ||||
|             Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing) | ||||
|         } else { | ||||
|             vec![] | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn find_missing_coding_indexes( | ||||
| @@ -544,18 +567,11 @@ impl Blocktree { | ||||
|         end_index: u64, | ||||
|         max_missing: usize, | ||||
|     ) -> Vec<u64> { | ||||
|         let mut db_iterator = self.erasure_cf.raw_iterator(); | ||||
|  | ||||
|         Self::find_missing_indexes( | ||||
|             &mut db_iterator, | ||||
|             slot, | ||||
|             start_index, | ||||
|             end_index, | ||||
|             &|slot, index| ErasureCf::key(&(slot, index)), | ||||
|             &MetaCf::index, | ||||
|             &|key| ErasureCf::index(key).1, | ||||
|             max_missing, | ||||
|         ) | ||||
|         if let Ok(mut db_iterator) = self.erasure_cf.cursor() { | ||||
|             Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing) | ||||
|         } else { | ||||
|             vec![] | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns the entry vector for the slot starting with `blob_start_index` | ||||
| @@ -569,6 +585,77 @@ impl Blocktree { | ||||
|             .map(|x| x.0) | ||||
|     } | ||||
|  | ||||
|     pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> { | ||||
|         self.data_cf | ||||
|             .iter() | ||||
|             .unwrap() | ||||
|             .map(|(_, blob_data)| Blob::new(&blob_data)) | ||||
|     } | ||||
|  | ||||
|     /// Return an iterator for all the entries in the given file. | ||||
|     pub fn read_ledger(&self) -> Result<impl Iterator<Item = Entry>> { | ||||
|         use crate::entry::EntrySlice; | ||||
|         use std::collections::VecDeque; | ||||
|  | ||||
|         struct EntryIterator { | ||||
|             db_iterator: Cursor<cf::Data>, | ||||
|  | ||||
|             // TODO: remove me when replay_stage is iterating by block (Blocktree) | ||||
|             //    this verification is duplicating that of replay_stage, which | ||||
|             //    can do this in parallel | ||||
|             blockhash: Option<Hash>, | ||||
|             // https://github.com/rust-rocksdb/rust-rocksdb/issues/234 | ||||
|             //   rocksdb issue: the _blocktree member must be lower in the struct to prevent a crash | ||||
|             //   when the db_iterator member above is dropped. | ||||
|             //   _blocktree is unused, but dropping _blocktree results in a broken db_iterator | ||||
|             //   you have to hold the database open in order to iterate over it, and in order | ||||
|             //   for db_iterator to be able to run Drop | ||||
|             //    _blocktree: Blocktree, | ||||
|             entries: VecDeque<Entry>, | ||||
|         } | ||||
|  | ||||
|         impl Iterator for EntryIterator { | ||||
|             type Item = Entry; | ||||
|  | ||||
|             fn next(&mut self) -> Option<Entry> { | ||||
|                 if !self.entries.is_empty() { | ||||
|                     return Some(self.entries.pop_front().unwrap()); | ||||
|                 } | ||||
|  | ||||
|                 if self.db_iterator.valid() { | ||||
|                     if let Some(value) = self.db_iterator.value_bytes() { | ||||
|                         if let Ok(next_entries) = | ||||
|                             deserialize::<Vec<Entry>>(&value[BLOB_HEADER_SIZE..]) | ||||
|                         { | ||||
|                             if let Some(blockhash) = self.blockhash { | ||||
|                                 if !next_entries.verify(&blockhash) { | ||||
|                                     return None; | ||||
|                                 } | ||||
|                             } | ||||
|                             self.db_iterator.next(); | ||||
|                             if next_entries.is_empty() { | ||||
|                                 return None; | ||||
|                             } | ||||
|                             self.entries = VecDeque::from(next_entries); | ||||
|                             let entry = self.entries.pop_front().unwrap(); | ||||
|                             self.blockhash = Some(entry.hash); | ||||
|                             return Some(entry); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 None | ||||
|             } | ||||
|         } | ||||
|         let mut db_iterator = self.data_cf.cursor()?; | ||||
|  | ||||
|         db_iterator.seek_to_first(); | ||||
|         Ok(EntryIterator { | ||||
|             entries: VecDeque::new(), | ||||
|             db_iterator, | ||||
|             blockhash: None, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     pub fn get_slot_entries_with_blob_count( | ||||
|         &self, | ||||
|         slot: u64, | ||||
| @@ -648,7 +735,7 @@ impl Blocktree { | ||||
|         // Write all the newly changed slots in new_chained_slots to the write_batch | ||||
|         for (slot, meta) in new_chained_slots.iter() { | ||||
|             let meta: &SlotMeta = &RefCell::borrow(&*meta); | ||||
|             write_batch.put_cf(self.meta_cf.handle(), &MetaCf::key(slot), &serialize(meta)?)?; | ||||
|             write_batch.put::<cf::SlotMeta>(*slot, meta)?; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| @@ -692,21 +779,14 @@ impl Blocktree { | ||||
|                     ); | ||||
|  | ||||
|                     if Self::is_detached_head(&RefCell::borrow(&*prev_slot_meta)) { | ||||
|                         write_batch.put_cf( | ||||
|                             self.detached_heads_cf.handle(), | ||||
|                             &DetachedHeadsCf::key(&prev_slot), | ||||
|                             &serialize(&true)?, | ||||
|                         )?; | ||||
|                         write_batch.put::<cf::DetachedHeads>(prev_slot, &true)?; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // At this point this slot has received a parent, so no longer a detached head | ||||
|             if is_detached_head { | ||||
|                 write_batch.delete_cf( | ||||
|                     self.detached_heads_cf.handle(), | ||||
|                     &DetachedHeadsCf::key(&slot), | ||||
|                 )?; | ||||
|                 write_batch.delete::<cf::DetachedHeads>(slot)?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
| @@ -883,12 +963,11 @@ impl Blocktree { | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let key = DataCf::key(&(blob_slot, blob_index)); | ||||
|         let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size]; | ||||
|  | ||||
|         // Commit step: commit all changes to the mutable structures at once, or none at all. | ||||
|         // We don't want only some of these changes going through. | ||||
|         write_batch.put_cf(self.data_cf.handle(), &key, serialized_blob_data)?; | ||||
|         write_batch.put_bytes::<cf::Data>((blob_slot, blob_index), serialized_blob_data)?; | ||||
|         prev_inserted_blob_datas.insert((blob_slot, blob_index), serialized_blob_data); | ||||
|         // Index is zero-indexed, while the "received" height starts from 1, | ||||
|         // so received = index + 1 for the same blob. | ||||
| @@ -927,7 +1006,7 @@ impl Blocktree { | ||||
|             // Try to find the next blob we're looking for in the prev_inserted_blob_datas | ||||
|             if let Some(prev_blob_data) = prev_inserted_blob_datas.get(&(slot, current_index)) { | ||||
|                 blobs.push(Cow::Borrowed(*prev_blob_data)); | ||||
|             } else if let Some(blob_data) = self.data_cf.get_by_index(&(slot, current_index))? { | ||||
|             } else if let Some(blob_data) = self.data_cf.get_bytes((slot, current_index))? { | ||||
|                 // Try to find the next blob we're looking for in the database | ||||
|                 blobs.push(Cow::Owned(blob_data)); | ||||
|             } else { | ||||
| @@ -944,7 +1023,6 @@ impl Blocktree { | ||||
|     // don't count as ticks, even if they're empty entries | ||||
|     fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> { | ||||
|         // TODO: change bootstrap height to number of slots | ||||
|         let meta_key = MetaCf::key(&0); | ||||
|         let mut bootstrap_meta = SlotMeta::new(0, 1); | ||||
|         let last = blobs.last().unwrap(); | ||||
|  | ||||
| @@ -953,15 +1031,10 @@ impl Blocktree { | ||||
|         bootstrap_meta.is_connected = true; | ||||
|  | ||||
|         let mut batch = self.db.batch()?; | ||||
|         batch.put_cf( | ||||
|             self.meta_cf.handle(), | ||||
|             &meta_key, | ||||
|             &serialize(&bootstrap_meta)?, | ||||
|         )?; | ||||
|         batch.put::<cf::SlotMeta>(0, &bootstrap_meta)?; | ||||
|         for blob in blobs { | ||||
|             let key = DataCf::key(&(blob.slot(), blob.index())); | ||||
|             let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; | ||||
|             batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; | ||||
|             batch.put_bytes::<cf::Data>((blob.slot(), blob.index()), serialized_blob_datas)?; | ||||
|         } | ||||
|         self.db.write(batch)?; | ||||
|         Ok(()) | ||||
| @@ -1077,7 +1150,6 @@ pub fn tmp_copy_blocktree(from: &str, name: &str) -> String { | ||||
| #[cfg(test)] | ||||
| pub mod tests { | ||||
|     use super::*; | ||||
|     use crate::blocktree::db::Database; | ||||
|     use crate::entry::{ | ||||
|         create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice, | ||||
|     }; | ||||
| @@ -1208,11 +1280,10 @@ pub mod tests { | ||||
|  | ||||
|         // Test meta column family | ||||
|         let meta = SlotMeta::new(0, 1); | ||||
|         let meta_key = MetaCf::key(&0); | ||||
|         ledger.meta_cf.put(&meta_key, &meta).unwrap(); | ||||
|         ledger.meta_cf.put(0, &meta).unwrap(); | ||||
|         let result = ledger | ||||
|             .meta_cf | ||||
|             .get(&meta_key) | ||||
|             .get(0) | ||||
|             .unwrap() | ||||
|             .expect("Expected meta object to exist"); | ||||
|  | ||||
| @@ -1220,15 +1291,12 @@ pub mod tests { | ||||
|  | ||||
|         // Test erasure column family | ||||
|         let erasure = vec![1u8; 16]; | ||||
|         let erasure_key = ErasureCf::key(&(0, 0)); | ||||
|         ledger | ||||
|             .erasure_cf | ||||
|             .put_bytes(&erasure_key[..], &erasure) | ||||
|             .unwrap(); | ||||
|         let erasure_key = (0, 0); | ||||
|         ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap(); | ||||
|  | ||||
|         let result = ledger | ||||
|             .erasure_cf | ||||
|             .get_bytes(&erasure_key[..]) | ||||
|             .get_bytes(erasure_key) | ||||
|             .unwrap() | ||||
|             .expect("Expected erasure object to exist"); | ||||
|  | ||||
| @@ -1236,12 +1304,12 @@ pub mod tests { | ||||
|  | ||||
|         // Test data column family | ||||
|         let data = vec![2u8; 16]; | ||||
|         let data_key = DataCf::key(&(0, 0)); | ||||
|         ledger.data_cf.put_bytes(&data_key, &data).unwrap(); | ||||
|         let data_key = (0, 0); | ||||
|         ledger.data_cf.put_bytes(data_key, &data).unwrap(); | ||||
|  | ||||
|         let result = ledger | ||||
|             .data_cf | ||||
|             .get_bytes(&data_key) | ||||
|             .get_bytes(data_key) | ||||
|             .unwrap() | ||||
|             .expect("Expected data object to exist"); | ||||
|  | ||||
| @@ -1336,7 +1404,7 @@ pub mod tests { | ||||
|  | ||||
|         let meta = ledger | ||||
|             .meta_cf | ||||
|             .get(&MetaCf::key(&0)) | ||||
|             .get(0) | ||||
|             .unwrap() | ||||
|             .expect("Expected new metadata object to be created"); | ||||
|         assert!(meta.consumed == 0 && meta.received == num_entries); | ||||
| @@ -1351,7 +1419,7 @@ pub mod tests { | ||||
|  | ||||
|         let meta = ledger | ||||
|             .meta_cf | ||||
|             .get(&MetaCf::key(&0)) | ||||
|             .get(0) | ||||
|             .unwrap() | ||||
|             .expect("Expected new metadata object to exist"); | ||||
|         assert_eq!(meta.consumed, num_entries); | ||||
| @@ -1381,7 +1449,7 @@ pub mod tests { | ||||
|  | ||||
|             let meta = ledger | ||||
|                 .meta_cf | ||||
|                 .get(&MetaCf::key(&0)) | ||||
|                 .get(0) | ||||
|                 .unwrap() | ||||
|                 .expect("Expected metadata object to exist"); | ||||
|             assert_eq!(meta.parent_slot, 0); | ||||
| @@ -1429,16 +1497,15 @@ pub mod tests { | ||||
|  | ||||
|             let mut db_iterator = blocktree | ||||
|                 .db | ||||
|                 .raw_iterator_cf(blocktree.data_cf.handle()) | ||||
|                 .cursor::<cf::Data>() | ||||
|                 .expect("Expected to be able to open database iterator"); | ||||
|  | ||||
|             db_iterator.seek(&DataCf::key(&(slot, 1))); | ||||
|             db_iterator.seek((slot, 1)); | ||||
|  | ||||
|             // Iterate through ledger | ||||
|             for i in 0..num_entries { | ||||
|                 assert!(db_iterator.valid()); | ||||
|                 let current_key = db_iterator.key().expect("Expected a valid key"); | ||||
|                 let current_index = DataCf::index(¤t_key).1; | ||||
|                 let (_, current_index) = db_iterator.key().expect("Expected a valid key"); | ||||
|                 assert_eq!(current_index, (1 as u64) << (i * 8)); | ||||
|                 db_iterator.next(); | ||||
|             } | ||||
| @@ -1558,8 +1625,7 @@ pub mod tests { | ||||
|  | ||||
|             assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]); | ||||
|  | ||||
|             let meta_key = MetaCf::key(&slot); | ||||
|             let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); | ||||
|             let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); | ||||
|             if num_entries % 2 == 0 { | ||||
|                 assert_eq!(meta.received, num_entries); | ||||
|             } else { | ||||
| @@ -1580,8 +1646,7 @@ pub mod tests { | ||||
|                 original_entries, | ||||
|             ); | ||||
|  | ||||
|             let meta_key = MetaCf::key(&slot); | ||||
|             let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); | ||||
|             let meta = blocktree.meta_cf.get(slot).unwrap().unwrap(); | ||||
|             assert_eq!(meta.received, num_entries); | ||||
|             assert_eq!(meta.consumed, num_entries); | ||||
|             assert_eq!(meta.parent_slot, 0); | ||||
| @@ -1633,8 +1698,7 @@ pub mod tests { | ||||
|  | ||||
|             assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), expected,); | ||||
|  | ||||
|             let meta_key = MetaCf::key(&0); | ||||
|             let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); | ||||
|             let meta = blocktree.meta_cf.get(0).unwrap().unwrap(); | ||||
|             assert_eq!(meta.consumed, num_unique_entries); | ||||
|             assert_eq!(meta.received, num_unique_entries); | ||||
|             assert_eq!(meta.parent_slot, 0); | ||||
| @@ -2096,17 +2160,14 @@ pub mod tests { | ||||
|             assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty()); | ||||
|  | ||||
|             let mut meta0 = SlotMeta::new(0, 0); | ||||
|             blocktree | ||||
|                 .meta_cf | ||||
|                 .put_by_index(&0, &serialize(&meta0).unwrap()) | ||||
|                 .unwrap(); | ||||
|             blocktree.meta_cf.put(0, &meta0).unwrap(); | ||||
|  | ||||
|             // Slot exists, chains to nothing | ||||
|             let expected: HashMap<u64, Vec<u64>> = | ||||
|                 HashMap::from_iter(vec![(0, vec![])].into_iter()); | ||||
|             assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected); | ||||
|             meta0.next_slots = vec![1, 2]; | ||||
|             blocktree.meta_cf.put(&MetaCf::key(&0), &meta0).unwrap(); | ||||
|             blocktree.meta_cf.put(0, &meta0).unwrap(); | ||||
|  | ||||
|             // Slot exists, chains to some other slots | ||||
|             let expected: HashMap<u64, Vec<u64>> = | ||||
| @@ -2116,10 +2177,7 @@ pub mod tests { | ||||
|  | ||||
|             let mut meta3 = SlotMeta::new(3, 1); | ||||
|             meta3.next_slots = vec![10, 5]; | ||||
|             blocktree | ||||
|                 .meta_cf | ||||
|                 .put_by_index(&3, &serialize(&meta3).unwrap()) | ||||
|                 .unwrap(); | ||||
|             blocktree.meta_cf.put(3, &meta3).unwrap(); | ||||
|             let expected: HashMap<u64, Vec<u64>> = | ||||
|                 HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter()); | ||||
|             assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected); | ||||
| @@ -2225,8 +2283,7 @@ pub mod tests { | ||||
|                     entries[i as usize] | ||||
|                 ); | ||||
|  | ||||
|                 let meta_key = MetaCf::key(&i); | ||||
|                 let meta = blocktree.meta_cf.get(&meta_key).unwrap().unwrap(); | ||||
|                 let meta = blocktree.meta_cf.get(i).unwrap().unwrap(); | ||||
|                 assert_eq!(meta.received, i + 1); | ||||
|                 assert_eq!(meta.last_index, i); | ||||
|                 if i != 0 { | ||||
| @@ -2448,13 +2505,10 @@ pub mod tests { | ||||
|  | ||||
|     fn get_detached_heads(blocktree: &Blocktree) -> Vec<u64> { | ||||
|         let mut results = vec![]; | ||||
|         let mut iter = blocktree | ||||
|             .db | ||||
|             .raw_iterator_cf(blocktree.detached_heads_cf.handle()) | ||||
|             .unwrap(); | ||||
|         let mut iter = blocktree.detached_heads_cf.cursor().unwrap(); | ||||
|         iter.seek_to_first(); | ||||
|         while iter.valid() { | ||||
|             results.push(DetachedHeadsCf::index(&iter.key().unwrap())); | ||||
|             results.push(iter.key().unwrap()); | ||||
|             iter.next(); | ||||
|         } | ||||
|         results | ||||
|   | ||||
| @@ -1,4 +1,3 @@ | ||||
| use crate::entry::Entry; | ||||
| use crate::result::{Error, Result}; | ||||
|  | ||||
| use bincode::{deserialize, serialize}; | ||||
| @@ -7,25 +6,54 @@ use serde::de::DeserializeOwned; | ||||
| use serde::Serialize; | ||||
|  | ||||
| use std::borrow::Borrow; | ||||
| use std::collections::HashMap; | ||||
| use std::marker::PhantomData; | ||||
| use std::path::Path; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| pub trait Database: Sized + Send + Sync { | ||||
|     type Error: Into<Error>; | ||||
|     type Key: ?Sized; | ||||
|     type OwnedKey: Borrow<Self::Key>; | ||||
|     type ColumnFamily; | ||||
|     type Cursor: Cursor<Self>; | ||||
|     type EntryIter: Iterator<Item = Entry>; | ||||
|     type WriteBatch: IWriteBatch<Self>; | ||||
| pub mod columns { | ||||
|     #[derive(Debug)] | ||||
|     /// SlotMeta Column | ||||
|     pub struct SlotMeta; | ||||
|  | ||||
|     fn cf_handle(&self, cf: &str) -> Option<Self::ColumnFamily>; | ||||
|     #[derive(Debug)] | ||||
|     /// DetachedHeads Column | ||||
|     pub struct DetachedHeads; | ||||
|  | ||||
|     #[derive(Debug)] | ||||
|     /// Erasure Column | ||||
|     pub struct Coding; | ||||
|  | ||||
|     #[derive(Debug)] | ||||
|     /// Data Column | ||||
|     pub struct Data; | ||||
| } | ||||
|  | ||||
| pub trait Backend: Sized + Send + Sync { | ||||
|     type Key: ?Sized + ToOwned<Owned = Self::OwnedKey>; | ||||
|     type OwnedKey: Borrow<Self::Key>; | ||||
|     type ColumnFamily: Clone; | ||||
|     type Cursor: DbCursor<Self>; | ||||
|     type Iter: Iterator<Item = (Box<Self::Key>, Box<[u8]>)>; | ||||
|     type WriteBatch: IWriteBatch<Self>; | ||||
|     type Error: Into<Error>; | ||||
|  | ||||
|     fn open(path: &Path) -> Result<Self>; | ||||
|  | ||||
|     fn columns(&self) -> Vec<&'static str>; | ||||
|  | ||||
|     fn destroy(path: &Path) -> Result<()>; | ||||
|  | ||||
|     fn cf_handle(&self, cf: &str) -> Self::ColumnFamily; | ||||
|  | ||||
|     fn get_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<Option<Vec<u8>>>; | ||||
|  | ||||
|     fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::Key, data: &[u8]) -> Result<()>; | ||||
|     fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::Key, value: &[u8]) -> Result<()>; | ||||
|  | ||||
|     fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>; | ||||
|  | ||||
|     fn iterator_cf(&self, cf: Self::ColumnFamily) -> Result<Self::Iter>; | ||||
|  | ||||
|     fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result<Self::Cursor>; | ||||
|  | ||||
|     fn write(&self, batch: Self::WriteBatch) -> Result<()>; | ||||
| @@ -33,103 +61,343 @@ pub trait Database: Sized + Send + Sync { | ||||
|     fn batch(&self) -> Result<Self::WriteBatch>; | ||||
| } | ||||
|  | ||||
| pub trait Cursor<D: Database> { | ||||
| pub trait Column<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     const NAME: &'static str; | ||||
|     type Index; | ||||
|  | ||||
|     fn key(index: Self::Index) -> B::OwnedKey; | ||||
|     fn index(key: &B::Key) -> Self::Index; | ||||
| } | ||||
|  | ||||
| pub trait DbCursor<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     fn valid(&self) -> bool; | ||||
|  | ||||
|     fn seek(&mut self, key: &D::Key); | ||||
|     fn seek(&mut self, key: &B::Key); | ||||
|  | ||||
|     fn seek_to_first(&mut self); | ||||
|  | ||||
|     fn next(&mut self); | ||||
|  | ||||
|     fn key(&self) -> Option<D::OwnedKey>; | ||||
|     fn key(&self) -> Option<B::OwnedKey>; | ||||
|  | ||||
|     fn value(&self) -> Option<Vec<u8>>; | ||||
| } | ||||
|  | ||||
| pub trait IWriteBatch<D: Database> { | ||||
|     fn put_cf(&mut self, cf: D::ColumnFamily, key: &D::Key, data: &[u8]) -> Result<()>; | ||||
| pub trait IWriteBatch<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     fn put_cf(&mut self, cf: B::ColumnFamily, key: &B::Key, value: &[u8]) -> Result<()>; | ||||
|     fn delete_cf(&mut self, cf: B::ColumnFamily, key: &B::Key) -> Result<()>; | ||||
| } | ||||
|  | ||||
| pub trait LedgerColumnFamily<D: Database>: LedgerColumnFamilyRaw<D> { | ||||
|     type ValueType: DeserializeOwned + Serialize; | ||||
| pub trait TypedColumn<B>: Column<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     type Type: Serialize + DeserializeOwned; | ||||
| } | ||||
|  | ||||
|     fn get(&self, key: &D::Key) -> Result<Option<Self::ValueType>> { | ||||
|         let db = self.db(); | ||||
|         let data_bytes = db.get_cf(self.handle(), key)?; | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct Database<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     backend: B, | ||||
| } | ||||
|  | ||||
|         if let Some(raw) = data_bytes { | ||||
|             let result: Self::ValueType = deserialize(&raw)?; | ||||
|             Ok(Some(result)) | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct Cursor<B, C> | ||||
| where | ||||
|     B: Backend, | ||||
|     C: Column<B>, | ||||
| { | ||||
|     db_cursor: B::Cursor, | ||||
|     column: PhantomData<C>, | ||||
|     backend: PhantomData<B>, | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct LedgerColumn<B, C> | ||||
| where | ||||
|     B: Backend, | ||||
|     C: Column<B>, | ||||
| { | ||||
|     pub db: Arc<Database<B>>, | ||||
|     column: PhantomData<C>, | ||||
| } | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct WriteBatch<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     write_batch: B::WriteBatch, | ||||
|     backend: PhantomData<B>, | ||||
|     map: HashMap<&'static str, B::ColumnFamily>, | ||||
| } | ||||
|  | ||||
| impl<B> Database<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     pub fn open(path: &Path) -> Result<Self> { | ||||
|         let backend = B::open(path)?; | ||||
|  | ||||
|         Ok(Database { backend }) | ||||
|     } | ||||
|  | ||||
|     pub fn destroy(path: &Path) -> Result<()> { | ||||
|         B::destroy(path)?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn get_bytes<C>(&self, key: C::Index) -> Result<Option<Vec<u8>>> | ||||
|     where | ||||
|         C: Column<B>, | ||||
|     { | ||||
|         self.backend | ||||
|             .get_cf(self.cf_handle::<C>(), C::key(key).borrow()) | ||||
|     } | ||||
|  | ||||
|     pub fn put_bytes<C>(&self, key: C::Index, data: &[u8]) -> Result<()> | ||||
|     where | ||||
|         C: Column<B>, | ||||
|     { | ||||
|         self.backend | ||||
|             .put_cf(self.cf_handle::<C>(), C::key(key).borrow(), data) | ||||
|     } | ||||
|  | ||||
|     pub fn delete<C>(&self, key: C::Index) -> Result<()> | ||||
|     where | ||||
|         C: Column<B>, | ||||
|     { | ||||
|         self.backend | ||||
|             .delete_cf(self.cf_handle::<C>(), C::key(key).borrow()) | ||||
|     } | ||||
|  | ||||
|     pub fn get<C>(&self, key: C::Index) -> Result<Option<C::Type>> | ||||
|     where | ||||
|         C: TypedColumn<B>, | ||||
|     { | ||||
|         if let Some(serialized_value) = self | ||||
|             .backend | ||||
|             .get_cf(self.cf_handle::<C>(), C::key(key).borrow())? | ||||
|         { | ||||
|             let value = deserialize(&serialized_value)?; | ||||
|  | ||||
|             Ok(Some(value)) | ||||
|         } else { | ||||
|             Ok(None) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn put(&self, key: &D::Key, value: &Self::ValueType) -> Result<()> { | ||||
|         let db = self.db(); | ||||
|         let serialized = serialize(value)?; | ||||
|         db.put_cf(self.handle(), key, &serialized)?; | ||||
|         Ok(()) | ||||
|     pub fn put<C>(&self, key: C::Index, value: &C::Type) -> Result<()> | ||||
|     where | ||||
|         C: TypedColumn<B>, | ||||
|     { | ||||
|         let serialized_value = serialize(value)?; | ||||
|  | ||||
|         self.backend.put_cf( | ||||
|             self.cf_handle::<C>(), | ||||
|             C::key(key).borrow(), | ||||
|             &serialized_value, | ||||
|         ) | ||||
|     } | ||||
|  | ||||
|     fn is_empty(&self) -> Result<bool> { | ||||
|         let mut db_iterator = self.db().raw_iterator_cf(self.handle())?; | ||||
|         db_iterator.seek_to_first(); | ||||
|         Ok(!db_iterator.valid()) | ||||
|     pub fn cursor<C>(&self) -> Result<Cursor<B, C>> | ||||
|     where | ||||
|         C: Column<B>, | ||||
|     { | ||||
|         let db_cursor = self.backend.raw_iterator_cf(self.cf_handle::<C>())?; | ||||
|  | ||||
|         Ok(Cursor { | ||||
|             db_cursor, | ||||
|             column: PhantomData, | ||||
|             backend: PhantomData, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     pub fn iter<C>(&self) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>> | ||||
|     where | ||||
|         C: Column<B>, | ||||
|     { | ||||
|         let iter = self | ||||
|             .backend | ||||
|             .iterator_cf(self.cf_handle::<C>())? | ||||
|             .map(|(key, value)| (C::index(&key), value.into())); | ||||
|  | ||||
|         Ok(iter) | ||||
|     } | ||||
|  | ||||
|     pub fn batch(&self) -> Result<WriteBatch<B>> { | ||||
|         let db_write_batch = self.backend.batch()?; | ||||
|         let map = self | ||||
|             .backend | ||||
|             .columns() | ||||
|             .into_iter() | ||||
|             .map(|desc| (desc, self.backend.cf_handle(desc))) | ||||
|             .collect(); | ||||
|  | ||||
|         Ok(WriteBatch { | ||||
|             write_batch: db_write_batch, | ||||
|             backend: PhantomData, | ||||
|             map, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     pub fn write(&self, batch: WriteBatch<B>) -> Result<()> { | ||||
|         self.backend.write(batch.write_batch) | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     pub fn cf_handle<C>(&self) -> B::ColumnFamily | ||||
|     where | ||||
|         C: Column<B>, | ||||
|     { | ||||
|         self.backend.cf_handle(C::NAME).clone() | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub trait LedgerColumnFamilyRaw<D: Database> { | ||||
|     fn get_bytes(&self, key: &D::Key) -> Result<Option<Vec<u8>>> { | ||||
|         let db = self.db(); | ||||
|         let data_bytes = db.get_cf(self.handle(), key.borrow())?; | ||||
|         Ok(data_bytes.map(|x| x.to_vec())) | ||||
| impl<B, C> Cursor<B, C> | ||||
| where | ||||
|     B: Backend, | ||||
|     C: Column<B>, | ||||
| { | ||||
|     pub fn valid(&self) -> bool { | ||||
|         self.db_cursor.valid() | ||||
|     } | ||||
|  | ||||
|     fn put_bytes(&self, key: &D::Key, serialized_value: &[u8]) -> Result<()> { | ||||
|         let db = self.db(); | ||||
|         db.put_cf(self.handle(), key.borrow(), &serialized_value)?; | ||||
|         Ok(()) | ||||
|     pub fn seek(&mut self, key: C::Index) { | ||||
|         self.db_cursor.seek(C::key(key).borrow()); | ||||
|     } | ||||
|  | ||||
|     fn delete(&self, key: &D::Key) -> Result<()> { | ||||
|         let db = self.db(); | ||||
|         db.delete_cf(self.handle(), key.borrow())?; | ||||
|         Ok(()) | ||||
|     pub fn seek_to_first(&mut self) { | ||||
|         self.db_cursor.seek_to_first(); | ||||
|     } | ||||
|  | ||||
|     fn raw_iterator(&self) -> D::Cursor { | ||||
|         let db = self.db(); | ||||
|         db.raw_iterator_cf(self.handle()) | ||||
|             .expect("Expected to be able to open database iterator") | ||||
|     pub fn next(&mut self) { | ||||
|         self.db_cursor.next(); | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> D::ColumnFamily; | ||||
|     pub fn key(&self) -> Option<C::Index> { | ||||
|         if let Some(key) = self.db_cursor.key() { | ||||
|             Some(C::index(key.borrow())) | ||||
|         } else { | ||||
|             None | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn db(&self) -> &Arc<D>; | ||||
|     pub fn value_bytes(&self) -> Option<Vec<u8>> { | ||||
|         self.db_cursor.value() | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub trait IndexColumn<D: Database>: LedgerColumnFamilyRaw<D> { | ||||
|     type Index; | ||||
|  | ||||
|     fn get_by_index(&self, index: &Self::Index) -> Result<Option<Vec<u8>>> { | ||||
|         let db = self.db(); | ||||
|         let data_bytes = db.get_cf(self.handle(), Self::key(index).borrow())?; | ||||
|         Ok(data_bytes.map(|x| x.to_vec())) | ||||
| impl<B, C> Cursor<B, C> | ||||
| where | ||||
|     B: Backend, | ||||
|     C: TypedColumn<B>, | ||||
| { | ||||
|     pub fn value(&self) -> Option<C::Type> { | ||||
|         if let Some(bytes) = self.db_cursor.value() { | ||||
|             let value = deserialize(&bytes).ok()?; | ||||
|             Some(value) | ||||
|         } else { | ||||
|             None | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<B, C> LedgerColumn<B, C> | ||||
| where | ||||
|     B: Backend, | ||||
|     C: Column<B>, | ||||
| { | ||||
|     pub fn new(db: &Arc<Database<B>>) -> Self { | ||||
|         LedgerColumn { | ||||
|             db: Arc::clone(db), | ||||
|             column: PhantomData, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { | ||||
|         self.db | ||||
|             .backend | ||||
|             .put_cf(self.handle(), C::key(key).borrow(), value) | ||||
|     } | ||||
|  | ||||
|     pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> { | ||||
|         self.db.backend.get_cf(self.handle(), C::key(key).borrow()) | ||||
|     } | ||||
|  | ||||
|     pub fn delete(&self, key: C::Index) -> Result<()> { | ||||
|         self.db | ||||
|             .backend | ||||
|             .delete_cf(self.handle(), C::key(key).borrow()) | ||||
|     } | ||||
|  | ||||
|     pub fn cursor(&self) -> Result<Cursor<B, C>> { | ||||
|         self.db.cursor() | ||||
|     } | ||||
|  | ||||
|     pub fn iter(&self) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>> { | ||||
|         self.db.iter::<C>() | ||||
|     } | ||||
|  | ||||
|     pub fn handle(&self) -> B::ColumnFamily { | ||||
|         self.db.cf_handle::<C>() | ||||
|     } | ||||
|  | ||||
|     pub fn is_empty(&self) -> Result<bool> { | ||||
|         let mut cursor = self.cursor()?; | ||||
|         cursor.seek_to_first(); | ||||
|         Ok(!cursor.valid()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<B, C> LedgerColumn<B, C> | ||||
| where | ||||
|     B: Backend, | ||||
|     C: TypedColumn<B>, | ||||
| { | ||||
|     pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> { | ||||
|         self.db.put::<C>(key, value) | ||||
|     } | ||||
|  | ||||
|     pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> { | ||||
|         self.db.get::<C>(key) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<B> WriteBatch<B> | ||||
| where | ||||
|     B: Backend, | ||||
| { | ||||
|     pub fn put_bytes<C: Column<B>>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { | ||||
|         self.write_batch | ||||
|             .put_cf(self.get_cf::<C>(), C::key(key).borrow(), bytes) | ||||
|     } | ||||
|  | ||||
|     pub fn delete<C: Column<B>>(&mut self, key: C::Index) -> Result<()> { | ||||
|         self.write_batch | ||||
|             .delete_cf(self.get_cf::<C>(), C::key(key).borrow()) | ||||
|     } | ||||
|  | ||||
|     pub fn put<C: TypedColumn<B>>(&mut self, key: C::Index, value: &C::Type) -> Result<()> { | ||||
|         let serialized_value = serialize(&value)?; | ||||
|         self.write_batch | ||||
|             .put_cf(self.get_cf::<C>(), C::key(key).borrow(), &serialized_value) | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     fn get_cf<C: Column<B>>(&self) -> B::ColumnFamily { | ||||
|         self.map[C::NAME].clone() | ||||
|     } | ||||
|  | ||||
|     fn put_by_index(&self, index: &Self::Index, serialized_value: &[u8]) -> Result<()> { | ||||
|         let db = self.db(); | ||||
|         db.put_cf(self.handle(), Self::key(index).borrow(), &serialized_value)?; | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn delete_by_index(&self, index: &Self::Index) -> Result<()> { | ||||
|         self.delete(Self::key(index).borrow()) | ||||
|     } | ||||
|  | ||||
|     fn index(key: &D::Key) -> Self::Index; | ||||
|  | ||||
|     fn key(index: &Self::Index) -> D::OwnedKey; | ||||
| } | ||||
|   | ||||
| @@ -1,85 +1,42 @@ | ||||
| use crate::entry::Entry; | ||||
| use crate::packet::Blob; | ||||
| use crate::blocktree::db::columns as cf; | ||||
| use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn}; | ||||
| use crate::blocktree::BlocktreeError; | ||||
| use crate::result::{Error, Result}; | ||||
| use byteorder::{BigEndian, ByteOrder}; | ||||
| use solana_kvstore::{self as kvstore, Key, KvStore}; | ||||
| use std::sync::Arc; | ||||
| use std::path::Path; | ||||
|  | ||||
| use super::db::{ | ||||
|     Cursor, Database, IWriteBatch, IndexColumn, LedgerColumnFamily, LedgerColumnFamilyRaw, | ||||
| }; | ||||
| use super::{Blocktree, BlocktreeError}; | ||||
| type ColumnFamily = u64; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct Kvs(KvStore); | ||||
|  | ||||
| /// The metadata column family | ||||
| #[derive(Debug)] | ||||
| pub struct MetaCf { | ||||
|     db: Arc<Kvs>, | ||||
| } | ||||
| /// Dummy struct for now | ||||
| #[derive(Debug, Clone, Copy)] | ||||
| pub struct Dummy; | ||||
|  | ||||
| /// The data column family | ||||
| #[derive(Debug)] | ||||
| pub struct DataCf { | ||||
|     db: Arc<Kvs>, | ||||
| } | ||||
|  | ||||
| /// The erasure column family | ||||
| #[derive(Debug)] | ||||
| pub struct ErasureCf { | ||||
|     db: Arc<Kvs>, | ||||
| } | ||||
|  | ||||
| /// The detached heads column family | ||||
| #[derive(Debug)] | ||||
| pub struct DetachedHeadsCf { | ||||
|     db: Arc<Kvs>, | ||||
| } | ||||
|  | ||||
| /// Dummy struct to get things compiling | ||||
| /// TODO: all this goes away with Blocktree | ||||
| pub struct EntryIterator(i32); | ||||
| /// Dummy struct to get things compiling | ||||
| pub struct KvsCursor; | ||||
| /// Dummy struct to get things compiling | ||||
| pub struct ColumnFamily; | ||||
| /// Dummy struct to get things compiling | ||||
| pub struct KvsWriteBatch; | ||||
|  | ||||
| impl Blocktree { | ||||
|     /// Opens a Ledger in directory, provides "infinite" window of blobs | ||||
|     pub fn open(_ledger_path: &str) -> Result<Blocktree> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     #[allow(unreachable_code)] | ||||
|     pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> { | ||||
|         unimplemented!(); | ||||
|         self.read_ledger().unwrap().map(|_| Blob::new(&[])) | ||||
|     } | ||||
|  | ||||
|     /// Return an iterator for all the entries in the given file. | ||||
|     #[allow(unreachable_code)] | ||||
|     pub fn read_ledger(&self) -> Result<impl Iterator<Item = Entry>> { | ||||
|         Ok(EntryIterator(unimplemented!())) | ||||
|     } | ||||
|  | ||||
|     pub fn destroy(_ledger_path: &str) -> Result<()> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Database for Kvs { | ||||
|     type Error = kvstore::Error; | ||||
| impl Backend for Kvs { | ||||
|     type Key = Key; | ||||
|     type OwnedKey = Key; | ||||
|     type ColumnFamily = ColumnFamily; | ||||
|     type Cursor = KvsCursor; | ||||
|     type EntryIter = EntryIterator; | ||||
|     type WriteBatch = KvsWriteBatch; | ||||
|     type Cursor = Dummy; | ||||
|     type Iter = Dummy; | ||||
|     type WriteBatch = Dummy; | ||||
|     type Error = kvstore::Error; | ||||
|  | ||||
|     fn cf_handle(&self, _cf: &str) -> Option<ColumnFamily> { | ||||
|     fn open(_path: &Path) -> Result<Kvs> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn columns(&self) -> Vec<&'static str> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn destroy(_path: &Path) -> Result<()> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn cf_handle(&self, _cf: &str) -> ColumnFamily { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
| @@ -87,28 +44,101 @@ impl Database for Kvs { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn put_cf(&self, _cf: ColumnFamily, _key: &Key, _data: &[u8]) -> Result<()> { | ||||
|     fn put_cf(&self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn delete_cf(&self, _cf: Self::ColumnFamily, _key: &Key) -> Result<()> { | ||||
|     fn delete_cf(&self, _cf: ColumnFamily, _key: &Key) -> Result<()> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn raw_iterator_cf(&self, _cf: Self::ColumnFamily) -> Result<Self::Cursor> { | ||||
|     fn iterator_cf(&self, _cf: ColumnFamily) -> Result<Dummy> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn write(&self, _batch: Self::WriteBatch) -> Result<()> { | ||||
|     fn raw_iterator_cf(&self, _cf: ColumnFamily) -> Result<Dummy> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn batch(&self) -> Result<Self::WriteBatch> { | ||||
|     fn batch(&self) -> Result<Dummy> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn write(&self, _batch: Dummy) -> Result<()> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Cursor<Kvs> for KvsCursor { | ||||
| impl Column<Kvs> for cf::Coding { | ||||
|     const NAME: &'static str = super::ERASURE_CF; | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn key(index: (u64, u64)) -> Key { | ||||
|         cf::Data::key(index) | ||||
|     } | ||||
|  | ||||
|     fn index(key: &Key) -> (u64, u64) { | ||||
|         cf::Data::index(key) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Column<Kvs> for cf::Data { | ||||
|     const NAME: &'static str = super::DATA_CF; | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn key((slot, index): (u64, u64)) -> Key { | ||||
|         let mut key = Key::default(); | ||||
|         BigEndian::write_u64(&mut key.0[8..16], slot); | ||||
|         BigEndian::write_u64(&mut key.0[16..], index); | ||||
|         key | ||||
|     } | ||||
|  | ||||
|     fn index(key: &Key) -> (u64, u64) { | ||||
|         let slot = BigEndian::read_u64(&key.0[8..16]); | ||||
|         let index = BigEndian::read_u64(&key.0[16..]); | ||||
|         (slot, index) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Column<Kvs> for cf::DetachedHeads { | ||||
|     const NAME: &'static str = super::DETACHED_HEADS_CF; | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn key(slot: u64) -> Key { | ||||
|         let mut key = Key::default(); | ||||
|         BigEndian::write_u64(&mut key.0[8..16], slot); | ||||
|         key | ||||
|     } | ||||
|  | ||||
|     fn index(key: &Key) -> u64 { | ||||
|         BigEndian::read_u64(&key.0[8..16]) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl TypedColumn<Kvs> for cf::DetachedHeads { | ||||
|     type Type = bool; | ||||
| } | ||||
|  | ||||
| impl Column<Kvs> for cf::SlotMeta { | ||||
|     const NAME: &'static str = super::META_CF; | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn key(slot: u64) -> Key { | ||||
|         let mut key = Key::default(); | ||||
|         BigEndian::write_u64(&mut key.0[8..16], slot); | ||||
|         key | ||||
|     } | ||||
|  | ||||
|     fn index(key: &Key) -> u64 { | ||||
|         BigEndian::read_u64(&key.0[8..16]) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl TypedColumn<Kvs> for cf::SlotMeta { | ||||
|     type Type = super::SlotMeta; | ||||
| } | ||||
|  | ||||
| impl DbCursor<Kvs> for Dummy { | ||||
|     fn valid(&self) -> bool { | ||||
|         unimplemented!() | ||||
|     } | ||||
| @@ -134,111 +164,21 @@ impl Cursor<Kvs> for KvsCursor { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IWriteBatch<Kvs> for KvsWriteBatch { | ||||
|     fn put_cf(&mut self, _cf: ColumnFamily, _key: &Key, _data: &[u8]) -> Result<()> { | ||||
| impl IWriteBatch<Kvs> for Dummy { | ||||
|     fn put_cf(&mut self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn delete_cf(&mut self, _cf: ColumnFamily, _key: &Key) -> Result<()> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Kvs> for DataCf { | ||||
|     fn db(&self) -> &Arc<Kvs> { | ||||
|         &self.db | ||||
|     } | ||||
| impl Iterator for Dummy { | ||||
|     type Item = (Box<Key>, Box<[u8]>); | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::DATA_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Kvs> for DataCf { | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn index(key: &Key) -> (u64, u64) { | ||||
|         let slot = BigEndian::read_u64(&key.0[8..16]); | ||||
|         let index = BigEndian::read_u64(&key.0[16..24]); | ||||
|         (slot, index) | ||||
|     } | ||||
|  | ||||
|     fn key(idx: &(u64, u64)) -> Key { | ||||
|         Key::from((0, idx.0, idx.1)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Kvs> for ErasureCf { | ||||
|     fn db(&self) -> &Arc<Kvs> { | ||||
|         &self.db | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::ERASURE_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Kvs> for ErasureCf { | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn index(key: &Key) -> (u64, u64) { | ||||
|         DataCf::index(key) | ||||
|     } | ||||
|  | ||||
|     fn key(idx: &(u64, u64)) -> Key { | ||||
|         DataCf::key(idx) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Kvs> for MetaCf { | ||||
|     fn db(&self) -> &Arc<Kvs> { | ||||
|         &self.db | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::META_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamily<Kvs> for MetaCf { | ||||
|     type ValueType = super::SlotMeta; | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Kvs> for MetaCf { | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn index(key: &Key) -> u64 { | ||||
|         BigEndian::read_u64(&key.0[8..16]) | ||||
|     } | ||||
|  | ||||
|     fn key(slot: &u64) -> Key { | ||||
|         let mut key = Key::default(); | ||||
|         BigEndian::write_u64(&mut key.0[8..16], *slot); | ||||
|         key | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Kvs> for DetachedHeadsCf { | ||||
|     fn db(&self) -> &Arc<Kvs> { | ||||
|         &self.db | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::DETACHED_HEADS_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamily<Kvs> for DetachedHeadsCf { | ||||
|     type ValueType = bool; | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Kvs> for DetachedHeadsCf { | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn index(key: &Key) -> u64 { | ||||
|         BigEndian::read_u64(&key.0[8..16]) | ||||
|     } | ||||
|  | ||||
|     fn key(slot: &u64) -> Key { | ||||
|         let mut key = Key::default(); | ||||
|         BigEndian::write_u64(&mut key.0[8..16], *slot); | ||||
|         key | ||||
|     fn next(&mut self) -> Option<Self::Item> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -247,12 +187,3 @@ impl std::convert::From<kvstore::Error> for Error { | ||||
|         Error::BlocktreeError(BlocktreeError::KvsDb(e)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// TODO: all this goes away with Blocktree | ||||
| impl Iterator for EntryIterator { | ||||
|     type Item = Entry; | ||||
|  | ||||
|     fn next(&mut self) -> Option<Entry> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -1,27 +1,17 @@ | ||||
| use crate::entry::{Entry, EntrySlice}; | ||||
| use crate::packet::{Blob, BLOB_HEADER_SIZE}; | ||||
| use crate::blocktree::db::columns as cf; | ||||
| use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn}; | ||||
| use crate::blocktree::BlocktreeError; | ||||
| use crate::result::{Error, Result}; | ||||
|  | ||||
| use bincode::deserialize; | ||||
|  | ||||
| use byteorder::{BigEndian, ByteOrder}; | ||||
|  | ||||
| use rocksdb::{ | ||||
|     self, ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, IteratorMode, Options, | ||||
|     self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, IteratorMode, Options, | ||||
|     WriteBatch as RWriteBatch, DB, | ||||
| }; | ||||
|  | ||||
| use solana_sdk::hash::Hash; | ||||
|  | ||||
| use std::collections::VecDeque; | ||||
| use std::fs; | ||||
| use std::path::Path; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use super::db::{ | ||||
|     Cursor, Database, IWriteBatch, IndexColumn, LedgerColumnFamily, LedgerColumnFamilyRaw, | ||||
| }; | ||||
| use super::{Blocktree, BlocktreeError}; | ||||
|  | ||||
| // A good value for this is the number of cores on the machine | ||||
| const TOTAL_THREADS: i32 = 8; | ||||
| @@ -30,66 +20,30 @@ const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024; | ||||
| #[derive(Debug)] | ||||
| pub struct Rocks(rocksdb::DB); | ||||
|  | ||||
| /// The metadata column family | ||||
| #[derive(Debug)] | ||||
| pub struct MetaCf { | ||||
|     db: Arc<Rocks>, | ||||
| } | ||||
| impl Backend for Rocks { | ||||
|     type Key = [u8]; | ||||
|     type OwnedKey = Vec<u8>; | ||||
|     type ColumnFamily = ColumnFamily; | ||||
|     type Cursor = DBRawIterator; | ||||
|     type Iter = DBIterator; | ||||
|     type WriteBatch = RWriteBatch; | ||||
|     type Error = rocksdb::Error; | ||||
|  | ||||
| /// The data column family | ||||
| #[derive(Debug)] | ||||
| pub struct DataCf { | ||||
|     db: Arc<Rocks>, | ||||
| } | ||||
|     fn open(path: &Path) -> Result<Rocks> { | ||||
|         use crate::blocktree::db::columns::{Coding, Data, DetachedHeads, SlotMeta}; | ||||
|  | ||||
| /// The erasure column family | ||||
| #[derive(Debug)] | ||||
| pub struct ErasureCf { | ||||
|     db: Arc<Rocks>, | ||||
| } | ||||
|  | ||||
| /// The detached heads column family | ||||
| #[derive(Debug)] | ||||
| pub struct DetachedHeadsCf { | ||||
|     db: Arc<Rocks>, | ||||
| } | ||||
|  | ||||
| /// TODO: all this goes away with Blocktree | ||||
| pub struct EntryIterator { | ||||
|     db_iterator: DBRawIterator, | ||||
|  | ||||
|     // TODO: remove me when replay_stage is iterating by block (Blocktree) | ||||
|     //    this verification is duplicating that of replay_stage, which | ||||
|     //    can do this in parallel | ||||
|     blockhash: Option<Hash>, | ||||
|     // https://github.com/rust-rocksdb/rust-rocksdb/issues/234 | ||||
|     //   rocksdb issue: the _blocktree member must be lower in the struct to prevent a crash | ||||
|     //   when the db_iterator member above is dropped. | ||||
|     //   _blocktree is unused, but dropping _blocktree results in a broken db_iterator | ||||
|     //   you have to hold the database open in order to iterate over it, and in order | ||||
|     //   for db_iterator to be able to run Drop | ||||
|     //    _blocktree: Blocktree, | ||||
|     entries: VecDeque<Entry>, | ||||
| } | ||||
|  | ||||
| impl Blocktree { | ||||
|     /// Opens a Ledger in directory, provides "infinite" window of blobs | ||||
|     pub fn open(ledger_path: &str) -> Result<Blocktree> { | ||||
|         fs::create_dir_all(&ledger_path)?; | ||||
|         let ledger_path = Path::new(ledger_path).join(super::BLOCKTREE_DIRECTORY); | ||||
|         fs::create_dir_all(&path)?; | ||||
|  | ||||
|         // Use default database options | ||||
|         let db_options = Blocktree::get_db_options(); | ||||
|         let db_options = get_db_options(); | ||||
|  | ||||
|         // Column family names | ||||
|         let meta_cf_descriptor = | ||||
|             ColumnFamilyDescriptor::new(super::META_CF, Blocktree::get_cf_options()); | ||||
|         let data_cf_descriptor = | ||||
|             ColumnFamilyDescriptor::new(super::DATA_CF, Blocktree::get_cf_options()); | ||||
|         let erasure_cf_descriptor = | ||||
|             ColumnFamilyDescriptor::new(super::ERASURE_CF, Blocktree::get_cf_options()); | ||||
|         let detached_heads_descriptor =  | ||||
|             ColumnFamilyDescriptor::new(super::DETACHED_HEADS_CF, Blocktree::get_cf_options()); | ||||
|         let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); | ||||
|         let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options()); | ||||
|         let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options()); | ||||
|         let detached_heads_descriptor = | ||||
|             ColumnFamilyDescriptor::new(DetachedHeads::NAME, get_cf_options()); | ||||
|  | ||||
|         let cfs = vec![ | ||||
|             meta_cf_descriptor, | ||||
|             data_cf_descriptor, | ||||
| @@ -98,138 +52,155 @@ impl Blocktree { | ||||
|         ]; | ||||
|  | ||||
|         // Open the database | ||||
|         let db = Arc::new(Rocks(DB::open_cf_descriptors( | ||||
|             &db_options, | ||||
|             ledger_path, | ||||
|             cfs, | ||||
|         )?)); | ||||
|         let db = Rocks(DB::open_cf_descriptors(&db_options, path, cfs)?); | ||||
|  | ||||
|         // Create the metadata column family | ||||
|         let meta_cf = MetaCf { db: db.clone() }; | ||||
|  | ||||
|         // Create the data column family | ||||
|         let data_cf = DataCf { db: db.clone() }; | ||||
|  | ||||
|         // Create the erasure column family | ||||
|         let erasure_cf = ErasureCf { db: db.clone() }; | ||||
|  | ||||
|         let detached_heads_cf = DetachedHeadsCf { db: db.clone() }; | ||||
|  | ||||
|         Ok(Blocktree { | ||||
|             db, | ||||
|             meta_cf, | ||||
|             data_cf, | ||||
|             erasure_cf, | ||||
|             detached_heads_cf, | ||||
|             new_blobs_signals: vec![], | ||||
|         }) | ||||
|         Ok(db) | ||||
|     } | ||||
|  | ||||
|     pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> { | ||||
|         self.db | ||||
|             .0 | ||||
|             .iterator_cf(self.data_cf.handle(), IteratorMode::Start) | ||||
|             .unwrap() | ||||
|             .map(|(_, blob_data)| Blob::new(&blob_data)) | ||||
|     fn columns(&self) -> Vec<&'static str> { | ||||
|         use crate::blocktree::db::columns::{Coding, Data, DetachedHeads, SlotMeta}; | ||||
|  | ||||
|         vec![ | ||||
|             Coding::NAME, | ||||
|             Data::NAME, | ||||
|             DetachedHeads::NAME, | ||||
|             SlotMeta::NAME, | ||||
|         ] | ||||
|     } | ||||
|  | ||||
|     /// Return an iterator for all the entries in the given file. | ||||
|     pub fn read_ledger(&self) -> Result<impl Iterator<Item = Entry>> { | ||||
|         let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; | ||||
|     fn destroy(path: &Path) -> Result<()> { | ||||
|         DB::destroy(&Options::default(), path)?; | ||||
|  | ||||
|         db_iterator.seek_to_first(); | ||||
|         Ok(EntryIterator { | ||||
|             entries: VecDeque::new(), | ||||
|             db_iterator, | ||||
|             blockhash: None, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     pub fn destroy(ledger_path: &str) -> Result<()> { | ||||
|         // DB::destroy() fails if `ledger_path` doesn't exist | ||||
|         fs::create_dir_all(&ledger_path)?; | ||||
|         let ledger_path = Path::new(ledger_path).join(super::BLOCKTREE_DIRECTORY); | ||||
|         DB::destroy(&Options::default(), &ledger_path)?; | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn get_cf_options() -> Options { | ||||
|         let mut options = Options::default(); | ||||
|         options.set_max_write_buffer_number(32); | ||||
|         options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); | ||||
|         options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); | ||||
|         options | ||||
|     } | ||||
|  | ||||
|     fn get_db_options() -> Options { | ||||
|         let mut options = Options::default(); | ||||
|         options.create_if_missing(true); | ||||
|         options.create_missing_column_families(true); | ||||
|         options.increase_parallelism(TOTAL_THREADS); | ||||
|         options.set_max_background_flushes(4); | ||||
|         options.set_max_background_compactions(4); | ||||
|         options.set_max_write_buffer_number(32); | ||||
|         options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); | ||||
|         options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); | ||||
|         options | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Database for Rocks { | ||||
|     type Error = rocksdb::Error; | ||||
|     type Key = [u8]; | ||||
|     type OwnedKey = Vec<u8>; | ||||
|     type ColumnFamily = ColumnFamily; | ||||
|     type Cursor = DBRawIterator; | ||||
|     type EntryIter = EntryIterator; | ||||
|     type WriteBatch = RWriteBatch; | ||||
|  | ||||
|     fn cf_handle(&self, cf: &str) -> Option<ColumnFamily> { | ||||
|         self.0.cf_handle(cf) | ||||
|     fn cf_handle(&self, cf: &str) -> ColumnFamily { | ||||
|         self.0 | ||||
|             .cf_handle(cf) | ||||
|             .expect("should never get an unknown column") | ||||
|     } | ||||
|  | ||||
|     fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> { | ||||
|         let opt = self.0.get_cf(cf, key)?; | ||||
|         Ok(opt.map(|dbvec| dbvec.to_vec())) | ||||
|         let opt = self.0.get_cf(cf, key)?.map(|db_vec| db_vec.to_vec()); | ||||
|         Ok(opt) | ||||
|     } | ||||
|  | ||||
|     fn put_cf(&self, cf: ColumnFamily, key: &[u8], data: &[u8]) -> Result<()> { | ||||
|         self.0.put_cf(cf, key, data)?; | ||||
|     fn put_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { | ||||
|         self.0.put_cf(cf, key, value)?; | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn delete_cf(&self, cf: Self::ColumnFamily, key: &[u8]) -> Result<()> { | ||||
|         self.0.delete_cf(cf, key).map_err(From::from) | ||||
|     fn delete_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<()> { | ||||
|         self.0.delete_cf(cf, key)?; | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result<Self::Cursor> { | ||||
|         Ok(self.0.raw_iterator_cf(cf)?) | ||||
|     fn iterator_cf(&self, cf: ColumnFamily) -> Result<DBIterator> { | ||||
|         let raw_iter = self.0.iterator_cf(cf, IteratorMode::Start)?; | ||||
|  | ||||
|         Ok(raw_iter) | ||||
|     } | ||||
|  | ||||
|     fn write(&self, batch: Self::WriteBatch) -> Result<()> { | ||||
|         self.0.write(batch).map_err(From::from) | ||||
|     fn raw_iterator_cf(&self, cf: ColumnFamily) -> Result<DBRawIterator> { | ||||
|         let raw_iter = self.0.raw_iterator_cf(cf)?; | ||||
|  | ||||
|         Ok(raw_iter) | ||||
|     } | ||||
|  | ||||
|     fn batch(&self) -> Result<Self::WriteBatch> { | ||||
|     fn batch(&self) -> Result<RWriteBatch> { | ||||
|         Ok(RWriteBatch::default()) | ||||
|     } | ||||
|  | ||||
|     fn write(&self, batch: RWriteBatch) -> Result<()> { | ||||
|         self.0.write(batch)?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Cursor<Rocks> for DBRawIterator { | ||||
| impl Column<Rocks> for cf::Coding { | ||||
|     const NAME: &'static str = super::ERASURE_CF; | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn key(index: (u64, u64)) -> Vec<u8> { | ||||
|         cf::Data::key(index) | ||||
|     } | ||||
|  | ||||
|     fn index(key: &[u8]) -> (u64, u64) { | ||||
|         cf::Data::index(key) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Column<Rocks> for cf::Data { | ||||
|     const NAME: &'static str = super::DATA_CF; | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn key((slot, index): (u64, u64)) -> Vec<u8> { | ||||
|         let mut key = vec![0; 16]; | ||||
|         BigEndian::write_u64(&mut key[..8], slot); | ||||
|         BigEndian::write_u64(&mut key[8..16], index); | ||||
|         key | ||||
|     } | ||||
|  | ||||
|     fn index(key: &[u8]) -> (u64, u64) { | ||||
|         let slot = BigEndian::read_u64(&key[..8]); | ||||
|         let index = BigEndian::read_u64(&key[8..16]); | ||||
|         (slot, index) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Column<Rocks> for cf::DetachedHeads { | ||||
|     const NAME: &'static str = super::DETACHED_HEADS_CF; | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn key(slot: u64) -> Vec<u8> { | ||||
|         let mut key = vec![0; 8]; | ||||
|         BigEndian::write_u64(&mut key[..], slot); | ||||
|         key | ||||
|     } | ||||
|  | ||||
|     fn index(key: &[u8]) -> u64 { | ||||
|         BigEndian::read_u64(&key[..8]) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl TypedColumn<Rocks> for cf::DetachedHeads { | ||||
|     type Type = bool; | ||||
| } | ||||
|  | ||||
| impl Column<Rocks> for cf::SlotMeta { | ||||
|     const NAME: &'static str = super::META_CF; | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn key(slot: u64) -> Vec<u8> { | ||||
|         let mut key = vec![0; 8]; | ||||
|         BigEndian::write_u64(&mut key[..], slot); | ||||
|         key | ||||
|     } | ||||
|  | ||||
|     fn index(key: &[u8]) -> u64 { | ||||
|         BigEndian::read_u64(&key[..8]) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl TypedColumn<Rocks> for cf::SlotMeta { | ||||
|     type Type = super::SlotMeta; | ||||
| } | ||||
|  | ||||
| impl DbCursor<Rocks> for DBRawIterator { | ||||
|     fn valid(&self) -> bool { | ||||
|         DBRawIterator::valid(self) | ||||
|     } | ||||
|  | ||||
|     fn seek(&mut self, key: &[u8]) { | ||||
|         DBRawIterator::seek(self, key) | ||||
|         DBRawIterator::seek(self, key); | ||||
|     } | ||||
|  | ||||
|     fn seek_to_first(&mut self) { | ||||
|         DBRawIterator::seek_to_first(self) | ||||
|         DBRawIterator::seek_to_first(self); | ||||
|     } | ||||
|  | ||||
|     fn next(&mut self) { | ||||
|         DBRawIterator::next(self) | ||||
|         DBRawIterator::next(self); | ||||
|     } | ||||
|  | ||||
|     fn key(&self) -> Option<Vec<u8>> { | ||||
| @@ -242,114 +213,14 @@ impl Cursor<Rocks> for DBRawIterator { | ||||
| } | ||||
|  | ||||
| impl IWriteBatch<Rocks> for RWriteBatch { | ||||
|     fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], data: &[u8]) -> Result<()> { | ||||
|         RWriteBatch::put_cf(self, cf, key, data)?; | ||||
|     fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { | ||||
|         RWriteBatch::put_cf(self, cf, key, value)?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Rocks> for DataCf { | ||||
|     fn db(&self) -> &Arc<Rocks> { | ||||
|         &self.db | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::DATA_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Rocks> for DataCf { | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn index(key: &[u8]) -> (u64, u64) { | ||||
|         let slot = BigEndian::read_u64(&key[..8]); | ||||
|         let index = BigEndian::read_u64(&key[8..16]); | ||||
|         (slot, index) | ||||
|     } | ||||
|  | ||||
|     fn key(idx: &(u64, u64)) -> Vec<u8> { | ||||
|         let mut key = vec![0u8; 16]; | ||||
|         BigEndian::write_u64(&mut key[0..8], idx.0); | ||||
|         BigEndian::write_u64(&mut key[8..16], idx.1); | ||||
|         key | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Rocks> for ErasureCf { | ||||
|     fn db(&self) -> &Arc<Rocks> { | ||||
|         &self.db | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::ERASURE_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Rocks> for ErasureCf { | ||||
|     type Index = (u64, u64); | ||||
|  | ||||
|     fn index(key: &[u8]) -> (u64, u64) { | ||||
|         DataCf::index(key) | ||||
|     } | ||||
|  | ||||
|     fn key(idx: &(u64, u64)) -> Vec<u8> { | ||||
|         DataCf::key(idx) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Rocks> for MetaCf { | ||||
|     fn db(&self) -> &Arc<Rocks> { | ||||
|         &self.db | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::META_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamily<Rocks> for MetaCf { | ||||
|     type ValueType = super::SlotMeta; | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Rocks> for MetaCf { | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn index(key: &[u8]) -> u64 { | ||||
|         BigEndian::read_u64(&key[..8]) | ||||
|     } | ||||
|  | ||||
|     fn key(slot: &u64) -> Vec<u8> { | ||||
|         let mut key = vec![0; 8]; | ||||
|         BigEndian::write_u64(&mut key[..], *slot); | ||||
|         key | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamilyRaw<Rocks> for DetachedHeadsCf { | ||||
|     fn db(&self) -> &Arc<Rocks> { | ||||
|         &self.db | ||||
|     } | ||||
|  | ||||
|     fn handle(&self) -> ColumnFamily { | ||||
|         self.db.cf_handle(super::DETACHED_HEADS_CF).unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl LedgerColumnFamily<Rocks> for DetachedHeadsCf { | ||||
|     type ValueType = bool; | ||||
| } | ||||
|  | ||||
| impl IndexColumn<Rocks> for DetachedHeadsCf { | ||||
|     type Index = u64; | ||||
|  | ||||
|     fn index(key: &[u8]) -> u64 { | ||||
|         BigEndian::read_u64(&key[..8]) | ||||
|     } | ||||
|  | ||||
|     fn key(slot: &u64) -> Vec<u8> { | ||||
|         let mut key = vec![0; 8]; | ||||
|         BigEndian::write_u64(&mut key[..], *slot); | ||||
|         key | ||||
|     fn delete_cf(&mut self, cf: ColumnFamily, key: &[u8]) -> Result<()> { | ||||
|         RWriteBatch::delete_cf(self, cf, key)?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -359,34 +230,23 @@ impl std::convert::From<rocksdb::Error> for Error { | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// TODO: all this goes away with Blocktree | ||||
| impl Iterator for EntryIterator { | ||||
|     type Item = Entry; | ||||
|  | ||||
|     fn next(&mut self) -> Option<Entry> { | ||||
|         if !self.entries.is_empty() { | ||||
|             return Some(self.entries.pop_front().unwrap()); | ||||
|         } | ||||
|  | ||||
|         if self.db_iterator.valid() { | ||||
|             if let Some(value) = self.db_iterator.value() { | ||||
|                 if let Ok(next_entries) = deserialize::<Vec<Entry>>(&value[BLOB_HEADER_SIZE..]) { | ||||
|                     if let Some(blockhash) = self.blockhash { | ||||
|                         if !next_entries.verify(&blockhash) { | ||||
|                             return None; | ||||
|                         } | ||||
|                     } | ||||
|                     self.db_iterator.next(); | ||||
|                     if next_entries.is_empty() { | ||||
|                         return None; | ||||
|                     } | ||||
|                     self.entries = VecDeque::from(next_entries); | ||||
|                     let entry = self.entries.pop_front().unwrap(); | ||||
|                     self.blockhash = Some(entry.hash); | ||||
|                     return Some(entry); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         None | ||||
|     } | ||||
| fn get_cf_options() -> Options { | ||||
|     let mut options = Options::default(); | ||||
|     options.set_max_write_buffer_number(32); | ||||
|     options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); | ||||
|     options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); | ||||
|     options | ||||
| } | ||||
|  | ||||
| fn get_db_options() -> Options { | ||||
|     let mut options = Options::default(); | ||||
|     options.create_if_missing(true); | ||||
|     options.create_missing_column_families(true); | ||||
|     options.increase_parallelism(TOTAL_THREADS); | ||||
|     options.set_max_background_flushes(4); | ||||
|     options.set_max_background_compactions(4); | ||||
|     options.set_max_write_buffer_number(32); | ||||
|     options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); | ||||
|     options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); | ||||
|     options | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user