diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 2154a4c179..8c6e2f6009 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -23,7 +23,7 @@ use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId, Vote}; use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::result::Result; -use crate::rpc::RPC_PORT; +use crate::rpc_service::RPC_PORT; use crate::streamer::{BlobReceiver, BlobSender}; use bincode::{deserialize, serialize}; use hashbrown::HashMap; diff --git a/src/contact_info.rs b/src/contact_info.rs index 3991df82e5..c83677fbad 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -1,4 +1,4 @@ -use crate::rpc::RPC_PORT; +use crate::rpc_service::RPC_PORT; use bincode::serialize; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; diff --git a/src/fullnode.rs b/src/fullnode.rs index b89e3508d6..c4a2df72b3 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -10,8 +10,8 @@ use crate::genesis_block::GenesisBlock; use crate::gossip_service::GossipService; use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig}; use crate::poh_service::PohServiceConfig; -use crate::rpc::JsonRpcService; use crate::rpc_pubsub_service::PubSubService; +use crate::rpc_service::JsonRpcService; use crate::service::Service; use crate::storage_stage::StorageState; use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender}; diff --git a/src/lib.rs b/src/lib.rs index 0a7a21c4d4..db2c19d6da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,6 +63,7 @@ pub mod rpc_mock; pub mod rpc_pubsub; pub mod rpc_pubsub_service; pub mod rpc_request; +pub mod rpc_service; pub mod rpc_subscriptions; pub mod service; pub mod sigverify; diff --git a/src/rpc.rs b/src/rpc.rs index 7a3977c7ba..b1ca0a5553 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -3,13 +3,11 @@ use crate::bank::{self, Bank, BankError}; use crate::cluster_info::ClusterInfo; use crate::packet::PACKET_DATA_SIZE; -use crate::service::Service; use crate::storage_stage::StorageState; use bincode::{deserialize, serialize}; use bs58; use jsonrpc_core::{Error, ErrorCode, MetaIoHandler, Metadata, Result}; use jsonrpc_derive::rpc; -use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; use solana_drone::drone::request_airdrop_transaction; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; @@ -18,93 +16,101 @@ use solana_sdk::transaction::Transaction; use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; -use std::thread::{self, sleep, Builder, JoinHandle}; +use std::thread::{self, sleep}; use std::time::{Duration, Instant}; -pub const RPC_PORT: u16 = 8899; - -pub struct JsonRpcService { - thread_hdl: JoinHandle<()>, - exit: Arc, - request_processor: Arc>, +#[derive(Clone)] +pub struct JsonRpcRequestProcessor { + pub bank: Arc, + storage_state: StorageState, } -impl JsonRpcService { - pub fn new( - bank: &Arc, - cluster_info: &Arc>, - rpc_addr: SocketAddr, - drone_addr: SocketAddr, - storage_state: StorageState, - ) -> Self { - info!("rpc bound to {:?}", rpc_addr); - let exit = Arc::new(AtomicBool::new(false)); - let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( - bank.clone(), +impl JsonRpcRequestProcessor { + /// Create a new request processor that wraps the given Bank. + pub fn new(bank: Arc, storage_state: StorageState) -> Self { + JsonRpcRequestProcessor { + bank, storage_state, - ))); - request_processor.write().unwrap().bank = bank.clone(); - let request_processor_ = request_processor.clone(); - - let info = cluster_info.clone(); - let exit_ = exit.clone(); - - let thread_hdl = Builder::new() - .name("solana-jsonrpc".to_string()) - .spawn(move || { - let mut io = MetaIoHandler::default(); - let rpc = RpcSolImpl; - io.extend_with(rpc.to_delegate()); - - let server = - ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { - request_processor: request_processor_.clone(), - cluster_info: info.clone(), - drone_addr, - rpc_addr, - }).threads(4) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Any, - ])) - .start_http(&rpc_addr); - if let Err(e) = server { - warn!("JSON RPC service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, rpc_addr.port()); - return; - } - while !exit_.load(Ordering::Relaxed) { - sleep(Duration::from_millis(100)); - } - server.unwrap().close(); - }) - .unwrap(); - Self { - thread_hdl, - exit, - request_processor, } } - pub fn set_bank(&mut self, bank: &Arc) { - self.request_processor.write().unwrap().bank = bank.clone(); + /// Process JSON-RPC request items sent via JSON-RPC. + pub fn get_account_info(&self, pubkey: Pubkey) -> Result { + self.bank + .get_account(&pubkey) + .ok_or_else(Error::invalid_request) } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); + pub fn get_balance(&self, pubkey: Pubkey) -> Result { + let val = self.bank.get_balance(&pubkey); + Ok(val) } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() + fn get_last_id(&self) -> Result { + let id = self.bank.last_id(); + Ok(bs58::encode(id).into_string()) + } + pub fn get_signature_status(&self, signature: Signature) -> Option> { + self.bank.get_signature_status(&signature) + } + fn get_transaction_count(&self) -> Result { + Ok(self.bank.transaction_count() as u64) + } + fn get_storage_mining_last_id(&self) -> Result { + let id = self.storage_state.get_last_id(); + Ok(bs58::encode(id).into_string()) + } + fn get_storage_mining_entry_height(&self) -> Result { + let entry_height = self.storage_state.get_entry_height(); + Ok(entry_height) + } + fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result> { + Ok(self + .storage_state + .get_pubkeys_for_entry_height(entry_height)) } } -impl Service for JsonRpcService { - type JoinReturnType = (); +fn get_leader_addr(cluster_info: &Arc>) -> Result { + if let Some(leader_data) = cluster_info.read().unwrap().leader_data() { + Ok(leader_data.tpu) + } else { + Err(Error { + code: ErrorCode::InternalError, + message: "No leader detected".into(), + data: None, + }) + } +} - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() +fn verify_pubkey(input: String) -> Result { + let pubkey_vec = bs58::decode(input).into_vec().map_err(|err| { + info!("verify_pubkey: invalid input: {:?}", err); + Error::invalid_request() + })?; + if pubkey_vec.len() != mem::size_of::() { + info!( + "verify_pubkey: invalid pubkey_vec length: {}", + pubkey_vec.len() + ); + Err(Error::invalid_request()) + } else { + Ok(Pubkey::new(&pubkey_vec)) + } +} + +fn verify_signature(input: &str) -> Result { + let signature_vec = bs58::decode(input).into_vec().map_err(|err| { + info!("verify_signature: invalid input: {}: {:?}", input, err); + Error::invalid_request() + })?; + if signature_vec.len() != mem::size_of::() { + info!( + "verify_signature: invalid signature_vec length: {}", + signature_vec.len() + ); + Err(Error::invalid_request()) + } else { + Ok(Signature::new(&signature_vec)) } } @@ -344,99 +350,6 @@ impl RpcSol for RpcSolImpl { .get_storage_pubkeys_for_entry_height(entry_height) } } -#[derive(Clone)] -pub struct JsonRpcRequestProcessor { - bank: Arc, - storage_state: StorageState, -} -impl JsonRpcRequestProcessor { - /// Create a new request processor that wraps the given Bank. - pub fn new(bank: Arc, storage_state: StorageState) -> Self { - JsonRpcRequestProcessor { - bank, - storage_state, - } - } - - /// Process JSON-RPC request items sent via JSON-RPC. - pub fn get_account_info(&self, pubkey: Pubkey) -> Result { - self.bank - .get_account(&pubkey) - .ok_or_else(Error::invalid_request) - } - fn get_balance(&self, pubkey: Pubkey) -> Result { - let val = self.bank.get_balance(&pubkey); - Ok(val) - } - fn get_last_id(&self) -> Result { - let id = self.bank.last_id(); - Ok(bs58::encode(id).into_string()) - } - pub fn get_signature_status(&self, signature: Signature) -> Option> { - self.bank.get_signature_status(&signature) - } - fn get_transaction_count(&self) -> Result { - Ok(self.bank.transaction_count() as u64) - } - fn get_storage_mining_last_id(&self) -> Result { - let id = self.storage_state.get_last_id(); - Ok(bs58::encode(id).into_string()) - } - fn get_storage_mining_entry_height(&self) -> Result { - let entry_height = self.storage_state.get_entry_height(); - Ok(entry_height) - } - fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result> { - Ok(self - .storage_state - .get_pubkeys_for_entry_height(entry_height)) - } -} - -fn get_leader_addr(cluster_info: &Arc>) -> Result { - if let Some(leader_data) = cluster_info.read().unwrap().leader_data() { - Ok(leader_data.tpu) - } else { - Err(Error { - code: ErrorCode::InternalError, - message: "No leader detected".into(), - data: None, - }) - } -} - -fn verify_pubkey(input: String) -> Result { - let pubkey_vec = bs58::decode(input).into_vec().map_err(|err| { - info!("verify_pubkey: invalid input: {:?}", err); - Error::invalid_request() - })?; - if pubkey_vec.len() != mem::size_of::() { - info!( - "verify_pubkey: invalid pubkey_vec length: {}", - pubkey_vec.len() - ); - Err(Error::invalid_request()) - } else { - Ok(Pubkey::new(&pubkey_vec)) - } -} - -fn verify_signature(input: &str) -> Result { - let signature_vec = bs58::decode(input).into_vec().map_err(|err| { - info!("verify_signature: invalid input: {}: {:?}", input, err); - Error::invalid_request() - })?; - if signature_vec.len() != mem::size_of::() { - info!( - "verify_signature: invalid signature_vec length: {}", - signature_vec.len() - ); - Err(Error::invalid_request()) - } else { - Ok(Signature::new(&signature_vec)) - } -} - #[cfg(test)] mod tests { use super::*; @@ -481,42 +394,6 @@ mod tests { (io, meta, last_id, alice) } - #[test] - fn test_rpc_new() { - let (genesis_block, alice) = GenesisBlock::new(10_000); - let bank = Bank::new(&genesis_block); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); - let rpc_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), - ); - let drone_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), - ); - let rpc_service = JsonRpcService::new( - &Arc::new(bank), - &cluster_info, - rpc_addr, - drone_addr, - StorageState::default(), - ); - let thread = rpc_service.thread_hdl.thread(); - assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); - - assert_eq!( - 10_000, - rpc_service - .request_processor - .read() - .unwrap() - .get_balance(alice.pubkey()) - .unwrap() - ); - - rpc_service.close().unwrap(); - } - #[test] fn test_rpc_request_processor_new() { let (genesis_block, alice) = GenesisBlock::new(10_000); diff --git a/src/rpc_service.rs b/src/rpc_service.rs new file mode 100644 index 0000000000..5f9a796aad --- /dev/null +++ b/src/rpc_service.rs @@ -0,0 +1,146 @@ +//! The `rpc_service` module implements the Solana JSON RPC service. + +use crate::bank::Bank; +use crate::cluster_info::ClusterInfo; +use crate::rpc::*; +use crate::service::Service; +use crate::storage_stage::StorageState; +use bs58; +use jsonrpc_core::MetaIoHandler; +use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder}; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::{self, sleep, Builder, JoinHandle}; +use std::time::Duration; + +pub const RPC_PORT: u16 = 8899; + +pub struct JsonRpcService { + thread_hdl: JoinHandle<()>, + exit: Arc, + request_processor: Arc>, +} + +impl JsonRpcService { + pub fn new( + bank: &Arc, + cluster_info: &Arc>, + rpc_addr: SocketAddr, + drone_addr: SocketAddr, + storage_state: StorageState, + ) -> Self { + info!("rpc bound to {:?}", rpc_addr); + let exit = Arc::new(AtomicBool::new(false)); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( + bank.clone(), + storage_state, + ))); + request_processor.write().unwrap().bank = bank.clone(); + let request_processor_ = request_processor.clone(); + + let info = cluster_info.clone(); + let exit_ = exit.clone(); + + let thread_hdl = Builder::new() + .name("solana-jsonrpc".to_string()) + .spawn(move || { + let mut io = MetaIoHandler::default(); + let rpc = RpcSolImpl; + io.extend_with(rpc.to_delegate()); + + let server = + ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { + request_processor: request_processor_.clone(), + cluster_info: info.clone(), + drone_addr, + rpc_addr, + }).threads(4) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Any, + ])) + .start_http(&rpc_addr); + if let Err(e) = server { + warn!("JSON RPC service unavailable error: {:?}. \nAlso, check that port {} is not already in use by another application", e, rpc_addr.port()); + return; + } + while !exit_.load(Ordering::Relaxed) { + sleep(Duration::from_millis(100)); + } + server.unwrap().close(); + }) + .unwrap(); + Self { + thread_hdl, + exit, + request_processor, + } + } + + pub fn set_bank(&mut self, bank: &Arc) { + self.request_processor.write().unwrap().bank = bank.clone(); + } + + pub fn exit(&self) { + self.exit.store(true, Ordering::Relaxed); + } + + pub fn close(self) -> thread::Result<()> { + self.exit(); + self.join() + } +} + +impl Service for JsonRpcService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bank::Bank; + use crate::cluster_info::NodeInfo; + use crate::genesis_block::GenesisBlock; + use solana_sdk::signature::KeypairUtil; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + #[test] + fn test_rpc_new() { + let (genesis_block, alice) = GenesisBlock::new(10_000); + let bank = Bank::new(&genesis_block); + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); + let rpc_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), + ); + let drone_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), + ); + let rpc_service = JsonRpcService::new( + &Arc::new(bank), + &cluster_info, + rpc_addr, + drone_addr, + StorageState::default(), + ); + let thread = rpc_service.thread_hdl.thread(); + assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); + + assert_eq!( + 10_000, + rpc_service + .request_processor + .read() + .unwrap() + .get_balance(alice.pubkey()) + .unwrap() + ); + + rpc_service.close().unwrap(); + } +}