diff --git a/core/src/lib.rs b/core/src/lib.rs index be50e1a890..9f1016ac16 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -48,6 +48,7 @@ pub mod rpc_pubsub; pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscriptions; +pub mod send_transaction_service; pub mod serve_repair; pub mod serve_repair_service; pub mod sigverify; diff --git a/core/src/rpc.rs b/core/src/rpc.rs index f33282602c..9a327fe5f4 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -7,6 +7,7 @@ use crate::{ non_circulating_supply::calculate_non_circulating_supply, rpc_error::RpcCustomError, rpc_health::*, + send_transaction_service::SendTransactionService, validator::ValidatorExit, }; use bincode::serialize; @@ -44,11 +45,9 @@ use solana_vote_program::vote_state::{VoteState, MAX_LOCKOUT_HISTORY}; use std::{ cmp::{max, min}, collections::{HashMap, HashSet}, - net::{SocketAddr, UdpSocket}, + net::SocketAddr, str::FromStr, sync::{Arc, RwLock}, - thread::sleep, - time::{Duration, Instant}, }; type RpcResponse = Result>; @@ -78,6 +77,7 @@ pub struct JsonRpcRequestProcessor { health: Arc, cluster_info: Arc, genesis_hash: Hash, + send_transaction_service: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -136,6 +136,7 @@ impl JsonRpcRequestProcessor { health: Arc, cluster_info: Arc, genesis_hash: Hash, + send_transaction_service: Arc, ) -> Self { Self { config, @@ -146,6 +147,7 @@ impl JsonRpcRequestProcessor { health, cluster_info, genesis_hash, + send_transaction_service, } } @@ -707,11 +709,6 @@ impl JsonRpcRequestProcessor { } } -fn get_tpu_addr(cluster_info: &ClusterInfo) -> Result { - let contact_info = cluster_info.my_contact_info(); - Ok(contact_info.tpu) -} - fn verify_pubkey(input: String) -> Result { input .parse() @@ -1314,49 +1311,32 @@ impl RpcSol for RpcSolImpl { let faucet_addr = meta.config.faucet_addr.ok_or_else(Error::invalid_request)?; let pubkey = verify_pubkey(pubkey_str)?; - let blockhash = meta.bank(commitment)?.confirmed_last_blockhash().0; + let (blockhash, last_valid_slot) = { + let bank = meta.bank(commitment)?; + + let blockhash = bank.confirmed_last_blockhash().0; + ( + blockhash, + bank.get_blockhash_last_valid_slot(&blockhash).unwrap_or(0), + ) + }; + let transaction = request_airdrop_transaction(&faucet_addr, &pubkey, lamports, blockhash) .map_err(|err| { info!("request_airdrop_transaction failed: {:?}", err); Error::internal_error() })?; + let signature = transaction.signatures[0]; - let data = serialize(&transaction).map_err(|err| { + let wire_transaction = serialize(&transaction).map_err(|err| { info!("request_airdrop: serialize error: {:?}", err); Error::internal_error() })?; - let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let tpu_addr = get_tpu_addr(&meta.cluster_info)?; - transactions_socket - .send_to(&data, tpu_addr) - .map_err(|err| { - info!("request_airdrop: send_to error: {:?}", err); - Error::internal_error() - })?; + meta.send_transaction_service + .send(signature, wire_transaction, last_valid_slot); - let signature = transaction.signatures[0]; - let now = Instant::now(); - let mut signature_status; - let signature_timeout = match &commitment { - Some(config) if config.commitment == CommitmentLevel::Recent => 5, - _ => 30, - }; - loop { - signature_status = meta.get_signature_statuses(vec![signature], None)?.value[0] - .clone() - .filter(|result| result.satisfies_commitment(commitment.unwrap_or_default())) - .map(|x| x.status); - - if signature_status == Some(Ok(())) { - info!("airdrop signature ok"); - return Ok(signature.to_string()); - } else if now.elapsed().as_secs() > signature_timeout { - info!("airdrop signature timeout"); - return Err(Error::internal_error()); - } - sleep(Duration::from_millis(100)); - } + Ok(signature.to_string()) } fn send_transaction( @@ -1367,7 +1347,11 @@ impl RpcSol for RpcSolImpl { ) -> Result { let config = config.unwrap_or_default(); let (wire_transaction, transaction) = deserialize_bs58_transaction(data)?; - let signature = transaction.signatures[0].to_string(); + let signature = transaction.signatures[0]; + let bank = &*meta.bank(None)?; + let last_valid_slot = bank + .get_blockhash_last_valid_slot(&transaction.message.recent_blockhash) + .unwrap_or(0); if !config.skip_preflight { if transaction.verify().is_err() { @@ -1384,7 +1368,6 @@ impl RpcSol for RpcSolImpl { .into()); } - let bank = &*meta.bank(None)?; if let (Err(err), _log_output) = run_transaction_simulation(&bank, transaction) { // Note: it's possible that the transaction simulation failed but the actual // transaction would succeed, such as when a transaction depends on an earlier @@ -1398,20 +1381,9 @@ impl RpcSol for RpcSolImpl { } } - let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let tpu_addr = get_tpu_addr(&meta.cluster_info)?; - transactions_socket - .send_to(&wire_transaction, tpu_addr) - .map_err(|err| { - info!("send_transaction: send_to error: {:?}", err); - Error::internal_error() - })?; - trace!( - "send_transaction: sent {} bytes, signature={}", - wire_transaction.len(), - signature - ); - Ok(signature) + meta.send_transaction_service + .send(signature, wire_transaction, last_valid_slot); + Ok(signature.to_string()) } fn simulate_transaction( @@ -1755,10 +1727,20 @@ pub mod tests { blockstore, validator_exit, RpcHealth::stub(), - cluster_info, + cluster_info.clone(), Hash::default(), + Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit, + )), ); + cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( + &leader_pubkey, + &socketaddr!("127.0.0.1:1234"), + )); + let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); @@ -1788,15 +1770,21 @@ pub mod tests { let block_commitment_cache = Arc::new(RwLock::new( BlockCommitmentCache::default_with_blockstore(blockstore.clone()), )); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let request_processor = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), - bank_forks, + bank_forks.clone(), block_commitment_cache, blockstore, validator_exit, RpcHealth::stub(), Arc::new(ClusterInfo::default()), Hash::default(), + Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit, + )), ); thread::spawn(move || { let blockhash = bank.confirmed_last_blockhash().0; @@ -2733,6 +2721,8 @@ pub mod tests { let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); + let cluster_info = Arc::new(ClusterInfo::default()); + let bank_forks = new_bank_forks().0; let meta = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), new_bank_forks().0, @@ -2740,8 +2730,13 @@ pub mod tests { blockstore, validator_exit, RpcHealth::stub(), - Arc::new(ClusterInfo::default()), + cluster_info.clone(), Hash::default(), + Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit, + )), ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#; @@ -2769,17 +2764,23 @@ pub mod tests { let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( + ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")), + )); let meta = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), - bank_forks, + bank_forks.clone(), block_commitment_cache, blockstore, validator_exit, health.clone(), - Arc::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")), - )), + cluster_info.clone(), Hash::default(), + Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit, + )), ); let mut bad_transaction = @@ -2843,17 +2844,6 @@ pub mod tests { ); } - #[test] - fn test_rpc_get_tpu_addr() { - let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")), - )); - assert_eq!( - get_tpu_addr(&cluster_info), - Ok(socketaddr!("127.0.0.1:1234")) - ); - } - #[test] fn test_rpc_verify_pubkey() { let pubkey = Pubkey::new_rand(); @@ -2879,7 +2869,7 @@ pub mod tests { ); } - fn new_bank_forks() -> (Arc>, Keypair, Keypair) { + pub(crate) fn new_bank_forks() -> (Arc>, Keypair, Keypair) { let GenesisConfigInfo { mut genesis_config, mint_keypair, @@ -2917,15 +2907,22 @@ pub mod tests { let block_commitment_cache = Arc::new(RwLock::new( BlockCommitmentCache::default_with_blockstore(blockstore.clone()), )); + let cluster_info = Arc::new(ClusterInfo::default()); + let bank_forks = new_bank_forks().0; let request_processor = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), - new_bank_forks().0, + bank_forks.clone(), block_commitment_cache, blockstore, validator_exit, RpcHealth::stub(), - Arc::new(ClusterInfo::default()), + cluster_info.clone(), Hash::default(), + Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit, + )), ); assert_eq!(request_processor.validator_exit(), Ok(false)); assert_eq!(exit.load(Ordering::Relaxed), false); @@ -2942,15 +2939,22 @@ pub mod tests { )); let mut config = JsonRpcConfig::default(); config.enable_validator_exit = true; + let bank_forks = new_bank_forks().0; + let cluster_info = Arc::new(ClusterInfo::default()); let request_processor = JsonRpcRequestProcessor::new( config, - new_bank_forks().0, + bank_forks.clone(), block_commitment_cache, blockstore, validator_exit, RpcHealth::stub(), - Arc::new(ClusterInfo::default()), + cluster_info.clone(), Hash::default(), + Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit, + )), ); assert_eq!(request_processor.validator_exit(), Ok(true)); assert_eq!(exit.load(Ordering::Relaxed), true); @@ -3027,15 +3031,21 @@ pub mod tests { let mut config = JsonRpcConfig::default(); config.enable_validator_exit = true; + let cluster_info = Arc::new(ClusterInfo::default()); let request_processor = JsonRpcRequestProcessor::new( config, - bank_forks, + bank_forks.clone(), block_commitment_cache, blockstore, validator_exit, RpcHealth::stub(), - Arc::new(ClusterInfo::default()), + cluster_info.clone(), Hash::default(), + Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit, + )), ); assert_eq!( request_processor.get_block_commitment(0), diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 700698bd63..f316253774 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -2,7 +2,7 @@ use crate::{ cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, rpc_health::*, - validator::ValidatorExit, + send_transaction_service::SendTransactionService, validator::ValidatorExit, }; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{ @@ -20,7 +20,7 @@ use std::{ collections::HashSet, net::SocketAddr, path::{Path, PathBuf}, - sync::atomic::AtomicBool, + sync::atomic::{AtomicBool, Ordering}, sync::{mpsc::channel, Arc, RwLock}, thread::{self, Builder, JoinHandle}, }; @@ -249,6 +249,13 @@ impl JsonRpcService { override_health_check, )); + let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); + let send_transaction_service = Arc::new(SendTransactionService::new( + &cluster_info, + &bank_forks, + &exit_send_transaction_service, + )); + let request_processor = JsonRpcRequestProcessor::new( config, bank_forks.clone(), @@ -258,6 +265,7 @@ impl JsonRpcService { health.clone(), cluster_info, genesis_hash, + send_transaction_service, ); #[cfg(test)] @@ -304,6 +312,7 @@ impl JsonRpcService { let server = server.unwrap(); close_handle_sender.send(server.close_handle()).unwrap(); server.wait(); + exit_send_transaction_service.store(true, Ordering::Relaxed); }) .unwrap(); @@ -347,10 +356,7 @@ mod tests { }; use solana_runtime::bank::Bank; use solana_sdk::signature::Signer; - use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::atomic::Ordering, - }; + use std::net::{IpAddr, Ipv4Addr}; #[test] fn test_rpc_new() { diff --git a/core/src/send_transaction_service.rs b/core/src/send_transaction_service.rs new file mode 100644 index 0000000000..d67bbb3225 --- /dev/null +++ b/core/src/send_transaction_service.rs @@ -0,0 +1,377 @@ +use crate::cluster_info::ClusterInfo; +use solana_ledger::bank_forks::BankForks; +use solana_metrics::{datapoint_warn, inc_new_counter_info}; +use solana_runtime::bank::Bank; +use solana_sdk::{clock::Slot, signature::Signature}; +use std::{ + collections::HashMap, + net::{SocketAddr, UdpSocket}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, +}; + +/// Maximum size of the transaction queue +const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day + +pub struct SendTransactionService { + thread: JoinHandle<()>, + sender: Mutex>, + send_socket: UdpSocket, + tpu_address: SocketAddr, +} + +struct TransactionInfo { + signature: Signature, + wire_transaction: Vec, + last_valid_slot: Slot, +} + +#[derive(Default, Debug, PartialEq)] +struct ProcessTransactionsResult { + rooted: u64, + expired: u64, + retried: u64, + failed: u64, + retained: u64, +} + +impl SendTransactionService { + pub fn new( + cluster_info: &Arc, + bank_forks: &Arc>, + exit: &Arc, + ) -> Self { + let (sender, receiver) = channel::(); + let tpu_address = cluster_info.my_contact_info().tpu; + + let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone()); + Self { + thread, + sender: Mutex::new(sender), + send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + tpu_address, + } + } + + fn retry_thread( + receiver: Receiver, + bank_forks: Arc>, + tpu_address: SocketAddr, + exit: Arc, + ) -> JoinHandle<()> { + let mut last_status_check = Instant::now(); + let mut transactions = HashMap::new(); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + + Builder::new() + .name("send-tx-svc".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { + if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { + transactions.insert(transaction_info.signature, transaction_info); + } else { + datapoint_warn!("send_transaction_service-queue-overflow"); + } + } + + if Instant::now().duration_since(last_status_check).as_secs() >= 5 { + if !transactions.is_empty() { + datapoint_info!( + "send_transaction_service-queue-size", + ("len", transactions.len(), i64) + ); + let bank_forks = bank_forks.read().unwrap(); + let root_bank = bank_forks.root_bank(); + let working_bank = bank_forks.working_bank(); + + let _result = Self::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + ); + } + last_status_check = Instant::now(); + } + }) + .unwrap() + } + + fn process_transactions( + working_bank: &Arc, + root_bank: &Arc, + send_socket: &UdpSocket, + tpu_address: &SocketAddr, + transactions: &mut HashMap, + ) -> ProcessTransactionsResult { + let mut result = ProcessTransactionsResult::default(); + + transactions.retain(|signature, transaction_info| { + if root_bank.has_signature(signature) { + info!("Transaction is rooted: {}", signature); + result.rooted += 1; + inc_new_counter_info!("send_transaction_service-rooted", 1); + false + } else if transaction_info.last_valid_slot < root_bank.slot() { + info!("Dropping expired transaction: {}", signature); + result.expired += 1; + inc_new_counter_info!("send_transaction_service-expired", 1); + false + } else { + match working_bank.get_signature_status_slot(signature) { + None => { + // Transaction is unknown to the working bank, it might have been + // dropped or landed in another fork. Re-send it + info!("Retrying transaction: {}", signature); + result.retried += 1; + inc_new_counter_info!("send_transaction_service-retry", 1); + Self::send_transaction( + &send_socket, + &tpu_address, + &transaction_info.wire_transaction, + ); + true + } + Some((_slot, status)) => { + if status.is_err() { + info!("Dropping failed transaction: {}", signature); + result.failed += 1; + inc_new_counter_info!("send_transaction_service-failed", 1); + false + } else { + result.retained += 1; + true + } + } + } + } + }); + + result + } + + fn send_transaction( + send_socket: &UdpSocket, + tpu_address: &SocketAddr, + wire_transaction: &[u8], + ) { + if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) { + warn!("Failed to send transaction to {}: {:?}", tpu_address, err); + } + } + + pub fn send(&self, signature: Signature, wire_transaction: Vec, last_valid_slot: Slot) { + inc_new_counter_info!("send_transaction_service-enqueue", 1, 1); + Self::send_transaction(&self.send_socket, &self.tpu_address, &wire_transaction); + + self.sender + .lock() + .unwrap() + .send(TransactionInfo { + signature, + wire_transaction, + last_valid_slot, + }) + .unwrap_or_else(|err| warn!("Failed to enqueue transaction: {}", err)); + } + + pub fn join(self) -> thread::Result<()> { + self.thread.join() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::rpc::tests::new_bank_forks; + use solana_sdk::{pubkey::Pubkey, signature::Signer}; + + #[test] + fn service_exit() { + let cluster_info = Arc::new(ClusterInfo::default()); + let bank_forks = new_bank_forks().0; + let exit = Arc::new(AtomicBool::new(false)); + + let send_tranaction_service = + SendTransactionService::new(&cluster_info, &bank_forks, &exit); + + exit.store(true, Ordering::Relaxed); + send_tranaction_service.join().unwrap(); + } + + #[test] + fn process_transactions() { + solana_logger::setup(); + + let (bank_forks, mint_keypair, _voting_keypair) = new_bank_forks(); + let cluster_info = ClusterInfo::default(); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let tpu_address = cluster_info.my_contact_info().tpu; + + let root_bank = Arc::new(Bank::new_from_parent( + &bank_forks.read().unwrap().working_bank(), + &Pubkey::default(), + 1, + )); + let rooted_signature = root_bank + .transfer(1, &mint_keypair, &mint_keypair.pubkey()) + .unwrap(); + + let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2)); + + let non_rooted_signature = working_bank + .transfer(2, &mint_keypair, &mint_keypair.pubkey()) + .unwrap(); + + let failed_signature = { + let blockhash = working_bank.last_blockhash(); + let transaction = solana_sdk::system_transaction::transfer( + &mint_keypair, + &Pubkey::default(), + 1, + blockhash, + ); + let signature = transaction.signatures[0]; + working_bank.process_transaction(&transaction).unwrap_err(); + signature + }; + + let mut transactions = HashMap::new(); + + info!("Expired transactions are dropped.."); + transactions.insert( + Signature::default(), + TransactionInfo { + signature: Signature::default(), + wire_transaction: vec![], + last_valid_slot: root_bank.slot() - 1, + }, + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + expired: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Rooted transactions are dropped..."); + transactions.insert( + rooted_signature, + TransactionInfo { + signature: rooted_signature, + wire_transaction: vec![], + last_valid_slot: working_bank.slot(), + }, + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + rooted: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Failed transactions are dropped..."); + transactions.insert( + failed_signature, + TransactionInfo { + signature: failed_signature, + wire_transaction: vec![], + last_valid_slot: working_bank.slot(), + }, + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + failed: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Non-rooted transactions are kept..."); + transactions.insert( + non_rooted_signature, + TransactionInfo { + signature: non_rooted_signature, + wire_transaction: vec![], + last_valid_slot: working_bank.slot(), + }, + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + ); + assert_eq!(transactions.len(), 1); + assert_eq!( + result, + ProcessTransactionsResult { + retained: 1, + ..ProcessTransactionsResult::default() + } + ); + transactions.clear(); + + info!("Unknown transactions are retried..."); + transactions.insert( + Signature::default(), + TransactionInfo { + signature: Signature::default(), + wire_transaction: vec![], + last_valid_slot: working_bank.slot(), + }, + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + ); + assert_eq!(transactions.len(), 1); + assert_eq!( + result, + ProcessTransactionsResult { + retried: 1, + ..ProcessTransactionsResult::default() + } + ); + } +}