Reduce max snapshot hashes to stay under MTU (bp #8541) (#8544)

automerge
This commit is contained in:
mergify[bot]
2020-02-29 09:17:09 -08:00
committed by GitHub
parent 1c576d4a68
commit f2fda14333
3 changed files with 29 additions and 13 deletions

View File

@ -84,6 +84,9 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
const NUM_BITS_PER_BYTE: u64 = 8; const NUM_BITS_PER_BYTE: u64 = 8;
const MIN_SIZE_TO_COMPRESS_GZIP: u64 = 64; const MIN_SIZE_TO_COMPRESS_GZIP: u64 = 64;
/// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE
pub const MAX_SNAPSHOT_HASHES: usize = 16;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError { pub enum ClusterInfoError {
NoPeers, NoPeers,
@ -441,6 +444,14 @@ impl ClusterInfo {
} }
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) { pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) {
if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES {
warn!(
"snapshot_hashes too large, ignored: {}",
snapshot_hashes.len()
);
return;
}
let now = timestamp(); let now = timestamp();
let entry = CrdsValue::new_signed( let entry = CrdsValue::new_signed(
CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)), CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)),
@ -1059,7 +1070,7 @@ impl ClusterInfo {
} }
/// Splits a Vec of CrdsValues into a nested Vec, trying to make sure that /// Splits a Vec of CrdsValues into a nested Vec, trying to make sure that
/// each Vec is no larger than `PROTOCOL_PAYLOAD_SIZE` /// each Vec is no larger than `MAX_PROTOCOL_PAYLOAD_SIZE`
/// Note: some messages cannot be contained within that size so in the worst case this returns /// Note: some messages cannot be contained within that size so in the worst case this returns
/// N nested Vecs with 1 item each. /// N nested Vecs with 1 item each.
fn split_gossip_messages(msgs: Vec<CrdsValue>) -> Vec<Vec<CrdsValue>> { fn split_gossip_messages(msgs: Vec<CrdsValue>) -> Vec<Vec<CrdsValue>> {

View File

@ -1,4 +1,4 @@
use crate::cluster_info::ClusterInfo; use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_ledger::{ use solana_ledger::{
snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package, snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package,
}; };
@ -16,8 +16,6 @@ pub struct SnapshotPackagerService {
t_snapshot_packager: JoinHandle<()>, t_snapshot_packager: JoinHandle<()>,
} }
const MAX_SNAPSHOT_HASHES: usize = 24;
impl SnapshotPackagerService { impl SnapshotPackagerService {
pub fn new( pub fn new(
snapshot_package_receiver: SnapshotPackageReceiver, snapshot_package_receiver: SnapshotPackageReceiver,

View File

@ -93,12 +93,15 @@ fn download_file(url: &str, destination_file: &Path) -> Result<(), String> {
let progress_bar = new_spinner_progress_bar(); let progress_bar = new_spinner_progress_bar();
progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url)); progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url));
let client = reqwest::blocking::Client::new(); let response = reqwest::blocking::Client::new()
let response = client.get(url).send().map_err(|err| err.to_string())?; .get(url)
.send()
.and_then(|response| response.error_for_status())
.map_err(|err| {
progress_bar.finish_and_clear();
err.to_string()
})?;
let response = response
.error_for_status()
.map_err(|err| format!("Unable to download {}: {}", url, err))?;
let download_size = { let download_size = {
response response
.headers() .headers()
@ -139,9 +142,8 @@ fn download_file(url: &str, destination_file: &Path) -> Result<(), String> {
response, response,
}; };
let mut file = File::create(&temp_destination_file) File::create(&temp_destination_file)
.map_err(|err| format!("Unable to create {:?}: {:?}", temp_destination_file, err))?; .and_then(|mut file| std::io::copy(&mut source, &mut file))
std::io::copy(&mut source, &mut file)
.map_err(|err| format!("Unable to write {:?}: {:?}", temp_destination_file, err))?; .map_err(|err| format!("Unable to write {:?}: {:?}", temp_destination_file, err))?;
source.progress_bar.finish_and_clear(); source.progress_bar.finish_and_clear();
@ -1138,7 +1140,12 @@ pub fn main() {
}) })
.and_then(|_| { .and_then(|_| {
if let Some(snapshot_hash) = snapshot_hash { if let Some(snapshot_hash) = snapshot_hash {
rpc_client.get_slot()
.map_err(|err| format!("Failed to get RPC node slot: {}", err))
.and_then(|slot| {
info!("RPC node root slot: {}", slot);
download_snapshot(&rpc_contact_info.rpc, &ledger_path, snapshot_hash) download_snapshot(&rpc_contact_info.rpc, &ledger_path, snapshot_hash)
})
} else { } else {
Ok(()) Ok(())
} }