Changed the leader-to-validator transition to restart the Rpu rather than modify its bank, to prevent lock contention
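Context for the diff below: previously the Rpu held its RequestProcessor behind an `Arc<RwLock<RequestProcessor>>`, so the request stage took a read lock on every packet batch and `set_new_bank` took the write lock to `mem::replace` the processor, putting the hot path and the role transition in contention on the same lock. After this commit the processor is immutable behind a plain `Arc`, and switching banks tears the whole Rpu down and rebuilds it. A minimal sketch of the two access patterns, where `Bank` and `RequestProcessor` are stand-ins for the real types, not the commit's code:

```rust
use std::sync::{Arc, RwLock};

// Stand-ins for the real Bank and RequestProcessor types.
struct Bank;
struct RequestProcessor {
    bank: Arc<Bank>,
}

// Before: every request batch takes a read lock, and the transition takes
// the write lock to swap in a processor built on the new bank. Readers and
// the writer contend on the same RwLock.
fn handle_requests_locked(processor: &Arc<RwLock<RequestProcessor>>) {
    let _guard = processor.read().unwrap(); // per-batch read lock
    // ... process packets against _guard.bank ...
}

// After: the processor is immutable behind a plain Arc, so the hot path
// never locks; changing banks means dropping the whole service and
// constructing a new one.
fn handle_requests_lock_free(processor: &Arc<RequestProcessor>) {
    let _bank = &processor.bank; // no lock on the hot path
    // ... process packets ...
}

fn main() {
    let locked = Arc::new(RwLock::new(RequestProcessor { bank: Arc::new(Bank) }));
    handle_requests_locked(&locked);

    let lock_free = Arc::new(RequestProcessor { bank: Arc::new(Bank) });
    handle_requests_lock_free(&lock_free);
}
```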
@@ -76,7 +76,7 @@ pub struct Fullnode {
     pub node_role: Option<NodeRole>,
     keypair: Arc<Keypair>,
     exit: Arc<AtomicBool>,
-    rpu: Rpu,
+    rpu: Option<Rpu>,
     rpc_service: JsonRpcService,
     ncp: Ncp,
     bank: Arc<Bank>,
@@ -89,6 +89,8 @@ pub struct Fullnode {
     retransmit_socket: UdpSocket,
     transaction_sockets: Vec<UdpSocket>,
     broadcast_socket: UdpSocket,
+    requests_socket: UdpSocket,
+    respond_socket: UdpSocket,
     blob_recycler: BlobRecycler,
 }
 
@@ -236,13 +238,18 @@ impl Fullnode {
         let mut blob_recycler = BlobRecycler::default();
         blob_recycler.set_name("fullnode::Blob");
 
-        let rpu = Rpu::new(
+        let rpu = Some(Rpu::new(
             &bank,
-            node.sockets.requests,
-            node.sockets.respond,
+            node.sockets
+                .requests
+                .try_clone()
+                .expect("Failed to clone requests socket"),
+            node.sockets
+                .respond
+                .try_clone()
+                .expect("Failed to clone respond socket"),
             &blob_recycler,
-            exit.clone(),
-        );
+        ));
 
         // TODO: this code assumes this node is the leader
         let mut drone_addr = node.info.contact_info.tpu;
@@ -364,10 +371,12 @@ impl Fullnode {
             retransmit_socket: node.sockets.retransmit,
             transaction_sockets: node.sockets.transaction,
             broadcast_socket: node.sockets.broadcast,
+            requests_socket: node.sockets.requests,
+            respond_socket: node.sockets.respond,
         }
     }
 
-    fn leader_to_validator(&mut self) {
+    fn leader_to_validator(&mut self) -> Result<()> {
         // TODO: We can avoid building the bank again once RecordStage is
         // integrated with BankingStage
         let (bank, entry_height, _) = Self::new_bank_from_ledger(&self.ledger_path);
@@ -384,9 +393,23 @@ impl Fullnode {
             }
         }
 
-        // Tell the RPU to serve requests out of the new bank we've created
+        // Make a new RPU to serve requests out of the new bank we've created
         // instead of the old one
-        self.rpu.set_new_bank(self.bank.clone());
+        if !self.rpu.is_none() {
+            let old_rpu = self.rpu.take().unwrap();
+            old_rpu.close()?;
+            self.rpu = Some(Rpu::new(
+                &self.bank,
+                self.requests_socket
+                    .try_clone()
+                    .expect("Failed to clone requests socket"),
+                self.respond_socket
+                    .try_clone()
+                    .expect("Failed to clone respond socket"),
+                &self.blob_recycler,
+            ));
+        }
 
         let tvu = Tvu::new(
             self.keypair.clone(),
             &self.bank,
@@ -409,6 +432,7 @@ impl Fullnode {
         );
         let validator_state = ValidatorServices::new(tvu);
         self.node_role = Some(NodeRole::Validator(validator_state));
+        Ok(())
     }
 
     pub fn handle_role_transition(&mut self) -> Result<Option<FullnodeReturnType>> {
@@ -416,7 +440,7 @@ impl Fullnode {
         match node_role {
             Some(NodeRole::Leader(leader_services)) => match leader_services.join()? {
                 Some(TpuReturnType::LeaderRotation) => {
-                    self.leader_to_validator();
+                    self.leader_to_validator()?;
                     Ok(Some(FullnodeReturnType::LeaderRotation))
                 }
                 _ => Ok(None),
@@ -431,7 +455,9 @@ impl Fullnode {
     //used for notifying many nodes in parallel to exit
     pub fn exit(&self) {
         self.exit.store(true, Ordering::Relaxed);
+
+        if let Some(ref rpu) = self.rpu {
+            rpu.exit();
+        }
         match self.node_role {
             Some(NodeRole::Leader(ref leader_services)) => leader_services.exit(),
             Some(NodeRole::Validator(ref validator_services)) => validator_services.exit(),
@@ -471,7 +497,9 @@ impl Service for Fullnode {
     type JoinReturnType = Option<FullnodeReturnType>;
 
     fn join(self) -> Result<Option<FullnodeReturnType>> {
-        self.rpu.join()?;
+        if let Some(rpu) = self.rpu {
+            rpu.join()?;
+        }
         self.ncp.join()?;
         self.rpc_service.join()?;
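The transition above reduces to a take/close/rebuild pattern on an `Option`-held service, which is why the Fullnode now retains `requests_socket` and `respond_socket`: they are the handles the replacement Rpu is rebuilt from. A compact sketch of that pattern, assuming `close()` signals the worker threads and joins them; `Rpu` and `Fullnode` here are toy stand-ins:

```rust
use std::net::UdpSocket;

struct Rpu;
impl Rpu {
    fn new(_requests: UdpSocket, _respond: UdpSocket) -> Self {
        Rpu // real code would spawn request-handling threads here
    }
    fn close(self) -> std::thread::Result<()> {
        Ok(()) // real code signals its exit flag and joins its threads
    }
}

struct Fullnode {
    rpu: Option<Rpu>,
    requests_socket: UdpSocket,
    respond_socket: UdpSocket,
}

impl Fullnode {
    fn restart_rpu(&mut self) -> std::thread::Result<()> {
        // Same shape as the diff: pull the old service out of the Option,
        // shut it down completely, then rebuild it on clones of the
        // retained sockets.
        if let Some(old_rpu) = self.rpu.take() {
            old_rpu.close()?;
        }
        self.rpu = Some(Rpu::new(
            self.requests_socket.try_clone().expect("clone requests socket"),
            self.respond_socket.try_clone().expect("clone respond socket"),
        ));
        Ok(())
    }
}

fn main() {
    let mut node = Fullnode {
        rpu: None,
        requests_socket: UdpSocket::bind("127.0.0.1:0").unwrap(),
        respond_socket: UdpSocket::bind("127.0.0.1:0").unwrap(),
    };
    node.restart_rpu().unwrap();
}
```

The diff spells the guard as `if !self.rpu.is_none()` followed by `take().unwrap()`; `if let Some(old_rpu) = self.rpu.take()` expresses the same logic without the `unwrap`.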
@@ -9,7 +9,7 @@ use result::{Error, Result};
 use service::Service;
 use std::net::SocketAddr;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Instant;
 use streamer::{self, BlobReceiver, BlobSender};
@@ -17,7 +17,7 @@ use timing;
 
 pub struct RequestStage {
     thread_hdl: JoinHandle<()>,
-    pub request_processor: Arc<RwLock<RequestProcessor>>,
+    pub request_processor: Arc<RequestProcessor>,
 }
 
 impl RequestStage {
@@ -78,18 +78,19 @@ impl RequestStage {
         Ok(())
     }
     pub fn new(
-        request_processor: Arc<RwLock<RequestProcessor>>,
+        request_processor: RequestProcessor,
         packet_receiver: Receiver<SharedPackets>,
         packet_recycler: PacketRecycler,
         blob_recycler: BlobRecycler,
     ) -> (Self, BlobReceiver) {
+        let request_processor = Arc::new(request_processor);
        let request_processor_ = request_processor.clone();
         let (blob_sender, blob_receiver) = channel();
         let thread_hdl = Builder::new()
             .name("solana-request-stage".to_string())
             .spawn(move || loop {
                 if let Err(e) = Self::process_request_packets(
-                    &request_processor_.read().unwrap(),
+                    &request_processor_,
                     &packet_receiver,
                     &blob_sender,
                     &packet_recycler,
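The API change here is that `RequestStage::new` now takes the processor by value, wraps it in an `Arc` itself, and hands a clone to its worker thread, so callers never see or lock the shared state. A runnable sketch of that ownership shape, with a hypothetical `process` method standing in for the real packet handling:

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};

// Stand-in for the real RequestProcessor.
struct RequestProcessor;
impl RequestProcessor {
    fn process(&self, item: u32) -> u32 {
        item + 1
    }
}

struct RequestStage {
    thread_hdl: JoinHandle<()>,
    pub request_processor: Arc<RequestProcessor>,
}

impl RequestStage {
    // Mirrors the diff's shape: take the processor by value, wrap it in an
    // Arc here, and hand a clone to the worker thread.
    fn new(request_processor: RequestProcessor, receiver: Receiver<u32>) -> Self {
        let request_processor = Arc::new(request_processor);
        let request_processor_ = request_processor.clone();
        let thread_hdl = Builder::new()
            .name("request-stage".to_string())
            .spawn(move || {
                for item in receiver {
                    // Immutable access only: no read().unwrap() on the hot path.
                    let _ = request_processor_.process(item);
                }
            })
            .unwrap();
        RequestStage { thread_hdl, request_processor }
    }
}

fn main() {
    let (sender, receiver) = channel();
    let stage = RequestStage::new(RequestProcessor, receiver);
    sender.send(1).unwrap();
    drop(sender); // closing the channel ends the worker loop
    stage.thread_hdl.join().unwrap();
}
```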
src/rpu.rs
@@ -28,18 +28,17 @@ use packet::{BlobRecycler, PacketRecycler};
 use request_processor::RequestProcessor;
 use request_stage::RequestStage;
 use service::Service;
-use std::mem;
 use std::net::UdpSocket;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::channel;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread::{self, JoinHandle};
 use streamer;
 
 pub struct Rpu {
     request_stage: RequestStage,
     thread_hdls: Vec<JoinHandle<()>>,
-    request_processor: Arc<RwLock<RequestProcessor>>,
+    exit: Arc<AtomicBool>,
 }
 
 impl Rpu {
@@ -48,21 +47,21 @@ impl Rpu {
         requests_socket: UdpSocket,
         respond_socket: UdpSocket,
         blob_recycler: &BlobRecycler,
-        exit: Arc<AtomicBool>,
     ) -> Self {
+        let exit = Arc::new(AtomicBool::new(false));
         let mut packet_recycler = PacketRecycler::default();
         packet_recycler.set_name("rpu::Packet");
         let (packet_sender, packet_receiver) = channel();
         let t_receiver = streamer::receiver(
             Arc::new(requests_socket),
-            exit,
+            exit.clone(),
             packet_recycler.clone(),
             packet_sender,
         );
 
-        let request_processor = Arc::new(RwLock::new(RequestProcessor::new(bank.clone())));
+        let request_processor = RequestProcessor::new(bank.clone());
         let (request_stage, blob_receiver) = RequestStage::new(
-            request_processor.clone(),
+            request_processor,
             packet_receiver,
             packet_recycler.clone(),
             blob_recycler.clone(),
@@ -80,13 +79,17 @@ impl Rpu {
         Rpu {
             thread_hdls,
             request_stage,
-            request_processor,
+            exit,
         }
     }
 
-    pub fn set_new_bank(&self, new_bank: Arc<Bank>) {
-        let mut w_request_procesor = self.request_processor.write().unwrap();
-        mem::replace(&mut *w_request_procesor, RequestProcessor::new(new_bank));
+    pub fn exit(&self) {
+        self.exit.store(true, Ordering::Relaxed);
     }
 
+    pub fn close(self) -> thread::Result<()> {
+        self.exit();
+        self.join()
+    }
 }
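With `set_new_bank` gone, the Rpu becomes a disposable service: it allocates its own exit flag in `new` instead of borrowing the node's, so one Rpu can be torn down and another started without disturbing anything else. A minimal sketch of that lifecycle, with a sleep loop standing in for the real packet-receiving threads:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// The service owns its exit flag, so teardown is self-contained.
struct Rpu {
    thread_hdls: Vec<JoinHandle<()>>,
    exit: Arc<AtomicBool>,
}

impl Rpu {
    fn new() -> Self {
        let exit = Arc::new(AtomicBool::new(false));
        let exit_ = exit.clone();
        let worker = thread::spawn(move || {
            while !exit_.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(10)); // stand-in for packet work
            }
        });
        Rpu { thread_hdls: vec![worker], exit }
    }

    fn exit(&self) {
        self.exit.store(true, Ordering::Relaxed);
    }

    // close() consumes the service: raise the flag, then join every thread.
    fn close(self) -> thread::Result<()> {
        self.exit();
        for hdl in self.thread_hdls {
            hdl.join()?;
        }
        Ok(())
    }
}

fn main() {
    let rpu = Rpu::new();
    rpu.close().unwrap(); // fully shut down; a fresh Rpu::new() could follow
}
```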
@@ -810,18 +810,29 @@ fn test_leader_to_validator_transition() {
     // make sure the leader stops.
     assert!(genesis_height < leader_rotation_interval);
     for i in genesis_height..(leader_rotation_interval + extra_transactions) {
-        let expected_balance = std::cmp::min(
-            leader_rotation_interval - genesis_height,
-            i - genesis_height,
-        );
         if i < leader_rotation_interval {
-            send_tx_and_retry_get_balance(
-                &leader_info,
-                &mint,
-                &bob_pubkey,
-                1,
-                Some(expected_balance as i64),
-            );
+            // Poll to see that the bank state is updated after every transaction
+            // to ensure that each transaction is packaged as a single entry,
+            // so that we can be sure leader rotation is triggered
+            let expected_balance = std::cmp::min(
+                leader_rotation_interval - genesis_height,
+                i - genesis_height + 1,
+            );
+            let result = send_tx_and_retry_get_balance(
+                &leader_info,
+                &mint,
+                &bob_pubkey,
+                1,
+                Some(expected_balance as i64),
+            );
+            assert_eq!(result, Some(expected_balance as i64))
         } else {
             // After leader_rotation_interval entries, we don't care about the
             // number of entries generated by these transactions. These are just
             // here for testing to make sure the leader stopped at the correct point.
             send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None);
         }
     }
 
     // Wait for leader to shut down tpu and restart tvu
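The test change fixes an off-by-one and tightens the check: the expected balance is now computed for the state after the transaction sent on iteration `i` lands (`i - genesis_height + 1` rather than `i - genesis_height`), and the returned balance is asserted instead of discarded. A tiny worked example of the arithmetic, using hypothetical values (the real test derives `genesis_height` from the ledger):

```rust
fn main() {
    // Hypothetical values for illustration only.
    let genesis_height: u64 = 1;
    let leader_rotation_interval: u64 = 4;

    for i in genesis_height..leader_rotation_interval {
        // Bob gains one token per iteration, so after the transaction sent
        // on iteration i lands his balance is i - genesis_height + 1,
        // capped at the number of entries produced before rotation.
        let expected_balance = std::cmp::min(
            leader_rotation_interval - genesis_height,
            i - genesis_height + 1,
        );
        println!("i = {}, expected balance = {}", i, expected_balance);
    }
}
```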