Add bench-exchange tx send metrics (#3890)

This commit is contained in:
sakridge 2019-04-18 22:31:25 -07:00 committed by GitHub
parent 6eac5951ed
commit f4e40d2c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 121 additions and 83 deletions

View File

@ -9,22 +9,24 @@ use solana_drone::drone::request_airdrop_transaction;
use solana_exchange_api::exchange_instruction; use solana_exchange_api::exchange_instruction;
use solana_exchange_api::exchange_state::*; use solana_exchange_api::exchange_state::*;
use solana_exchange_api::id; use solana_exchange_api::id;
use solana_metrics::influxdb;
use solana_sdk::client::Client; use solana_sdk::client::Client;
use solana_sdk::client::{AsyncClient, SyncClient}; use solana_sdk::client::{AsyncClient, SyncClient};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_instruction; use solana_sdk::system_instruction;
use solana_sdk::timing::{duration_as_ms, duration_as_ns, duration_as_s};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::cmp; use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::mem;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::process::exit; use std::process::exit;
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::{sleep, Builder};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{mem, thread};
// TODO Chunk length as specified results in a bunch of failures, divide by 10 helps... // TODO Chunk length as specified results in a bunch of failures, divide by 10 helps...
// Assume 4MB network buffers, and 512 byte packets // Assume 4MB network buffers, and 512 byte packets
@ -65,7 +67,7 @@ impl Default for Config {
#[derive(Default)] #[derive(Default)]
pub struct SampleStats { pub struct SampleStats {
/// Maximum TPS reported by this node /// Maximum TPS reported by this node
pub tps: f64, pub tps: f32,
/// Total time taken for those txs /// Total time taken for those txs
pub tx_time: Duration, pub tx_time: Duration,
/// Total transactions reported by this node /// Total transactions reported by this node
@ -139,9 +141,12 @@ where
let exit_signal = exit_signal.clone(); let exit_signal = exit_signal.clone();
let sample_stats = sample_stats.clone(); let sample_stats = sample_stats.clone();
let client_ctor = ctor.clone(); let client_ctor = ctor.clone();
thread::spawn(move || { Builder::new()
sample_tx_count(&exit_signal, &sample_stats, sample_period, &client_ctor) .name("solana-exchange-sample".to_string())
}) .spawn(move || {
sample_tx_count(&exit_signal, &sample_stats, sample_period, &client_ctor)
})
.unwrap()
}) })
.collect(); .collect();
@ -155,15 +160,18 @@ where
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone(); let total_tx_sent_count = total_tx_sent_count.clone();
let client_ctor = client_ctors[0].clone(); let client_ctor = client_ctors[0].clone();
thread::spawn(move || { Builder::new()
do_tx_transfers( .name("solana-exchange-transfer".to_string())
&exit_signal, .spawn(move || {
&shared_txs, do_tx_transfers(
&shared_tx_active_thread_count, &exit_signal,
&total_tx_sent_count, &shared_txs,
&client_ctor, &shared_tx_active_thread_count,
) &total_tx_sent_count,
}) &client_ctor,
)
})
.unwrap()
}) })
.collect(); .collect();
@ -174,19 +182,22 @@ where
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let client_ctor = client_ctors[0].clone(); let client_ctor = client_ctors[0].clone();
thread::spawn(move || { Builder::new()
swapper( .name("solana-exchange-swapper".to_string())
&exit_signal, .spawn(move || {
&swapper_receiver, swapper(
&shared_txs, &exit_signal,
&shared_tx_active_thread_count, &swapper_receiver,
&swapper_signers, &shared_txs,
&profit_pubkeys, &shared_tx_active_thread_count,
batch_size, &swapper_signers,
account_groups, &profit_pubkeys,
&client_ctor, batch_size,
) account_groups,
}) &client_ctor,
)
})
.unwrap()
}; };
trace!("Start trader thread"); trace!("Start trader thread");
@ -195,21 +206,24 @@ where
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let client_ctor = client_ctors[0].clone(); let client_ctor = client_ctors[0].clone();
thread::spawn(move || { Builder::new()
trader( .name("solana-exchange-trader".to_string())
&exit_signal, .spawn(move || {
&swapper_sender, trader(
&shared_txs, &exit_signal,
&shared_tx_active_thread_count, &swapper_sender,
&trader_signers, &shared_txs,
&src_pubkeys, &shared_tx_active_thread_count,
&dst_pubkeys, &trader_signers,
trade_delay, &src_pubkeys,
batch_size, &dst_pubkeys,
account_groups, trade_delay,
&client_ctor, batch_size,
) account_groups,
}) &client_ctor,
)
})
.unwrap()
}; };
info!("Requesting and swapping trades"); info!("Requesting and swapping trades");
@ -259,8 +273,7 @@ fn sample_tx_count<F, T>(
let sample = tx_count - initial_tx_count; let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count; initial_tx_count = tx_count;
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = sample as f32 / duration_as_s(&duration);
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps { if tps > max_tps {
max_tps = tps; max_tps = tps;
} }
@ -320,14 +333,23 @@ fn do_tx_transfers<F, T>(
total_tx_sent_count.fetch_add(n, Ordering::Relaxed); total_tx_sent_count.fetch_add(n, Ordering::Relaxed);
stats.total += n as u64; stats.total += n as u64;
let sent_ns = stats.sent_ns += duration_as_ns(&duration);
duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let rate = n as f32 / duration_as_s(&duration);
stats.sent_ns += sent_ns;
let rate = (n as f64 / sent_ns as f64) * 1_000_000_000_f64;
if rate > stats.sent_peak_rate { if rate > stats.sent_peak_rate {
stats.sent_peak_rate = rate; stats.sent_peak_rate = rate;
} }
trace!(" tx {:?} sent {:.2}/s", n, rate); trace!(" tx {:?} sent {:.2}/s", n, rate);
solana_metrics::submit(
influxdb::Point::new("bench-exchange")
.add_tag("op", influxdb::Value::String("do_tx_transfers".to_string()))
.add_field(
"duration",
influxdb::Value::Integer(duration_as_ms(&duration) as i64),
)
.add_field("count", influxdb::Value::Integer(n as i64))
.to_owned(),
);
} }
None => { None => {
if exit_signal.load(Ordering::Relaxed) { if exit_signal.load(Ordering::Relaxed) {
@ -348,11 +370,11 @@ fn do_tx_transfers<F, T>(
struct Stats { struct Stats {
total: u64, total: u64,
keygen_ns: u64, keygen_ns: u64,
keygen_peak_rate: f64, keygen_peak_rate: f32,
sign_ns: u64, sign_ns: u64,
sign_peak_rate: f64, sign_peak_rate: f32,
sent_ns: u64, sent_ns: u64,
sent_peak_rate: f64, sent_peak_rate: f32,
} }
struct TradeInfo { struct TradeInfo {
@ -431,10 +453,8 @@ fn swapper<F, T>(
} }
account_group = (account_group + 1) % account_groups as usize; account_group = (account_group + 1) % account_groups as usize;
let duration = now.elapsed(); let duration = now.elapsed();
let keypair_ns = let rate = swaps_size as f32 / duration_as_s(&duration);
duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); stats.keygen_ns += duration_as_ns(&duration);
let rate = (swaps_size as f64 / keypair_ns as f64) * 1_000_000_000_f64;
stats.keygen_ns += keypair_ns;
if rate > stats.keygen_peak_rate { if rate > stats.keygen_peak_rate {
stats.keygen_peak_rate = rate; stats.keygen_peak_rate = rate;
} }
@ -486,10 +506,9 @@ fn swapper<F, T>(
}) })
.collect(); .collect();
let duration = now.elapsed(); let duration = now.elapsed();
let sign_ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let n = to_swap_txs.len(); let n = to_swap_txs.len();
let rate = (n as f64 / sign_ns as f64) * 1_000_000_000_f64; let rate = n as f32 / duration_as_s(&duration);
stats.sign_ns += sign_ns; stats.sign_ns += duration_as_ns(&duration);
if rate > stats.sign_peak_rate { if rate > stats.sign_peak_rate {
stats.sign_peak_rate = rate; stats.sign_peak_rate = rate;
} }
@ -604,9 +623,8 @@ fn trader<F, T>(
} }
account_group = (account_group + 1) % account_groups as usize; account_group = (account_group + 1) % account_groups as usize;
let duration = now.elapsed(); let duration = now.elapsed();
let keypair_ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let rate = batch_size as f32 / duration_as_s(&duration);
let rate = (batch_size as f64 / keypair_ns as f64) * 1_000_000_000_f64; stats.keygen_ns += duration_as_ns(&duration);
stats.keygen_ns += keypair_ns;
if rate > stats.keygen_peak_rate { if rate > stats.keygen_peak_rate {
stats.keygen_peak_rate = rate; stats.keygen_peak_rate = rate;
} }
@ -647,10 +665,9 @@ fn trader<F, T>(
}) })
.collect(); .collect();
let duration = now.elapsed(); let duration = now.elapsed();
let sign_ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let n = trades_txs.len(); let n = trades_txs.len();
let rate = (n as f64 / sign_ns as f64) * 1_000_000_000_f64; let rate = n as f32 / duration_as_s(&duration);
stats.sign_ns += sign_ns; stats.sign_ns += duration_as_ns(&duration);
if rate > stats.sign_peak_rate { if rate > stats.sign_peak_rate {
stats.sign_peak_rate = rate; stats.sign_peak_rate = rate;
} }
@ -1007,6 +1024,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair,
mod tests { mod tests {
use super::*; use super::*;
use solana::cluster_info::FULLNODE_PORT_RANGE; use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::contact_info::ContactInfo;
use solana::fullnode::FullnodeConfig; use solana::fullnode::FullnodeConfig;
use solana::gossip_service::discover_nodes; use solana::gossip_service::discover_nodes;
use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana::local_cluster::{ClusterConfig, LocalCluster};
@ -1067,23 +1085,30 @@ mod tests {
error!("Failed to discover {} nodes: {:?}", NUM_NODES, err); error!("Failed to discover {} nodes: {:?}", NUM_NODES, err);
exit(1); exit(1);
}); });
if nodes.len() < NUM_NODES { let client_ctors: Vec<_> = nodes
.iter()
.filter_map(|node| {
let cluster_entrypoint = node.clone();
let cluster_addrs = cluster_entrypoint.client_facing_addr();
if ContactInfo::is_valid_address(&cluster_addrs.0)
&& ContactInfo::is_valid_address(&cluster_addrs.1)
{
let client_ctor =
move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) };
Some(client_ctor)
} else {
None
}
})
.collect();
if client_ctors.len() < NUM_NODES {
error!( error!(
"Error: Insufficient nodes discovered. Expecting {} or more", "Error: Insufficient nodes discovered. Expecting {} or more",
NUM_NODES NUM_NODES
); );
exit(1); exit(1);
} }
let client_ctors: Vec<_> = nodes
.iter()
.map(|node| {
let cluster_entrypoint = node.clone();
let cluster_addrs = cluster_entrypoint.client_facing_addr();
let client_ctor =
move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) };
client_ctor
})
.collect();
let client = client_ctors[0](); let client = client_ctors[0]();
airdrop_lamports( airdrop_lamports(

View File

@ -5,6 +5,7 @@ pub mod order_book;
use crate::bench::{airdrop_lamports, do_bench_exchange, Config}; use crate::bench::{airdrop_lamports, do_bench_exchange, Config};
use log::*; use log::*;
use solana::cluster_info::FULLNODE_PORT_RANGE; use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::contact_info::ContactInfo;
use solana::gossip_service::discover_nodes; use solana::gossip_service::discover_nodes;
use solana_client::thin_client::create_client; use solana_client::thin_client::create_client;
use solana_client::thin_client::ThinClient; use solana_client::thin_client::ThinClient;
@ -34,20 +35,28 @@ fn main() {
let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|_| { let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|_| {
panic!("Failed to discover nodes"); panic!("Failed to discover nodes");
}); });
info!("{} nodes found", nodes.len());
if nodes.len() < num_nodes {
panic!("Error: Insufficient nodes discovered");
}
let client_ctors: Vec<_> = nodes let client_ctors: Vec<_> = nodes
.iter() .iter()
.map(|node| { .filter_map(|node| {
let cluster_entrypoint = node.clone(); let cluster_entrypoint = node.clone();
let cluster_addrs = cluster_entrypoint.client_facing_addr(); let cluster_addrs = cluster_entrypoint.client_facing_addr();
move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) } if ContactInfo::is_valid_address(&cluster_addrs.0)
&& ContactInfo::is_valid_address(&cluster_addrs.1)
{
let client_ctor =
move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) };
Some(client_ctor)
} else {
None
}
}) })
.collect(); .collect();
info!("{} nodes found", client_ctors.len());
if client_ctors.len() < num_nodes {
panic!("Error: Insufficient nodes discovered");
}
info!("Funding keypair: {}", identity.pubkey()); info!("Funding keypair: {}", identity.pubkey());
let client = client_ctors[0](); let client = client_ctors[0]();

View File

@ -24,6 +24,10 @@ pub const MAX_HASH_AGE_IN_SECONDS: usize = 120;
// This must be <= MAX_HASH_AGE_IN_SECONDS, otherwise there's risk for DuplicateSignature errors // This must be <= MAX_HASH_AGE_IN_SECONDS, otherwise there's risk for DuplicateSignature errors
pub const MAX_RECENT_BLOCKHASHES: usize = MAX_HASH_AGE_IN_SECONDS; pub const MAX_RECENT_BLOCKHASHES: usize = MAX_HASH_AGE_IN_SECONDS;
pub fn duration_as_ns(d: &Duration) -> u64 {
d.as_secs() * 1_000_000_000 + u64::from(d.subsec_nanos())
}
pub fn duration_as_us(d: &Duration) -> u64 { pub fn duration_as_us(d: &Duration) -> u64 {
(d.as_secs() * 1000 * 1000) + (u64::from(d.subsec_nanos()) / 1_000) (d.as_secs() * 1000 * 1000) + (u64::from(d.subsec_nanos()) / 1_000)
} }