Document and clean up AppendVec (#15640)
* Document AppendVec

* Remove the almost-duplicate state in AppendVec

  AppendVec was maintaining two offsets, `current_len` and `append_offset`. Despite the different-looking names, the two values have the same meaning, but were updated at slightly different times. When appending a batch of accounts, `current_len` updates would be immediately available to other threads after each append, whereas `append_offset` would only be updated after its mutex was unlocked. `append_offset` is redundant. By removing it, we eliminate potential bugs and no longer need to suppress clippy warnings.

* Remove get_mut() from AppendVec design

  Only the offset into the AppendVec memory is thread-safe. The memory itself is only thread-safe because it is append-only and is otherwise unprotected. Adding get_mut() would only be safe if the memory was protected by a read-write lock.
This commit is contained in:
@@ -20,8 +20,8 @@ use std::{
|
||||
sync::Mutex,
|
||||
};
|
||||
|
||||
//Data placement should be aligned at the next boundary. Without alignment accessing the memory may
|
||||
//crash on some architectures.
|
||||
// Data placement should be aligned at the next boundary. Without alignment accessing the memory may
|
||||
// crash on some architectures.
|
||||
const ALIGN_BOUNDARY_OFFSET: usize = mem::size_of::<u64>();
|
||||
macro_rules! u64_align {
|
||||
($addr: expr) => {
|
||||
@@ -68,8 +68,8 @@ impl<'a> From<&'a Account> for AccountMeta {
|
||||
}
|
||||
}
|
||||
|
||||
/// References to Memory Mapped memory
|
||||
/// The Account is stored separately from its data, so getting the actual account requires a clone
|
||||
/// References to account data stored elsewhere. Getting an `Account` requires cloning
|
||||
/// (see `StoredAccountMeta::clone_account()`).
|
||||
#[derive(PartialEq, Debug)]
|
||||
pub struct StoredAccountMeta<'a> {
|
||||
pub meta: &'a StoredMeta,
|
||||
@@ -82,6 +82,7 @@ pub struct StoredAccountMeta<'a> {
|
||||
}
|
||||
|
||||
impl<'a> StoredAccountMeta<'a> {
|
||||
/// Return a new Account by copying all the data referenced by the `StoredAccountMeta`.
|
||||
pub fn clone_account(&self) -> Account {
|
||||
Account {
|
||||
lamports: self.account_meta.lamports,
|
||||
@@ -116,16 +117,28 @@ impl<'a> StoredAccountMeta<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A thread-safe, file-backed block of memory used to store `Account` instances. Append operations
|
||||
/// are serialized such that only one thread holds the internal `append_lock` at a time. No
|
||||
/// restrictions are placed on reading. That is, one may read items from one thread while another
|
||||
/// is appending new items.
|
||||
#[derive(Debug, AbiExample)]
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
pub struct AppendVec {
|
||||
/// The file path where the data is stored.
|
||||
path: PathBuf,
|
||||
|
||||
/// A file-backed block of memory that is used to store the data for each appended item.
|
||||
map: MmapMut,
|
||||
// This mutex forces append to be single threaded, but concurrent with reads
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
append_offset: Mutex<usize>,
|
||||
|
||||
/// A lock used to serialize append operations.
|
||||
append_lock: Mutex<()>,
|
||||
|
||||
/// The number of bytes used to store items, not the number of items.
|
||||
current_len: AtomicUsize,
|
||||
|
||||
/// The number of bytes available for storing items.
|
||||
file_size: u64,
|
||||
|
||||
/// True if the file should automatically be deleted when this AppendVec is dropped.
|
||||
remove_on_drop: bool,
|
||||
}
|
||||
|
||||
@@ -143,7 +156,6 @@ impl Drop for AppendVec {
|
||||
}
|
||||
|
||||
impl AppendVec {
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
pub fn new(file: &Path, create: bool, size: usize) -> Self {
|
||||
let initial_len = 0;
|
||||
AppendVec::sanitize_len_and_size(initial_len, size).unwrap();
|
||||
@@ -192,7 +204,7 @@ impl AppendVec {
|
||||
map,
|
||||
// This mutex forces append to be single threaded, but concurrent with reads
|
||||
// See UNSAFE usage in `append_ptr`
|
||||
append_offset: Mutex::new(initial_len),
|
||||
append_lock: Mutex::new(()),
|
||||
current_len: AtomicUsize::new(initial_len),
|
||||
file_size: size as u64,
|
||||
remove_on_drop: true,
|
||||
@@ -203,7 +215,6 @@ impl AppendVec {
|
||||
self.remove_on_drop = false;
|
||||
}
|
||||
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
pub fn new_empty_map(current_len: usize) -> Self {
|
||||
let map = MmapMut::map_anon(1).unwrap_or_else(|e| {
|
||||
error!(
|
||||
@@ -217,7 +228,7 @@ impl AppendVec {
|
||||
AppendVec {
|
||||
path: PathBuf::from(String::default()),
|
||||
map,
|
||||
append_offset: Mutex::new(current_len),
|
||||
append_lock: Mutex::new(()),
|
||||
current_len: AtomicUsize::new(current_len),
|
||||
file_size: 0, // will be filled by set_file()
|
||||
remove_on_drop: true,
|
||||
@@ -249,13 +260,11 @@ impl AppendVec {
|
||||
self.map.flush()
|
||||
}
|
||||
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
pub fn reset(&self) {
|
||||
// This mutex forces append to be single threaded, but concurrent with reads
|
||||
// See UNSAFE usage in `append_ptr`
|
||||
let mut offset = self.append_offset.lock().unwrap();
|
||||
let _lock = self.append_lock.lock().unwrap();
|
||||
self.current_len.store(0, Ordering::Relaxed);
|
||||
*offset = 0;
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
@@ -279,7 +288,6 @@ impl AppendVec {
|
||||
PathBuf::from(&format!("{}.{}", slot, id))
|
||||
}
|
||||
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
pub fn new_from_file<P: AsRef<Path>>(path: P, current_len: usize) -> io::Result<(Self, usize)> {
|
||||
let data = OpenOptions::new()
|
||||
.read(true)
|
||||
@@ -295,7 +303,7 @@ impl AppendVec {
|
||||
let new = AppendVec {
|
||||
path: path.as_ref().to_path_buf(),
|
||||
map,
|
||||
append_offset: Mutex::new(current_len),
|
||||
append_lock: Mutex::new(()),
|
||||
current_len: AtomicUsize::new(current_len),
|
||||
file_size,
|
||||
remove_on_drop: true,
|
||||
@@ -333,6 +341,10 @@ impl AppendVec {
|
||||
(offset == aligned_current_len, num_accounts)
|
||||
}
|
||||
|
||||
/// Get a reference to the data at `offset` of `size` bytes if that slice
|
||||
/// doesn't overrun the internal buffer. Otherwise return None.
|
||||
/// Also return the offset of the first byte after the requested data that
|
||||
/// falls on a 64-bit (8-byte) boundary.
|
||||
fn get_slice(&self, offset: usize, size: usize) -> Option<(&[u8], usize)> {
|
||||
let (next, overflow) = offset.overflowing_add(size);
|
||||
if overflow || next > self.len() {
|
||||
@@ -349,11 +361,13 @@ impl AppendVec {
|
||||
))
|
||||
}
|
||||
|
||||
/// Copy `len` bytes from `src` to the first 64-bit (8-byte) boundary after position `offset` of
|
||||
/// the internal buffer. Then update `offset` to the first byte after the copied data.
|
||||
fn append_ptr(&self, offset: &mut usize, src: *const u8, len: usize) {
|
||||
let pos = u64_align!(*offset);
|
||||
let data = &self.map[pos..(pos + len)];
|
||||
//UNSAFE: This mut append is safe because only 1 thread can append at a time
|
||||
//Mutex<append_offset> guarantees exclusive write access to the memory occupied in
|
||||
//Mutex<()> guarantees exclusive write access to the memory occupied in
|
||||
//the range.
|
||||
unsafe {
|
||||
let dst = data.as_ptr() as *mut u8;
|
||||
@@ -362,6 +376,10 @@ impl AppendVec {
|
||||
*offset = pos + len;
|
||||
}
|
||||
|
||||
/// Copy each value in `vals`, in order, to the first 64-bit (8-byte) boundary after position `offset`.
|
||||
/// If there is sufficient space, then update `offset` and the internal `current_len` to the
|
||||
/// first byte after the copied data and return the starting position of the copied data.
|
||||
/// Otherwise return None and leave `offset` unchanged.
|
||||
fn append_ptrs_locked(&self, offset: &mut usize, vals: &[(*const u8, usize)]) -> Option<usize> {
|
||||
let mut end = *offset;
|
||||
for val in vals {
|
||||
@@ -381,14 +399,20 @@ impl AppendVec {
|
||||
Some(pos)
|
||||
}
|
||||
|
||||
/// Return a reference to the type at `offset` if its data doesn't overrun the internal buffer.
|
||||
/// Otherwise return None. Also return the offset of the first byte after the requested data
|
||||
/// that falls on a 64-bit (8-byte) boundary.
|
||||
fn get_type<'a, T>(&self, offset: usize) -> Option<(&'a T, usize)> {
|
||||
let (data, next) = self.get_slice(offset, mem::size_of::<T>())?;
|
||||
let ptr: *const T = data.as_ptr() as *const T;
|
||||
//UNSAFE: The cast is safe because the slice is aligned and fits into the memory
|
||||
//and the lifetime of he &T is tied to self, which holds the underlying memory map
|
||||
//and the lifetime of the &T is tied to self, which holds the underlying memory map
|
||||
Some((unsafe { &*ptr }, next))
|
||||
}
|
||||
|
||||
/// Return account metadata for the account at `offset` if its data doesn't overrun
|
||||
/// the internal buffer. Otherwise return None. Also return the offset of the first byte
|
||||
/// after the requested data that falls on a 64-bit (8-byte) boundary.
|
||||
pub fn get_account<'a>(&'a self, offset: usize) -> Option<(StoredAccountMeta<'a>, usize)> {
|
||||
let (meta, next): (&'a StoredMeta, _) = self.get_type(offset)?;
|
||||
let (account_meta, next): (&'a AccountMeta, _) = self.get_type(next)?;
|
||||
@@ -417,22 +441,27 @@ impl AppendVec {
|
||||
self.path.clone()
|
||||
}
|
||||
|
||||
pub fn accounts(&self, mut start: usize) -> Vec<StoredAccountMeta> {
|
||||
/// Return account metadata for each account, starting from `offset`.
|
||||
pub fn accounts(&self, mut offset: usize) -> Vec<StoredAccountMeta> {
|
||||
let mut accounts = vec![];
|
||||
while let Some((account, next)) = self.get_account(start) {
|
||||
while let Some((account, next)) = self.get_account(offset) {
|
||||
accounts.push(account);
|
||||
start = next;
|
||||
offset = next;
|
||||
}
|
||||
accounts
|
||||
}
|
||||
|
||||
#[allow(clippy::mutex_atomic)]
|
||||
/// Copy each account metadata, account and hash to the internal buffer.
|
||||
/// Return the starting offset of each account metadata.
|
||||
/// After each account is appended, the internal `current_len` is updated
|
||||
/// and will be available to other threads.
|
||||
pub fn append_accounts(
|
||||
&self,
|
||||
accounts: &[(StoredMeta, &Account)],
|
||||
hashes: &[Hash],
|
||||
) -> Vec<usize> {
|
||||
let mut offset = self.append_offset.lock().unwrap();
|
||||
let _lock = self.append_lock.lock().unwrap();
|
||||
let mut offset = self.len();
|
||||
let mut rv = Vec::with_capacity(accounts.len());
|
||||
for ((stored_meta, account), hash) in accounts.iter().zip(hashes) {
|
||||
let meta_ptr = stored_meta as *const StoredMeta;
|
||||
@@ -456,11 +485,14 @@ impl AppendVec {
|
||||
|
||||
// The last entry in this offset needs to be the u64 aligned offset, because that's
|
||||
// where the *next* entry will begin to be stored.
|
||||
rv.push(u64_align!(*offset));
|
||||
rv.push(u64_align!(offset));
|
||||
|
||||
rv
|
||||
}
|
||||
|
||||
/// Copy the account metadata, account and hash to the internal buffer.
|
||||
/// Return the starting offset of the account metadata.
|
||||
/// After the account is appended, the internal `current_len` is updated.
|
||||
pub fn append_account(
|
||||
&self,
|
||||
storage_meta: StoredMeta,
|
||||
|
Reference in New Issue
Block a user