diff --git a/runtime/src/shared_buffer_reader.rs b/runtime/src/shared_buffer_reader.rs
index 5a90b73bfa..b52fab54fb 100644
--- a/runtime/src/shared_buffer_reader.rs
+++ b/runtime/src/shared_buffer_reader.rs
@@ -29,11 +29,9 @@ const CHUNK_SIZE_DEFAULT: usize = 100_000_000;
 type OneSharedBuffer = Arc<Vec<u8>>;
 
 struct SharedBufferInternal {
-    // error encountered during read
-    error: RwLock<std::io::Result<usize>>,
-    bg_reader: Mutex<Option<JoinHandle<()>>>,
-    bg_eof_reached: AtomicBool,
-    stop: AtomicBool,
+    bg_reader_data: Arc<SharedBufferBgReader>,
+
+    bg_reader_join_handle: Mutex<Option<JoinHandle<()>>>,
 
     // Keep track of the next read location per outstanding client.
     // index is client's my_client_index.
@@ -41,21 +39,10 @@ struct SharedBufferInternal {
     // Any buffer at index < min(clients) can be recycled or destroyed.
     clients: RwLock<Vec<usize>>,
-    // bg thread reads to 'newly_read_data' and signals
-    newly_read_data: RwLock<Vec<OneSharedBuffer>>,
-    // set when newly_read_data gets new data written to it and can be transferred
-    newly_read_data_signal: WaitableCondvar,
 
     // unpacking callers read from 'data'. newly_read_data is transferred to 'data when 'data' is exhausted.
     // This minimizes lock contention since bg file reader has to have almost constant write access.
     data: RwLock<Vec<OneSharedBuffer>>,
 
-    // currently available set of buffers for bg to read into
-    // during operation, this is exhausted as the bg reads ahead
-    // As all clients are done with an earlier buffer, it is recycled by being put back into this vec for the bg thread to pull out.
-    buffers: RwLock<Vec<OneSharedBuffer>>,
-    // signaled when a new buffer is added to buffers. This throttles the bg reading.
-    new_buffer_signal: WaitableCondvar,
-
     // it is convenient to have one of these around
     empty_buffer: OneSharedBuffer,
 }
@@ -74,44 +61,26 @@ impl SharedBuffer {
         reader: T,
     ) -> Self {
         let instance = SharedBufferInternal {
-            buffers: RwLock::new(Self::alloc_buffers(total_buffer_budget, chunk_size)),
+            bg_reader_data: Arc::new(SharedBufferBgReader::new(total_buffer_budget, chunk_size)),
             data: RwLock::new(vec![OneSharedBuffer::default()]), // initialize with 1 vector of empty data at data[0]
 
             // default values
-            error: RwLock::new(Ok(0)),
-            newly_read_data: RwLock::default(),
-            bg_reader: Mutex::default(),
-            bg_eof_reached: AtomicBool::default(),
-            stop: AtomicBool::default(),
-            newly_read_data_signal: WaitableCondvar::default(),
-            new_buffer_signal: WaitableCondvar::default(),
+            bg_reader_join_handle: Mutex::default(),
             clients: RwLock::default(),
             empty_buffer: OneSharedBuffer::default(),
         };
         let instance = Arc::new(instance);
-        let instance_ = instance.clone();
+        let bg_reader_data = instance.bg_reader_data.clone();
         let handle = Builder::new()
             .name("solana-compressed_file_reader".to_string())
             .spawn(move || {
-                instance_.read_entire_file_in_bg(reader);
+                // importantly, this thread does NOT hold a refcount on the arc of 'instance'
+                bg_reader_data.read_entire_file_in_bg(reader);
             });
-        *instance.bg_reader.lock().unwrap() = Some(handle.unwrap());
+        *instance.bg_reader_join_handle.lock().unwrap() = Some(handle.unwrap());
         Self { instance }
     }
-    fn alloc_buffers(total_buffer_budget: usize, chunk_size: usize) -> Vec<OneSharedBuffer> {
-        assert!(total_buffer_budget > 0);
-        assert!(chunk_size > 0);
-        let buffers = Self::num_buffers(total_buffer_budget, chunk_size);
-        let initial_vector_count = buffers;
-        (0..initial_vector_count)
-            .into_iter()
-            .map(|_| Arc::new(vec![0u8; chunk_size]))
-            .collect()
-    }
-    fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
-        std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer
-    }
 }
 
 pub struct SharedBufferReader {
@@ -131,8 +100,8 @@ pub struct SharedBufferReader {
 
 impl Drop for SharedBufferInternal {
     fn drop(&mut self) {
-        if let Some(handle) = self.bg_reader.lock().unwrap().take() {
-            self.stop.store(true, Ordering::Relaxed);
+        if let Some(handle) = self.bg_reader_join_handle.lock().unwrap().take() {
+            self.bg_reader_data.stop.store(true, Ordering::Relaxed);
             handle.join().unwrap();
         }
     }
@@ -144,7 +113,67 @@ impl Drop for SharedBufferReader {
     }
 }
 
-impl SharedBufferInternal {
+#[derive(Debug)]
+struct SharedBufferBgReader {
+    stop: AtomicBool,
+    // error encountered during read
+    error: RwLock<std::io::Result<usize>>,
+    // bg thread reads to 'newly_read_data' and signals
+    newly_read_data: RwLock<Vec<OneSharedBuffer>>,
+    // set when newly_read_data gets new data written to it and can be transferred
+    newly_read_data_signal: WaitableCondvar,
+
+    // currently available set of buffers for bg to read into
+    // during operation, this is exhausted as the bg reads ahead
+    // As all clients are done with an earlier buffer, it is recycled by being put back into this vec for the bg thread to pull out.
+    buffers: RwLock<Vec<OneSharedBuffer>>,
+    // signaled when a new buffer is added to buffers. This throttles the bg reading.
+    new_buffer_signal: WaitableCondvar,
+
+    bg_eof_reached: AtomicBool,
+}
+
+impl SharedBufferBgReader {
+    fn new(total_buffer_budget: usize, chunk_size: usize) -> Self {
+        SharedBufferBgReader {
+            buffers: RwLock::new(Self::alloc_buffers(total_buffer_budget, chunk_size)),
+            error: RwLock::new(Ok(0)),
+
+            // easy defaults
+            stop: AtomicBool::new(false),
+            newly_read_data: RwLock::default(),
+            newly_read_data_signal: WaitableCondvar::default(),
+            new_buffer_signal: WaitableCondvar::default(),
+            bg_eof_reached: AtomicBool::default(),
+        }
+    }
+
+    fn default_wait_timeout() -> Duration {
+        Duration::from_millis(100) // short enough to be unnoticable in case of trouble, long enough for efficient waiting
+    }
+    fn wait_for_new_buffer(&self) -> bool {
+        self.new_buffer_signal
+            .wait_timeout(Self::default_wait_timeout())
+    }
+
+    fn alloc_buffers(total_buffer_budget: usize, chunk_size: usize) -> Vec<OneSharedBuffer> {
+        assert!(total_buffer_budget > 0);
+        assert!(chunk_size > 0);
+        let buffers = Self::num_buffers(total_buffer_budget, chunk_size);
+        let initial_vector_count = buffers;
+        (0..initial_vector_count)
+            .into_iter()
+            .map(|_| Arc::new(vec![0u8; chunk_size]))
+            .collect()
+    }
+    fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
+        std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer
+    }
+    fn set_error(&self, error: std::io::Error) {
+        *self.error.write().unwrap() = Err(error);
+        self.newly_read_data_signal.notify_all(); // any client waiting for new data needs to wake up and check for errors
+    }
+
     // read ahead the entire file.
     // This is governed by the supply of buffers.
     // Buffers are likely limited to cap memory usage.
@@ -244,27 +273,20 @@ impl SharedBufferInternal {
             self.error.read().unwrap()
        );
    }
-    fn set_error(&self, error: std::io::Error) {
-        *self.error.write().unwrap() = Err(error);
-        self.newly_read_data_signal.notify_all(); // any client waiting for new data needs to wake up and check for errors
-    }
-    fn default_wait_timeout() -> Duration {
-        Duration::from_millis(100) // short enough to be unnoticable in case of trouble, long enough for efficient waiting
-    }
-    fn wait_for_new_buffer(&self) -> bool {
-        self.new_buffer_signal
-            .wait_timeout(Self::default_wait_timeout())
-    }
+}
+
+impl SharedBufferInternal {
     fn wait_for_newly_read_data(&self) -> bool {
-        self.newly_read_data_signal
-            .wait_timeout(Self::default_wait_timeout())
+        self.bg_reader_data
+            .newly_read_data_signal
+            .wait_timeout(SharedBufferBgReader::default_wait_timeout())
     }
     // bg reader uses write lock on 'newly_read_data' each time a buffer is read or recycled
     // client readers read from 'data' using read locks
     // when all of 'data' has been exhausted by clients, 1 client needs to transfer from 'newly_read_data' to 'data' one time.
     // returns true if any data was added to 'data'
     fn transfer_data_from_bg(&self) -> bool {
-        let mut from_lock = self.newly_read_data.write().unwrap();
+        let mut from_lock = self.bg_reader_data.newly_read_data.write().unwrap();
         if from_lock.is_empty() {
             // no data available from bg
             return false;
@@ -280,7 +302,7 @@ impl SharedBufferInternal {
         true // data was transferred
     }
     fn has_reached_eof(&self) -> bool {
-        self.bg_eof_reached.load(Ordering::Relaxed)
+        self.bg_reader_data.bg_eof_reached.load(Ordering::Relaxed)
     }
 }
 
@@ -351,8 +373,14 @@ impl SharedBufferReader {
         if !eof {
             // if !eof, recycle this buffer and notify waiting reader(s)
             // if eof, just drop buffer this buffer since it isn't needed for reading anymore
-            self.instance.buffers.write().unwrap().push(remove);
-            self.instance.new_buffer_signal.notify_all(); // new buffer available for bg reader
+            self.instance
+                .bg_reader_data
+                .buffers
+                .write()
+                .unwrap()
+                .push(remove);
+            self.instance.bg_reader_data.new_buffer_signal.notify_all();
+            // new buffer available for bg reader
         }
     }
 }
@@ -427,7 +455,7 @@ impl Read for SharedBufferReader {
         // Since the bg reader could not satisfy our read, now is a good time to check to see if the bg reader encountered an error.
         // Note this is a write lock because we want to get the actual error detected and return it here and avoid races with other readers if we tried a read and then subsequent write lock.
         // This would be simpler if I could clone an io error.
-        let mut error = instance.error.write().unwrap();
+        let mut error = instance.bg_reader_data.error.write().unwrap();
         if error.is_err() {
             // replace the current error (with AN error instead of ok)
             let mut stored_error = Err(Self::default_error());
@@ -738,7 +766,7 @@ pub mod tests {
     }
 
     fn adjusted_buffer_size(total_buffer_budget: usize, chunk_size: usize) -> usize {
-        let num_buffers = SharedBuffer::num_buffers(total_buffer_budget, chunk_size);
+        let num_buffers = SharedBufferBgReader::num_buffers(total_buffer_budget, chunk_size);
         num_buffers * chunk_size
     }
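
Reviewer note on the change above: the point of splitting `SharedBufferBgReader` out of `SharedBufferInternal` is ownership, as the new comment in `SharedBuffer::new` says. Previously the background thread held a clone of the `Arc<SharedBufferInternal>`, so the strong count could never reach zero while the thread ran, and `Drop for SharedBufferInternal` (which sets `stop` and joins the thread) could never fire; the thread kept reading until EOF or error. Now the thread holds only an `Arc<SharedBufferBgReader>`, so dropping the last external handle actually runs the owner's `Drop`. A minimal, self-contained sketch of this pattern (`Owner` and `BgState` are hypothetical stand-ins for `SharedBufferInternal` and `SharedBufferBgReader`, not code from this patch):

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    thread::{Builder, JoinHandle},
    time::Duration,
};

// Stand-in for SharedBufferBgReader: only the state the bg thread needs.
struct BgState {
    stop: AtomicBool,
}

impl BgState {
    // Stand-in for read_entire_file_in_bg's read loop.
    fn run(&self) {
        while !self.stop.load(Ordering::Relaxed) {
            std::thread::sleep(Duration::from_millis(10));
        }
    }
}

// Stand-in for SharedBufferInternal: owns the bg state and the join handle.
struct Owner {
    bg_state: Arc<BgState>,
    join_handle: Mutex<Option<JoinHandle<()>>>,
}

impl Owner {
    fn new() -> Arc<Self> {
        let owner = Arc::new(Owner {
            bg_state: Arc::new(BgState {
                stop: AtomicBool::new(false),
            }),
            join_handle: Mutex::default(),
        });
        // Clone only the bg state, NOT `owner`, so the thread keeps no
        // refcount on the owner and the owner's Drop can actually run.
        let bg = owner.bg_state.clone();
        let handle = Builder::new()
            .name("bg".to_string())
            .spawn(move || bg.run())
            .unwrap();
        *owner.join_handle.lock().unwrap() = Some(handle);
        owner
    }
}

impl Drop for Owner {
    fn drop(&mut self) {
        // Runs as soon as the last external Arc<Owner> is dropped.
        if let Some(handle) = self.join_handle.lock().unwrap().take() {
            self.bg_state.stop.store(true, Ordering::Relaxed);
            handle.join().unwrap();
        }
    }
}

fn main() {
    let owner = Owner::new();
    drop(owner); // sets `stop`, joins the bg thread, exits promptly
}
```

With the old layout, the same `drop(owner)` would merely decrement the refcount to one (the clone held inside the spawned thread), so the join-on-drop cleanup was unreachable; splitting the bg state into its own `Arc` breaks that cycle without changing the buffer-recycling or error-propagation logic, which this diff only re-routes through `bg_reader_data`.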