From 0ebfa08860bc3003e7d2973df03abaf34e83d188 Mon Sep 17 00:00:00 2001 From: Solana Maintainers Date: Mon, 3 May 2021 17:38:59 -0700 Subject: [PATCH] Remove ramp-tps --- Cargo.lock | 22 -- Cargo.toml | 1 - ramp-tps/Cargo.toml | 28 --- ramp-tps/src/main.rs | 493 ---------------------------------------- ramp-tps/src/results.rs | 92 -------- ramp-tps/src/stake.rs | 142 ------------ ramp-tps/src/tps.rs | 72 ------ ramp-tps/src/utils.rs | 168 -------------- ramp-tps/src/voters.rs | 247 -------------------- 9 files changed, 1265 deletions(-) delete mode 100644 ramp-tps/Cargo.toml delete mode 100644 ramp-tps/src/main.rs delete mode 100644 ramp-tps/src/results.rs delete mode 100644 ramp-tps/src/stake.rs delete mode 100644 ramp-tps/src/tps.rs delete mode 100644 ramp-tps/src/utils.rs delete mode 100644 ramp-tps/src/voters.rs diff --git a/Cargo.lock b/Cargo.lock index 00edbf3502..6ab0dd84df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5003,28 +5003,6 @@ dependencies = [ "tokio 1.1.1", ] -[[package]] -name = "solana-ramp-tps" -version = "1.7.0" -dependencies = [ - "bzip2", - "clap", - "log 0.4.11", - "reqwest", - "serde", - "serde_json", - "serde_yaml", - "solana-client", - "solana-core", - "solana-logger 1.7.0", - "solana-metrics", - "solana-net-utils", - "solana-notifier", - "solana-sdk", - "solana-stake-program", - "tar", -] - [[package]] name = "solana-rayon-threadlimit" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index 2a7a8d5650..7a4da61834 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,6 @@ members = [ "programs/stake", "programs/vote", "remote-wallet", - "ramp-tps", "runtime", "runtime/store-tool", "sdk", diff --git a/ramp-tps/Cargo.toml b/ramp-tps/Cargo.toml deleted file mode 100644 index c8fcdef21a..0000000000 --- a/ramp-tps/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -authors = ["Solana Maintainers "] -edition = "2018" -name = "solana-ramp-tps" -description = "Solana Tour de SOL - TPS ramp up" -version = "1.7.0" -repository = "https://github.com/solana-labs/tour-de-sol" -license = "Apache-2.0" -homepage = "https://solana.com/" -documentation = "https://docs.rs/solana-ramp-tps" - -[dependencies] -bzip2 = "0.3.3" -clap = "2.33.1" -log = "0.4.11" -reqwest = { version = "0.11.2", default-features = false } -serde = "1.0.122" -serde_json = "1.0.56" -serde_yaml = "0.8.13" -solana-core = { path = "../core", version = "=1.7.0" } -solana-client = { path = "../client", version = "=1.7.0" } -solana-logger = { path = "../logger", version = "=1.7.0" } -solana-metrics = { path = "../metrics", version = "=1.7.0" } -solana-net-utils = { path = "../net-utils", version = "=1.7.0" } -solana-notifier = { path = "../notifier", version = "=1.7.0" } -solana-sdk = { path = "../sdk", version = "=1.7.0" } -solana-stake-program = { path = "../programs/stake", version = "=1.7.0" } -tar = "0.4.28" diff --git a/ramp-tps/src/main.rs b/ramp-tps/src/main.rs deleted file mode 100644 index 5cdbf860b1..0000000000 --- a/ramp-tps/src/main.rs +++ /dev/null @@ -1,493 +0,0 @@ -//! Ramp up TPS for Tour de SOL until all validators drop out -#![allow(clippy::integer_arithmetic)] - -mod results; -mod stake; -mod tps; -mod utils; -mod voters; - -use clap::{crate_description, crate_name, crate_version, value_t, value_t_or_exit, App, Arg}; -use log::*; -use results::Results; -use solana_client::rpc_client::RpcClient; -use solana_metrics::datapoint_info; -use solana_sdk::{genesis_config::GenesisConfig, signature::read_keypair_file}; -use solana_stake_program::config::{id as stake_config_id, Config as StakeConfig}; -use std::{ - collections::HashMap, - fs, - path::PathBuf, - process::{exit, Command}, - rc::Rc, - thread::sleep, - time::Duration, -}; - -const NUM_BENCH_CLIENTS: usize = 2; -const TDS_ENTRYPOINT: &str = "tds.solana.com"; -const TMP_LEDGER_PATH: &str = ".tmp/ledger"; -const FAUCET_KEYPAIR_PATH: &str = "faucet-keypair.json"; -const PUBKEY_MAP_FILE: &str = "validators/all-username.yml"; -const RESULTS_FILE: &str = "results.yml"; -const DEFAULT_TX_COUNT_BASELINE: &str = "5000"; -const DEFAULT_TX_COUNT_INCREMENT: &str = "5000"; -const DEFAULT_TPS_ROUND_MINUTES: &str = "60"; -const THREAD_BATCH_SLEEP_MS: &str = "1000"; -const DEFAULT_INITIAL_SOL_BALANCE: &str = "1"; - -// Transaction count increments linearly each round -fn tx_count_for_round(tps_round: u32, base: u64, incr: u64) -> u64 { - base + u64::from(tps_round - 1) * incr -} - -// Gift will double the staked lamports each round. -fn gift_for_round(tps_round: u32, initial_balance: u64) -> u64 { - if tps_round > 1 { - initial_balance * 2u64.pow(tps_round - 2) - } else { - 0 - } -} - -#[allow(clippy::cognitive_complexity)] -fn main() { - solana_logger::setup_with_default("solana=debug"); - solana_metrics::set_panic_hook("ramp-tps"); - let mut notifier = solana_notifier::Notifier::default(); - - let matches = App::new(crate_name!()) - .about(crate_description!()) - .version(crate_version!()) - .arg( - Arg::with_name("faucet_keypair_path") - .long("faucet-keypair-path") - .short("k") - .value_name("PATH") - .takes_value(true) - .default_value(FAUCET_KEYPAIR_PATH) - .help("Path to the faucet keypair for stake award distribution"), - ) - .arg( - Arg::with_name("net_dir") - .long("net-dir") - .value_name("DIR") - .takes_value(true) - .help("The directory used for running commands on the cluster"), - ) - .arg( - Arg::with_name("pubkey_map_file") - .long("pubkey-map-file") - .value_name("FILE") - .default_value(PUBKEY_MAP_FILE) - .takes_value(true) - .help("YAML file that maps validator identity pubkeys to keybase user id"), - ) - .arg( - Arg::with_name("results_file") - .long("results-file") - .value_name("FILE") - .default_value(RESULTS_FILE) - .takes_value(true) - .help("YAML file that lists the results for each round"), - ) - .arg( - Arg::with_name("round") - .long("round") - .value_name("NUM") - .takes_value(true) - .default_value("1") - .help("The starting round of TPS ramp up"), - ) - .arg( - Arg::with_name("round_minutes") - .long("round-minutes") - .value_name("NUM") - .takes_value(true) - .default_value(DEFAULT_TPS_ROUND_MINUTES) - .help("The duration in minutes of a TPS round"), - ) - .arg( - Arg::with_name("tx_count_baseline") - .long("tx-count-baseline") - .value_name("NUM") - .takes_value(true) - .default_value(DEFAULT_TX_COUNT_BASELINE) - .help("The tx-count of round 1"), - ) - .arg( - Arg::with_name("tx_count_increment") - .long("tx-count-increment") - .value_name("NUM") - .takes_value(true) - .default_value(DEFAULT_TX_COUNT_INCREMENT) - .help("The tx-count increment for the next round"), - ) - .arg( - Arg::with_name("initial_balance") - .long("initial-balance") - .value_name("SOL") - .takes_value(true) - .default_value(DEFAULT_INITIAL_SOL_BALANCE) - .help("The number of SOL that each partipant started with"), - ) - .arg( - Arg::with_name("entrypoint") - .short("n") - .long("entrypoint") - .value_name("HOST") - .takes_value(true) - .default_value(TDS_ENTRYPOINT) - .validator(utils::is_host) - .help("The entrypoint used for RPC calls"), - ) - .arg( - Arg::with_name("stake_activation_epoch") - .long("stake-activation-epoch") - .value_name("NUM") - .takes_value(true) - .help("The stake activated in this epoch must fully warm up before the first round begins"), - ) - .arg( - Arg::with_name("destake_net_nodes_epoch") - .long("destake-net-nodes-epoch") - .value_name("NUM") - .takes_value(true) - .default_value("9") - .help("The epoch for which to run destake-net-nodes.sh at"), - ) - .get_matches(); - - let pubkey_map_file = value_t_or_exit!(matches, "pubkey_map_file", String); - let pubkey_map: HashMap = - serde_yaml::from_reader(fs::File::open(&pubkey_map_file).unwrap_or_else(|err| { - eprintln!( - "Error: Unable to open --pubkey-map-file {}: {}", - pubkey_map_file, err - ); - exit(1); - })) - .unwrap_or_else(|err| { - eprintln!( - "Error: Unable to parse --pubkey-map-file {}: {}", - pubkey_map_file, err - ); - exit(1); - }); - let pubkey_to_keybase = Rc::new(move |pubkey: &solana_sdk::pubkey::Pubkey| -> String { - let pubkey = pubkey.to_string(); - match pubkey_map.get(&pubkey) { - Some(keybase) => format!("{} ({})", keybase, pubkey), - None => pubkey, - } - }); - - let net_dir = value_t_or_exit!(matches, "net_dir", String); - let faucet_keypair_path = value_t_or_exit!(matches, "faucet_keypair_path", String); - let faucet_keypair = read_keypair_file(&faucet_keypair_path) - .unwrap_or_else(|err| panic!("Unable to read {}: {}", faucet_keypair_path, err)); - let mut tps_round = value_t_or_exit!(matches, "round", u32).max(1); - let results_file_name = value_t_or_exit!(matches, "results_file", String); - let previous_results = Results::read(&results_file_name); - let mut tps_round_results = Results::new(results_file_name, previous_results, tps_round); - let tx_count_baseline = value_t_or_exit!(matches, "tx_count_baseline", u64); - let tx_count_increment = value_t_or_exit!(matches, "tx_count_increment", u64); - let round_minutes = value_t_or_exit!(matches, "round_minutes", u64).max(1); - let round_duration = Duration::from_secs(round_minutes * 60); - let initial_balance = value_t_or_exit!(matches, "initial_balance", u64); - let tmp_ledger_path = PathBuf::from(TMP_LEDGER_PATH); - let _ = fs::remove_dir_all(&tmp_ledger_path); - fs::create_dir_all(&tmp_ledger_path).expect("failed to create temp ledger path"); - - notifier.send("Hi!"); - datapoint_info!("ramp-tps", ("event", "boot", String),); - - let entrypoint_str = matches.value_of("entrypoint").unwrap(); - debug!("Connecting to {}", entrypoint_str); - let entrypoint_addr = solana_net_utils::parse_host_port(&format!("{}:8899", entrypoint_str)) - .expect("failed to parse entrypoint address"); - utils::download_genesis(&entrypoint_addr, &tmp_ledger_path).expect("genesis download failed"); - let genesis_config = - GenesisConfig::load(&tmp_ledger_path).expect("failed to load genesis block"); - - debug!("Fetching current slot..."); - let rpc_client = RpcClient::new_socket_with_timeout(entrypoint_addr, Duration::from_secs(10)); - let current_slot = rpc_client.get_slot().expect("failed to fetch current slot"); - debug!("Current slot: {}", current_slot); - let epoch_schedule = &genesis_config.epoch_schedule; - let first_normal_slot = epoch_schedule.first_normal_slot; - debug!("First normal slot: {}", first_normal_slot); - let sleep_slots = first_normal_slot.saturating_sub(current_slot); - if sleep_slots > 0 { - notifier.send(&format!( - "Waiting for warm-up epochs to complete (epoch {})", - epoch_schedule.first_normal_epoch - )); - utils::sleep_until_epoch( - &rpc_client, - ¬ifier, - &genesis_config, - current_slot, - epoch_schedule.first_normal_epoch, - ); - } - - debug!("Fetching stake config..."); - let stake_config_account = rpc_client - .get_account(&stake_config_id()) - .expect("failed to fetch stake config"); - let stake_config = StakeConfig::from(&stake_config_account).unwrap(); - - // Check if destake-net-nodes.sh should be run - { - let epoch_info = rpc_client.get_epoch_info().unwrap(); - let destake_net_nodes_epoch = value_t_or_exit!(matches, "destake_net_nodes_epoch", u64); - - if epoch_info.epoch >= destake_net_nodes_epoch { - info!( - "Current epoch {} >= destake_net_nodes_epoch of {}, skipping destake-net-nodes.sh", - epoch_info.epoch, destake_net_nodes_epoch - ); - } else { - info!( - "Waiting for destake-net-nodes epoch {}", - destake_net_nodes_epoch - ); - utils::sleep_until_epoch( - &rpc_client, - ¬ifier, - &genesis_config, - epoch_info.absolute_slot, - destake_net_nodes_epoch, - ); - - info!("Destaking net nodes..."); - Command::new("bash") - .args(&["destake-net-nodes.sh", &net_dir]) - .spawn() - .unwrap(); - info!("Done destaking net nodes"); - } - } - - // Wait for the next epoch, or --stake-activation-epoch - { - let epoch_info = rpc_client.get_epoch_info().unwrap(); - let activation_epoch = value_t!(matches, "stake_activation_epoch", u64) - .ok() - .unwrap_or(epoch_info.epoch - 1); - debug!("Current epoch info: {:?}", &epoch_info); - debug!("Activation epoch is: {:?}", activation_epoch); - stake::wait_for_warm_up( - activation_epoch, - epoch_info, - &rpc_client, - &stake_config, - &genesis_config, - ¬ifier, - ); - } - - let mut tps_sampler = tps::Sampler::new(&entrypoint_addr); - - loop { - notifier.send(&format!("Round {}!", tps_round)); - let tx_count = tx_count_for_round(tps_round, tx_count_baseline, tx_count_increment); - datapoint_info!( - "ramp-tps", - ("event", "round-start", String), - ("round", tps_round, i64), - ("tx_count", tx_count, i64) - ); - - let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| { - utils::bail( - ¬ifier, - &format!("Error: get latest slot failed: {}", err), - ); - }); - sleep(Duration::from_secs(5)); - let round_start_slot = rpc_client.get_slot().unwrap_or_else(|err| { - utils::bail( - ¬ifier, - &format!("Error: get round start slot failed: {}", err), - ); - }); - if round_start_slot == latest_slot { - utils::bail( - ¬ifier, - &format!("Slot is not advancing from {}", latest_slot), - ); - } - - let starting_validators = voters::fetch_active_validators(&rpc_client); - datapoint_info!( - "ramp-tps", - ("event", "start-transactions", String), - ("round", tps_round, i64), - ("validators", starting_validators.len(), i64) - ); - - notifier.send(&format!( - "There are {} validators present:", - starting_validators.len() - )); - - let mut validators: Vec<_> = starting_validators - .keys() - .map(|node_pubkey| format!("* {}", pubkey_to_keybase(&node_pubkey))) - .collect(); - validators.sort(); - notifier.send(&validators.join("\n")); - - let client_tx_count = tx_count / NUM_BENCH_CLIENTS as u64; - notifier.send(&format!( - "Starting transactions for {} minutes (batch size={})", - round_minutes, tx_count, - )); - info!( - "Running bench-tps={}='--tx_count={} --thread-batch-sleep-ms={}'", - NUM_BENCH_CLIENTS, client_tx_count, THREAD_BATCH_SLEEP_MS - ); - for client_id in 0..NUM_BENCH_CLIENTS { - Command::new("bash") - .args(&[ - "wrapper-bench-tps.sh", - &net_dir, - &client_id.to_string(), - &client_tx_count.to_string(), - THREAD_BATCH_SLEEP_MS, - ]) - .spawn() - .unwrap(); - } - - let bench_warmup_secs = 60; - info!( - "Sleeping {}s to allow bench-tps to warmup", - bench_warmup_secs - ); - sleep(Duration::from_secs(bench_warmup_secs)); - - tps_sampler.start_sampling_thread(); - sleep(round_duration); - tps_sampler.stop_sampling_thread(); - - for client_id in 0..NUM_BENCH_CLIENTS { - Command::new("bash") - .args(&[ - "wrapper-bench-tps.sh", - &net_dir, - &client_id.to_string(), - "0", // Setting txCount to 0 will kill bench-tps - THREAD_BATCH_SLEEP_MS, - ]) - .spawn() - .unwrap(); - } - - datapoint_info!( - "ramp-tps", - ("event", "stop-transactions", String), - ("round", tps_round, i64), - ); - - notifier.send("Transactions stopped"); - tps_sampler.report_results(¬ifier); - - let remaining_validators = voters::fetch_active_validators(&rpc_client); - let remaining_keybase = remaining_validators - .keys() - .map(|k| pubkey_to_keybase(k)) - .collect(); - tps_round_results - .record(tps_round, remaining_keybase) - .unwrap_or_else(|err| { - warn!("Failed to record round results: {}", err); - }); - - if remaining_validators.is_empty() { - utils::bail(¬ifier, "No validators remain"); - } - - datapoint_info!( - "ramp-tps", - ("event", "calculate-leader-records", String), - ("round", tps_round, i64), - ("validators", remaining_validators.len(), i64) - ); - - let round_end_slot = rpc_client.get_slot().unwrap_or_else(|err| { - utils::bail( - ¬ifier, - &format!("Error: get round end slot failed: {}", err), - ); - }); - - let leader_records = voters::calculate_leader_records( - &rpc_client, - &epoch_schedule, - round_start_slot, - round_end_slot, - ¬ifier, - ) - .unwrap_or_else(|err| { - utils::bail( - ¬ifier, - &format!("Error: Could not calculate leader records: {}", err), - ); - }); - - voters::announce_results( - &starting_validators, - &remaining_validators, - pubkey_to_keybase.clone(), - &leader_records, - &mut notifier, - ); - - datapoint_info!( - "ramp-tps", - ("event", "gifting", String), - ("round", tps_round, i64) - ); - - let healthy_validators: Vec<_> = remaining_validators - .iter() - .filter(|(k, _)| leader_records.get(k).map(|r| r.healthy()).unwrap_or(false)) - .map(|(node_pubkey, vote_account_pubkey)| { - (pubkey_to_keybase(&node_pubkey), vote_account_pubkey) - }) - .collect(); - - let next_gift = gift_for_round(tps_round + 1, initial_balance); - voters::award_stake( - &rpc_client, - &faucet_keypair, - healthy_validators, - next_gift, - &mut notifier, - ); - - datapoint_info!( - "ramp-tps", - ("event", "new-stake-warmup", String), - ("round", tps_round, i64) - ); - - // Wait for stake to warm up before starting the next round - let epoch_info = rpc_client.get_epoch_info().unwrap(); - debug!("Current epoch info: {:?}", &epoch_info); - let current_epoch = epoch_info.epoch; - stake::wait_for_warm_up( - current_epoch, - epoch_info, - &rpc_client, - &stake_config, - &genesis_config, - ¬ifier, - ); - - tps_round += 1; - } -} diff --git a/ramp-tps/src/results.rs b/ramp-tps/src/results.rs deleted file mode 100644 index 03e1a7f708..0000000000 --- a/ramp-tps/src/results.rs +++ /dev/null @@ -1,92 +0,0 @@ -use log::*; -use serde::{Serialize, Serializer}; -use std::{ - collections::{BTreeMap, HashMap}, - error::Error, - fs::File, - io::ErrorKind, - process::exit, - str::FromStr, -}; - -const ROUND_KEY_PREFIX: &str = "round-"; - -#[derive(Eq, PartialEq, Ord, PartialOrd)] -struct Round(u32); - -impl Serialize for Round { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&format!("{}{}", ROUND_KEY_PREFIX, self.0)) - } -} - -pub struct Results { - file_path: String, - results: BTreeMap>, -} - -impl Results { - /// Keep any result entries which occurred before the starting round. - #[allow(clippy::manual_strip)] - pub fn new( - file_path: String, - mut previous_results: HashMap>, - start_round: u32, - ) -> Self { - let mut results: BTreeMap> = BTreeMap::new(); - previous_results.drain().for_each(|(key, value)| { - if key.starts_with(ROUND_KEY_PREFIX) { - let round_str = &key[ROUND_KEY_PREFIX.len()..]; - if let Ok(round) = u32::from_str(round_str) { - if round < start_round { - results.insert(Round(round), value); - } - } - } - }); - - Results { file_path, results } - } - - // Reads the previous results file and if it exists, parses the contents - pub fn read(file_path: &str) -> HashMap> { - match File::open(file_path) { - Ok(file) => serde_yaml::from_reader(&file) - .map_err(|err| { - warn!("Failed to recover previous results: {}", err); - }) - .unwrap_or_default(), - Err(err) => match err.kind() { - ErrorKind::NotFound => { - // Check that we can write to this file - File::create(file_path).unwrap_or_else(|err| { - eprintln!( - "Error: Unable to create --results-file {}: {}", - file_path, err - ); - exit(1); - }); - HashMap::new() - } - err => { - eprintln!( - "Error: Unable to open --results-file {}: {:?}", - file_path, err - ); - exit(1); - } - }, - } - } - - /// Record the remaining validators after each TPS round - pub fn record(&mut self, round: u32, validators: Vec) -> Result<(), Box> { - self.results.insert(Round(round), validators); - let file = File::create(&self.file_path)?; - serde_yaml::to_writer(&file, &self.results)?; - Ok(()) - } -} diff --git a/ramp-tps/src/stake.rs b/ramp-tps/src/stake.rs deleted file mode 100644 index d8d7fc170b..0000000000 --- a/ramp-tps/src/stake.rs +++ /dev/null @@ -1,142 +0,0 @@ -use crate::utils; -use log::*; -use solana_client::rpc_client::RpcClient; -use solana_sdk::{ - account::from_account, - clock::Epoch, - epoch_info::EpochInfo, - genesis_config::GenesisConfig, - stake_history::StakeHistoryEntry, - sysvar::stake_history::{self, StakeHistory}, -}; -use solana_stake_program::config::Config as StakeConfig; -use std::{thread::sleep, time::Duration}; - -fn calculate_stake_warmup(mut stake_entry: StakeHistoryEntry, stake_config: &StakeConfig) -> u64 { - let mut epochs = 0; - loop { - let percent_warming_up = - stake_entry.activating as f64 / stake_entry.effective.max(1) as f64; - let percent_cooling_down = - stake_entry.deactivating as f64 / stake_entry.effective.max(1) as f64; - debug!( - "epoch +{}: stake warming up {:.1}%, cooling down {:.1}% ", - epochs, - percent_warming_up * 100., - percent_cooling_down * 100. - ); - - if (percent_warming_up < 0.05) && (percent_cooling_down < 0.05) { - break; - } - let warmup_cooldown_rate = stake_config.warmup_cooldown_rate; - let max_warmup_stake = (stake_entry.effective as f64 * warmup_cooldown_rate) as u64; - let warmup_stake = stake_entry.activating.min(max_warmup_stake); - stake_entry.effective += warmup_stake; - stake_entry.activating -= warmup_stake; - - let max_cooldown_stake = (stake_entry.effective as f64 * warmup_cooldown_rate) as u64; - let cooldown_stake = stake_entry.deactivating.min(max_cooldown_stake); - stake_entry.effective -= cooldown_stake; - stake_entry.deactivating -= cooldown_stake; - debug!( - "epoch +{}: stake warming up {}, cooling down {}", - epochs, warmup_stake, cooldown_stake - ); - - epochs += 1; - } - info!("95% stake warmup will take {} epochs", epochs); - epochs -} - -fn stake_history_entry(epoch: Epoch, rpc_client: &RpcClient) -> Option { - let stake_history_account = rpc_client.get_account(&stake_history::id()).ok()?; - let stake_history = from_account::(&stake_history_account)?; - stake_history.get(&epoch).cloned() -} - -/// Wait until stake warms up and return the current epoch -pub fn wait_for_warm_up( - activation_epoch: Epoch, - mut epoch_info: EpochInfo, - rpc_client: &RpcClient, - stake_config: &StakeConfig, - genesis_config: &GenesisConfig, - notifier: &solana_notifier::Notifier, -) { - // Sleep until activation_epoch has finished - if epoch_info.epoch <= activation_epoch { - notifier.send(&format!( - "Waiting until epoch {} is finished...", - activation_epoch - )); - - utils::sleep_until_epoch( - rpc_client, - notifier, - genesis_config, - epoch_info.absolute_slot, - activation_epoch + 1, - ); - } - - loop { - epoch_info = rpc_client.get_epoch_info().unwrap_or_else(|err| { - utils::bail( - notifier, - &format!("Error: get_epoch_info RPC call failed: {}", err), - ); - }); - - let current_slot = epoch_info.absolute_slot; - info!("Current slot is {}", current_slot); - - let current_epoch = epoch_info.epoch; - let latest_epoch = current_epoch - 1; - debug!( - "Fetching stake history entry for epoch: {}...", - latest_epoch - ); - - if let Some(stake_entry) = stake_history_entry(latest_epoch, &rpc_client) { - debug!("Stake history entry: {:?}", &stake_entry); - let warm_up_epochs = calculate_stake_warmup(stake_entry, stake_config); - let stake_warmed_up_epoch = latest_epoch + warm_up_epochs; - if stake_warmed_up_epoch > current_epoch { - notifier.send(&format!( - "Waiting until epoch {} for stake to warmup (current epoch is {})...", - stake_warmed_up_epoch, current_epoch - )); - utils::sleep_until_epoch( - rpc_client, - notifier, - genesis_config, - current_slot, - stake_warmed_up_epoch, - ); - } else { - break; - } - } else { - warn!( - "Failed to fetch stake history entry for epoch: {}", - latest_epoch - ); - sleep(Duration::from_secs(5)); - } - - let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| { - utils::bail( - notifier, - &format!("Error: get_slot RPC call 3 failed: {}", err), - ); - }); - if current_slot == latest_slot { - utils::bail( - notifier, - &format!("Error: Slot did not advance from {}", current_slot), - ); - } - } -} diff --git a/ramp-tps/src/tps.rs b/ramp-tps/src/tps.rs deleted file mode 100644 index 96db434a37..0000000000 --- a/ramp-tps/src/tps.rs +++ /dev/null @@ -1,72 +0,0 @@ -use log::*; -use solana_client::perf_utils::{sample_txs, SampleStats}; -use solana_client::thin_client::ThinClient; -use solana_notifier::Notifier; -use solana_sdk::timing::duration_as_s; -use std::{ - net::SocketAddr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, - }, - thread::{Builder, JoinHandle}, -}; - -pub struct Sampler { - client: Arc, - exit_signal: Arc, - maxes: Arc>>, - handle: Option>, -} - -impl Sampler { - pub fn new(rpc_addr: &SocketAddr) -> Self { - let (_, dummy_socket) = - solana_net_utils::bind_in_range(rpc_addr.ip(), (8000, 10_000)).unwrap(); - let dummy_tpu_addr = *rpc_addr; - let client = Arc::new(ThinClient::new(*rpc_addr, dummy_tpu_addr, dummy_socket)); - - Self { - client, - exit_signal: Arc::new(AtomicBool::new(false)), - maxes: Arc::new(RwLock::new(Vec::new())), - handle: None, - } - } - - // Setup a thread to sample every period and - // collect the max transaction rate and total tx count seen - pub fn start_sampling_thread(&mut self) { - // Reset - self.exit_signal.store(false, Ordering::Relaxed); - self.maxes.write().unwrap().clear(); - - let sample_period = 5; // in seconds - info!("Sampling TPS every {} seconds...", sample_period); - let exit_signal = self.exit_signal.clone(); - let maxes = self.maxes.clone(); - let client = self.client.clone(); - let handle = Builder::new() - .name("solana-client-sample".to_string()) - .spawn(move || { - sample_txs(&exit_signal, &maxes, sample_period, &client); - }) - .unwrap(); - - self.handle = Some(handle); - } - - pub fn stop_sampling_thread(&mut self) { - self.exit_signal.store(true, Ordering::Relaxed); - self.handle.take().unwrap().join().unwrap(); - } - - pub fn report_results(&self, notifier: &Notifier) { - let SampleStats { tps, elapsed, txs } = self.maxes.read().unwrap()[0].1; - let avg_tps = txs as f32 / duration_as_s(&elapsed); - notifier.send(&format!( - "Highest TPS: {:.0}, Average TPS: {:.0}", - tps, avg_tps - )); - } -} diff --git a/ramp-tps/src/utils.rs b/ramp-tps/src/utils.rs deleted file mode 100644 index 6a6052e472..0000000000 --- a/ramp-tps/src/utils.rs +++ /dev/null @@ -1,168 +0,0 @@ -use bzip2::bufread::BzDecoder; -use log::*; -use solana_client::rpc_client::RpcClient; -use solana_net_utils::parse_host; -use solana_notifier::Notifier; -use solana_sdk::{ - clock::{Epoch, Slot}, - genesis_config::GenesisConfig, - timing::duration_as_ms, -}; -use std::{ - fs::File, - io, - net::SocketAddr, - path::Path, - thread::sleep, - time::{Duration, Instant}, -}; -use tar::Archive; - -const GENESIS_ARCHIVE_NAME: &str = "genesis.tar.bz2"; - -/// Inspired by solana_local_cluster::cluster_tests -fn slots_to_secs(num_slots: u64, genesis_config: &GenesisConfig) -> u64 { - let poh_config = &genesis_config.poh_config; - let ticks_per_slot = genesis_config.ticks_per_slot; - let num_ticks_to_sleep = num_slots as f64 * ticks_per_slot as f64; - let num_ticks_per_second = (1000 / duration_as_ms(&poh_config.target_tick_duration)) as f64; - ((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64 -} - -fn sleep_n_slots(num_slots: u64, genesis_config: &GenesisConfig) { - let secs = slots_to_secs(num_slots, genesis_config); - let mins = secs / 60; - let hours = mins / 60; - if hours >= 5 { - debug!("Sleeping for {} slots ({} hours)", num_slots, hours); - } else if mins >= 5 { - debug!("Sleeping for {} slots ({} minutes)", num_slots, mins); - } else if secs > 0 { - debug!("Sleeping for {} slots ({} seconds)", num_slots, secs); - } - sleep(Duration::from_secs(secs)); -} - -/// Sleep until the target epoch has started or bail if cluster is stuck -pub fn sleep_until_epoch( - rpc_client: &RpcClient, - notifier: &Notifier, - genesis_config: &GenesisConfig, - mut current_slot: Slot, - target_epoch: Epoch, -) { - let target_slot = genesis_config - .epoch_schedule - .get_first_slot_in_epoch(target_epoch); - info!( - "sleep_until_epoch() target_epoch: {}, target_slot: {}", - target_epoch, target_slot - ); - - loop { - let sleep_slots = target_slot.saturating_sub(current_slot); - if sleep_slots == 0 { - break; - } - - sleep_n_slots(sleep_slots.max(50), genesis_config); - let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| { - bail( - notifier, - &format!("Error: Could not fetch current slot: {}", err), - ); - }); - - if current_slot == latest_slot { - bail( - notifier, - &format!("Error: Slot did not advance from {}", current_slot), - ); - } else { - current_slot = latest_slot; - } - } -} - -pub fn is_host(string: String) -> Result<(), String> { - parse_host(&string)?; - Ok(()) -} - -pub fn bail(notifier: &Notifier, msg: &str) -> ! { - notifier.send(msg); - sleep(Duration::from_secs(30)); // Wait for notifications to send - std::process::exit(1); -} - -/// Inspired by solana_validator::download_tar_bz2 -pub fn download_genesis(rpc_addr: &SocketAddr, download_path: &Path) -> Result<(), String> { - let archive_name = GENESIS_ARCHIVE_NAME; - let archive_path = download_path.join(archive_name); - let url = format!("http://{}/{}", rpc_addr, archive_name); - let download_start = Instant::now(); - debug!("Downloading genesis ({})...", url); - - let client = reqwest::blocking::Client::new(); - let mut response = client - .get(url.as_str()) - .send() - .and_then(|response| response.error_for_status()) - .map_err(|err| format!("Unable to get: {:?}", err))?; - let download_size = { - response - .headers() - .get(reqwest::header::CONTENT_LENGTH) - .and_then(|content_length| content_length.to_str().ok()) - .and_then(|content_length| content_length.parse().ok()) - .unwrap_or(0) - }; - - let mut file = File::create(&archive_path) - .map_err(|err| format!("Unable to create {:?}: {:?}", archive_path, err))?; - io::copy(&mut response, &mut file) - .map_err(|err| format!("Unable to write {:?}: {:?}", archive_path, err))?; - - debug!( - "Downloaded genesis ({} bytes) in {:?}", - download_size, - Instant::now().duration_since(download_start), - ); - - debug!("Extracting genesis ({})...", archive_name); - let extract_start = Instant::now(); - let tar_bz2 = File::open(&archive_path) - .map_err(|err| format!("Unable to open {}: {:?}", archive_name, err))?; - let tar = BzDecoder::new(io::BufReader::new(tar_bz2)); - let mut archive = Archive::new(tar); - archive - .unpack(download_path) - .map_err(|err| format!("Unable to unpack {}: {:?}", archive_name, err))?; - debug!( - "Extracted {} in {:?}", - archive_name, - Instant::now().duration_since(extract_start) - ); - - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_slots_to_secs() { - let mut genesis_config = GenesisConfig::default(); - genesis_config.poh_config.target_tick_duration = Duration::from_millis(500); - - genesis_config.ticks_per_slot = 10; - assert_eq!(slots_to_secs(2, &genesis_config), 10); - - genesis_config.ticks_per_slot = 1; - assert_eq!(slots_to_secs(1, &genesis_config), 1); - - genesis_config.ticks_per_slot = 0; - assert_eq!(slots_to_secs(10, &genesis_config), 0); - } -} diff --git a/ramp-tps/src/voters.rs b/ramp-tps/src/voters.rs deleted file mode 100644 index 5b8400da49..0000000000 --- a/ramp-tps/src/voters.rs +++ /dev/null @@ -1,247 +0,0 @@ -use crate::utils; -use log::*; -use solana_client::{client_error::Result as ClientResult, rpc_client::RpcClient}; -use solana_notifier::Notifier; -use solana_sdk::{ - clock::Slot, - epoch_schedule::EpochSchedule, - message::Message, - native_token::sol_to_lamports, - pubkey::Pubkey, - signature::{Keypair, Signer}, - transaction::Transaction, -}; -use solana_stake_program::{ - stake_instruction, - stake_state::{Authorized as StakeAuthorized, Lockup}, -}; -use std::{ - collections::{HashMap, HashSet}, - rc::Rc, - str::FromStr, - thread::sleep, - time::Duration, -}; - -// The percentage of leader slots that validators complete in order to receive the stake -// reward at the end of a TPS round. -const MIN_LEADER_SLOT_PCT: f64 = 80.0; - -#[derive(Default)] -pub struct LeaderRecord { - total_slots: u64, - missed_slots: u64, -} - -impl LeaderRecord { - pub fn completed_slot_pct(&self) -> f64 { - if self.total_slots == 0 { - 0f64 - } else { - let completed_slots = self.total_slots - self.missed_slots; - 100f64 * completed_slots as f64 / self.total_slots as f64 - } - } - - pub fn healthy(&self) -> bool { - self.completed_slot_pct() >= MIN_LEADER_SLOT_PCT - } -} - -/// Calculate the leader record for each active validator -pub fn calculate_leader_records( - rpc_client: &RpcClient, - epoch_schedule: &EpochSchedule, - start_slot: Slot, - end_slot: Slot, - notifier: &Notifier, -) -> ClientResult> { - let start_epoch = epoch_schedule.get_epoch(start_slot); - let end_epoch = epoch_schedule.get_epoch(end_slot); - let confirmed_blocks: HashSet<_> = rpc_client - .get_blocks(start_slot, Some(end_slot))? - .into_iter() - .collect(); - - let mut leader_records = HashMap::::new(); - for epoch in start_epoch..=end_epoch { - let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch); - let start_slot = std::cmp::max(start_slot, first_slot_in_epoch); - let last_slot_in_epoch = epoch_schedule.get_last_slot_in_epoch(epoch); - let end_slot = std::cmp::min(end_slot, last_slot_in_epoch); - - rpc_client - .get_leader_schedule(Some(start_slot))? - .unwrap_or_else(|| utils::bail(notifier, "Error: Leader schedule was not found")) - .into_iter() - .map(|(pk, s)| (Pubkey::from_str(&pk).unwrap(), s)) - .for_each(|(pubkey, leader_slots)| { - let mut record = leader_records.entry(pubkey).or_default(); - for slot_index in leader_slots.iter() { - let slot = (*slot_index as u64) + first_slot_in_epoch; - if slot >= start_slot && slot <= end_slot { - record.total_slots += 1; - if !confirmed_blocks.contains(&slot) { - record.missed_slots += 1; - } - } - } - }); - } - - Ok(leader_records) -} - -pub fn fetch_active_validators(rpc_client: &RpcClient) -> HashMap { - match rpc_client.get_vote_accounts() { - Err(err) => { - warn!("Failed to get_vote_accounts(): {}", err); - HashMap::new() - } - Ok(vote_accounts) => vote_accounts - .current - .into_iter() - .filter_map(|info| { - if let (Ok(node_pubkey), Ok(vote_pubkey)) = ( - Pubkey::from_str(&info.node_pubkey), - Pubkey::from_str(&info.vote_pubkey), - ) { - Some((node_pubkey, vote_pubkey)) - } else { - None - } - }) - .collect(), - } -} - -/// Endlessly retry stake delegation until success -fn delegate_stake( - rpc_client: &RpcClient, - faucet_keypair: &Keypair, - vote_account_pubkey: &Pubkey, - sol_gift: u64, -) { - let stake_account_keypair = Keypair::new(); - info!( - "delegate_stake: stake pubkey: {}", - stake_account_keypair.pubkey() - ); - let mut retry_count = 0; - loop { - let recent_blockhash = loop { - match rpc_client.get_recent_blockhash() { - Ok(response) => break response.0, - Err(err) => { - error!("Failed to get recent blockhash: {}", err); - sleep(Duration::from_secs(5)); - } - } - }; - - let instructions = stake_instruction::create_account_and_delegate_stake( - &faucet_keypair.pubkey(), - &stake_account_keypair.pubkey(), - &vote_account_pubkey, - &StakeAuthorized::auto(&faucet_keypair.pubkey()), - &Lockup::default(), - sol_to_lamports(sol_gift as f64), - ); - let message = Message::new(&instructions, Some(&faucet_keypair.pubkey())); - let transaction = Transaction::new( - &[faucet_keypair, &stake_account_keypair], - message, - recent_blockhash, - ); - - // Check if stake was delegated but just failed to confirm on an earlier attempt - if retry_count > 0 { - if let Ok(stake_account) = rpc_client.get_account(&stake_account_keypair.pubkey()) { - if stake_account.owner == solana_stake_program::id() { - break; - } - } - } - - if let Err(err) = rpc_client.send_and_confirm_transaction(&transaction) { - error!( - "Failed to delegate stake (retries: {}): {}", - retry_count, err - ); - retry_count += 1; - sleep(Duration::from_secs(5)); - } else { - break; - } - } -} - -/// Announce validator status leader slot performance -pub fn announce_results( - starting_validators: &HashMap, - remaining_validators: &HashMap, - pubkey_to_keybase: Rc String>, - leader_records: &HashMap, - notifier: &mut Notifier, -) { - let buffer_records = |keys: Vec<&Pubkey>, notifier: &mut Notifier| { - if keys.is_empty() { - notifier.send("* None"); - return; - } - - let mut validators = vec![]; - for pubkey in keys { - let name = pubkey_to_keybase(pubkey); - if let Some(record) = leader_records.get(pubkey) { - validators.push(format!( - "* {} ({:.1}% leader efficiency)", - name, - record.completed_slot_pct() - )); - } - } - validators.sort(); - notifier.send(&validators.join("\n")); - }; - - let healthy: Vec<_> = remaining_validators - .keys() - .filter(|k| leader_records.get(k).map(|r| r.healthy()).unwrap_or(false)) - .collect(); - - let unhealthy: Vec<_> = remaining_validators - .keys() - .filter(|k| leader_records.get(k).map(|r| !r.healthy()).unwrap_or(true)) - .collect(); - - let inactive: Vec<_> = starting_validators - .keys() - .filter(|k| !remaining_validators.contains_key(k)) - .collect(); - - notifier.send("Healthy Validators:"); - buffer_records(healthy, notifier); - notifier.send("Unhealthy Validators:"); - buffer_records(unhealthy, notifier); - notifier.send("Inactive Validators:"); - buffer_records(inactive, notifier); -} - -/// Award stake to the surviving validators by delegating stake to their vote account -pub fn award_stake( - rpc_client: &RpcClient, - faucet_keypair: &Keypair, - voters: Vec<(String, &Pubkey)>, - sol_gift: u64, - notifier: &mut Notifier, -) { - let mut buffer = vec![]; - - for (node_pubkey, vote_account_pubkey) in voters { - info!("Delegate {} SOL to {}", sol_gift, node_pubkey); - delegate_stake(rpc_client, faucet_keypair, vote_account_pubkey, sol_gift); - buffer.push(format!("Delegated {} SOL to {}", sol_gift, node_pubkey)); - } - notifier.send(&buffer.join("\n")); -}