diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs
index 1b7e5e87c9..c7b4d15ec7 100644
--- a/benches/banking_stage.rs
+++ b/benches/banking_stage.rs
@@ -85,6 +85,9 @@ fn check_txs(batches: usize, receiver: &Receiver<Signal>, ref_tx_count: usize) {
         let signal = receiver.recv().unwrap();
         if let Signal::Transactions(transactions) = signal {
             total += transactions.len();
+            if total >= ref_tx_count {
+                break;
+            }
         } else {
             assert!(false);
         }
diff --git a/ci/test-large-network.sh b/ci/test-large-network.sh
index 3abe93d0ae..b3ef60865f 100755
--- a/ci/test-large-network.sh
+++ b/ci/test-large-network.sh
@@ -16,5 +16,10 @@ export LD_LIBRARY_PATH+=:$PWD
 
 export RUST_LOG=multinode=info
 
+if [[ $(ulimit -n) -lt 65000 ]]; then
+  echo 'Error: nofiles too small, run "ulimit -n 65000" to continue'
+  exit 1
+fi
+
 set -x
 exec cargo test --release --features=erasure test_multi_node_dynamic_network -- --ignored
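Aside on the bench change above: check_txs totals transactions rather than counting received batches, and now exits as soon as ref_tx_count is reached, because coalescing upstream can merge many small batches into fewer, larger Signal::Transactions messages. A minimal, self-contained sketch of that early-exit receive pattern follows; the channel payload and counts are illustrative, not taken from the crate.

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (sender, receiver) = channel::<Vec<u64>>();
    let ref_tx_count = 5;

    thread::spawn(move || {
        // Upstream coalescing may merge batches, so fewer (larger)
        // messages can arrive than were originally sent.
        sender.send(vec![1, 2, 3]).unwrap();
        sender.send(vec![4, 5]).unwrap();
    });

    let mut total = 0;
    while let Ok(batch) = receiver.recv() {
        total += batch.len();
        // Stop once the expected transaction count is reached instead
        // of blocking on a fixed number of receives.
        if total >= ref_tx_count {
            break;
        }
    }
    assert_eq!(total, ref_tx_count);
}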
diff --git a/src/banking_stage.rs b/src/banking_stage.rs
index f3d976b109..0e6443aa96 100644
--- a/src/banking_stage.rs
+++ b/src/banking_stage.rs
@@ -11,6 +11,7 @@ use record_stage::Signal;
 use result::{Error, Result};
 use service::Service;
 use std::net::SocketAddr;
+use std::result;
 use std::sync::atomic::AtomicUsize;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
 use std::sync::Arc;
@@ -26,6 +27,28 @@ pub struct BankingStage {
     thread_hdl: JoinHandle<()>,
 }
 
+fn recv_multiple_packets(
+    verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
+    wait_ms: u64,
+    max_tries: usize,
+) -> result::Result<Vec<(SharedPackets, Vec<u8>)>, RecvTimeoutError> {
+    let timer = Duration::new(1, 0);
+    let mut mms = verified_receiver.recv_timeout(timer)?;
+    let mut recv_tries = 1;
+
+    // Try receiving more packets from verified_receiver. Coalesce any packets
+    // that are received within "wait_ms" ms of each other.
+    while let Ok(mut nq) = verified_receiver.recv_timeout(Duration::from_millis(wait_ms)) {
+        recv_tries += 1;
+        mms.append(&mut nq);
+
+        if recv_tries >= max_tries {
+            break;
+        }
+    }
+    Ok(mms)
+}
+
 impl BankingStage {
     /// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
     /// Discard input packets using `packet_recycler` to minimize memory
@@ -77,9 +100,11 @@ impl BankingStage {
         signal_sender: &Sender<Signal>,
         packet_recycler: &PacketRecycler,
     ) -> Result<()> {
-        let timer = Duration::new(1, 0);
+        // Coalesce up to 512 transactions before sending them to the next stage
+        let max_coalesced_txs = 512;
+        let max_recv_tries = 10;
         let recv_start = Instant::now();
-        let mms = verified_receiver.recv_timeout(timer)?;
+        let mms = recv_multiple_packets(verified_receiver, 20, max_recv_tries)?;
         let mut reqs_len = 0;
         let mms_len = mms.len();
         info!(
@@ -91,6 +116,8 @@ impl BankingStage {
         let bank_starting_tx_count = bank.transaction_count();
         let count = mms.iter().map(|x| x.1.len()).sum();
         let proc_start = Instant::now();
+        let mut txs: Vec<Transaction> = Vec::new();
+        let mut num_sent = 0;
         for (msgs, vers) in mms {
             let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
             reqs_len += transactions.len();
@@ -109,12 +136,24 @@ impl BankingStage {
             debug!("process_transactions");
             let results = bank.process_transactions(transactions);
-            let transactions = results.into_iter().filter_map(|x| x.ok()).collect();
-            signal_sender.send(Signal::Transactions(transactions))?;
+            let mut transactions: Vec<Transaction> =
+                results.into_iter().filter_map(|x| x.ok()).collect();
+            txs.append(&mut transactions);
+            if txs.len() >= max_coalesced_txs {
+                signal_sender.send(Signal::Transactions(txs.clone()))?;
+                txs.clear();
+                num_sent += 1;
+            }
             debug!("done process_transactions");
 
             packet_recycler.recycle(msgs);
         }
+
+        // Send now if there are pending transactions, or if no transactions
+        // have been sent to the next stage yet.
+        if !txs.is_empty() || num_sent == 0 {
+            signal_sender.send(Signal::Transactions(txs))?;
+        }
         let total_time_s = timing::duration_as_s(&proc_start.elapsed());
         let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
         info!(
@@ -144,6 +183,42 @@ impl Service for BankingStage {
     }
 }
 
+#[cfg(test)]
+mod test {
+    use banking_stage::recv_multiple_packets;
+    use packet::SharedPackets;
+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::sync::mpsc::{channel, RecvTimeoutError};
+    use std::sync::Arc;
+    use std::thread;
+    use std::thread::sleep;
+    use std::time::Duration;
+
+    #[test]
+    pub fn recv_multiple_packets_test() {
+        let (sender, receiver) = channel();
+        let exit = Arc::new(AtomicBool::new(false));
+
+        // An empty channel should time out rather than block forever.
+        assert_eq!(
+            recv_multiple_packets(&receiver, 20, 10).unwrap_err(),
+            RecvTimeoutError::Timeout
+        );
+
+        {
+            let exit = exit.clone();
+            thread::spawn(move || {
+                while !exit.load(Ordering::Relaxed) {
+                    let testdata: Vec<(SharedPackets, Vec<u8>)> = Vec::new();
+                    sender.send(testdata).expect("Failed to send message");
+                    sleep(Duration::from_millis(10));
+                }
+            });
+        }
+
+        assert!(recv_multiple_packets(&receiver, 20, 10).is_ok());
+        exit.store(true, Ordering::Relaxed);
+    }
+}
+
 // TODO: When banking is pulled out of RequestStage, add this test back in.
 //use bank::Bank;
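For reference, the coalescing receive introduced above can be written generically over the payload type. The sketch below assumes the same semantics as recv_multiple_packets: block up to one second for the first batch, then keep draining batches that arrive within wait_ms of each other, capped at max_tries receives. recv_coalesced is a made-up name; the real helper is specialized to Vec<(SharedPackets, Vec<u8>)>.

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

fn recv_coalesced<T>(
    receiver: &Receiver<Vec<T>>,
    wait_ms: u64,
    max_tries: usize,
) -> Result<Vec<T>, RecvTimeoutError> {
    // Block up to one second for the first batch.
    let mut items = receiver.recv_timeout(Duration::new(1, 0))?;
    let mut tries = 1;
    // Drain follow-up batches that arrive within wait_ms of each other,
    // capped at max_tries receives in total.
    while tries < max_tries {
        match receiver.recv_timeout(Duration::from_millis(wait_ms)) {
            Ok(mut more) => {
                items.append(&mut more);
                tries += 1;
            }
            // The sender went quiet (or hung up); stop coalescing.
            Err(_) => break,
        }
    }
    Ok(items)
}

fn main() {
    let (sender, receiver) = channel();
    thread::spawn(move || {
        for chunk in vec![vec![1, 2], vec![3], vec![4, 5, 6]] {
            sender.send(chunk).unwrap();
        }
    });
    // The three batches land within the coalescing window, so a single
    // call returns all six items.
    assert_eq!(recv_coalesced(&receiver, 20, 10).unwrap(), vec![1, 2, 3, 4, 5, 6]);
}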
diff --git a/tests/multinode.rs b/tests/multinode.rs
index 3f7479fd6b..abc13a7f7b 100755
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -663,7 +663,7 @@ fn retry_get_balance(
     bob_pubkey: &PublicKey,
     expected: Option<i64>,
 ) -> Option<i64> {
-    const LAST: usize = 20;
+    const LAST: usize = 30;
     for run in 0..(LAST + 1) {
         let out = client.poll_get_balance(bob_pubkey);
         if expected.is_none() || run == LAST {
@@ -714,7 +714,12 @@ fn retry_send_tx_and_retry_get_balance(
         if expected.is_none() || run == LAST {
             return out.ok().clone();
         }
-        trace!("retry_get_balance[{}] {:?} {:?}", run, out, expected);
+        trace!(
+            "retry_send_tx_and_retry_get_balance[{}] {:?} {:?}",
+            run,
+            out,
+            expected
+        );
         if let (Some(e), Ok(o)) = (expected, out) {
             if o == e {
                 return Some(o);
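Both retry helpers in tests/multinode.rs follow the same poll-until-match loop; the LAST bump from 20 to 30 simply enlarges the retry budget, presumably to absorb the extra latency the coalescing window adds. A self-contained sketch of that loop follows; the poll_balance closure stands in for client.poll_get_balance(), and the closure parameter, the 250 ms delay, and the i64 balance type are assumptions for illustration.

use std::thread::sleep;
use std::time::Duration;

fn retry_get_balance(
    poll_balance: impl Fn() -> Result<i64, ()>,
    expected: Option<i64>,
) -> Option<i64> {
    const LAST: usize = 30;
    for run in 0..(LAST + 1) {
        let out = poll_balance();
        // On the last attempt, or when no target is given, return
        // whatever the final poll produced.
        if expected.is_none() || run == LAST {
            return out.ok();
        }
        // Return early once the observed balance matches the target.
        if let (Some(e), Ok(o)) = (expected, out) {
            if o == e {
                return Some(o);
            }
        }
        sleep(Duration::from_millis(250));
    }
    None
}

fn main() {
    // With a constant balance the target matches on the first poll.
    assert_eq!(retry_get_balance(|| Ok(42), Some(42)), Some(42));
}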