From 3f6befe012512772e9ddfe413f2d3862ba94e645 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 1 May 2020 17:48:18 -0700 Subject: [PATCH] Watchtower can now emit a notification on all non-vote transactions (#9848) --- Cargo.lock | 3 + cli/src/display.rs | 76 ++++++++++++------ watchtower/Cargo.toml | 5 +- watchtower/src/main.rs | 153 ++++++++++++++++++++++++++++++++++++- watchtower/src/notifier.rs | 18 +++-- 5 files changed, 224 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc535a36f6..9ec0a4dbc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4041,11 +4041,14 @@ dependencies = [ "reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.46 (registry+https://github.com/rust-lang/crates.io-index)", "solana-clap-utils 1.0.21", + "solana-cli 1.0.21", "solana-cli-config 1.0.21", "solana-client 1.0.21", "solana-logger 1.0.21", "solana-metrics 1.0.21", "solana-sdk 1.0.21", + "solana-transaction-status 1.0.21", + "solana-vote-program 1.0.21", ] [[package]] diff --git a/cli/src/display.rs b/cli/src/display.rs index 08b1b9dd3e..5d46ae7112 100644 --- a/cli/src/display.rs +++ b/cli/src/display.rs @@ -5,6 +5,7 @@ use solana_sdk::{ transaction::Transaction, }; use solana_transaction_status::RpcTransactionStatusMeta; +use std::io; // Pretty print a "name value" pub fn println_name_value(name: &str, value: &str) { @@ -54,33 +55,44 @@ pub fn println_signers( println!(); } -pub fn println_transaction( +pub fn write_transaction( + w: &mut W, transaction: &Transaction, transaction_status: &Option, prefix: &str, -) { +) -> io::Result<()> { let message = &transaction.message; - println!("{}Recent Blockhash: {:?}", prefix, message.recent_blockhash); + writeln!( + w, + "{}Recent Blockhash: {:?}", + prefix, message.recent_blockhash + )?; for (signature_index, signature) in transaction.signatures.iter().enumerate() { - println!("{}Signature {}: {:?}", prefix, signature_index, signature); + writeln!( + w, + "{}Signature {}: {:?}", + prefix, signature_index, signature + )?; } - println!("{}{:?}", prefix, message.header); + writeln!(w, "{}{:?}", prefix, message.header)?; for (account_index, account) in message.account_keys.iter().enumerate() { - println!("{}Account {}: {:?}", prefix, account_index, account); + writeln!(w, "{}Account {}: {:?}", prefix, account_index, account)?; } for (instruction_index, instruction) in message.instructions.iter().enumerate() { let program_pubkey = message.account_keys[instruction.program_id_index as usize]; - println!("{}Instruction {}", prefix, instruction_index); - println!( + writeln!(w, "{}Instruction {}", prefix, instruction_index)?; + writeln!( + w, "{} Program: {} ({})", prefix, program_pubkey, instruction.program_id_index - ); + )?; for (account_index, account) in instruction.accounts.iter().enumerate() { let account_pubkey = message.account_keys[*account as usize]; - println!( + writeln!( + w, "{} Account {}: {} ({})", prefix, account_index, account_pubkey, account - ); + )?; } let mut raw = true; @@ -89,7 +101,7 @@ pub fn println_transaction( solana_vote_program::vote_instruction::VoteInstruction, >(&instruction.data) { - println!("{} {:?}", prefix, vote_instruction); + writeln!(w, "{} {:?}", prefix, vote_instruction)?; raw = false; } } else if program_pubkey == solana_stake_program::id() { @@ -97,7 +109,7 @@ pub fn println_transaction( solana_stake_program::stake_instruction::StakeInstruction, >(&instruction.data) { - println!("{} {:?}", prefix, stake_instruction); + writeln!(w, "{} {:?}", prefix, stake_instruction)?; raw = false; } } else if program_pubkey == solana_sdk::system_program::id() { @@ -105,26 +117,27 @@ pub fn println_transaction( solana_sdk::system_instruction::SystemInstruction, >(&instruction.data) { - println!("{} {:?}", prefix, system_instruction); + writeln!(w, "{} {:?}", prefix, system_instruction)?; raw = false; } } if raw { - println!("{} Data: {:?}", prefix, instruction.data); + writeln!(w, "{} Data: {:?}", prefix, instruction.data)?; } } if let Some(transaction_status) = transaction_status { - println!( + writeln!( + w, "{}Status: {}", prefix, match &transaction_status.status { Ok(_) => "Ok".into(), Err(err) => err.to_string(), } - ); - println!("{} Fee: {}", prefix, transaction_status.fee); + )?; + writeln!(w, "{} Fee: {}", prefix, transaction_status.fee)?; assert_eq!( transaction_status.pre_balances.len(), transaction_status.post_balances.len() @@ -136,23 +149,40 @@ pub fn println_transaction( .enumerate() { if pre == post { - println!( + writeln!( + w, "{} Account {} balance: {} SOL", prefix, i, lamports_to_sol(*pre) - ); + )?; } else { - println!( + writeln!( + w, "{} Account {} balance: {} SOL -> {} SOL", prefix, i, lamports_to_sol(*pre), lamports_to_sol(*post) - ); + )?; } } } else { - println!("{}Status: Unavailable", prefix); + writeln!(w, "{}Status: Unavailable", prefix)?; + } + + Ok(()) +} + +pub fn println_transaction( + transaction: &Transaction, + transaction_status: &Option, + prefix: &str, +) { + let mut w = Vec::new(); + if write_transaction(&mut w, transaction, transaction_status, prefix).is_ok() { + if let Ok(s) = String::from_utf8(w) { + print!("{}", s); + } } } diff --git a/watchtower/Cargo.toml b/watchtower/Cargo.toml index b9c018bd0d..327d77fcfa 100644 --- a/watchtower/Cargo.toml +++ b/watchtower/Cargo.toml @@ -12,14 +12,17 @@ homepage = "https://solana.com/" clap = "2.33.0" log = "0.4.8" humantime = "2.0.0" -reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "rustls-tls"] } +reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "json", "rustls-tls"] } serde_json = "1.0" solana-clap-utils = { path = "../clap-utils", version = "1.0.21" } solana-cli-config = { path = "../cli-config", version = "1.0.21" } +solana-cli = { path = "../cli", version = "1.0.21" } solana-client = { path = "../client", version = "1.0.21" } solana-logger = { path = "../logger", version = "1.0.21" } solana-metrics = { path = "../metrics", version = "1.0.21" } solana-sdk = { path = "../sdk", version = "1.0.21" } +solana-transaction-status = { path = "../transaction-status", version = "1.0.21" } +solana-vote-program = { path = "../programs/vote", version = "1.0.21" } [[bin]] name = "solana-watchtower" diff --git a/watchtower/src/main.rs b/watchtower/src/main.rs index ef7be6c1b8..22380ef351 100644 --- a/watchtower/src/main.rs +++ b/watchtower/src/main.rs @@ -13,7 +13,12 @@ use solana_client::{ client_error::Result as ClientResult, rpc_client::RpcClient, rpc_response::RpcVoteAccountStatus, }; use solana_metrics::{datapoint_error, datapoint_info}; -use solana_sdk::{hash::Hash, native_token::lamports_to_sol, pubkey::Pubkey}; +use solana_sdk::{ + clock::Slot, hash::Hash, native_token::lamports_to_sol, program_utils::limited_deserialize, + pubkey::Pubkey, +}; +use solana_transaction_status::{ConfirmedBlock, TransactionEncoding}; +use solana_vote_program::vote_instruction::VoteInstruction; use std::{ error, str::FromStr, @@ -27,6 +32,7 @@ struct Config { validator_identity_pubkeys: Vec, no_duplicate_notifications: bool, monitor_active_stake: bool, + notify_on_transactions: bool, } fn get_config() -> Config { @@ -84,6 +90,14 @@ fn get_config() -> Config { .takes_value(false) .help("Alert when the current stake for the cluster drops below 80%"), ) + .arg( + Arg::with_name("notify_on_transactions") + .long("notify-on-transactions") + .takes_value(false) + .help("Send a notification on all non-vote transactions. This can be very verbose!\ + Note that the notification environment variables used by this feature all require a \ + TRANSACTION_NOTIFIER_ prefix. For example: TRANSACTION_NOTIFIER_SLACK_WEBHOOK"), + ) .get_matches(); let config = if let Some(config_file) = matches.value_of("config_file") { @@ -103,6 +117,7 @@ fn get_config() -> Config { let no_duplicate_notifications = matches.is_present("no_duplicate_notifications"); let monitor_active_stake = matches.is_present("monitor_active_stake"); + let notify_on_transactions = matches.is_present("notify_on_transactions"); let config = Config { interval, @@ -110,6 +125,7 @@ fn get_config() -> Config { validator_identity_pubkeys, no_duplicate_notifications, monitor_active_stake, + notify_on_transactions, }; info!("RPC URL: {}", config.json_rpc_url); @@ -122,6 +138,133 @@ fn get_config() -> Config { config } +fn process_confirmed_block(notifier: &Notifier, slot: Slot, confirmed_block: ConfirmedBlock) { + let mut vote_transactions = 0; + + for rpc_transaction in &confirmed_block.transactions { + if let Some(transaction) = rpc_transaction.transaction.decode() { + if transaction.verify().is_ok() { + let mut notify = true; + + // Ignore simple Vote transactions since they are too prevalent + if transaction.message.instructions.len() == 1 { + let instruction = &transaction.message.instructions[0]; + let program_pubkey = + transaction.message.account_keys[instruction.program_id_index as usize]; + if program_pubkey == solana_vote_program::id() { + if let Ok(VoteInstruction::Vote(_)) = + limited_deserialize::(&instruction.data) + { + vote_transactions += 1; + notify = false; + } + } + } + + if notify { + let mut w = Vec::new(); + if solana_cli::display::write_transaction( + &mut w, + &transaction, + &rpc_transaction.meta, + "", + ) + .is_ok() + { + if let Ok(s) = String::from_utf8(w) { + notifier.send(&format!("```Slot: {}\n{}```", slot, s)); + } + } + } + } else { + datapoint_error!( + "watchtower-sanity-failure", + ("slot", slot, i64), + ("err", "Transaction signature verification failed", String) + ); + } + } + } + info!( + "Process slot {} with {} regular transactions (and {} votes)", + slot, + confirmed_block.transactions.len() - vote_transactions, + vote_transactions + ); +} + +fn load_blocks( + rpc_client: &RpcClient, + start_slot: Slot, + end_slot: Slot, +) -> ClientResult> { + info!( + "Loading confirmed blocks between slots: {} - {}", + start_slot, end_slot + ); + + let slots = rpc_client.get_confirmed_blocks(start_slot, Some(end_slot))?; + + let mut blocks = vec![]; + for slot in slots.into_iter() { + let block = + rpc_client.get_confirmed_block_with_encoding(slot, TransactionEncoding::Binary)?; + blocks.push((slot, block)); + } + Ok(blocks) +} + +fn transaction_monitor(rpc_client: RpcClient) { + let notifier = Notifier::new_with_env_prefix("TRANSACTION_NOTIFIER_"); + let mut start_slot = loop { + match rpc_client.get_slot() { + Ok(slot) => break slot, + Err(err) => { + warn!("Failed to get current slot: {}", err); + } + } + sleep(Duration::from_secs(1)); + }; + + loop { + let end_slot = start_slot + 50; + info!("start_slot:{} - end_slot:{}", start_slot, end_slot); + + let latest_available_slot = rpc_client.get_slot().unwrap_or_else(|err| { + info!("get_slot() failed: {}", err); + 0 + }); + + if latest_available_slot <= start_slot { + info!("Waiting for a slot greater than {}...", start_slot); + sleep(Duration::from_secs(5)); + continue; + } + + match load_blocks(&rpc_client, start_slot + 1, end_slot) { + Ok(blocks) => { + info!("Loaded {} blocks", blocks.len()); + + if blocks.is_empty() && end_slot < latest_available_slot { + start_slot = end_slot; + } else { + for (slot, block) in blocks.into_iter() { + process_confirmed_block(¬ifier, slot, block); + start_slot = slot; + } + } + } + Err(err) => { + info!( + "failed to get blocks in range ({},{}): {}", + start_slot, end_slot, err + ); + sleep(Duration::from_secs(1)); + } + } + } +} + fn get_cluster_info(rpc_client: &RpcClient) -> ClientResult<(u64, Hash, RpcVoteAccountStatus)> { let transaction_count = rpc_client.get_transaction_count()?; let recent_blockhash = rpc_client.get_recent_blockhash()?.0; @@ -135,8 +278,14 @@ fn main() -> Result<(), Box> { solana_logger::setup_with_default("solana=info"); solana_metrics::set_panic_hook("watchtower"); - let rpc_client = RpcClient::new(config.json_rpc_url); + let _notify_thread = if config.notify_on_transactions { + let rpc_client = RpcClient::new(config.json_rpc_url.clone()); + Some(std::thread::spawn(move || transaction_monitor(rpc_client))) + } else { + None + }; + let rpc_client = RpcClient::new(config.json_rpc_url.clone()); let notifier = Notifier::new(); let mut last_transaction_count = 0; let mut last_recent_blockhash = Hash::default(); diff --git a/watchtower/src/notifier.rs b/watchtower/src/notifier.rs index e0e414fba6..0553f464b2 100644 --- a/watchtower/src/notifier.rs +++ b/watchtower/src/notifier.rs @@ -66,19 +66,27 @@ pub struct Notifier { impl Notifier { pub fn new() -> Self { - let discord_webhook = env::var("DISCORD_WEBHOOK") + Self::new_with_env_prefix("") + } + + pub fn new_with_env_prefix(env_prefix: &str) -> Self { + info!("Initializing {}Notifier", env_prefix); + + let discord_webhook = env::var(format!("{}DISCORD_WEBHOOK", env_prefix)) .map_err(|_| { info!("Discord notifications disabled"); }) .ok(); - let slack_webhook = env::var("SLACK_WEBHOOK") + let slack_webhook = env::var(format!("{}SLACK_WEBHOOK", env_prefix)) .map_err(|_| { info!("Slack notifications disabled"); }) .ok(); - let telegram_webhook = if let (Ok(bot_token), Ok(chat_id)) = - (env::var("TELEGRAM_BOT_TOKEN"), env::var("TELEGRAM_CHAT_ID")) - { + + let telegram_webhook = if let (Ok(bot_token), Ok(chat_id)) = ( + env::var(format!("{}TELEGRAM_BOT_TOKEN", env_prefix)), + env::var(format!("{}TELEGRAM_CHAT_ID", env_prefix)), + ) { Some(TelegramWebHook { bot_token, chat_id }) } else { info!("Telegram notifications disabled");