Change to crossbeam channel in banking_threads VerifiedReceiver (#4822)

* Add crossbeam channel instead of channel in banking_stage
This commit is contained in:
carllin
2019-06-26 18:42:27 -07:00
committed by GitHub
parent 8cea650535
commit 9a52b01171
10 changed files with 73 additions and 40 deletions

View File

@ -14,6 +14,7 @@ use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize;
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn};
use solana_runtime::accounts_db::ErrorCounters;
@ -28,7 +29,7 @@ use solana_sdk::timing::{
use solana_sdk::transaction::{self, Transaction, TransactionError};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
@ -62,8 +63,8 @@ impl BankingStage {
pub fn new(
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>,
verified_vote_receiver: Receiver<VerifiedPackets>,
verified_receiver: CrossbeamReceiver<VerifiedPackets>,
verified_vote_receiver: CrossbeamReceiver<VerifiedPackets>,
) -> Self {
Self::new_num_threads(
cluster_info,
@ -77,13 +78,10 @@ impl BankingStage {
fn new_num_threads(
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>,
verified_vote_receiver: Receiver<VerifiedPackets>,
verified_receiver: CrossbeamReceiver<VerifiedPackets>,
verified_vote_receiver: CrossbeamReceiver<VerifiedPackets>,
num_threads: u32,
) -> Self {
let verified_receiver = Arc::new(Mutex::new(verified_receiver));
let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver));
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank.
@ -306,7 +304,7 @@ impl BankingStage {
pub fn process_loop(
my_pubkey: Pubkey,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
verified_receiver: &CrossbeamReceiver<VerifiedPackets>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
recv_start: &mut Instant,
@ -346,7 +344,8 @@ impl BankingStage {
recv_timeout,
id,
) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Ok(mut unprocessed_packets) => {
if unprocessed_packets.is_empty() {
continue;
@ -714,16 +713,13 @@ impl BankingStage {
/// Process the incoming packets
pub fn process_packets(
my_pubkey: &Pubkey,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
verified_receiver: &CrossbeamReceiver<VerifiedPackets>,
poh: &Arc<Mutex<PohRecorder>>,
recv_start: &mut Instant,
recv_timeout: Duration,
id: u32,
) -> Result<UnprocessedPackets> {
let mms = verified_receiver
.lock()
.unwrap()
.recv_timeout(recv_timeout)?;
let mms = verified_receiver.recv_timeout(recv_timeout)?;
let mms_len = mms.len();
let count: usize = mms.iter().map(|x| x.1.len()).sum();
@ -860,20 +856,21 @@ mod tests {
use crate::packet::to_packets;
use crate::poh_recorder::WorkingBank;
use crate::{get_tmp_ledger_path, tmp_ledger_name};
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_sdk::instruction::InstructionError;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use solana_sdk::transaction::TransactionError;
use std::sync::mpsc::channel;
use std::sync::atomic::Ordering;
use std::thread::sleep;
#[test]
fn test_banking_stage_shutdown1() {
let genesis_block = create_genesis_block(2).genesis_block;
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
@ -907,8 +904,8 @@ mod tests {
genesis_block.ticks_per_slot = 4;
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
@ -956,8 +953,8 @@ mod tests {
} = create_genesis_block(10);
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
@ -1065,7 +1062,7 @@ mod tests {
mint_keypair,
..
} = create_genesis_block(2);
let (verified_sender, verified_receiver) = channel();
let (verified_sender, verified_receiver) = unbounded();
// Process a batch that includes a transaction that receives two lamports.
let alice = Keypair::new();
@ -1097,7 +1094,7 @@ mod tests {
.collect();
verified_sender.send(packets).unwrap();
let (vote_sender, vote_receiver) = channel();
let (vote_sender, vote_receiver) = unbounded();
let ledger_path = get_tmp_ledger_path!();
{
let entry_receiver = {

View File

@ -4,9 +4,9 @@ use crate::result::Result;
use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets;
use crate::{packet, sigverify};
use crossbeam_channel::Sender as CrossbeamSender;
use solana_metrics::inc_new_counter_debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
@ -20,7 +20,7 @@ impl ClusterInfoVoteListener {
exit: &Arc<AtomicBool>,
cluster_info: Arc<RwLock<ClusterInfo>>,
sigverify_disabled: bool,
sender: Sender<VerifiedPackets>,
sender: CrossbeamSender<VerifiedPackets>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Self {
let exit = exit.clone();
@ -45,7 +45,7 @@ impl ClusterInfoVoteListener {
exit: Arc<AtomicBool>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sigverify_disabled: bool,
sender: &Sender<VerifiedPackets>,
sender: &CrossbeamSender<VerifiedPackets>,
poh_recorder: Arc<Mutex<PohRecorder>>,
) -> Result<()> {
let mut last_ts = 0;

View File

@ -96,3 +96,5 @@ extern crate solana_metrics;
#[cfg(test)]
#[macro_use]
extern crate matches;
extern crate crossbeam_channel;

View File

@ -17,7 +17,10 @@ pub enum Error {
AddrParse(std::net::AddrParseError),
JoinError(Box<dyn Any + Send + 'static>),
RecvError(std::sync::mpsc::RecvError),
TryCrossbeamRecvError(crossbeam_channel::TryRecvError),
CrossbeamRecvTimeoutError(crossbeam_channel::RecvTimeoutError),
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
CrossbeamSendError,
TryRecvError(std::sync::mpsc::TryRecvError),
Serialize(std::boxed::Box<bincode::ErrorKind>),
TransactionError(transaction::TransactionError),
@ -44,11 +47,21 @@ impl std::convert::From<std::sync::mpsc::RecvError> for Error {
Error::RecvError(e)
}
}
impl std::convert::From<crossbeam_channel::TryRecvError> for Error {
fn from(e: crossbeam_channel::TryRecvError) -> Error {
Error::TryCrossbeamRecvError(e)
}
}
impl std::convert::From<std::sync::mpsc::TryRecvError> for Error {
fn from(e: std::sync::mpsc::TryRecvError) -> Error {
Error::TryRecvError(e)
}
}
impl std::convert::From<crossbeam_channel::RecvTimeoutError> for Error {
fn from(e: crossbeam_channel::RecvTimeoutError) -> Error {
Error::CrossbeamRecvTimeoutError(e)
}
}
impl std::convert::From<std::sync::mpsc::RecvTimeoutError> for Error {
fn from(e: std::sync::mpsc::RecvTimeoutError) -> Error {
Error::RecvTimeoutError(e)
@ -69,6 +82,11 @@ impl std::convert::From<reed_solomon_erasure::Error> for Error {
Error::ErasureError(e)
}
}
impl<T> std::convert::From<crossbeam_channel::SendError<T>> for Error {
fn from(_e: crossbeam_channel::SendError<T>) -> Error {
Error::CrossbeamSendError
}
}
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
fn from(_e: std::sync::mpsc::SendError<T>) -> Error {
Error::SendError

View File

@ -10,9 +10,10 @@ use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify;
use crate::streamer::{self, PacketReceiver};
use crossbeam_channel::Sender as CrossbeamSender;
use solana_metrics::{datapoint_info, inc_new_counter_info};
use solana_sdk::timing;
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, Mutex};
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
@ -34,7 +35,7 @@ impl SigVerifyStage {
pub fn new(
packet_receiver: Receiver<Packets>,
sigverify_disabled: bool,
verified_sender: Sender<VerifiedPackets>,
verified_sender: CrossbeamSender<VerifiedPackets>,
) -> Self {
sigverify::init();
let thread_hdls =
@ -53,7 +54,7 @@ impl SigVerifyStage {
fn verifier(
recvr: &Arc<Mutex<PacketReceiver>>,
sendr: &Sender<VerifiedPackets>,
sendr: &CrossbeamSender<VerifiedPackets>,
sigverify_disabled: bool,
id: usize,
) -> Result<()> {
@ -107,7 +108,7 @@ impl SigVerifyStage {
fn verifier_service(
packet_receiver: Arc<Mutex<PacketReceiver>>,
verified_sender: Sender<VerifiedPackets>,
verified_sender: CrossbeamSender<VerifiedPackets>,
sigverify_disabled: bool,
id: usize,
) -> JoinHandle<()> {
@ -132,7 +133,7 @@ impl SigVerifyStage {
fn verifier_services(
packet_receiver: PacketReceiver,
verified_sender: Sender<VerifiedPackets>,
verified_sender: CrossbeamSender<VerifiedPackets>,
sigverify_disabled: bool,
) -> Vec<JoinHandle<()>> {
let receiver = Arc::new(Mutex::new(packet_receiver));

View File

@ -10,6 +10,7 @@ use crate::fetch_stage::FetchStage;
use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage;
use crossbeam_channel::unbounded;
use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
@ -50,12 +51,12 @@ impl Tpu {
&packet_sender,
&poh_recorder,
);
let (verified_sender, verified_receiver) = channel();
let (verified_sender, verified_receiver) = unbounded();
let sigverify_stage =
SigVerifyStage::new(packet_receiver, sigverify_disabled, verified_sender.clone());
let (verified_vote_sender, verified_vote_receiver) = channel();
let (verified_vote_sender, verified_vote_receiver) = unbounded();
let cluster_info_vote_listener = ClusterInfoVoteListener::new(
&exit,
cluster_info.clone(),