diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index 5bd5e72ae0..3c676ffacf 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -9,22 +9,24 @@ use solana_drone::drone::request_airdrop_transaction; use solana_exchange_api::exchange_instruction; use solana_exchange_api::exchange_state::*; use solana_exchange_api::id; +use solana_metrics::influxdb; use solana_sdk::client::Client; use solana_sdk::client::{AsyncClient, SyncClient}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_instruction; +use solana_sdk::timing::{duration_as_ms, duration_as_ns, duration_as_s}; use solana_sdk::transaction::Transaction; use std::cmp; use std::collections::VecDeque; +use std::mem; use std::net::SocketAddr; use std::process::exit; use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, RwLock}; -use std::thread::sleep; +use std::thread::{sleep, Builder}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use std::{mem, thread}; // TODO Chunk length as specified results in a bunch of failures, divide by 10 helps... // Assume 4MB network buffers, and 512 byte packets @@ -65,7 +67,7 @@ impl Default for Config { #[derive(Default)] pub struct SampleStats { /// Maximum TPS reported by this node - pub tps: f64, + pub tps: f32, /// Total time taken for those txs pub tx_time: Duration, /// Total transactions reported by this node @@ -139,9 +141,12 @@ where let exit_signal = exit_signal.clone(); let sample_stats = sample_stats.clone(); let client_ctor = ctor.clone(); - thread::spawn(move || { - sample_tx_count(&exit_signal, &sample_stats, sample_period, &client_ctor) - }) + Builder::new() + .name("solana-exchange-sample".to_string()) + .spawn(move || { + sample_tx_count(&exit_signal, &sample_stats, sample_period, &client_ctor) + }) + .unwrap() }) .collect(); @@ -155,15 +160,18 @@ where let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let total_tx_sent_count = total_tx_sent_count.clone(); let client_ctor = client_ctors[0].clone(); - thread::spawn(move || { - do_tx_transfers( - &exit_signal, - &shared_txs, - &shared_tx_active_thread_count, - &total_tx_sent_count, - &client_ctor, - ) - }) + Builder::new() + .name("solana-exchange-transfer".to_string()) + .spawn(move || { + do_tx_transfers( + &exit_signal, + &shared_txs, + &shared_tx_active_thread_count, + &total_tx_sent_count, + &client_ctor, + ) + }) + .unwrap() }) .collect(); @@ -174,19 +182,22 @@ where let shared_txs = shared_txs.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let client_ctor = client_ctors[0].clone(); - thread::spawn(move || { - swapper( - &exit_signal, - &swapper_receiver, - &shared_txs, - &shared_tx_active_thread_count, - &swapper_signers, - &profit_pubkeys, - batch_size, - account_groups, - &client_ctor, - ) - }) + Builder::new() + .name("solana-exchange-swapper".to_string()) + .spawn(move || { + swapper( + &exit_signal, + &swapper_receiver, + &shared_txs, + &shared_tx_active_thread_count, + &swapper_signers, + &profit_pubkeys, + batch_size, + account_groups, + &client_ctor, + ) + }) + .unwrap() }; trace!("Start trader thread"); @@ -195,21 +206,24 @@ where let shared_txs = shared_txs.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let client_ctor = client_ctors[0].clone(); - thread::spawn(move || { - trader( - &exit_signal, - &swapper_sender, - &shared_txs, - &shared_tx_active_thread_count, - &trader_signers, - &src_pubkeys, - &dst_pubkeys, - trade_delay, - batch_size, - account_groups, - &client_ctor, - ) - }) + Builder::new() + .name("solana-exchange-trader".to_string()) + .spawn(move || { + trader( + &exit_signal, + &swapper_sender, + &shared_txs, + &shared_tx_active_thread_count, + &trader_signers, + &src_pubkeys, + &dst_pubkeys, + trade_delay, + batch_size, + account_groups, + &client_ctor, + ) + }) + .unwrap() }; info!("Requesting and swapping trades"); @@ -259,8 +273,7 @@ fn sample_tx_count( let sample = tx_count - initial_tx_count; initial_tx_count = tx_count; - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (sample * 1_000_000_000) as f64 / ns as f64; + let tps = sample as f32 / duration_as_s(&duration); if tps > max_tps { max_tps = tps; } @@ -320,14 +333,23 @@ fn do_tx_transfers( total_tx_sent_count.fetch_add(n, Ordering::Relaxed); stats.total += n as u64; - let sent_ns = - duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - stats.sent_ns += sent_ns; - let rate = (n as f64 / sent_ns as f64) * 1_000_000_000_f64; + stats.sent_ns += duration_as_ns(&duration); + let rate = n as f32 / duration_as_s(&duration); if rate > stats.sent_peak_rate { stats.sent_peak_rate = rate; } trace!(" tx {:?} sent {:.2}/s", n, rate); + + solana_metrics::submit( + influxdb::Point::new("bench-exchange") + .add_tag("op", influxdb::Value::String("do_tx_transfers".to_string())) + .add_field( + "duration", + influxdb::Value::Integer(duration_as_ms(&duration) as i64), + ) + .add_field("count", influxdb::Value::Integer(n as i64)) + .to_owned(), + ); } None => { if exit_signal.load(Ordering::Relaxed) { @@ -348,11 +370,11 @@ fn do_tx_transfers( struct Stats { total: u64, keygen_ns: u64, - keygen_peak_rate: f64, + keygen_peak_rate: f32, sign_ns: u64, - sign_peak_rate: f64, + sign_peak_rate: f32, sent_ns: u64, - sent_peak_rate: f64, + sent_peak_rate: f32, } struct TradeInfo { @@ -431,10 +453,8 @@ fn swapper( } account_group = (account_group + 1) % account_groups as usize; let duration = now.elapsed(); - let keypair_ns = - duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let rate = (swaps_size as f64 / keypair_ns as f64) * 1_000_000_000_f64; - stats.keygen_ns += keypair_ns; + let rate = swaps_size as f32 / duration_as_s(&duration); + stats.keygen_ns += duration_as_ns(&duration); if rate > stats.keygen_peak_rate { stats.keygen_peak_rate = rate; } @@ -486,10 +506,9 @@ fn swapper( }) .collect(); let duration = now.elapsed(); - let sign_ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let n = to_swap_txs.len(); - let rate = (n as f64 / sign_ns as f64) * 1_000_000_000_f64; - stats.sign_ns += sign_ns; + let rate = n as f32 / duration_as_s(&duration); + stats.sign_ns += duration_as_ns(&duration); if rate > stats.sign_peak_rate { stats.sign_peak_rate = rate; } @@ -604,9 +623,8 @@ fn trader( } account_group = (account_group + 1) % account_groups as usize; let duration = now.elapsed(); - let keypair_ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let rate = (batch_size as f64 / keypair_ns as f64) * 1_000_000_000_f64; - stats.keygen_ns += keypair_ns; + let rate = batch_size as f32 / duration_as_s(&duration); + stats.keygen_ns += duration_as_ns(&duration); if rate > stats.keygen_peak_rate { stats.keygen_peak_rate = rate; } @@ -647,10 +665,9 @@ fn trader( }) .collect(); let duration = now.elapsed(); - let sign_ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let n = trades_txs.len(); - let rate = (n as f64 / sign_ns as f64) * 1_000_000_000_f64; - stats.sign_ns += sign_ns; + let rate = n as f32 / duration_as_s(&duration); + stats.sign_ns += duration_as_ns(&duration); if rate > stats.sign_peak_rate { stats.sign_peak_rate = rate; } @@ -1007,6 +1024,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair, mod tests { use super::*; use solana::cluster_info::FULLNODE_PORT_RANGE; + use solana::contact_info::ContactInfo; use solana::fullnode::FullnodeConfig; use solana::gossip_service::discover_nodes; use solana::local_cluster::{ClusterConfig, LocalCluster}; @@ -1067,23 +1085,30 @@ mod tests { error!("Failed to discover {} nodes: {:?}", NUM_NODES, err); exit(1); }); - if nodes.len() < NUM_NODES { + let client_ctors: Vec<_> = nodes + .iter() + .filter_map(|node| { + let cluster_entrypoint = node.clone(); + let cluster_addrs = cluster_entrypoint.client_facing_addr(); + if ContactInfo::is_valid_address(&cluster_addrs.0) + && ContactInfo::is_valid_address(&cluster_addrs.1) + { + let client_ctor = + move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) }; + Some(client_ctor) + } else { + None + } + }) + .collect(); + + if client_ctors.len() < NUM_NODES { error!( "Error: Insufficient nodes discovered. Expecting {} or more", NUM_NODES ); exit(1); } - let client_ctors: Vec<_> = nodes - .iter() - .map(|node| { - let cluster_entrypoint = node.clone(); - let cluster_addrs = cluster_entrypoint.client_facing_addr(); - let client_ctor = - move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) }; - client_ctor - }) - .collect(); let client = client_ctors[0](); airdrop_lamports( diff --git a/bench-exchange/src/main.rs b/bench-exchange/src/main.rs index 6f314abbea..092822141c 100644 --- a/bench-exchange/src/main.rs +++ b/bench-exchange/src/main.rs @@ -5,6 +5,7 @@ pub mod order_book; use crate::bench::{airdrop_lamports, do_bench_exchange, Config}; use log::*; use solana::cluster_info::FULLNODE_PORT_RANGE; +use solana::contact_info::ContactInfo; use solana::gossip_service::discover_nodes; use solana_client::thin_client::create_client; use solana_client::thin_client::ThinClient; @@ -34,20 +35,28 @@ fn main() { let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|_| { panic!("Failed to discover nodes"); }); - info!("{} nodes found", nodes.len()); - if nodes.len() < num_nodes { - panic!("Error: Insufficient nodes discovered"); - } - let client_ctors: Vec<_> = nodes .iter() - .map(|node| { + .filter_map(|node| { let cluster_entrypoint = node.clone(); let cluster_addrs = cluster_entrypoint.client_facing_addr(); - move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) } + if ContactInfo::is_valid_address(&cluster_addrs.0) + && ContactInfo::is_valid_address(&cluster_addrs.1) + { + let client_ctor = + move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) }; + Some(client_ctor) + } else { + None + } }) .collect(); + info!("{} nodes found", client_ctors.len()); + if client_ctors.len() < num_nodes { + panic!("Error: Insufficient nodes discovered"); + } + info!("Funding keypair: {}", identity.pubkey()); let client = client_ctors[0](); diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs index 8f3c42c789..b663323c6e 100644 --- a/sdk/src/timing.rs +++ b/sdk/src/timing.rs @@ -24,6 +24,10 @@ pub const MAX_HASH_AGE_IN_SECONDS: usize = 120; // This must be <= MAX_HASH_AGE_IN_SECONDS, otherwise there's risk for DuplicateSignature errors pub const MAX_RECENT_BLOCKHASHES: usize = MAX_HASH_AGE_IN_SECONDS; +pub fn duration_as_ns(d: &Duration) -> u64 { + d.as_secs() * 1_000_000_000 + u64::from(d.subsec_nanos()) +} + pub fn duration_as_us(d: &Duration) -> u64 { (d.as_secs() * 1000 * 1000) + (u64::from(d.subsec_nanos()) / 1_000) }