diff --git a/runtime/benches/append_vec.rs b/runtime/benches/append_vec.rs
index 54c81f39ec..2533ac14e5 100644
--- a/runtime/benches/append_vec.rs
+++ b/runtime/benches/append_vec.rs
@@ -1,120 +1,241 @@
 #![feature(test)]
+
+extern crate rand;
 extern crate test;
 
+use bincode::{deserialize, serialize_into, serialized_size};
 use rand::{thread_rng, Rng};
-use solana_runtime::append_vec::test_utils::{create_test_account, get_append_vec_path};
-use solana_runtime::append_vec::AppendVec;
-use std::sync::{Arc, Mutex};
-use std::thread::sleep;
+use solana_runtime::append_vec::{
+    deserialize_account, get_serialized_size, serialize_account, AppendVec,
+};
+use solana_sdk::account::Account;
+use solana_sdk::pubkey::Pubkey;
+use std::env;
+use std::fs::{create_dir_all, remove_dir_all};
+use std::io::Cursor;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, RwLock};
 use std::thread::spawn;
-use std::time::Duration;
 use test::Bencher;
 
+const START_SIZE: u64 = 4 * 1024 * 1024;
+const INC_SIZE: u64 = 1 * 1024 * 1024;
+
+macro_rules! align_up {
+    ($addr: expr, $align: expr) => {
+        ($addr + ($align - 1)) & !($align - 1)
+    };
+}
+
+fn get_append_vec_bench_path(path: &str) -> PathBuf {
+    let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
+    let mut buf = PathBuf::new();
+    buf.push(&format!("{}/{}", out_dir, path));
+    let _ignored = remove_dir_all(out_dir.clone());
+    create_dir_all(out_dir).expect("Create directory failed");
+    buf
+}
+
 #[bench]
-fn append_vec_append(bencher: &mut Bencher) {
-    let path = get_append_vec_path("bench_append");
-    let vec = AppendVec::new(&path.path, true, 64 * 1024);
+fn append_vec_atomic_append(bencher: &mut Bencher) {
+    let path = get_append_vec_bench_path("bench_append");
+    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
     bencher.iter(|| {
-        let account = create_test_account(0);
-        if vec.append_account(&account).is_none() {
-            vec.reset();
+        if vec.append(AtomicUsize::new(0)).is_none() {
+            assert!(vec.grow_file().is_ok());
+            assert!(vec.append(AtomicUsize::new(0)).is_some());
         }
     });
-}
-
-fn add_test_accounts(vec: &AppendVec, size: usize) -> Vec<(usize, usize)> {
-    (0..size)
-        .into_iter()
-        .filter_map(|sample| {
-            let account = create_test_account(sample);
-            vec.append_account(&account).map(|pos| (sample, pos))
-        })
-        .collect()
+    std::fs::remove_file(path).unwrap();
 }
 
 #[bench]
-fn append_vec_sequential_read(bencher: &mut Bencher) {
-    let path = get_append_vec_path("seq_read");
-    let vec = AppendVec::new(&path.path, true, 64 * 1024);
-    let size = 1_000;
-    let mut indexes = add_test_accounts(&vec, size);
-    bencher.iter(|| {
-        let (sample, pos) = indexes.pop().unwrap();
-        let account = vec.get_account(pos);
-        let test = create_test_account(sample);
-        assert_eq!(*account, test);
-        indexes.push((sample, pos));
-    });
-}
-#[bench]
-fn append_vec_random_read(bencher: &mut Bencher) {
-    let path = get_append_vec_path("random_read");
-    let vec = AppendVec::new(&path.path, true, 64 * 1024);
-    let size = 1_000;
-    let mut indexes = add_test_accounts(&vec, size);
-    bencher.iter(|| {
-        let random_index: usize = thread_rng().gen_range(0, indexes.len());
-        let (sample, pos) = &indexes[random_index];
-        let account = vec.get_account(*pos);
-        let test = create_test_account(*sample);
-        assert_eq!(*account, test);
-    });
-}
-
-#[bench]
-fn append_vec_concurrent_append_read(bencher: &mut Bencher) {
-    let path = get_append_vec_path("concurrent_read");
-    let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));
-    let vec1 = vec.clone();
-    let indexes: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(vec![]));
-    let indexes1 = indexes.clone();
-    spawn(move || loop {
-        let sample = indexes1.lock().unwrap().len();
-        let account = create_test_account(sample);
-        if let Some(pos) = vec1.append_account(&account) {
-            indexes1.lock().unwrap().push((sample, pos))
-        } else {
-            break;
+fn append_vec_atomic_random_access(bencher: &mut Bencher) {
+    let path = get_append_vec_bench_path("bench_ra");
+    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
+    let size = 1_000_000;
+    for _ in 0..size {
+        if vec.append(AtomicUsize::new(0)).is_none() {
+            assert!(vec.grow_file().is_ok());
+            assert!(vec.append(AtomicUsize::new(0)).is_some());
         }
-    });
-    while indexes.lock().unwrap().is_empty() {
-        sleep(Duration::from_millis(100));
     }
     bencher.iter(|| {
-        let len = indexes.lock().unwrap().len();
-        let random_index: usize = thread_rng().gen_range(0, len);
-        let (sample, pos) = indexes.lock().unwrap().get(random_index).unwrap().clone();
-        let account = vec.get_account(pos);
-        let test = create_test_account(sample);
-        assert_eq!(*account, test);
+        let index = thread_rng().gen_range(0, size as u64);
+        vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
     });
+    std::fs::remove_file(path).unwrap();
 }
 
 #[bench]
-fn append_vec_concurrent_read_append(bencher: &mut Bencher) {
-    let path = get_append_vec_path("concurrent_read");
-    let vec = Arc::new(AppendVec::new(&path.path, true, 1024 * 1024));
-    let vec1 = vec.clone();
-    let indexes: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(vec![]));
-    let indexes1 = indexes.clone();
-    spawn(move || loop {
-        let len = indexes1.lock().unwrap().len();
-        let random_index: usize = thread_rng().gen_range(0, len + 1);
-        let (sample, pos) = indexes1
-            .lock()
-            .unwrap()
-            .get(random_index % len)
-            .unwrap()
-            .clone();
-        let account = vec1.get_account(pos);
-        let test = create_test_account(sample);
-        assert_eq!(*account, test);
-    });
+fn append_vec_atomic_random_change(bencher: &mut Bencher) {
+    let path = get_append_vec_bench_path("bench_rax");
+    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
+    let size = 1_000_000;
+    for k in 0..size {
+        if vec.append(AtomicUsize::new(k)).is_none() {
+            assert!(vec.grow_file().is_ok());
+            assert!(vec.append(AtomicUsize::new(k)).is_some());
+        }
+    }
     bencher.iter(|| {
-        let sample: usize = thread_rng().gen_range(0, 256);
-        let account = create_test_account(sample);
-        if let Some(pos) = vec.append_account(&account) {
-            indexes.lock().unwrap().push((sample, pos))
+        let index = thread_rng().gen_range(0, size as u64);
+        let atomic1 = vec.get(index * std::mem::size_of::<AtomicUsize>() as u64);
+        let current1 = atomic1.load(Ordering::Relaxed);
+        assert_eq!(current1, index as usize);
+        let next = current1 + 1;
+        let mut index = vec.append(AtomicUsize::new(next));
+        if index.is_none() {
+            assert!(vec.grow_file().is_ok());
+            index = vec.append(AtomicUsize::new(next));
+        }
+        let atomic2 = vec.get(index.unwrap());
+        let current2 = atomic2.load(Ordering::Relaxed);
+        assert_eq!(current2, next);
+    });
+    std::fs::remove_file(path).unwrap();
+}
+
+#[bench]
+fn append_vec_atomic_random_read(bencher: &mut Bencher) {
+    let path = get_append_vec_bench_path("bench_read");
+    let mut vec = AppendVec::<AtomicUsize>::new(&path, true, START_SIZE, INC_SIZE);
+    let size = 1_000_000;
+    for _ in 0..size {
+        if vec.append(AtomicUsize::new(0)).is_none() {
+            assert!(vec.grow_file().is_ok());
+            assert!(vec.append(AtomicUsize::new(0)).is_some());
+        }
+    }
+    bencher.iter(|| {
+        let index = thread_rng().gen_range(0, size);
+        let atomic1 = vec.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
+        let current1 = atomic1.load(Ordering::Relaxed);
+        assert_eq!(current1, 0);
+    });
+    std::fs::remove_file(path).unwrap();
+}
+
+#[bench]
+fn append_vec_concurrent_lock_append(bencher: &mut Bencher) {
+    let path = get_append_vec_bench_path("bench_lock_append");
+    let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
+        &path, true, START_SIZE, INC_SIZE,
+    )));
+    let vec1 = vec.clone();
+    let size = 1_000_000;
+    let count = Arc::new(AtomicUsize::new(0));
+    let count1 = count.clone();
+    spawn(move || loop {
+        let mut len = count.load(Ordering::Relaxed);
+        {
+            let rlock = vec1.read().unwrap();
+            loop {
+                if rlock.append(AtomicUsize::new(0)).is_none() {
+                    break;
+                }
+                len = count.fetch_add(1, Ordering::Relaxed);
+            }
+            if len >= size {
+                break;
+            }
+        }
+        {
+            let mut wlock = vec1.write().unwrap();
+            if len >= size {
+                break;
+            }
+            assert!(wlock.grow_file().is_ok());
         }
     });
+    bencher.iter(|| {
+        let _rlock = vec.read().unwrap();
+        let len = count1.load(Ordering::Relaxed);
+        assert!(len < size * 2);
+    });
+    std::fs::remove_file(path).unwrap();
+}
+
+#[bench]
+fn append_vec_concurrent_get_append(bencher: &mut Bencher) {
+    let path = get_append_vec_bench_path("bench_get_append");
+    let vec = Arc::new(RwLock::new(AppendVec::<AtomicUsize>::new(
+        &path, true, START_SIZE, INC_SIZE,
+    )));
+    let vec1 = vec.clone();
+    let size = 1_000_000;
+    let count = Arc::new(AtomicUsize::new(0));
+    let count1 = count.clone();
+    spawn(move || loop {
+        let mut len = count.load(Ordering::Relaxed);
+        {
+            let rlock = vec1.read().unwrap();
+            loop {
+                if rlock.append(AtomicUsize::new(0)).is_none() {
+                    break;
+                }
+                len = count.fetch_add(1, Ordering::Relaxed);
+            }
+            if len >= size {
+                break;
+            }
+        }
+        {
+            let mut wlock = vec1.write().unwrap();
+            if len >= size {
+                break;
+            }
+            assert!(wlock.grow_file().is_ok());
+        }
+    });
+    bencher.iter(|| {
+        let rlock = vec.read().unwrap();
+        let len = count1.load(Ordering::Relaxed);
+        if len > 0 {
+            let index = thread_rng().gen_range(0, len);
+            rlock.get((index * std::mem::size_of::<AtomicUsize>()) as u64);
+        }
+    });
+    std::fs::remove_file(path).unwrap();
+}
+
+#[bench]
+fn bench_account_serialize(bencher: &mut Bencher) {
+    let num: usize = 1000;
+    let account = Account::new(2, 100, &Pubkey::new_rand());
+    let len = get_serialized_size(&account);
+    let ser_len = align_up!(len + std::mem::size_of::<u64>(), std::mem::size_of::<u64>());
+    let mut memory = test::black_box(vec![0; num * ser_len]);
+    bencher.iter(|| {
+        for i in 0..num {
+            let start = i * ser_len;
+            serialize_account(&mut memory[start..start + ser_len], &account, len);
+        }
+    });
+
+    let index = thread_rng().gen_range(0, num);
+    let start = index * ser_len;
+    let new_account = deserialize_account(&memory[start..start + ser_len], 0, num * len).unwrap();
+    assert_eq!(new_account, account);
+}
+
+#[bench]
+fn bench_account_serialize_bincode(bencher: &mut Bencher) {
+    let num: usize = 1000;
+    let account = Account::new(2, 100, &Pubkey::new_rand());
+    let len = serialized_size(&account).unwrap() as usize;
+    let mut memory = test::black_box(vec![0u8; num * len]);
+    bencher.iter(|| {
+        for i in 0..num {
+            let start = i * len;
+            let cursor = Cursor::new(&mut memory[start..start + len]);
+            serialize_into(cursor, &account).unwrap();
+        }
+    });
+
+    let index = thread_rng().gen_range(0, len);
+    let start = index * len;
+    let new_account: Account = deserialize(&memory[start..start + len]).unwrap();
+    assert_eq!(new_account, account);
 }
diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs
index 847cd0bcbd..21668adc0f 100644
--- a/runtime/src/accounts.rs
+++ b/runtime/src/accounts.rs
@@ -18,7 +18,7 @@ use std::env;
 use std::fs::{create_dir_all, remove_dir_all};
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Mutex, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 
 pub type InstructionAccounts = Vec<Account>;
 pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
@@ -101,7 +101,7 @@ struct AccountInfo {
     id: AppendVecId,
 
     /// offset into the storage
-    offset: usize,
+    offset: u64,
 
     /// lamports in the account used when squashing kept for optimization
     /// purposes to remove accounts with zero balance.
@@ -127,7 +127,7 @@ struct AccountIndex {
 /// Persistent storage structure holding the accounts
 struct AccountStorageEntry {
     /// storage holding the accounts
-    accounts: AppendVec,
+    accounts: Arc<RwLock<AppendVec<Account>>>,
 
     /// Keeps track of the number of accounts stored in a specific AppendVec.
     /// This is periodically checked to reuse the stores that do not have
@@ -139,12 +139,17 @@ struct AccountStorageEntry {
 }
 
 impl AccountStorageEntry {
-    pub fn new(path: &str, id: usize, file_size: u64) -> Self {
+    pub fn new(path: &str, id: usize, file_size: u64, inc_size: u64) -> Self {
         let p = format!("{}/{}", path, id);
         let path = Path::new(&p);
         let _ignored = remove_dir_all(path);
         create_dir_all(path).expect("Create directory failed");
-        let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize);
+        let accounts = Arc::new(RwLock::new(AppendVec::<Account>::new(
+            &path.join(ACCOUNT_DATA_FILE),
+            true,
+            file_size,
+            inc_size,
+        )));
 
         AccountStorageEntry {
             accounts,
@@ -167,7 +172,7 @@ impl AccountStorageEntry {
 
     fn remove_account(&self) {
         if self.count.fetch_sub(1, Ordering::Relaxed) == 1 {
-            self.accounts.reset();
+            self.accounts.write().unwrap().reset();
             self.set_status(AccountStorageStatus::StorageAvailable);
         }
     }
@@ -195,6 +200,9 @@ pub struct AccountsDB {
 
     /// Starting file size of appendvecs
     file_size: u64,
+
+    /// Increment size of appendvecs
+    inc_size: u64,
 }
 
 /// This structure handles synchronization for db
@@ -233,7 +241,7 @@ impl Drop for Accounts {
 }
 
 impl AccountsDB {
-    pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64) -> Self {
+    pub fn new_with_file_size(fork: Fork, paths: &str, file_size: u64, inc_size: u64) -> Self {
         let account_index = AccountIndex {
             account_maps: RwLock::new(HashMap::new()),
         };
@@ -245,6 +253,7 @@ impl AccountsDB {
             parents_map: RwLock::new(HashMap::new()),
             paths,
             file_size,
+            inc_size,
         };
         accounts_db.add_storage(&accounts_db.paths);
         accounts_db.add_fork(fork, None);
@@ -252,7 +261,7 @@ impl AccountsDB {
     }
 
     pub fn new(fork: Fork, paths: &str) -> Self {
-        Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE)
+        Self::new_with_file_size(fork, paths, ACCOUNT_DATA_FILE_SIZE, 0)
     }
 
     pub fn add_fork(&self, fork: Fork, parent: Option<Fork>) {
@@ -278,6 +287,7 @@ impl AccountsDB {
             path,
             self.next_id.fetch_add(1, Ordering::Relaxed),
             self.file_size,
+            self.inc_size,
         )
     }
 
@@ -319,11 +329,10 @@ impl AccountsDB {
         Some(hash(&serialize(&ordered_accounts).unwrap()))
     }
 
-    fn get_account(&self, id: AppendVecId, offset: usize) -> Account {
-        self.storage.read().unwrap()[id]
-            .accounts
-            .get_account(offset)
-            .clone()
+    fn get_account(&self, id: AppendVecId, offset: u64) -> Account {
+        let accounts = &self.storage.read().unwrap()[id].accounts;
+        let av = accounts.read().unwrap();
+        av.get_account(offset).unwrap()
     }
 
     fn load(&self, fork: Fork, pubkey: &Pubkey, walk_back: bool) -> Option<Account> {
@@ -431,8 +440,8 @@ impl AccountsDB {
         id
     }
 
-    fn append_account(&self, account: &Account) -> (usize, usize) {
-        let offset: usize;
+    fn append_account(&self, account: &Account) -> (usize, u64) {
+        let offset: u64;
         let start = self.next_id.fetch_add(1, Ordering::Relaxed);
         let mut id = self.get_storage_id(start, std::usize::MAX);
 
@@ -445,10 +454,10 @@
         }
 
         loop {
-            let result: Option<usize>;
+            let result: Option<u64>;
             {
                 let av = &self.storage.read().unwrap()[id].accounts;
-                result = av.append_account(acc);
+                result = av.read().unwrap().append_account(acc);
             }
             if let Some(val) = result {
                 offset = val;
@@ -1619,7 +1628,7 @@ mod tests {
     fn test_account_grow_many() {
         let paths = get_tmp_accounts_path("many2,many3");
         let size = 4096;
-        let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size);
+        let accounts = AccountsDB::new_with_file_size(0, &paths.paths, size, 0);
         let mut keys = vec![];
         for i in 0..9 {
             let key = Pubkey::new_rand();
diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs
index 9bbcd80c95..9ac2f8e1f4 100644
--- a/runtime/src/append_vec.rs
+++ b/runtime/src/append_vec.rs
@@ -1,276 +1,358 @@
-use memmap::MmapMut;
+use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
+use memmap::{Mmap, MmapMut};
 use solana_sdk::account::Account;
-use std::fs::OpenOptions;
-use std::io::{Seek, SeekFrom, Write};
+use solana_sdk::pubkey::Pubkey;
+use std::fs::{File, OpenOptions};
+use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write};
+use std::marker::PhantomData;
 use std::mem;
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Mutex;
 
-//Data is aligned at the next 64 byte offset. Without alignment loading the memory may
-//crash on some architectures.
+const SIZEOF_U64: usize = mem::size_of::<u64>();
+
 macro_rules! align_up {
     ($addr: expr, $align: expr) => {
         ($addr + ($align - 1)) & !($align - 1)
     };
 }
 
-pub struct AppendVec {
-    map: MmapMut,
-    // This mutex forces append to be single threaded, but concurrent with reads
-    append_offset: Mutex<usize>,
-    current_len: AtomicUsize,
+pub struct AppendVec<T> {
+    data: File,
+    mmap: Mmap,
+    current_offset: AtomicUsize,
+    mmap_mut: Mutex<MmapMut>,
     file_size: u64,
+    inc_size: u64,
+    phantom: PhantomData<T>,
 }
 
-impl AppendVec {
-    #[allow(clippy::mutex_atomic)]
-    pub fn new(file: &Path, create: bool, size: usize) -> Self {
+fn get_account_size_static() -> usize {
+    mem::size_of::<Account>() - mem::size_of::<Vec<u8>>()
+}
+
+pub fn get_serialized_size(account: &Account) -> usize {
+    get_account_size_static() + account.data.len()
+}
+
+pub fn serialize_account(dst_slice: &mut [u8], account: &Account, len: usize) {
+    let mut at = 0;
+
+    write_u64(&mut at, dst_slice, len as u64);
+    write_u64(&mut at, dst_slice, account.lamports);
+    write_bytes(&mut at, dst_slice, &account.data);
+    write_bytes(&mut at, dst_slice, account.owner.as_ref());
+    write_bytes(&mut at, dst_slice, &[account.executable as u8]);
+}
+
+fn read_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8], len: usize) {
+    let data = &src_slice[*at..*at + len];
+    (&data[..]).read_exact(&mut dst_slice[..]).unwrap();
+    *at += len;
+}
+
+fn write_bytes(at: &mut usize, dst_slice: &mut [u8], src_slice: &[u8]) {
+    let data = &mut dst_slice[*at..*at + src_slice.len()];
+    (&mut data[..]).write_all(&src_slice).unwrap();
+    *at += src_slice.len();
+}
+
+fn read_u64(at: &mut usize, src_slice: &[u8]) -> u64 {
+    let data = &src_slice[*at..*at + mem::size_of::<u64>()];
+    *at += mem::size_of::<u64>();
+    (&data[..]).read_u64::<LittleEndian>().unwrap()
+}
+
+fn write_u64(at: &mut usize, dst_slice: &mut [u8], value: u64) {
+    let data = &mut dst_slice[*at..*at + mem::size_of::<u64>()];
+    (&mut data[..]).write_u64::<LittleEndian>(value).unwrap();
+    *at += mem::size_of::<u64>();
+}
+
+pub fn deserialize_account(
+    src_slice: &[u8],
+    index: usize,
+    current_offset: usize,
+) -> Result<Account> {
+    let mut at = index;
+
+    let size = read_u64(&mut at, &src_slice);
+    let len = size as usize;
+    assert!(current_offset >= at + len);
+
+    let lamports = read_u64(&mut at, &src_slice);
+
+    let data_len = len - get_account_size_static();
+    let mut data = vec![0; data_len];
+    read_bytes(&mut at, &mut data, &src_slice, data_len);
+
+    let mut pubkey = vec![0; mem::size_of::<Pubkey>()];
+    read_bytes(&mut at, &mut pubkey, &src_slice, mem::size_of::<Pubkey>());
+    let owner = Pubkey::new(&pubkey);
+
+    let mut exec = vec![0; mem::size_of::<bool>()];
+    read_bytes(&mut at, &mut exec, &src_slice, mem::size_of::<bool>());
+    let executable: bool = exec[0] != 0;
+
+    Ok(Account {
+        lamports,
+        data,
+        owner,
+        executable,
+    })
+}
+
+impl<T> AppendVec<T>
+where
+    T: Default,
+{
+    pub fn new(path: &Path, create: bool, size: u64, inc: u64) -> Self {
         let mut data = OpenOptions::new()
             .read(true)
             .write(true)
             .create(create)
-            .open(file)
+            .open(path)
             .expect("Unable to open data file");
 
-        data.seek(SeekFrom::Start(size as u64)).unwrap();
+        data.seek(SeekFrom::Start(size)).unwrap();
         data.write_all(&[0]).unwrap();
         data.seek(SeekFrom::Start(0)).unwrap();
         data.flush().unwrap();
-        let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
+        let mmap = unsafe { Mmap::map(&data).expect("failed to map the data file") };
+        let mmap_mut = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
 
         AppendVec {
-            map,
-            // This mutex forces append to be single threaded, but concurrent with reads
-            append_offset: Mutex::new(0),
-            current_len: AtomicUsize::new(0),
-            file_size: size as u64,
+            data,
+            mmap,
+            current_offset: AtomicUsize::new(0),
+            mmap_mut: Mutex::new(mmap_mut),
+            file_size: size,
+            inc_size: inc,
+            phantom: PhantomData,
         }
     }
 
-    #[allow(clippy::mutex_atomic)]
-    pub fn reset(&self) {
-        // This mutex forces append to be single threaded, but concurrent with reads
-        let mut offset = self.append_offset.lock().unwrap();
-        self.current_len.store(0, Ordering::Relaxed);
-        *offset = 0;
+    pub fn reset(&mut self) {
+        let _mmap_mut = self.mmap_mut.lock().unwrap();
+        self.current_offset.store(0, Ordering::Relaxed);
    }
 
-    pub fn len(&self) -> usize {
-        self.current_len.load(Ordering::Relaxed)
+    #[allow(dead_code)]
+    pub fn get(&self, index: u64) -> &T {
+        let offset = self.current_offset.load(Ordering::Relaxed);
+        let at = index as usize;
+        assert!(offset >= at + mem::size_of::<T>());
+        let data = &self.mmap[at..at + mem::size_of::<T>()];
+        let ptr = data.as_ptr() as *const T;
+        let x: Option<&T> = unsafe { ptr.as_ref() };
+        x.unwrap()
     }
 
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    pub fn capacity(&self) -> u64 {
-        self.file_size
-    }
-
-    // The reason for the `mut` is to allow the account data pointer to be fixed up after
-    // the structure is loaded
-    #[allow(clippy::mut_from_ref)]
-    fn get_slice(&self, offset: usize, size: usize) -> &mut [u8] {
-        let len = self.len();
-        assert!(len >= offset + size);
-        let data = &self.map[offset..offset + size];
-        unsafe {
-            let dst = data.as_ptr() as *mut u8;
-            std::slice::from_raw_parts_mut(dst, size)
+    #[allow(dead_code)]
+    pub fn grow_file(&mut self) -> Result<()> {
+        if self.inc_size == 0 {
+            return Err(Error::new(ErrorKind::WriteZero, "Grow not supported"));
        }
-    }
-
-    fn append_ptr(&self, offset: &mut usize, src: *const u8, len: usize) {
-        //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
-        //crash on some architectures.
-        let pos = align_up!(*offset as usize, mem::size_of::<u64>());
-        let data = &self.map[pos..(pos + len)];
-        unsafe {
-            let dst = data.as_ptr() as *mut u8;
-            std::ptr::copy(src, dst, len);
-        };
-        *offset = pos + len;
-    }
-
-    #[allow(clippy::mutex_atomic)]
-    fn append_ptrs(&self, vals: &[(*const u8, usize)]) -> Option<usize> {
-        // This mutex forces append to be single threaded, but concurrent with reads
-        let mut offset = self.append_offset.lock().unwrap();
-        let mut end = *offset;
-        for val in vals {
-            //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
-            //crash on some architectures.
-            end = align_up!(end, mem::size_of::<u64>());
-            end += val.1;
+        let mut mmap_mut = self.mmap_mut.lock().unwrap();
+        let index = self.current_offset.load(Ordering::Relaxed) + mem::size_of::<T>();
+        if index as u64 + self.inc_size < self.file_size {
+            // grow was already called
+            return Ok(());
         }
+        let end = self.file_size + self.inc_size;
+        drop(mmap_mut.to_owned());
+        drop(self.mmap.to_owned());
+        self.data.seek(SeekFrom::Start(end))?;
+        self.data.write_all(&[0])?;
+        self.mmap = unsafe { Mmap::map(&self.data)? };
+        *mmap_mut = unsafe { MmapMut::map_mut(&self.data)? };
+        self.file_size = end;
+        Ok(())
+    }
 
-        if (self.file_size as usize) <= end {
+    #[allow(dead_code)]
+    pub fn append(&self, val: T) -> Option<u64> {
+        let mmap_mut = self.mmap_mut.lock().unwrap();
+        let index = self.current_offset.load(Ordering::Relaxed);
+
+        if (self.file_size as usize) < index + mem::size_of::<T>() {
             return None;
         }
-        //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
-        //crash on some architectures.
-        let pos = align_up!(*offset, mem::size_of::<u64>());
-        for val in vals {
-            self.append_ptr(&mut offset, val.0, val.1)
-        }
-        self.current_len.store(*offset, Ordering::Relaxed);
-        Some(pos)
-    }
-
-    #[allow(clippy::transmute_ptr_to_ptr)]
-    pub fn get_account(&self, offset: usize) -> &Account {
-        let account: *mut Account = {
-            let data = self.get_slice(offset, mem::size_of::<Account>());
-            unsafe { std::mem::transmute::<*const u8, *mut Account>(data.as_ptr()) }
-        };
-        //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
-        //crash on some architectures.
-        let data_at = align_up!(offset + mem::size_of::<Account>(), mem::size_of::<u64>());
-        let account_ref: &mut Account = unsafe { &mut *account };
-        let data = self.get_slice(data_at, account_ref.data.len());
+        let data = &mmap_mut[index..(index + mem::size_of::<T>())];
         unsafe {
-            let mut new_data = Vec::from_raw_parts(data.as_mut_ptr(), data.len(), data.len());
-            std::mem::swap(&mut account_ref.data, &mut new_data);
-            std::mem::forget(new_data);
+            let ptr = data.as_ptr() as *mut T;
+            std::ptr::write(ptr, val)
         };
-        account_ref
+        self.current_offset
+            .fetch_add(mem::size_of::<T>(), Ordering::Relaxed);
+        Some(index as u64)
     }
 
-    pub fn accounts(&self, mut start: usize) -> Vec<&Account> {
-        let mut accounts = vec![];
-        loop {
-            //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
-            //crash on some architectures.
-            let end = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
-            if end > self.len() {
-                break;
-            }
-            let first = self.get_account(start);
-            accounts.push(first);
-            //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
-            //crash on some architectures.
-            let data_at = align_up!(start + mem::size_of::<Account>(), mem::size_of::<u64>());
-            let next = align_up!(data_at + first.data.len(), mem::size_of::<u64>());
-            start = next;
+    pub fn get_account(&self, index: u64) -> Result<Account> {
+        let index = index as usize;
+        deserialize_account(
+            &self.mmap[..],
+            index,
+            self.current_offset.load(Ordering::Relaxed),
+        )
+    }
+
+    pub fn append_account(&self, account: &Account) -> Option<u64> {
+        let mut mmap_mut = self.mmap_mut.lock().unwrap();
+        let data_at = align_up!(
+            self.current_offset.load(Ordering::Relaxed),
+            mem::size_of::<u64>()
+        );
+        let len = get_serialized_size(account);
+
+        if (self.file_size as usize) < data_at + len + SIZEOF_U64 {
+            return None;
         }
-        accounts
-    }
 
+        serialize_account(
+            &mut mmap_mut[data_at..data_at + len + SIZEOF_U64],
+            &account,
+            len,
+        );
+
-    pub fn append_account(&self, account: &Account) -> Option<usize> {
-        let acc_ptr = account as *const Account;
-        let data_len = account.data.len();
-        let data_ptr = account.data.as_ptr();
-        let ptrs = [
-            (acc_ptr as *const u8, mem::size_of::<Account>()),
-            (data_ptr, data_len),
-        ];
-        self.append_ptrs(&ptrs)
-    }
-}
-
-pub mod test_utils {
-    use solana_sdk::account::Account;
-    use solana_sdk::pubkey::Pubkey;
-    use std::fs::{create_dir_all, remove_dir_all};
-    use std::path::PathBuf;
-
-    pub struct TempFile {
-        pub path: PathBuf,
-    }
-
-    impl Drop for TempFile {
-        fn drop(&mut self) {
-            let mut path = PathBuf::new();
-            std::mem::swap(&mut path, &mut self.path);
-            std::fs::remove_file(path).unwrap();
-        }
-    }
-
-    pub fn get_append_vec_path(path: &str) -> TempFile {
-        let out_dir = std::env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
-        let mut buf = PathBuf::new();
-        buf.push(&format!("{}/{}", out_dir, path));
-        let _ignored = remove_dir_all(out_dir.clone());
-        create_dir_all(out_dir).expect("Create directory failed");
-        TempFile { path: buf }
-    }
-
-    pub fn create_test_account(sample: usize) -> Account {
-        let data_len = sample % 256;
-        let mut account = Account::new(sample as u64, 0, &Pubkey::default());
-        account.data = (0..data_len).map(|_| data_len as u8).collect();
-        account
+        self.current_offset
+            .store(data_at + len + SIZEOF_U64, Ordering::Relaxed);
+        Some(data_at as u64)
     }
 }
 
 #[cfg(test)]
 pub mod tests {
-    use super::test_utils::*;
     use super::*;
     use log::*;
     use rand::{thread_rng, Rng};
-    use solana_sdk::timing::duration_as_ms;
+    use solana_sdk::timing::{duration_as_ms, duration_as_s};
+    use std::sync::atomic::{AtomicUsize, Ordering};
     use std::time::Instant;
 
+    const START_SIZE: u64 = 4 * 1024 * 1024;
+    const INC_SIZE: u64 = 1 * 1024 * 1024;
+
     #[test]
     fn test_append_vec() {
-        let path = get_append_vec_path("test_append");
-        let av = AppendVec::new(&path.path, true, 1024 * 1024);
-        let account = create_test_account(0);
-        let index = av.append_account(&account).unwrap();
-        assert_eq!(*av.get_account(index), account);
+        let path = Path::new("append_vec");
+        let av = AppendVec::new(path, true, START_SIZE, INC_SIZE);
+        let val: u64 = 5;
+        let index = av.append(val).unwrap();
+        assert_eq!(*av.get(index), val);
+        let val1 = val + 1;
+        let index1 = av.append(val1).unwrap();
+        assert_eq!(*av.get(index), val);
+        assert_eq!(*av.get(index1), val1);
+        std::fs::remove_file(path).unwrap();
     }
 
     #[test]
-    fn test_append_vec_data() {
-        let path = get_append_vec_path("test_append_data");
-        let av = AppendVec::new(&path.path, true, 1024 * 1024);
-        let account = create_test_account(5);
-        let index = av.append_account(&account).unwrap();
-        assert_eq!(*av.get_account(index), account);
-        let account1 = create_test_account(6);
+    fn test_append_vec_account() {
+        let path = Path::new("append_vec_account");
+        let av: AppendVec<Account> = AppendVec::new(path, true, START_SIZE, INC_SIZE);
+        let v1 = vec![1u8; 32];
+        let mut account1 = Account {
+            lamports: 1,
+            data: v1,
+            owner: Pubkey::default(),
+            executable: false,
+        };
         let index1 = av.append_account(&account1).unwrap();
-        assert_eq!(*av.get_account(index), account);
-        assert_eq!(*av.get_account(index1), account1);
+        assert_eq!(index1, 0);
+        assert_eq!(av.get_account(index1).unwrap(), account1);
+
+        let v2 = vec![4u8; 32];
+        let mut account2 = Account {
+            lamports: 1,
+            data: v2,
+            owner: Pubkey::default(),
+            executable: false,
+        };
+        let index2 = av.append_account(&account2).unwrap();
+        let mut len = get_serialized_size(&account1) + SIZEOF_U64 as usize;
+        assert_eq!(index2, len as u64);
+        assert_eq!(av.get_account(index2).unwrap(), account2);
+        assert_eq!(av.get_account(index1).unwrap(), account1);
+
+        account2.data.iter_mut().for_each(|e| *e *= 2);
+        let index3 = av.append_account(&account2).unwrap();
+        len += get_serialized_size(&account2) + SIZEOF_U64 as usize;
+        assert_eq!(index3, len as u64);
+        assert_eq!(av.get_account(index3).unwrap(), account2);
+
+        account1.data.extend([1, 2, 3, 4, 5, 6].iter().cloned());
+        let index4 = av.append_account(&account1).unwrap();
+        len += get_serialized_size(&account2) + SIZEOF_U64 as usize;
+        assert_eq!(index4, len as u64);
+        assert_eq!(av.get_account(index4).unwrap(), account1);
+        std::fs::remove_file(path).unwrap();
     }
 
     #[test]
-    fn test_append_vec_append_many() {
-        let path = get_append_vec_path("test_append_many");
-        let av = AppendVec::new(&path.path, true, 1024 * 1024);
-        let size = 1000;
-        let mut indexes = vec![];
-        let now = Instant::now();
-        for sample in 0..size {
-            let account = create_test_account(sample);
-            let pos = av.append_account(&account).unwrap();
-            assert_eq!(*av.get_account(pos), account);
-            indexes.push(pos)
-        }
-        trace!("append time: {} ms", duration_as_ms(&now.elapsed()),);
+    fn test_grow_append_vec() {
+        let path = Path::new("grow");
+        let mut av = AppendVec::new(path, true, START_SIZE, INC_SIZE);
+        let mut val = [5u64; 32];
+        let size = 100_000;
+        let mut offsets = vec![0; size];
 
         let now = Instant::now();
-        for _ in 0..size {
-            let sample = thread_rng().gen_range(0, indexes.len());
-            let account = create_test_account(sample);
-            assert_eq!(*av.get_account(indexes[sample]), account);
+        for index in 0..size {
+            if let Some(offset) = av.append(val) {
+                offsets[index] = offset;
+            } else {
+                assert!(av.grow_file().is_ok());
+                if let Some(offset) = av.append(val) {
+                    offsets[index] = offset;
+                } else {
+                    assert!(false);
+                }
+            }
+            val[0] += 1;
         }
-        trace!("random read time: {} ms", duration_as_ms(&now.elapsed()),);
-
-        let now = Instant::now();
-        assert_eq!(indexes.len(), size);
-        assert_eq!(indexes[0], 0);
-        let accounts = av.accounts(indexes[0]);
-        assert_eq!(accounts.len(), size);
-        for (sample, v) in accounts.iter().enumerate() {
-            let account = create_test_account(sample);
-            assert_eq!(**v, account)
-        }
-        trace!(
-            "sequential read time: {} ms",
+        info!(
+            "time: {} ms {} / s",
             duration_as_ms(&now.elapsed()),
+            ((mem::size_of::<[u64; 32]>() * size) as f32) / duration_as_s(&now.elapsed()),
         );
+
+        let now = Instant::now();
+        let num_reads = 100_000;
+        for _ in 0..num_reads {
+            let index = thread_rng().gen_range(0, size);
+            assert_eq!(av.get(offsets[index])[0], (index + 5) as u64);
+        }
+        info!(
+            "time: {} ms {} / s",
+            duration_as_ms(&now.elapsed()),
+            (num_reads as f32) / duration_as_s(&now.elapsed()),
+        );
+        std::fs::remove_file(path).unwrap();
+    }
+
+    #[test]
+    fn random_atomic_change() {
+        let path = Path::new("random");
+        let mut vec = AppendVec::<AtomicUsize>::new(path, true, START_SIZE, INC_SIZE);
+        let size = 1_000;
+        for k in 0..size {
+            if vec.append(AtomicUsize::new(k)).is_none() {
+                assert!(vec.grow_file().is_ok());
+                assert!(vec.append(AtomicUsize::new(0)).is_some());
+            }
+        }
+        let index = thread_rng().gen_range(0, size as u64);
+        let atomic1 = vec.get(index * mem::size_of::<AtomicUsize>() as u64);
+        let current1 = atomic1.load(Ordering::Relaxed);
+        assert_eq!(current1, index as usize);
+        let next = current1 + 1;
+        let index = vec.append(AtomicUsize::new(next)).unwrap();
+        let atomic2 = vec.get(index);
+        let current2 = atomic2.load(Ordering::Relaxed);
+        assert_eq!(current2, next);
+        std::fs::remove_file(path).unwrap();
     }
 }