From 5f079137e5bdc83f6f117f455fbfeb68e2519e58 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Tue, 24 Sep 2019 19:59:32 -0700 Subject: [PATCH] Remove kvstore (#6075) automerge --- Cargo.lock | 16 - Cargo.toml | 2 - core/Cargo.toml | 2 - core/src/blocktree.rs | 9 - core/src/blocktree/kvs.rs | 283 ------------------ kvstore/.gitignore | 2 - kvstore/Cargo.toml | 22 -- kvstore/benches/basic.rs | 170 ----------- kvstore/src/compactor.rs | 223 -------------- kvstore/src/error.rs | 79 ----- kvstore/src/io_utils.rs | 437 --------------------------- kvstore/src/lib.rs | 407 ------------------------- kvstore/src/mapper.rs | 50 ---- kvstore/src/mapper/disk.rs | 336 --------------------- kvstore/src/mapper/memory.rs | 226 -------------- kvstore/src/readtx.rs | 33 -- kvstore/src/sstable.rs | 564 ----------------------------------- kvstore/src/storage.rs | 280 ----------------- kvstore/src/writebatch.rs | 209 ------------- kvstore/src/writelog.rs | 276 ----------------- kvstore/tests/basic.rs | 240 --------------- 21 files changed, 3866 deletions(-) delete mode 100644 core/src/blocktree/kvs.rs delete mode 100644 kvstore/.gitignore delete mode 100644 kvstore/Cargo.toml delete mode 100644 kvstore/benches/basic.rs delete mode 100644 kvstore/src/compactor.rs delete mode 100644 kvstore/src/error.rs delete mode 100644 kvstore/src/io_utils.rs delete mode 100644 kvstore/src/lib.rs delete mode 100644 kvstore/src/mapper.rs delete mode 100644 kvstore/src/mapper/disk.rs delete mode 100644 kvstore/src/mapper/memory.rs delete mode 100644 kvstore/src/readtx.rs delete mode 100644 kvstore/src/sstable.rs delete mode 100644 kvstore/src/storage.rs delete mode 100644 kvstore/src/writebatch.rs delete mode 100644 kvstore/src/writelog.rs delete mode 100644 kvstore/tests/basic.rs diff --git a/Cargo.lock b/Cargo.lock index 1446b59067..3a5ff362ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3278,7 +3278,6 @@ dependencies = [ "solana-client 0.20.0", "solana-drone 0.20.0", "solana-ed25519-dalek 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "solana-kvstore 0.20.0", "solana-logger 0.20.0", "solana-measure 0.20.0", "solana-merkle-tree 0.20.0", @@ -3499,21 +3498,6 @@ dependencies = [ "tiny-bip39 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "solana-kvstore" -version = "0.20.0" -dependencies = [ - "bincode 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", - "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", - "memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)", - "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "solana-ledger-tool" version = "0.20.0" diff --git a/Cargo.toml b/Cargo.toml index c31a73f156..1e5b36f53e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ default-members = [ "gossip", "install", "keygen", - "kvstore", "ledger-tool", "local_cluster", "logger", @@ -76,7 +75,6 @@ members = [ "gossip", "install", "keygen", - "kvstore", "ledger-tool", "local_cluster", "logger", diff --git a/core/Cargo.toml b/core/Cargo.toml index e919af5fb6..64bdea1c9c 100644 --- a/core/Cargo.toml +++ 
b/core/Cargo.toml @@ -15,7 +15,6 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git [features] cuda = [] -kvstore = ["solana-kvstore"] pin_gpu_memory = [] [dependencies] @@ -54,7 +53,6 @@ solana-chacha-sys = { path = "../chacha-sys", version = "0.20.0" } solana-client = { path = "../client", version = "0.20.0" } solana-drone = { path = "../drone", version = "0.20.0" } solana-ed25519-dalek = "0.2.0" -solana-kvstore = { path = "../kvstore", version = "0.20.0", optional = true } solana-logger = { path = "../logger", version = "0.20.0" } solana-merkle-tree = { path = "../merkle-tree", version = "0.20.0" } solana-metrics = { path = "../metrics", version = "0.20.0" } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 8d198664bc..e0dc72d7d1 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -6,14 +6,10 @@ use crate::erasure::ErasureConfig; use crate::result::{Error, Result}; use crate::shred::{Shred, Shredder}; -#[cfg(feature = "kvstore")] -use solana_kvstore as kvstore; - use bincode::deserialize; use std::collections::HashMap; -#[cfg(not(feature = "kvstore"))] use rocksdb; use solana_metrics::{datapoint_error, datapoint_info}; @@ -61,10 +57,7 @@ macro_rules! db_imports { }; } -#[cfg(not(feature = "kvstore"))] db_imports! {rocks, Rocks, "rocksdb"} -#[cfg(feature = "kvstore")] -db_imports! {kvs, Kvs, "kvstore"} pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; @@ -76,8 +69,6 @@ pub enum BlocktreeError { ShredForIndexExists, InvalidShredData(Box), RocksDb(rocksdb::Error), - #[cfg(feature = "kvstore")] - KvsDb(kvstore::Error), SlotNotRooted, } diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs deleted file mode 100644 index 407ece2a44..0000000000 --- a/core/src/blocktree/kvs.rs +++ /dev/null @@ -1,283 +0,0 @@ -use crate::blocktree::db::columns as cf; -use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn}; -use crate::blocktree::BlocktreeError; -use crate::result::{Error, Result}; -use byteorder::{BigEndian, ByteOrder}; -use solana_kvstore::{self as kvstore, Key, KvStore}; -use std::path::Path; - -type ColumnFamily = u64; - -#[derive(Debug)] -pub struct Kvs(KvStore); - -/// Dummy struct for now -#[derive(Debug, Clone, Copy)] -pub struct Dummy; - -impl Backend for Kvs { - type Key = Key; - type OwnedKey = Key; - type ColumnFamily = ColumnFamily; - type Cursor = Dummy; - type Iter = Dummy; - type WriteBatch = Dummy; - type Error = kvstore::Error; - - fn open(_path: &Path) -> Result { - unimplemented!() - } - - fn columns(&self) -> Vec<&'static str> { - unimplemented!() - } - - fn destroy(_path: &Path) -> Result<()> { - unimplemented!() - } - - fn cf_handle(&self, _cf: &str) -> ColumnFamily { - unimplemented!() - } - - fn get_cf(&self, _cf: ColumnFamily, _key: &Key) -> Result>> { - unimplemented!() - } - - fn put_cf(&self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> { - unimplemented!() - } - - fn delete_cf(&self, _cf: ColumnFamily, _key: &Key) -> Result<()> { - unimplemented!() - } - - fn iterator_cf(&self, _cf: ColumnFamily) -> Result { - unimplemented!() - } - - fn raw_iterator_cf(&self, _cf: ColumnFamily) -> Result { - unimplemented!() - } - - fn batch(&self) -> Result { - unimplemented!() - } - - fn write(&self, _batch: Dummy) -> Result<()> { - unimplemented!() - } -} - -impl Column for cf::Coding { - const NAME: &'static str = super::ERASURE_CF; - type Index = (u64, u64); - - fn key(index: (u64, u64)) -> Key { - cf::Data::key(index) - } - - fn index(key: &Key) -> (u64, 
u64) { - cf::Data::index(key) - } -} - -impl Column for cf::Data { - const NAME: &'static str = super::DATA_CF; - type Index = (u64, u64); - - fn key((slot, index): (u64, u64)) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - BigEndian::write_u64(&mut key.0[16..], index); - key - } - - fn index(key: &Key) -> (u64, u64) { - let slot = BigEndian::read_u64(&key.0[8..16]); - let index = BigEndian::read_u64(&key.0[16..]); - (slot, index) - } -} - -impl Column for cf::Index { - const NAME: &'static str = super::INDEX_CF; - type Index = u64; - - fn key(slot: u64) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - key - } - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } -} - -impl TypedColumn for cf::Index { - type Type = crate::blocktree::meta::Index; -} - -impl Column for cf::DeadSlots { - const NAME: &'static str = super::DEAD_SLOTS; - type Index = u64; - - fn key(slot: u64) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - key - } - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } -} - -impl TypedColumn for cf::Root { - type Type = bool; -} - -impl Column for cf::Orphans { - const NAME: &'static str = super::ORPHANS_CF; - type Index = u64; - - fn key(slot: u64) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - key - } - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } -} - -impl TypedColumn for cf::Orphans { - type Type = bool; -} - -impl Column for cf::Root { - const NAME: &'static str = super::ROOT_CF; - type Index = u64; - - fn key(slot: u64) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - key - } - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } -} - -impl TypedColumn for cf::Root { - type Type = bool; -} - -impl Column for cf::SlotMeta { - const NAME: &'static str = super::META_CF; - type Index = u64; - - fn key(slot: u64) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - key - } - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } -} - -impl Column for cf::SlotMeta { - const NAME: &'static str = super::META_CF; - type Index = u64; - - fn key(slot: u64) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - key - } - - fn index(key: &Key) -> u64 { - BigEndian::read_u64(&key.0[8..16]) - } -} - -impl TypedColumn for cf::SlotMeta { - type Type = super::SlotMeta; -} - -impl Column for cf::ErasureMeta { - const NAME: &'static str = super::ERASURE_META_CF; - type Index = (u64, u64); - - fn key((slot, set_index): (u64, u64)) -> Key { - let mut key = Key::default(); - BigEndian::write_u64(&mut key.0[8..16], slot); - BigEndian::write_u64(&mut key.0[16..], set_index); - key - } - - fn index(key: &Key) -> (u64, u64) { - let slot = BigEndian::read_u64(&key.0[8..16]); - let set_index = BigEndian::read_u64(&key.0[16..]); - (slot, set_index) - } -} - -impl TypedColumn for cf::ErasureMeta { - type Type = super::ErasureMeta; -} - -impl DbCursor for Dummy { - fn valid(&self) -> bool { - unimplemented!() - } - - fn seek(&mut self, _key: &Key) { - unimplemented!() - } - - fn seek_to_first(&mut self) { - unimplemented!() - } - - fn next(&mut self) { - unimplemented!() - } - - fn key(&self) -> Option { - unimplemented!() - } - - fn value(&self) -> Option> { - unimplemented!() - } -} - -impl IWriteBatch for 
Dummy { - fn put_cf(&mut self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> { - unimplemented!() - } - - fn delete_cf(&mut self, _cf: ColumnFamily, _key: &Key) -> Result<()> { - unimplemented!() - } -} - -impl Iterator for Dummy { - type Item = (Box, Box<[u8]>); - - fn next(&mut self) -> Option { - unimplemented!() - } -} - -impl std::convert::From for Error { - fn from(e: kvstore::Error) -> Error { - Error::BlocktreeError(BlocktreeError::KvsDb(e)) - } -} diff --git a/kvstore/.gitignore b/kvstore/.gitignore deleted file mode 100644 index 5404b132db..0000000000 --- a/kvstore/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target/ -/farf/ diff --git a/kvstore/Cargo.toml b/kvstore/Cargo.toml deleted file mode 100644 index 4b401e6837..0000000000 --- a/kvstore/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "solana-kvstore" -description = "Embedded Key-Value store for solana" -version = "0.20.0" -homepage = "https://solana.com/" -repository = "https://github.com/solana-labs/solana" -authors = ["Solana Maintainers "] -license = "Apache-2.0" -edition = "2018" - -[dependencies] -bincode = "1.1.4" -byteorder = "1.3.2" -chrono = "0.4.9" -crc = "1.8.1" -memmap = "0.7.0" -rand = "0.6.5" -serde = "1.0.101" -serde_derive = "1.0.101" - -[dev-dependencies] -tempfile = "3.1.0" diff --git a/kvstore/benches/basic.rs b/kvstore/benches/basic.rs deleted file mode 100644 index c1c74f1c55..0000000000 --- a/kvstore/benches/basic.rs +++ /dev/null @@ -1,170 +0,0 @@ -#![feature(test)] -extern crate test; - -use std::fs; -use std::path::{Path, PathBuf}; - -use rand::{self, Rng}; - -use test::Bencher; - -use solana_kvstore::{test::gen, Config, Key, KvStore}; - -const SMALL_SIZE: usize = 512; -const LARGE_SIZE: usize = 32 * 1024; -const HUGE_SIZE: usize = 64 * 1024; - -fn bench_write(bench: &mut Bencher, rows: &[(Key, Vec)], ledger_path: &str) { - let store = KvStore::open_default(&ledger_path).unwrap(); - - bench.iter(move || { - store.put_many(rows.iter()).expect("Failed to insert rows"); - }); - - teardown(&ledger_path); -} - -fn bench_write_partitioned(bench: &mut Bencher, rows: &[(Key, Vec)], ledger_path: &str) { - let path = Path::new(ledger_path); - let storage_dirs = (0..4) - .map(|i| path.join(format!("parition-{}", i))) - .collect::>(); - - let store = KvStore::partitioned(&ledger_path, &storage_dirs, Config::default()).unwrap(); - - bench.iter(move || { - store.put_many(rows.iter()).expect("Failed to insert rows"); - }); - - teardown(&ledger_path); -} - -#[bench] -#[ignore] -fn bench_write_small(bench: &mut Bencher) { - let ledger_path = setup("bench_write_small"); - let num_entries = 32 * 1024; - let rows = gen::pairs(SMALL_SIZE).take(num_entries).collect::>(); - bench_write(bench, &rows, &ledger_path.to_string_lossy()); -} - -#[bench] -#[ignore] -fn bench_write_small_partitioned(bench: &mut Bencher) { - let ledger_path = setup("bench_write_small_partitioned"); - let num_entries = 32 * 1024; - let rows = gen::pairs(SMALL_SIZE).take(num_entries).collect::>(); - bench_write_partitioned(bench, &rows, &ledger_path.to_string_lossy()); -} - -#[bench] -#[ignore] -fn bench_write_large(bench: &mut Bencher) { - let ledger_path = setup("bench_write_large"); - let num_entries = 32 * 1024; - let rows = gen::pairs(LARGE_SIZE).take(num_entries).collect::>(); - bench_write(bench, &rows, &ledger_path.to_string_lossy()); -} - -#[bench] -#[ignore] -fn bench_write_huge(bench: &mut Bencher) { - let ledger_path = setup("bench_write_huge"); - let num_entries = 32 * 1024; - let rows = 
gen::pairs(HUGE_SIZE).take(num_entries).collect::>(); - bench_write(bench, &rows, &ledger_path.to_string_lossy()); -} - -#[bench] -#[ignore] -fn bench_read_sequential(bench: &mut Bencher) { - let ledger_path = setup("bench_read_sequential"); - let store = KvStore::open_default(&ledger_path).unwrap(); - - // Insert some big and small blobs into the ledger - let num_small_blobs = 32 * 1024; - let num_large_blobs = 32 * 1024; - let total_blobs = num_small_blobs + num_large_blobs; - - let small = gen::data(SMALL_SIZE).take(num_small_blobs); - let large = gen::data(LARGE_SIZE).take(num_large_blobs); - let rows = gen_seq_keys().zip(small.chain(large)); - - let _ = store.put_many(rows); - - let num_reads = total_blobs / 15; - let mut rng = rand::thread_rng(); - - bench.iter(move || { - // Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially - let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs); - for i in start_index..start_index + num_reads { - let i = i as u64; - let k = Key::from((i, i, i)); - let _ = store.get(&k); - } - }); - - teardown(&ledger_path); -} - -#[bench] -#[ignore] -fn bench_read_random(bench: &mut Bencher) { - let ledger_path = setup("bench_read_sequential"); - let store = KvStore::open_default(&ledger_path).unwrap(); - - // Insert some big and small blobs into the ledger - let num_small_blobs = 32 * 1024; - let num_large_blobs = 32 * 1024; - let total_blobs = num_small_blobs + num_large_blobs; - - let small = gen::data(SMALL_SIZE).take(num_small_blobs); - let large = gen::data(LARGE_SIZE).take(num_large_blobs); - let rows = gen_seq_keys().zip(small.chain(large)); - - let _ = store.put_many(rows); - - let num_reads = total_blobs / 15; - let mut rng = rand::thread_rng(); - - // Generate a num_reads sized random sample of indexes in range [0, total_blobs - 1], - // simulating random reads - let indexes: Vec = (0..num_reads) - .map(|_| rng.gen_range(0, total_blobs as u64)) - .collect(); - - bench.iter(move || { - for &i in indexes.iter() { - let i = i as u64; - let k = Key::from((i, i, i)); - let _ = store.get(&k); - } - }); - - teardown(&ledger_path); -} - -fn setup(test_name: &str) -> PathBuf { - let dir = Path::new("kvstore-bench").join(test_name); - - let _ig = fs::remove_dir_all(&dir); - fs::create_dir_all(&dir).unwrap(); - - dir -} - -fn gen_seq_keys() -> impl Iterator { - let mut n = 0; - - std::iter::repeat_with(move || { - let key = Key::from((n, n, n)); - n += 1; - - key - }) -} - -fn teardown>(p: P) { - KvStore::destroy(p).expect("Expect successful store destruction"); -} diff --git a/kvstore/src/compactor.rs b/kvstore/src/compactor.rs deleted file mode 100644 index 8c1124337b..0000000000 --- a/kvstore/src/compactor.rs +++ /dev/null @@ -1,223 +0,0 @@ -use crate::error::{Error, Result}; -use crate::mapper::{Kind, Mapper}; -use crate::sstable::{Key, Merged, SSTable}; - -use std::collections::BTreeMap; -use std::path::PathBuf; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::Arc; -use std::thread::{self, JoinHandle}; - -type TableVec = Vec>; -type TableSlice<'a> = &'a [BTreeMap]; - -#[derive(Debug, Copy, Clone)] -pub struct Config { - pub max_pages: usize, - pub page_size: usize, -} - -#[derive(Debug)] -pub enum Req { - Start(PathBuf), - Gc, -} - -#[derive(Debug)] -pub enum Resp { - Done(TableVec), - Failed(Error), -} - -pub fn spawn_compactor( - mapper: Arc, - config: Config, -) -> Result<(Sender, Receiver, JoinHandle<()>)> { - let (req_tx, req_rx) = channel(); - let (resp_tx, resp_rx) = 
channel();
-
-    let handle = thread::spawn(move || {
-        let _ignored = run_loop(mapper, config, req_rx, resp_tx);
-    });
-
-    Ok((req_tx, resp_rx, handle))
-}
-
-fn run_loop(
-    mapper: Arc<dyn Mapper>,
-    config: Config,
-    req_rx: Receiver<Req>,
-    resp_tx: Sender<Resp>,
-) -> Result<()> {
-    while let Ok(msg) = req_rx.recv() {
-        match msg {
-            Req::Start(_) => {
-                let new_tables_res = run_compaction(&*mapper, &config);
-
-                match new_tables_res {
-                    Ok(new_tables) => {
-                        resp_tx.send(Resp::Done(new_tables))?;
-                    }
-                    Err(e) => {
-                        resp_tx.send(Resp::Failed(e))?;
-                    }
-                }
-            }
-            Req::Gc => {
-                let _ = mapper.empty_trash();
-            }
-        }
-    }
-
-    Ok(())
-}
-
-fn run_compaction(mapper: &dyn Mapper, config: &Config) -> Result<TableVec> {
-    let mut tables = load_tables(mapper)?;
-
-    compact_level_0(mapper, &mut tables, config)?;
-
-    for level in 1..tables.len() {
-        while level_needs_compact(level as u8, config, &tables) {
-            compact_upper_level(mapper, &mut tables, config, level as u8)?;
-        }
-    }
-
-    // move old tables to garbage
-    mapper.rotate_tables()?;
-
-    Ok(tables)
-}
-
-fn compact_level_0(mapper: &dyn Mapper, tables: &mut TableVec, config: &Config) -> Result<()> {
-    assert!(!tables.is_empty());
-
-    if tables.len() == 1 {
-        tables.push(BTreeMap::new());
-    }
-
-    let mut new_tables = BTreeMap::new();
-    {
-        let sources = tables
-            .iter()
-            .take(2)
-            .map(BTreeMap::values)
-            .flatten()
-            .map(|sst| sst.range(&(Key::ALL_INCLUSIVE)))
-            .collect::<Result<Vec<_>>>()?;

-        let mut iter = Merged::new(sources).peekable();
-        while iter.peek().is_some() {
-            let sst = mapper.make_table(Kind::Compaction, &mut |mut data_wtr, mut index_wtr| {
-                SSTable::create_capped(
-                    &mut iter,
-                    1,
-                    config.page_size as u64,
-                    &mut data_wtr,
-                    &mut index_wtr,
-                );
-            })?;
-
-            new_tables.insert(sst.meta().start, sst);
-        }
-    }
-
-    tables[0].clear();
-    tables[1].clear();
-
-    tables[1].append(&mut new_tables);
-
-    Ok(())
-}
-
-fn compact_upper_level(
-    mapper: &dyn Mapper,
-    pages: &mut TableVec,
-    config: &Config,
-    level: u8,
-) -> Result<()> {
-    assert!(1 <= level && (level as usize) < pages.len());
-    assert!(!pages[level as usize].is_empty());
-
-    let next_level = level + 1;
-    let level = level as usize;
-
-    if next_level as usize == pages.len() {
-        pages.push(BTreeMap::new());
-    }
-
-    let (&key, chosen_sst) = pages[level].iter().next_back().unwrap();
-    let (start, end) = {
-        let meta = chosen_sst.meta();
-        (meta.start, meta.end)
-    };
-
-    let mut page_keys = Vec::new();
-    let mut merge_with = Vec::new();
-
-    for (key, sst) in pages[next_level as usize].iter() {
-        if sst.is_overlap(&(start..=end)) {
-            page_keys.push(*key);
-            merge_with.push(sst);
-        }
-    }
-
-    let mut new_tables = BTreeMap::new();
-    {
-        let sources = merge_with
-            .into_iter()
-            .chain(std::iter::once(chosen_sst))
-            .map(|sst| sst.range(&(Key::ALL_INCLUSIVE)))
-            .collect::<Result<Vec<_>>>()?;
-
-        let mut iter = Merged::new(sources).peekable();
-
-        while iter.peek().is_some() {
-            let sst = mapper.make_table(Kind::Compaction, &mut |mut data_wtr, mut index_wtr| {
-                SSTable::create_capped(
-                    &mut iter,
-                    next_level,
-                    config.page_size as u64,
-                    &mut data_wtr,
-                    &mut index_wtr,
-                );
-            })?;
-
-            new_tables.insert(sst.meta().start, sst);
-        }
-    }
-
-    // delete merged page and merged pages in next level
-    pages[level].remove(&key).unwrap();
-
-    for start_key in page_keys {
-        pages[next_level as usize].remove(&start_key).unwrap();
-    }
-
-    pages[next_level as usize].append(&mut new_tables);
-
-    Ok(())
-}
-
-fn load_tables(mapper: &dyn Mapper) -> Result<TableVec> {
-    Ok(SSTable::sorted_tables(&mapper.active_set()?))
-}
-
-#[inline]
-fn level_max(level: u8, 
config: &Config) -> usize { - match level { - 0 => config.max_pages, - x => 10usize.pow(u32::from(x)), - } -} - -#[inline] -fn level_needs_compact(level: u8, config: &Config, tables: TableSlice) -> bool { - if level as usize >= tables.len() { - return false; - } - - let max = level_max(level, config); - - tables[level as usize].len() > max -} diff --git a/kvstore/src/error.rs b/kvstore/src/error.rs deleted file mode 100644 index 702eec2afc..0000000000 --- a/kvstore/src/error.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::error::Error as StdErr; -use std::fmt; -use std::io; -use std::result::Result as StdRes; -use std::sync::mpsc::{RecvError, SendError, TryRecvError}; - -pub type Result = StdRes; - -#[derive(Debug)] -pub enum Error { - Io(io::Error), - Corrupted(bincode::Error), - Channel(Box), - Missing, - WriteBatchFull(usize), -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::Corrupted(_) => write!(f, "Serialization error: Store may be corrupted"), - Error::Channel(e) => write!(f, "Internal communication error: {}", e), - Error::Io(e) => write!(f, "I/O error: {}", e), - Error::Missing => write!(f, "Item not present in ledger"), - Error::WriteBatchFull(capacity) => write!(f, "WriteBatch capacity {} full", capacity), - } - } -} - -impl StdErr for Error { - fn source(&self) -> Option<&(dyn StdErr + 'static)> { - match self { - Error::Io(e) => Some(e), - Error::Corrupted(ref e) => Some(e), - Error::Channel(e) => Some(e.as_ref()), - Error::Missing => None, - Error::WriteBatchFull(_) => None, - } - } -} - -impl From for Error { - fn from(e: io::Error) -> Self { - Error::Io(e) - } -} - -impl From> for Error { - fn from(e: io::IntoInnerError) -> Self { - Error::Io(e.into()) - } -} - -impl From for Error { - fn from(e: bincode::Error) -> Self { - Error::Corrupted(e) - } -} - -impl From> for Error -where - T: Send + Sync + 'static, -{ - fn from(e: SendError) -> Self { - Error::Channel(Box::new(e)) - } -} - -impl From for Error { - fn from(e: RecvError) -> Self { - Error::Channel(Box::new(e)) - } -} - -impl From for Error { - fn from(e: TryRecvError) -> Self { - Error::Channel(Box::new(e)) - } -} diff --git a/kvstore/src/io_utils.rs b/kvstore/src/io_utils.rs deleted file mode 100644 index 5082925c8b..0000000000 --- a/kvstore/src/io_utils.rs +++ /dev/null @@ -1,437 +0,0 @@ -use byteorder::{BigEndian, ByteOrder}; -use crc::crc32; -use memmap::Mmap; -use std::cmp; -use std::fs::File; -use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write}; -use std::ops::Deref; -use std::sync::{Arc, RwLock}; - -const BACKING_ERR: &str = "In-memory table lock poisoned; concurrency error"; - -#[derive(Debug)] -pub enum MemMap { - Disk(Mmap), - Mem(Arc>>), -} - -#[derive(Debug)] -pub enum Writer { - Disk(BufWriter), - Mem(SharedWriter), -} - -#[derive(Debug)] -pub struct SharedWriter { - buf: Arc>>, - pos: u64, -} - -#[derive(Debug)] -pub struct CRCWriter { - writer: W, - buffer: Vec, - position: usize, - capacity: usize, -} - -#[derive(Debug)] -pub struct CRCReader { - reader: R, - buffer: Vec, - position: usize, - chunk_size: usize, -} - -/// Helper trait to make zeroing buffers easier -pub trait Fill { - fn fill(&mut self, v: T); -} - -impl SharedWriter { - pub fn new(buf: Arc>>) -> SharedWriter { - SharedWriter { buf, pos: 0 } - } -} - -impl CRCWriter { - #[allow(dead_code)] - pub fn new(inner: W, chunk_size: usize) -> CRCWriter { - if chunk_size <= 8 { - panic!("chunk_size must be > 8"); - } - - CRCWriter { - writer: inner, - buffer: vec![0; 
chunk_size], - position: 0, - capacity: chunk_size - 8, - } - } - - #[allow(dead_code)] - pub fn into_inner(mut self) -> io::Result { - self.flush()?; - Ok(self.writer) - } - - #[allow(dead_code)] - pub fn get_ref(&self) -> &W { - &self.writer - } - - #[allow(dead_code)] - pub fn get_mut(&mut self) -> &mut W { - &mut self.writer - } -} - -impl CRCReader { - #[allow(dead_code)] - pub fn new(inner: R, chunk_size: usize) -> CRCReader { - if chunk_size <= 8 { - panic!("chunk_size must be > 8"); - } - - CRCReader { - reader: inner, - buffer: vec![0; chunk_size - 8], - position: chunk_size, - chunk_size, - } - } - - #[allow(dead_code)] - pub fn into_inner(self) -> R { - self.reader - } - - fn load_block(&mut self) -> io::Result<()> { - self.buffer.clear(); - self.position = 0; - - let mut block_buffer = vec![0; self.chunk_size]; - let mut block_position = 0; - - while block_position < self.chunk_size { - let bytes_read = self.reader.read(&mut block_buffer[block_position..])?; - if bytes_read == 0 { - break; - } - block_position += bytes_read - } - - if block_position < self.chunk_size { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - assert_eq!(block_position, self.chunk_size); - - let stored_digest = BigEndian::read_u32(&block_buffer[0..4]); - let payload_len = BigEndian::read_u32(&block_buffer[4..8]) as usize; - if payload_len + 8 > block_buffer.len() { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "CRCReader: invalid block size", - )); - } - let payload = &block_buffer[8..8 + payload_len]; - let computed_digest = crc32::checksum_ieee(&block_buffer[4..8 + payload_len]); - - if computed_digest != stored_digest { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "CRCReader: CRC validation failed", - )); - } - - self.buffer.extend_from_slice(payload); - - Ok(()) - } -} - -impl Fill for [T] -where - T: Clone, -{ - fn fill(&mut self, v: T) { - for i in self { - *i = v.clone() - } - } -} - -impl Deref for MemMap { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - match self { - MemMap::Disk(mmap) => mmap.deref(), - MemMap::Mem(vec) => { - let buf = vec.read().expect(BACKING_ERR); - let slice = buf.as_slice(); - - // transmute lifetime. 
Relying on the RwLock + immutability for safety - unsafe { std::mem::transmute(slice) } - } - } - } -} - -impl Write for CRCWriter -where - W: Write, -{ - fn write(&mut self, buffer: &[u8]) -> io::Result { - let mut written = 0; - - while written < buffer.len() { - let batch_len = (&mut self.buffer[8 + self.position..]).write(&buffer[written..])?; - - self.position += batch_len; - written += batch_len; - - if self.position >= self.capacity { - self.flush()?; - } - } - - Ok(written) - } - - fn flush(&mut self) -> io::Result<()> { - BigEndian::write_u32(&mut self.buffer[4..8], self.position as u32); - let total_len = self.position + 8; - - // crc over length + payload - let digest = crc32::checksum_ieee(&self.buffer[4..total_len]); - - BigEndian::write_u32(&mut self.buffer[0..4], digest); - self.writer.write_all(&self.buffer)?; - - self.position = 0; - Ok(()) - } -} - -impl Read for CRCReader -where - R: Read, -{ - fn read(&mut self, buffer: &mut [u8]) -> io::Result { - let mut write_position = 0; - - while write_position < buffer.len() { - if self.position >= self.buffer.len() { - self.load_block()?; - } - - let bytes_available = self.buffer.len() - self.position; - let space_remaining = buffer.len() - write_position; - let copy_len = cmp::min(bytes_available, space_remaining); - - (&mut buffer[write_position..write_position + copy_len]) - .copy_from_slice(&self.buffer[self.position..self.position + copy_len]); - - write_position += copy_len; - self.position += copy_len; - } - - Ok(write_position) - } -} - -impl Write for SharedWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let mut vec = self.buf.write().expect(BACKING_ERR); - - // Calc ranges - let space_remaining = vec.len() - self.pos as usize; - let copy_len = cmp::min(buf.len(), space_remaining); - let copy_src_range = 0..copy_len; - let append_src_range = copy_len..buf.len(); - let copy_dest_range = self.pos as usize..(self.pos as usize + copy_len); - - // Copy then append - (&mut vec[copy_dest_range]).copy_from_slice(&buf[copy_src_range]); - vec.extend_from_slice(&buf[append_src_range]); - - let written = buf.len(); - - self.pos += written as u64; - - Ok(written) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - - fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - let _written = self.write(buf)?; - Ok(()) - } -} - -impl Seek for SharedWriter { - fn seek(&mut self, to: SeekFrom) -> io::Result { - self.pos = match to { - SeekFrom::Start(new_pos) => new_pos, - SeekFrom::Current(diff) => (self.pos as i64 + diff) as u64, - SeekFrom::End(rpos) => (self.buf.read().expect(BACKING_ERR).len() as i64 + rpos) as u64, - }; - - Ok(self.pos) - } -} - -impl Write for Writer { - fn write(&mut self, buf: &[u8]) -> io::Result { - match self { - Writer::Disk(ref mut wtr) => wtr.write(buf), - Writer::Mem(ref mut wtr) => wtr.write(buf), - } - } - - fn flush(&mut self) -> io::Result<()> { - match self { - Writer::Disk(ref mut wtr) => { - wtr.flush()?; - wtr.get_mut().sync_data()?; - Ok(()) - } - Writer::Mem(ref mut wtr) => wtr.flush(), - } - } - - fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - match self { - Writer::Disk(ref mut wtr) => wtr.write_all(buf), - Writer::Mem(ref mut wtr) => wtr.write_all(buf), - } - } -} - -impl Seek for Writer { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - match self { - Writer::Disk(ref mut wtr) => wtr.seek(pos), - Writer::Mem(ref mut wtr) => wtr.seek(pos), - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_crc_write() { - let block_sizes = &[256, 
512, 1024, 2048]; - let byte_counts = &[8, 128, 1024, 1024 * 8]; - - for &block_size in block_sizes { - for &n_bytes in byte_counts { - let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect(); - let buffer = Vec::new(); - - let mut writer = CRCWriter::new(buffer, block_size); - writer.write_all(&bytes).unwrap(); - - let buffer = writer.into_inner().unwrap(); - - let space_per_block = block_size - 8; - let n_full_blocks = n_bytes / space_per_block; - let blocks_expected = n_full_blocks + (n_bytes % space_per_block != 0) as usize; - let expected_len = blocks_expected * block_size; - - assert_eq!(buffer.len(), expected_len); - assert_eq!(&buffer[8..16], &[0, 1, 2, 3, 4, 5, 6, 7]); - } - } - } - - #[test] - fn test_crc_io() { - const BLK_SIZE: usize = 1024; - let bytes: Vec<_> = (0..512 * 1024).map(|x| (x % 255) as u8).collect(); - let buffer = Vec::new(); - - let mut writer = CRCWriter::new(buffer, BLK_SIZE); - writer.write_all(&bytes).unwrap(); - - let buffer = writer.into_inner().unwrap(); - assert_eq!(&buffer[8..16], &[0, 1, 2, 3, 4, 5, 6, 7]); - - let mut reader = CRCReader::new(&buffer[..], BLK_SIZE); - - let mut retrieved = Vec::with_capacity(512 * 1024); - let read_buffer = &mut [0; 1024]; - while let Ok(amt) = reader.read(read_buffer) { - if amt == 0 { - break; - } - retrieved.extend_from_slice(&read_buffer[..amt]); - } - - assert_eq!(&retrieved[..8], &[0, 1, 2, 3, 4, 5, 6, 7]); - - assert_eq!(bytes.len(), retrieved.len()); - assert_eq!(bytes, retrieved); - } - - #[test] - fn test_crc_validation() { - const BLK_SIZE: usize = 1024; - let n_bytes = 512 * 1024; - let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect(); - let buffer = Vec::new(); - - let mut writer = CRCWriter::new(buffer, BLK_SIZE); - writer.write_all(&bytes).unwrap(); - - let mut buffer = writer.into_inner().unwrap(); - buffer[BLK_SIZE / 2] += 1; - - let mut reader = CRCReader::new(&buffer[..], BLK_SIZE); - - let mut retrieved = vec![]; - let res = reader.read_to_end(&mut retrieved); - assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData); - } - - #[test] - fn test_crc_size_mismatch() { - const BLK_SIZE: usize = 1024; - let n_bytes = 512 * 1024; - let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect(); - let buffer = Vec::new(); - - let mut writer = CRCWriter::new(buffer, BLK_SIZE); - writer.write_all(&bytes).unwrap(); - - let mut buffer = writer.into_inner().unwrap(); - buffer.drain((n_bytes - 512)..n_bytes); - - for &size_diff in &[100, 1, 25, BLK_SIZE - 9] { - let mut reader = CRCReader::new(&buffer[..], BLK_SIZE - size_diff); - - let mut retrieved = vec![]; - let res = reader.read_to_end(&mut retrieved); - assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData); - } - } - - #[should_panic] - #[test] - fn test_crc_writer_invalid_chunk_size() { - let _ = CRCWriter::new(Vec::new(), 8); - } - - #[should_panic] - #[test] - fn test_crc_reader_invalid_chunk_size() { - let _ = CRCReader::new(io::empty(), 8); - } -} diff --git a/kvstore/src/lib.rs b/kvstore/src/lib.rs deleted file mode 100644 index 3bb35743c7..0000000000 --- a/kvstore/src/lib.rs +++ /dev/null @@ -1,407 +0,0 @@ -use crate::mapper::{Disk, Mapper, Memory}; -use crate::sstable::SSTable; -use crate::storage::MemTable; -use crate::writelog::WriteLog; -use std::collections::BTreeMap; -use std::fs; -use std::io; -use std::ops::RangeInclusive; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, Mutex, RwLock}; -use 
std::thread::JoinHandle;
-
-mod compactor;
-mod error;
-mod io_utils;
-mod mapper;
-mod readtx;
-mod sstable;
-mod storage;
-mod writebatch;
-mod writelog;
-
-#[macro_use]
-extern crate serde_derive;
-
-pub use self::error::{Error, Result};
-pub use self::readtx::ReadTx as Snapshot;
-pub use self::sstable::Key;
-pub use self::writebatch::{Config as WriteBatchConfig, WriteBatch};
-pub use self::writelog::Config as LogConfig;
-
-const TABLES_FILE: &str = "tables.meta";
-const LOG_FILE: &str = "mem-log";
-const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024;
-const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024;
-const DEFAULT_MAX_PAGES: usize = 10;
-const COMMIT_ORDERING: Ordering = Ordering::Relaxed;
-
-#[derive(Debug, PartialEq, Copy, Clone)]
-pub struct Config {
-    pub max_mem: usize,
-    pub max_tables: usize,
-    pub page_size: usize,
-    pub in_memory: bool,
-    pub log_config: LogConfig,
-}
-
-#[derive(Debug)]
-pub struct KvStore {
-    config: Config,
-    root: PathBuf,
-    commit: AtomicUsize,
-    mem: RwLock<MemTable>,
-    log: Arc<RwLock<WriteLog>>,
-    tables: RwLock<Vec<BTreeMap<Key, SSTable>>>,
-    mapper: Arc<dyn Mapper>,
-    sender: Mutex<Sender<compactor::Req>>,
-    receiver: Mutex<Receiver<compactor::Resp>>,
-    compactor_handle: JoinHandle<()>,
-}
-
-impl KvStore {
-    pub fn open_default
<P>(root: P) -> Result<Self>
-    where
-        P: AsRef<Path>,
-    {
-        let mapper = Disk::single(root.as_ref());
-        open(root.as_ref(), Arc::new(mapper), Config::default())
-    }
-
-    pub fn open<P>
(root: P, config: Config) -> Result - where - P: AsRef, - { - let mapper: Arc = if config.in_memory { - Arc::new(Memory::new()) - } else { - Arc::new(Disk::single(root.as_ref())) - }; - open(root.as_ref(), mapper, config) - } - - pub fn partitioned(root: P, storage_dirs: &[P2], config: Config) -> Result - where - P: AsRef, - P2: AsRef, - { - let mapper = Disk::new(storage_dirs); - open(root.as_ref(), Arc::new(mapper), config) - } - - pub fn config(&self) -> &Config { - &self.config - } - - pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> { - let mut memtable = self.mem.write().unwrap(); - let mut log = self.log.write().unwrap(); - let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - - log.log_put(key, commit, data).unwrap(); - memtable.put(key, commit, data); - - self.ensure_memtable(&mut *memtable, &mut *log)?; - - Ok(()) - } - - pub fn put_many(&self, rows: Iter) -> Result<()> - where - Iter: Iterator, - Tup: std::borrow::Borrow<(K, V)>, - K: std::borrow::Borrow, - V: std::borrow::Borrow<[u8]>, - { - let mut memtable = self.mem.write().unwrap(); - let mut log = self.log.write().unwrap(); - let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - - for pair in rows { - let (ref k, ref d) = pair.borrow(); - let (key, data) = (k.borrow(), d.borrow()); - - log.log_put(key, commit, data).unwrap(); - memtable.put(key, commit, data); - } - - self.ensure_memtable(&mut *memtable, &mut *log)?; - - Ok(()) - } - - pub fn get(&self, key: &Key) -> Result>> { - self.query_compactor()?; - - let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap()); - - storage::get(&memtable.values, &*tables, key) - } - - pub fn delete(&self, key: &Key) -> Result<()> { - let mut memtable = self.mem.write().unwrap(); - let mut log = self.log.write().unwrap(); - let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - - log.log_delete(key, commit).unwrap(); - memtable.delete(key, commit); - - self.ensure_memtable(&mut *memtable, &mut *log)?; - - Ok(()) - } - - pub fn delete_many(&self, rows: Iter) -> Result<()> - where - Iter: Iterator, - K: std::borrow::Borrow, - { - let mut memtable = self.mem.write().unwrap(); - let mut log = self.log.write().unwrap(); - let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - - for k in rows { - let key = k.borrow(); - log.log_delete(key, commit).unwrap(); - memtable.delete(key, commit); - } - - self.ensure_memtable(&mut *memtable, &mut *log)?; - - Ok(()) - } - - pub fn batch(&self, config: WriteBatchConfig) -> WriteBatch { - let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - - WriteBatch { - config, - commit, - memtable: MemTable::new(BTreeMap::new()), - log: Arc::clone(&self.log), - } - } - - pub fn commit(&self, mut batch: WriteBatch) -> Result<()> { - let mut memtable = self.mem.write().unwrap(); - let mut log = self.log.write().unwrap(); - - memtable.values.append(&mut batch.memtable.values); - self.ensure_memtable(&mut *memtable, &mut *log)?; - - Ok(()) - } - - pub fn snapshot(&self) -> Snapshot { - let (memtable, tables) = ( - self.mem.read().unwrap().values.clone(), - self.tables.read().unwrap().clone(), - ); - - Snapshot::new(memtable, tables) - } - - pub fn range( - &self, - range: RangeInclusive, - ) -> Result)>> { - self.query_compactor()?; - - let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap()); - - storage::range(&memtable.values, &*tables, range) - } - - pub fn destroy
<P>
(path: P) -> Result<()> - where - P: AsRef, - { - let path = path.as_ref(); - if !path.exists() { - return Ok(()); - } - - fs::remove_dir_all(path)?; - Ok(()) - } - - fn query_compactor(&self) -> Result<()> { - if let (Ok(mut sender), Ok(mut receiver), Ok(mut tables)) = ( - self.sender.try_lock(), - self.receiver.try_lock(), - self.tables.try_write(), - ) { - query_compactor( - &self.root, - &*self.mapper, - &mut *tables, - &mut *receiver, - &mut *sender, - )?; - } - - Ok(()) - } - - fn ensure_memtable(&self, mem: &mut MemTable, log: &mut WriteLog) -> Result<()> { - if mem.mem_size < self.config.max_mem { - return Ok(()); - } - - let mut tables = self.tables.write().unwrap(); - - storage::flush_table(&mem.values, &*self.mapper, &mut *tables)?; - mem.values.clear(); - mem.mem_size = 0; - log.reset().expect("Write-log rotation failed"); - - if is_lvl0_full(&tables, &self.config) { - let sender = self.sender.lock().unwrap(); - - sender.send(compactor::Req::Start(PathBuf::new()))?; - } - - Ok(()) - } -} - -impl Default for Config { - fn default() -> Config { - Config { - max_mem: DEFAULT_MEM_SIZE, - max_tables: DEFAULT_MAX_PAGES, - page_size: DEFAULT_TABLE_SIZE, - in_memory: false, - log_config: LogConfig::default(), - } - } -} - -fn open(root: &Path, mapper: Arc, config: Config) -> Result { - let root = root.to_path_buf(); - let log_path = root.join(LOG_FILE); - let restore_log = log_path.exists(); - - if !root.exists() { - fs::create_dir(&root)?; - } - - let commit = chrono::Utc::now().timestamp(); - let mut log = WriteLog::open(&log_path, config.log_config)?; - let values = if restore_log && !config.in_memory { - log.materialize()? - } else { - BTreeMap::new() - }; - let mem = MemTable::new(values); - - let tables = load_tables(&root, &*mapper)?; - - let cfg = compactor::Config { - max_pages: config.max_tables, - page_size: config.page_size, - }; - let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - Ok(KvStore { - config, - root, - commit: AtomicUsize::new(commit as usize), - mem: RwLock::new(mem), - log: Arc::new(RwLock::new(log)), - tables: RwLock::new(tables), - mapper, - sender: Mutex::new(sender), - receiver: Mutex::new(receiver), - compactor_handle, - }) -} - -fn load_tables(root: &Path, mapper: &dyn Mapper) -> Result>> { - let mut tables = Vec::new(); - let meta_path = root.join(TABLES_FILE); - - if meta_path.exists() { - mapper.load_state_from(&meta_path)?; - tables = SSTable::sorted_tables(&mapper.active_set()?); - } - - Ok(tables) -} - -fn dump_tables(root: &Path, mapper: &dyn Mapper) -> Result<()> { - mapper.serialize_state_to(&root.join(TABLES_FILE))?; - Ok(()) -} - -fn query_compactor( - root: &Path, - mapper: &dyn Mapper, - tables: &mut Vec>, - receiver: &mut Receiver, - sender: &mut Sender, -) -> Result<()> { - match receiver.try_recv() { - Ok(compactor::Resp::Done(new_tables)) => { - std::mem::replace(tables, new_tables); - dump_tables(root, mapper)?; - sender.send(compactor::Req::Gc).unwrap(); - } - Ok(compactor::Resp::Failed(e)) => { - return Err(e); - } - // Nothing available, do nothing - _ => {} - } - - Ok(()) -} - -#[inline] -fn is_lvl0_full(tables: &[BTreeMap], config: &Config) -> bool { - if tables.is_empty() { - false - } else { - tables[0].len() > config.max_tables - } -} - -pub mod test { - pub mod gen { - use crate::Key; - use rand::distributions::Uniform; - use rand::{rngs::SmallRng, FromEntropy, Rng}; - use std::iter; - use std::ops::Range; - - pub fn 
keys() -> impl Iterator<Item = Key> {
-            let mut rng = SmallRng::from_entropy();
-            iter::repeat_with(move || Key(rng.gen()))
-        }
-
-        pub fn data(size: usize) -> impl Iterator<Item = Vec<u8>> {
-            iter::repeat(vec![0; size])
-        }
-
-        pub fn data_vary(range: Range<u64>) -> impl Iterator<Item = Vec<u8>> {
-            let dist = Uniform::from(range);
-            let mut rng = SmallRng::from_entropy();
-
-            iter::repeat_with(move || {
-                let size: u64 = rng.sample(dist);
-                vec![0; size as usize]
-            })
-        }
-
-        pub fn pairs(size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
-            keys().zip(data(size))
-        }
-
-        pub fn pairs_vary(range: Range<u64>) -> impl Iterator<Item = (Key, Vec<u8>)> {
-            keys().zip(data_vary(range))
-        }
-    }
-}
diff --git a/kvstore/src/mapper.rs b/kvstore/src/mapper.rs
deleted file mode 100644
index 7fa4b3e20d..0000000000
--- a/kvstore/src/mapper.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use crate::io_utils::Writer;
-use crate::sstable::SSTable;
-use crate::Result;
-
-use std::path::Path;
-use std::sync::RwLock;
-
-mod disk;
-mod memory;
-
-pub use self::disk::Disk;
-pub use self::memory::Memory;
-
-pub trait Mapper: std::fmt::Debug + Send + Sync {
-    fn make_table(&self, kind: Kind, func: &mut dyn FnMut(Writer, Writer)) -> Result<SSTable>;
-    fn rotate_tables(&self) -> Result<()>;
-    fn empty_trash(&self) -> Result<()>;
-    fn active_set(&self) -> Result<Vec<SSTable>>;
-    fn serialize_state_to(&self, path: &Path) -> Result<()>;
-    fn load_state_from(&self, path: &Path) -> Result<()>;
-}
-
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
-pub enum Kind {
-    Active,
-    Compaction,
-    Garbage,
-}
-
-pub trait RwLockExt<T> {
-    fn read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U;
-    fn write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U;
-    fn try_read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U;
-    fn try_write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U;
-}
-
-impl<T> RwLockExt<T> for RwLock<T> {
-    fn read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U {
-        f(&*self.read().unwrap())
-    }
-    fn write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U {
-        f(&mut *self.write().unwrap())
-    }
-    fn try_read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U {
-        f(&*self.try_read().unwrap())
-    }
-    fn try_write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U {
-        f(&mut *self.try_write().unwrap())
-    }
-}
diff --git a/kvstore/src/mapper/disk.rs b/kvstore/src/mapper/disk.rs
deleted file mode 100644
index 0fc48a12de..0000000000
--- a/kvstore/src/mapper/disk.rs
+++ /dev/null
@@ -1,336 +0,0 @@
-use crate::io_utils::{MemMap, Writer};
-use crate::mapper::{Kind, Mapper, RwLockExt};
-use crate::sstable::SSTable;
-use crate::Result;
-
-use memmap::Mmap;
-
-use rand::{rngs::SmallRng, seq::SliceRandom, FromEntropy, Rng};
-
-use std::collections::HashMap;
-use std::fs::{self, File, OpenOptions};
-use std::io::{self, BufReader, BufWriter};
-use std::path::{Path, PathBuf};
-use std::sync::{Arc, RwLock};
-
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
-struct Id {
-    id: u32,
-    kind: Kind,
-}
-
-#[derive(Debug)]
-pub struct Disk {
-    rng: RwLock<SmallRng>,
-    mappings: RwLock<HashMap<Id, PathInfo>>,
-    storage_dirs: RwLock<Vec<PathBuf>>,
-}
-
-impl Disk {
-    pub fn single(dir: &Path) -> Self {
-        Disk::new(&[dir])
-    }
-
-    pub fn new<P: AsRef<Path>>(storage_dirs: &[P]) -> Self {
-        if storage_dirs.is_empty() {
-            panic!("Disk Mapper requires at least one storage director");
-        }
-
-        let storage_dirs = storage_dirs
-            .iter()
-            .map(AsRef::as_ref)
-            .map(Path::to_path_buf)
-            .collect();
-
-        Disk {
-            storage_dirs: RwLock::new(storage_dirs),
-            mappings: RwLock::new(HashMap::new()),
-            rng: RwLock::new(SmallRng::from_entropy()),
-        }
-    }
-}
-
-#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
-pub struct PathInfo {
-    pub data: PathBuf,
-    pub index: PathBuf,
-}
-
-impl Disk {
-    #[inline]
-    fn choose_storage(&self) -> PathBuf {
-        let mut rng = rand::thread_rng();
-        let 
path = self - .storage_dirs - .read_as(|storage| storage.choose(&mut rng).unwrap().to_path_buf()); - if !path.exists() { - fs::create_dir_all(&path).expect("couldn't create table storage directory"); - } - - path - } - - #[inline] - fn add_mapping(&self, tref: Id, paths: PathInfo) { - let mut map = self.mappings.write().unwrap(); - map.insert(tref, paths); - } -} - -impl Mapper for Disk { - fn make_table(&self, kind: Kind, func: &mut dyn FnMut(Writer, Writer)) -> Result { - let storage = self.choose_storage(); - - let id = next_id(kind); - let paths = mk_paths(id, &storage); - let (data, index) = mk_writers(&paths)?; - - func(data, index); - - self.add_mapping(id, paths.clone()); - - let (data, index) = mk_maps(&paths)?; - let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?; - Ok(sst) - } - - fn rotate_tables(&self) -> Result<()> { - let mut map = self.mappings.write().unwrap(); - let mut new_map = HashMap::new(); - - for (tref, paths) in map.drain() { - let new_kind = match tref.kind { - Kind::Active => Kind::Garbage, - Kind::Compaction => Kind::Active, - k => k, - }; - let new_ref = next_id(new_kind); - new_map.insert(new_ref, paths); - } - *map = new_map; - - Ok(()) - } - - fn empty_trash(&self) -> Result<()> { - self.mappings.write_as(|map| { - let to_rm = map - .keys() - .filter(|tref| tref.kind == Kind::Garbage) - .cloned() - .collect::>(); - - for tref in to_rm { - let paths = map.remove(&tref).unwrap(); - fs::remove_file(&paths.index)?; - fs::remove_file(&paths.data)?; - } - - Ok(()) - }) - } - - fn active_set(&self) -> Result> { - let map = self.mappings.read().unwrap(); - let active = map.iter().filter(|(tref, _)| tref.kind == Kind::Active); - let mut vec = Vec::new(); - - for (_, paths) in active { - let (data, index): (MemMap, MemMap) = mk_maps(paths)?; - let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?; - - vec.push(sst); - } - Ok(vec) - } - - fn serialize_state_to(&self, path: &Path) -> Result<()> { - let file = OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(path)?; - let wtr = BufWriter::new(file); - - self.mappings.read_as(|mappings| { - self.storage_dirs - .read_as(|storage| bincode::serialize_into(wtr, &(storage, mappings))) - })?; - - Ok(()) - } - - fn load_state_from(&self, path: &Path) -> Result<()> { - let rdr = BufReader::new(File::open(path)?); - let (new_storage, new_mappings) = bincode::deserialize_from(rdr)?; - - self.storage_dirs.write_as(|storage| { - self.mappings.write_as(|mappings| { - *storage = new_storage; - *mappings = new_mappings; - }) - }); - - Ok(()) - } -} - -fn mk_writers(paths: &PathInfo) -> io::Result<(Writer, Writer)> { - let mut opts = OpenOptions::new(); - opts.create(true).append(true); - - let data = BufWriter::new(opts.open(&paths.data)?); - let index = BufWriter::new(opts.open(&paths.index)?); - - Ok((Writer::Disk(data), Writer::Disk(index))) -} - -fn mk_maps(paths: &PathInfo) -> io::Result<(MemMap, MemMap)> { - let (data_file, index_file) = (File::open(&paths.data)?, File::open(&paths.index)?); - let (data, index) = unsafe { (Mmap::map(&data_file)?, Mmap::map(&index_file)?) 
}; - Ok((MemMap::Disk(data), MemMap::Disk(index))) -} - -fn mk_paths(tref: Id, dir: &Path) -> PathInfo { - let (data_name, index_name) = mk_filenames(tref.id); - PathInfo { - data: dir.join(data_name), - index: dir.join(index_name), - } -} - -#[inline] -fn mk_filenames(n: u32) -> (String, String) { - let data = format!("{}.sstable", n,); - let index = format!("{}.index", n,); - (data, index) -} - -#[inline] -fn next_id(kind: Kind) -> Id { - Id { - id: rand::thread_rng().gen(), - kind, - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::mapper::Kind; - use crate::sstable::{Key, Value}; - use crate::test::gen; - use std::collections::BTreeMap; - use std::sync::Arc; - use std::thread; - use tempfile::tempdir; - - const DATA_SIZE: usize = 128; - - #[test] - fn test_table_management() { - let tempdir = tempdir().unwrap(); - let mapper = Arc::new(Disk::single(tempdir.path())); - let records: BTreeMap<_, _> = gen_records().take(1024).collect(); - - let mut threads = vec![]; - let mut number_of_tables = 4; - - for kind in [Kind::Active, Kind::Garbage, Kind::Compaction].iter() { - let records = records.clone(); - let mapper = Arc::clone(&mapper); - - let child = thread::spawn(move || { - for _ in 0..number_of_tables { - mapper - .make_table(*kind, &mut |mut data_writer, mut index_writer| { - SSTable::create( - &mut records.iter(), - 0, - &mut data_writer, - &mut index_writer, - ); - }) - .unwrap(); - } - }); - - number_of_tables *= 2; - threads.push(child); - } - - threads.into_iter().for_each(|child| child.join().unwrap()); - let count_kind = |kind, mapper: &Disk| { - mapper - .mappings - .read() - .unwrap() - .keys() - .filter(|id| id.kind == kind) - .count() - }; - assert_eq!(count_kind(Kind::Active, &mapper), 4); - assert_eq!(count_kind(Kind::Garbage, &mapper), 8); - assert_eq!(count_kind(Kind::Compaction, &mapper), 16); - - mapper.empty_trash().unwrap(); - assert_eq!(count_kind(Kind::Garbage, &mapper), 0); - - mapper.rotate_tables().unwrap(); - assert_eq!(count_kind(Kind::Active, &mapper), 16); - assert_eq!(count_kind(Kind::Garbage, &mapper), 4); - assert_eq!(count_kind(Kind::Compaction, &mapper), 0); - - let active_set = mapper.active_set().unwrap(); - assert_eq!(active_set.len(), 16); - } - - #[test] - fn test_state() { - let tempdir = tempdir().unwrap(); - let dirs_1: Vec<_> = (0..4).map(|i| tempdir.path().join(i.to_string())).collect(); - let dirs_2: Vec<_> = (4..8).map(|i| tempdir.path().join(i.to_string())).collect(); - - let mapper_1 = Arc::new(Disk::new(&dirs_1)); - let records: BTreeMap<_, _> = gen_records().take(1024).collect(); - - for (i, &kind) in [Kind::Active, Kind::Compaction, Kind::Garbage] - .iter() - .enumerate() - { - for _ in 0..(i * 3) { - mapper_1 - .make_table(kind, &mut |mut data_writer, mut index_writer| { - SSTable::create( - &mut records.iter(), - 0, - &mut data_writer, - &mut index_writer, - ); - }) - .unwrap(); - } - } - - let state_path = tempdir.path().join("state"); - mapper_1.serialize_state_to(&state_path).unwrap(); - assert!(state_path.exists()); - - let mapper_2 = Arc::new(Disk::new(&dirs_2)); - mapper_2.load_state_from(&state_path).unwrap(); - - assert_eq!( - &*mapper_1.mappings.read().unwrap(), - &*mapper_2.mappings.read().unwrap() - ); - assert_eq!( - &*mapper_1.storage_dirs.read().unwrap(), - &*mapper_2.storage_dirs.read().unwrap() - ); - } - - fn gen_records() -> impl Iterator { - gen::pairs(DATA_SIZE).map(|(key, data)| (key, Value::new(0, Some(data)))) - } - -} diff --git a/kvstore/src/mapper/memory.rs b/kvstore/src/mapper/memory.rs 
deleted file mode 100644 index 9a90658af6..0000000000 --- a/kvstore/src/mapper/memory.rs +++ /dev/null @@ -1,226 +0,0 @@ -use crate::io_utils::{MemMap, SharedWriter, Writer}; -use crate::mapper::{Kind, Mapper, RwLockExt}; -use crate::sstable::SSTable; -use crate::Result; - -use rand::{rngs::SmallRng, FromEntropy, Rng}; - -use std::collections::HashMap; -use std::path::Path; -use std::sync::{Arc, RwLock}; - -type Id = u32; -type TableMap = HashMap>>, Arc>>)>; -type Backing = Arc>; - -const BACKING_ERR_MSG: &str = "In-memory table lock poisoned; concurrency error"; - -#[derive(Debug)] -pub struct Memory { - tables: Backing, - compaction: Backing, - garbage: Backing, - meta: Arc>>, - rng: RwLock, -} - -impl Memory { - pub fn new() -> Self { - fn init_backing() -> Backing { - Arc::new(RwLock::new(HashMap::new())) - } - Memory { - tables: init_backing(), - compaction: init_backing(), - garbage: init_backing(), - meta: Arc::new(RwLock::new(vec![])), - rng: RwLock::new(SmallRng::from_entropy()), - } - } -} - -impl Memory { - #[inline] - fn get_backing(&self, kind: Kind) -> &Backing { - match kind { - Kind::Active => &self.tables, - Kind::Compaction => &self.compaction, - Kind::Garbage => &self.garbage, - } - } -} - -impl Mapper for Memory { - fn make_table(&self, kind: Kind, func: &mut dyn FnMut(Writer, Writer)) -> Result { - let backing = self.get_backing(kind); - let id = next_id(); - - let (data, index) = backing.write_as(|tables| get_memory_writers_for(id, tables))?; - func(data, index); - - backing.read_as(|map| get_table(id, map)) - } - - fn rotate_tables(&self) -> Result<()> { - let (mut active, mut compaction, mut garbage) = ( - self.tables.write().expect(BACKING_ERR_MSG), - self.compaction.write().expect(BACKING_ERR_MSG), - self.garbage.write().expect(BACKING_ERR_MSG), - ); - - // old active set => garbage - garbage.extend(active.drain()); - // compacted tables => new active set - active.extend(compaction.drain()); - - Ok(()) - } - - fn empty_trash(&self) -> Result<()> { - self.garbage.write().expect(BACKING_ERR_MSG).clear(); - - Ok(()) - } - - fn active_set(&self) -> Result> { - let active = self.tables.read().expect(BACKING_ERR_MSG); - - let mut tables = Vec::with_capacity(active.len()); - for tref in active.keys() { - let sst = get_table(*tref, &*active)?; - tables.push(sst); - } - - Ok(tables) - } - - fn serialize_state_to(&self, _: &Path) -> Result<()> { - Ok(()) - } - - fn load_state_from(&self, _: &Path) -> Result<()> { - Ok(()) - } -} - -fn get_memory_writers_for(id: Id, backing: &mut TableMap) -> Result<(Writer, Writer)> { - let data_buf = Arc::new(RwLock::new(vec![])); - let index_buf = Arc::new(RwLock::new(vec![])); - - backing.insert(id, (Arc::clone(&data_buf), Arc::clone(&index_buf))); - - let data_wtr = SharedWriter::new(data_buf); - let index_wtr = SharedWriter::new(index_buf); - - let data = Writer::Mem(data_wtr); - let index = Writer::Mem(index_wtr); - - Ok((data, index)) -} - -fn get_memmaps(id: Id, map: &TableMap) -> Result<(MemMap, MemMap)> { - let entry = map - .get(&id) - .expect("Map should always be present, given a Id that's not destroyed"); - - let data = MemMap::Mem(Arc::clone(&entry.0)); - let index = MemMap::Mem(Arc::clone(&entry.1)); - - Ok((data, index)) -} - -fn get_table(id: Id, map: &TableMap) -> Result { - let (data, index) = get_memmaps(id, map)?; - let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?; - - Ok(sst) -} - -#[inline] -fn next_id() -> Id { - rand::thread_rng().gen() -} - -#[cfg(test)] -mod test { - use super::*; - use 
crate::mapper::Kind; - use crate::sstable::{Key, Value}; - use crate::test::gen; - use std::collections::BTreeMap; - use std::sync::Arc; - use std::thread; - - const DATA_SIZE: usize = 128; - - #[test] - fn test_table_management() { - let mapper = Arc::new(Memory::new()); - let records: BTreeMap<_, _> = gen_records().take(1024).collect(); - - let mut threads = vec![]; - let mut number_of_tables = 4; - - for kind in [Kind::Active, Kind::Garbage, Kind::Compaction].iter() { - let records = records.clone(); - let mapper = Arc::clone(&mapper); - - let child = thread::spawn(move || { - for _ in 0..number_of_tables { - mapper - .make_table(*kind, &mut |mut data_writer, mut index_writer| { - SSTable::create( - &mut records.iter(), - 0, - &mut data_writer, - &mut index_writer, - ); - }) - .unwrap(); - } - }); - - number_of_tables *= 2; - threads.push(child); - } - - threads.into_iter().for_each(|child| child.join().unwrap()); - assert_eq!(mapper.tables.read().unwrap().len(), 4); - assert_eq!(mapper.garbage.read().unwrap().len(), 8); - assert_eq!(mapper.compaction.read().unwrap().len(), 16); - - mapper.empty_trash().unwrap(); - assert_eq!(mapper.garbage.read().unwrap().len(), 0); - - mapper.rotate_tables().unwrap(); - assert_eq!(mapper.tables.read().unwrap().len(), 16); - assert_eq!(mapper.garbage.read().unwrap().len(), 4); - assert!(mapper.compaction.read().unwrap().is_empty()); - - let active_set = mapper.active_set().unwrap(); - assert_eq!(active_set.len(), 16); - } - - #[test] - fn test_no_state() { - let tempdir = tempfile::tempdir().unwrap(); - let mapper = Arc::new(Memory::new()); - let records: BTreeMap<_, _> = gen_records().take(1024).collect(); - - mapper - .make_table(Kind::Active, &mut |mut data_writer, mut index_writer| { - SSTable::create(&mut records.iter(), 0, &mut data_writer, &mut index_writer); - }) - .unwrap(); - - let state_path = tempdir.path().join("state"); - mapper.serialize_state_to(&state_path).unwrap(); - mapper.load_state_from(&state_path).unwrap(); - assert!(!state_path.exists()); - } - - fn gen_records() -> impl Iterator { - gen::pairs(DATA_SIZE).map(|(key, data)| (key, Value::new(0, Some(data)))) - } - -} diff --git a/kvstore/src/readtx.rs b/kvstore/src/readtx.rs deleted file mode 100644 index 0a0b1d0580..0000000000 --- a/kvstore/src/readtx.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::error::Result; -use crate::sstable::{Key, SSTable, Value}; -use crate::storage; - -use std::collections::BTreeMap; -use std::ops::RangeInclusive; -use std::sync::Arc; - -#[derive(Debug)] -pub struct ReadTx { - mem: Arc>, - tables: Arc<[BTreeMap]>, -} - -impl ReadTx { - pub fn new(mem: BTreeMap, tables: Vec>) -> ReadTx { - ReadTx { - mem: Arc::new(mem), - tables: Arc::from(tables.into_boxed_slice()), - } - } - - pub fn get(&self, key: &Key) -> Result>> { - storage::get(&self.mem, &*self.tables, key) - } - - pub fn range( - &self, - range: RangeInclusive, - ) -> Result)>> { - storage::range(&self.mem, &*self.tables, range) - } -} diff --git a/kvstore/src/sstable.rs b/kvstore/src/sstable.rs deleted file mode 100644 index 07bc39ae2c..0000000000 --- a/kvstore/src/sstable.rs +++ /dev/null @@ -1,564 +0,0 @@ -use crate::error::Result; -use crate::io_utils::{Fill, MemMap}; - -use byteorder::{BigEndian, ByteOrder}; - -use std::borrow::Borrow; -use std::collections::BTreeMap; -use std::fmt; -use std::io::prelude::*; -use std::mem; -use std::ops::RangeInclusive; -use std::sync::Arc; -use std::u64; - -const INDEX_META_SIZE: usize = mem::size_of::(); -const KEY_LEN: usize = mem::size_of::(); -const 
INDEX_ENTRY_SIZE: usize = mem::size_of::(); -const INDEX_RECORD_SIZE: usize = KEY_LEN + INDEX_ENTRY_SIZE; - -#[derive(Clone, Debug)] -pub struct SSTable { - data: Arc, - index: Arc, - meta: IndexMeta, -} - -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct IndexMeta { - pub level: u8, - pub data_size: u64, - pub start: Key, - pub end: Key, -} - -#[derive( - Debug, Default, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Hash, Serialize, Deserialize, -)] -pub struct Key(pub [u8; 24]); - -#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] -pub struct IndexEntry { - pub timestamp: i64, - pub offset: u64, - pub size: u64, -} - -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct Value { - pub ts: i64, - pub val: Option>, -} - -/// An iterator that produces logical view over a set of SSTables. -/// It implements [direct k-way merge](https://en.wikipedia.org/wiki/K-way_merge_algorithm#Heap) -/// and reconciles out-of-date/deleted values in a lazy fashion. Inputs *MUST* be sorted -pub struct Merged { - sources: Vec, - heads: BTreeMap<(Key, usize), Value>, -} - -impl SSTable { - pub fn meta(&self) -> &IndexMeta { - &self.meta - } - - #[allow(dead_code)] - pub fn num_keys(&self) -> u64 { - ((self.index.len() - INDEX_META_SIZE) / INDEX_ENTRY_SIZE) as u64 - } - - pub fn get(&self, key: &Key) -> Result> { - let range = *key..=*key; - let found_opt = self.range(&range)?.find(|(k, _)| k == key).map(|(_, v)| v); - Ok(found_opt) - } - - pub fn range(&self, range: &RangeInclusive) -> Result> { - Ok(Scan::new( - range.clone(), - Arc::clone(&self.data), - Arc::clone(&self.index), - )) - } - - pub fn create_capped( - rows: &mut I, - level: u8, - max_table_size: u64, - data_wtr: &mut dyn Write, - index_wtr: &mut dyn Write, - ) where - I: Iterator, - K: Borrow, - V: Borrow, - { - const DATA_ERR: &str = "Error writing table data"; - const INDEX_ERR: &str = "Error writing index data"; - - let (data_size, index) = - flush_mem_table_capped(rows, data_wtr, max_table_size).expect(DATA_ERR); - - data_wtr.flush().expect(DATA_ERR); - - let (&start, &end) = ( - index.keys().next().unwrap(), - index.keys().next_back().unwrap(), - ); - - let meta = IndexMeta { - start, - end, - level, - data_size, - }; - - flush_index(&index, &meta, index_wtr).expect(INDEX_ERR); - index_wtr.flush().expect(INDEX_ERR); - } - - pub fn create( - rows: &mut I, - level: u8, - data_wtr: &mut dyn Write, - index_wtr: &mut dyn Write, - ) where - I: Iterator, - K: Borrow, - V: Borrow, - { - SSTable::create_capped(rows, level, u64::MAX, data_wtr, index_wtr); - } - - pub fn from_parts(data: Arc, index: Arc) -> Result { - let len = index.len() as usize; - - assert!(len > INDEX_META_SIZE); - assert_eq!((len - INDEX_META_SIZE) % INDEX_RECORD_SIZE, 0); - - let meta = bincode::deserialize_from(&index[..INDEX_META_SIZE])?; - - Ok(SSTable { data, index, meta }) - } - - pub fn could_contain(&self, key: &Key) -> bool { - self.meta.start <= *key && *key <= self.meta.end - } - - pub fn is_overlap(&self, range: &RangeInclusive) -> bool { - let r = self.meta.start..=self.meta.end; - overlapping(&r, range) - } - - pub fn sorted_tables(tables: &[SSTable]) -> Vec> { - let mut sorted = Vec::new(); - - for sst in tables { - let (key, level) = { - let meta = sst.meta(); - (meta.start, meta.level) - }; - - while level as usize >= sorted.len() { - sorted.push(BTreeMap::new()); - } - sorted[level as usize].insert(key, sst.clone()); - } - - sorted - } -} - -impl Key { - pub const MIN: Key = Key([0u8; KEY_LEN]); - pub 
const MAX: Key = Key([255u8; KEY_LEN]); - pub const ALL_INCLUSIVE: RangeInclusive = RangeInclusive::new(Key::MIN, Key::MAX); - - pub fn write(&self, wtr: &mut W) -> Result<()> { - wtr.write_all(&self.0)?; - Ok(()) - } - - pub fn read(bytes: &[u8]) -> Key { - let mut key = Key::default(); - key.0.copy_from_slice(bytes); - key - } -} - -impl Value { - pub fn new(commit: i64, data: Option>) -> Value { - Value { - ts: commit, - val: data, - } - } -} - -struct Scan { - bounds: RangeInclusive, - data: Arc, - index: Arc, - index_pos: usize, -} - -impl Scan { - fn new(bounds: RangeInclusive, data: Arc, index: Arc) -> Self { - Scan { - bounds, - data, - index, - index_pos: INDEX_META_SIZE as usize, - } - } - - fn step(&mut self) -> Result> { - while self.index_pos < self.index.len() { - let pos = self.index_pos as usize; - let end = pos + INDEX_RECORD_SIZE; - - let (key, entry): (Key, IndexEntry) = bincode::deserialize_from(&self.index[pos..end])?; - self.index_pos = end; - - if key < *self.bounds.start() { - continue; - } - - if *self.bounds.end() < key { - self.index_pos = std::usize::MAX; - return Ok(None); - } - - let record_range = entry.offset as usize..(entry.offset + entry.size) as usize; - let (data_key, value) = bincode::deserialize_from(&self.data[record_range])?; - assert_eq!(data_key, key); - - return Ok(Some((data_key, value))); - } - - Ok(None) - } -} - -impl fmt::Display for Key { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let k0 = BigEndian::read_u64(&self.0[..8]); - let k1 = BigEndian::read_u64(&self.0[8..16]); - let k2 = BigEndian::read_u64(&self.0[16..]); - write!(f, "Key({}, {}, {})", k0, k1, k2) - } -} - -impl From<(u64, u64, u64)> for Key { - fn from((k0, k1, k2): (u64, u64, u64)) -> Self { - let mut buf = [0u8; KEY_LEN]; - - BigEndian::write_u64(&mut buf[..8], k0); - BigEndian::write_u64(&mut buf[8..16], k1); - BigEndian::write_u64(&mut buf[16..], k2); - - Key(buf) - } -} - -impl Merged -where - I: Iterator, -{ - pub fn new(mut sources: Vec) -> Self { - let mut heads = BTreeMap::new(); - - for (source_idx, source) in sources.iter_mut().enumerate() { - if let Some((k, v)) = source.next() { - heads.insert((k, source_idx), v); - } - } - - Merged { sources, heads } - } -} - -impl Iterator for Merged -where - I: Iterator, -{ - type Item = (Key, Value); - - fn next(&mut self) -> Option { - while !self.heads.is_empty() { - // get new key - let (key, source_idx) = *self.heads.keys().next().unwrap(); - let mut val = self.heads.remove(&(key, source_idx)).unwrap(); - - // replace - if let Some((k, v)) = self.sources[source_idx].next() { - self.heads.insert((k, source_idx), v); - } - - // check for other versions of this record - while !self.heads.is_empty() { - let (next_key, source_idx) = *self.heads.keys().next().unwrap(); - - // Found a different version of the record - if key == next_key { - // pop this version, check if it's newer - let other_version = self.heads.remove(&(next_key, source_idx)).unwrap(); - if other_version.ts > val.ts { - val = other_version; - } - - // replace - if let Some((k, v)) = self.sources[source_idx].next() { - self.heads.insert((k, source_idx), v); - } - } else { - break; - } - } - - // Don't produce deleted records - if val.val.is_some() { - return Some((key, val)); - } - } - - None - } -} - -impl Iterator for Scan { - type Item = (Key, Value); - - fn next(&mut self) -> Option { - if self.index_pos as usize >= self.index.len() { - return None; - } - - match self.step() { - Ok(opt) => opt, - Err(_) => { - self.index_pos = std::usize::MAX; 
- None - } - } - } -} - -fn flush_index( - index: &BTreeMap, - meta: &IndexMeta, - writer: &mut dyn Write, -) -> Result<()> { - let mut entry_buffer = [0u8; INDEX_RECORD_SIZE]; - let mut meta_buffer = [0u8; INDEX_META_SIZE]; - - bincode::serialize_into(&mut meta_buffer[..], meta)?; - writer.write_all(&meta_buffer)?; - - for (key, entry) in index.iter() { - let rec = (key, entry); - entry_buffer.fill(0); - - bincode::serialize_into(&mut entry_buffer[..], &rec)?; - writer.write_all(&entry_buffer)?; - } - - Ok(()) -} - -fn flush_mem_table_capped( - rows: &mut I, - mut wtr: &mut dyn Write, - max_table_size: u64, -) -> Result<(u64, BTreeMap)> -where - I: Iterator, - K: Borrow, - V: Borrow, -{ - let mut index = BTreeMap::new(); - let mut size = 0; - let bincode_config = bincode::config(); - - for (key, val) in rows { - let record = (key.borrow(), val.borrow()); - - let serialized_size = bincode_config.serialized_size(&record)?; - bincode::serialize_into(&mut wtr, &record)?; - - let entry = IndexEntry { - timestamp: record.1.ts, - offset: size, - size: serialized_size, - }; - - size += serialized_size; - - index.insert(*record.0, entry); - - if size >= max_table_size { - break; - } - } - - Ok((size, index)) -} - -#[inline] -fn overlapping(r1: &RangeInclusive, r2: &RangeInclusive) -> bool { - r1.start() <= r2.end() && r2.start() <= r1.end() -} - -#[cfg(test)] -pub mod test { - use super::*; - use crate::test::gen; - use std::sync::{Arc, RwLock}; - - #[test] - fn test_dump_data() { - let mut data_buffer = vec![]; - let records: BTreeMap<_, _> = gen_records().take(512).collect(); - - let (_, index) = - flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap(); - - assert_eq!(index.len(), records.len()); - assert!(index.keys().eq(records.keys())); - - let mut retrieved = BTreeMap::new(); - - for (key, entry) in index.iter() { - let range = entry.offset as usize..(entry.offset + entry.size) as usize; - let (data_key, value) = bincode::deserialize_from(&data_buffer[range]).unwrap(); - assert_eq!(&data_key, key); - retrieved.insert(data_key, value); - } - - assert_eq!(records, retrieved); - } - - #[test] - fn test_dump_indexes() { - let mut data_buffer = vec![]; - let mut index_buffer = vec![]; - let records: BTreeMap<_, _> = gen_records().take(512).collect(); - - let (data_size, index) = - flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap(); - - let (&start, &end) = ( - index.keys().next().unwrap(), - index.keys().next_back().unwrap(), - ); - - let meta = IndexMeta { - start, - end, - data_size, - level: 0, - }; - - flush_index(&index, &meta, &mut index_buffer).unwrap(); - - let retrieved_meta = bincode::deserialize_from(&index_buffer[..INDEX_META_SIZE]).unwrap(); - assert_eq!(meta, retrieved_meta); - - // By iterating over the BTreeMap we also check the order of index entries as written - for (i, (key, entry)) in index.iter().enumerate() { - let start = i * INDEX_RECORD_SIZE + INDEX_META_SIZE; - let end = start + INDEX_RECORD_SIZE; - - let (retrieved_key, retrieved_entry) = - bincode::deserialize_from(&index_buffer[start..end]).unwrap(); - - assert_eq!(key, &retrieved_key); - assert_eq!(entry, &retrieved_entry); - } - } - - #[test] - fn test_sstable_scan() { - let mut data_buffer = vec![]; - let mut index_buffer = vec![]; - let records: BTreeMap<_, _> = gen_records().take(512).collect(); - - SSTable::create(&mut records.iter(), 0, &mut data_buffer, &mut index_buffer); - - let data = MemMap::Mem(Arc::new(RwLock::new(data_buffer))); - let index = 
MemMap::Mem(Arc::new(RwLock::new(index_buffer))); - - let sst = SSTable::from_parts(Arc::new(data), Arc::new(index)).unwrap(); - - let output_iter = Scan::new( - Key::ALL_INCLUSIVE, - Arc::clone(&sst.data), - Arc::clone(&sst.index), - ); - - assert!(output_iter.eq(records.into_iter())); - } - - #[test] - fn test_merge_2way() { - let records: BTreeMap<_, _> = gen_records().take(512).collect(); - let updates: BTreeMap<_, _> = records - .iter() - .map(|(k, v)| (*k, Value::new(v.ts + 1, Some(vec![])))) - .collect(); - let deletes: BTreeMap<_, _> = records - .iter() - .map(|(k, v)| (*k, Value::new(v.ts + 1, None))) - .collect(); - - let owned = |(k, v): (&Key, &Value)| (*k, v.clone()); - - let sources = vec![records.iter().map(owned), updates.iter().map(owned)]; - let merged: Vec<_> = Merged::new(sources).collect(); - assert!(merged.into_iter().eq(updates.into_iter())); - - let sources = vec![records.into_iter(), deletes.into_iter()]; - let merged: Vec<_> = Merged::new(sources).collect(); - assert_eq!(merged.len(), 0); - } - - #[test] - fn test_merge_4way() { - // delete last half, then update first half, then delete last half of first half - let start: BTreeMap<_, _> = gen_records().take(512).collect(); - let deletes: BTreeMap<_, _> = start - .iter() - .skip(256) - .map(|(k, v)| (*k, Value::new(v.ts + 1, None))) - .collect(); - let updates: BTreeMap<_, _> = start - .iter() - .take(256) - .map(|(k, v)| (*k, Value::new(v.ts + 2, Some(vec![])))) - .collect(); - let more_deletes: BTreeMap<_, _> = updates - .iter() - .skip(128) - .map(|(k, v)| (*k, Value::new(v.ts + 3, None))) - .collect(); - - let sources = vec![ - more_deletes.into_iter(), - updates.clone().into_iter(), - start.into_iter(), - deletes.into_iter(), - ]; - - let merged: Vec<_> = Merged::new(sources).collect(); - let expected: Vec<_> = updates.into_iter().take(128).collect(); - - assert_eq!(merged.len(), expected.len()); - assert_eq!(merged, expected); - } - - fn gen_records() -> impl Iterator { - gen::pairs_vary(0..255) - .map(|(key, bytes)| (key, Value::new(bytes.len() as i64, Some(bytes)))) - } - -} diff --git a/kvstore/src/storage.rs b/kvstore/src/storage.rs deleted file mode 100644 index ad4ac6640e..0000000000 --- a/kvstore/src/storage.rs +++ /dev/null @@ -1,280 +0,0 @@ -use crate::error::Result; -use crate::mapper::{Kind, Mapper}; -use crate::sstable::{Key, Merged, SSTable, Value}; -use std::collections::btree_map::Entry; -use std::collections::BTreeMap; -use std::mem; - -/// Wrapper over a BTreeMap<`Key`, `Value`> that does basic accounting of memory usage -/// (Doesn't include BTreeMap internal stuff, can't reliably account for that without -/// using special data-structures or depending on unstable implementation details of `std`) -#[derive(Debug)] -pub struct MemTable { - pub mem_size: usize, - pub values: BTreeMap, -} - -impl MemTable { - /// Memory over-head per record. Size of the key + size of commit ID. 
- pub const OVERHEAD_PER_RECORD: usize = mem::size_of::() + mem::size_of::(); - - pub fn new(values: BTreeMap) -> MemTable { - let mem_size = values.values().fold(0, |acc, elem| { - acc + Self::OVERHEAD_PER_RECORD + opt_bytes_memory(&elem.val) - }); - MemTable { mem_size, values } - } - - pub fn put(&mut self, key: &Key, commit: i64, data: &[u8]) { - let value = Value { - ts: commit, - val: Some(data.to_vec()), - }; - - self.mem_size += data.len(); - match self.values.entry(*key) { - Entry::Vacant(entry) => { - entry.insert(value); - self.mem_size += Self::OVERHEAD_PER_RECORD; - } - Entry::Occupied(mut entry) => { - let old = entry.insert(value); - self.mem_size -= opt_bytes_memory(&old.val); - } - } - } - - pub fn delete(&mut self, key: &Key, commit: i64) { - let value = Value { - ts: commit, - val: None, - }; - - match self.values.entry(*key) { - Entry::Vacant(entry) => { - entry.insert(value); - self.mem_size += Self::OVERHEAD_PER_RECORD; - } - Entry::Occupied(mut entry) => { - let old = entry.insert(value); - self.mem_size -= opt_bytes_memory(&old.val); - } - } - } -} - -pub fn flush_table( - mem: &BTreeMap, - mapper: &dyn Mapper, - pages: &mut Vec>, -) -> Result<()> { - if mem.is_empty() { - return Ok(()); - }; - - if pages.is_empty() { - pages.push(BTreeMap::new()); - } - - let mut iter = mem.iter(); - let sst = mapper.make_table(Kind::Active, &mut |mut data_wtr, mut index_wtr| { - SSTable::create(&mut iter, 0, &mut data_wtr, &mut index_wtr); - })?; - - let first = sst.meta().start; - - pages[0].insert(first, sst); - Ok(()) -} - -pub fn get( - mem: &BTreeMap, - pages: &[BTreeMap], - key: &Key, -) -> Result>> { - if let Some(idx) = mem.get(key) { - return Ok(idx.val.clone()); - } - - let mut candidates = Vec::new(); - - for level in pages.iter() { - for (_, sst) in level.iter().rev() { - if sst.could_contain(key) { - if let Some(val) = sst.get(&key)? 
{ - candidates.push((*key, val)); - } - } - } - } - - let merged = Merged::new(vec![candidates.into_iter()]) - .next() - .map(|(_, v)| v.val.unwrap()); - Ok(merged) -} - -pub fn range( - mem: &BTreeMap, - tables: &[BTreeMap], - range: std::ops::RangeInclusive, -) -> Result)>> { - let mut sources: Vec>> = Vec::new(); - - let mem = mem - .range(range.clone()) - .map(|(k, v)| (*k, v.clone())) - .collect::>(); - sources.push(Box::new(mem.into_iter())); - - for level in tables.iter() { - for sst in level.values() { - let iter = sst.range(&range)?; - let iter = Box::new(iter) as Box>; - - sources.push(iter); - } - } - - let rows = Merged::new(sources).map(|(k, v)| (k, v.val.unwrap())); - - Ok(rows) -} - -impl Default for MemTable { - fn default() -> MemTable { - MemTable { - values: BTreeMap::new(), - mem_size: 0, - } - } -} - -#[inline] -fn opt_bytes_memory(bytes: &Option>) -> usize { - bytes.as_ref().map(Vec::len).unwrap_or(0) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::test::gen; - - const COMMIT: i64 = -1; - - #[test] - fn test_put_calc() { - const DATA_SIZE: usize = 16; - - let mut table = MemTable::default(); - - for (key, data) in gen::pairs(DATA_SIZE).take(1024) { - table.put(&key, COMMIT, &data); - } - - let expected_size = 1024 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD); - assert_eq!(table.mem_size, expected_size); - } - - #[test] - fn test_delete_calc() { - const DATA_SIZE: usize = 32; - - let mut table = MemTable::default(); - let input = gen::pairs(DATA_SIZE).take(1024).collect::>(); - - for (key, data) in &input { - table.put(key, COMMIT, data); - } - - for (key, _) in input.iter().rev().take(512) { - table.delete(key, COMMIT); - } - - let expected_size = - 512 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD) + 512 * MemTable::OVERHEAD_PER_RECORD; - assert_eq!(table.mem_size, expected_size); - - // Deletes of things not in the memory table must be recorded - for key in gen::keys().take(512) { - table.delete(&key, COMMIT); - } - - let expected_size = expected_size + 512 * MemTable::OVERHEAD_PER_RECORD; - assert_eq!(table.mem_size, expected_size); - } - - #[test] - fn test_put_order_irrelevant() { - let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default()); - let big_input: Vec<_> = gen::pairs(1024).take(128).collect(); - let small_input: Vec<_> = gen::pairs(16).take(128).collect(); - - for (key, data) in big_input.iter().chain(small_input.iter()) { - table_1.put(key, COMMIT, data); - } - - let iter = big_input - .iter() - .rev() - .zip(small_input.iter().rev()) - .enumerate(); - - for (i, ((big_key, big_data), (small_key, small_data))) in iter { - if i % 2 == 0 { - table_2.put(big_key, COMMIT, big_data); - table_2.put(small_key, COMMIT, small_data); - } else { - table_2.put(small_key, COMMIT, small_data); - table_2.put(big_key, COMMIT, big_data); - } - } - - assert_eq!(table_1.mem_size, table_2.mem_size); - assert_eq!(table_1.values, table_2.values); - } - - #[test] - fn test_delete_order_irrelevant() { - let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default()); - let big_input: Vec<_> = gen::pairs(1024).take(128).collect(); - let small_input: Vec<_> = gen::pairs(16).take(128).collect(); - - for (key, data) in big_input.iter().chain(small_input.iter()) { - table_1.put(key, COMMIT, data); - table_2.put(key, COMMIT, data); - } - - let iter = big_input - .iter() - .rev() - .take(64) - .chain(small_input.iter().rev().take(64)) - .map(|(key, _)| key); - - for key in iter { - table_1.delete(key, COMMIT); - } - - let iter = big_input - 
.iter() - .rev() - .take(64) - .zip(small_input.iter().rev().take(64)) - .map(|((key, _), (key2, _))| (key, key2)) - .enumerate(); - - for (i, (big_key, small_key)) in iter { - if i % 2 == 0 { - table_2.delete(big_key, COMMIT); - table_2.delete(small_key, COMMIT); - } else { - table_2.delete(small_key, COMMIT); - table_2.delete(big_key, COMMIT); - } - } - - assert_eq!(table_1.mem_size, table_2.mem_size); - assert_eq!(table_1.values, table_2.values); - } -} diff --git a/kvstore/src/writebatch.rs b/kvstore/src/writebatch.rs deleted file mode 100644 index 774cf7ad75..0000000000 --- a/kvstore/src/writebatch.rs +++ /dev/null @@ -1,209 +0,0 @@ -use crate::error::{Error, Result}; -use crate::sstable::Key; -use crate::storage::MemTable; -use crate::writelog::WriteLog; -use crate::DEFAULT_MEM_SIZE; -use std::sync::{Arc, RwLock}; - -/// Configuration for `WriteBatch` -#[derive(Debug)] -pub struct Config { - /// Determines whether writes using this batch will be written to the write-ahead-log - /// immediately, or only all-at-once when the batch is being committed. - pub log_writes: bool, - /// Size cap for the write-batch. Inserts after it is full will return an `Err`; - pub max_size: usize, -} - -#[derive(Debug)] -pub struct WriteBatch { - pub(crate) log: Arc>, - pub(crate) memtable: MemTable, - pub(crate) commit: i64, - pub(crate) config: Config, -} - -impl WriteBatch { - pub fn put(&mut self, key: &Key, data: &[u8]) -> Result<()> { - self.check_capacity()?; - - if self.config.log_writes { - let mut log = self.log.write().unwrap(); - log.log_put(key, self.commit, data).unwrap(); - } - - self.memtable.put(key, self.commit, data); - - Ok(()) - } - - pub fn put_many(&mut self, rows: Iter) -> Result<()> - where - Iter: Iterator, - Tup: std::borrow::Borrow<(K, V)>, - K: std::borrow::Borrow, - V: std::borrow::Borrow<[u8]>, - { - self.check_capacity()?; - - if self.config.log_writes { - let mut log = self.log.write().unwrap(); - - for pair in rows { - let (ref key, ref data) = pair.borrow(); - let (key, data) = (key.borrow(), data.borrow()); - log.log_put(key, self.commit, data).unwrap(); - - self.memtable.put(key, self.commit, data); - } - } else { - for pair in rows { - let (ref key, ref data) = pair.borrow(); - self.memtable.put(key.borrow(), self.commit, data.borrow()); - } - } - - Ok(()) - } - - pub fn delete(&mut self, key: &Key) { - if self.config.log_writes { - let mut log = self.log.write().unwrap(); - log.log_delete(key, self.commit).unwrap(); - } - - self.memtable.delete(key, self.commit); - } - - pub fn delete_many(&mut self, rows: Iter) - where - Iter: Iterator, - K: std::borrow::Borrow, - { - if self.config.log_writes { - let mut log = self.log.write().unwrap(); - - for key in rows { - let key = key.borrow(); - log.log_delete(key, self.commit).unwrap(); - - self.memtable.delete(key, self.commit); - } - } else { - for key in rows { - self.memtable.delete(key.borrow(), self.commit); - } - } - } - - #[inline] - fn check_capacity(&self) -> Result<()> { - if self.memtable.mem_size >= self.config.max_size { - return Err(Error::WriteBatchFull(self.config.max_size)); - } - - Ok(()) - } -} - -impl Default for Config { - fn default() -> Config { - Config { - log_writes: true, - max_size: DEFAULT_MEM_SIZE, - } - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::test::gen; - use crate::writelog::Config as WalConfig; - - const CAPACITY: usize = 10 * 1024; - - #[test] - fn test_put_associative() { - let mut writebatch = setup(); - let input: Vec<_> = gen::pairs(32).take(100).collect(); - - 
writebatch.put_many(input.iter()).unwrap(); - - let mut writebatch2 = setup(); - for (key, data) in &input { - writebatch2.put(key, data).unwrap(); - } - - let (materialized_1, materialized_2) = ( - writebatch.log.write().unwrap().materialize().unwrap(), - writebatch2.log.write().unwrap().materialize().unwrap(), - ); - - assert_eq!(materialized_1, materialized_2); - } - - #[test] - fn test_delete_associative() { - let (mut writebatch, mut writebatch2) = (setup(), setup()); - let input: Vec<_> = gen::pairs(32).take(100).collect(); - - writebatch.put_many(input.iter()).unwrap(); - writebatch2.put_many(input.iter()).unwrap(); - - writebatch.delete_many(input.iter().map(|(k, _)| k)); - - for (key, _) in &input { - writebatch2.delete(key); - } - - let (materialized_1, materialized_2) = ( - writebatch.log.write().unwrap().materialize().unwrap(), - writebatch2.log.write().unwrap().materialize().unwrap(), - ); - - assert_eq!(materialized_1, materialized_2); - } - - #[test] - fn test_no_put_when_full() { - const AMT_RECORDS: usize = 64; - - let mut writebatch = setup(); - - let space_per_record = CAPACITY / AMT_RECORDS - MemTable::OVERHEAD_PER_RECORD; - let input: Vec<_> = gen::pairs(space_per_record).take(AMT_RECORDS).collect(); - - writebatch.put_many(input.iter()).unwrap(); - - match writebatch.check_capacity() { - Err(Error::WriteBatchFull(CAPACITY)) => {} - _ => panic!("Writebatch should be exactly at capacity"), - } - - let (key, data) = gen::pairs(space_per_record).next().unwrap(); - let result = writebatch.put(&key, &data); - assert!(result.is_err()); - - // Free up space - writebatch.delete(&input[0].0); - let result = writebatch.put(&key, &data); - assert!(result.is_ok()); - } - - fn setup() -> WriteBatch { - let config = Config { - log_writes: true, - max_size: CAPACITY, - }; - - let log = WriteLog::memory(WalConfig::default()); - - WriteBatch { - config, - commit: -1, - memtable: MemTable::default(), - log: Arc::new(RwLock::new(log)), - } - } -} diff --git a/kvstore/src/writelog.rs b/kvstore/src/writelog.rs deleted file mode 100644 index 97cda03531..0000000000 --- a/kvstore/src/writelog.rs +++ /dev/null @@ -1,276 +0,0 @@ -use crate::error::Result; -use crate::io_utils::{CRCReader, CRCWriter}; -use crate::sstable::Value; -use crate::Key; -use memmap::Mmap; -use std::collections::BTreeMap; -use std::fs::{self, File}; -use std::io::Write; -use std::path::{Path, PathBuf}; - -// RocksDb's log uses this size. 
-// May be worth making configurable and experimenting -const BLOCK_SIZE: usize = 32 * 1024; - -#[derive(Debug)] -pub struct WriteLog { - log_path: PathBuf, - logger: Logger, - config: Config, - in_memory: bool, -} - -#[derive(Clone, Copy, Debug, PartialEq)] -pub struct Config { - pub use_fsync: bool, - pub sync_every_write: bool, -} - -impl WriteLog { - pub fn open(path: &Path, config: Config) -> Result { - let file = file_opts().open(path)?; - - Ok(WriteLog { - config, - log_path: path.to_path_buf(), - logger: Logger::disk(file), - in_memory: false, - }) - } - - #[allow(dead_code)] - pub fn memory(config: Config) -> WriteLog { - WriteLog { - config, - logger: Logger::memory(), - log_path: Path::new("").to_path_buf(), - in_memory: true, - } - } - - pub fn reset(&mut self) -> Result<()> { - let new_logger = if self.in_memory { - Logger::memory() - } else { - let file = file_opts().truncate(true).open(&self.log_path)?; - Logger::disk(file) - }; - - self.logger = new_logger; - - Ok(()) - } - - pub fn log_put(&mut self, key: &Key, ts: i64, val: &[u8]) -> Result<()> { - log(&mut self.logger, key, ts, Some(val))?; - - if self.config.sync_every_write { - sync(&mut self.logger, self.config.use_fsync)?; - } - - Ok(()) - } - - pub fn log_delete(&mut self, key: &Key, ts: i64) -> Result<()> { - log(&mut self.logger, key, ts, None)?; - - if self.config.sync_every_write { - sync(&mut self.logger, self.config.use_fsync)?; - } - - Ok(()) - } - - #[allow(dead_code)] - pub fn sync(&mut self) -> Result<()> { - sync(&mut self.logger, self.config.use_fsync) - } - - pub fn materialize(&mut self) -> Result> { - let mmap = self.logger.writer.mmap()?; - read_log(&mmap) - } -} - -impl Default for Config { - fn default() -> Config { - Config { - use_fsync: false, - sync_every_write: true, - } - } -} - -trait LogWriter: std::fmt::Debug + Write + Send + Sync { - fn sync(&mut self, fsync: bool) -> Result<()>; - fn mmap(&self) -> Result; -} - -/// Holds actual logging related state -#[derive(Debug)] -struct Logger { - writer: Box, -} - -impl Logger { - fn memory() -> Self { - Logger { - writer: Box::new(CRCWriter::new(vec![], BLOCK_SIZE)), - } - } - - fn disk(file: File) -> Self { - Logger { - writer: Box::new(CRCWriter::new(file, BLOCK_SIZE)), - } - } -} - -impl LogWriter for CRCWriter> { - fn sync(&mut self, _: bool) -> Result<()> { - self.flush()?; - Ok(()) - } - - fn mmap(&self) -> Result { - let mut map = memmap::MmapMut::map_anon(self.get_ref().len())?; - (&mut map[..]).copy_from_slice(self.get_ref()); - Ok(map.make_read_only()?) - } -} - -impl LogWriter for CRCWriter { - fn sync(&mut self, fsync: bool) -> Result<()> { - self.flush()?; - - let file = self.get_mut(); - if fsync { - file.sync_all()?; - } else { - file.sync_data()?; - } - - Ok(()) - } - - fn mmap(&self) -> Result { - let map = unsafe { Mmap::map(self.get_ref())? 
}; - Ok(map) - } -} - -fn log(logger: &mut Logger, key: &Key, commit: i64, data: Option<&[u8]>) -> Result<()> { - let writer = &mut logger.writer; - - bincode::serialize_into(writer, &(key, commit, data))?; - - Ok(()) -} - -fn sync(logger: &mut Logger, sync_all: bool) -> Result<()> { - let writer = &mut logger.writer; - - writer.sync(sync_all)?; - - Ok(()) -} - -#[inline] -fn file_opts() -> fs::OpenOptions { - let mut opts = fs::OpenOptions::new(); - opts.read(true).write(true).create(true); - opts -} - -fn read_log(log_buf: &[u8]) -> Result> { - let mut map = BTreeMap::new(); - - let mut reader = CRCReader::new(log_buf, BLOCK_SIZE); - - while let Ok((key, commit, opt_bytes)) = bincode::deserialize_from(&mut reader) { - map.insert(key, Value::new(commit, opt_bytes)); - } - - Ok(map) -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_log_serialization() { - let (key, commit, data) = (Key::from((1, 2, 3)), 4, Some(vec![0; 1024])); - - let mut buf = vec![]; - - bincode::serialize_into(&mut buf, &(&key, commit, &data)).unwrap(); - buf.extend(std::iter::repeat(0).take(buf.len())); - - let log_record: (Key, i64, Option>) = bincode::deserialize_from(&buf[..]).unwrap(); - assert_eq!(log_record.0, key); - assert_eq!(log_record.1, commit); - assert_eq!(log_record.2, data); - } - - #[test] - fn test_log_round_trip() { - let mut wal = WriteLog::memory(Config::default()); - - let values: BTreeMap = (0u64..100) - .map(|n| { - let val = if n % 2 == 0 { - Some(vec![0; 1024]) - } else { - None - }; - (Key::from((n, n, n)), Value { ts: n as i64, val }) - }) - .collect(); - - for (k, v) in values.iter() { - if v.val.is_some() { - wal.log_put(k, v.ts, v.val.as_ref().unwrap()) - .expect("Wal::put"); - } else { - wal.log_delete(k, v.ts).expect("Wal::delete"); - } - } - - let reloaded = wal.materialize().expect("Wal::materialize"); - - assert_eq!(values.len(), reloaded.len()); - assert_eq!(values, reloaded); - } - - #[test] - fn test_reset() { - use crate::error::Error; - - let mut wal = WriteLog::memory(Config::default()); - - let values: BTreeMap = (0u64..100) - .map(|n| { - let val = Some(vec![0; 64]); - (Key::from((n, n, n)), Value { ts: n as i64, val }) - }) - .collect(); - - for (k, v) in values.iter() { - wal.log_put(k, v.ts, v.val.as_ref().unwrap()) - .expect("Wal::put"); - } - - wal.reset().expect("Wal::reset"); - - // Should result in an error due to attempting to make a memory map of length 0 - let result = wal.materialize(); - - assert!(result.is_err()); - if let Err(Error::Io(e)) = result { - assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); - } else { - panic!("should fail to create 0-length memory-map with an empty log"); - } - } -} diff --git a/kvstore/tests/basic.rs b/kvstore/tests/basic.rs deleted file mode 100644 index 804ac981e1..0000000000 --- a/kvstore/tests/basic.rs +++ /dev/null @@ -1,240 +0,0 @@ -use std::fs; -use std::path::{Path, PathBuf}; - -use solana_kvstore::test::gen; -use solana_kvstore::{Config, Key, KvStore}; - -const KB: usize = 1024; -const HALF_KB: usize = 512; - -#[test] -fn test_put_get() { - let path = setup("test_put_get"); - - let cfg = Config { - max_mem: 64 * KB, - max_tables: 5, - page_size: 64 * KB, - ..Config::default() - }; - - let lsm = KvStore::open(&path, cfg).unwrap(); - let (key, bytes) = gen::pairs(HALF_KB).take(1).next().unwrap(); - - lsm.put(&key, &bytes).expect("put fail"); - let out_bytes = lsm.get(&key).expect("get fail").expect("missing"); - - assert_eq!(bytes, out_bytes); - - teardown(&path); -} - -#[test] -fn 
test_put_get_many() { - let path = setup("test_put_get_many"); - - let cfg = Config { - max_mem: 64 * KB, - max_tables: 5, - page_size: 64 * KB, - ..Config::default() - }; - let lsm = KvStore::open(&path, cfg).unwrap(); - - let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(1024).collect(); - pairs.sort_unstable_by_key(|(k, _)| *k); - - lsm.put_many(pairs.clone().drain(..)) - .expect("put_many fail"); - - let retrieved: Vec<(Key, Vec)> = - lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); - - assert!(!retrieved.is_empty()); - assert_eq!(pairs.len(), retrieved.len()); - assert_eq!(pairs, retrieved); - - teardown(&path); -} - -#[test] -fn test_delete() { - let path = setup("test_delete"); - - let cfg = Config { - max_mem: 64 * KB, - max_tables: 5, - page_size: 64 * KB, - ..Config::default() - }; - let lsm = KvStore::open(&path, cfg).unwrap(); - - let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 6).collect(); - pairs.sort_unstable_by_key(|(k, _)| *k); - - for (k, i) in pairs.iter() { - lsm.put(k, i).expect("put fail"); - } - - // drain iterator deletes from `pairs` - for (k, _) in pairs.drain(64..128) { - lsm.delete(&k).expect("delete fail"); - } - - let retrieved: Vec<(Key, Vec)> = - lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); - - assert!(!retrieved.is_empty()); - assert_eq!(pairs.len(), retrieved.len()); - assert_eq!(pairs, retrieved); - - teardown(&path); -} - -#[test] -fn test_delete_many() { - let path = setup("test_delete_many"); - - let cfg = Config { - max_mem: 64 * KB, - max_tables: 5, - page_size: 64 * KB, - ..Config::default() - }; - let lsm = KvStore::open(&path, cfg).unwrap(); - - let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 6).collect(); - pairs.sort_unstable_by_key(|(k, _)| *k); - - for (k, i) in pairs.iter() { - lsm.put(k, i).expect("put fail"); - } - - // drain iterator deletes from `pairs` - let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k); - - lsm.delete_many(keys_to_delete).expect("delete_many fail"); - - let retrieved: Vec<(Key, Vec)> = - lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); - - assert!(!retrieved.is_empty()); - assert_eq!(pairs.len(), retrieved.len()); - assert_eq!(pairs, retrieved); - - teardown(&path); -} - -#[test] -fn test_close_reopen() { - let path = setup("test_close_reopen"); - let cfg = Config::default(); - let lsm = KvStore::open(&path, cfg).unwrap(); - - let mut pairs: Vec<_> = gen::pairs(KB).take(1024).collect(); - pairs.sort_unstable_by_key(|(k, _)| *k); - - for (k, i) in pairs.iter() { - lsm.put(k, i).expect("put fail"); - } - - for (k, _) in pairs.drain(64..128) { - lsm.delete(&k).expect("delete fail"); - } - - // Drop and re-open - drop(lsm); - let lsm = KvStore::open(&path, cfg).unwrap(); - - let retrieved: Vec<(Key, Vec)> = - lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); - - assert!(!retrieved.is_empty()); - assert_eq!(pairs.len(), retrieved.len()); - assert_eq!(pairs, retrieved); - - teardown(&path); -} - -#[test] -fn test_partitioned() { - let path = setup("test_partitioned"); - - let cfg = Config { - max_mem: 64 * KB, - max_tables: 5, - page_size: 64 * KB, - ..Config::default() - }; - - let storage_dirs = (0..4) - .map(|i| path.join(format!("parition-{}", i))) - .collect::>(); - - let lsm = KvStore::partitioned(&path, &storage_dirs, cfg).unwrap(); - - let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 12).collect(); - pairs.sort_unstable_by_key(|(k, _)| *k); - - lsm.put_many(pairs.iter()).expect("put_many fail"); - - // drain iterator deletes from `pairs` 
- let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k); - - lsm.delete_many(keys_to_delete).expect("delete_many fail"); - - let retrieved: Vec<(Key, Vec)> = - lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); - - assert!(!retrieved.is_empty()); - assert_eq!(pairs.len(), retrieved.len()); - assert_eq!(pairs, retrieved); - - teardown(&path); -} - -#[test] -fn test_in_memory() { - let path = setup("test_in_memory"); - - let cfg = Config { - max_mem: 64 * KB, - max_tables: 5, - page_size: 64 * KB, - in_memory: true, - ..Config::default() - }; - let lsm = KvStore::open(&path, cfg).unwrap(); - - let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 12).collect(); - pairs.sort_unstable_by_key(|(k, _)| *k); - - lsm.put_many(pairs.iter()).expect("put_many fail"); - - // drain iterator deletes from `pairs` - let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k); - - lsm.delete_many(keys_to_delete).expect("delete_many fail"); - - let retrieved: Vec<(Key, Vec)> = - lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); - - assert!(!retrieved.is_empty()); - assert_eq!(pairs.len(), retrieved.len()); - assert_eq!(pairs, retrieved); - - teardown(&path); -} - -fn setup(test_name: &str) -> PathBuf { - let dir = Path::new("kvstore-test").join(test_name); - - let _ig = fs::remove_dir_all(&dir); - fs::create_dir_all(&dir).unwrap(); - - dir -} - -fn teardown(p: &Path) { - KvStore::destroy(p).expect("Expect successful store destruction"); -}
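
The crate removed here was a small log-structured merge (LSM) engine, and a few of its techniques are worth a sketch on the way out. First, the compaction hand-off in mapper/memory.rs: `rotate_tables` retires the whole active set to garbage and promotes the compacted set in a single step, holding all three write locks at once so no reader can observe a half-rotated state. A minimal sketch of that discipline, with unit values standing in for the real table handles (the `Sets` type and its fields are illustrative, not the crate's):

    use std::collections::HashMap;
    use std::sync::RwLock;

    /// Toy stand-in for the mapper's three table sets.
    struct Sets {
        active: RwLock<HashMap<u32, ()>>,
        compaction: RwLock<HashMap<u32, ()>>,
        garbage: RwLock<HashMap<u32, ()>>,
    }

    impl Sets {
        /// Old active tables become garbage; freshly compacted tables
        /// become the new active set. Taking all three write locks up
        /// front makes the swap atomic with respect to readers.
        fn rotate(&self) {
            let (mut active, mut compaction, mut garbage) = (
                self.active.write().unwrap(),
                self.compaction.write().unwrap(),
                self.garbage.write().unwrap(),
            );
            garbage.extend(active.drain());
            active.extend(compaction.drain());
        }
    }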
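
Second, the on-disk index layout in sstable.rs: a metadata block followed by fixed-width `(key, entry)` rows, which is what lets `Scan` step through the index with plain offset arithmetic and lets the key count be recovered from the index length alone. A sketch of the layout math, with assumed widths rather than the crate's exact bincode sizes:

    // Assumed widths; the deleted crate derived these via mem::size_of.
    const KEY_LEN: usize = 24;                          // Key is a 24-byte array
    const INDEX_META_SIZE: usize = 1 + 8 + 2 * KEY_LEN; // level, data_size, start, end
    const INDEX_ENTRY_SIZE: usize = 8 + 8 + 8;          // timestamp, offset, size
    const INDEX_RECORD_SIZE: usize = KEY_LEN + INDEX_ENTRY_SIZE;

    /// Byte range of the i-th index record: fixed-width rows make the
    /// index randomly addressable without parsing any predecessor.
    fn index_record_range(i: usize) -> std::ops::Range<usize> {
        let start = INDEX_META_SIZE + i * INDEX_RECORD_SIZE;
        start..start + INDEX_RECORD_SIZE
    }

    /// Record count recovered from the index length alone.
    fn num_keys(index_len: usize) -> usize {
        assert!(index_len > INDEX_META_SIZE);
        assert_eq!((index_len - INDEX_META_SIZE) % INDEX_RECORD_SIZE, 0);
        (index_len - INDEX_META_SIZE) / INDEX_RECORD_SIZE
    }

    fn main() {
        // An index holding 3 records under the assumed widths.
        assert_eq!(num_keys(INDEX_META_SIZE + 3 * INDEX_RECORD_SIZE), 3);
        assert_eq!(index_record_range(0).start, INDEX_META_SIZE);
    }

One detail worth noting while it disappears: the deleted `num_keys` divided by INDEX_ENTRY_SIZE even though index rows are INDEX_RECORD_SIZE wide; the stride used here matches the modulus assertion in `from_parts` and the offset math in `test_dump_indexes`.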
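
Third, the read path's `Merged` iterator, described in its doc comment above as a direct k-way merge with lazy reconciliation. The sketch below mirrors its shape: a BTreeMap keyed on (key, source index) doubles as the min-heap, the newest timestamp wins among competing versions of a key, and tombstones are swallowed rather than emitted (the types here are simplified stand-ins for the crate's):

    use std::collections::BTreeMap;

    type Key = u64;

    #[derive(Clone, Debug, PartialEq)]
    struct Value {
        ts: i64,              // commit id; the newest version of a key wins
        val: Option<Vec<u8>>, // None is a tombstone (deletion marker)
    }

    /// Merge sorted runs, emitting each key once with its newest live value.
    fn merge<I>(mut sources: Vec<I>) -> Vec<(Key, Value)>
    where
        I: Iterator<Item = (Key, Value)>,
    {
        let mut heads = BTreeMap::new();
        for (idx, src) in sources.iter_mut().enumerate() {
            if let Some((k, v)) = src.next() {
                heads.insert((k, idx), v);
            }
        }

        let mut out = Vec::new();
        while !heads.is_empty() {
            // Smallest key wins; pop it and refill from its source.
            let (key, idx) = *heads.keys().next().unwrap();
            let mut newest = heads.remove(&(key, idx)).unwrap();
            if let Some((k, v)) = sources[idx].next() {
                heads.insert((k, idx), v);
            }

            // Reconcile every other source's version of the same key.
            loop {
                let next = match heads.keys().next() {
                    Some(&(k, i)) if k == key => (k, i),
                    _ => break,
                };
                let other = heads.remove(&next).unwrap();
                if other.ts > newest.ts {
                    newest = other;
                }
                if let Some((k, v)) = sources[next.1].next() {
                    heads.insert((k, next.1), v);
                }
            }

            // Deleted records are reconciled away rather than emitted.
            if newest.val.is_some() {
                out.push((key, newest));
            }
        }
        out
    }

    fn main() {
        let base = vec![(1, Value { ts: 0, val: Some(b"a".to_vec()) })];
        let newer = vec![(1, Value { ts: 1, val: None })]; // tombstone wins
        assert!(merge(vec![base.into_iter(), newer.into_iter()]).is_empty());
    }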
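
Fourth, the memtable accounting in storage.rs. The subtle case is deletion: as `test_delete_calc` spells out, deleting a key that was never inserted must still be recorded, because the tombstone has to shadow whatever the on-disk tables hold, so the memtable grows by one record of overhead. A compact sketch of just that rule, with simplified types standing in for the crate's:

    use std::collections::BTreeMap;

    // Key bytes plus the commit id, mirroring OVERHEAD_PER_RECORD above.
    const OVERHEAD: usize = 24 + 8;

    /// Record a deletion. A fresh tombstone costs one record of overhead;
    /// overwriting an existing entry reclaims its old payload instead.
    fn delete(
        values: &mut BTreeMap<[u8; 24], Option<Vec<u8>>>,
        mem_size: &mut usize,
        key: [u8; 24],
    ) {
        match values.insert(key, None) {
            None => *mem_size += OVERHEAD,
            Some(old) => *mem_size -= old.map_or(0, |v| v.len()),
        }
    }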
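
Fifth, the write-ahead log in writelog.rs, which frames everything through a `CRCWriter` in the 32 KB blocks noted above. The sketch below shows the framing idea only: the checksum is a Fletcher-style placeholder rather than a real CRC, and the frame layout is assumed, not copied from the deleted io_utils.rs. On replay, a reader recomputes each frame's checksum and stops at the first mismatch, which is how a torn final write gets truncated instead of poisoning the whole log.

    use std::io::{self, Write};

    const BLOCK_SIZE: usize = 32 * 1024; // RocksDB-style log block size

    /// Minimal block-framing writer: buffers payload and emits
    /// `[checksum | len | payload]` frames no larger than BLOCK_SIZE.
    struct BlockWriter<W: Write> {
        inner: W,
        buf: Vec<u8>,
    }

    impl<W: Write> BlockWriter<W> {
        fn new(inner: W) -> Self {
            Self { inner, buf: Vec::with_capacity(BLOCK_SIZE) }
        }

        fn checksum(data: &[u8]) -> u32 {
            // NOT a real CRC: a placeholder so the sketch stays dependency-free.
            let (mut a, mut b) = (1u32, 0u32);
            for &byte in data {
                a = (a + byte as u32) % 65_521;
                b = (b + a) % 65_521;
            }
            (b << 16) | a
        }

        fn flush_block(&mut self) -> io::Result<()> {
            if self.buf.is_empty() {
                return Ok(());
            }
            let sum = Self::checksum(&self.buf);
            self.inner.write_all(&sum.to_be_bytes())?;
            self.inner.write_all(&(self.buf.len() as u32).to_be_bytes())?;
            self.inner.write_all(&self.buf)?;
            self.buf.clear();
            Ok(())
        }
    }

    impl<W: Write> Write for BlockWriter<W> {
        fn write(&mut self, data: &[u8]) -> io::Result<usize> {
            let room = BLOCK_SIZE - self.buf.len();
            let take = room.min(data.len());
            self.buf.extend_from_slice(&data[..take]);
            if self.buf.len() == BLOCK_SIZE {
                self.flush_block()?;
            }
            Ok(take)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.flush_block()?;
            self.inner.flush()
        }
    }

    fn main() -> io::Result<()> {
        let mut log = BlockWriter::new(Vec::new());
        log.write_all(b"put k1 v1")?;
        log.flush()
    }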
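
Finally, the removed integration tests double as the best surviving overview of the crate's public API. A condensed usage example in the tests' own style, meaningful only against a pre-#6075 tree where solana-kvstore still exists:

    use std::path::Path;

    use solana_kvstore::{Config, Key, KvStore};

    fn main() {
        // Small limits, as in the removed tests, to force memtable
        // flushes and table churn quickly.
        let cfg = Config {
            max_mem: 64 * 1024,
            max_tables: 5,
            page_size: 64 * 1024,
            ..Config::default()
        };

        let path = Path::new("kvstore-demo");
        let store = KvStore::open(path, cfg).expect("open fail");

        let key = Key::from((1, 2, 3));
        store.put(&key, b"hello").expect("put fail");

        let bytes = store.get(&key).expect("get fail").expect("missing");
        assert_eq!(bytes, b"hello");

        store.delete(&key).expect("delete fail");
        assert!(store.get(&key).expect("get fail").is_none());

        KvStore::destroy(path).expect("destroy fail");
    }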