From 0b5d3df251f3d8744aecb90585b3780a0e0ea053 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Sun, 24 May 2020 11:46:10 -0700 Subject: [PATCH] Optimize banking processing of AccountInUse (#10154) (#10193) automerge --- Cargo.lock | 6 +- banking-bench/Cargo.toml | 6 +- banking-bench/src/main.rs | 142 +++++++++++++++++++++++++++++--------- core/src/banking_stage.rs | 12 +++- 4 files changed, 126 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf52936dfe..c08db30318 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,9 +414,9 @@ dependencies = [ [[package]] name = "clap" -version = "2.33.0" +version = "2.33.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" +checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" dependencies = [ "ansi_term", "atty", @@ -2963,6 +2963,7 @@ dependencies = [ name = "solana-banking-bench" version = "1.1.14" dependencies = [ + "clap", "crossbeam-channel", "log 0.4.8", "rand 0.7.3", @@ -2975,6 +2976,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-streamer", + "solana-version", ] [[package]] diff --git a/banking-bench/Cargo.toml b/banking-bench/Cargo.toml index c9a4af8823..fddd8ea9e0 100644 --- a/banking-bench/Cargo.toml +++ b/banking-bench/Cargo.toml @@ -8,7 +8,10 @@ license = "Apache-2.0" homepage = "https://solana.com/" [dependencies] +clap = "2.33.1" +crossbeam-channel = "0.4" log = "0.4.6" +rand = "0.7.0" rayon = "1.3.0" solana-core = { path = "../core", version = "1.1.14" } solana-streamer = { path = "../streamer", version = "1.1.14" } @@ -18,8 +21,7 @@ solana-logger = { path = "../logger", version = "1.1.14" } solana-runtime = { path = "../runtime", version = "1.1.14" } solana-measure = { path = "../measure", version = "1.1.14" } solana-sdk = { path = "../sdk", version = "1.1.14" } -rand = "0.7.0" -crossbeam-channel = "0.4" +solana-version = { path = "../version", version = "1.1.14" } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index bba9ec24da..824237da88 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -1,3 +1,4 @@ +use clap::{crate_description, crate_name, value_t, App, Arg}; use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; @@ -64,15 +65,22 @@ fn check_txs( no_bank } -fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec { +fn make_accounts_txs( + total_num_transactions: usize, + hash: Hash, + same_payer: bool, +) -> Vec { let to_pubkey = Pubkey::new_rand(); - let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash); - (0..txes) + let payer_key = Keypair::new(); + let dummy = system_transaction::transfer(&payer_key, &to_pubkey, 1, hash); + (0..total_num_transactions) .into_par_iter() .map(|_| { let mut new = dummy.clone(); let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); - new.message.account_keys[0] = Pubkey::new_rand(); + if !same_payer { + new.message.account_keys[0] = Pubkey::new_rand(); + } new.message.account_keys[1] = Pubkey::new_rand(); new.signatures = vec![Signature::new(&sig[0..64])]; new @@ -96,13 +104,61 @@ fn bytes_as_usize(bytes: &[u8]) -> usize { bytes[0] as usize | (bytes[1] as usize) << 8 } +#[allow(clippy::cognitive_complexity)] fn main() { solana_logger::setup(); - let num_threads = BankingStage::num_threads() as usize; + + let matches = App::new(crate_name!()) + .about(crate_description!()) + .version(solana_version::version!()) + .arg( + Arg::with_name("num_chunks") + .long("num-chunks") + .takes_value(true) + .value_name("SIZE") + .help("Number of transaction chunks."), + ) + .arg( + Arg::with_name("packets_per_chunk") + .long("packets-per-chunk") + .takes_value(true) + .value_name("SIZE") + .help("Packets per chunk"), + ) + .arg( + Arg::with_name("skip_sanity") + .long("skip-sanity") + .takes_value(false) + .help("Skip transaction sanity execution"), + ) + .arg( + Arg::with_name("same_payer") + .long("same-payer") + .takes_value(false) + .help("Use the same payer for transfers"), + ) + .arg( + Arg::with_name("iterations") + .long("iterations") + .takes_value(true) + .help("Number of iterations"), + ) + .arg( + Arg::with_name("num_threads") + .long("num-threads") + .takes_value(true) + .help("Number of iterations"), + ) + .get_matches(); + + let num_threads = + value_t!(matches, "num_threads", usize).unwrap_or(BankingStage::num_threads() as usize); // a multiple of packet chunk duplicates to avoid races - const CHUNKS: usize = 8 * 2; - const PACKETS_PER_BATCH: usize = 192; - let txes = PACKETS_PER_BATCH * num_threads * CHUNKS; + let num_chunks = value_t!(matches, "num_chunks", usize).unwrap_or(16); + let packets_per_chunk = value_t!(matches, "packets_per_chunk", usize).unwrap_or(192); + let iterations = value_t!(matches, "iterations", usize).unwrap_or(1000); + + let total_num_transactions = num_chunks * num_threads * packets_per_chunk; let mint_total = 1_000_000_000_000; let GenesisConfigInfo { genesis_config, @@ -116,34 +172,44 @@ fn main() { let mut bank_forks = BankForks::new(0, bank0); let mut bank = bank_forks.working_bank(); - info!("threads: {} txs: {}", num_threads, txes); + info!("threads: {} txs: {}", num_threads, total_num_transactions); - let mut transactions = make_accounts_txs(txes, &mint_keypair, genesis_config.hash()); + let same_payer = matches.is_present("same_payer"); + let mut transactions = + make_accounts_txs(total_num_transactions, genesis_config.hash(), same_payer); // fund all the accounts transactions.iter().for_each(|tx| { - let fund = system_transaction::transfer( + let mut fund = system_transaction::transfer( &mint_keypair, &tx.message.account_keys[0], - mint_total / txes as u64, + mint_total / total_num_transactions as u64, genesis_config.hash(), ); + // Ignore any pesky duplicate signature errors in the case we are using single-payer + let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); + fund.signatures = vec![Signature::new(&sig[0..64])]; let x = bank.process_transaction(&fund); x.unwrap(); }); - //sanity check, make sure all the transactions can execute sequentially - transactions.iter().for_each(|tx| { - let res = bank.process_transaction(&tx); - assert!(res.is_ok(), "sanity test transactions"); - }); - bank.clear_signatures(); - //sanity check, make sure all the transactions can execute in parallel - let res = bank.process_transactions(&transactions); - for r in res { - assert!(r.is_ok(), "sanity parallel execution"); + + let skip_sanity = matches.is_present("skip_sanity"); + if !skip_sanity { + //sanity check, make sure all the transactions can execute sequentially + transactions.iter().for_each(|tx| { + let res = bank.process_transaction(&tx); + assert!(res.is_ok(), "sanity test transactions error: {:?}", res); + }); + bank.clear_signatures(); + //sanity check, make sure all the transactions can execute in parallel + let res = bank.process_transactions(&transactions); + for r in res { + assert!(r.is_ok(), "sanity parallel execution error: {:?}", r); + } + bank.clear_signatures(); } - bank.clear_signatures(); - let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH); + + let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), packets_per_chunk); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new( @@ -162,7 +228,7 @@ fn main() { ); poh_recorder.lock().unwrap().set_bank(&bank); - let chunk_len = verified.len() / CHUNKS; + let chunk_len = verified.len() / num_chunks; let mut start = 0; // This is so that the signal_receiver does not go out of scope after the closure. @@ -171,17 +237,17 @@ fn main() { let signal_receiver = Arc::new(signal_receiver); let mut total_us = 0; let mut tx_total_us = 0; + let base_tx_count = bank.transaction_count(); let mut txs_processed = 0; let mut root = 1; let collector = Pubkey::new_rand(); - const ITERS: usize = 1_000; let config = Config { - packets_per_batch: PACKETS_PER_BATCH, + packets_per_batch: packets_per_chunk, chunk_len, num_threads, }; let mut total_sent = 0; - for _ in 0..ITERS { + for _ in 0..iterations { let now = Instant::now(); let mut sent = 0; @@ -222,7 +288,11 @@ fn main() { sleep(Duration::from_millis(5)); } } - if check_txs(&signal_receiver, txes / CHUNKS, &poh_recorder) { + if check_txs( + &signal_receiver, + total_num_transactions / num_chunks, + &poh_recorder, + ) { debug!( "resetting bank {} tx count: {} txs_proc: {}", bank.slot(), @@ -274,7 +344,7 @@ fn main() { debug!( "time: {} us checked: {} sent: {}", duration_as_us(&now.elapsed()), - txes / CHUNKS, + total_num_transactions / num_chunks, sent, ); total_sent += sent; @@ -285,20 +355,26 @@ fn main() { let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); tx.signatures[0] = Signature::new(&sig[0..64]); } - verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH); + verified = to_packets_chunked(&transactions.clone(), packets_per_chunk); } start += chunk_len; start %= verified.len(); } + let txs_processed = bank_forks.working_bank().transaction_count(); + debug!("processed: {} base: {}", txs_processed, base_tx_count); eprintln!( - "{{'name': 'banking_bench_total', 'median': '{}'}}", + "{{'name': 'banking_bench_total', 'median': '{:.2}'}}", (1000.0 * 1000.0 * total_sent as f64) / (total_us as f64), ); eprintln!( - "{{'name': 'banking_bench_tx_total', 'median': '{}'}}", + "{{'name': 'banking_bench_tx_total', 'median': '{:.2}'}}", (1000.0 * 1000.0 * total_sent as f64) / (tx_total_us as f64), ); + eprintln!( + "{{'name': 'banking_bench_success_tx_total', 'median': '{:.2}'}}", + (1000.0 * 1000.0 * (txs_processed - base_tx_count) as f64) / (total_us as f64), + ); drop(verified_sender); drop(vote_sender); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0479539db7..55b2decf15 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -292,7 +292,7 @@ impl BankingStage { enable_forwarding: bool, batch_limit: usize, transaction_status_sender: Option, - ) { + ) -> BufferedPacketsDecision { let (leader_at_slot_offset, poh_has_bank, would_be_leader) = { let poh = poh_recorder.lock().unwrap(); ( @@ -349,6 +349,7 @@ impl BankingStage { } _ => (), } + decision } pub fn process_loop( @@ -365,8 +366,8 @@ impl BankingStage { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; loop { - if !buffered_packets.is_empty() { - Self::process_buffered_packets( + while !buffered_packets.is_empty() { + let decision = Self::process_buffered_packets( &my_pubkey, &socket, poh_recorder, @@ -376,6 +377,11 @@ impl BankingStage { batch_limit, transaction_status_sender.clone(), ); + if decision == BufferedPacketsDecision::Hold { + // If we are waiting on a new bank, + // check the receiver for more transactions/for exiting + break; + } } let recv_timeout = if !buffered_packets.is_empty() {