From 6c275ea5efcb777aa5c46a7e343e0a5d725838d3 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Wed, 25 Jul 2018 09:00:55 -0700 Subject: [PATCH] More knobs. Arg for tx count per batch and also sustained mode sustained mode overlaps tx generation with transfer. This mode seems to have lower peak performance but higher average performance --- src/bin/bench-tps.rs | 121 ++++++++++++++++++++++++++++++------------- 1 file changed, 84 insertions(+), 37 deletions(-) diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index ef38e55fa6..c7e08083e1 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -27,7 +27,7 @@ use std::fs::File; use std::io::Write; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket}; use std::process::exit; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::thread::Builder; @@ -155,6 +155,44 @@ fn generate_txs( } } +fn do_tx_transfers( + exit_signal: &Arc, + shared_txs: &Arc>>>, + leader: &NodeInfo, + shared_tx_thread_count: &Arc, +) { + let client = mk_client(&leader); + loop { + let txs; + { + let mut shared_txs_wl = shared_txs.write().unwrap(); + txs = shared_txs_wl.pop_front(); + } + if let Some(txs0) = txs { + shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); + println!( + "Transferring 1 unit {} times... to {}", + txs0.len(), + leader.contact_info.tpu + ); + let tx_len = txs0.len(); + let transfer_start = Instant::now(); + for tx in txs0 { + client.transfer_signed(&tx).unwrap(); + } + shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); + println!( + "Tx send done. {} ms {} tps", + duration_as_ms(&transfer_start.elapsed()), + tx_len as f32 / duration_as_s(&transfer_start.elapsed()), + ); + } + if exit_signal.load(Ordering::Relaxed) { + break; + } + } +} + fn main() { env_logger::init(); set_panic_hook("bench-tps"); @@ -162,6 +200,8 @@ fn main() { let mut num_nodes = 1usize; let mut time_sec = 90; let mut addr = None; + let mut sustained = false; + let mut tx_count = 500_000; let matches = App::new("solana-bench-tps") .arg( @@ -218,6 +258,18 @@ fn main() { .takes_value(true) .help("address to advertise to the network"), ) + .arg( + Arg::with_name("sustained") + .long("sustained") + .help("Use sustained performance mode vs. peak mode. This overlaps the tx generation with transfers."), + ) + .arg( + Arg::with_name("tx_count") + .long("tx_count") + .value_name("NUMBER") + .takes_value(true) + .help("number of transactions to send in a single batch") + ) .get_matches(); let leader: NodeInfo; @@ -246,6 +298,14 @@ fn main() { addr = Some(s.to_string()); } + if let Some(s) = matches.value_of("tx_count") { + tx_count = s.to_string().parse().expect("integer"); + } + + if matches.is_present("sustained") { + sustained = true; + } + let mut drone_addr = leader.contact_info.tpu; drone_addr.set_port(DRONE_PORT); @@ -280,10 +340,9 @@ fn main() { let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap(); println!("Token balance: {}", starting_balance); - let txs: i64 = 500_000; - if starting_balance < txs { - let airdrop_amount = txs - starting_balance; + if starting_balance < tx_count { + let airdrop_amount = tx_count - starting_balance; println!( "Airdropping {:?} tokens from {}", airdrop_amount, drone_addr @@ -318,8 +377,8 @@ fn main() { seed.copy_from_slice(&id.public_key_bytes()[..32]); let rnd = GenKeys::new(seed); - println!("Creating {} keypairs...", txs / 2); - let keypairs = rnd.gen_n_keypairs(txs / 2); + println!("Creating {} keypairs...", tx_count / 2); + let keypairs = rnd.gen_n_keypairs(tx_count / 2); let first_tx_count = client.transaction_count(); println!("Initial transaction count {}", first_tx_count); @@ -346,44 +405,23 @@ fn main() { let shared_txs: Arc>>> = Arc::new(RwLock::new(VecDeque::new())); + let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); + let s_threads: Vec<_> = (0..threads) .map(|_| { let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); let leader = leader.clone(); + let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); Builder::new() .name("solana-client-sender".to_string()) .spawn(move || { - let client = mk_client(&leader); - loop { - let mut txs = None; - { - let mut shared_txs_wl = shared_txs.write().unwrap(); - if shared_txs_wl.len() > 0 { - txs = shared_txs_wl.pop_front(); - } - } - if let Some(txs0) = txs { - println!( - "Transferring 1 unit {} times... to {}", - txs0.len(), - leader.contact_info.tpu - ); - let tx_len = txs0.len(); - let transfer_start = Instant::now(); - for tx in txs0 { - client.transfer_signed(&tx).unwrap(); - } - println!( - "Tx send done. {} ms {} tps", - duration_as_ms(&transfer_start.elapsed()), - tx_len as f32 / duration_as_s(&transfer_start.elapsed()), - ); - } - if exit_signal.load(Ordering::Relaxed) { - break; - } - } + do_tx_transfers( + &exit_signal, + &shared_txs, + &leader, + &shared_tx_active_thread_count, + ); }) .unwrap() }) @@ -405,12 +443,21 @@ fn main() { &shared_txs, &id, &keypairs, - txs, + tx_count, &mut last_id, threads, reclaim_tokens_back_to_source_account, ); reclaim_tokens_back_to_source_account = !reclaim_tokens_back_to_source_account; + + // In sustained mode overlap the transfers with generation + // this has higher average performance but lower peak performance + // in tested environments. + if !sustained { + while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 { + sleep(Duration::from_millis(100)); + } + } } // Stop the sampling threads so it will collect the stats