Change client-demo to run continuosly for some amount of time
Also retry for get_last_id/transaction_count if dropped.
This commit is contained in:
committed by
Greg Fitzgerald
parent
5f1d8c95eb
commit
af6a07697a
@ -10,11 +10,13 @@ use atty::{is, Stream};
|
|||||||
use getopts::Options;
|
use getopts::Options;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use solana::crdt::{get_ip_addr, Crdt, ReplicatedData};
|
use solana::crdt::{get_ip_addr, Crdt, ReplicatedData};
|
||||||
|
use solana::hash::Hash;
|
||||||
use solana::mint::MintDemo;
|
use solana::mint::MintDemo;
|
||||||
use solana::ncp::Ncp;
|
use solana::ncp::Ncp;
|
||||||
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
|
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
|
||||||
use solana::streamer::default_window;
|
use solana::streamer::default_window;
|
||||||
use solana::thin_client::ThinClient;
|
use solana::thin_client::ThinClient;
|
||||||
|
use solana::timing::{duration_as_ms, duration_as_s};
|
||||||
use solana::transaction::Transaction;
|
use solana::transaction::Transaction;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
@ -24,6 +26,7 @@ use std::process::exit;
|
|||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
|
use std::thread::Builder;
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
@ -37,16 +40,118 @@ fn print_usage(program: &str, opts: Options) {
|
|||||||
print!("{}", opts.usage(&brief));
|
print!("{}", opts.usage(&brief));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn sample_tx_count(
|
||||||
|
thread_addr: Arc<RwLock<SocketAddr>>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
maxes: Arc<RwLock<Vec<(f64, u64)>>>,
|
||||||
|
first_count: u64,
|
||||||
|
v: ReplicatedData,
|
||||||
|
sample_period: u64,
|
||||||
|
) {
|
||||||
|
let mut client = mk_client(&thread_addr, &v);
|
||||||
|
let mut now = Instant::now();
|
||||||
|
let mut initial_tx_count = client.transaction_count();
|
||||||
|
let mut max_tps = 0.0;
|
||||||
|
let mut total;
|
||||||
|
loop {
|
||||||
|
let tx_count = client.transaction_count();
|
||||||
|
let duration = now.elapsed();
|
||||||
|
now = Instant::now();
|
||||||
|
let sample = tx_count - initial_tx_count;
|
||||||
|
initial_tx_count = tx_count;
|
||||||
|
println!("{}: Transactions processed {}", v.transactions_addr, sample);
|
||||||
|
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
||||||
|
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
|
||||||
|
if tps > max_tps {
|
||||||
|
max_tps = tps;
|
||||||
|
}
|
||||||
|
println!("{}: {} tps", v.transactions_addr, tps);
|
||||||
|
total = tx_count - first_count;
|
||||||
|
println!(
|
||||||
|
"{}: Total Transactions processed {}",
|
||||||
|
v.transactions_addr, total
|
||||||
|
);
|
||||||
|
sleep(Duration::new(sample_period, 0));
|
||||||
|
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
println!("exiting validator thread");
|
||||||
|
maxes.write().unwrap().push((max_tps, total));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn generate_and_send_txs(
|
||||||
|
client: &mut ThinClient,
|
||||||
|
keypair_pairs: &Vec<&[KeyPair]>,
|
||||||
|
leader: &ReplicatedData,
|
||||||
|
txs: i64,
|
||||||
|
last_id: &mut Hash,
|
||||||
|
threads: usize,
|
||||||
|
client_addr: Arc<RwLock<SocketAddr>>,
|
||||||
|
) {
|
||||||
|
println!(
|
||||||
|
"Signing transactions... {} {}",
|
||||||
|
keypair_pairs.len(),
|
||||||
|
keypair_pairs[0].len()
|
||||||
|
);
|
||||||
|
let signing_start = Instant::now();
|
||||||
|
let transactions: Vec<_> = keypair_pairs
|
||||||
|
.par_iter()
|
||||||
|
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, *last_id))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let duration = signing_start.elapsed();
|
||||||
|
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
||||||
|
let bsps = txs as f64 / ns as f64;
|
||||||
|
let nsps = ns as f64 / txs as f64;
|
||||||
|
println!(
|
||||||
|
"Done. {} thousand signatures per second, {}us per signature",
|
||||||
|
bsps * 1_000_000_f64,
|
||||||
|
nsps / 1_000_f64
|
||||||
|
);
|
||||||
|
|
||||||
|
println!("Transfering {} transactions in {} batches", txs, threads);
|
||||||
|
let transfer_start = Instant::now();
|
||||||
|
let sz = transactions.len() / threads;
|
||||||
|
let chunks: Vec<_> = transactions.chunks(sz).collect();
|
||||||
|
chunks.into_par_iter().for_each(|txs| {
|
||||||
|
println!(
|
||||||
|
"Transferring 1 unit {} times... to {:?}",
|
||||||
|
txs.len(),
|
||||||
|
leader.transactions_addr
|
||||||
|
);
|
||||||
|
let client = mk_client(&client_addr, &leader);
|
||||||
|
for tx in txs {
|
||||||
|
client.transfer_signed(tx.clone()).unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
println!(
|
||||||
|
"Transfer done. {:?} ms {} tps",
|
||||||
|
duration_as_ms(&transfer_start.elapsed()),
|
||||||
|
txs as f32 / (duration_as_s(&transfer_start.elapsed()))
|
||||||
|
);
|
||||||
|
|
||||||
|
*last_id = client.get_last_id();
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
let mut threads = 4usize;
|
let mut threads = 4usize;
|
||||||
let mut num_nodes = 1usize;
|
let mut num_nodes = 1usize;
|
||||||
|
let mut time_sec = 60;
|
||||||
|
|
||||||
let mut opts = Options::new();
|
let mut opts = Options::new();
|
||||||
opts.optopt("l", "", "leader", "leader.json");
|
opts.optopt("l", "", "leader", "leader.json");
|
||||||
opts.optopt("c", "", "client port", "port");
|
opts.optopt("c", "", "client port", "port");
|
||||||
opts.optopt("t", "", "number of threads", &format!("{}", threads));
|
opts.optopt("t", "", "number of threads", &format!("{}", threads));
|
||||||
opts.optflag("d", "dyn", "detect network address dynamically");
|
opts.optflag("d", "dyn", "detect network address dynamically");
|
||||||
|
opts.optopt(
|
||||||
|
"s",
|
||||||
|
"",
|
||||||
|
"send transactions for this many seconds",
|
||||||
|
&format!("{}", time_sec),
|
||||||
|
);
|
||||||
opts.optopt(
|
opts.optopt(
|
||||||
"n",
|
"n",
|
||||||
"",
|
"",
|
||||||
@ -83,6 +188,9 @@ fn main() {
|
|||||||
if matches.opt_present("n") {
|
if matches.opt_present("n") {
|
||||||
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
|
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
|
||||||
}
|
}
|
||||||
|
if matches.opt_present("s") {
|
||||||
|
time_sec = matches.opt_str("s").unwrap().parse().expect("integer");
|
||||||
|
}
|
||||||
|
|
||||||
let leader = if matches.opt_present("l") {
|
let leader = if matches.opt_present("l") {
|
||||||
read_leader(matches.opt_str("l").unwrap())
|
read_leader(matches.opt_str("l").unwrap())
|
||||||
@ -122,7 +230,7 @@ fn main() {
|
|||||||
let mut client = mk_client(&client_addr, &leader);
|
let mut client = mk_client(&client_addr, &leader);
|
||||||
|
|
||||||
println!("Get last ID...");
|
println!("Get last ID...");
|
||||||
let last_id = client.get_last_id();
|
let mut last_id = client.get_last_id();
|
||||||
println!("Got last ID {:?}", last_id);
|
println!("Got last ID {:?}", last_id);
|
||||||
|
|
||||||
let mut seed = [0u8; 32];
|
let mut seed = [0u8; 32];
|
||||||
@ -134,85 +242,50 @@ fn main() {
|
|||||||
let keypairs = rnd.gen_n_keypairs(demo.num_accounts);
|
let keypairs = rnd.gen_n_keypairs(demo.num_accounts);
|
||||||
let keypair_pairs: Vec<_> = keypairs.chunks(2).collect();
|
let keypair_pairs: Vec<_> = keypairs.chunks(2).collect();
|
||||||
|
|
||||||
println!("Signing transactions...");
|
|
||||||
let now = Instant::now();
|
|
||||||
let transactions: Vec<_> = keypair_pairs
|
|
||||||
.into_par_iter()
|
|
||||||
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
|
|
||||||
.collect();
|
|
||||||
let duration = now.elapsed();
|
|
||||||
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
|
||||||
let bsps = txs as f64 / ns as f64;
|
|
||||||
let nsps = ns as f64 / txs as f64;
|
|
||||||
println!(
|
|
||||||
"Done. {} thousand signatures per second, {}us per signature",
|
|
||||||
bsps * 1_000_000_f64,
|
|
||||||
nsps / 1_000_f64
|
|
||||||
);
|
|
||||||
|
|
||||||
let first_count = client.transaction_count();
|
let first_count = client.transaction_count();
|
||||||
println!("initial count {}", first_count);
|
println!("initial count {}", first_count);
|
||||||
|
|
||||||
println!("Transfering {} transactions in {} batches", txs, threads);
|
|
||||||
let sz = transactions.len() / threads;
|
|
||||||
let chunks: Vec<_> = transactions.chunks(sz).collect();
|
|
||||||
chunks.into_par_iter().for_each(|txs| {
|
|
||||||
println!(
|
|
||||||
"Transferring 1 unit {} times... to {:?}",
|
|
||||||
txs.len(),
|
|
||||||
leader.transactions_addr
|
|
||||||
);
|
|
||||||
let client = mk_client(&client_addr, &leader);
|
|
||||||
for tx in txs {
|
|
||||||
client.transfer_signed(tx.clone()).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let sample_period = 1; // in seconds
|
|
||||||
println!("Sampling tps every second...",);
|
println!("Sampling tps every second...",);
|
||||||
let maxes: Vec<_> = validators
|
|
||||||
.into_par_iter()
|
let maxes = Arc::new(RwLock::new(Vec::new()));
|
||||||
.map(|val| {
|
let sample_period = 1; // in seconds
|
||||||
let mut client = mk_client(&client_addr, &val);
|
let v_threads: Vec<_> = validators
|
||||||
let mut now = Instant::now();
|
.into_iter()
|
||||||
let mut initial_tx_count = client.transaction_count();
|
.map(|v| {
|
||||||
let mut max_tps = 0.0;
|
let exit = signal.clone();
|
||||||
let mut total = 0;
|
let thread_addr = client_addr.clone();
|
||||||
for i in 0..100 {
|
let maxes = maxes.clone();
|
||||||
let tx_count = client.transaction_count();
|
Builder::new()
|
||||||
let duration = now.elapsed();
|
.name("solana-client-sample".to_string())
|
||||||
now = Instant::now();
|
.spawn(move || {
|
||||||
let sample = tx_count - initial_tx_count;
|
sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period);
|
||||||
initial_tx_count = tx_count;
|
})
|
||||||
println!(
|
.unwrap()
|
||||||
"{}: Transactions processed {}",
|
|
||||||
val.transactions_addr, sample
|
|
||||||
);
|
|
||||||
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
|
|
||||||
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
|
|
||||||
if tps > max_tps {
|
|
||||||
max_tps = tps;
|
|
||||||
}
|
|
||||||
println!("{}: {} tps", val.transactions_addr, tps);
|
|
||||||
total = tx_count - first_count;
|
|
||||||
println!(
|
|
||||||
"{}: Total Transactions processed {}",
|
|
||||||
val.transactions_addr, total
|
|
||||||
);
|
|
||||||
if total == transactions.len() as u64 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if i > 20 && sample == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
sleep(Duration::new(sample_period, 0));
|
|
||||||
}
|
|
||||||
(max_tps, total)
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let time = Duration::new(time_sec, 0);
|
||||||
|
let now = Instant::now();
|
||||||
|
while now.elapsed() < time {
|
||||||
|
generate_and_send_txs(
|
||||||
|
&mut client,
|
||||||
|
&keypair_pairs,
|
||||||
|
&leader,
|
||||||
|
txs,
|
||||||
|
&mut last_id,
|
||||||
|
threads,
|
||||||
|
client_addr.clone(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
signal.store(true, Ordering::Relaxed);
|
||||||
|
for t in v_threads {
|
||||||
|
t.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
let mut max_of_maxes = 0.0;
|
let mut max_of_maxes = 0.0;
|
||||||
let mut total_txs = 0;
|
let mut total_txs = 0;
|
||||||
for (max, txs) in &maxes {
|
for (max, txs) in maxes.read().unwrap().iter() {
|
||||||
if *max > max_of_maxes {
|
if *max > max_of_maxes {
|
||||||
max_of_maxes = *max;
|
max_of_maxes = *max;
|
||||||
}
|
}
|
||||||
@ -223,9 +296,9 @@ fn main() {
|
|||||||
max_of_maxes,
|
max_of_maxes,
|
||||||
sample_period,
|
sample_period,
|
||||||
total_txs,
|
total_txs,
|
||||||
maxes.len()
|
maxes.read().unwrap().len()
|
||||||
);
|
);
|
||||||
signal.store(true, Ordering::Relaxed);
|
|
||||||
for t in c_threads {
|
for t in c_threads {
|
||||||
t.join().unwrap();
|
t.join().unwrap();
|
||||||
}
|
}
|
||||||
@ -237,6 +310,10 @@ fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinC
|
|||||||
let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
|
let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
|
||||||
addr.set_port(port + 1);
|
addr.set_port(port + 1);
|
||||||
let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
|
let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
|
||||||
|
requests_socket
|
||||||
|
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
addr.set_port(port + 2);
|
addr.set_port(port + 2);
|
||||||
ThinClient::new(
|
ThinClient::new(
|
||||||
r.requests_addr,
|
r.requests_addr,
|
||||||
|
@ -121,18 +121,20 @@ impl ThinClient {
|
|||||||
let req = Request::GetTransactionCount;
|
let req = Request::GetTransactionCount;
|
||||||
let data =
|
let data =
|
||||||
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
|
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
|
||||||
|
let mut done = false;
|
||||||
|
while !done {
|
||||||
self.requests_socket
|
self.requests_socket
|
||||||
.send_to(&data, &self.requests_addr)
|
.send_to(&data, &self.requests_addr)
|
||||||
.expect("buffer error in pub fn transaction_count");
|
.expect("buffer error in pub fn transaction_count");
|
||||||
let mut done = false;
|
|
||||||
while !done {
|
if let Ok(resp) = self.recv_response() {
|
||||||
let resp = self.recv_response().expect("transaction count dropped");
|
|
||||||
info!("recv_response {:?}", resp);
|
info!("recv_response {:?}", resp);
|
||||||
if let &Response::TransactionCount { .. } = &resp {
|
if let &Response::TransactionCount { .. } = &resp {
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
self.process_response(resp);
|
self.process_response(resp);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
self.transaction_count
|
self.transaction_count
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -142,17 +144,19 @@ impl ThinClient {
|
|||||||
info!("get_last_id");
|
info!("get_last_id");
|
||||||
let req = Request::GetLastId;
|
let req = Request::GetLastId;
|
||||||
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
|
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
|
||||||
|
let mut done = false;
|
||||||
|
while !done {
|
||||||
self.requests_socket
|
self.requests_socket
|
||||||
.send_to(&data, &self.requests_addr)
|
.send_to(&data, &self.requests_addr)
|
||||||
.expect("buffer error in pub fn get_last_id");
|
.expect("buffer error in pub fn get_last_id");
|
||||||
let mut done = false;
|
|
||||||
while !done {
|
if let Ok(resp) = self.recv_response() {
|
||||||
let resp = self.recv_response().expect("get_last_id response");
|
|
||||||
if let &Response::LastId { .. } = &resp {
|
if let &Response::LastId { .. } = &resp {
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
self.process_response(resp);
|
self.process_response(resp);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
self.last_id.expect("some last_id")
|
self.last_id.expect("some last_id")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user