diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs index d59235263c..1b97214f4e 100644 --- a/core/src/blocktree/kvs.rs +++ b/core/src/blocktree/kvs.rs @@ -1,5 +1,5 @@ use crate::entry::Entry; -use crate::kvstore::{self, Key}; +use crate::kvstore::{self, Key, KvStore}; use crate::packet::Blob; use crate::result::{Error, Result}; @@ -12,7 +12,7 @@ use super::db::{ use super::{Blocktree, BlocktreeError}; #[derive(Debug)] -pub struct Kvs(()); +pub struct Kvs(KvStore); /// The metadata column family #[derive(Debug)] diff --git a/core/src/kvstore.rs b/core/src/kvstore.rs index d27696d66d..2a5ebf7179 100644 --- a/core/src/kvstore.rs +++ b/core/src/kvstore.rs @@ -2,14 +2,13 @@ use crate::kvstore::mapper::{Disk, Mapper, Memory}; use crate::kvstore::sstable::SSTable; use crate::kvstore::storage::WriteState; use crate::kvstore::writelog::WriteLog; - use std::collections::BTreeMap; use std::fs; use std::io; use std::ops::RangeInclusive; use std::path::{Path, PathBuf}; use std::sync::mpsc::{Receiver, Sender}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::JoinHandle; mod compactor; @@ -48,8 +47,8 @@ pub struct KvStore { config: Config, root: PathBuf, mapper: Arc, - req_tx: RwLock>, - resp_rx: RwLock>, + sender: Mutex>, + receiver: Mutex>, compactor_handle: JoinHandle<()>, } @@ -201,17 +200,17 @@ impl KvStore { } fn query_compactor(&self) -> Result<()> { - if let (Ok(mut req_tx), Ok(mut resp_rx), Ok(mut tables)) = ( - self.req_tx.try_write(), - self.resp_rx.try_write(), + if let (Ok(mut sender), Ok(mut receiver), Ok(mut tables)) = ( + self.sender.try_lock(), + self.receiver.try_lock(), self.tables.try_write(), ) { query_compactor( &self.root, &*self.mapper, &mut *tables, - &mut *resp_rx, - &mut *req_tx, + &mut *receiver, + &mut *sender, )?; } @@ -238,8 +237,8 @@ impl KvStore { dump_tables(&self.root, &*self.mapper).unwrap(); if trigger_compact { let tables_path = self.root.join(TABLES_FILE); - self.req_tx - .write() + self.sender + .lock() .unwrap() .send(compactor::Req::Start(tables_path)) .expect("compactor thread dead"); @@ -279,9 +278,9 @@ fn open(root: &Path, mapper: Arc, config: Config) -> Result max_pages: config.max_tables, page_size: config.page_size, }; - let (req_tx, resp_rx, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg) + let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let (req_tx, resp_rx) = (RwLock::new(req_tx), RwLock::new(resp_rx)); + let (sender, receiver) = (Mutex::new(sender), Mutex::new(receiver)); Ok(KvStore { write, @@ -289,8 +288,8 @@ fn open(root: &Path, mapper: Arc, config: Config) -> Result config, mapper, root, - req_tx, - resp_rx, + sender, + receiver, compactor_handle, }) } @@ -316,14 +315,14 @@ fn query_compactor( root: &Path, mapper: &dyn Mapper, tables: &mut Vec>, - resp_rx: &mut Receiver, - req_tx: &mut Sender, + receiver: &mut Receiver, + sender: &mut Sender, ) -> Result<()> { - match resp_rx.try_recv() { + match receiver.try_recv() { Ok(compactor::Resp::Done(new_tables)) => { std::mem::replace(tables, new_tables); dump_tables(root, mapper)?; - req_tx.send(compactor::Req::Gc).unwrap(); + sender.send(compactor::Req::Gc).unwrap(); } Ok(compactor::Resp::Failed(e)) => { return Err(e);