From 7b7b7be99c66e86de76b0ce804dd557fb0e8995f Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 4 Sep 2020 15:22:18 -0700 Subject: [PATCH] Forward transactions to the expected leader instead of your own TPU port (#12012) Co-authored-by: Carl --- core/src/rpc.rs | 7 +++ core/src/rpc_service.rs | 10 +++- core/src/send_transaction_service.rs | 80 +++++++++++++++++++++++++--- core/src/validator.rs | 67 +++++++++++------------ 4 files changed, 122 insertions(+), 42 deletions(-) diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 29ec60bb58..58e53cef5f 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -2664,6 +2664,7 @@ pub mod tests { Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + None, &exit, )), &runtime::Runtime::new().unwrap(), @@ -2717,6 +2718,7 @@ pub mod tests { Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + None, &exit, )), &runtime::Runtime::new().unwrap(), @@ -3983,6 +3985,7 @@ pub mod tests { Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + None, &exit, )), &runtime::Runtime::new().unwrap(), @@ -4029,6 +4032,7 @@ pub mod tests { Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + None, &exit, )), &runtime::Runtime::new().unwrap(), @@ -4225,6 +4229,7 @@ pub mod tests { Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + None, &exit, )), &runtime::Runtime::new().unwrap(), @@ -4259,6 +4264,7 @@ pub mod tests { Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + None, &exit, )), &runtime::Runtime::new().unwrap(), @@ -4352,6 +4358,7 @@ pub mod tests { Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + None, &exit, )), &runtime::Runtime::new().unwrap(), diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 270bbc747f..5f29052870 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -1,7 +1,8 @@ //! The `rpc_service` module implements the Solana JSON RPC service. use crate::{ - cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, rpc_health::*, + cluster_info::ClusterInfo, commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*, + rpc_health::*, send_transaction_service::LeaderInfo, send_transaction_service::SendTransactionService, validator::ValidatorExit, }; use jsonrpc_core::MetaIoHandler; @@ -21,7 +22,7 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, sync::atomic::{AtomicBool, Ordering}, - sync::{mpsc::channel, Arc, RwLock}, + sync::{mpsc::channel, Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }; use tokio::runtime; @@ -237,6 +238,7 @@ impl JsonRpcService { block_commitment_cache: Arc>, blockstore: Arc, cluster_info: Arc, + poh_recorder: Option>>, genesis_hash: Hash, ledger_path: &Path, validator_exit: Arc>>, @@ -254,9 +256,12 @@ impl JsonRpcService { )); let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); + let leader_info = + poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder)); let send_transaction_service = Arc::new(SendTransactionService::new( &cluster_info, &bank_forks, + leader_info, &exit_send_transaction_service, )); @@ -418,6 +423,7 @@ mod tests { block_commitment_cache, blockstore, cluster_info, + None, Hash::default(), &PathBuf::from("farf"), validator_exit, diff --git a/core/src/send_transaction_service.rs b/core/src/send_transaction_service.rs index d67bbb3225..6ebf9efd96 100644 --- a/core/src/send_transaction_service.rs +++ b/core/src/send_transaction_service.rs @@ -1,8 +1,9 @@ -use crate::cluster_info::ClusterInfo; +use crate::{cluster_info::ClusterInfo, poh_recorder::PohRecorder}; +use log::*; use solana_ledger::bank_forks::BankForks; use solana_metrics::{datapoint_warn, inc_new_counter_info}; use solana_runtime::bank::Bank; -use solana_sdk::{clock::Slot, signature::Signature}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; use std::{ collections::HashMap, net::{SocketAddr, UdpSocket}, @@ -25,6 +26,39 @@ pub struct SendTransactionService { tpu_address: SocketAddr, } +pub struct LeaderInfo { + cluster_info: Arc, + poh_recorder: Arc>, + recent_peers: HashMap, +} + +impl LeaderInfo { + pub fn new(cluster_info: Arc, poh_recorder: Arc>) -> Self { + Self { + cluster_info, + poh_recorder, + recent_peers: HashMap::new(), + } + } + + pub fn refresh_recent_peers(&mut self) { + self.recent_peers = self + .cluster_info + .tpu_peers() + .into_iter() + .map(|ci| (ci.id, ci.tpu)) + .collect(); + } + + pub fn get_leader_tpu(&self) -> Option<&SocketAddr> { + self.poh_recorder + .lock() + .unwrap() + .leader_after_n_slots(0) + .and_then(|leader| self.recent_peers.get(&leader)) + } +} + struct TransactionInfo { signature: Signature, wire_transaction: Vec, @@ -44,12 +78,19 @@ impl SendTransactionService { pub fn new( cluster_info: &Arc, bank_forks: &Arc>, + leader_info: Option, exit: &Arc, ) -> Self { let (sender, receiver) = channel::(); let tpu_address = cluster_info.my_contact_info().tpu; - let thread = Self::retry_thread(receiver, bank_forks.clone(), tpu_address, exit.clone()); + let thread = Self::retry_thread( + receiver, + bank_forks.clone(), + leader_info, + tpu_address, + exit.clone(), + ); Self { thread, sender: Mutex::new(sender), @@ -61,6 +102,7 @@ impl SendTransactionService { fn retry_thread( receiver: Receiver, bank_forks: Arc>, + mut leader_info: Option, tpu_address: SocketAddr, exit: Arc, ) -> JoinHandle<()> { @@ -68,6 +110,10 @@ impl SendTransactionService { let mut transactions = HashMap::new(); let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + if let Some(leader_info) = leader_info.as_mut() { + leader_info.refresh_recent_peers(); + } + Builder::new() .name("send-tx-svc".to_string()) .spawn(move || loop { @@ -76,6 +122,15 @@ impl SendTransactionService { } if let Ok(transaction_info) = receiver.recv_timeout(Duration::from_secs(1)) { + let address = leader_info + .as_ref() + .and_then(|leader_info| leader_info.get_leader_tpu()) + .unwrap_or(&tpu_address); + Self::send_transaction( + &send_socket, + address, + &transaction_info.wire_transaction, + ); if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE { transactions.insert(transaction_info.signature, transaction_info); } else { @@ -99,9 +154,13 @@ impl SendTransactionService { &send_socket, &tpu_address, &mut transactions, + &leader_info, ); } last_status_check = Instant::now(); + if let Some(leader_info) = leader_info.as_mut() { + leader_info.refresh_recent_peers(); + } } }) .unwrap() @@ -113,6 +172,7 @@ impl SendTransactionService { send_socket: &UdpSocket, tpu_address: &SocketAddr, transactions: &mut HashMap, + leader_info: &Option, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); @@ -137,7 +197,10 @@ impl SendTransactionService { inc_new_counter_info!("send_transaction_service-retry", 1); Self::send_transaction( &send_socket, - &tpu_address, + leader_info + .as_ref() + .and_then(|leader_info| leader_info.get_leader_tpu()) + .unwrap_or(&tpu_address), &transaction_info.wire_transaction, ); true @@ -203,7 +266,7 @@ mod test { let exit = Arc::new(AtomicBool::new(false)); let send_tranaction_service = - SendTransactionService::new(&cluster_info, &bank_forks, &exit); + SendTransactionService::new(&cluster_info, &bank_forks, None, &exit); exit.store(true, Ordering::Relaxed); send_tranaction_service.join().unwrap(); @@ -211,8 +274,6 @@ mod test { #[test] fn process_transactions() { - solana_logger::setup(); - let (bank_forks, mint_keypair, _voting_keypair) = new_bank_forks(); let cluster_info = ClusterInfo::default(); let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -263,6 +324,7 @@ mod test { &send_socket, &tpu_address, &mut transactions, + &None, ); assert!(transactions.is_empty()); assert_eq!( @@ -288,6 +350,7 @@ mod test { &send_socket, &tpu_address, &mut transactions, + &None, ); assert!(transactions.is_empty()); assert_eq!( @@ -313,6 +376,7 @@ mod test { &send_socket, &tpu_address, &mut transactions, + &None, ); assert!(transactions.is_empty()); assert_eq!( @@ -338,6 +402,7 @@ mod test { &send_socket, &tpu_address, &mut transactions, + &None, ); assert_eq!(transactions.len(), 1); assert_eq!( @@ -364,6 +429,7 @@ mod test { &send_socket, &tpu_address, &mut transactions, + &None, ); assert_eq!(transactions.len(), 1); assert_eq!( diff --git a/core/src/validator.rs b/core/src/validator.rs index 8b3d1dd9fb..c3b9c302bc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -279,38 +279,6 @@ impl Validator { block_commitment_cache.clone(), )); - let rpc_override_health_check = Arc::new(AtomicBool::new(false)); - let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { - if ContactInfo::is_valid_address(&node.info.rpc) { - assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - assert_eq!(rpc_port, node.info.rpc.port()); - assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); - } else { - assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); - } - ( - JsonRpcService::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), - config.rpc_config.clone(), - config.snapshot_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - genesis_config.hash(), - ledger_path, - validator_exit.clone(), - config.trusted_validators.clone(), - rpc_override_health_check.clone(), - ), - PubSubService::new( - &subscriptions, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), - &exit, - ), - ) - }); - info!( "Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}", bank.epoch(), @@ -333,7 +301,7 @@ impl Validator { std::thread::park(); } - let poh_config = Arc::new(genesis_config.poh_config); + let poh_config = Arc::new(genesis_config.poh_config.clone()); let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( bank.tick_height(), bank.last_blockhash(), @@ -357,6 +325,39 @@ impl Validator { } let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let rpc_override_health_check = Arc::new(AtomicBool::new(false)); + let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { + if ContactInfo::is_valid_address(&node.info.rpc) { + assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + assert_eq!(rpc_port, node.info.rpc.port()); + assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); + } else { + assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + } + ( + JsonRpcService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), + config.rpc_config.clone(), + config.snapshot_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + Some(poh_recorder.clone()), + genesis_config.hash(), + ledger_path, + validator_exit.clone(), + config.trusted_validators.clone(), + rpc_override_health_check.clone(), + ), + PubSubService::new( + &subscriptions, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), + &exit, + ), + ) + }); + let ip_echo_server = solana_net_utils::ip_echo_server(node.sockets.ip_echo.unwrap()); let gossip_service = GossipService::new(