snapshot download enhancement (#17415)
1. Allow the validator bootstrap code to specify the minimal snapshot download speed. If the snapshot download speed is detected below that, a different RPC can be retried. The default is 10MB/sec. 2. To prevent spinning on a number of sub-optimal choices and not making progress, the abort/retry logic is implemented with the following safe guards: 2.1 at maximum we do this retry for 5 times -- this number is configurable with default 5. 2.2 if the download in one notification round (5 second) is more than 2%, do not do retry -- it is not as bad anyway. 2.3 if the remaining estimate time is less than 1 minutes, do not abort retry as it will be done quickly anyway. 2.4 We do this abort/retry logic only at the first notification to avoid wasting download efforts -- the reasoning is being opportunistic and too greedy may not achieve overall shorter download time. 3. The download_snapshot and download_file is modified with the option allowing caller to notified of download progress via a callback. This allows the business logic of retrying to the place it belongs.
This commit is contained in:
@@ -9,7 +9,7 @@ use std::io;
|
||||
use std::io::Read;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
static TRUCK: Emoji = Emoji("🚚 ", "");
|
||||
static SPARKLE: Emoji = Emoji("✨ ", "");
|
||||
@@ -23,11 +23,41 @@ fn new_spinner_progress_bar() -> ProgressBar {
|
||||
progress_bar
|
||||
}
|
||||
|
||||
pub fn download_file(
|
||||
/// Structure modeling information about download progress
|
||||
#[derive(Debug)]
|
||||
pub struct DownloadProgressRecord {
|
||||
// Duration since the beginning of the download
|
||||
pub elapsed_time: Duration,
|
||||
// Duration since the the last notification
|
||||
pub last_elapsed_time: Duration,
|
||||
// the bytes/sec speed measured for the last notification period
|
||||
pub last_throughput: f32,
|
||||
// the bytes/sec speed measured from the beginning
|
||||
pub total_throughput: f32,
|
||||
// total bytes of the download
|
||||
pub total_bytes: usize,
|
||||
// bytes downloaded so far
|
||||
pub current_bytes: usize,
|
||||
// percentage downloaded
|
||||
pub percentage_done: f32,
|
||||
// Estimated remaining time (in seconds) to finish the download if it keeps at the the last download speed
|
||||
pub estimated_remaining_time: f32,
|
||||
// The times of the progress is being notified, it starts from 1 and increments by 1 each time
|
||||
pub notification_count: u64,
|
||||
}
|
||||
|
||||
/// This callback allows the caller to get notified of the download progress modelled by DownloadProgressRecord
|
||||
/// Return "true" to continue the download
|
||||
/// Return "false" to abort the download
|
||||
pub fn download_file<F>(
|
||||
url: &str,
|
||||
destination_file: &Path,
|
||||
use_progress_bar: bool,
|
||||
) -> Result<(), String> {
|
||||
progress_notify_callback: &Option<F>,
|
||||
) -> Result<(), String>
|
||||
where
|
||||
F: Fn(&DownloadProgressRecord) -> bool,
|
||||
{
|
||||
if destination_file.is_file() {
|
||||
return Err(format!("{:?} already exists", destination_file));
|
||||
}
|
||||
@@ -83,7 +113,10 @@ pub fn download_file(
|
||||
info!("Downloading {} bytes from {}", download_size, url);
|
||||
}
|
||||
|
||||
struct DownloadProgress<R> {
|
||||
struct DownloadProgress<R, F>
|
||||
where
|
||||
F: Fn(&DownloadProgressRecord) -> bool,
|
||||
{
|
||||
progress_bar: ProgressBar,
|
||||
response: R,
|
||||
last_print: Instant,
|
||||
@@ -91,30 +124,71 @@ pub fn download_file(
|
||||
last_print_bytes: usize,
|
||||
download_size: f32,
|
||||
use_progress_bar: bool,
|
||||
start_time: Instant,
|
||||
callback: Option<F>,
|
||||
notification_count: u64,
|
||||
}
|
||||
|
||||
impl<R: Read> Read for DownloadProgress<R> {
|
||||
impl<R: Read, F> Read for DownloadProgress<R, F>
|
||||
where
|
||||
F: Fn(&DownloadProgressRecord) -> bool,
|
||||
{
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.response.read(buf).map(|n| {
|
||||
if self.use_progress_bar {
|
||||
self.progress_bar.inc(n as u64);
|
||||
} else {
|
||||
self.current_bytes += n;
|
||||
if self.last_print.elapsed().as_secs() > 5 {
|
||||
let total_bytes_f32 = self.current_bytes as f32;
|
||||
let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32;
|
||||
info!(
|
||||
"downloaded {} bytes {:.1}% {:.1} bytes/s",
|
||||
self.current_bytes,
|
||||
100f32 * (total_bytes_f32 / self.download_size),
|
||||
diff_bytes_f32 / self.last_print.elapsed().as_secs_f32(),
|
||||
);
|
||||
self.last_print = Instant::now();
|
||||
self.last_print_bytes = self.current_bytes;
|
||||
}
|
||||
let n = self.response.read(buf)?;
|
||||
|
||||
self.current_bytes += n;
|
||||
let total_bytes_f32 = self.current_bytes as f32;
|
||||
let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32;
|
||||
let last_throughput = diff_bytes_f32 / self.last_print.elapsed().as_secs_f32();
|
||||
let estimated_remaining_time = if last_throughput > 0_f32 {
|
||||
(self.download_size - self.current_bytes as f32) / last_throughput
|
||||
} else {
|
||||
f32::MAX
|
||||
};
|
||||
|
||||
let mut progress_record = DownloadProgressRecord {
|
||||
elapsed_time: self.start_time.elapsed(),
|
||||
last_elapsed_time: self.last_print.elapsed(),
|
||||
last_throughput,
|
||||
total_throughput: self.current_bytes as f32
|
||||
/ self.start_time.elapsed().as_secs_f32(),
|
||||
total_bytes: self.download_size as usize,
|
||||
current_bytes: self.current_bytes,
|
||||
percentage_done: 100f32 * (total_bytes_f32 / self.download_size),
|
||||
estimated_remaining_time,
|
||||
notification_count: self.notification_count,
|
||||
};
|
||||
let mut to_update_progress = false;
|
||||
if progress_record.last_elapsed_time.as_secs() > 5 {
|
||||
self.last_print = Instant::now();
|
||||
self.last_print_bytes = self.current_bytes;
|
||||
to_update_progress = true;
|
||||
self.notification_count += 1;
|
||||
progress_record.notification_count = self.notification_count
|
||||
}
|
||||
|
||||
if self.use_progress_bar {
|
||||
self.progress_bar.inc(n as u64);
|
||||
} else if to_update_progress {
|
||||
info!(
|
||||
"downloaded {} bytes {:.1}% {:.1} bytes/s",
|
||||
self.current_bytes,
|
||||
progress_record.percentage_done,
|
||||
progress_record.last_throughput,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(callback) = &self.callback {
|
||||
if to_update_progress && !callback(&progress_record) {
|
||||
info!("Download is aborted by the caller");
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Download is aborted by the caller",
|
||||
));
|
||||
}
|
||||
n
|
||||
})
|
||||
}
|
||||
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,6 +200,9 @@ pub fn download_file(
|
||||
last_print_bytes: 0,
|
||||
download_size: (download_size as f32).max(1f32),
|
||||
use_progress_bar,
|
||||
start_time: Instant::now(),
|
||||
callback: progress_notify_callback.as_ref(),
|
||||
notification_count: 0,
|
||||
};
|
||||
|
||||
File::create(&temp_destination_file)
|
||||
@@ -164,6 +241,7 @@ pub fn download_genesis_if_missing(
|
||||
&format!("http://{}/{}", rpc_addr, DEFAULT_GENESIS_ARCHIVE),
|
||||
&tmp_genesis_package,
|
||||
use_progress_bar,
|
||||
&None::<fn(&DownloadProgressRecord) -> bool>,
|
||||
)?;
|
||||
|
||||
Ok(tmp_genesis_package)
|
||||
@@ -172,13 +250,17 @@ pub fn download_genesis_if_missing(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn download_snapshot(
|
||||
pub fn download_snapshot<F>(
|
||||
rpc_addr: &SocketAddr,
|
||||
snapshot_output_dir: &Path,
|
||||
desired_snapshot_hash: (Slot, Hash),
|
||||
use_progress_bar: bool,
|
||||
maximum_snapshots_to_retain: usize,
|
||||
) -> Result<(), String> {
|
||||
progress_notify_callback: &Option<F>,
|
||||
) -> Result<(), String>
|
||||
where
|
||||
F: Fn(&DownloadProgressRecord) -> bool,
|
||||
{
|
||||
snapshot_utils::purge_old_snapshot_archives(snapshot_output_dir, maximum_snapshots_to_retain);
|
||||
|
||||
for compression in &[
|
||||
@@ -208,6 +290,7 @@ pub fn download_snapshot(
|
||||
),
|
||||
&desired_snapshot_package,
|
||||
use_progress_bar,
|
||||
&progress_notify_callback,
|
||||
)
|
||||
.is_ok()
|
||||
{
|
||||
|
Reference in New Issue
Block a user