diff --git a/src/bank.rs b/src/bank.rs index 3fc81c15a5..3d8f5094b8 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -13,7 +13,6 @@ use crate::leader_scheduler::LeaderScheduler; use crate::poh_recorder::PohRecorder; use crate::runtime::{self, RuntimeError}; use crate::status_deque::{Status, StatusDeque, MAX_ENTRY_IDS}; -use crate::storage_stage::StorageState; use bincode::deserialize; use itertools::Itertools; use log::Level; @@ -112,8 +111,6 @@ pub struct Bank { /// processed by the bank pub leader_scheduler: Arc>, - pub storage_state: StorageState, - subscriptions: RwLock>>, } @@ -124,7 +121,6 @@ impl Default for Bank { last_ids: RwLock::new(StatusDeque::default()), confirmation_time: AtomicUsize::new(std::usize::MAX), leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())), - storage_state: StorageState::new(), subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))), } } @@ -298,11 +294,6 @@ impl Bank { .expect("no last_id has been set") } - pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec { - self.storage_state - .get_pubkeys_for_entry_height(entry_height) - } - pub fn get_storage_entry_height(&self) -> u64 { match self.get_account(&storage_program::system_id()) { Some(storage_system_account) => { @@ -1884,7 +1875,6 @@ mod tests { assert_eq!(bank.get_storage_entry_height(), ENTRIES_PER_SEGMENT); assert_eq!(bank.get_storage_last_id(), storage_last_id); - assert_eq!(bank.get_pubkeys_for_entry_height(0), vec![]); } #[test] diff --git a/src/fullnode.rs b/src/fullnode.rs index 8ed2b421c8..f7902d4c97 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -10,6 +10,7 @@ use crate::leader_scheduler::LeaderScheduler; use crate::rpc::JsonRpcService; use crate::rpc_pubsub::PubSubService; use crate::service::Service; +use crate::storage_stage::StorageState; use crate::tpu::{Tpu, TpuReturnType}; use crate::tvu::{Sockets, Tvu, TvuReturnType}; use crate::vote_signer_proxy::VoteSignerProxy; @@ -232,11 +233,14 @@ impl Fullnode { entrypoint_drone_addr }; + let storage_state = StorageState::new(); + let rpc_service = JsonRpcService::new( &bank, &cluster_info, SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_addr.port()), drone_addr, + storage_state.clone(), ); let rpc_pubsub_service = PubSubService::new( @@ -304,6 +308,7 @@ impl Fullnode { db_ledger.clone(), storage_rotate_count, to_leader_sender, + &storage_state, ); let max_tick_height = { let ls_lock = bank.leader_scheduler.read().unwrap(); diff --git a/src/rpc.rs b/src/rpc.rs index 3d16be534b..e575aa7b0c 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,6 +7,7 @@ use crate::jsonrpc_http_server::*; use crate::packet::PACKET_DATA_SIZE; use crate::service::Service; use crate::status_deque::Status; +use crate::storage_stage::StorageState; use bincode::{deserialize, serialize}; use bs58; use solana_drone::drone::request_airdrop_transaction; @@ -36,10 +37,14 @@ impl JsonRpcService { cluster_info: &Arc>, rpc_addr: SocketAddr, drone_addr: SocketAddr, + storage_state: StorageState, ) -> Self { info!("rpc bound to {:?}", rpc_addr); let exit = Arc::new(AtomicBool::new(false)); - let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(bank.clone()))); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( + bank.clone(), + storage_state, + ))); request_processor.write().unwrap().bank = bank.clone(); let request_processor_ = request_processor.clone(); @@ -355,11 +360,15 @@ impl RpcSol for RpcSolImpl { #[derive(Clone)] pub struct JsonRpcRequestProcessor { bank: Arc, + storage_state: StorageState, } impl JsonRpcRequestProcessor { /// Create a new request processor that wraps the given Bank. - pub fn new(bank: Arc) -> Self { - JsonRpcRequestProcessor { bank } + pub fn new(bank: Arc, storage_state: StorageState) -> Self { + JsonRpcRequestProcessor { + bank, + storage_state, + } } /// Process JSON-RPC request items sent via JSON-RPC. @@ -386,15 +395,17 @@ impl JsonRpcRequestProcessor { Ok(self.bank.transaction_count() as u64) } fn get_storage_mining_last_id(&self) -> Result { - let id = self.bank.storage_state.get_last_id(); + let id = self.storage_state.get_last_id(); Ok(bs58::encode(id).into_string()) } fn get_storage_mining_entry_height(&self) -> Result { - let entry_height = self.bank.storage_state.get_entry_height(); + let entry_height = self.storage_state.get_entry_height(); Ok(entry_height) } fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result> { - Ok(self.bank.get_pubkeys_for_entry_height(entry_height)) + Ok(self + .storage_state + .get_pubkeys_for_entry_height(entry_height)) } } @@ -463,7 +474,10 @@ mod tests { let tx = Transaction::system_move(&alice, pubkey, 20, last_id, 0); bank.process_transaction(&tx).expect("process transaction"); - let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank)))); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( + Arc::new(bank), + StorageState::default(), + ))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); @@ -497,7 +511,13 @@ mod tests { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(), ); - let rpc_service = JsonRpcService::new(&Arc::new(bank), &cluster_info, rpc_addr, drone_addr); + let rpc_service = JsonRpcService::new( + &Arc::new(bank), + &cluster_info, + rpc_addr, + drone_addr, + StorageState::default(), + ); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); @@ -520,7 +540,8 @@ mod tests { let bob_pubkey = Keypair::new().pubkey(); let bank = Bank::new(&genesis_block); let arc_bank = Arc::new(bank); - let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone()); + let request_processor = + JsonRpcRequestProcessor::new(arc_bank.clone(), StorageState::default()); thread::spawn(move || { let last_id = arc_bank.last_id(); let tx = Transaction::system_move(&alice, bob_pubkey, 20, last_id, 0); @@ -705,7 +726,10 @@ mod tests { let rpc = RpcSolImpl; io.extend_with(rpc.to_delegate()); let meta = Meta { - request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank)))), + request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new( + Arc::new(bank), + StorageState::default(), + ))), cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))), drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), diff --git a/src/storage_stage.rs b/src/storage_stage.rs index a98ffb3cf5..f38acc3522 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -48,7 +48,7 @@ pub struct StorageStateInner { entry_height: u64, } -#[derive(Default)] +#[derive(Clone, Default)] pub struct StorageState { state: Arc>, } diff --git a/src/tvu.rs b/src/tvu.rs index b3f10ff70e..41f5f474e5 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -20,7 +20,7 @@ use crate::fullnode::TvuRotationSender; use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; use crate::service::Service; -use crate::storage_stage::StorageStage; +use crate::storage_stage::{StorageStage, StorageState}; use crate::vote_signer_proxy::VoteSignerProxy; use solana_sdk::hash::Hash; use solana_sdk::signature::Keypair; @@ -60,6 +60,7 @@ impl Tvu { /// * `cluster_info` - The cluster_info state. /// * `sockets` - My fetch, repair, and restransmit sockets /// * `db_ledger` - the ledger itself + #[allow(clippy::too_many_arguments)] pub fn new( vote_signer: Option>, bank: &Arc, @@ -70,6 +71,7 @@ impl Tvu { db_ledger: Arc, storage_rotate_count: u64, to_leader_sender: TvuRotationSender, + storage_state: &StorageState, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); let keypair: Arc = cluster_info @@ -122,7 +124,7 @@ impl Tvu { ); let storage_stage = StorageStage::new( - &bank.storage_state, + storage_state, ledger_entry_receiver, Some(db_ledger), &keypair, @@ -190,7 +192,7 @@ pub mod tests { use crate::genesis_block::GenesisBlock; use crate::packet::SharedBlob; use crate::service::Service; - use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; + use crate::storage_stage::{StorageState, STORAGE_ROTATE_TEST_COUNT}; use crate::streamer; use crate::tvu::{Sockets, Tvu}; use crate::vote_signer_proxy::VoteSignerProxy; @@ -297,6 +299,7 @@ pub mod tests { Arc::new(db_ledger), STORAGE_ROTATE_TEST_COUNT, sender, + &StorageState::default(), ); let mut alice_ref_balance = starting_balance;