diff --git a/benches/appendvec.rs b/benches/appendvec.rs new file mode 100644 index 0000000000..cdfa31780a --- /dev/null +++ b/benches/appendvec.rs @@ -0,0 +1,172 @@ +#![cfg_attr(feature = "unstable", feature(test))] +extern crate rand; +extern crate test; + +use rand::{thread_rng, Rng}; +use solana_runtime::appendvec::AppendVec; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::spawn; +use test::Bencher; + +const START_SIZE: u64 = 4 * 1024 * 1024; +const INC_SIZE: u64 = 1 * 1024 * 1024; + +#[bench] +fn appendvec_atomic_append(bencher: &mut Bencher) { + let path = Path::new("/media/nvme0/bench/bench_append"); + let mut vec = AppendVec::::new(path, true, START_SIZE, INC_SIZE); + bencher.iter(|| { + if vec.append(AtomicUsize::new(0)).is_none() { + assert!(vec.grow_file().is_ok()); + assert!(vec.append(AtomicUsize::new(0)).is_some()); + } + }); + std::fs::remove_file(path).unwrap(); +} + +#[bench] +fn appendvec_atomic_random_access(bencher: &mut Bencher) { + let path = Path::new("/media/nvme0/bench/bench_ra"); + let mut vec = AppendVec::::new(path, true, START_SIZE, INC_SIZE); + let size = 10_000_000; + for _ in 0..size { + if vec.append(AtomicUsize::new(0)).is_none() { + assert!(vec.grow_file().is_ok()); + assert!(vec.append(AtomicUsize::new(0)).is_some()); + } + } + bencher.iter(|| { + let index = thread_rng().gen_range(0, size as u64); + vec.get(index * std::mem::size_of::() as u64); + }); + std::fs::remove_file(path).unwrap(); +} + +#[bench] +fn appendvec_atomic_random_change(bencher: &mut Bencher) { + let path = Path::new("/media/nvme0/bench/bench_rax"); + let mut vec = AppendVec::::new(path, true, START_SIZE, INC_SIZE); + let size = 10_000_000; + for _ in 0..size { + if vec.append(AtomicUsize::new(0)).is_none() { + assert!(vec.grow_file().is_ok()); + assert!(vec.append(AtomicUsize::new(0)).is_some()); + } + } + bencher.iter(|| { + let index = thread_rng().gen_range(0, size as u64); + let atomic1 = vec.get(index * std::mem::size_of::() as u64); + let current1 = atomic1.load(Ordering::Relaxed); + let next = current1 + 1; + atomic1.store(next, Ordering::Relaxed); + let atomic2 = vec.get(index * std::mem::size_of::() as u64); + let current2 = atomic2.load(Ordering::Relaxed); + assert_eq!(current2, next); + }); + std::fs::remove_file(path).unwrap(); +} + +#[bench] +fn appendvec_atomic_random_read(bencher: &mut Bencher) { + let path = Path::new("/media/nvme0/bench/bench_read"); + let mut vec = AppendVec::::new(path, true, START_SIZE, INC_SIZE); + let size = 100_000_000; + for _ in 0..size { + if vec.append(AtomicUsize::new(0)).is_none() { + assert!(vec.grow_file().is_ok()); + assert!(vec.append(AtomicUsize::new(0)).is_some()); + } + } + bencher.iter(|| { + let index = thread_rng().gen_range(0, size); + let atomic1 = vec.get((index * std::mem::size_of::()) as u64); + let current1 = atomic1.load(Ordering::Relaxed); + assert_eq!(current1, 0); + }); + std::fs::remove_file(path).unwrap(); +} + +#[bench] +fn appendvec_concurrent_lock_append(bencher: &mut Bencher) { + let path = Path::new("bench_lock_append"); + let vec = Arc::new(RwLock::new(AppendVec::::new( + path, true, START_SIZE, INC_SIZE, + ))); + let vec1 = vec.clone(); + let size = 100_000_000; + let count = Arc::new(AtomicUsize::new(0)); + let count1 = count.clone(); + spawn(move || loop { + let mut len = count.load(Ordering::Relaxed); + { + let rlock = vec1.read().unwrap(); + loop { + if rlock.append(AtomicUsize::new(0)).is_none() { + break; + } + len = count.fetch_add(1, Ordering::Relaxed); + } + if len >= size { + break; + } + } + { + let mut wlock = vec1.write().unwrap(); + if len >= size { + break; + } + assert!(wlock.grow_file().is_ok()); + } + }); + bencher.iter(|| { + let _rlock = vec.read().unwrap(); + let len = count1.load(Ordering::Relaxed); + assert!(len < size * 2); + }); + std::fs::remove_file(path).unwrap(); +} + +#[bench] +fn appendvec_concurrent_get_append(bencher: &mut Bencher) { + let path = Path::new("bench_get_append"); + let vec = Arc::new(RwLock::new(AppendVec::::new( + path, true, START_SIZE, INC_SIZE, + ))); + let vec1 = vec.clone(); + let size = 100_000_000; + let count = Arc::new(AtomicUsize::new(0)); + let count1 = count.clone(); + spawn(move || loop { + let mut len = count.load(Ordering::Relaxed); + { + let rlock = vec1.read().unwrap(); + loop { + if rlock.append(AtomicUsize::new(0)).is_none() { + break; + } + len = count.fetch_add(1, Ordering::Relaxed); + } + if len >= size { + break; + } + } + { + let mut wlock = vec1.write().unwrap(); + if len >= size { + break; + } + assert!(wlock.grow_file().is_ok()); + } + }); + bencher.iter(|| { + let rlock = vec.read().unwrap(); + let len = count1.load(Ordering::Relaxed); + if len > 0 { + let index = thread_rng().gen_range(0, len); + rlock.get((index * std::mem::size_of::()) as u64); + } + }); + std::fs::remove_file(path).unwrap(); +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index d4d833a6c1..58727339ec 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -14,6 +14,7 @@ bv = { version = "0.11.0", features = ["serde"] } fnv = "1.0.6" hashbrown = "0.1.8" log = "0.4.2" +memmap = "0.6.2" rand = "0.6.5" serde = "1.0.88" serde_derive = "1.0.88" diff --git a/runtime/src/appendvec.rs b/runtime/src/appendvec.rs new file mode 100644 index 0000000000..a763e4d414 --- /dev/null +++ b/runtime/src/appendvec.rs @@ -0,0 +1,336 @@ +use memmap::MmapMut; +use solana_sdk::account::Account; +use solana_sdk::pubkey::Pubkey; +use std::fs::{File, OpenOptions}; +use std::io::{Error, ErrorKind, Result, Seek, SeekFrom, Write}; +use std::marker::PhantomData; +use std::mem; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex; + +const SIZEOF_U64: usize = mem::size_of::(); + +pub struct AppendVec { + data: File, + map: MmapMut, + current_offset: AtomicUsize, + append_lock: Mutex<()>, + file_size: u64, + inc_size: u64, + phantom: PhantomData, +} + +impl AppendVec +where + T: Default, +{ + pub fn new(path: &Path, create: bool, size: u64, inc: u64) -> Self { + let mut data = OpenOptions::new() + .read(true) + .write(true) + .create(create) + .open(path) + .expect("Unable to open data file"); + + data.seek(SeekFrom::Start(size)).unwrap(); + data.write_all(&[0]).unwrap(); + data.seek(SeekFrom::Start(0)).unwrap(); + data.flush().unwrap(); + let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; + + AppendVec { + data, + map, + current_offset: AtomicUsize::new(0), + append_lock: Mutex::new(()), + file_size: size, + inc_size: inc, + phantom: PhantomData, + } + } + + pub fn reset(&mut self) { + let _append_lock = self.append_lock.lock().unwrap(); + self.current_offset.store(0, Ordering::Relaxed); + } + + #[allow(dead_code)] + pub fn get(&self, index: u64) -> &T { + let offset = self.current_offset.load(Ordering::Relaxed); + let at = index as usize; + assert!(offset >= at + mem::size_of::()); + let data = &self.map[at..at + mem::size_of::()]; + let ptr = data.as_ptr() as *const T; + let x: Option<&T> = unsafe { ptr.as_ref() }; + x.unwrap() + } + + #[allow(dead_code)] + pub fn grow_file(&mut self) -> Result<()> { + if self.inc_size == 0 { + return Err(Error::new(ErrorKind::WriteZero, "Grow not supported")); + } + let _append_lock = self.append_lock.lock().unwrap(); + let index = self.current_offset.load(Ordering::Relaxed) + mem::size_of::(); + if index as u64 + self.inc_size < self.file_size { + // grow was already called + return Ok(()); + } + let end = self.file_size + self.inc_size; + drop(self.map.to_owned()); + self.data.seek(SeekFrom::Start(end))?; + self.data.write_all(&[0])?; + self.data.seek(SeekFrom::Start(0))?; + self.data.flush()?; + self.map = unsafe { MmapMut::map_mut(&self.data)? }; + self.file_size = end; + Ok(()) + } + + #[allow(dead_code)] + pub fn append(&self, val: T) -> Option { + let _append_lock = self.append_lock.lock().unwrap(); + let index = self.current_offset.load(Ordering::Relaxed); + + if (self.file_size as usize) < index + mem::size_of::() { + return None; + } + + let data = &self.map[index..(index + mem::size_of::())]; + unsafe { + let ptr = data.as_ptr() as *mut T; + std::ptr::write(ptr, val) + }; + self.current_offset + .fetch_add(mem::size_of::(), Ordering::Relaxed); + Some(index as u64) + } + + fn get_account_size_static() -> usize { + mem::size_of::() + + mem::size_of::() + + mem::size_of::() + + mem::size_of::() + } + + pub fn get_account(&self, index: u64) -> Result { + let mut at = index as usize; + let data = &self.map[at..(at + mem::size_of::())]; + #[allow(clippy::cast_ptr_alignment)] + let size: u64 = unsafe { std::ptr::read_unaligned(data.as_ptr() as *const _) }; + let len = size as usize; + at += SIZEOF_U64 as usize; + + let offset = self.current_offset.load(Ordering::Relaxed); + assert!(offset >= at + len); + + let data = &self.map[at..(at + mem::size_of::())]; + #[allow(clippy::cast_ptr_alignment)] + let tokens: u64 = unsafe { std::ptr::read_unaligned(data.as_ptr() as *const _) }; + at += mem::size_of::(); + + let userdata_len = len - Self::get_account_size_static(); + let mut userdata = vec![]; + userdata.extend_from_slice(&self.map[at..at + userdata_len]); + at += userdata_len; + + let data = &self.map[at..(at + mem::size_of::())]; + let owner: Pubkey = unsafe { std::ptr::read(data.as_ptr() as *const _) }; + at += mem::size_of::(); + + let data = &self.map[at..(at + mem::size_of::())]; + let executable: bool = unsafe { std::ptr::read(data.as_ptr() as *const _) }; + + Ok(Account { + tokens, + userdata, + owner, + executable, + }) + } + + pub fn append_account(&self, account: &Account) -> Option { + let _append_lock = self.append_lock.lock().unwrap(); + let data_at = self.current_offset.load(Ordering::Relaxed); + let len = Self::get_account_size_static() + account.userdata.len(); + + if (self.file_size as usize) < data_at + len + SIZEOF_U64 { + return None; + } + + let mut at = data_at as usize; + unsafe { + let data = &self.map[at..at + mem::size_of::()]; + #[allow(clippy::cast_ptr_alignment)] + let ptr = data.as_ptr() as *mut u64; + std::ptr::write_unaligned(ptr, len as u64); + at += mem::size_of::(); + + let data = &self.map[at..at + mem::size_of::()]; + #[allow(clippy::cast_ptr_alignment)] + let ptr = data.as_ptr() as *mut u64; + std::ptr::write_unaligned(ptr, account.tokens); + at += mem::size_of::(); + + let data = &self.map[at..at + account.userdata.len()]; + let dst = data.as_ptr() as *mut u8; + let data = &account.userdata[0..account.userdata.len()]; + let src = data.as_ptr(); + std::ptr::copy_nonoverlapping(src, dst, account.userdata.len()); + at += account.userdata.len(); + + let data = &self.map[at..at + mem::size_of::()]; + let ptr = data.as_ptr() as *mut Pubkey; + std::ptr::write(ptr, account.owner); + at += mem::size_of::(); + + let data = &self.map[at..at + mem::size_of::()]; + let ptr = data.as_ptr() as *mut bool; + std::ptr::write(ptr, account.executable); + }; + + self.current_offset + .fetch_add(len + SIZEOF_U64, Ordering::Relaxed); + Some(data_at as u64) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use log::info; + use rand::{thread_rng, Rng}; + use solana_sdk::timing::{duration_as_ms, duration_as_s}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Instant; + + const START_SIZE: u64 = 4 * 1024 * 1024; + const INC_SIZE: u64 = 1 * 1024 * 1024; + + #[test] + fn test_append_vec() { + let path = Path::new("append_vec"); + let av = AppendVec::new(path, true, START_SIZE, INC_SIZE); + let val: u64 = 5; + let index = av.append(val).unwrap(); + assert_eq!(*av.get(index), val); + let val1 = val + 1; + let index1 = av.append(val1).unwrap(); + assert_eq!(*av.get(index), val); + assert_eq!(*av.get(index1), val1); + std::fs::remove_file(path).unwrap(); + } + + #[test] + fn test_append_vec_account() { + let path = Path::new("append_vec_account"); + let av: AppendVec = AppendVec::new(path, true, START_SIZE, INC_SIZE); + let v1 = vec![1u8; 32]; + let mut account1 = Account { + tokens: 1, + userdata: v1, + owner: Pubkey::default(), + executable: false, + }; + let index1 = av.append_account(&account1).unwrap(); + assert_eq!(index1, 0); + assert_eq!(av.get_account(index1).unwrap(), account1); + + let v2 = vec![4u8; 32]; + let mut account2 = Account { + tokens: 1, + userdata: v2, + owner: Pubkey::default(), + executable: false, + }; + let index2 = av.append_account(&account2).unwrap(); + let mut len = AppendVec::::get_account_size_static() + + account1.userdata.len() + + SIZEOF_U64 as usize; + assert_eq!(index2, len as u64); + assert_eq!(av.get_account(index2).unwrap(), account2); + assert_eq!(av.get_account(index1).unwrap(), account1); + + account2.userdata.iter_mut().for_each(|e| *e *= 2); + let index3 = av.append_account(&account2).unwrap(); + len += AppendVec::::get_account_size_static() + + account2.userdata.len() + + SIZEOF_U64 as usize; + assert_eq!(index3, len as u64); + assert_eq!(av.get_account(index3).unwrap(), account2); + + account1.userdata.extend([1, 2, 3, 4, 5, 6].iter().cloned()); + let index4 = av.append_account(&account1).unwrap(); + len += AppendVec::::get_account_size_static() + + account2.userdata.len() + + SIZEOF_U64 as usize; + assert_eq!(index4, len as u64); + assert_eq!(av.get_account(index4).unwrap(), account1); + std::fs::remove_file(path).unwrap(); + } + + #[test] + fn test_grow_append_vec() { + let path = Path::new("grow"); + let mut av = AppendVec::new(path, true, START_SIZE, INC_SIZE); + let mut val = [5u64; 32]; + let size = 100_000; + let mut offsets = vec![0; size]; + + let now = Instant::now(); + for index in 0..size { + if let Some(offset) = av.append(val) { + offsets[index] = offset; + } else { + assert!(av.grow_file().is_ok()); + if let Some(offset) = av.append(val) { + offsets[index] = offset; + } else { + assert!(false); + } + } + val[0] += 1; + } + info!( + "time: {} ms {} / s", + duration_as_ms(&now.elapsed()), + ((mem::size_of::<[u64; 32]>() * size) as f32) / duration_as_s(&now.elapsed()), + ); + + let now = Instant::now(); + let num_reads = 100_000; + for _ in 0..num_reads { + let index = thread_rng().gen_range(0, size); + assert_eq!(av.get(offsets[index])[0], (index + 5) as u64); + } + info!( + "time: {} ms {} / s", + duration_as_ms(&now.elapsed()), + (num_reads as f32) / duration_as_s(&now.elapsed()), + ); + std::fs::remove_file(path).unwrap(); + } + + #[test] + fn random_atomic_change() { + let path = Path::new("random"); + let mut vec = AppendVec::::new(path, true, START_SIZE, INC_SIZE); + let size = 1_000; + for _ in 0..size { + if vec.append(AtomicUsize::new(0)).is_none() { + assert!(vec.grow_file().is_ok()); + assert!(vec.append(AtomicUsize::new(0)).is_some()); + } + } + let index = thread_rng().gen_range(0, size as u64); + let atomic1 = vec.get(index); + let current1 = atomic1.load(Ordering::Relaxed); + let next = current1 + 1; + atomic1.store(next, Ordering::Relaxed); + let atomic2 = vec.get(index); + let current2 = atomic2.load(Ordering::Relaxed); + assert_eq!(current2, next); + std::fs::remove_file(path).unwrap(); + } +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index a797fa286b..dbe11aef79 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1,4 +1,5 @@ mod accounts; +pub mod appendvec; pub mod bank; pub mod bloom; mod last_id_queue;