diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index 3c676ffacf..5ca4f533a7 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -11,7 +11,7 @@ use solana_exchange_api::exchange_state::*; use solana_exchange_api::id; use solana_metrics::influxdb; use solana_sdk::client::Client; -use solana_sdk::client::{AsyncClient, SyncClient}; +use solana_sdk::client::SyncClient; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_instruction; @@ -74,11 +74,9 @@ pub struct SampleStats { pub tx_count: u64, } -pub fn do_bench_exchange(client_ctors: Vec, config: Config) +pub fn do_bench_exchange(clients: Vec, config: Config) where - F: Fn() -> T, - F: 'static + std::marker::Sync + std::marker::Send, - T: Client, + T: 'static + Client + Send + Sync, { let Config { identity, @@ -91,8 +89,8 @@ where } = config; let accounts_in_groups = batch_size * account_groups; let exit_signal = Arc::new(AtomicBool::new(false)); - let client_ctors: Vec<_> = client_ctors.into_iter().map(Arc::new).collect(); - let client = client_ctors[0](); + let clients: Vec<_> = clients.into_iter().map(Arc::new).collect(); + let client = clients[0].as_ref(); let total_keys = accounts_in_groups as u64 * 5; info!("Generating {:?} keys", total_keys); @@ -119,33 +117,31 @@ where .collect(); info!("Fund trader accounts"); - fund_keys(&client, &identity, &trader_signers, fund_amount); + fund_keys(client, &identity, &trader_signers, fund_amount); info!("Fund swapper accounts"); - fund_keys(&client, &identity, &swapper_signers, fund_amount); + fund_keys(client, &identity, &swapper_signers, fund_amount); info!("Create {:?} source token accounts", src_pubkeys.len()); - create_token_accounts(&client, &trader_signers, &src_pubkeys); + create_token_accounts(client, &trader_signers, &src_pubkeys); info!("Create {:?} destination token accounts", dst_pubkeys.len()); - create_token_accounts(&client, &trader_signers, &dst_pubkeys); + create_token_accounts(client, &trader_signers, &dst_pubkeys); info!("Create {:?} profit token accounts", profit_pubkeys.len()); - create_token_accounts(&client, &swapper_signers, &profit_pubkeys); + create_token_accounts(client, &swapper_signers, &profit_pubkeys); // Collect the max transaction rate and total tx count seen (single node only) let sample_stats = Arc::new(RwLock::new(Vec::new())); let sample_period = 1; // in seconds info!("Sampling clients for tps every {} s", sample_period); - let sample_threads: Vec<_> = client_ctors + let sample_threads: Vec<_> = clients .iter() - .map(|ctor| { + .map(|client| { let exit_signal = exit_signal.clone(); let sample_stats = sample_stats.clone(); - let client_ctor = ctor.clone(); + let client = client.clone(); Builder::new() .name("solana-exchange-sample".to_string()) - .spawn(move || { - sample_tx_count(&exit_signal, &sample_stats, sample_period, &client_ctor) - }) + .spawn(move || sample_tx_count(&exit_signal, &sample_stats, sample_period, &client)) .unwrap() }) .collect(); @@ -159,7 +155,7 @@ where let shared_txs = shared_txs.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let total_tx_sent_count = total_tx_sent_count.clone(); - let client_ctor = client_ctors[0].clone(); + let client = clients[0].clone(); Builder::new() .name("solana-exchange-transfer".to_string()) .spawn(move || { @@ -168,7 +164,7 @@ where &shared_txs, &shared_tx_active_thread_count, &total_tx_sent_count, - &client_ctor, + &client, ) }) .unwrap() @@ -181,7 +177,7 @@ where let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); - let client_ctor = client_ctors[0].clone(); + let client = clients[0].clone(); Builder::new() .name("solana-exchange-swapper".to_string()) .spawn(move || { @@ -194,7 +190,7 @@ where &profit_pubkeys, batch_size, account_groups, - &client_ctor, + &client, ) }) .unwrap() @@ -205,7 +201,7 @@ where let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); - let client_ctor = client_ctors[0].clone(); + let client = clients[0].clone(); Builder::new() .name("solana-exchange-trader".to_string()) .spawn(move || { @@ -220,7 +216,7 @@ where trade_delay, batch_size, account_groups, - &client_ctor, + &client, ) }) .unwrap() @@ -242,16 +238,14 @@ where compute_and_report_stats(&sample_stats, total_tx_sent_count.load(Ordering::Relaxed)); } -fn sample_tx_count( +fn sample_tx_count( exit_signal: &Arc, sample_stats: &Arc>>, sample_period: u64, - client_ctor: &Arc, + client: &Arc, ) where - F: Fn() -> T, T: Client, { - let client = client_ctor(); let mut max_tps = 0.0; let mut total_tx_time; let mut total_tx_count; @@ -300,18 +294,15 @@ fn sample_tx_count( } } -fn do_tx_transfers( +fn do_tx_transfers( exit_signal: &Arc, shared_txs: &SharedTransactions, shared_tx_thread_count: &Arc, total_tx_sent_count: &Arc, - client_ctor: &Arc, + client: &Arc, ) where - F: Fn() -> T, T: Client, { - let client = client_ctor(); - let async_client: &AsyncClient = &client; let mut stats = Stats::default(); loop { let txs; @@ -326,7 +317,7 @@ fn do_tx_transfers( shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); let now = Instant::now(); for tx in txs0 { - async_client.async_send_transaction(tx).expect("Transfer"); + client.async_send_transaction(tx).expect("Transfer"); } let duration = now.elapsed(); shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); @@ -382,7 +373,7 @@ struct TradeInfo { order_info: TradeOrderInfo, } #[allow(clippy::too_many_arguments)] -fn swapper( +fn swapper( exit_signal: &Arc, receiver: &Receiver>, shared_txs: &SharedTransactions, @@ -391,12 +382,10 @@ fn swapper( profit_pubkeys: &[Pubkey], batch_size: usize, account_groups: usize, - client_ctor: &Arc, + client: &Arc, ) where - F: Fn() -> T, T: Client, { - let client = client_ctor(); let mut stats = Stats::default(); let mut order_book = OrderBook::default(); let mut account_group: usize = 0; @@ -554,7 +543,7 @@ fn swapper( } #[allow(clippy::too_many_arguments)] -fn trader( +fn trader( exit_signal: &Arc, sender: &Sender>, shared_txs: &SharedTransactions, @@ -565,12 +554,10 @@ fn trader( delay: u64, batch_size: usize, account_groups: usize, - client_ctor: &Arc, + client: &Arc, ) where - F: Fn() -> T, T: Client, { - let client = client_ctor(); let mut stats = Stats::default(); // TODO Hard coded for now @@ -1029,7 +1016,6 @@ mod tests { use solana::gossip_service::discover_nodes; use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana_client::thin_client::create_client; - use solana_client::thin_client::ThinClient; use solana_drone::drone::run_local_drone; use std::sync::mpsc::channel; @@ -1085,7 +1071,8 @@ mod tests { error!("Failed to discover {} nodes: {:?}", NUM_NODES, err); exit(1); }); - let client_ctors: Vec<_> = nodes + + let clients: Vec<_> = nodes .iter() .filter_map(|node| { let cluster_entrypoint = node.clone(); @@ -1093,16 +1080,15 @@ mod tests { if ContactInfo::is_valid_address(&cluster_addrs.0) && ContactInfo::is_valid_address(&cluster_addrs.1) { - let client_ctor = - move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) }; - Some(client_ctor) + let client = create_client(cluster_addrs, FULLNODE_PORT_RANGE); + Some(client) } else { None } }) .collect(); - if client_ctors.len() < NUM_NODES { + if clients.len() < NUM_NODES { error!( "Error: Insufficient nodes discovered. Expecting {} or more", NUM_NODES @@ -1110,14 +1096,13 @@ mod tests { exit(1); } - let client = client_ctors[0](); airdrop_lamports( - &client, + &clients[0], &drone_addr, &config.identity, fund_amount * (accounts_in_groups + 1) as u64 * 2, ); - do_bench_exchange(client_ctors, config); + do_bench_exchange(clients, config); } } diff --git a/bench-exchange/src/main.rs b/bench-exchange/src/main.rs index 092822141c..c311ae5e11 100644 --- a/bench-exchange/src/main.rs +++ b/bench-exchange/src/main.rs @@ -8,7 +8,6 @@ use solana::cluster_info::FULLNODE_PORT_RANGE; use solana::contact_info::ContactInfo; use solana::gossip_service::discover_nodes; use solana_client::thin_client::create_client; -use solana_client::thin_client::ThinClient; use solana_sdk::signature::KeypairUtil; fn main() { @@ -35,7 +34,8 @@ fn main() { let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|_| { panic!("Failed to discover nodes"); }); - let client_ctors: Vec<_> = nodes + + let clients: Vec<_> = nodes .iter() .filter_map(|node| { let cluster_entrypoint = node.clone(); @@ -43,26 +43,24 @@ fn main() { if ContactInfo::is_valid_address(&cluster_addrs.0) && ContactInfo::is_valid_address(&cluster_addrs.1) { - let client_ctor = - move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) }; - Some(client_ctor) + let client = create_client(cluster_addrs, FULLNODE_PORT_RANGE); + Some(client) } else { None } }) .collect(); - info!("{} nodes found", client_ctors.len()); - if client_ctors.len() < num_nodes { + info!("{} nodes found", clients.len()); + if clients.len() < num_nodes { panic!("Error: Insufficient nodes discovered"); } info!("Funding keypair: {}", identity.pubkey()); - let client = client_ctors[0](); let accounts_in_groups = batch_size * account_groups; airdrop_lamports( - &client, + &clients[0], &drone_addr, &identity, fund_amount * (accounts_in_groups + 1) as u64 * 2, @@ -78,5 +76,5 @@ fn main() { account_groups, }; - do_bench_exchange(client_ctors, config); + do_bench_exchange(clients, config); } diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 93d06ab2b4..0cd1785771 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -19,7 +19,7 @@ use std::thread::sleep; use std::time::{Duration, Instant}; pub struct RpcClient { - client: Box, + client: Box, } impl RpcClient { @@ -743,4 +743,9 @@ mod tests { ); } + #[test] + fn test_rpc_client_thread() { + let rpc_client = RpcClient::new_mock("succeeds".to_string()); + thread::spawn(move || rpc_client); + } }