diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index ef38e55fa6..c7e08083e1 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -27,7 +27,7 @@ use std::fs::File; use std::io::Write; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket}; use std::process::exit; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::thread::Builder; @@ -155,6 +155,44 @@ fn generate_txs( } } +fn do_tx_transfers( + exit_signal: &Arc, + shared_txs: &Arc>>>, + leader: &NodeInfo, + shared_tx_thread_count: &Arc, +) { + let client = mk_client(&leader); + loop { + let txs; + { + let mut shared_txs_wl = shared_txs.write().unwrap(); + txs = shared_txs_wl.pop_front(); + } + if let Some(txs0) = txs { + shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); + println!( + "Transferring 1 unit {} times... to {}", + txs0.len(), + leader.contact_info.tpu + ); + let tx_len = txs0.len(); + let transfer_start = Instant::now(); + for tx in txs0 { + client.transfer_signed(&tx).unwrap(); + } + shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); + println!( + "Tx send done. {} ms {} tps", + duration_as_ms(&transfer_start.elapsed()), + tx_len as f32 / duration_as_s(&transfer_start.elapsed()), + ); + } + if exit_signal.load(Ordering::Relaxed) { + break; + } + } +} + fn main() { env_logger::init(); set_panic_hook("bench-tps"); @@ -162,6 +200,8 @@ fn main() { let mut num_nodes = 1usize; let mut time_sec = 90; let mut addr = None; + let mut sustained = false; + let mut tx_count = 500_000; let matches = App::new("solana-bench-tps") .arg( @@ -218,6 +258,18 @@ fn main() { .takes_value(true) .help("address to advertise to the network"), ) + .arg( + Arg::with_name("sustained") + .long("sustained") + .help("Use sustained performance mode vs. peak mode. This overlaps the tx generation with transfers."), + ) + .arg( + Arg::with_name("tx_count") + .long("tx_count") + .value_name("NUMBER") + .takes_value(true) + .help("number of transactions to send in a single batch") + ) .get_matches(); let leader: NodeInfo; @@ -246,6 +298,14 @@ fn main() { addr = Some(s.to_string()); } + if let Some(s) = matches.value_of("tx_count") { + tx_count = s.to_string().parse().expect("integer"); + } + + if matches.is_present("sustained") { + sustained = true; + } + let mut drone_addr = leader.contact_info.tpu; drone_addr.set_port(DRONE_PORT); @@ -280,10 +340,9 @@ fn main() { let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap(); println!("Token balance: {}", starting_balance); - let txs: i64 = 500_000; - if starting_balance < txs { - let airdrop_amount = txs - starting_balance; + if starting_balance < tx_count { + let airdrop_amount = tx_count - starting_balance; println!( "Airdropping {:?} tokens from {}", airdrop_amount, drone_addr @@ -318,8 +377,8 @@ fn main() { seed.copy_from_slice(&id.public_key_bytes()[..32]); let rnd = GenKeys::new(seed); - println!("Creating {} keypairs...", txs / 2); - let keypairs = rnd.gen_n_keypairs(txs / 2); + println!("Creating {} keypairs...", tx_count / 2); + let keypairs = rnd.gen_n_keypairs(tx_count / 2); let first_tx_count = client.transaction_count(); println!("Initial transaction count {}", first_tx_count); @@ -346,44 +405,23 @@ fn main() { let shared_txs: Arc>>> = Arc::new(RwLock::new(VecDeque::new())); + let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); + let s_threads: Vec<_> = (0..threads) .map(|_| { let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); let leader = leader.clone(); + let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); Builder::new() .name("solana-client-sender".to_string()) .spawn(move || { - let client = mk_client(&leader); - loop { - let mut txs = None; - { - let mut shared_txs_wl = shared_txs.write().unwrap(); - if shared_txs_wl.len() > 0 { - txs = shared_txs_wl.pop_front(); - } - } - if let Some(txs0) = txs { - println!( - "Transferring 1 unit {} times... to {}", - txs0.len(), - leader.contact_info.tpu - ); - let tx_len = txs0.len(); - let transfer_start = Instant::now(); - for tx in txs0 { - client.transfer_signed(&tx).unwrap(); - } - println!( - "Tx send done. {} ms {} tps", - duration_as_ms(&transfer_start.elapsed()), - tx_len as f32 / duration_as_s(&transfer_start.elapsed()), - ); - } - if exit_signal.load(Ordering::Relaxed) { - break; - } - } + do_tx_transfers( + &exit_signal, + &shared_txs, + &leader, + &shared_tx_active_thread_count, + ); }) .unwrap() }) @@ -405,12 +443,21 @@ fn main() { &shared_txs, &id, &keypairs, - txs, + tx_count, &mut last_id, threads, reclaim_tokens_back_to_source_account, ); reclaim_tokens_back_to_source_account = !reclaim_tokens_back_to_source_account; + + // In sustained mode overlap the transfers with generation + // this has higher average performance but lower peak performance + // in tested environments. + if !sustained { + while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 { + sleep(Duration::from_millis(100)); + } + } } // Stop the sampling threads so it will collect the stats