diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index 4e78583c28..a62addb681 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -26,11 +26,11 @@ use std::collections::VecDeque; use std::mem; use std::net::SocketAddr; use std::process::exit; -use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder}; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant}; // TODO Chunk length as specified results in a bunch of failures, divide by 10 helps... // Assume 4MB network buffers, and 512 byte packets @@ -39,9 +39,6 @@ const FUND_CHUNK_LEN: usize = 4 * 1024 * 1024 / 512; // Maximum system transfers per transaction const MAX_TRANSFERS_PER_TX: u64 = 4; -// Interval between fetching a new blockhash -const BLOCKHASH_RENEW_PERIOD_S: u64 = 30; - pub type SharedTransactions = Arc>>>; pub struct Config { @@ -140,25 +137,17 @@ where ); let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new())); - let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); let total_txs_sent_count = Arc::new(AtomicUsize::new(0)); let s_threads: Vec<_> = (0..threads) .map(|_| { let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); - let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let total_txs_sent_count = total_txs_sent_count.clone(); let client = clients[0].clone(); Builder::new() .name("solana-exchange-transfer".to_string()) .spawn(move || { - do_tx_transfers( - &exit_signal, - &shared_txs, - &shared_tx_active_thread_count, - &total_txs_sent_count, - &client, - ) + do_tx_transfers(&exit_signal, &shared_txs, &total_txs_sent_count, &client) }) .unwrap() }) @@ -169,7 +158,6 @@ where let swapper_thread = { let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); - let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let client = clients[0].clone(); Builder::new() .name("solana-exchange-swapper".to_string()) @@ -178,7 +166,6 @@ where &exit_signal, &swapper_receiver, &shared_txs, - &shared_tx_active_thread_count, &swapper_signers, &profit_pubkeys, batch_size, @@ -194,7 +181,6 @@ where let trader_thread = { let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); - let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let client = clients[0].clone(); Builder::new() .name("solana-exchange-trader".to_string()) @@ -203,7 +189,6 @@ where &exit_signal, &swapper_sender, &shared_txs, - &shared_tx_active_thread_count, &trader_signers, &src_pubkeys, transfer_delay, @@ -311,7 +296,6 @@ fn sample_txs( fn do_tx_transfers( exit_signal: &Arc, shared_txs: &SharedTransactions, - shared_tx_thread_count: &Arc, total_txs_sent_count: &Arc, client: &Arc, ) where @@ -327,13 +311,11 @@ fn do_tx_transfers( if let Some(txs0) = txs { let n = txs0.len(); - shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); let now = Instant::now(); for tx in txs0 { client.async_send_transaction(tx).expect("Transfer"); } let duration = now.elapsed(); - shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); total_txs_sent_count.fetch_add(n, Ordering::Relaxed); stats.total += n as u64; @@ -387,7 +369,6 @@ fn swapper( exit_signal: &Arc, receiver: &Receiver>, shared_txs: &SharedTransactions, - shared_tx_active_thread_count: &Arc, signers: &[Arc], profit_pubkeys: &[Pubkey], batch_size: usize, @@ -400,10 +381,6 @@ fn swapper( let mut stats = Stats::default(); let mut order_book = OrderBook::default(); let mut account_group: usize = 0; - let mut blockhash = client - .get_recent_blockhash() - .expect("Failed to get blockhash"); - let mut blockhash_now = UNIX_EPOCH; 'outer: loop { if let Ok(trade_infos) = receiver.try_recv() { let mut tries = 0; @@ -462,19 +439,9 @@ fn swapper( let now = Instant::now(); - // Don't get a blockhash every time - if SystemTime::now() - .duration_since(blockhash_now) - .unwrap() - .as_secs() - > BLOCKHASH_RENEW_PERIOD_S - { - blockhash = client - .get_recent_blockhash() - .expect("Failed to get blockhash"); - blockhash_now = SystemTime::now(); - } - + let blockhash = client + .get_recent_blockhash() + .expect("Failed to get blockhash"); let to_swap_txs: Vec<_> = to_swap .par_iter() .map(|(signer, swap, profit)| { @@ -517,10 +484,6 @@ fn swapper( } } - while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 { - sleep(Duration::from_millis(100)); - } - if exit_signal.load(Ordering::Relaxed) { break 'outer; } @@ -550,7 +513,6 @@ fn trader( exit_signal: &Arc, sender: &Sender>, shared_txs: &SharedTransactions, - shared_tx_active_thread_count: &Arc, signers: &[Arc], srcs: &[Pubkey], delay: u64, @@ -568,10 +530,6 @@ fn trader( let tokens = 1; let price = 1000; let mut account_group: usize = 0; - let mut blockhash = client - .get_recent_blockhash() - .expect("Failed to get blockhash"); - let mut blockhash_now = UNIX_EPOCH; loop { let now = Instant::now(); @@ -616,22 +574,12 @@ fn trader( } trace!("sw {:?} keypairs {:.2} /s", batch_size, rate); + let blockhash = client + .get_recent_blockhash() + .expect("Failed to get blockhash"); + trades.chunks(chunk_size).for_each(|chunk| { let now = Instant::now(); - - // Don't get a blockhash every time - if SystemTime::now() - .duration_since(blockhash_now) - .unwrap() - .as_secs() - > BLOCKHASH_RENEW_PERIOD_S - { - blockhash = client - .get_recent_blockhash() - .expect("Failed to get blockhash"); - blockhash_now = SystemTime::now(); - } - let trades_txs: Vec<_> = chunk .par_iter() .map(|(signer, trade, direction, src)| { @@ -682,10 +630,6 @@ fn trader( .send(trade_infos) .expect("Failed to send trades to swapper"); - while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 { - sleep(Duration::from_millis(100)); - } - if exit_signal.load(Ordering::Relaxed) { info!( "{} Trades with batch size {} chunk size {}", @@ -1023,7 +967,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair, } } -pub fn get_clients(nodes: Vec) -> Vec { +pub fn get_clients(nodes: &[ContactInfo]) -> Vec { nodes .iter() .filter_map(|node| { @@ -1104,7 +1048,7 @@ mod tests { exit(1); }); - let clients = get_clients(nodes); + let clients = get_clients(&nodes); if clients.len() < NUM_NODES { error!( diff --git a/bench-exchange/src/main.rs b/bench-exchange/src/main.rs index 3cdfbf5844..35113065c7 100644 --- a/bench-exchange/src/main.rs +++ b/bench-exchange/src/main.rs @@ -34,7 +34,7 @@ fn main() { panic!("Failed to discover nodes"); }); - let clients = get_clients(nodes); + let clients = get_clients(&nodes); info!("{} nodes found", clients.len()); if clients.len() < num_nodes {