From e20a8329d330c121f96c318baae2319a387103f1 Mon Sep 17 00:00:00 2001 From: carllin Date: Mon, 13 May 2019 22:04:54 -0700 Subject: [PATCH] Add API to iterate over slot's blobs (#4276) --- core/src/blocktree.rs | 38 ++++++++++++++++++++++++++++++++- core/src/blocktree/db.rs | 42 +++++++++++++++++++++++++------------ core/src/blocktree/rocks.rs | 17 ++++++++++----- 3 files changed, 78 insertions(+), 19 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index fe6088396f..86055e9bdb 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -188,6 +188,14 @@ impl Blocktree { Ok(db_iterator) } + pub fn slot_data_iterator( + &self, + slot: u64, + ) -> Result)>> { + let slot_iterator = self.db.iter::(Some((slot, 0)))?; + Ok(slot_iterator.take_while(move |((blob_slot, _), _)| *blob_slot == slot)) + } + pub fn write_shared_blobs(&self, shared_blobs: I) -> Result<()> where I: IntoIterator, @@ -673,7 +681,7 @@ impl Blocktree { } pub fn read_ledger_blobs(&self) -> impl Iterator + '_ { - let iter = self.db.iter::().unwrap(); + let iter = self.db.iter::(None).unwrap(); iter.map(|(_, blob_data)| Blob::new(&blob_data)) } @@ -3042,6 +3050,34 @@ pub mod tests { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[test] + fn test_slot_data_iterator() { + // Construct the blobs + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + let blobs_per_slot = 10; + let slots = vec![2, 4, 8, 12]; + let all_blobs = make_chaining_slot_entries(&slots, blobs_per_slot); + let slot_8_blobs = all_blobs[2].0.clone(); + for (slot_blobs, _) in all_blobs { + blocktree.insert_data_blobs(&slot_blobs[..]).unwrap(); + } + + // Slot doesnt exist, iterator should be empty + let blob_iter = blocktree.slot_data_iterator(5).unwrap(); + let result: Vec<_> = blob_iter.collect(); + assert_eq!(result, vec![]); + + // Test that the iterator for slot 8 contains what was inserted earlier + let blob_iter = blocktree.slot_data_iterator(8).unwrap(); + let result: Vec<_> = blob_iter.map(|(_, bytes)| Blob::new(&bytes)).collect(); + assert_eq!(result.len() as u64, blobs_per_slot); + assert_eq!(result, slot_8_blobs); + + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + mod erasure { use super::*; use crate::blocktree::meta::ErasureMetaStatus; diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 38e2d24e4d..333dd983df 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -60,7 +60,7 @@ pub trait Backend: Sized + Send + Sync { fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>; - fn iterator_cf(&self, cf: Self::ColumnFamily) -> Result; + fn iterator_cf(&self, cf: Self::ColumnFamily, from: Option<&Self::Key>) -> Result; fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result; @@ -241,16 +241,24 @@ where }) } - pub fn iter(&self) -> Result)>> + pub fn iter( + &self, + start_from: Option, + ) -> Result)>> where C: Column, { - let iter = self - .backend - .iterator_cf(self.cf_handle::())? - .map(|(key, value)| (C::index(&key), value.into())); + let iter = { + if let Some(index) = start_from { + let key = C::key(index); + self.backend + .iterator_cf(self.cf_handle::(), Some(key.borrow()))? + } else { + self.backend.iterator_cf(self.cf_handle::(), None)? + } + }; - Ok(iter) + Ok(iter.map(|(key, value)| (C::index(&key), value.into()))) } #[inline] @@ -371,13 +379,21 @@ where }) } - pub fn iter(&self) -> Result)>> { - let iter = self - .backend - .iterator_cf(self.handle())? - .map(|(key, value)| (C::index(&key), value.into())); + pub fn iter( + &self, + start_from: Option, + ) -> Result)>> { + let iter = { + if let Some(index) = start_from { + let key = C::key(index); + self.backend + .iterator_cf(self.handle(), Some(key.borrow()))? + } else { + self.backend.iterator_cf(self.handle(), None)? + } + }; - Ok(iter) + Ok(iter.map(|(key, value)| (C::index(&key), value.into()))) } #[inline] diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index c68392d993..fe14121319 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -6,8 +6,8 @@ use crate::result::{Error, Result}; use byteorder::{BigEndian, ByteOrder}; use rocksdb::{ - self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, IteratorMode, Options, - WriteBatch as RWriteBatch, DB, + self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode, + Options, WriteBatch as RWriteBatch, DB, }; use std::fs; @@ -101,10 +101,17 @@ impl Backend for Rocks { Ok(()) } - fn iterator_cf(&self, cf: ColumnFamily) -> Result { - let raw_iter = self.0.iterator_cf(cf, IteratorMode::Start)?; + fn iterator_cf(&self, cf: ColumnFamily, start_from: Option<&[u8]>) -> Result { + let iter = { + if let Some(start_from) = start_from { + self.0 + .iterator_cf(cf, IteratorMode::From(start_from, Direction::Forward))? + } else { + self.0.iterator_cf(cf, IteratorMode::Start)? + } + }; - Ok(raw_iter) + Ok(iter) } fn raw_iterator_cf(&self, cf: ColumnFamily) -> Result {