Remove holding cluster_info lock while forwarding packets (#4773)

This commit is contained in:
carllin
2019-06-21 15:21:49 -07:00
committed by GitHub
parent a38e1a81ef
commit 06ba0b7279

View File

@ -85,7 +85,7 @@ impl BankingStage {
// This thread talks to poh_service and broadcasts the entries once they have been recorded. // This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank. // Once an entry has been recorded, its blockhash is registered with the bank.
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let my_pubkey = cluster_info.read().unwrap().id();
// Many banks that process transactions in parallel. // Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads) let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| { .map(|i| {
@ -104,6 +104,7 @@ impl BankingStage {
.name("solana-banking-stage-tx".to_string()) .name("solana-banking-stage-tx".to_string())
.spawn(move || { .spawn(move || {
Self::process_loop( Self::process_loop(
my_pubkey,
&verified_receiver, &verified_receiver,
&poh_recorder, &poh_recorder,
&cluster_info, &cluster_info,
@ -241,14 +242,13 @@ impl BankingStage {
} }
fn process_buffered_packets( fn process_buffered_packets(
my_pubkey: &Pubkey,
socket: &std::net::UdpSocket, socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
buffered_packets: &mut Vec<PacketsAndOffsets>, buffered_packets: &mut Vec<PacketsAndOffsets>,
enable_forwarding: bool, enable_forwarding: bool,
) -> Result<()> { ) -> Result<()> {
let rcluster_info = cluster_info.read().unwrap();
let (decision, next_leader) = { let (decision, next_leader) = {
let poh = poh_recorder.lock().unwrap(); let poh = poh_recorder.lock().unwrap();
let next_leader = poh.next_slot_leader(); let next_leader = poh.next_slot_leader();
@ -257,7 +257,7 @@ impl BankingStage {
next_leader, next_leader,
poh.bank().is_some(), poh.bank().is_some(),
poh.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2), poh.would_be_leader(DEFAULT_TICKS_PER_SLOT * 2),
&rcluster_info.id(), my_pubkey,
), ),
next_leader, next_leader,
) )
@ -265,28 +265,31 @@ impl BankingStage {
match decision { match decision {
BufferedPacketsDecision::Consume => { BufferedPacketsDecision::Consume => {
let mut unprocessed = Self::consume_buffered_packets( let mut unprocessed =
&rcluster_info.id(), Self::consume_buffered_packets(my_pubkey, poh_recorder, buffered_packets)?;
poh_recorder,
buffered_packets,
)?;
buffered_packets.append(&mut unprocessed); buffered_packets.append(&mut unprocessed);
Ok(()) Ok(())
} }
BufferedPacketsDecision::Forward => { BufferedPacketsDecision::Forward => {
if enable_forwarding { if enable_forwarding {
next_leader.map_or(Ok(()), |leader_pubkey| { next_leader.map_or(Ok(()), |leader_pubkey| {
rcluster_info let leader_addr = {
.lookup(&leader_pubkey) cluster_info
.map_or(Ok(()), |leader| { .read()
let _ = Self::forward_buffered_packets( .unwrap()
&socket, .lookup(&leader_pubkey)
&leader.tpu_via_blobs, .map(|leader| leader.tpu_via_blobs)
&buffered_packets, };
);
buffered_packets.clear(); leader_addr.map_or(Ok(()), |leader_addr| {
Ok(()) let _ = Self::forward_buffered_packets(
}) &socket,
&leader_addr,
&buffered_packets,
);
buffered_packets.clear();
Ok(())
})
}) })
} else { } else {
buffered_packets.clear(); buffered_packets.clear();
@ -298,6 +301,7 @@ impl BankingStage {
} }
pub fn process_loop( pub fn process_loop(
my_pubkey: Pubkey,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>, verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -310,6 +314,7 @@ impl BankingStage {
loop { loop {
if !buffered_packets.is_empty() { if !buffered_packets.is_empty() {
Self::process_buffered_packets( Self::process_buffered_packets(
&my_pubkey,
&socket, &socket,
poh_recorder, poh_recorder,
cluster_info, cluster_info,
@ -330,11 +335,11 @@ impl BankingStage {
}; };
match Self::process_packets( match Self::process_packets(
&my_pubkey,
&verified_receiver, &verified_receiver,
&poh_recorder, &poh_recorder,
recv_start, recv_start,
recv_timeout, recv_timeout,
cluster_info,
id, id,
) { ) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
@ -694,11 +699,11 @@ impl BankingStage {
/// Process the incoming packets /// Process the incoming packets
pub fn process_packets( pub fn process_packets(
my_pubkey: &Pubkey,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>, verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
recv_start: &mut Instant, recv_start: &mut Instant,
recv_timeout: Duration, recv_timeout: Duration,
cluster_info: &Arc<RwLock<ClusterInfo>>,
id: u32, id: u32,
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
let mms = verified_receiver let mms = verified_receiver
@ -740,7 +745,6 @@ impl BankingStage {
if processed < verified_txs_len { if processed < verified_txs_len {
let next_leader = poh.lock().unwrap().next_slot_leader(); let next_leader = poh.lock().unwrap().next_slot_leader();
let my_pubkey = cluster_info.read().unwrap().id();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
while let Some((msgs, vers)) = mms_iter.next() { while let Some((msgs, vers)) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(vers); let packet_indexes = Self::generate_packet_indexes(vers);