Move transaction executor (#19946)
This commit is contained in:
		
							
								
								
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -4560,6 +4560,7 @@ dependencies = [ | ||||
|  "solana-clap-utils", | ||||
|  "solana-faucet", | ||||
|  "solana-logger 1.9.0", | ||||
|  "solana-measure", | ||||
|  "solana-net-utils", | ||||
|  "solana-sdk", | ||||
|  "solana-transaction-status", | ||||
|   | ||||
| @@ -5,10 +5,9 @@ use rand::{thread_rng, Rng}; | ||||
| use rayon::prelude::*; | ||||
| use solana_account_decoder::parse_token::spl_token_v2_0_pubkey; | ||||
| use solana_clap_utils::input_parsers::pubkey_of; | ||||
| use solana_client::rpc_client::RpcClient; | ||||
| use solana_client::{rpc_client::RpcClient, transaction_executor::TransactionExecutor}; | ||||
| use solana_faucet::faucet::{request_airdrop_transaction, FAUCET_PORT}; | ||||
| use solana_gossip::gossip_service::discover; | ||||
| use solana_measure::measure::Measure; | ||||
| use solana_runtime::inline_spl_token_v2_0; | ||||
| use solana_sdk::{ | ||||
|     commitment_config::CommitmentConfig, | ||||
| @@ -16,9 +15,8 @@ use solana_sdk::{ | ||||
|     message::Message, | ||||
|     pubkey::Pubkey, | ||||
|     rpc_port::DEFAULT_RPC_PORT, | ||||
|     signature::{read_keypair_file, Keypair, Signature, Signer}, | ||||
|     signature::{read_keypair_file, Keypair, Signer}, | ||||
|     system_instruction, system_program, | ||||
|     timing::timestamp, | ||||
|     transaction::Transaction, | ||||
| }; | ||||
| use solana_streamer::socket::SocketAddrSpace; | ||||
| @@ -27,10 +25,10 @@ use std::{ | ||||
|     net::SocketAddr, | ||||
|     process::exit, | ||||
|     sync::{ | ||||
|         atomic::{AtomicBool, AtomicU64, Ordering}, | ||||
|         Arc, RwLock, | ||||
|         atomic::{AtomicU64, Ordering}, | ||||
|         Arc, | ||||
|     }, | ||||
|     thread::{sleep, Builder, JoinHandle}, | ||||
|     thread::sleep, | ||||
|     time::{Duration, Instant}, | ||||
| }; | ||||
|  | ||||
| @@ -97,160 +95,6 @@ pub fn airdrop_lamports( | ||||
|     true | ||||
| } | ||||
|  | ||||
| // signature, timestamp, id | ||||
| type PendingQueue = Vec<(Signature, u64, u64)>; | ||||
|  | ||||
| struct TransactionExecutor { | ||||
|     sig_clear_t: JoinHandle<()>, | ||||
|     sigs: Arc<RwLock<PendingQueue>>, | ||||
|     cleared: Arc<RwLock<Vec<u64>>>, | ||||
|     exit: Arc<AtomicBool>, | ||||
|     counter: AtomicU64, | ||||
|     client: RpcClient, | ||||
| } | ||||
|  | ||||
| impl TransactionExecutor { | ||||
|     fn new(entrypoint_addr: SocketAddr) -> Self { | ||||
|         let sigs = Arc::new(RwLock::new(Vec::new())); | ||||
|         let cleared = Arc::new(RwLock::new(Vec::new())); | ||||
|         let exit = Arc::new(AtomicBool::new(false)); | ||||
|         let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, entrypoint_addr); | ||||
|         let client = | ||||
|             RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed()); | ||||
|         Self { | ||||
|             sigs, | ||||
|             cleared, | ||||
|             sig_clear_t, | ||||
|             exit, | ||||
|             counter: AtomicU64::new(0), | ||||
|             client, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn num_outstanding(&self) -> usize { | ||||
|         self.sigs.read().unwrap().len() | ||||
|     } | ||||
|  | ||||
|     fn push_transactions(&self, txs: Vec<Transaction>) -> Vec<u64> { | ||||
|         let mut ids = vec![]; | ||||
|         let new_sigs = txs.into_iter().filter_map(|tx| { | ||||
|             let id = self.counter.fetch_add(1, Ordering::Relaxed); | ||||
|             ids.push(id); | ||||
|             match self.client.send_transaction(&tx) { | ||||
|                 Ok(sig) => { | ||||
|                     return Some((sig, timestamp(), id)); | ||||
|                 } | ||||
|                 Err(e) => { | ||||
|                     info!("error: {:#?}", e); | ||||
|                 } | ||||
|             } | ||||
|             None | ||||
|         }); | ||||
|         let mut sigs_w = self.sigs.write().unwrap(); | ||||
|         sigs_w.extend(new_sigs); | ||||
|         ids | ||||
|     } | ||||
|  | ||||
|     fn drain_cleared(&self) -> Vec<u64> { | ||||
|         std::mem::take(&mut *self.cleared.write().unwrap()) | ||||
|     } | ||||
|  | ||||
|     fn close(self) { | ||||
|         self.exit.store(true, Ordering::Relaxed); | ||||
|         self.sig_clear_t.join().unwrap(); | ||||
|     } | ||||
|  | ||||
|     fn start_sig_clear_thread( | ||||
|         exit: &Arc<AtomicBool>, | ||||
|         sigs: &Arc<RwLock<PendingQueue>>, | ||||
|         cleared: &Arc<RwLock<Vec<u64>>>, | ||||
|         entrypoint_addr: SocketAddr, | ||||
|     ) -> JoinHandle<()> { | ||||
|         let sigs = sigs.clone(); | ||||
|         let exit = exit.clone(); | ||||
|         let cleared = cleared.clone(); | ||||
|         Builder::new() | ||||
|             .name("sig_clear".to_string()) | ||||
|             .spawn(move || { | ||||
|                 let client = RpcClient::new_socket_with_commitment( | ||||
|                     entrypoint_addr, | ||||
|                     CommitmentConfig::confirmed(), | ||||
|                 ); | ||||
|                 let mut success = 0; | ||||
|                 let mut error_count = 0; | ||||
|                 let mut timed_out = 0; | ||||
|                 let mut last_log = Instant::now(); | ||||
|                 while !exit.load(Ordering::Relaxed) { | ||||
|                     let sigs_len = sigs.read().unwrap().len(); | ||||
|                     if sigs_len > 0 { | ||||
|                         let mut sigs_w = sigs.write().unwrap(); | ||||
|                         let mut start = Measure::start("sig_status"); | ||||
|                         let statuses: Vec<_> = sigs_w | ||||
|                             .chunks(200) | ||||
|                             .flat_map(|sig_chunk| { | ||||
|                                 let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect(); | ||||
|                                 client | ||||
|                                     .get_signature_statuses(&only_sigs) | ||||
|                                     .expect("status fail") | ||||
|                                     .value | ||||
|                             }) | ||||
|                             .collect(); | ||||
|                         let mut num_cleared = 0; | ||||
|                         let start_len = sigs_w.len(); | ||||
|                         let now = timestamp(); | ||||
|                         let mut new_ids = vec![]; | ||||
|                         let mut i = 0; | ||||
|                         let mut j = 0; | ||||
|                         while i != sigs_w.len() { | ||||
|                             let mut retain = true; | ||||
|                             let sent_ts = sigs_w[i].1; | ||||
|                             if let Some(e) = &statuses[j] { | ||||
|                                 debug!("error: {:?}", e); | ||||
|                                 if e.status.is_ok() { | ||||
|                                     success += 1; | ||||
|                                 } else { | ||||
|                                     error_count += 1; | ||||
|                                 } | ||||
|                                 num_cleared += 1; | ||||
|                                 retain = false; | ||||
|                             } else if now - sent_ts > 30_000 { | ||||
|                                 retain = false; | ||||
|                                 timed_out += 1; | ||||
|                             } | ||||
|                             if !retain { | ||||
|                                 new_ids.push(sigs_w.remove(i).2); | ||||
|                             } else { | ||||
|                                 i += 1; | ||||
|                             } | ||||
|                             j += 1; | ||||
|                         } | ||||
|                         let final_sigs_len = sigs_w.len(); | ||||
|                         drop(sigs_w); | ||||
|                         cleared.write().unwrap().extend(new_ids); | ||||
|                         start.stop(); | ||||
|                         debug!( | ||||
|                             "sigs len: {:?} success: {} took: {}ms cleared: {}/{}", | ||||
|                             final_sigs_len, | ||||
|                             success, | ||||
|                             start.as_ms(), | ||||
|                             num_cleared, | ||||
|                             start_len, | ||||
|                         ); | ||||
|                         if last_log.elapsed().as_millis() > 5000 { | ||||
|                             info!( | ||||
|                                 "success: {} error: {} timed_out: {}", | ||||
|                                 success, error_count, timed_out, | ||||
|                             ); | ||||
|                             last_log = Instant::now(); | ||||
|                         } | ||||
|                     } | ||||
|                     sleep(Duration::from_millis(200)); | ||||
|                 } | ||||
|             }) | ||||
|             .unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| struct SeedTracker { | ||||
|     max_created: Arc<AtomicU64>, | ||||
|     max_closed: Arc<AtomicU64>, | ||||
| @@ -720,6 +564,7 @@ pub mod test { | ||||
|         local_cluster::{ClusterConfig, LocalCluster}, | ||||
|         validator_configs::make_identical_validator_configs, | ||||
|     }; | ||||
|     use solana_measure::measure::Measure; | ||||
|     use solana_sdk::poh_config::PohConfig; | ||||
|  | ||||
|     #[test] | ||||
|   | ||||
| @@ -27,6 +27,7 @@ solana-account-decoder = { path = "../account-decoder", version = "=1.9.0" } | ||||
| solana-clap-utils = { path = "../clap-utils", version = "=1.9.0" } | ||||
| solana-faucet = { path = "../faucet", version = "=1.9.0" } | ||||
| solana-net-utils = { path = "../net-utils", version = "=1.9.0" } | ||||
| solana-measure = { path = "../measure", version = "=1.9.0" } | ||||
| solana-sdk = { path = "../sdk", version = "=1.9.0" } | ||||
| solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" } | ||||
| solana-version = { path = "../version", version = "=1.9.0" } | ||||
|   | ||||
| @@ -20,3 +20,4 @@ pub mod rpc_response; | ||||
| pub mod rpc_sender; | ||||
| pub mod thin_client; | ||||
| pub mod tpu_client; | ||||
| pub mod transaction_executor; | ||||
|   | ||||
							
								
								
									
										171
									
								
								client/src/transaction_executor.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										171
									
								
								client/src/transaction_executor.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,171 @@ | ||||
| #![allow(clippy::integer_arithmetic)] | ||||
| use crate::rpc_client::RpcClient; | ||||
| use log::*; | ||||
| use solana_measure::measure::Measure; | ||||
| use solana_sdk::{ | ||||
|     commitment_config::CommitmentConfig, signature::Signature, timing::timestamp, | ||||
|     transaction::Transaction, | ||||
| }; | ||||
| use std::{ | ||||
|     net::SocketAddr, | ||||
|     sync::{ | ||||
|         atomic::{AtomicBool, AtomicU64, Ordering}, | ||||
|         Arc, RwLock, | ||||
|     }, | ||||
|     thread::{sleep, Builder, JoinHandle}, | ||||
|     time::{Duration, Instant}, | ||||
| }; | ||||
|  | ||||
| // signature, timestamp, id | ||||
| type PendingQueue = Vec<(Signature, u64, u64)>; | ||||
|  | ||||
| pub struct TransactionExecutor { | ||||
|     sig_clear_t: JoinHandle<()>, | ||||
|     sigs: Arc<RwLock<PendingQueue>>, | ||||
|     cleared: Arc<RwLock<Vec<u64>>>, | ||||
|     exit: Arc<AtomicBool>, | ||||
|     counter: AtomicU64, | ||||
|     client: RpcClient, | ||||
| } | ||||
|  | ||||
| impl TransactionExecutor { | ||||
|     pub fn new(entrypoint_addr: SocketAddr) -> Self { | ||||
|         let sigs = Arc::new(RwLock::new(Vec::new())); | ||||
|         let cleared = Arc::new(RwLock::new(Vec::new())); | ||||
|         let exit = Arc::new(AtomicBool::new(false)); | ||||
|         let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, entrypoint_addr); | ||||
|         let client = | ||||
|             RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed()); | ||||
|         Self { | ||||
|             sigs, | ||||
|             cleared, | ||||
|             sig_clear_t, | ||||
|             exit, | ||||
|             counter: AtomicU64::new(0), | ||||
|             client, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn num_outstanding(&self) -> usize { | ||||
|         self.sigs.read().unwrap().len() | ||||
|     } | ||||
|  | ||||
|     pub fn push_transactions(&self, txs: Vec<Transaction>) -> Vec<u64> { | ||||
|         let mut ids = vec![]; | ||||
|         let new_sigs = txs.into_iter().filter_map(|tx| { | ||||
|             let id = self.counter.fetch_add(1, Ordering::Relaxed); | ||||
|             ids.push(id); | ||||
|             match self.client.send_transaction(&tx) { | ||||
|                 Ok(sig) => { | ||||
|                     return Some((sig, timestamp(), id)); | ||||
|                 } | ||||
|                 Err(e) => { | ||||
|                     info!("error: {:#?}", e); | ||||
|                 } | ||||
|             } | ||||
|             None | ||||
|         }); | ||||
|         let mut sigs_w = self.sigs.write().unwrap(); | ||||
|         sigs_w.extend(new_sigs); | ||||
|         ids | ||||
|     } | ||||
|  | ||||
|     pub fn drain_cleared(&self) -> Vec<u64> { | ||||
|         std::mem::take(&mut *self.cleared.write().unwrap()) | ||||
|     } | ||||
|  | ||||
|     pub fn close(self) { | ||||
|         self.exit.store(true, Ordering::Relaxed); | ||||
|         self.sig_clear_t.join().unwrap(); | ||||
|     } | ||||
|  | ||||
|     fn start_sig_clear_thread( | ||||
|         exit: &Arc<AtomicBool>, | ||||
|         sigs: &Arc<RwLock<PendingQueue>>, | ||||
|         cleared: &Arc<RwLock<Vec<u64>>>, | ||||
|         entrypoint_addr: SocketAddr, | ||||
|     ) -> JoinHandle<()> { | ||||
|         let sigs = sigs.clone(); | ||||
|         let exit = exit.clone(); | ||||
|         let cleared = cleared.clone(); | ||||
|         Builder::new() | ||||
|             .name("sig_clear".to_string()) | ||||
|             .spawn(move || { | ||||
|                 let client = RpcClient::new_socket_with_commitment( | ||||
|                     entrypoint_addr, | ||||
|                     CommitmentConfig::confirmed(), | ||||
|                 ); | ||||
|                 let mut success = 0; | ||||
|                 let mut error_count = 0; | ||||
|                 let mut timed_out = 0; | ||||
|                 let mut last_log = Instant::now(); | ||||
|                 while !exit.load(Ordering::Relaxed) { | ||||
|                     let sigs_len = sigs.read().unwrap().len(); | ||||
|                     if sigs_len > 0 { | ||||
|                         let mut sigs_w = sigs.write().unwrap(); | ||||
|                         let mut start = Measure::start("sig_status"); | ||||
|                         let statuses: Vec<_> = sigs_w | ||||
|                             .chunks(200) | ||||
|                             .flat_map(|sig_chunk| { | ||||
|                                 let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect(); | ||||
|                                 client | ||||
|                                     .get_signature_statuses(&only_sigs) | ||||
|                                     .expect("status fail") | ||||
|                                     .value | ||||
|                             }) | ||||
|                             .collect(); | ||||
|                         let mut num_cleared = 0; | ||||
|                         let start_len = sigs_w.len(); | ||||
|                         let now = timestamp(); | ||||
|                         let mut new_ids = vec![]; | ||||
|                         let mut i = 0; | ||||
|                         let mut j = 0; | ||||
|                         while i != sigs_w.len() { | ||||
|                             let mut retain = true; | ||||
|                             let sent_ts = sigs_w[i].1; | ||||
|                             if let Some(e) = &statuses[j] { | ||||
|                                 debug!("error: {:?}", e); | ||||
|                                 if e.status.is_ok() { | ||||
|                                     success += 1; | ||||
|                                 } else { | ||||
|                                     error_count += 1; | ||||
|                                 } | ||||
|                                 num_cleared += 1; | ||||
|                                 retain = false; | ||||
|                             } else if now - sent_ts > 30_000 { | ||||
|                                 retain = false; | ||||
|                                 timed_out += 1; | ||||
|                             } | ||||
|                             if !retain { | ||||
|                                 new_ids.push(sigs_w.remove(i).2); | ||||
|                             } else { | ||||
|                                 i += 1; | ||||
|                             } | ||||
|                             j += 1; | ||||
|                         } | ||||
|                         let final_sigs_len = sigs_w.len(); | ||||
|                         drop(sigs_w); | ||||
|                         cleared.write().unwrap().extend(new_ids); | ||||
|                         start.stop(); | ||||
|                         debug!( | ||||
|                             "sigs len: {:?} success: {} took: {}ms cleared: {}/{}", | ||||
|                             final_sigs_len, | ||||
|                             success, | ||||
|                             start.as_ms(), | ||||
|                             num_cleared, | ||||
|                             start_len, | ||||
|                         ); | ||||
|                         if last_log.elapsed().as_millis() > 5000 { | ||||
|                             info!( | ||||
|                                 "success: {} error: {} timed_out: {}", | ||||
|                                 success, error_count, timed_out, | ||||
|                             ); | ||||
|                             last_log = Instant::now(); | ||||
|                         } | ||||
|                     } | ||||
|                     sleep(Duration::from_millis(200)); | ||||
|                 } | ||||
|             }) | ||||
|             .unwrap() | ||||
|     } | ||||
| } | ||||
							
								
								
									
										1
									
								
								programs/bpf/Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								programs/bpf/Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -2904,6 +2904,7 @@ dependencies = [ | ||||
|  "solana-account-decoder", | ||||
|  "solana-clap-utils", | ||||
|  "solana-faucet", | ||||
|  "solana-measure", | ||||
|  "solana-net-utils", | ||||
|  "solana-sdk", | ||||
|  "solana-transaction-status", | ||||
|   | ||||
		Reference in New Issue
	
	Block a user