Move genesis/snapshot archive download into Rust (#5515)
This commit is contained in:
@ -9,8 +9,10 @@ license = "Apache-2.0"
|
||||
homepage = "https://solana.com/"
|
||||
|
||||
[dependencies]
|
||||
bzip2 = "0.3.3"
|
||||
clap = "2.33.0"
|
||||
log = "0.4.8"
|
||||
reqwest = "0.9.19"
|
||||
serde_json = "1.0.40"
|
||||
solana = { path = "../core", version = "0.18.0-pre1" }
|
||||
solana-drone = { path = "../drone", version = "0.18.0-pre1" }
|
||||
@ -21,6 +23,8 @@ solana-runtime = { path = "../runtime", version = "0.18.0-pre1" }
|
||||
solana-sdk = { path = "../sdk", version = "0.18.0-pre1" }
|
||||
solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre1" }
|
||||
solana-vote-signer = { path = "../vote-signer", version = "0.18.0-pre1" }
|
||||
tempfile = "3.1.0"
|
||||
tar = "0.4.26"
|
||||
|
||||
[features]
|
||||
cuda = ["solana/cuda"]
|
||||
|
@ -1,8 +1,10 @@
|
||||
use bzip2::bufread::BzDecoder;
|
||||
use clap::{crate_description, crate_name, crate_version, value_t, App, Arg};
|
||||
use log::*;
|
||||
use solana::bank_forks::SnapshotConfig;
|
||||
use solana::cluster_info::{Node, FULLNODE_PORT_RANGE};
|
||||
use solana::contact_info::ContactInfo;
|
||||
use solana::gossip_service::discover;
|
||||
use solana::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
|
||||
use solana::local_vote_signer_service::LocalVoteSignerService;
|
||||
use solana::service::Service;
|
||||
@ -11,12 +13,12 @@ use solana::validator::{Validator, ValidatorConfig};
|
||||
use solana_netutil::parse_port_range;
|
||||
use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil};
|
||||
use solana_sdk::timing::Slot;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::fs::{self, File};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::exit;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
fn port_range_validator(port_range: String) -> Result<(), String> {
|
||||
if parse_port_range(&port_range).is_some() {
|
||||
@ -26,6 +28,100 @@ fn port_range_validator(port_range: String) -> Result<(), String> {
|
||||
}
|
||||
}
|
||||
|
||||
fn download_archive(
|
||||
rpc_addr: &SocketAddr,
|
||||
archive_name: &str,
|
||||
download_path: &Path,
|
||||
extract: bool,
|
||||
) -> Result<(), String> {
|
||||
let archive_path = download_path.join(archive_name);
|
||||
if archive_path.is_file() {
|
||||
return Ok(());
|
||||
}
|
||||
let temp_archive_path = {
|
||||
let mut p = archive_path.clone();
|
||||
p.set_extension(".tmp");
|
||||
p
|
||||
};
|
||||
|
||||
let url = format!("http://{}/{}", rpc_addr, archive_name);
|
||||
println!("Downloading {}...", url);
|
||||
let download_start = Instant::now();
|
||||
|
||||
let mut response = reqwest::get(&url).map_err(|err| format!("Unable to get: {:?}", err))?;
|
||||
let mut file = File::create(&temp_archive_path)
|
||||
.map_err(|err| format!("Unable to create {:?}: {:?}", temp_archive_path, err))?;
|
||||
std::io::copy(&mut response, &mut file)
|
||||
.map_err(|err| format!("Unable to write {:?}: {:?}", temp_archive_path, err))?;
|
||||
|
||||
println!(
|
||||
"Downloaded {} in {:?}",
|
||||
archive_name,
|
||||
Instant::now().duration_since(download_start)
|
||||
);
|
||||
|
||||
if extract {
|
||||
println!("Extracting {:?}...", archive_path);
|
||||
let extract_start = Instant::now();
|
||||
let tar_bz2 = File::open(&temp_archive_path)
|
||||
.map_err(|err| format!("Unable to open {}: {:?}", archive_name, err))?;
|
||||
let tar = BzDecoder::new(std::io::BufReader::new(tar_bz2));
|
||||
let mut archive = tar::Archive::new(tar);
|
||||
archive
|
||||
.unpack(download_path)
|
||||
.map_err(|err| format!("Unable to unpack {}: {:?}", archive_name, err))?;
|
||||
println!(
|
||||
"Extracted {} in {:?}",
|
||||
archive_name,
|
||||
Instant::now().duration_since(extract_start)
|
||||
);
|
||||
}
|
||||
std::fs::rename(temp_archive_path, archive_path)
|
||||
.map_err(|err| format!("Unable to rename: {:?}", err))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn initialize_ledger_path(
|
||||
entrypoint: &ContactInfo,
|
||||
gossip_addr: &SocketAddr,
|
||||
ledger_path: &Path,
|
||||
no_snapshot_fetch: bool,
|
||||
) -> Result<(), String> {
|
||||
let (nodes, _replicators) = discover(
|
||||
&entrypoint.gossip,
|
||||
Some(1),
|
||||
Some(60),
|
||||
None,
|
||||
Some(&gossip_addr),
|
||||
)
|
||||
.map_err(|err| err.to_string())?;
|
||||
|
||||
let rpc_addr = nodes
|
||||
.iter()
|
||||
.filter_map(ContactInfo::valid_client_facing_addr)
|
||||
.map(|addrs| addrs.0)
|
||||
.find(|rpc_addr| rpc_addr.ip() == entrypoint.gossip.ip())
|
||||
.unwrap_or_else(|| {
|
||||
eprintln!(
|
||||
"Entrypoint ({:?}) is not running the RPC service",
|
||||
entrypoint.gossip.ip()
|
||||
);
|
||||
exit(1);
|
||||
});
|
||||
|
||||
fs::create_dir_all(ledger_path).map_err(|err| err.to_string())?;
|
||||
|
||||
download_archive(&rpc_addr, "genesis.tar.bz2", ledger_path, true)?;
|
||||
if !no_snapshot_fetch {
|
||||
let _ = fs::remove_file(ledger_path.join("snapshot.tar.bz2"));
|
||||
download_archive(&rpc_addr, "snapshot.tar.bz2", ledger_path, false)
|
||||
.unwrap_or_else(|err| eprintln!("Warning: Unable to fetch snapshot: {:?}", err));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() {
|
||||
solana_logger::setup_with_filter("solana=info");
|
||||
solana_metrics::set_panic_hook("validator");
|
||||
@ -40,7 +136,7 @@ fn main() {
|
||||
.long("blockstream")
|
||||
.takes_value(true)
|
||||
.value_name("UNIX DOMAIN SOCKET")
|
||||
.help("Open blockstream at this unix domain socket path")
|
||||
.help("Stream entries to this unix domain socket path")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("identity")
|
||||
@ -95,6 +191,13 @@ fn main() {
|
||||
.takes_value(true)
|
||||
.help("Rendezvous with the cluster at this entry point"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("no_snapshot_fetch")
|
||||
.long("no-snapshot-fetch")
|
||||
.takes_value(false)
|
||||
.requires("entrypoint")
|
||||
.help("Do not attempt to fetch a snapshot from the cluster entrypoint"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("no_voting")
|
||||
.long("no-voting")
|
||||
@ -142,7 +245,7 @@ fn main() {
|
||||
.help("Rendezvous with the vote signer at this RPC end point"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("accounts")
|
||||
Arg::with_name("account_paths")
|
||||
.long("accounts")
|
||||
.value_name("PATHS")
|
||||
.takes_value(true)
|
||||
@ -243,21 +346,19 @@ fn main() {
|
||||
),
|
||||
);
|
||||
|
||||
if let Some(paths) = matches.value_of("accounts") {
|
||||
validator_config.account_paths = Some(paths.to_string());
|
||||
if let Some(account_paths) = matches.value_of("account_paths") {
|
||||
validator_config.account_paths = Some(account_paths.to_string());
|
||||
} else {
|
||||
validator_config.account_paths =
|
||||
Some(ledger_path.join("accounts").to_str().unwrap().to_string());
|
||||
}
|
||||
|
||||
validator_config.snapshot_config = matches.value_of("snapshot_interval_slots").map(|s| {
|
||||
let snapshots_dir = ledger_path.clone().join("snapshot");
|
||||
let snapshots_bank_state_dir = snapshots_dir.join("bank_states");
|
||||
let snapshots_tar_dir = snapshots_dir.join("tar");
|
||||
fs::create_dir_all(&snapshots_dir).expect("Failed to create snapshots directory");
|
||||
fs::create_dir_all(&snapshots_bank_state_dir)
|
||||
.expect("Failed to create snapshots bank state directory");
|
||||
fs::create_dir_all(&snapshots_tar_dir).expect("Failed to create snapshots tar directory");
|
||||
SnapshotConfig::new(
|
||||
snapshots_bank_state_dir,
|
||||
snapshots_tar_dir,
|
||||
snapshots_dir,
|
||||
ledger_path.clone(),
|
||||
s.parse::<usize>().unwrap(),
|
||||
)
|
||||
});
|
||||
@ -291,7 +392,28 @@ fn main() {
|
||||
.value_of("blockstream_unix_socket")
|
||||
.map(PathBuf::from);
|
||||
|
||||
let keypair = Arc::new(keypair);
|
||||
if let Some(ref entrypoint_addr) = cluster_entrypoint {
|
||||
initialize_ledger_path(
|
||||
entrypoint_addr,
|
||||
&gossip_addr,
|
||||
&ledger_path,
|
||||
!matches.is_present("no_snapshot_fetch"),
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Failed to download ledger: {}", err);
|
||||
exit(1);
|
||||
});
|
||||
} else {
|
||||
// Without a cluster entrypoint, ledger_path must already be present
|
||||
if !ledger_path.is_dir() {
|
||||
eprintln!(
|
||||
"Error: ledger directory does not exist or is not accessible: {:?}",
|
||||
ledger_path
|
||||
);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range);
|
||||
if let Some(port) = matches.value_of("rpc_port") {
|
||||
let port_number = port.to_string().parse().expect("integer");
|
||||
@ -307,7 +429,7 @@ fn main() {
|
||||
|
||||
let validator = Validator::new(
|
||||
node,
|
||||
&keypair,
|
||||
&Arc::new(keypair),
|
||||
&ledger_path,
|
||||
&vote_account,
|
||||
&Arc::new(voting_keypair),
|
||||
|
Reference in New Issue
Block a user