From 124287a0ea857010d677480e1bfce3bdb0c84775 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 17 Apr 2020 20:13:38 -0700 Subject: [PATCH] Add ramp-tps --- Cargo.lock | 21 ++ Cargo.toml | 1 + ramp-tps/Cargo.toml | 26 +++ ramp-tps/src/main.rs | 494 +++++++++++++++++++++++++++++++++++++++ ramp-tps/src/notifier.rs | 92 ++++++++ ramp-tps/src/results.rs | 92 ++++++++ ramp-tps/src/stake.rs | 143 ++++++++++++ ramp-tps/src/tps.rs | 72 ++++++ ramp-tps/src/utils.rs | 168 +++++++++++++ ramp-tps/src/voters.rs | 247 ++++++++++++++++++++ 10 files changed, 1356 insertions(+) create mode 100644 ramp-tps/Cargo.toml create mode 100644 ramp-tps/src/main.rs create mode 100644 ramp-tps/src/notifier.rs create mode 100644 ramp-tps/src/results.rs create mode 100644 ramp-tps/src/stake.rs create mode 100644 ramp-tps/src/tps.rs create mode 100644 ramp-tps/src/utils.rs create mode 100644 ramp-tps/src/voters.rs diff --git a/Cargo.lock b/Cargo.lock index 2ede9687f0..d7d85fd296 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4461,6 +4461,27 @@ dependencies = [ "solana-sdk 1.2.0", ] +[[package]] +name = "solana-ramp-tps" +version = "1.2.0" +dependencies = [ + "bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "reqwest 0.10.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-client 1.2.0", + "solana-core 1.2.0", + "solana-logger 1.2.0", + "solana-metrics 1.2.0", + "solana-net-utils 1.2.0", + "solana-sdk 1.2.0", + "solana-stake-program 1.2.0", + "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "solana-rayon-threadlimit" version = "1.2.0" 
diff --git a/Cargo.toml b/Cargo.toml index 57f0164520..e11a159e16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ members = [ "archiver-lib", "archiver-utils", "remote-wallet", + "ramp-tps", "runtime", "sdk", "sdk-c", diff --git a/ramp-tps/Cargo.toml b/ramp-tps/Cargo.toml new file mode 100644 index 0000000000..9705f136a1 --- /dev/null +++ b/ramp-tps/Cargo.toml @@ -0,0 +1,26 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-ramp-tps" +description = "Solana Tour de SOL - TPS ramp up" +version = "1.2.0" +repository = "https://github.com/solana-labs/tour-de-sol" +license = "Apache-2.0" +homepage = "https://solana.com/" + +[dependencies] +bzip2 = "0.3.3" +clap = "2.33.0" +log = "0.4.8" +reqwest = { version = "0.10.4", default-features = false } +serde = "1.0.106" +serde_json = "1.0.51" +serde_yaml = "0.8.11" +solana-core = { path = "../core", version = "1.2.0" } +solana-client = { path = "../client", version = "1.2.0" } +solana-logger = { path = "../logger", version = "1.2.0" } +solana-metrics = { path = "../metrics", version = "1.2.0" } +solana-net-utils = { path = "../net-utils", version = "1.2.0" } +solana-sdk = { path = "../sdk", version = "1.2.0" } +solana-stake-program = { path = "../programs/stake", version = "1.2.0" } +tar = "0.4.26" diff --git a/ramp-tps/src/main.rs b/ramp-tps/src/main.rs new file mode 100644 index 0000000000..dcc8132795 --- /dev/null +++ b/ramp-tps/src/main.rs @@ -0,0 +1,494 @@ +//! 
Ramp up TPS for Tour de SOL until all validators drop out + +mod notifier; +mod results; +mod stake; +mod tps; +mod utils; +mod voters; + +use clap::{crate_description, crate_name, crate_version, value_t, value_t_or_exit, App, Arg}; +use log::*; +use results::Results; +use solana_client::rpc_client::RpcClient; +use solana_metrics::datapoint_info; +use solana_sdk::{genesis_config::GenesisConfig, signature::read_keypair_file}; +use solana_stake_program::config::{id as stake_config_id, Config as StakeConfig}; +use std::{ + collections::HashMap, + fs, + path::PathBuf, + process::{exit, Command}, + rc::Rc, + thread::sleep, + time::Duration, +}; + +const NUM_BENCH_CLIENTS: usize = 2; +const TDS_ENTRYPOINT: &str = "tds.solana.com"; +const TMP_LEDGER_PATH: &str = ".tmp/ledger"; +const FAUCET_KEYPAIR_PATH: &str = "faucet-keypair.json"; +const PUBKEY_MAP_FILE: &str = "validators/all-username.yml"; +const RESULTS_FILE: &str = "results.yml"; +const DEFAULT_TX_COUNT_BASELINE: &str = "5000"; +const DEFAULT_TX_COUNT_INCREMENT: &str = "5000"; +const DEFAULT_TPS_ROUND_MINUTES: &str = "60"; +const THREAD_BATCH_SLEEP_MS: &str = "1000"; +const DEFAULT_INITIAL_SOL_BALANCE: &str = "1"; + +// Transaction count increments linearly each round +fn tx_count_for_round(tps_round: u32, base: u64, incr: u64) -> u64 { + base + u64::from(tps_round - 1) * incr +} + +// Gift will double the staked lamports each round. 
+fn gift_for_round(tps_round: u32, initial_balance: u64) -> u64 { + if tps_round > 1 { + initial_balance * 2u64.pow(tps_round - 2) + } else { + 0 + } +} + +#[allow(clippy::cognitive_complexity)] +fn main() { + solana_logger::setup_with_default("solana=debug"); + solana_metrics::set_panic_hook("ramp-tps"); + let mut notifier = notifier::Notifier::new(); + + let matches = App::new(crate_name!()) + .about(crate_description!()) + .version(crate_version!()) + .arg( + Arg::with_name("faucet_keypair_path") + .long("faucet-keypair-path") + .short("k") + .value_name("PATH") + .takes_value(true) + .default_value(FAUCET_KEYPAIR_PATH) + .help("Path to the faucet keypair for stake award distribution"), + ) + .arg( + Arg::with_name("net_dir") + .long("net-dir") + .value_name("DIR") + .takes_value(true) + .help("The directory used for running commands on the cluster"), + ) + .arg( + Arg::with_name("pubkey_map_file") + .long("pubkey-map-file") + .value_name("FILE") + .default_value(PUBKEY_MAP_FILE) + .takes_value(true) + .help("YAML file that maps validator identity pubkeys to keybase user id"), + ) + .arg( + Arg::with_name("results_file") + .long("results-file") + .value_name("FILE") + .default_value(RESULTS_FILE) + .takes_value(true) + .help("YAML file that lists the results for each round"), + ) + .arg( + Arg::with_name("round") + .long("round") + .value_name("NUM") + .takes_value(true) + .default_value("1") + .help("The starting round of TPS ramp up"), + ) + .arg( + Arg::with_name("round_minutes") + .long("round-minutes") + .value_name("NUM") + .takes_value(true) + .default_value(DEFAULT_TPS_ROUND_MINUTES) + .help("The duration in minutes of a TPS round"), + ) + .arg( + Arg::with_name("tx_count_baseline") + .long("tx-count-baseline") + .value_name("NUM") + .takes_value(true) + .default_value(DEFAULT_TX_COUNT_BASELINE) + .help("The tx-count of round 1"), + ) + .arg( + Arg::with_name("tx_count_increment") + .long("tx-count-increment") + .value_name("NUM") + .takes_value(true) + 
.default_value(DEFAULT_TX_COUNT_INCREMENT) + .help("The tx-count increment for the next round"), + ) + .arg( + Arg::with_name("initial_balance") + .long("initial-balance") + .value_name("SOL") + .takes_value(true) + .default_value(DEFAULT_INITIAL_SOL_BALANCE) + .help("The number of SOL that each partipant started with"), + ) + .arg( + Arg::with_name("entrypoint") + .short("n") + .long("entrypoint") + .value_name("HOST") + .takes_value(true) + .default_value(TDS_ENTRYPOINT) + .validator(utils::is_host) + .help("The entrypoint used for RPC calls"), + ) + .arg( + Arg::with_name("stake_activation_epoch") + .long("stake-activation-epoch") + .value_name("NUM") + .takes_value(true) + .help("The stake activated in this epoch must fully warm up before the first round begins"), + ) + .arg( + Arg::with_name("destake_net_nodes_epoch") + .long("destake-net-nodes-epoch") + .value_name("NUM") + .takes_value(true) + .default_value("9") + .help("The epoch for which to run destake-net-nodes.sh at"), + ) + .get_matches(); + + let pubkey_map_file = value_t_or_exit!(matches, "pubkey_map_file", String); + let pubkey_map: HashMap = + serde_yaml::from_reader(fs::File::open(&pubkey_map_file).unwrap_or_else(|err| { + eprintln!( + "Error: Unable to open --pubkey-map-file {}: {}", + pubkey_map_file, err + ); + exit(1); + })) + .unwrap_or_else(|err| { + eprintln!( + "Error: Unable to parse --pubkey-map-file {}: {}", + pubkey_map_file, err + ); + exit(1); + }); + let pubkey_to_keybase = Rc::new(move |pubkey: &solana_sdk::pubkey::Pubkey| -> String { + let pubkey = pubkey.to_string(); + match pubkey_map.get(&pubkey) { + Some(keybase) => format!("{} ({})", keybase, pubkey), + None => pubkey, + } + }); + + let net_dir = value_t_or_exit!(matches, "net_dir", String); + let faucet_keypair_path = value_t_or_exit!(matches, "faucet_keypair_path", String); + let faucet_keypair = read_keypair_file(&faucet_keypair_path) + .unwrap_or_else(|err| panic!("Unable to read {}: {}", faucet_keypair_path, err)); + 
let mut tps_round = value_t_or_exit!(matches, "round", u32).max(1); + let results_file_name = value_t_or_exit!(matches, "results_file", String); + let previous_results = Results::read(&results_file_name); + let mut tps_round_results = Results::new(results_file_name, previous_results, tps_round); + let tx_count_baseline = value_t_or_exit!(matches, "tx_count_baseline", u64); + let tx_count_increment = value_t_or_exit!(matches, "tx_count_increment", u64); + let round_minutes = value_t_or_exit!(matches, "round_minutes", u64).max(1); + let round_duration = Duration::from_secs(round_minutes * 60); + let initial_balance = value_t_or_exit!(matches, "initial_balance", u64); + let tmp_ledger_path = PathBuf::from(TMP_LEDGER_PATH); + let _ = fs::remove_dir_all(&tmp_ledger_path); + fs::create_dir_all(&tmp_ledger_path).expect("failed to create temp ledger path"); + + notifier.notify("Hi!"); + datapoint_info!("ramp-tps", ("event", "boot", String),); + + let entrypoint_str = matches.value_of("entrypoint").unwrap(); + debug!("Connecting to {}", entrypoint_str); + let entrypoint_addr = solana_net_utils::parse_host_port(&format!("{}:8899", entrypoint_str)) + .expect("failed to parse entrypoint address"); + utils::download_genesis(&entrypoint_addr, &tmp_ledger_path).expect("genesis download failed"); + let genesis_config = + GenesisConfig::load(&tmp_ledger_path).expect("failed to load genesis block"); + + debug!("Fetching current slot..."); + let rpc_client = RpcClient::new_socket_with_timeout(entrypoint_addr, Duration::from_secs(10)); + let current_slot = rpc_client.get_slot().expect("failed to fetch current slot"); + debug!("Current slot: {}", current_slot); + let epoch_schedule = &genesis_config.epoch_schedule; + let first_normal_slot = epoch_schedule.first_normal_slot; + debug!("First normal slot: {}", first_normal_slot); + let sleep_slots = first_normal_slot.saturating_sub(current_slot); + if sleep_slots > 0 { + notifier.notify(&format!( + "Waiting for warm-up epochs to complete 
(epoch {})", + epoch_schedule.first_normal_epoch + )); + utils::sleep_until_epoch( + &rpc_client, + ¬ifier, + &genesis_config, + current_slot, + epoch_schedule.first_normal_epoch, + ); + } + + debug!("Fetching stake config..."); + let stake_config_account = rpc_client + .get_account(&stake_config_id()) + .expect("failed to fetch stake config"); + let stake_config = StakeConfig::from(&stake_config_account).unwrap(); + + // Check if destake-net-nodes.sh should be run + { + let epoch_info = rpc_client.get_epoch_info().unwrap(); + let destake_net_nodes_epoch = value_t_or_exit!(matches, "destake_net_nodes_epoch", u64); + + if epoch_info.epoch >= destake_net_nodes_epoch { + info!( + "Current epoch {} >= destake_net_nodes_epoch of {}, skipping destake-net-nodes.sh", + epoch_info.epoch, destake_net_nodes_epoch + ); + } else { + info!( + "Waiting for destake-net-nodes epoch {}", + destake_net_nodes_epoch + ); + utils::sleep_until_epoch( + &rpc_client, + ¬ifier, + &genesis_config, + epoch_info.absolute_slot, + destake_net_nodes_epoch, + ); + + info!("Destaking net nodes..."); + Command::new("bash") + .args(&["destake-net-nodes.sh", &net_dir]) + .spawn() + .unwrap(); + info!("Done destaking net nodes"); + } + } + + // Wait for the next epoch, or --stake-activation-epoch + { + let epoch_info = rpc_client.get_epoch_info().unwrap(); + let activation_epoch = value_t!(matches, "stake_activation_epoch", u64) + .ok() + .unwrap_or(epoch_info.epoch - 1); + debug!("Current epoch info: {:?}", &epoch_info); + debug!("Activation epoch is: {:?}", activation_epoch); + stake::wait_for_warm_up( + activation_epoch, + epoch_info, + &rpc_client, + &stake_config, + &genesis_config, + ¬ifier, + ); + } + + let mut tps_sampler = tps::Sampler::new(&entrypoint_addr); + + loop { + notifier.notify(&format!("Round {}!", tps_round)); + let tx_count = tx_count_for_round(tps_round, tx_count_baseline, tx_count_increment); + datapoint_info!( + "ramp-tps", + ("event", "round-start", String), + ("round", 
tps_round, i64), + ("tx_count", tx_count, i64) + ); + + let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| { + utils::bail( + ¬ifier, + &format!("Error: get latest slot failed: {}", err), + ); + }); + sleep(Duration::from_secs(5)); + let round_start_slot = rpc_client.get_slot().unwrap_or_else(|err| { + utils::bail( + ¬ifier, + &format!("Error: get round start slot failed: {}", err), + ); + }); + if round_start_slot == latest_slot { + utils::bail( + ¬ifier, + &format!("Slot is not advancing from {}", latest_slot), + ); + } + + let starting_validators = voters::fetch_active_validators(&rpc_client); + datapoint_info!( + "ramp-tps", + ("event", "start-transactions", String), + ("round", tps_round, i64), + ("validators", starting_validators.len(), i64) + ); + + notifier.buffer(format!( + "There are {} validators present:", + starting_validators.len() + )); + + let mut validators: Vec<_> = starting_validators + .keys() + .map(|node_pubkey| format!("* {}", pubkey_to_keybase(&node_pubkey))) + .collect(); + validators.sort(); + notifier.buffer_vec(validators); + notifier.flush(); + + let client_tx_count = tx_count / NUM_BENCH_CLIENTS as u64; + notifier.notify(&format!( + "Starting transactions for {} minutes (batch size={})", + round_minutes, tx_count, + )); + info!( + "Running bench-tps={}='--tx_count={} --thread-batch-sleep-ms={}'", + NUM_BENCH_CLIENTS, client_tx_count, THREAD_BATCH_SLEEP_MS + ); + for client_id in 0..NUM_BENCH_CLIENTS { + Command::new("bash") + .args(&[ + "wrapper-bench-tps.sh", + &net_dir, + &client_id.to_string(), + &client_tx_count.to_string(), + THREAD_BATCH_SLEEP_MS, + ]) + .spawn() + .unwrap(); + } + + let bench_warmup_secs = 60; + info!( + "Sleeping {}s to allow bench-tps to warmup", + bench_warmup_secs + ); + sleep(Duration::from_secs(bench_warmup_secs)); + + tps_sampler.start_sampling_thread(); + sleep(round_duration); + tps_sampler.stop_sampling_thread(); + + for client_id in 0..NUM_BENCH_CLIENTS { + Command::new("bash") + .args(&[ + 
"wrapper-bench-tps.sh", + &net_dir, + &client_id.to_string(), + "0", // Setting txCount to 0 will kill bench-tps + THREAD_BATCH_SLEEP_MS, + ]) + .spawn() + .unwrap(); + } + + datapoint_info!( + "ramp-tps", + ("event", "stop-transactions", String), + ("round", tps_round, i64), + ); + + notifier.notify("Transactions stopped"); + tps_sampler.report_results(¬ifier); + + let remaining_validators = voters::fetch_active_validators(&rpc_client); + let remaining_keybase = remaining_validators + .keys() + .map(|k| pubkey_to_keybase(k)) + .collect(); + tps_round_results + .record(tps_round, remaining_keybase) + .unwrap_or_else(|err| { + warn!("Failed to record round results: {}", err); + }); + + if remaining_validators.is_empty() { + utils::bail(¬ifier, "No validators remain"); + } + + datapoint_info!( + "ramp-tps", + ("event", "calculate-leader-records", String), + ("round", tps_round, i64), + ("validators", remaining_validators.len(), i64) + ); + + let round_end_slot = rpc_client.get_slot().unwrap_or_else(|err| { + utils::bail( + ¬ifier, + &format!("Error: get round end slot failed: {}", err), + ); + }); + + let leader_records = voters::calculate_leader_records( + &rpc_client, + &epoch_schedule, + round_start_slot, + round_end_slot, + ¬ifier, + ) + .unwrap_or_else(|err| { + utils::bail( + ¬ifier, + &format!("Error: Could not calculate leader records: {}", err), + ); + }); + + voters::announce_results( + &starting_validators, + &remaining_validators, + pubkey_to_keybase.clone(), + &leader_records, + &mut notifier, + ); + + datapoint_info!( + "ramp-tps", + ("event", "gifting", String), + ("round", tps_round, i64) + ); + + let healthy_validators: Vec<_> = remaining_validators + .iter() + .filter(|(k, _)| leader_records.get(k).map(|r| r.healthy()).unwrap_or(false)) + .map(|(node_pubkey, vote_account_pubkey)| { + (pubkey_to_keybase(&node_pubkey), vote_account_pubkey) + }) + .collect(); + + let next_gift = gift_for_round(tps_round + 1, initial_balance); + voters::award_stake( + 
&rpc_client, + &faucet_keypair, + healthy_validators, + next_gift, + &mut notifier, + ); + + datapoint_info!( + "ramp-tps", + ("event", "new-stake-warmup", String), + ("round", tps_round, i64) + ); + + // Wait for stake to warm up before starting the next round + let epoch_info = rpc_client.get_epoch_info().unwrap(); + debug!("Current epoch info: {:?}", &epoch_info); + let current_epoch = epoch_info.epoch; + stake::wait_for_warm_up( + current_epoch, + epoch_info, + &rpc_client, + &stake_config, + &genesis_config, + ¬ifier, + ); + + tps_round += 1; + } +} diff --git a/ramp-tps/src/notifier.rs b/ramp-tps/src/notifier.rs new file mode 100644 index 0000000000..7f723bea36 --- /dev/null +++ b/ramp-tps/src/notifier.rs @@ -0,0 +1,92 @@ +use log::*; +use reqwest::{blocking::Client, StatusCode}; +use serde_json::json; +use std::{env, thread::sleep, time::Duration}; + +/// For each notification +/// 1) Log an info level message +/// 2) Notify Slack channel if Slack is configured +/// 3) Notify Discord channel if Discord is configured +pub struct Notifier { + buffer: Vec, + client: Client, + discord_webhook: Option, + slack_webhook: Option, +} + +impl Notifier { + pub fn new() -> Self { + let discord_webhook = env::var("DISCORD_WEBHOOK") + .map_err(|_| { + warn!("Discord notifications disabled"); + }) + .ok(); + let slack_webhook = env::var("SLACK_WEBHOOK") + .map_err(|_| { + warn!("Slack notifications disabled"); + }) + .ok(); + Notifier { + buffer: Vec::new(), + client: Client::new(), + discord_webhook, + slack_webhook, + } + } + + fn send(&self, msg: &str) { + if let Some(webhook) = &self.discord_webhook { + for line in msg.split('\n') { + // Discord rate limiting is aggressive, limit to 1 message a second to keep + // it from getting mad at us... 
+ sleep(Duration::from_millis(1000)); + + info!("Sending {}", line); + let data = json!({ "content": line }); + + loop { + let response = self.client.post(webhook).json(&data).send(); + + if let Err(err) = response { + warn!("Failed to send Discord message: \"{}\": {:?}", line, err); + break; + } else if let Ok(response) = response { + info!("response status: {}", response.status()); + if response.status() == StatusCode::TOO_MANY_REQUESTS { + warn!("rate limited!..."); + warn!("response text: {:?}", response.text()); + std::thread::sleep(Duration::from_secs(2)); + } else { + break; + } + } + } + } + } + + if let Some(webhook) = &self.slack_webhook { + let data = json!({ "text": msg }); + if let Err(err) = self.client.post(webhook).json(&data).send() { + warn!("Failed to send Slack message: {:?}", err); + } + } + } + + pub fn buffer(&mut self, msg: String) { + self.buffer.push(msg); + } + + pub fn buffer_vec(&mut self, mut msgs: Vec) { + self.buffer.append(&mut msgs); + } + + pub fn flush(&mut self) { + self.notify(&self.buffer.join("\n")); + self.buffer.clear(); + } + + pub fn notify(&self, msg: &str) { + info!("{}", msg); + self.send(msg); + } +} diff --git a/ramp-tps/src/results.rs b/ramp-tps/src/results.rs new file mode 100644 index 0000000000..d201b4417d --- /dev/null +++ b/ramp-tps/src/results.rs @@ -0,0 +1,92 @@ +use log::*; +use serde::{Serialize, Serializer}; +use std::{ + collections::{BTreeMap, HashMap}, + error::Error, + fs::File, + io::ErrorKind, + process::exit, + str::FromStr, +}; + +const ROUND_KEY_PREFIX: &str = "round-"; + +#[derive(Eq, PartialEq, Ord, PartialOrd)] +struct Round(u32); + +impl Serialize for Round { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&format!("{}{}", ROUND_KEY_PREFIX, self.0)) + } +} + +pub struct Results { + file_path: String, + results: BTreeMap>, +} + +impl Results { + /// Keep any result entries which occurred before the starting round. 
+ pub fn new( + file_path: String, + mut previous_results: HashMap>, + start_round: u32, + ) -> Self { + let mut results: BTreeMap> = BTreeMap::new(); + previous_results.drain().for_each(|(key, value)| { + if key.starts_with(ROUND_KEY_PREFIX) { + let round_str = &key[ROUND_KEY_PREFIX.len()..]; + dbg!(round_str); + if let Ok(round) = u32::from_str(round_str) { + if round < start_round { + results.insert(Round(round), value); + } + } + } + }); + + Results { file_path, results } + } + + // Reads the previous results file and if it exists, parses the contents + pub fn read(file_path: &str) -> HashMap> { + match File::open(file_path) { + Ok(file) => serde_yaml::from_reader(&file) + .map_err(|err| { + warn!("Failed to recover previous results: {}", err); + }) + .unwrap_or_default(), + Err(err) => match err.kind() { + ErrorKind::NotFound => { + // Check that we can write to this file + File::create(file_path).unwrap_or_else(|err| { + eprintln!( + "Error: Unable to create --results-file {}: {}", + file_path, err + ); + exit(1); + }); + HashMap::new() + } + err => { + eprintln!( + "Error: Unable to open --results-file {}: {:?}", + file_path, err + ); + exit(1); + } + }, + } + } + + /// Record the remaining validators after each TPS round + pub fn record(&mut self, round: u32, validators: Vec) -> Result<(), Box> { + self.results.insert(Round(round), validators); + let file = File::create(&self.file_path)?; + serde_yaml::to_writer(&file, &self.results)?; + Ok(()) + } +} diff --git a/ramp-tps/src/stake.rs b/ramp-tps/src/stake.rs new file mode 100644 index 0000000000..8289035baa --- /dev/null +++ b/ramp-tps/src/stake.rs @@ -0,0 +1,143 @@ +use crate::{notifier, utils}; +use log::*; +use solana_client::{rpc_client::RpcClient, rpc_response::RpcEpochInfo}; +use solana_sdk::{ + clock::Epoch, + genesis_config::GenesisConfig, + stake_history::StakeHistoryEntry, + sysvar::{ + stake_history::{self, StakeHistory}, + Sysvar, + }, +}; +use solana_stake_program::config::Config as 
StakeConfig; +use std::{thread::sleep, time::Duration}; + +fn calculate_stake_warmup(mut stake_entry: StakeHistoryEntry, stake_config: &StakeConfig) -> u64 { + let mut epochs = 0; + loop { + let percent_warming_up = + stake_entry.activating as f64 / stake_entry.effective.max(1) as f64; + let percent_cooling_down = + stake_entry.deactivating as f64 / stake_entry.effective.max(1) as f64; + debug!( + "epoch +{}: stake warming up {:.1}%, cooling down {:.1}% ", + epochs, + percent_warming_up * 100., + percent_cooling_down * 100. + ); + + if (percent_warming_up < 0.05) && (percent_cooling_down < 0.05) { + break; + } + let warmup_cooldown_rate = stake_config.warmup_cooldown_rate; + let max_warmup_stake = (stake_entry.effective as f64 * warmup_cooldown_rate) as u64; + let warmup_stake = stake_entry.activating.min(max_warmup_stake); + stake_entry.effective += warmup_stake; + stake_entry.activating -= warmup_stake; + + let max_cooldown_stake = (stake_entry.effective as f64 * warmup_cooldown_rate) as u64; + let cooldown_stake = stake_entry.deactivating.min(max_cooldown_stake); + stake_entry.effective -= cooldown_stake; + stake_entry.deactivating -= cooldown_stake; + debug!( + "epoch +{}: stake warming up {}, cooling down {}", + epochs, warmup_stake, cooldown_stake + ); + + epochs += 1; + } + info!("95% stake warmup will take {} epochs", epochs); + epochs +} + +fn stake_history_entry(epoch: Epoch, rpc_client: &RpcClient) -> Option { + let stake_history_account = rpc_client.get_account(&stake_history::id()).ok()?; + let stake_history = StakeHistory::from_account(&stake_history_account)?; + stake_history.get(&epoch).cloned() +} + +/// Wait until stake warms up and return the current epoch +pub fn wait_for_warm_up( + activation_epoch: Epoch, + mut epoch_info: RpcEpochInfo, + rpc_client: &RpcClient, + stake_config: &StakeConfig, + genesis_config: &GenesisConfig, + notifier: ¬ifier::Notifier, +) { + // Sleep until activation_epoch has finished + if epoch_info.epoch <= 
activation_epoch { + notifier.notify(&format!( + "Waiting until epoch {} is finished...", + activation_epoch + )); + + utils::sleep_until_epoch( + rpc_client, + notifier, + genesis_config, + epoch_info.absolute_slot, + activation_epoch + 1, + ); + } + + loop { + epoch_info = rpc_client.get_epoch_info().unwrap_or_else(|err| { + utils::bail( + notifier, + &format!("Error: get_epoch_info RPC call failed: {}", err), + ); + }); + + let current_slot = epoch_info.absolute_slot; + info!("Current slot is {}", current_slot); + + let current_epoch = epoch_info.epoch; + let latest_epoch = current_epoch - 1; + debug!( + "Fetching stake history entry for epoch: {}...", + latest_epoch + ); + + if let Some(stake_entry) = stake_history_entry(latest_epoch, &rpc_client) { + debug!("Stake history entry: {:?}", &stake_entry); + let warm_up_epochs = calculate_stake_warmup(stake_entry, stake_config); + let stake_warmed_up_epoch = latest_epoch + warm_up_epochs; + if stake_warmed_up_epoch > current_epoch { + notifier.notify(&format!( + "Waiting until epoch {} for stake to warmup (current epoch is {})...", + stake_warmed_up_epoch, current_epoch + )); + utils::sleep_until_epoch( + rpc_client, + notifier, + genesis_config, + current_slot, + stake_warmed_up_epoch, + ); + } else { + break; + } + } else { + warn!( + "Failed to fetch stake history entry for epoch: {}", + latest_epoch + ); + sleep(Duration::from_secs(5)); + } + + let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| { + utils::bail( + notifier, + &format!("Error: get_slot RPC call 3 failed: {}", err), + ); + }); + if current_slot == latest_slot { + utils::bail( + notifier, + &format!("Error: Slot did not advance from {}", current_slot), + ); + } + } +} diff --git a/ramp-tps/src/tps.rs b/ramp-tps/src/tps.rs new file mode 100644 index 0000000000..316baac7fb --- /dev/null +++ b/ramp-tps/src/tps.rs @@ -0,0 +1,72 @@ +use crate::notifier::Notifier; +use log::*; +use solana_client::perf_utils::{sample_txs, SampleStats}; +use 
solana_client::thin_client::ThinClient; +use solana_sdk::timing::duration_as_s; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{Builder, JoinHandle}, +}; + +pub struct Sampler { + client: Arc, + exit_signal: Arc, + maxes: Arc>>, + handle: Option>, +} + +impl Sampler { + pub fn new(rpc_addr: &SocketAddr) -> Self { + let (_, dummy_socket) = + solana_net_utils::bind_in_range(rpc_addr.ip(), (8000, 10_000)).unwrap(); + let dummy_tpu_addr = *rpc_addr; + let client = Arc::new(ThinClient::new(*rpc_addr, dummy_tpu_addr, dummy_socket)); + + Self { + client, + exit_signal: Arc::new(AtomicBool::new(false)), + maxes: Arc::new(RwLock::new(Vec::new())), + handle: None, + } + } + + // Setup a thread to sample every period and + // collect the max transaction rate and total tx count seen + pub fn start_sampling_thread(&mut self) { + // Reset + self.exit_signal.store(false, Ordering::Relaxed); + self.maxes.write().unwrap().clear(); + + let sample_period = 5; // in seconds + info!("Sampling TPS every {} seconds...", sample_period); + let exit_signal = self.exit_signal.clone(); + let maxes = self.maxes.clone(); + let client = self.client.clone(); + let handle = Builder::new() + .name("solana-client-sample".to_string()) + .spawn(move || { + sample_txs(&exit_signal, &maxes, sample_period, &client); + }) + .unwrap(); + + self.handle = Some(handle); + } + + pub fn stop_sampling_thread(&mut self) { + self.exit_signal.store(true, Ordering::Relaxed); + self.handle.take().unwrap().join().unwrap(); + } + + pub fn report_results(&self, notifier: &Notifier) { + let SampleStats { tps, elapsed, txs } = self.maxes.read().unwrap()[0].1; + let avg_tps = txs as f32 / duration_as_s(&elapsed); + notifier.notify(&format!( + "Highest TPS: {:.0}, Average TPS: {:.0}", + tps, avg_tps + )); + } +} diff --git a/ramp-tps/src/utils.rs b/ramp-tps/src/utils.rs new file mode 100644 index 0000000000..dbc97a693b --- /dev/null +++ b/ramp-tps/src/utils.rs @@ 
-0,0 +1,168 @@ +use crate::notifier::Notifier; +use bzip2::bufread::BzDecoder; +use log::*; +use solana_client::rpc_client::RpcClient; +use solana_net_utils::parse_host; +use solana_sdk::{ + clock::{Epoch, Slot}, + genesis_config::GenesisConfig, + timing::duration_as_ms, +}; +use std::{ + fs::File, + io, + net::SocketAddr, + path::Path, + thread::sleep, + time::{Duration, Instant}, +}; +use tar::Archive; + +const GENESIS_ARCHIVE_NAME: &str = "genesis.tar.bz2"; + +/// Inspired by solana_local_cluster::cluster_tests +fn slots_to_secs(num_slots: u64, genesis_config: &GenesisConfig) -> u64 { + let poh_config = &genesis_config.poh_config; + let ticks_per_slot = genesis_config.ticks_per_slot; + let num_ticks_to_sleep = num_slots as f64 * ticks_per_slot as f64; + let num_ticks_per_second = (1000 / duration_as_ms(&poh_config.target_tick_duration)) as f64; + ((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64 +} + +fn sleep_n_slots(num_slots: u64, genesis_config: &GenesisConfig) { + let secs = slots_to_secs(num_slots, genesis_config); + let mins = secs / 60; + let hours = mins / 60; + if hours >= 5 { + debug!("Sleeping for {} slots ({} hours)", num_slots, hours); + } else if mins >= 5 { + debug!("Sleeping for {} slots ({} minutes)", num_slots, mins); + } else if secs > 0 { + debug!("Sleeping for {} slots ({} seconds)", num_slots, secs); + } + sleep(Duration::from_secs(secs)); +} + +/// Sleep until the target epoch has started or bail if cluster is stuck +pub fn sleep_until_epoch( + rpc_client: &RpcClient, + notifier: &Notifier, + genesis_config: &GenesisConfig, + mut current_slot: Slot, + target_epoch: Epoch, +) { + let target_slot = genesis_config + .epoch_schedule + .get_first_slot_in_epoch(target_epoch); + info!( + "sleep_until_epoch() target_epoch: {}, target_slot: {}", + target_epoch, target_slot + ); + + loop { + let sleep_slots = target_slot.saturating_sub(current_slot); + if sleep_slots == 0 { + break; + } + + 
sleep_n_slots(sleep_slots.max(50), genesis_config); + let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| { + bail( + notifier, + &format!("Error: Could not fetch current slot: {}", err), + ); + }); + + if current_slot == latest_slot { + bail( + notifier, + &format!("Error: Slot did not advance from {}", current_slot), + ); + } else { + current_slot = latest_slot; + } + } +} + +pub fn is_host(string: String) -> Result<(), String> { + parse_host(&string)?; + Ok(()) +} + +pub fn bail(notifier: &crate::notifier::Notifier, msg: &str) -> ! { + notifier.notify(msg); + sleep(Duration::from_secs(30)); // Wait for notifications to send + std::process::exit(1); +} + +/// Inspired by solana_validator::download_tar_bz2 +pub fn download_genesis(rpc_addr: &SocketAddr, download_path: &Path) -> Result<(), String> { + let archive_name = GENESIS_ARCHIVE_NAME; + let archive_path = download_path.join(archive_name); + let url = format!("http://{}/{}", rpc_addr, archive_name); + let download_start = Instant::now(); + debug!("Downloading genesis ({})...", url); + + let client = reqwest::blocking::Client::new(); + let mut response = client + .get(url.as_str()) + .send() + .and_then(|response| response.error_for_status()) + .map_err(|err| format!("Unable to get: {:?}", err))?; + let download_size = { + response + .headers() + .get(reqwest::header::CONTENT_LENGTH) + .and_then(|content_length| content_length.to_str().ok()) + .and_then(|content_length| content_length.parse().ok()) + .unwrap_or(0) + }; + + let mut file = File::create(&archive_path) + .map_err(|err| format!("Unable to create {:?}: {:?}", archive_path, err))?; + io::copy(&mut response, &mut file) + .map_err(|err| format!("Unable to write {:?}: {:?}", archive_path, err))?; + + debug!( + "Downloaded genesis ({} bytes) in {:?}", + download_size, + Instant::now().duration_since(download_start), + ); + + debug!("Extracting genesis ({})...", archive_name); + let extract_start = Instant::now(); + let tar_bz2 = 
File::open(&archive_path) + .map_err(|err| format!("Unable to open {}: {:?}", archive_name, err))?; + let tar = BzDecoder::new(io::BufReader::new(tar_bz2)); + let mut archive = Archive::new(tar); + archive + .unpack(download_path) + .map_err(|err| format!("Unable to unpack {}: {:?}", archive_name, err))?; + debug!( + "Extracted {} in {:?}", + archive_name, + Instant::now().duration_since(extract_start) + ); + + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_slots_to_secs() { + let mut genesis_config = GenesisConfig::default(); + genesis_config.poh_config.target_tick_duration = Duration::from_millis(500); + + genesis_config.ticks_per_slot = 10; + assert_eq!(slots_to_secs(2, &genesis_config), 10); + + genesis_config.ticks_per_slot = 1; + assert_eq!(slots_to_secs(1, &genesis_config), 1); + + genesis_config.ticks_per_slot = 0; + assert_eq!(slots_to_secs(10, &genesis_config), 0); + } +} diff --git a/ramp-tps/src/voters.rs b/ramp-tps/src/voters.rs new file mode 100644 index 0000000000..273cfda386 --- /dev/null +++ b/ramp-tps/src/voters.rs @@ -0,0 +1,247 @@ +use crate::notifier::Notifier; +use crate::utils; +use log::*; +use solana_client::{client_error::Result as ClientResult, rpc_client::RpcClient}; +use solana_sdk::{ + clock::Slot, + epoch_schedule::EpochSchedule, + native_token::sol_to_lamports, + pubkey::Pubkey, + signature::{Keypair, Signer}, + transaction::Transaction, +}; +use solana_stake_program::{ + stake_instruction, + stake_state::{Authorized as StakeAuthorized, Lockup}, +}; +use std::{ + collections::{HashMap, HashSet}, + rc::Rc, + str::FromStr, + thread::sleep, + time::Duration, +}; + +// The percentage of leader slots that validators complete in order to receive the stake +// reward at the end of a TPS round. 
+const MIN_LEADER_SLOT_PCT: f64 = 80.0;
+
+#[derive(Default)]
+pub struct LeaderRecord {
+    total_slots: u64,
+    missed_slots: u64,
+}
+
+impl LeaderRecord {
+    pub fn completed_slot_pct(&self) -> f64 {
+        if self.total_slots == 0 {
+            0f64
+        } else {
+            let completed_slots = self.total_slots - self.missed_slots;
+            100f64 * completed_slots as f64 / self.total_slots as f64
+        }
+    }
+
+    pub fn healthy(&self) -> bool {
+        self.completed_slot_pct() >= MIN_LEADER_SLOT_PCT
+    }
+}
+
+/// Calculate the leader record for each active validator
+pub fn calculate_leader_records(
+    rpc_client: &RpcClient,
+    epoch_schedule: &EpochSchedule,
+    start_slot: Slot,
+    end_slot: Slot,
+    notifier: &Notifier,
+) -> ClientResult<HashMap<Pubkey, LeaderRecord>> {
+    let start_epoch = epoch_schedule.get_epoch(start_slot);
+    let end_epoch = epoch_schedule.get_epoch(end_slot);
+    let confirmed_blocks: HashSet<_> = rpc_client
+        .get_confirmed_blocks(start_slot, Some(end_slot))?
+        .into_iter()
+        .collect();
+
+    let mut leader_records = HashMap::<Pubkey, LeaderRecord>::new();
+    for epoch in start_epoch..=end_epoch {
+        let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch);
+        let start_slot = std::cmp::max(start_slot, first_slot_in_epoch);
+        let last_slot_in_epoch = epoch_schedule.get_last_slot_in_epoch(epoch);
+        let end_slot = std::cmp::min(end_slot, last_slot_in_epoch);
+
+        rpc_client
+            .get_leader_schedule(Some(start_slot))?
+            .unwrap_or_else(|| utils::bail(notifier, "Error: Leader schedule was not found"))
+            .into_iter()
+            .map(|(pk, s)| (Pubkey::from_str(&pk).unwrap(), s))
+            .for_each(|(pubkey, leader_slots)| {
+                let mut record = leader_records.entry(pubkey).or_default();
+                for slot_index in leader_slots.iter() {
+                    let slot = (*slot_index as u64) + first_slot_in_epoch;
+                    if slot >= start_slot && slot <= end_slot {
+                        record.total_slots += 1;
+                        if !confirmed_blocks.contains(&slot) {
+                            record.missed_slots += 1;
+                        }
+                    }
+                }
+            });
+    }
+
+    Ok(leader_records)
+}
+
+pub fn fetch_active_validators(rpc_client: &RpcClient) -> HashMap<Pubkey, Pubkey> {
+    match rpc_client.get_vote_accounts() {
+        Err(err) => {
+            warn!("Failed to get_vote_accounts(): {}", err);
+            HashMap::new()
+        }
+        Ok(vote_accounts) => vote_accounts
+            .current
+            .into_iter()
+            .filter_map(|info| {
+                if let (Ok(node_pubkey), Ok(vote_pubkey)) = (
+                    Pubkey::from_str(&info.node_pubkey),
+                    Pubkey::from_str(&info.vote_pubkey),
+                ) {
+                    Some((node_pubkey, vote_pubkey))
+                } else {
+                    None
+                }
+            })
+            .collect(),
+    }
+}
+
+/// Endlessly retry stake delegation until success
+fn delegate_stake(
+    rpc_client: &RpcClient,
+    faucet_keypair: &Keypair,
+    vote_account_pubkey: &Pubkey,
+    sol_gift: u64,
+) {
+    let stake_account_keypair = Keypair::new();
+    info!(
+        "delegate_stake: stake pubkey: {}",
+        stake_account_keypair.pubkey()
+    );
+    let mut retry_count = 0;
+    loop {
+        let recent_blockhash = loop {
+            match rpc_client.get_recent_blockhash() {
+                Ok(response) => break response.0,
+                Err(err) => {
+                    error!("Failed to get recent blockhash: {}", err);
+                    sleep(Duration::from_secs(5));
+                }
+            }
+        };
+
+        let mut transaction = Transaction::new_signed_instructions(
+            &[faucet_keypair, &stake_account_keypair],
+            stake_instruction::create_account_and_delegate_stake(
+                &faucet_keypair.pubkey(),
+                &stake_account_keypair.pubkey(),
+                &vote_account_pubkey,
+                &StakeAuthorized::auto(&faucet_keypair.pubkey()),
+                &Lockup::default(),
+                sol_to_lamports(sol_gift as f64),
+            ),
+            recent_blockhash,
+        );
+
+        // Check if stake was delegated but just failed to confirm on an earlier attempt
+        if retry_count > 0 {
+            if let Ok(stake_account) = rpc_client.get_account(&stake_account_keypair.pubkey()) {
+                if stake_account.owner == solana_stake_program::id() {
+                    break;
+                }
+            }
+        }
+
+        if let Err(err) = rpc_client.send_and_confirm_transaction(
+            &mut transaction,
+            &[faucet_keypair, &stake_account_keypair],
+        ) {
+            error!(
+                "Failed to delegate stake (retries: {}): {}",
+                retry_count, err
+            );
+            retry_count += 1;
+            sleep(Duration::from_secs(5));
+        } else {
+            break;
+        }
+    }
+}
+
+/// Announce validator status leader slot performance
+pub fn announce_results(
+    starting_validators: &HashMap<Pubkey, Pubkey>,
+    remaining_validators: &HashMap<Pubkey, Pubkey>,
+    pubkey_to_keybase: Rc<dyn Fn(&Pubkey) -> String>,
+    leader_records: &HashMap<Pubkey, LeaderRecord>,
+    notifier: &mut Notifier,
+) {
+    let buffer_records = |keys: Vec<&Pubkey>, notifier: &mut Notifier| {
+        if keys.is_empty() {
+            notifier.buffer("* None".to_string());
+            return;
+        }
+
+        let mut validators = vec![];
+        for pubkey in keys {
+            let name = pubkey_to_keybase(pubkey);
+            if let Some(record) = leader_records.get(pubkey) {
+                validators.push(format!(
+                    "* {} ({:.1}% leader efficiency)",
+                    name,
+                    record.completed_slot_pct()
+                ));
+            }
+        }
+        validators.sort();
+        notifier.buffer_vec(validators);
+    };
+
+    let healthy: Vec<_> = remaining_validators
+        .keys()
+        .filter(|k| leader_records.get(k).map(|r| r.healthy()).unwrap_or(false))
+        .collect();
+
+    let unhealthy: Vec<_> = remaining_validators
+        .keys()
+        .filter(|k| leader_records.get(k).map(|r| !r.healthy()).unwrap_or(true))
+        .collect();
+
+    let inactive: Vec<_> = starting_validators
+        .keys()
+        .filter(|k| !remaining_validators.contains_key(k))
+        .collect();
+
+    notifier.buffer("Healthy Validators:".to_string());
+    buffer_records(healthy, notifier);
+    notifier.buffer("Unhealthy Validators:".to_string());
+    buffer_records(unhealthy, notifier);
+    notifier.buffer("Inactive Validators:".to_string());
+    buffer_records(inactive, notifier);
+
+    notifier.flush();
+}
+
+/// Award stake to the surviving validators by delegating stake to their vote account
+pub fn award_stake(
+    rpc_client: &RpcClient,
+    faucet_keypair: &Keypair,
+    voters: Vec<(String, &Pubkey)>,
+    sol_gift: u64,
+    notifier: &mut Notifier,
+) {
+    for (node_pubkey, vote_account_pubkey) in voters {
+        info!("Delegate {} SOL to {}", sol_gift, node_pubkey);
+        delegate_stake(rpc_client, faucet_keypair, vote_account_pubkey, sol_gift);
+        notifier.buffer(format!("Delegated {} SOL to {}", sol_gift, node_pubkey));
+    }
+    notifier.flush();
+}