diff --git a/src/fullnode.rs b/src/fullnode.rs index 439057b22e..4f12f1f627 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -102,9 +102,7 @@ pub struct Fullnode { retransmit_socket: UdpSocket, tpu_sockets: Vec, broadcast_socket: UdpSocket, - rpc_addr: SocketAddr, rpc_pubsub_addr: SocketAddr, - drone_addr: SocketAddr, db_ledger: Arc, vote_signer: Arc, } @@ -215,8 +213,22 @@ impl Fullnode { }; drone_addr.set_port(solana_drone::drone::DRONE_PORT); - let (rpc_service, rpc_pubsub_service) = - Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, drone_addr, &bank, &cluster_info); + // TODO: The RPC service assumes that there is a drone running on the leader + // See https://github.com/solana-labs/solana/issues/1830 + let rpc_service = JsonRpcService::new( + &bank, + &cluster_info, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_addr.port()), + drone_addr, + ); + + let rpc_pubsub_service = PubSubService::new( + &bank, + SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rpc_pubsub_addr.port(), + ), + ); let gossip_service = GossipService::new( &cluster_info, @@ -338,21 +350,16 @@ impl Fullnode { retransmit_socket: node.sockets.retransmit, tpu_sockets: node.sockets.tpu, broadcast_socket: node.sockets.broadcast, - rpc_addr, rpc_pubsub_addr, - drone_addr, db_ledger, vote_signer, } } fn leader_to_validator(&mut self) -> Result<()> { - // Close down any services that could have a reference to the bank - if self.rpc_service.is_some() { - let old_rpc_service = self.rpc_service.take().unwrap(); - old_rpc_service.close()?; - } + trace!("leader_to_validator"); + // Close down any services that could have a reference to the bank if self.rpc_pubsub_service.is_some() { let old_rpc_pubsub_service = self.rpc_pubsub_service.take().unwrap(); old_rpc_pubsub_service.close()?; @@ -386,17 +393,21 @@ impl Fullnode { .unwrap() .set_leader(scheduled_leader); - // Spin up new versions of all the services that relied on the bank, passing in the - // new bank - let (rpc_service, rpc_pubsub_service) = Self::startup_rpc_services( - self.rpc_addr, - self.rpc_pubsub_addr, - self.drone_addr, + // + if let Some(ref mut rpc_service) = self.rpc_service { + rpc_service.set_bank(&new_bank); + } + + // TODO: Don't restart PubSubService on leader rotation + // See https://github.com/solana-labs/solana/issues/2419 + self.rpc_pubsub_service = Some(PubSubService::new( &new_bank, - &self.cluster_info, - ); - self.rpc_service = Some(rpc_service); - self.rpc_pubsub_service = Some(rpc_pubsub_service); + SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + self.rpc_pubsub_addr.port(), + ), + )); + self.bank = new_bank; // In the rare case that the leader exited on a multiple of seed_rotation_interval @@ -448,6 +459,7 @@ impl Fullnode { } fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_id: Hash) { + trace!("validator_to_leader"); self.cluster_info .write() .unwrap() @@ -571,31 +583,6 @@ impl Fullnode { &self.bank.leader_scheduler } - fn startup_rpc_services( - rpc_addr: SocketAddr, - rpc_pubsub_addr: SocketAddr, - drone_addr: SocketAddr, - bank: &Arc, - cluster_info: &Arc>, - ) -> (JsonRpcService, PubSubService) { - let rpc_port = rpc_addr.port(); - let rpc_pubsub_port = rpc_pubsub_addr.port(); - // TODO: The RPC service assumes that there is a drone running on the leader - // Drone location/id will need to be handled a different way as soon as leader rotation begins - ( - JsonRpcService::new( - bank, - cluster_info, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), - drone_addr, - ), - PubSubService::new( - bank, - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), - ), - ) - } - fn make_db_ledger(ledger_path: &str) -> Arc { Arc::new( DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"), diff --git a/src/rpc.rs b/src/rpc.rs index e6979a3454..08773cf83c 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -28,6 +28,7 @@ pub const RPC_PORT: u16 = 8899; pub struct JsonRpcService { thread_hdl: JoinHandle<()>, exit: Arc, + request_processor: Arc>, } impl JsonRpcService { @@ -37,10 +38,15 @@ impl JsonRpcService { rpc_addr: SocketAddr, drone_addr: SocketAddr, ) -> Self { + info!("rpc bound to {:?}", rpc_addr); let exit = Arc::new(AtomicBool::new(false)); - let request_processor = JsonRpcRequestProcessor::new(bank.clone()); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(bank.clone()))); + request_processor.write().unwrap().bank = bank.clone(); + let request_processor_ = request_processor.clone(); + let info = cluster_info.clone(); let exit_ = exit.clone(); + let thread_hdl = Builder::new() .name("solana-jsonrpc".to_string()) .spawn(move || { @@ -50,7 +56,7 @@ impl JsonRpcService { let server = ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { - request_processor: request_processor.clone(), + request_processor: request_processor_.clone(), cluster_info: info.clone(), drone_addr, rpc_addr, @@ -69,7 +75,15 @@ impl JsonRpcService { server.unwrap().close(); }) .unwrap(); - Self { thread_hdl, exit } + Self { + thread_hdl, + exit, + request_processor, + } + } + + pub fn set_bank(&mut self, bank: &Arc) { + self.request_processor.write().unwrap().bank = bank.clone(); } pub fn exit(&self) { @@ -92,7 +106,7 @@ impl Service for JsonRpcService { #[derive(Clone)] pub struct Meta { - pub request_processor: JsonRpcRequestProcessor, + pub request_processor: Arc>, pub cluster_info: Arc>, pub rpc_addr: SocketAddr, pub drone_addr: SocketAddr, @@ -177,25 +191,35 @@ impl RpcSol for RpcSolImpl { fn get_account_info(&self, meta: Self::Metadata, id: String) -> Result { info!("get_account_info rpc request received: {:?}", id); let pubkey = verify_pubkey(id)?; - meta.request_processor.get_account_info(pubkey) + meta.request_processor + .read() + .unwrap() + .get_account_info(pubkey) } fn get_balance(&self, meta: Self::Metadata, id: String) -> Result { info!("get_balance rpc request received: {:?}", id); let pubkey = verify_pubkey(id)?; - meta.request_processor.get_balance(pubkey) + meta.request_processor.read().unwrap().get_balance(pubkey) } fn get_confirmation_time(&self, meta: Self::Metadata) -> Result { info!("get_confirmation_time rpc request received"); - meta.request_processor.get_confirmation_time() + meta.request_processor + .read() + .unwrap() + .get_confirmation_time() } fn get_last_id(&self, meta: Self::Metadata) -> Result { info!("get_last_id rpc request received"); - meta.request_processor.get_last_id() + meta.request_processor.read().unwrap().get_last_id() } fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result { info!("get_signature_status rpc request received: {:?}", id); let signature = verify_signature(&id)?; - let res = meta.request_processor.get_signature_status(signature); + let res = meta + .request_processor + .read() + .unwrap() + .get_signature_status(signature); if res.is_none() { return Ok(RpcSignatureStatus::SignatureNotFound); } @@ -216,17 +240,21 @@ impl RpcSol for RpcSolImpl { } }, }; + info!("get_signature_status rpc request status: {:?}", status); Ok(status) } fn get_transaction_count(&self, meta: Self::Metadata) -> Result { info!("get_transaction_count rpc request received"); - meta.request_processor.get_transaction_count() + meta.request_processor + .read() + .unwrap() + .get_transaction_count() } fn request_airdrop(&self, meta: Self::Metadata, id: String, tokens: u64) -> Result { trace!("request_airdrop id={} tokens={}", id, tokens); let pubkey = verify_pubkey(id)?; - let last_id = meta.request_processor.bank.last_id(); + let last_id = meta.request_processor.read().unwrap().bank.last_id(); let transaction = request_airdrop_transaction(&meta.drone_addr, &pubkey, tokens, last_id) .map_err(|err| { info!("request_airdrop_transaction failed: {:?}", err); @@ -251,7 +279,11 @@ impl RpcSol for RpcSolImpl { let now = Instant::now(); let mut signature_status; loop { - signature_status = meta.request_processor.get_signature_status(signature); + signature_status = meta + .request_processor + .read() + .unwrap() + .get_signature_status(signature); if signature_status == Some(Status::Complete(Ok(()))) { info!("airdrop signature ok"); @@ -278,6 +310,7 @@ impl RpcSol for RpcSolImpl { } let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_addr = get_leader_addr(&meta.cluster_info)?; + trace!("send_transaction: leader is {:?}", &transactions_addr); transactions_socket .send_to(&data, transactions_addr) .map_err(|err| { @@ -293,10 +326,16 @@ impl RpcSol for RpcSolImpl { Ok(signature) } fn get_storage_mining_last_id(&self, meta: Self::Metadata) -> Result { - meta.request_processor.get_storage_mining_last_id() + meta.request_processor + .read() + .unwrap() + .get_storage_mining_last_id() } fn get_storage_mining_entry_height(&self, meta: Self::Metadata) -> Result { - meta.request_processor.get_storage_mining_entry_height() + meta.request_processor + .read() + .unwrap() + .get_storage_mining_entry_height() } fn get_storage_pubkeys_for_entry_height( &self, @@ -304,6 +343,8 @@ impl RpcSol for RpcSolImpl { entry_height: u64, ) -> Result> { meta.request_processor + .read() + .unwrap() .get_storage_pubkeys_for_entry_height(entry_height) } } @@ -428,7 +469,7 @@ mod tests { let tx = Transaction::system_move(&alice.keypair(), pubkey, 20, last_id, 0); bank.process_transaction(&tx).expect("process transaction"); - let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank)); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank)))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); @@ -752,7 +793,7 @@ mod tests { let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); let meta = Meta { - request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)), + request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank)))), cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))), drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index 177b6cc17f..43aea765a6 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -41,6 +41,7 @@ impl Service for PubSubService { impl PubSubService { pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { + info!("rpc_pubsub bound to {:?}", pubsub_addr); let rpc = RpcSolPubSubImpl::new(bank.clone()); let exit = Arc::new(AtomicBool::new(false)); let exit_ = exit.clone();