rename to BucketStorage
Committed by: Jeff Washington (jwash)
Parent: e4103b5886
Commit: 45b9d7980a
@@ -1,5 +1,6 @@
 use crate::bucket_map::{BucketItem, BucketMapError, MaxSearch, RefCount};
-use crate::data_bucket::{BucketMapStats, DataBucket};
+use crate::bucket_stats::BucketMapStats;
+use crate::bucket_storage::BucketStorage;
 use crate::index_entry::IndexEntry;
 use rand::thread_rng;
 use rand::Rng;
@@ -13,15 +14,15 @@ use std::path::PathBuf;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
-// >= 2 instances of DataBucket per 'bucket' in the bucket map. 1 for index, >= 1 for data
+// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
 pub struct Bucket<T> {
     drives: Arc<Vec<PathBuf>>,
     //index
-    index: DataBucket,
+    index: BucketStorage,
     //random offset for the index
     random: u64,
-    //data buckets to store SlotSlice up to a power of 2 in len
-    pub data: Vec<DataBucket>,
+    //storage buckets to store SlotSlice up to a power of 2 in len
+    pub data: Vec<BucketStorage>,
     _phantom: PhantomData<T>,
     stats: Arc<BucketMapStats>,
 }
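The `data` vector holds one BucketStorage per power-of-two slice length. As a minimal sketch (a hypothetical helper, not part of this commit), assuming `data[i]` holds cells of 2^i elements as the `1 << i` in `grow_data` below suggests, a SlotSlice of length n would map to:

// Hypothetical helper: smallest i such that 2^i >= n, so a slice of
// length 5 lands in data[3] (cells of 8 elements), length 1 in data[0].
fn data_index_for_len(n: u64) -> u64 {
    64 - n.saturating_sub(1).leading_zeros() as u64
}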
@@ -32,7 +33,7 @@ impl<T: Clone + Copy> Bucket<T> {
         max_search: MaxSearch,
         stats: Arc<BucketMapStats>,
     ) -> Self {
-        let index = DataBucket::new(
+        let index = BucketStorage::new(
             Arc::clone(&drives),
             1,
             std::mem::size_of::<IndexEntry>() as u64,
@@ -98,7 +99,7 @@ impl<T: Clone + Copy> Bucket<T> {
     }
 
     fn bucket_find_entry_mut<'a>(
-        index: &'a DataBucket,
+        index: &'a BucketStorage,
         key: &Pubkey,
         random: u64,
     ) -> Option<(&'a mut IndexEntry, u64)> {
@@ -117,7 +118,7 @@ impl<T: Clone + Copy> Bucket<T> {
     }
 
     fn bucket_find_entry<'a>(
-        index: &'a DataBucket,
+        index: &'a BucketStorage,
         key: &Pubkey,
         random: u64,
     ) -> Option<(&'a IndexEntry, u64)> {
@@ -136,7 +137,7 @@ impl<T: Clone + Copy> Bucket<T> {
     }
 
     fn bucket_create_key(
-        index: &DataBucket,
+        index: &BucketStorage,
         key: &Pubkey,
         elem_uid: u64,
         random: u64,
@@ -152,8 +153,8 @@ impl<T: Clone + Copy> Bucket<T> {
         let mut elem: &mut IndexEntry = index.get_mut(ii);
         elem.key = *key;
         elem.ref_count = ref_count;
-        elem.data_location = 0;
-        elem.bucket_capacity_when_created_pow2 = 0;
+        elem.storage_offset = 0;
+        elem.storage_capacity_when_created_pow2 = 0;
         elem.num_slots = 0;
         //debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid );
         return Ok(ii);
@@ -240,8 +241,8 @@ impl<T: Clone + Copy> Bucket<T> {
             current_bucket.free(elem_loc, elem_uid).unwrap();
         }
         // elem: &mut IndexEntry = self.index.get_mut(elem_ix);
-        elem.data_location = ix;
-        elem.bucket_capacity_when_created_pow2 = best_bucket.capacity_pow2;
+        elem.storage_offset = ix;
+        elem.storage_capacity_when_created_pow2 = best_bucket.capacity_pow2;
         elem.num_slots = data.len() as u64;
         //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
         if elem.num_slots > 0 {
@@ -279,7 +280,7 @@ impl<T: Clone + Copy> Bucket<T> {
         //increasing the capacity by ^4 reduces the
         //likelyhood of a re-index collision of 2^(max_search)^2
         //1 in 2^32
-        let index = DataBucket::new_with_capacity(
+        let index = BucketStorage::new_with_capacity(
             Arc::clone(&self.drives),
             1,
             std::mem::size_of::<IndexEntry>() as u64,
@@ -335,7 +336,7 @@ impl<T: Clone + Copy> Bucket<T> {
     pub fn grow_data(&mut self, sz: (u64, u8)) {
         if self.data.get(sz.0 as usize).is_none() {
             for i in self.data.len() as u64..(sz.0 + 1) {
-                self.data.push(DataBucket::new(
+                self.data.push(BucketStorage::new(
                     Arc::clone(&self.drives),
                     1 << i,
                     std::mem::size_of::<T>() as u64,
@@ -350,7 +351,7 @@ impl<T: Clone + Copy> Bucket<T> {
         }
     }
 
-    fn bucket_index_ix(index: &DataBucket, key: &Pubkey, random: u64) -> u64 {
+    fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
         let uid = IndexEntry::key_uid(key);
         let mut s = DefaultHasher::new();
         uid.hash(&mut s);
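The hunk is truncated after `uid.hash(&mut s)`. A sketch of how the computation plausibly finishes (assumed here, not shown in this diff): fold the per-Bucket `random` seed into the hash so each Bucket probes a different slot sequence, then wrap into the index storage's capacity.

// Assumed completion (hypothetical helper mirroring the visible prefix):
fn index_ix_sketch(uid: u64, random: u64, index_capacity: u64) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let mut s = DefaultHasher::new();
    uid.hash(&mut s);
    random.hash(&mut s); // per-Bucket random offset, per the struct comment
    s.finish() % index_capacity
}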
@@ -72,7 +72,7 @@ impl Header {
     }
 }
 
-pub struct DataBucket {
+pub struct BucketStorage {
     drives: Arc<Vec<PathBuf>>,
     path: PathBuf,
     mmap: MmapMut,
@@ -84,18 +84,18 @@ pub struct DataBucket {
 }
 
 #[derive(Debug)]
-pub enum DataBucketError {
+pub enum BucketStorageError {
     AlreadyAllocated,
     InvalidFree,
 }
 
-impl Drop for DataBucket {
+impl Drop for BucketStorage {
     fn drop(&mut self) {
         let _ = remove_file(&self.path);
     }
 }
 
-impl DataBucket {
+impl BucketStorage {
     pub fn new_with_capacity(
         drives: Arc<Vec<PathBuf>>,
         num_elems: u64,
@@ -151,14 +151,14 @@ impl DataBucket {
         }
     }
 
-    pub fn allocate(&self, ix: u64, uid: u64) -> Result<(), DataBucketError> {
+    pub fn allocate(&self, ix: u64, uid: u64) -> Result<(), BucketStorageError> {
         if ix >= self.num_cells() {
             panic!("allocate: bad index size");
        }
         if 0 == uid {
             panic!("allocate: bad uid");
         }
-        let mut e = Err(DataBucketError::AlreadyAllocated);
+        let mut e = Err(BucketStorageError::AlreadyAllocated);
         let ix = (ix * self.cell_size) as usize;
         //debug!("ALLOC {} {}", ix, uid);
         let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
@@ -172,7 +172,7 @@ impl DataBucket {
         e
     }
 
-    pub fn free(&self, ix: u64, uid: u64) -> Result<(), DataBucketError> {
+    pub fn free(&self, ix: u64, uid: u64) -> Result<(), BucketStorageError> {
         if ix >= self.num_cells() {
             panic!("free: bad index size");
         }
@@ -182,7 +182,7 @@ impl DataBucket {
         let ix = (ix * self.cell_size) as usize;
         //debug!("FREE {} {}", ix, uid);
         let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
-        let mut e = Err(DataBucketError::InvalidFree);
+        let mut e = Err(BucketStorageError::InvalidFree);
         unsafe {
             let hdr = hdr_slice.as_ptr() as *const Header;
             //debug!("FREE uid: {}", hdr.as_ref().unwrap().uid());
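Taken together, allocate and free form an occupancy protocol over the mmapped cells. A toy model of that contract (not the mmap-backed implementation; it assumes uid 0 marks a free cell, which the `0 == uid` panic above suggests):

// Toy sketch: allocate fails on an occupied cell, free fails on a uid
// mismatch, mirroring AlreadyAllocated and InvalidFree above.
struct ToyStorage {
    uids: Vec<u64>, // one uid per cell; 0 = free (assumed convention)
}

impl ToyStorage {
    fn allocate(&mut self, ix: usize, uid: u64) -> Result<(), &'static str> {
        if self.uids[ix] != 0 {
            return Err("AlreadyAllocated");
        }
        self.uids[ix] = uid;
        Ok(())
    }

    fn free(&mut self, ix: usize, uid: u64) -> Result<(), &'static str> {
        if self.uids[ix] != uid {
            return Err("InvalidFree");
        }
        self.uids[ix] = 0;
        Ok(())
    }
}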
@@ -1,6 +1,6 @@
 use crate::bucket::Bucket;
 use crate::bucket_map::RefCount;
-use crate::data_bucket::DataBucket;
+use crate::bucket_storage::BucketStorage;
 use solana_sdk::clock::Slot;
 use solana_sdk::pubkey::Pubkey;
 use std::collections::hash_map::DefaultHasher;
@@ -14,9 +14,9 @@ use std::hash::{Hash, Hasher};
 pub struct IndexEntry {
     pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already?
     pub ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts?
-    pub data_location: u64, // smaller? since these are variably sized, this could get tricky. well, actually accountinfo is not variable sized...
+    pub storage_offset: u64, // smaller? since these are variably sized, this could get tricky. well, actually accountinfo is not variable sized...
     // if the bucket doubled, the index can be recomputed using create_bucket_capacity_pow2
-    pub bucket_capacity_when_created_pow2: u8, // see data_location
+    pub storage_capacity_when_created_pow2: u8, // see data_location
     pub num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list
 }
 
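The field comments ask whether the entry could shrink. For scale, a back-of-envelope size, assuming RefCount is u64 (Slot is u64 in solana_sdk; RefCount is an assumption):

// 32 (Pubkey) + 8 (RefCount) + 8 (storage_offset) + 1 (u8) + 8 (num_slots)
// = 57 bytes of fields; with 8-byte alignment the struct pads to 64, so
// std::mem::size_of::<IndexEntry>() would be 64 (assumed repr).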
@@ -33,10 +33,10 @@ impl IndexEntry {
         self.ref_count
     }
 
-    // This function maps the original data location into an index in the current data bucket.
-    // This is coupled with how we resize data buckets.
-    pub fn data_loc(&self, bucket: &DataBucket) -> u64 {
-        self.data_location << (bucket.capacity_pow2 - self.bucket_capacity_when_created_pow2)
+    // This function maps the original data location into an index in the current bucket storage.
+    // This is coupled with how we resize bucket storages.
+    pub fn data_loc(&self, storage: &BucketStorage) -> u64 {
+        self.storage_offset << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2)
     }
 
     pub fn read_value<'a, T>(&self, bucket: &'a Bucket<T>) -> Option<(&'a [T], RefCount)> {
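The shift in data_loc compensates for the capacity doublings that happened since the element was written. A worked example with hypothetical values:

fn main() {
    // Recorded at storage_offset 3 while the storage had 2^4 cells; after
    // it grows to 2^6 cells, the current index is 3 << (6 - 4) = 12.
    let storage_offset: u64 = 3;
    let storage_capacity_when_created_pow2: u8 = 4;
    let capacity_pow2: u8 = 6;
    assert_eq!(
        storage_offset << (capacity_pow2 - storage_capacity_when_created_pow2),
        12
    );
}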
@@ -1,7 +1,7 @@
 #![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
 #![allow(clippy::integer_arithmetic)]
 #![allow(clippy::mut_from_ref)]
-pub mod bucket_map;
 mod bucket;
-mod data_bucket;
+pub mod bucket_map;
+mod bucket_storage;
 mod index_entry;