Watchtower can now emit a notification on all non-vote transactions (#9848)

This commit is contained in:
Michael Vines
2020-05-01 17:48:18 -07:00
committed by GitHub
parent ee201cfb84
commit 3f6befe012
5 changed files with 224 additions and 31 deletions

3
Cargo.lock generated
View File

@ -4041,11 +4041,14 @@ dependencies = [
"reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.46 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.46 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-clap-utils 1.0.21", "solana-clap-utils 1.0.21",
"solana-cli 1.0.21",
"solana-cli-config 1.0.21", "solana-cli-config 1.0.21",
"solana-client 1.0.21", "solana-client 1.0.21",
"solana-logger 1.0.21", "solana-logger 1.0.21",
"solana-metrics 1.0.21", "solana-metrics 1.0.21",
"solana-sdk 1.0.21", "solana-sdk 1.0.21",
"solana-transaction-status 1.0.21",
"solana-vote-program 1.0.21",
] ]
[[package]] [[package]]

View File

@ -5,6 +5,7 @@ use solana_sdk::{
transaction::Transaction, transaction::Transaction,
}; };
use solana_transaction_status::RpcTransactionStatusMeta; use solana_transaction_status::RpcTransactionStatusMeta;
use std::io;
// Pretty print a "name value" // Pretty print a "name value"
pub fn println_name_value(name: &str, value: &str) { pub fn println_name_value(name: &str, value: &str) {
@ -54,33 +55,44 @@ pub fn println_signers(
println!(); println!();
} }
pub fn println_transaction( pub fn write_transaction<W: io::Write>(
w: &mut W,
transaction: &Transaction, transaction: &Transaction,
transaction_status: &Option<RpcTransactionStatusMeta>, transaction_status: &Option<RpcTransactionStatusMeta>,
prefix: &str, prefix: &str,
) { ) -> io::Result<()> {
let message = &transaction.message; let message = &transaction.message;
println!("{}Recent Blockhash: {:?}", prefix, message.recent_blockhash); writeln!(
w,
"{}Recent Blockhash: {:?}",
prefix, message.recent_blockhash
)?;
for (signature_index, signature) in transaction.signatures.iter().enumerate() { for (signature_index, signature) in transaction.signatures.iter().enumerate() {
println!("{}Signature {}: {:?}", prefix, signature_index, signature); writeln!(
w,
"{}Signature {}: {:?}",
prefix, signature_index, signature
)?;
} }
println!("{}{:?}", prefix, message.header); writeln!(w, "{}{:?}", prefix, message.header)?;
for (account_index, account) in message.account_keys.iter().enumerate() { for (account_index, account) in message.account_keys.iter().enumerate() {
println!("{}Account {}: {:?}", prefix, account_index, account); writeln!(w, "{}Account {}: {:?}", prefix, account_index, account)?;
} }
for (instruction_index, instruction) in message.instructions.iter().enumerate() { for (instruction_index, instruction) in message.instructions.iter().enumerate() {
let program_pubkey = message.account_keys[instruction.program_id_index as usize]; let program_pubkey = message.account_keys[instruction.program_id_index as usize];
println!("{}Instruction {}", prefix, instruction_index); writeln!(w, "{}Instruction {}", prefix, instruction_index)?;
println!( writeln!(
w,
"{} Program: {} ({})", "{} Program: {} ({})",
prefix, program_pubkey, instruction.program_id_index prefix, program_pubkey, instruction.program_id_index
); )?;
for (account_index, account) in instruction.accounts.iter().enumerate() { for (account_index, account) in instruction.accounts.iter().enumerate() {
let account_pubkey = message.account_keys[*account as usize]; let account_pubkey = message.account_keys[*account as usize];
println!( writeln!(
w,
"{} Account {}: {} ({})", "{} Account {}: {} ({})",
prefix, account_index, account_pubkey, account prefix, account_index, account_pubkey, account
); )?;
} }
let mut raw = true; let mut raw = true;
@ -89,7 +101,7 @@ pub fn println_transaction(
solana_vote_program::vote_instruction::VoteInstruction, solana_vote_program::vote_instruction::VoteInstruction,
>(&instruction.data) >(&instruction.data)
{ {
println!("{} {:?}", prefix, vote_instruction); writeln!(w, "{} {:?}", prefix, vote_instruction)?;
raw = false; raw = false;
} }
} else if program_pubkey == solana_stake_program::id() { } else if program_pubkey == solana_stake_program::id() {
@ -97,7 +109,7 @@ pub fn println_transaction(
solana_stake_program::stake_instruction::StakeInstruction, solana_stake_program::stake_instruction::StakeInstruction,
>(&instruction.data) >(&instruction.data)
{ {
println!("{} {:?}", prefix, stake_instruction); writeln!(w, "{} {:?}", prefix, stake_instruction)?;
raw = false; raw = false;
} }
} else if program_pubkey == solana_sdk::system_program::id() { } else if program_pubkey == solana_sdk::system_program::id() {
@ -105,26 +117,27 @@ pub fn println_transaction(
solana_sdk::system_instruction::SystemInstruction, solana_sdk::system_instruction::SystemInstruction,
>(&instruction.data) >(&instruction.data)
{ {
println!("{} {:?}", prefix, system_instruction); writeln!(w, "{} {:?}", prefix, system_instruction)?;
raw = false; raw = false;
} }
} }
if raw { if raw {
println!("{} Data: {:?}", prefix, instruction.data); writeln!(w, "{} Data: {:?}", prefix, instruction.data)?;
} }
} }
if let Some(transaction_status) = transaction_status { if let Some(transaction_status) = transaction_status {
println!( writeln!(
w,
"{}Status: {}", "{}Status: {}",
prefix, prefix,
match &transaction_status.status { match &transaction_status.status {
Ok(_) => "Ok".into(), Ok(_) => "Ok".into(),
Err(err) => err.to_string(), Err(err) => err.to_string(),
} }
); )?;
println!("{} Fee: {}", prefix, transaction_status.fee); writeln!(w, "{} Fee: {}", prefix, transaction_status.fee)?;
assert_eq!( assert_eq!(
transaction_status.pre_balances.len(), transaction_status.pre_balances.len(),
transaction_status.post_balances.len() transaction_status.post_balances.len()
@ -136,23 +149,40 @@ pub fn println_transaction(
.enumerate() .enumerate()
{ {
if pre == post { if pre == post {
println!( writeln!(
w,
"{} Account {} balance: {} SOL", "{} Account {} balance: {} SOL",
prefix, prefix,
i, i,
lamports_to_sol(*pre) lamports_to_sol(*pre)
); )?;
} else { } else {
println!( writeln!(
w,
"{} Account {} balance: {} SOL -> {} SOL", "{} Account {} balance: {} SOL -> {} SOL",
prefix, prefix,
i, i,
lamports_to_sol(*pre), lamports_to_sol(*pre),
lamports_to_sol(*post) lamports_to_sol(*post)
); )?;
} }
} }
} else { } else {
println!("{}Status: Unavailable", prefix); writeln!(w, "{}Status: Unavailable", prefix)?;
}
Ok(())
}
pub fn println_transaction(
transaction: &Transaction,
transaction_status: &Option<RpcTransactionStatusMeta>,
prefix: &str,
) {
let mut w = Vec::new();
if write_transaction(&mut w, transaction, transaction_status, prefix).is_ok() {
if let Ok(s) = String::from_utf8(w) {
print!("{}", s);
}
} }
} }

View File

@ -12,14 +12,17 @@ homepage = "https://solana.com/"
clap = "2.33.0" clap = "2.33.0"
log = "0.4.8" log = "0.4.8"
humantime = "2.0.0" humantime = "2.0.0"
reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "rustls-tls"] } reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "json", "rustls-tls"] }
serde_json = "1.0" serde_json = "1.0"
solana-clap-utils = { path = "../clap-utils", version = "1.0.21" } solana-clap-utils = { path = "../clap-utils", version = "1.0.21" }
solana-cli-config = { path = "../cli-config", version = "1.0.21" } solana-cli-config = { path = "../cli-config", version = "1.0.21" }
solana-cli = { path = "../cli", version = "1.0.21" }
solana-client = { path = "../client", version = "1.0.21" } solana-client = { path = "../client", version = "1.0.21" }
solana-logger = { path = "../logger", version = "1.0.21" } solana-logger = { path = "../logger", version = "1.0.21" }
solana-metrics = { path = "../metrics", version = "1.0.21" } solana-metrics = { path = "../metrics", version = "1.0.21" }
solana-sdk = { path = "../sdk", version = "1.0.21" } solana-sdk = { path = "../sdk", version = "1.0.21" }
solana-transaction-status = { path = "../transaction-status", version = "1.0.21" }
solana-vote-program = { path = "../programs/vote", version = "1.0.21" }
[[bin]] [[bin]]
name = "solana-watchtower" name = "solana-watchtower"

View File

@ -13,7 +13,12 @@ use solana_client::{
client_error::Result as ClientResult, rpc_client::RpcClient, rpc_response::RpcVoteAccountStatus, client_error::Result as ClientResult, rpc_client::RpcClient, rpc_response::RpcVoteAccountStatus,
}; };
use solana_metrics::{datapoint_error, datapoint_info}; use solana_metrics::{datapoint_error, datapoint_info};
use solana_sdk::{hash::Hash, native_token::lamports_to_sol, pubkey::Pubkey}; use solana_sdk::{
clock::Slot, hash::Hash, native_token::lamports_to_sol, program_utils::limited_deserialize,
pubkey::Pubkey,
};
use solana_transaction_status::{ConfirmedBlock, TransactionEncoding};
use solana_vote_program::vote_instruction::VoteInstruction;
use std::{ use std::{
error, error,
str::FromStr, str::FromStr,
@ -27,6 +32,7 @@ struct Config {
validator_identity_pubkeys: Vec<String>, validator_identity_pubkeys: Vec<String>,
no_duplicate_notifications: bool, no_duplicate_notifications: bool,
monitor_active_stake: bool, monitor_active_stake: bool,
notify_on_transactions: bool,
} }
fn get_config() -> Config { fn get_config() -> Config {
@ -84,6 +90,14 @@ fn get_config() -> Config {
.takes_value(false) .takes_value(false)
.help("Alert when the current stake for the cluster drops below 80%"), .help("Alert when the current stake for the cluster drops below 80%"),
) )
.arg(
Arg::with_name("notify_on_transactions")
.long("notify-on-transactions")
.takes_value(false)
.help("Send a notification on all non-vote transactions. This can be very verbose!\
Note that the notification environment variables used by this feature all require a \
TRANSACTION_NOTIFIER_ prefix. For example: TRANSACTION_NOTIFIER_SLACK_WEBHOOK"),
)
.get_matches(); .get_matches();
let config = if let Some(config_file) = matches.value_of("config_file") { let config = if let Some(config_file) = matches.value_of("config_file") {
@ -103,6 +117,7 @@ fn get_config() -> Config {
let no_duplicate_notifications = matches.is_present("no_duplicate_notifications"); let no_duplicate_notifications = matches.is_present("no_duplicate_notifications");
let monitor_active_stake = matches.is_present("monitor_active_stake"); let monitor_active_stake = matches.is_present("monitor_active_stake");
let notify_on_transactions = matches.is_present("notify_on_transactions");
let config = Config { let config = Config {
interval, interval,
@ -110,6 +125,7 @@ fn get_config() -> Config {
validator_identity_pubkeys, validator_identity_pubkeys,
no_duplicate_notifications, no_duplicate_notifications,
monitor_active_stake, monitor_active_stake,
notify_on_transactions,
}; };
info!("RPC URL: {}", config.json_rpc_url); info!("RPC URL: {}", config.json_rpc_url);
@ -122,6 +138,133 @@ fn get_config() -> Config {
config config
} }
fn process_confirmed_block(notifier: &Notifier, slot: Slot, confirmed_block: ConfirmedBlock) {
let mut vote_transactions = 0;
for rpc_transaction in &confirmed_block.transactions {
if let Some(transaction) = rpc_transaction.transaction.decode() {
if transaction.verify().is_ok() {
let mut notify = true;
// Ignore simple Vote transactions since they are too prevalent
if transaction.message.instructions.len() == 1 {
let instruction = &transaction.message.instructions[0];
let program_pubkey =
transaction.message.account_keys[instruction.program_id_index as usize];
if program_pubkey == solana_vote_program::id() {
if let Ok(VoteInstruction::Vote(_)) =
limited_deserialize::<VoteInstruction>(&instruction.data)
{
vote_transactions += 1;
notify = false;
}
}
}
if notify {
let mut w = Vec::new();
if solana_cli::display::write_transaction(
&mut w,
&transaction,
&rpc_transaction.meta,
"",
)
.is_ok()
{
if let Ok(s) = String::from_utf8(w) {
notifier.send(&format!("```Slot: {}\n{}```", slot, s));
}
}
}
} else {
datapoint_error!(
"watchtower-sanity-failure",
("slot", slot, i64),
("err", "Transaction signature verification failed", String)
);
}
}
}
info!(
"Process slot {} with {} regular transactions (and {} votes)",
slot,
confirmed_block.transactions.len() - vote_transactions,
vote_transactions
);
}
fn load_blocks(
rpc_client: &RpcClient,
start_slot: Slot,
end_slot: Slot,
) -> ClientResult<Vec<(Slot, ConfirmedBlock)>> {
info!(
"Loading confirmed blocks between slots: {} - {}",
start_slot, end_slot
);
let slots = rpc_client.get_confirmed_blocks(start_slot, Some(end_slot))?;
let mut blocks = vec![];
for slot in slots.into_iter() {
let block =
rpc_client.get_confirmed_block_with_encoding(slot, TransactionEncoding::Binary)?;
blocks.push((slot, block));
}
Ok(blocks)
}
fn transaction_monitor(rpc_client: RpcClient) {
let notifier = Notifier::new_with_env_prefix("TRANSACTION_NOTIFIER_");
let mut start_slot = loop {
match rpc_client.get_slot() {
Ok(slot) => break slot,
Err(err) => {
warn!("Failed to get current slot: {}", err);
}
}
sleep(Duration::from_secs(1));
};
loop {
let end_slot = start_slot + 50;
info!("start_slot:{} - end_slot:{}", start_slot, end_slot);
let latest_available_slot = rpc_client.get_slot().unwrap_or_else(|err| {
info!("get_slot() failed: {}", err);
0
});
if latest_available_slot <= start_slot {
info!("Waiting for a slot greater than {}...", start_slot);
sleep(Duration::from_secs(5));
continue;
}
match load_blocks(&rpc_client, start_slot + 1, end_slot) {
Ok(blocks) => {
info!("Loaded {} blocks", blocks.len());
if blocks.is_empty() && end_slot < latest_available_slot {
start_slot = end_slot;
} else {
for (slot, block) in blocks.into_iter() {
process_confirmed_block(&notifier, slot, block);
start_slot = slot;
}
}
}
Err(err) => {
info!(
"failed to get blocks in range ({},{}): {}",
start_slot, end_slot, err
);
sleep(Duration::from_secs(1));
}
}
}
}
fn get_cluster_info(rpc_client: &RpcClient) -> ClientResult<(u64, Hash, RpcVoteAccountStatus)> { fn get_cluster_info(rpc_client: &RpcClient) -> ClientResult<(u64, Hash, RpcVoteAccountStatus)> {
let transaction_count = rpc_client.get_transaction_count()?; let transaction_count = rpc_client.get_transaction_count()?;
let recent_blockhash = rpc_client.get_recent_blockhash()?.0; let recent_blockhash = rpc_client.get_recent_blockhash()?.0;
@ -135,8 +278,14 @@ fn main() -> Result<(), Box<dyn error::Error>> {
solana_logger::setup_with_default("solana=info"); solana_logger::setup_with_default("solana=info");
solana_metrics::set_panic_hook("watchtower"); solana_metrics::set_panic_hook("watchtower");
let rpc_client = RpcClient::new(config.json_rpc_url); let _notify_thread = if config.notify_on_transactions {
let rpc_client = RpcClient::new(config.json_rpc_url.clone());
Some(std::thread::spawn(move || transaction_monitor(rpc_client)))
} else {
None
};
let rpc_client = RpcClient::new(config.json_rpc_url.clone());
let notifier = Notifier::new(); let notifier = Notifier::new();
let mut last_transaction_count = 0; let mut last_transaction_count = 0;
let mut last_recent_blockhash = Hash::default(); let mut last_recent_blockhash = Hash::default();

View File

@ -66,19 +66,27 @@ pub struct Notifier {
impl Notifier { impl Notifier {
pub fn new() -> Self { pub fn new() -> Self {
let discord_webhook = env::var("DISCORD_WEBHOOK") Self::new_with_env_prefix("")
}
pub fn new_with_env_prefix(env_prefix: &str) -> Self {
info!("Initializing {}Notifier", env_prefix);
let discord_webhook = env::var(format!("{}DISCORD_WEBHOOK", env_prefix))
.map_err(|_| { .map_err(|_| {
info!("Discord notifications disabled"); info!("Discord notifications disabled");
}) })
.ok(); .ok();
let slack_webhook = env::var("SLACK_WEBHOOK") let slack_webhook = env::var(format!("{}SLACK_WEBHOOK", env_prefix))
.map_err(|_| { .map_err(|_| {
info!("Slack notifications disabled"); info!("Slack notifications disabled");
}) })
.ok(); .ok();
let telegram_webhook = if let (Ok(bot_token), Ok(chat_id)) =
(env::var("TELEGRAM_BOT_TOKEN"), env::var("TELEGRAM_CHAT_ID")) let telegram_webhook = if let (Ok(bot_token), Ok(chat_id)) = (
{ env::var(format!("{}TELEGRAM_BOT_TOKEN", env_prefix)),
env::var(format!("{}TELEGRAM_CHAT_ID", env_prefix)),
) {
Some(TelegramWebHook { bot_token, chat_id }) Some(TelegramWebHook { bot_token, chat_id })
} else { } else {
info!("Telegram notifications disabled"); info!("Telegram notifications disabled");