diff --git a/src/fullnode.rs b/src/fullnode.rs
index ff1bf3855e..c6d79bd9d1 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -76,7 +76,7 @@ pub struct Fullnode {
     pub node_role: Option<NodeRole>,
     keypair: Arc<Keypair>,
     exit: Arc<AtomicBool>,
-    rpu: Rpu,
+    rpu: Option<Rpu>,
     rpc_service: JsonRpcService,
     ncp: Ncp,
     bank: Arc<Bank>,
@@ -89,6 +89,8 @@ pub struct Fullnode {
     retransmit_socket: UdpSocket,
     transaction_sockets: Vec<UdpSocket>,
     broadcast_socket: UdpSocket,
+    requests_socket: UdpSocket,
+    respond_socket: UdpSocket,
     blob_recycler: BlobRecycler,
 }
@@ -236,13 +238,18 @@ impl Fullnode {
         let mut blob_recycler = BlobRecycler::default();
         blob_recycler.set_name("fullnode::Blob");
 
-        let rpu = Rpu::new(
+        let rpu = Some(Rpu::new(
             &bank,
-            node.sockets.requests,
-            node.sockets.respond,
+            node.sockets
+                .requests
+                .try_clone()
+                .expect("Failed to clone requests socket"),
+            node.sockets
+                .respond
+                .try_clone()
+                .expect("Failed to clone respond socket"),
             &blob_recycler,
-            exit.clone(),
-        );
+        ));
 
         // TODO: this code assumes this node is the leader
         let mut drone_addr = node.info.contact_info.tpu;
@@ -364,10 +371,12 @@
             retransmit_socket: node.sockets.retransmit,
             transaction_sockets: node.sockets.transaction,
             broadcast_socket: node.sockets.broadcast,
+            requests_socket: node.sockets.requests,
+            respond_socket: node.sockets.respond,
         }
     }
 
-    fn leader_to_validator(&mut self) {
+    fn leader_to_validator(&mut self) -> Result<()> {
         // TODO: We can avoid building the bank again once RecordStage is
         // integrated with BankingStage
         let (bank, entry_height, _) = Self::new_bank_from_ledger(&self.ledger_path);
@@ -384,9 +393,23 @@
             }
         }
 
-        // Tell the RPU to serve requests out of the new bank we've created
+        // Make a new RPU to serve requests out of the new bank we've created
         // instead of the old one
-        self.rpu.set_new_bank(self.bank.clone());
+        if !self.rpu.is_none() {
+            let old_rpu = self.rpu.take().unwrap();
+            old_rpu.close()?;
+            self.rpu = Some(Rpu::new(
+                &self.bank,
+                self.requests_socket
+                    .try_clone()
+                    .expect("Failed to clone requests socket"),
+                self.respond_socket
+                    .try_clone()
+                    .expect("Failed to clone respond socket"),
+                &self.blob_recycler,
+            ));
+        }
+
         let tvu = Tvu::new(
             self.keypair.clone(),
             &self.bank,
@@ -409,6 +432,7 @@
         );
         let validator_state = ValidatorServices::new(tvu);
         self.node_role = Some(NodeRole::Validator(validator_state));
+        Ok(())
     }
 
     pub fn handle_role_transition(&mut self) -> Result<Option<FullnodeReturnType>> {
@@ -416,7 +440,7 @@
         match node_role {
             Some(NodeRole::Leader(leader_services)) => match leader_services.join()? {
                 Some(TpuReturnType::LeaderRotation) => {
-                    self.leader_to_validator();
+                    self.leader_to_validator()?;
                     Ok(Some(FullnodeReturnType::LeaderRotation))
                 }
                 _ => Ok(None),
@@ -431,7 +455,9 @@ impl Fullnode {
     //used for notifying many nodes in parallel to exit
     pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
-
+        if let Some(ref rpu) = self.rpu {
+            rpu.exit();
+        }
         match self.node_role {
             Some(NodeRole::Leader(ref leader_services)) => leader_services.exit(),
             Some(NodeRole::Validator(ref validator_services)) => validator_services.exit(),
@@ -471,7 +497,9 @@ impl Service for Fullnode {
     type JoinReturnType = Option<FullnodeReturnType>;
 
     fn join(self) -> Result<Option<FullnodeReturnType>> {
-        self.rpu.join()?;
+        if let Some(rpu) = self.rpu {
+            rpu.join()?;
+        }
         self.ncp.join()?;
         self.rpc_service.join()?;
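The `fullnode.rs` changes above replace the always-present `rpu: Rpu` field with an `Option<Rpu>` so the service can be torn down and rebuilt around the new bank during the leader-to-validator transition. A minimal sketch of that take-close-recreate pattern, using a hypothetical `Service`/`Node` pair rather than the real `Rpu`/`Fullnode` types:

```rust
// Sketch only: Service/Node stand in for Rpu/Fullnode.
use std::thread::{self, JoinHandle};

struct Service {
    thread_hdl: JoinHandle<()>,
}

impl Service {
    fn new(state: u64) -> Self {
        // Placeholder worker standing in for the RPU's request threads.
        let thread_hdl = thread::spawn(move || {
            let _ = state; // serve requests out of `state`
        });
        Service { thread_hdl }
    }

    // Consuming `self` makes reuse of a closed service a compile error.
    fn close(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}

struct Node {
    service: Option<Service>,
}

impl Node {
    // Mirrors the shape of leader_to_validator(): empty the Option with
    // take(), shut the old instance down, then rebuild over new state.
    fn restart_service(&mut self, new_state: u64) -> thread::Result<()> {
        if let Some(old) = self.service.take() {
            old.close()?;
        }
        self.service = Some(Service::new(new_state));
        Ok(())
    }
}

fn main() -> thread::Result<()> {
    let mut node = Node { service: Some(Service::new(0)) };
    node.restart_service(1)?;
    node.service.take().unwrap().close()
}
```

Because `close()` consumes the service, the field has to live in an `Option` that `take()` can empty; this is also why the sockets are now cloned up front and stashed on the struct, so a fresh `Rpu` can be built from them later.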
diff --git a/src/request_stage.rs b/src/request_stage.rs
index c25d1867d0..90e962a370 100644
--- a/src/request_stage.rs
+++ b/src/request_stage.rs
@@ -9,7 +9,7 @@ use result::{Error, Result};
 use service::Service;
 use std::net::SocketAddr;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Instant;
 use streamer::{self, BlobReceiver, BlobSender};
@@ -17,7 +17,7 @@ use timing;
 
 pub struct RequestStage {
     thread_hdl: JoinHandle<()>,
-    pub request_processor: Arc<RwLock<RequestProcessor>>,
+    pub request_processor: Arc<RequestProcessor>,
 }
 
 impl RequestStage {
@@ -78,18 +78,19 @@
         Ok(())
     }
     pub fn new(
-        request_processor: Arc<RwLock<RequestProcessor>>,
+        request_processor: RequestProcessor,
         packet_receiver: Receiver<SharedPackets>,
         packet_recycler: PacketRecycler,
         blob_recycler: BlobRecycler,
     ) -> (Self, BlobReceiver) {
+        let request_processor = Arc::new(request_processor);
         let request_processor_ = request_processor.clone();
         let (blob_sender, blob_receiver) = channel();
         let thread_hdl = Builder::new()
             .name("solana-request-stage".to_string())
             .spawn(move || loop {
                 if let Err(e) = Self::process_request_packets(
-                    &request_processor_.read().unwrap(),
+                    &request_processor_,
                     &packet_receiver,
                     &blob_sender,
                     &packet_recycler,
diff --git a/src/rpu.rs b/src/rpu.rs
index 62cc5cd011..6381ef766c 100644
--- a/src/rpu.rs
+++ b/src/rpu.rs
@@ -28,18 +28,17 @@ use packet::{BlobRecycler, PacketRecycler};
 use request_processor::RequestProcessor;
 use request_stage::RequestStage;
 use service::Service;
-use std::mem;
 use std::net::UdpSocket;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::channel;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread::{self, JoinHandle};
 use streamer;
 
 pub struct Rpu {
     request_stage: RequestStage,
     thread_hdls: Vec<JoinHandle<()>>,
-    request_processor: Arc<RwLock<RequestProcessor>>,
+    exit: Arc<AtomicBool>,
 }
 
 impl Rpu {
@@ -48,21 +47,21 @@
         requests_socket: UdpSocket,
         respond_socket: UdpSocket,
         blob_recycler: &BlobRecycler,
-        exit: Arc<AtomicBool>,
     ) -> Self {
+        let exit = Arc::new(AtomicBool::new(false));
         let mut packet_recycler = PacketRecycler::default();
         packet_recycler.set_name("rpu::Packet");
         let (packet_sender, packet_receiver) = channel();
         let t_receiver = streamer::receiver(
             Arc::new(requests_socket),
-            exit,
+            exit.clone(),
             packet_recycler.clone(),
             packet_sender,
         );
 
-        let request_processor = Arc::new(RwLock::new(RequestProcessor::new(bank.clone())));
+        let request_processor = RequestProcessor::new(bank.clone());
         let (request_stage, blob_receiver) = RequestStage::new(
-            request_processor.clone(),
+            request_processor,
             packet_receiver,
             packet_recycler.clone(),
             blob_recycler.clone(),
@@ -80,13 +79,17 @@
         Rpu {
             thread_hdls,
             request_stage,
-            request_processor,
+            exit,
         }
     }
 
-    pub fn set_new_bank(&self, new_bank: Arc<Bank>) {
-        let mut w_request_procesor = self.request_processor.write().unwrap();
-        mem::replace(&mut *w_request_procesor, RequestProcessor::new(new_bank));
+    pub fn exit(&self) {
+        self.exit.store(true, Ordering::Relaxed);
+    }
+
+    pub fn close(self) -> thread::Result<()> {
+        self.exit();
+        self.join()
     }
 }
 
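With `set_new_bank` gone, the `RwLock` around `RequestProcessor` is no longer needed, and `Rpu` now owns its exit flag instead of borrowing the `Fullnode`'s, which is what lets `close()` signal and join its threads without coordinating with the caller. A stripped-down sketch of that ownership change, with one placeholder thread standing in for the real receiver/request/responder threads:

```rust
// Sketch only: Worker stands in for Rpu; the constructor creates the
// exit flag itself rather than taking one as a parameter.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

struct Worker {
    exit: Arc<AtomicBool>,
    thread_hdl: JoinHandle<()>,
}

impl Worker {
    fn new() -> Self {
        let exit = Arc::new(AtomicBool::new(false)); // owned, not borrowed
        let exit_ = exit.clone();
        let thread_hdl = thread::spawn(move || {
            while !exit_.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(10)); // do some work
            }
        });
        Worker { exit, thread_hdl }
    }

    fn exit(&self) {
        self.exit.store(true, Ordering::Relaxed);
    }

    // Signal the flag, then block until the worker thread drains out.
    fn close(self) -> thread::Result<()> {
        self.exit();
        self.thread_hdl.join()
    }
}

fn main() -> thread::Result<()> {
    Worker::new().close()
}
```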
diff --git a/tests/multinode.rs b/tests/multinode.rs
index 837e14ace2..b94fcb04a0 100644
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -810,18 +810,29 @@ fn test_leader_to_validator_transition() {
     // make sure the leader stops.
     assert!(genesis_height < leader_rotation_interval);
     for i in genesis_height..(leader_rotation_interval + extra_transactions) {
-        let expected_balance = std::cmp::min(
-            leader_rotation_interval - genesis_height,
-            i - genesis_height,
-        );
+        if i < leader_rotation_interval {
+            // Poll to see that the bank state is updated after every transaction
+            // to ensure that each transaction is packaged as a single entry,
+            // so that we can be sure leader rotation is triggered
+            let expected_balance = std::cmp::min(
+                leader_rotation_interval - genesis_height,
+                i - genesis_height + 1,
+            );
+            let result = send_tx_and_retry_get_balance(
+                &leader_info,
+                &mint,
+                &bob_pubkey,
+                1,
+                Some(expected_balance as i64),
+            );
 
-        send_tx_and_retry_get_balance(
-            &leader_info,
-            &mint,
-            &bob_pubkey,
-            1,
-            Some(expected_balance as i64),
-        );
+            assert_eq!(result, Some(expected_balance as i64))
+        } else {
+            // After leader_rotation_interval entries, we don't care about the
+            // number of entries generated by these transactions. These are just
+            // here for testing to make sure the leader stopped at the correct point.
+            send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None);
+        }
     }
 
     // Wait for leader to shut down tpu and restart tvu
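The test change also fixes an off-by-one: after the i-th transaction the balance should be `i - genesis_height + 1`, and the result is now asserted rather than discarded, polling until the bank catches up so each pre-rotation transaction provably lands in its own entry. A hypothetical sketch of the retry idea behind `send_tx_and_retry_get_balance` (the real helper lives in the test crate; the helper name, retry count, and delay here are illustrative):

```rust
// Sketch only: poll a balance source until it matches the expected
// value or the retries run out; with no expectation, take the first
// answer, mirroring the None case in the test above.
use std::thread::sleep;
use std::time::Duration;

fn retry_get_balance<F>(mut get_balance: F, expected: Option<i64>, retries: usize) -> Option<i64>
where
    F: FnMut() -> Option<i64>,
{
    for _ in 0..retries {
        let balance = get_balance();
        if expected.is_none() || balance == expected {
            return balance;
        }
        sleep(Duration::from_millis(500));
    }
    get_balance()
}

fn main() {
    // Simulate a bank that reaches the expected balance on the third poll.
    let mut polls = 0;
    let result = retry_get_balance(
        || {
            polls += 1;
            Some(polls)
        },
        Some(3),
        10,
    );
    assert_eq!(result, Some(3));
}
```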