lazy allocate buffers for bg reader in untar (#18640)
This commit is contained in:
committed by
GitHub
parent
5f9f3724d0
commit
090fbeca24
@ -60,8 +60,10 @@ impl SharedBuffer {
|
|||||||
chunk_size: usize,
|
chunk_size: usize,
|
||||||
reader: T,
|
reader: T,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
assert!(total_buffer_budget > 0);
|
||||||
|
assert!(chunk_size > 0);
|
||||||
let instance = SharedBufferInternal {
|
let instance = SharedBufferInternal {
|
||||||
bg_reader_data: Arc::new(SharedBufferBgReader::new(total_buffer_budget, chunk_size)),
|
bg_reader_data: Arc::new(SharedBufferBgReader::new()),
|
||||||
data: RwLock::new(vec![OneSharedBuffer::default()]), // initialize with 1 vector of empty data at data[0]
|
data: RwLock::new(vec![OneSharedBuffer::default()]), // initialize with 1 vector of empty data at data[0]
|
||||||
|
|
||||||
// default values
|
// default values
|
||||||
@ -76,7 +78,7 @@ impl SharedBuffer {
|
|||||||
.name("solana-compressed_file_reader".to_string())
|
.name("solana-compressed_file_reader".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
// importantly, this thread does NOT hold a refcount on the arc of 'instance'
|
// importantly, this thread does NOT hold a refcount on the arc of 'instance'
|
||||||
bg_reader_data.read_entire_file_in_bg(reader);
|
bg_reader_data.read_entire_file_in_bg(reader, total_buffer_budget, chunk_size);
|
||||||
});
|
});
|
||||||
*instance.bg_reader_join_handle.lock().unwrap() = Some(handle.unwrap());
|
*instance.bg_reader_join_handle.lock().unwrap() = Some(handle.unwrap());
|
||||||
Self { instance }
|
Self { instance }
|
||||||
@ -134,9 +136,9 @@ struct SharedBufferBgReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SharedBufferBgReader {
|
impl SharedBufferBgReader {
|
||||||
fn new(total_buffer_budget: usize, chunk_size: usize) -> Self {
|
fn new() -> Self {
|
||||||
SharedBufferBgReader {
|
SharedBufferBgReader {
|
||||||
buffers: RwLock::new(Self::alloc_buffers(total_buffer_budget, chunk_size)),
|
buffers: RwLock::new(vec![]),
|
||||||
error: RwLock::new(Ok(0)),
|
error: RwLock::new(Ok(0)),
|
||||||
|
|
||||||
// easy defaults
|
// easy defaults
|
||||||
@ -155,17 +157,6 @@ impl SharedBufferBgReader {
|
|||||||
self.new_buffer_signal
|
self.new_buffer_signal
|
||||||
.wait_timeout(Self::default_wait_timeout())
|
.wait_timeout(Self::default_wait_timeout())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn alloc_buffers(total_buffer_budget: usize, chunk_size: usize) -> Vec<OneSharedBuffer> {
|
|
||||||
assert!(total_buffer_budget > 0);
|
|
||||||
assert!(chunk_size > 0);
|
|
||||||
let buffers = Self::num_buffers(total_buffer_budget, chunk_size);
|
|
||||||
let initial_vector_count = buffers;
|
|
||||||
(0..initial_vector_count)
|
|
||||||
.into_iter()
|
|
||||||
.map(|_| Arc::new(vec![0u8; chunk_size]))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
|
fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
|
||||||
std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer
|
std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer
|
||||||
}
|
}
|
||||||
@ -179,7 +170,12 @@ impl SharedBufferBgReader {
|
|||||||
// Buffers are likely limited to cap memory usage.
|
// Buffers are likely limited to cap memory usage.
|
||||||
// A buffer is recycled after the last client finishes reading from it.
|
// A buffer is recycled after the last client finishes reading from it.
|
||||||
// When a buffer is available (initially or recycled), this code wakes up and reads into that buffer.
|
// When a buffer is available (initially or recycled), this code wakes up and reads into that buffer.
|
||||||
fn read_entire_file_in_bg<T: 'static + Read + std::marker::Send>(&self, mut reader: T) {
|
fn read_entire_file_in_bg<T: 'static + Read + std::marker::Send>(
|
||||||
|
&self,
|
||||||
|
mut reader: T,
|
||||||
|
total_buffer_budget: usize,
|
||||||
|
chunk_size: usize,
|
||||||
|
) {
|
||||||
let now = std::time::Instant::now();
|
let now = std::time::Instant::now();
|
||||||
let mut read_us = 0;
|
let mut read_us = 0;
|
||||||
|
|
||||||
@ -187,6 +183,7 @@ impl SharedBufferBgReader {
|
|||||||
let mut wait_us = 0;
|
let mut wait_us = 0;
|
||||||
let mut total_bytes = 0;
|
let mut total_bytes = 0;
|
||||||
let mut error = SharedBufferReader::default_error();
|
let mut error = SharedBufferReader::default_error();
|
||||||
|
let mut remaining_buffers_to_allocate = Self::num_buffers(total_buffer_budget, chunk_size);
|
||||||
loop {
|
loop {
|
||||||
if self.stop.load(Ordering::Relaxed) {
|
if self.stop.load(Ordering::Relaxed) {
|
||||||
// unsure what error is most appropriate here.
|
// unsure what error is most appropriate here.
|
||||||
@ -197,11 +194,15 @@ impl SharedBufferBgReader {
|
|||||||
let mut buffers = self.buffers.write().unwrap();
|
let mut buffers = self.buffers.write().unwrap();
|
||||||
let buffer = buffers.pop();
|
let buffer = buffers.pop();
|
||||||
drop(buffers);
|
drop(buffers);
|
||||||
let (dest_size, mut dest_data) = if let Some(dest_data) = buffer {
|
let mut dest_data = if let Some(dest_data) = buffer {
|
||||||
// assert that this should not result in a vector copy
|
// assert that this should not result in a vector copy
|
||||||
// These are internal buffers and should not be held by anyone else.
|
// These are internal buffers and should not be held by anyone else.
|
||||||
assert_eq!(Arc::strong_count(&dest_data), 1);
|
assert_eq!(Arc::strong_count(&dest_data), 1);
|
||||||
(dest_data.len(), dest_data)
|
dest_data
|
||||||
|
} else if remaining_buffers_to_allocate > 0 {
|
||||||
|
// we still haven't allocated all the buffers we are allowed to allocate
|
||||||
|
remaining_buffers_to_allocate -= 1;
|
||||||
|
Arc::new(vec![0; chunk_size])
|
||||||
} else {
|
} else {
|
||||||
// nowhere to write, so wait for a buffer to become available
|
// nowhere to write, so wait for a buffer to become available
|
||||||
let mut wait_for_new_buffer = Measure::start("wait_for_new_buffer");
|
let mut wait_for_new_buffer = Measure::start("wait_for_new_buffer");
|
||||||
@ -210,11 +211,12 @@ impl SharedBufferBgReader {
|
|||||||
wait_us += wait_for_new_buffer.as_us();
|
wait_us += wait_for_new_buffer.as_us();
|
||||||
continue; // check stop, try to get a buffer again
|
continue; // check stop, try to get a buffer again
|
||||||
};
|
};
|
||||||
|
let target = Arc::make_mut(&mut dest_data);
|
||||||
|
let dest_size = target.len();
|
||||||
|
|
||||||
let mut bytes_read = 0;
|
let mut bytes_read = 0;
|
||||||
let mut eof = false;
|
let mut eof = false;
|
||||||
let mut error_received = false;
|
let mut error_received = false;
|
||||||
let target = Arc::make_mut(&mut dest_data);
|
|
||||||
|
|
||||||
while bytes_read < dest_size {
|
while bytes_read < dest_size {
|
||||||
let mut time_read = Measure::start("read");
|
let mut time_read = Measure::start("read");
|
||||||
|
Reference in New Issue
Block a user