From fa3e1fa7c94b253806f664d9b1b9e478133aa54d Mon Sep 17 00:00:00 2001 From: Mark Date: Fri, 15 Mar 2019 15:04:34 -0500 Subject: [PATCH] Add error correction to write-log (#3323) --- core/src/kvstore.rs | 13 +- core/src/kvstore/writelog.rs | 392 ++++++++++++++++++++++++++++------- core/tests/kvstore.rs | 1 + 3 files changed, 326 insertions(+), 80 deletions(-) diff --git a/core/src/kvstore.rs b/core/src/kvstore.rs index 2a5ebf7179..e576f0cc65 100644 --- a/core/src/kvstore.rs +++ b/core/src/kvstore.rs @@ -24,6 +24,7 @@ mod writetx; pub use self::error::{Error, Result}; pub use self::readtx::ReadTx as Snapshot; pub use self::sstable::Key; +pub use self::writelog::Config as LogConfig; pub use self::writetx::WriteTx; const TABLES_FILE: &str = "tables.meta"; @@ -38,6 +39,7 @@ pub struct Config { pub max_tables: usize, pub page_size: usize, pub in_memory: bool, + pub log_config: LogConfig, } #[derive(Debug)] @@ -255,6 +257,7 @@ impl Default for Config { max_tables: DEFAULT_MAX_PAGES, page_size: DEFAULT_TABLE_SIZE, in_memory: false, + log_config: LogConfig::default(), } } } @@ -262,12 +265,18 @@ impl Default for Config { fn open(root: &Path, mapper: Arc, config: Config) -> Result { let root = root.to_path_buf(); let log_path = root.join(LOG_FILE); + let restore_log = log_path.exists(); + if !root.exists() { fs::create_dir(&root)?; } - let write_log = WriteLog::open(&log_path, config.max_mem)?; - let mem = write_log.materialize()?; + let write_log = WriteLog::open(&log_path, config.log_config)?; + let mem = if restore_log && !config.in_memory { + write_log.materialize()? + } else { + BTreeMap::new() + }; let write = RwLock::new(WriteState::new(write_log, mem)); diff --git a/core/src/kvstore/writelog.rs b/core/src/kvstore/writelog.rs index 14d1b7c7db..ba5a16b977 100644 --- a/core/src/kvstore/writelog.rs +++ b/core/src/kvstore/writelog.rs @@ -1,105 +1,341 @@ use crate::kvstore::error::Result; +use crate::kvstore::io_utils::{CRCReader, CRCWriter}; use crate::kvstore::sstable::Value; use crate::kvstore::Key; - -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; - +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use memmap::Mmap; use std::collections::BTreeMap; use std::fs::{self, File}; -use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}; +use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::RwLock; + +// RocksDb's log uses this size. +// May be worth making configurable and experimenting +const BLOCK_SIZE: usize = 32 * 1024; #[derive(Debug)] pub struct WriteLog { log_path: PathBuf, - log_writer: BufWriter, - max_batch_size: usize, + logger: RwLock, + config: Config, + in_memory: bool, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct Config { + pub use_fsync: bool, + pub sync_every_write: bool, } impl WriteLog { - pub fn open(path: &Path, max_batch_size: usize) -> Result { - let log_writer = BufWriter::new( - fs::OpenOptions::new() - .create(true) - .append(true) - .open(path)?, - ); - let log_path = path.to_path_buf(); + pub fn open(path: &Path, config: Config) -> Result { + let file = file_opts().open(path)?; Ok(WriteLog { - log_writer, - log_path, - max_batch_size, + config, + log_path: path.to_path_buf(), + logger: RwLock::new(Logger::disk(file)), + in_memory: false, }) } - pub fn reset(&mut self) -> Result<()> { - self.log_writer.flush()?; - let file = self.log_writer.get_mut(); - file.set_len(0)?; - file.seek(SeekFrom::Start(0))?; - - Ok(()) - } - - pub fn log_put(&mut self, key: &Key, ts: i64, val: &[u8]) -> Result<()> { - let rec_len = 24 + 8 + 1 + val.len() as u64; - let mut buf = vec![0u8; rec_len as usize + 8]; - - log_to_buffer(&mut buf, rec_len, key, ts, val); - - self.log_writer.write_all(&buf)?; - Ok(()) - } - - pub fn log_delete(&mut self, key: &Key, ts: i64) -> Result<()> { - self.log_put(key, ts, &[]) - } - - // TODO: decide how to configure/schedule calling this #[allow(dead_code)] - pub fn sync(&mut self) -> Result<()> { - self.log_writer.flush()?; - self.log_writer.get_mut().sync_all()?; + pub fn memory(config: Config) -> WriteLog { + WriteLog { + config, + logger: RwLock::new(Logger::memory()), + log_path: Path::new("").to_path_buf(), + in_memory: true, + } + } + + pub fn reset(&self) -> Result<()> { + let mut logger = self.logger.write().unwrap(); + + let new_logger = if self.in_memory { + Logger::memory() + } else { + let file = file_opts().truncate(true).open(&self.log_path)?; + Logger::disk(file) + }; + + *logger = new_logger; + Ok(()) } + pub fn log_put(&self, key: &Key, ts: i64, val: &[u8]) -> Result<()> { + let mut logger = self.logger.write().unwrap(); + + log(&mut logger, key, ts, Some(val))?; + + if self.config.sync_every_write { + sync(&mut logger, self.config.use_fsync)?; + } + + Ok(()) + } + + pub fn log_delete(&self, key: &Key, ts: i64) -> Result<()> { + let mut logger = self.logger.write().unwrap(); + + log(&mut logger, key, ts, None)?; + + if self.config.sync_every_write { + sync(&mut logger, self.config.use_fsync)?; + } + + Ok(()) + } + + #[allow(dead_code)] + pub fn sync(&self) -> Result<()> { + let mut logger = self.logger.write().unwrap(); + + sync(&mut logger, self.config.use_fsync) + } + pub fn materialize(&self) -> Result> { - let mut table = BTreeMap::new(); - if !self.log_path.exists() { - return Ok(table); - } - - let mut rdr = BufReader::new(File::open(&self.log_path)?); - let mut buf = vec![]; - - while let Ok(rec_len) = rdr.read_u64::() { - buf.resize(rec_len as usize, 0); - rdr.read_exact(&mut buf)?; - - let key = Key::read(&buf[0..24]); - let ts = BigEndian::read_i64(&buf[24..32]); - let exists = buf[32] != 0; - - let val = if exists { - Some(buf[33..].to_vec()) - } else { - None - }; - let value = Value { ts, val }; - - table.insert(key, value); - } - - Ok(table) + let mmap = self.logger.write().unwrap().writer.mmap()?; + read_log(&mmap) } } -#[inline] -fn log_to_buffer(buf: &mut [u8], rec_len: u64, key: &Key, ts: i64, val: &[u8]) { - BigEndian::write_u64(&mut buf[..8], rec_len); - (&mut buf[8..32]).copy_from_slice(&key.0); - BigEndian::write_i64(&mut buf[32..40], ts); - buf[40] = (!val.is_empty()) as u8; - (&mut buf[41..]).copy_from_slice(val); +impl Default for Config { + fn default() -> Config { + Config { + use_fsync: false, + sync_every_write: true, + } + } +} + +trait LogWriter: std::fmt::Debug + Write + Send + Sync { + fn sync(&mut self, fsync: bool) -> Result<()>; + fn mmap(&self) -> Result; +} + +/// Holds actual logging related state +#[derive(Debug)] +struct Logger { + writer: Box, +} + +impl Logger { + fn memory() -> Self { + Logger { + writer: Box::new(CRCWriter::new(vec![], BLOCK_SIZE)), + } + } + + fn disk(file: File) -> Self { + Logger { + writer: Box::new(CRCWriter::new(file, BLOCK_SIZE)), + } + } +} + +impl LogWriter for CRCWriter> { + fn sync(&mut self, _: bool) -> Result<()> { + Ok(self.flush()?) + } + + fn mmap(&self) -> Result { + let mut map = memmap::MmapMut::map_anon(self.get_ref().len())?; + (&mut map[..]).copy_from_slice(self.get_ref()); + Ok(map.make_read_only()?) + } +} + +impl LogWriter for CRCWriter { + fn sync(&mut self, fsync: bool) -> Result<()> { + self.flush()?; + + let file = self.get_mut(); + if fsync { + file.sync_all()?; + } else { + file.sync_data()?; + } + + Ok(()) + } + + fn mmap(&self) -> Result { + let map = unsafe { Mmap::map(self.get_ref())? }; + Ok(map) + } +} + +fn log(logger: &mut Logger, key: &Key, commit: i64, data: Option<&[u8]>) -> Result<()> { + let writer = &mut logger.writer; + write_value(writer, key, commit, data)?; + + Ok(()) +} + +fn sync(logger: &mut Logger, sync_all: bool) -> Result<()> { + let writer = &mut logger.writer; + + writer.sync(sync_all)?; + + Ok(()) +} + +#[inline] +fn file_opts() -> fs::OpenOptions { + let mut opts = fs::OpenOptions::new(); + opts.read(true).write(true).create(true); + opts +} + +fn read_log(log_buf: &[u8]) -> Result> { + let mut map = BTreeMap::new(); + if log_buf.len() <= 8 + 24 + 8 + 1 { + return Ok(map); + } + + let mut reader = CRCReader::new(log_buf, BLOCK_SIZE); + + while let Ok((key, val)) = read_value(&mut reader) { + map.insert(key, val); + } + + Ok(map) +} + +#[inline] +fn write_value( + writer: &mut W, + key: &Key, + commit: i64, + data: Option<&[u8]>, +) -> Result<()> { + let len = 24 + 8 + 1 + data.map(<[u8]>::len).unwrap_or(0); + + writer.write_u64::(len as u64)?; + writer.write_all(&key.0)?; + writer.write_i64::(commit)?; + + match data { + Some(data) => { + writer.write_u8(1)?; + writer.write_all(data)?; + } + None => { + writer.write_u8(0)?; + } + } + + Ok(()) +} + +#[inline] +fn read_value(reader: &mut R) -> Result<(Key, Value)> { + let len = reader.read_u64::()?; + let data_len = len as usize - (24 + 8 + 1); + + let mut reader = reader.by_ref().take(len); + + let mut key_buf = [0; 24]; + reader.read_exact(&mut key_buf)?; + let key = Key(key_buf); + + let commit = reader.read_i64::()?; + let exists = reader.read_u8()? != 0; + + let data = if exists { + let mut buf = Vec::with_capacity(data_len); + reader.read_to_end(&mut buf)?; + Some(buf) + } else { + None + }; + + let val = Value { + ts: commit, + val: data, + }; + Ok((key, val)) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_log_serialization() { + let (key, commit, data) = (&Key::from((1, 2, 3)), 4, vec![0; 1024]); + + let mut buf = vec![]; + + write_value(&mut buf, key, commit, Some(&data)).unwrap(); + + let (stored_key, stored_val) = read_value(&mut &buf[..]).unwrap(); + assert_eq!(&stored_key, key); + assert_eq!(stored_val.val.as_ref().unwrap(), &data); + assert_eq!(stored_val.ts, commit); + } + + #[test] + fn test_log_round_trip() { + let wal = WriteLog::memory(Config::default()); + + let values: BTreeMap = (0u64..100) + .map(|n| { + let val = if n % 2 == 0 { + Some(vec![0; 1024]) + } else { + None + }; + (Key::from((n, n, n)), Value { ts: n as i64, val }) + }) + .collect(); + + for (k, v) in values.iter() { + if v.val.is_some() { + wal.log_put(k, v.ts, v.val.as_ref().unwrap()) + .expect("Wal::put"); + } else { + wal.log_delete(k, v.ts).expect("Wal::delete"); + } + } + + let reloaded = wal.materialize().expect("Wal::materialize"); + + assert_eq!(values.len(), reloaded.len()); + assert_eq!(values, reloaded); + } + + #[test] + fn test_reset() { + use crate::kvstore::error::Error; + + let wal = WriteLog::memory(Config::default()); + + let values: BTreeMap = (0u64..100) + .map(|n| { + let val = Some(vec![0; 64]); + (Key::from((n, n, n)), Value { ts: n as i64, val }) + }) + .collect(); + + for (k, v) in values.iter() { + wal.log_put(k, v.ts, v.val.as_ref().unwrap()) + .expect("Wal::put"); + } + + wal.reset().expect("Wal::reset"); + + // Should result in an error due to attempting to make a memory map of length 0 + let result = wal.materialize(); + + assert!(result.is_err()); + if let Err(Error::Io(e)) = result { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); + } else { + panic!("should fail to create 0-length memory-map with an empty log"); + } + } } diff --git a/core/tests/kvstore.rs b/core/tests/kvstore.rs index a4e1f5f293..bec27ca40b 100644 --- a/core/tests/kvstore.rs +++ b/core/tests/kvstore.rs @@ -204,6 +204,7 @@ fn test_in_memory() { max_tables: 5, page_size: 64 * KB, in_memory: true, + ..Config::default() }; let lsm = KvStore::open(&path, cfg).unwrap();