diff --git a/Cargo.lock b/Cargo.lock index ed72fe648f..8bacb80aa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5276,6 +5276,7 @@ dependencies = [ "solana-sdk", "solana-stake-program", "solana-transaction-status", + "thiserror", ] [[package]] diff --git a/stake-o-matic/Cargo.toml b/stake-o-matic/Cargo.toml index 1814c38558..8989bb819e 100644 --- a/stake-o-matic/Cargo.toml +++ b/stake-o-matic/Cargo.toml @@ -26,6 +26,7 @@ solana-notifier = { path = "../notifier", version = "1.6.0" } solana-sdk = { path = "../sdk", version = "1.6.0" } solana-stake-program = { path = "../programs/stake", version = "1.6.0" } solana-transaction-status = { path = "../transaction-status", version = "1.6.0" } +thiserror = "1.0.21" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/stake-o-matic/src/main.rs b/stake-o-matic/src/main.rs index 0bd3df1f0c..76570f355c 100644 --- a/stake-o-matic/src/main.rs +++ b/stake-o-matic/src/main.rs @@ -40,13 +40,107 @@ use { thread::sleep, time::Duration, }, + thiserror::Error, }; mod confirmed_block_cache; mod validator_list; +mod validators_app; use confirmed_block_cache::ConfirmedBlockCache; +enum InfrastructureConcentrationAffectKind { + Destake(String), + Warn(String), +} + +#[derive(Debug)] +enum InfrastructureConcentrationAffects { + WarnAll, + DestakeListed(HashSet), + DestakeAll, +} + +impl InfrastructureConcentrationAffects { + fn destake_memo(validator_id: &Pubkey, concentration: f64, config: &Config) -> String { + format!( + "🏟️ `{}` infrastructure concentration {:.1}% is too high. Max concentration is {:.0}%. Removed ◎{}", + validator_id, + concentration, + config.max_infrastructure_concentration, + lamports_to_sol(config.baseline_stake_amount), + ) + } + fn warning_memo(validator_id: &Pubkey, concentration: f64, config: &Config) -> String { + format!( + "🗺 `{}` infrastructure concentration {:.1}% is too high. Max concentration is {:.0}%. No stake removed. Consider finding a new data center", + validator_id, + concentration, + config.max_infrastructure_concentration, + ) + } + pub fn memo( + &self, + validator_id: &Pubkey, + concentration: f64, + config: &Config, + ) -> InfrastructureConcentrationAffectKind { + match self { + Self::DestakeAll => InfrastructureConcentrationAffectKind::Destake(Self::destake_memo( + validator_id, + concentration, + config, + )), + Self::WarnAll => InfrastructureConcentrationAffectKind::Warn(Self::warning_memo( + validator_id, + concentration, + config, + )), + Self::DestakeListed(ref list) => { + if list.contains(validator_id) { + InfrastructureConcentrationAffectKind::Destake(Self::destake_memo( + validator_id, + concentration, + config, + )) + } else { + InfrastructureConcentrationAffectKind::Warn(Self::warning_memo( + validator_id, + concentration, + config, + )) + } + } + } + } +} + +#[derive(Debug, Error)] +#[error("cannot convert to InfrastructureConcentrationAffects: {0}")] +struct InfrastructureConcentrationAffectsFromStrError(String); + +impl FromStr for InfrastructureConcentrationAffects { + type Err = InfrastructureConcentrationAffectsFromStrError; + fn from_str(s: &str) -> Result { + let lower = s.to_ascii_lowercase(); + match lower.as_str() { + "warn" => Ok(Self::WarnAll), + "destake" => Ok(Self::DestakeAll), + _ => { + let file = File::open(s) + .map_err(|_| InfrastructureConcentrationAffectsFromStrError(s.to_string()))?; + let mut list: Vec = serde_yaml::from_reader(file) + .map_err(|_| InfrastructureConcentrationAffectsFromStrError(s.to_string()))?; + let list = list + .drain(..) + .filter_map(|ref s| Pubkey::from_str(s).ok()) + .collect::>(); + Ok(Self::DestakeListed(list)) + } + } + } +} + pub fn is_release_version(string: String) -> Result<(), String> { if string.starts_with('v') && semver::Version::parse(string.split_at(1).1).is_ok() { return Ok(()); @@ -116,6 +210,17 @@ struct Config { /// Base path of confirmed block cache confirmed_block_cache_path: PathBuf, + + /// Vote accounts sharing infrastructure with larger than this amount will not be staked + max_infrastructure_concentration: f64, + + /// How validators with infrastruction concentration above `max_infrastructure_concentration` + /// will be affected. Accepted values are: + /// 1) "warn" - Stake unaffected. A warning message is notified + /// 2) "destake" - Removes all validator stake + /// 3) PATH_TO_YAML - Reads a list of validator identity pubkeys from the specified YAML file + /// destaking those in the list and warning any others + infrastructure_concentration_affects: InfrastructureConcentrationAffects, } fn default_confirmed_block_cache_path() -> PathBuf { @@ -264,6 +369,37 @@ fn get_config() -> Config { .default_value(&default_confirmed_block_cache_path) .help("Base path of confirmed block cache") ) + .arg( + Arg::with_name("max_infrastructure_concentration") + .long("max-infrastructure-concentration") + .takes_value(true) + .value_name("PERCENTAGE") + .default_value("100") + .validator(is_valid_percentage) + .help("Vote accounts sharing infrastructure with larger than this amount will not be staked") + ) + .arg( + Arg::with_name("infrastructure_concentration_affects") + .long("infrastructure-concentration-affects") + .takes_value(true) + .value_name("AFFECTS") + .default_value("warn") + .validator(|ref s| { + InfrastructureConcentrationAffects::from_str(s) + .map(|_| ()) + .map_err(|e| format!("{}", e)) + }) + .help("How validators with infrastruction concentration above \ + `max_infrastructure_concentration` will be affected. \ + Accepted values are: \ + 1) warn - Stake unaffected. A warning message \ + is notified \ + 2) destake - Removes all validator stake \ + 3) PATH_TO_YAML - Reads a list of validator identity \ + pubkeys from the specified YAML file \ + destaking those in the list and warning \ + any others") + ) .get_matches(); let config = if let Some(config_file) = matches.value_of("config_file") { @@ -334,6 +470,15 @@ fn get_config() -> Config { .map(PathBuf::from) .unwrap(); + let max_infrastructure_concentration = + value_t!(matches, "max_infrastructure_concentration", f64).unwrap(); + let infrastructure_concentration_affects = value_t!( + matches, + "infrastructure_concentration_affects", + InfrastructureConcentrationAffects + ) + .unwrap(); + let config = Config { json_rpc_url, cluster, @@ -351,6 +496,8 @@ fn get_config() -> Config { min_release_version, max_old_release_version_percentage, confirmed_block_cache_path, + max_infrastructure_concentration, + infrastructure_concentration_affects, }; info!("RPC URL: {}", config.json_rpc_url); @@ -709,6 +856,131 @@ fn process_confirmations( ok } +const DATA_CENTER_ID_UNKNOWN: &str = "0-Unknown"; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct DataCenterId { + asn: u64, + location: String, +} + +impl Default for DataCenterId { + fn default() -> Self { + Self::from_str(DATA_CENTER_ID_UNKNOWN).unwrap() + } +} + +impl std::str::FromStr for DataCenterId { + type Err = String; + fn from_str(s: &str) -> Result { + let mut parts = s.splitn(2, '-'); + let asn = parts.next(); + let location = parts.next(); + if let (Some(asn), Some(location)) = (asn, location) { + let asn = asn.parse().map_err(|e| format!("{:?}", e))?; + let location = location.to_string(); + Ok(Self { asn, location }) + } else { + Err(format!("cannot construct DataCenterId from input: {}", s)) + } + } +} + +impl std::fmt::Display for DataCenterId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}-{}", self.asn, self.location) + } +} + +#[derive(Clone, Debug, Default)] +struct DatacenterInfo { + id: DataCenterId, + stake: u64, + stake_percent: f64, + validators: Vec, +} + +impl DatacenterInfo { + pub fn new(id: DataCenterId) -> Self { + Self { + id, + ..Self::default() + } + } +} + +impl std::fmt::Display for DatacenterInfo { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "{:<30} {:>20} {:>5.2} {}", + self.id.to_string(), + self.stake, + self.stake_percent, + self.validators.len() + ) + } +} + +fn get_data_center_info() -> Result, Box> { + let token = std::env::var("VALIDATORS_APP_TOKEN")?; + let client = validators_app::Client::new(token); + let validators = client.validators(None, None)?; + let mut data_center_infos = HashMap::new(); + let mut total_stake = 0; + let mut unknown_data_center_stake: u64 = 0; + for v in validators.as_ref() { + let account = v + .account + .as_ref() + .and_then(|pubkey| Pubkey::from_str(pubkey).ok()); + let account = if let Some(account) = account { + account + } else { + warn!("No vote pubkey for: {:?}", v); + continue; + }; + + let stake = v.active_stake.unwrap_or(0); + + let data_center = v + .data_center_key + .as_deref() + .or_else(|| { + unknown_data_center_stake = unknown_data_center_stake.saturating_add(stake); + None + }) + .unwrap_or(DATA_CENTER_ID_UNKNOWN); + let data_center_id = DataCenterId::from_str(data_center) + .map_err(|e| { + unknown_data_center_stake = unknown_data_center_stake.saturating_add(stake); + e + }) + .unwrap_or_default(); + + let mut data_center_info = data_center_infos + .entry(data_center_id.clone()) + .or_insert_with(|| DatacenterInfo::new(data_center_id)); + data_center_info.stake += stake; + total_stake += stake; + data_center_info.validators.push(account); + } + + let unknown_percent = 100f64 * (unknown_data_center_stake as f64) / total_stake as f64; + if unknown_percent > 3f64 { + warn!("unknown data center percentage: {:.0}%", unknown_percent); + } + + let data_center_infos = data_center_infos + .drain() + .map(|(_, mut i)| { + i.stake_percent = 100f64 * i.stake as f64 / total_stake as f64; + i + }) + .collect(); + Ok(data_center_infos) +} + #[allow(clippy::cognitive_complexity)] // Yeah I know... fn main() -> Result<(), Box> { solana_logger::setup_with_default("solana=info"); @@ -791,10 +1063,28 @@ fn main() -> Result<(), Box> { }) .collect::>(); + let infrastructure_concentration = get_data_center_info() + .map_err(|e| { + warn!("infrastructure concentration skipped: {}", e); + e + }) + .unwrap_or_default() + .drain(..) + .filter_map(|dci| { + if dci.stake_percent > config.max_infrastructure_concentration { + Some((dci.validators, dci.stake_percent)) + } else { + None + } + }) + .flat_map(|(v, sp)| v.into_iter().map(move |v| (v, sp))) + .collect::>(); + let mut source_stake_lamports_required = 0; let mut create_stake_transactions = vec![]; let mut delegate_stake_transactions = vec![]; let mut stake_activated_in_current_epoch: HashSet = HashSet::new(); + let mut infrastructure_concentration_warnings = vec![]; for RpcVoteAccountInfo { commission, @@ -905,7 +1195,48 @@ fn main() -> Result<(), Box> { )); } - if *commission > config.max_commission { + let infrastructure_concentration_destake_memo = infrastructure_concentration + .get(&node_pubkey) + .map(|concentration| { + config.infrastructure_concentration_affects.memo( + &node_pubkey, + *concentration, + &config, + ) + }) + .and_then(|affect| match affect { + InfrastructureConcentrationAffectKind::Destake(memo) => Some(memo), + InfrastructureConcentrationAffectKind::Warn(memo) => { + infrastructure_concentration_warnings.push(memo); + None + } + }); + + if let Some(memo_base) = infrastructure_concentration_destake_memo { + // Deactivate baseline stake + delegate_stake_transactions.push(( + Transaction::new_unsigned(Message::new( + &[stake_instruction::deactivate_stake( + &baseline_stake_address, + &config.authorized_staker.pubkey(), + )], + Some(&config.authorized_staker.pubkey()), + )), + format!("{} {}", memo_base, "base stake"), + )); + + // Deactivate bonus stake + delegate_stake_transactions.push(( + Transaction::new_unsigned(Message::new( + &[stake_instruction::deactivate_stake( + &bonus_stake_address, + &config.authorized_staker.pubkey(), + )], + Some(&config.authorized_staker.pubkey()), + )), + format!("{} {}", memo_base, "bonus stake"), + )); + } else if *commission > config.max_commission { // Deactivate baseline stake delegate_stake_transactions.push(( Transaction::new_unsigned(Message::new( @@ -1175,14 +1506,22 @@ fn main() -> Result<(), Box> { } } - if !process_confirmations( + let confirmations_succeeded = process_confirmations( confirmations, if config.dry_run { None } else { Some(¬ifier) }, - ) { + ); + + for memo in &infrastructure_concentration_warnings { + if config.dry_run && !notifier.is_empty() { + notifier.send(memo) + } + } + + if !confirmations_succeeded { process::exit(1); }