Use a separate channel to process votes in banking stage (#3861)

- This will help expedite the vote processing on peer nodes
This commit is contained in:
Pankaj Garg 2019-04-17 21:07:45 -07:00 committed by GitHub
parent 26a7eb6fa5
commit 2f4a3ed190
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 11 deletions

View File

@ -55,6 +55,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total); let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total);
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let dummy = system_transaction::transfer( let dummy = system_transaction::transfer(
&mint_keypair, &mint_keypair,
@ -116,7 +117,12 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
create_test_recorder(&bank, &blocktree); create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let _banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
);
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash(); let mut id = genesis_block.hash();
@ -138,6 +144,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
start += half_len; start += half_len;
start %= verified.len(); start %= verified.len();
}); });
drop(vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();
} }
@ -155,6 +162,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total); let (genesis_block, mint_keypair) = GenesisBlock::new(mint_total);
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let dummy = system_transaction::transfer( let dummy = system_transaction::transfer(
&mint_keypair, &mint_keypair,
@ -232,7 +240,12 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
create_test_recorder(&bank, &blocktree); create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let _banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
);
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash(); let mut id = genesis_block.hash();
@ -254,6 +267,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
start += half_len; start += half_len;
start %= verified.len(); start %= verified.len();
}); });
drop(vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();
} }

View File

@ -21,6 +21,7 @@ use solana_runtime::locked_accounts_results::LockedAccountsResults;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES}; use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES};
use solana_sdk::transaction::{self, Transaction, TransactionError}; use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::cmp;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::mpsc::{Receiver, RecvTimeoutError};
@ -54,12 +55,14 @@ impl BankingStage {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>, verified_receiver: Receiver<VerifiedPackets>,
verified_vote_receiver: Receiver<VerifiedPackets>,
) -> Self { ) -> Self {
Self::new_num_threads( Self::new_num_threads(
cluster_info, cluster_info,
poh_recorder, poh_recorder,
verified_receiver, verified_receiver,
Self::num_threads(), verified_vote_receiver,
cmp::min(2, Self::num_threads()),
) )
} }
@ -67,9 +70,11 @@ impl BankingStage {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>, verified_receiver: Receiver<VerifiedPackets>,
verified_vote_receiver: Receiver<VerifiedPackets>,
num_threads: u32, num_threads: u32,
) -> Self { ) -> Self {
let verified_receiver = Arc::new(Mutex::new(verified_receiver)); let verified_receiver = Arc::new(Mutex::new(verified_receiver));
let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver));
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded. // This thread talks to poh_service and broadcasts the entries once they have been recorded.
@ -78,8 +83,13 @@ impl BankingStage {
// Many banks that process transactions in parallel. // Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads) let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|_| { .map(|i| {
let verified_receiver = verified_receiver.clone(); let verified_receiver = if i < num_threads - 1 {
verified_receiver.clone()
} else {
verified_vote_receiver.clone()
};
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let cluster_info = cluster_info.clone(); let cluster_info = cluster_info.clone();
let exit = exit.clone(); let exit = exit.clone();
@ -629,6 +639,7 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blocktree = Arc::new( let blocktree = Arc::new(
@ -638,8 +649,14 @@ mod tests {
create_test_recorder(&bank, &blocktree); create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
);
drop(verified_sender); drop(verified_sender);
drop(vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
poh_service.join().unwrap(); poh_service.join().unwrap();
@ -655,6 +672,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blocktree = Arc::new( let blocktree = Arc::new(
@ -664,10 +682,16 @@ mod tests {
create_test_recorder(&bank, &blocktree); create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
);
trace!("sending bank"); trace!("sending bank");
sleep(Duration::from_millis(600)); sleep(Duration::from_millis(600));
drop(verified_sender); drop(verified_sender);
drop(vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();
drop(poh_recorder); drop(poh_recorder);
@ -693,6 +717,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blocktree = Arc::new( let blocktree = Arc::new(
@ -702,7 +727,12 @@ mod tests {
create_test_recorder(&bank, &blocktree); create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
);
// fund another account so we can send 2 good transactions in a single batch. // fund another account so we can send 2 good transactions in a single batch.
let keypair = Keypair::new(); let keypair = Keypair::new();
@ -739,6 +769,7 @@ mod tests {
.unwrap(); .unwrap();
drop(verified_sender); drop(verified_sender);
drop(vote_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();
drop(poh_recorder); drop(poh_recorder);
@ -816,6 +847,7 @@ mod tests {
.send(vec![(packets[0].clone(), vec![1u8])]) .send(vec![(packets[0].clone(), vec![1u8])])
.unwrap(); .unwrap();
let (vote_sender, vote_receiver) = channel();
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let entry_receiver = { let entry_receiver = {
@ -834,7 +866,8 @@ mod tests {
&cluster_info, &cluster_info,
&poh_recorder, &poh_recorder,
verified_receiver, verified_receiver,
1, vote_receiver,
2,
); );
// wait for banking_stage to eat the packets // wait for banking_stage to eat the packets
@ -846,6 +879,7 @@ mod tests {
entry_receiver entry_receiver
}; };
drop(verified_sender); drop(verified_sender);
drop(vote_sender);
// consume the entire entry_receiver, feed it into a new bank // consume the entire entry_receiver, feed it into a new bank
// check that the balance is what we expect. // check that the balance is what we expect.

View File

@ -55,15 +55,21 @@ impl Tpu {
let sigverify_stage = let sigverify_stage =
SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone()); SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone());
let (verified_vote_sender, verified_vote_receiver) = channel();
let cluster_info_vote_listener = ClusterInfoVoteListener::new( let cluster_info_vote_listener = ClusterInfoVoteListener::new(
&exit, &exit,
cluster_info.clone(), cluster_info.clone(),
sigverify_disabled, sigverify_disabled,
verified_sender, verified_vote_sender,
&poh_recorder, &poh_recorder,
); );
let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver); let banking_stage = BankingStage::new(
&cluster_info,
poh_recorder,
verified_receiver,
verified_vote_receiver,
);
let broadcast_stage = BroadcastStage::new( let broadcast_stage = BroadcastStage::new(
broadcast_socket, broadcast_socket,