diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 8626f7b138..e8b5124598 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -10,11 +10,13 @@ use atty::{is, Stream}; use getopts::Options; use rayon::prelude::*; use solana::crdt::{get_ip_addr, Crdt, ReplicatedData}; +use solana::hash::Hash; use solana::mint::MintDemo; use solana::ncp::Ncp; use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; use solana::streamer::default_window; use solana::thin_client::ThinClient; +use solana::timing::{duration_as_ms, duration_as_s}; use solana::transaction::Transaction; use std::env; use std::fs::File; @@ -24,6 +26,7 @@ use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; +use std::thread::Builder; use std::thread::JoinHandle; use std::time::Duration; use std::time::Instant; @@ -37,16 +40,118 @@ fn print_usage(program: &str, opts: Options) { print!("{}", opts.usage(&brief)); } +fn sample_tx_count( + thread_addr: Arc>, + exit: Arc, + maxes: Arc>>, + first_count: u64, + v: ReplicatedData, + sample_period: u64, +) { + let mut client = mk_client(&thread_addr, &v); + let mut now = Instant::now(); + let mut initial_tx_count = client.transaction_count(); + let mut max_tps = 0.0; + let mut total; + loop { + let tx_count = client.transaction_count(); + let duration = now.elapsed(); + now = Instant::now(); + let sample = tx_count - initial_tx_count; + initial_tx_count = tx_count; + println!("{}: Transactions processed {}", v.transactions_addr, sample); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let tps = (sample * 1_000_000_000) as f64 / ns as f64; + if tps > max_tps { + max_tps = tps; + } + println!("{}: {} tps", v.transactions_addr, tps); + total = tx_count - first_count; + println!( + "{}: Total Transactions processed {}", + v.transactions_addr, total + ); + sleep(Duration::new(sample_period, 0)); + + if exit.load(Ordering::Relaxed) { + println!("exiting validator thread"); + maxes.write().unwrap().push((max_tps, total)); + break; + } + } +} + +fn generate_and_send_txs( + client: &mut ThinClient, + keypair_pairs: &Vec<&[KeyPair]>, + leader: &ReplicatedData, + txs: i64, + last_id: &mut Hash, + threads: usize, + client_addr: Arc>, +) { + println!( + "Signing transactions... {} {}", + keypair_pairs.len(), + keypair_pairs[0].len() + ); + let signing_start = Instant::now(); + let transactions: Vec<_> = keypair_pairs + .par_iter() + .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, *last_id)) + .collect(); + + let duration = signing_start.elapsed(); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let bsps = txs as f64 / ns as f64; + let nsps = ns as f64 / txs as f64; + println!( + "Done. {} thousand signatures per second, {}us per signature", + bsps * 1_000_000_f64, + nsps / 1_000_f64 + ); + + println!("Transfering {} transactions in {} batches", txs, threads); + let transfer_start = Instant::now(); + let sz = transactions.len() / threads; + let chunks: Vec<_> = transactions.chunks(sz).collect(); + chunks.into_par_iter().for_each(|txs| { + println!( + "Transferring 1 unit {} times... to {:?}", + txs.len(), + leader.transactions_addr + ); + let client = mk_client(&client_addr, &leader); + for tx in txs { + client.transfer_signed(tx.clone()).unwrap(); + } + }); + println!( + "Transfer done. {:?} ms {} tps", + duration_as_ms(&transfer_start.elapsed()), + txs as f32 / (duration_as_s(&transfer_start.elapsed())) + ); + + *last_id = client.get_last_id(); +} + fn main() { env_logger::init(); let mut threads = 4usize; let mut num_nodes = 1usize; + let mut time_sec = 60; let mut opts = Options::new(); opts.optopt("l", "", "leader", "leader.json"); opts.optopt("c", "", "client port", "port"); opts.optopt("t", "", "number of threads", &format!("{}", threads)); opts.optflag("d", "dyn", "detect network address dynamically"); + opts.optopt( + "s", + "", + "send transactions for this many seconds", + &format!("{}", time_sec), + ); opts.optopt( "n", "", @@ -83,6 +188,9 @@ fn main() { if matches.opt_present("n") { num_nodes = matches.opt_str("n").unwrap().parse().expect("integer"); } + if matches.opt_present("s") { + time_sec = matches.opt_str("s").unwrap().parse().expect("integer"); + } let leader = if matches.opt_present("l") { read_leader(matches.opt_str("l").unwrap()) @@ -122,7 +230,7 @@ fn main() { let mut client = mk_client(&client_addr, &leader); println!("Get last ID..."); - let last_id = client.get_last_id(); + let mut last_id = client.get_last_id(); println!("Got last ID {:?}", last_id); let mut seed = [0u8; 32]; @@ -134,85 +242,50 @@ fn main() { let keypairs = rnd.gen_n_keypairs(demo.num_accounts); let keypair_pairs: Vec<_> = keypairs.chunks(2).collect(); - println!("Signing transactions..."); - let now = Instant::now(); - let transactions: Vec<_> = keypair_pairs - .into_par_iter() - .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) - .collect(); - let duration = now.elapsed(); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let bsps = txs as f64 / ns as f64; - let nsps = ns as f64 / txs as f64; - println!( - "Done. {} thousand signatures per second, {}us per signature", - bsps * 1_000_000_f64, - nsps / 1_000_f64 - ); - let first_count = client.transaction_count(); println!("initial count {}", first_count); - println!("Transfering {} transactions in {} batches", txs, threads); - let sz = transactions.len() / threads; - let chunks: Vec<_> = transactions.chunks(sz).collect(); - chunks.into_par_iter().for_each(|txs| { - println!( - "Transferring 1 unit {} times... to {:?}", - txs.len(), - leader.transactions_addr - ); - let client = mk_client(&client_addr, &leader); - for tx in txs { - client.transfer_signed(tx.clone()).unwrap(); - } - }); - - let sample_period = 1; // in seconds println!("Sampling tps every second...",); - let maxes: Vec<_> = validators - .into_par_iter() - .map(|val| { - let mut client = mk_client(&client_addr, &val); - let mut now = Instant::now(); - let mut initial_tx_count = client.transaction_count(); - let mut max_tps = 0.0; - let mut total = 0; - for i in 0..100 { - let tx_count = client.transaction_count(); - let duration = now.elapsed(); - now = Instant::now(); - let sample = tx_count - initial_tx_count; - initial_tx_count = tx_count; - println!( - "{}: Transactions processed {}", - val.transactions_addr, sample - ); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (sample * 1_000_000_000) as f64 / ns as f64; - if tps > max_tps { - max_tps = tps; - } - println!("{}: {} tps", val.transactions_addr, tps); - total = tx_count - first_count; - println!( - "{}: Total Transactions processed {}", - val.transactions_addr, total - ); - if total == transactions.len() as u64 { - break; - } - if i > 20 && sample == 0 { - break; - } - sleep(Duration::new(sample_period, 0)); - } - (max_tps, total) + + let maxes = Arc::new(RwLock::new(Vec::new())); + let sample_period = 1; // in seconds + let v_threads: Vec<_> = validators + .into_iter() + .map(|v| { + let exit = signal.clone(); + let thread_addr = client_addr.clone(); + let maxes = maxes.clone(); + Builder::new() + .name("solana-client-sample".to_string()) + .spawn(move || { + sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period); + }) + .unwrap() }) .collect(); + + let time = Duration::new(time_sec, 0); + let now = Instant::now(); + while now.elapsed() < time { + generate_and_send_txs( + &mut client, + &keypair_pairs, + &leader, + txs, + &mut last_id, + threads, + client_addr.clone(), + ); + } + + signal.store(true, Ordering::Relaxed); + for t in v_threads { + t.join().unwrap(); + } + let mut max_of_maxes = 0.0; let mut total_txs = 0; - for (max, txs) in &maxes { + for (max, txs) in maxes.read().unwrap().iter() { if *max > max_of_maxes { max_of_maxes = *max; } @@ -223,9 +296,9 @@ fn main() { max_of_maxes, sample_period, total_txs, - maxes.len() + maxes.read().unwrap().len() ); - signal.store(true, Ordering::Relaxed); + for t in c_threads { t.join().unwrap(); } @@ -237,6 +310,10 @@ fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinC let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); addr.set_port(port + 1); let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + addr.set_port(port + 2); ThinClient::new( r.requests_addr, diff --git a/src/thin_client.rs b/src/thin_client.rs index 58174ab732..bb8bd6f83e 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -121,17 +121,19 @@ impl ThinClient { let req = Request::GetTransactionCount; let data = serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn transaction_count"); let mut done = false; while !done { - let resp = self.recv_response().expect("transaction count dropped"); - info!("recv_response {:?}", resp); - if let &Response::TransactionCount { .. } = &resp { - done = true; + self.requests_socket + .send_to(&data, &self.requests_addr) + .expect("buffer error in pub fn transaction_count"); + + if let Ok(resp) = self.recv_response() { + info!("recv_response {:?}", resp); + if let &Response::TransactionCount { .. } = &resp { + done = true; + } + self.process_response(resp); } - self.process_response(resp); } self.transaction_count } @@ -142,16 +144,18 @@ impl ThinClient { info!("get_last_id"); let req = Request::GetLastId; let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn get_last_id"); let mut done = false; while !done { - let resp = self.recv_response().expect("get_last_id response"); - if let &Response::LastId { .. } = &resp { - done = true; + self.requests_socket + .send_to(&data, &self.requests_addr) + .expect("buffer error in pub fn get_last_id"); + + if let Ok(resp) = self.recv_response() { + if let &Response::LastId { .. } = &resp { + done = true; + } + self.process_response(resp); } - self.process_response(resp); } self.last_id.expect("some last_id") }