From 23ed65b339c485e2432e87bbfa5e899d213df348 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Mon, 23 Jul 2018 14:26:16 -0700 Subject: [PATCH] Transfer and sign at the same time in bench-tps --- src/bin/bench-tps.rs | 92 +++++++++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 31 deletions(-) diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index adadf8c742..ef38e55fa6 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -21,6 +21,7 @@ use solana::streamer::default_window; use solana::thin_client::ThinClient; use solana::timing::{duration_as_ms, duration_as_s}; use solana::transaction::Transaction; +use std::collections::VecDeque; use std::error; use std::fs::File; use std::io::Write; @@ -88,12 +89,11 @@ fn sample_tx_count( } } -fn generate_and_send_txs( +fn generate_txs( client: &mut ThinClient, - tx_clients: &[ThinClient], + shared_txs: &Arc>>>, id: &KeyPair, keypairs: &[KeyPair], - leader: &NodeInfo, txs: i64, last_id: &mut Hash, threads: usize, @@ -125,32 +125,14 @@ fn generate_and_send_txs( duration_as_ms(&duration), ); - println!( - "Transferring {} transactions in {} batches", - txs / 2, - threads - ); - let transfer_start = Instant::now(); let sz = transactions.len() / threads; let chunks: Vec<_> = transactions.chunks(sz).collect(); - chunks - .into_par_iter() - .zip(tx_clients) - .for_each(|(txs, client)| { - println!( - "Transferring 1 unit {} times... to {}", - txs.len(), - leader.contact_info.tpu - ); - for tx in txs { - client.transfer_signed(tx).unwrap(); - } - }); - println!( - "Transfer done. {:?} ms {} tps", - duration_as_ms(&transfer_start.elapsed()), - txs as f32 / (duration_as_s(&transfer_start.elapsed())) - ); + { + let mut shared_txs_wl = shared_txs.write().unwrap(); + for chunk in chunks { + shared_txs_wl.push_back(chunk.to_vec()); + } + } let mut found_new_last_id = false; // try for ~5s to get a new last_id @@ -361,7 +343,51 @@ fn main() { }) .collect(); - let clients: Vec<_> = (0..threads).map(|_| mk_client(&leader)).collect(); + let shared_txs: Arc>>> = + Arc::new(RwLock::new(VecDeque::new())); + + let s_threads: Vec<_> = (0..threads) + .map(|_| { + let exit_signal = exit_signal.clone(); + let shared_txs = shared_txs.clone(); + let leader = leader.clone(); + Builder::new() + .name("solana-client-sender".to_string()) + .spawn(move || { + let client = mk_client(&leader); + loop { + let mut txs = None; + { + let mut shared_txs_wl = shared_txs.write().unwrap(); + if shared_txs_wl.len() > 0 { + txs = shared_txs_wl.pop_front(); + } + } + if let Some(txs0) = txs { + println!( + "Transferring 1 unit {} times... to {}", + txs0.len(), + leader.contact_info.tpu + ); + let tx_len = txs0.len(); + let transfer_start = Instant::now(); + for tx in txs0 { + client.transfer_signed(&tx).unwrap(); + } + println!( + "Tx send done. {} ms {} tps", + duration_as_ms(&transfer_start.elapsed()), + tx_len as f32 / duration_as_s(&transfer_start.elapsed()), + ); + } + if exit_signal.load(Ordering::Relaxed) { + break; + } + } + }) + .unwrap() + }) + .collect(); // generate and send transactions for the specified duration let time = Duration::new(time_sec, 0); @@ -374,12 +400,11 @@ fn main() { // ping-pong between source and destination accounts for each loop iteration // this seems to be faster than trying to determine the balance of individual // accounts - generate_and_send_txs( + generate_txs( &mut client, - &clients, + &shared_txs, &id, &keypairs, - &leader, txs, &mut last_id, threads, @@ -394,6 +419,11 @@ fn main() { t.join().unwrap(); } + // join the tx send threads + for t in s_threads { + t.join().unwrap(); + } + let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(-1); println!("Token balance: {}", balance);