Remove storage_state from the bank
Construct in TVU and pass to RPC and StorageStage instead.
This commit is contained in:
parent
c9bf9ce094
commit
53afa64634
10
src/bank.rs
10
src/bank.rs
@ -13,7 +13,6 @@ use crate::leader_scheduler::LeaderScheduler;
|
||||
use crate::poh_recorder::PohRecorder;
|
||||
use crate::runtime::{self, RuntimeError};
|
||||
use crate::status_deque::{Status, StatusDeque, MAX_ENTRY_IDS};
|
||||
use crate::storage_stage::StorageState;
|
||||
use bincode::deserialize;
|
||||
use itertools::Itertools;
|
||||
use log::Level;
|
||||
@ -112,8 +111,6 @@ pub struct Bank {
|
||||
/// processed by the bank
|
||||
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
||||
|
||||
pub storage_state: StorageState,
|
||||
|
||||
subscriptions: RwLock<Box<Arc<BankSubscriptions + Send + Sync>>>,
|
||||
}
|
||||
|
||||
@ -124,7 +121,6 @@ impl Default for Bank {
|
||||
last_ids: RwLock::new(StatusDeque::default()),
|
||||
confirmation_time: AtomicUsize::new(std::usize::MAX),
|
||||
leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
|
||||
storage_state: StorageState::new(),
|
||||
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))),
|
||||
}
|
||||
}
|
||||
@ -298,11 +294,6 @@ impl Bank {
|
||||
.expect("no last_id has been set")
|
||||
}
|
||||
|
||||
pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec<Pubkey> {
|
||||
self.storage_state
|
||||
.get_pubkeys_for_entry_height(entry_height)
|
||||
}
|
||||
|
||||
pub fn get_storage_entry_height(&self) -> u64 {
|
||||
match self.get_account(&storage_program::system_id()) {
|
||||
Some(storage_system_account) => {
|
||||
@ -1884,7 +1875,6 @@ mod tests {
|
||||
|
||||
assert_eq!(bank.get_storage_entry_height(), ENTRIES_PER_SEGMENT);
|
||||
assert_eq!(bank.get_storage_last_id(), storage_last_id);
|
||||
assert_eq!(bank.get_pubkeys_for_entry_height(0), vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -10,6 +10,7 @@ use crate::leader_scheduler::LeaderScheduler;
|
||||
use crate::rpc::JsonRpcService;
|
||||
use crate::rpc_pubsub::PubSubService;
|
||||
use crate::service::Service;
|
||||
use crate::storage_stage::StorageState;
|
||||
use crate::tpu::{Tpu, TpuReturnType};
|
||||
use crate::tvu::{Sockets, Tvu, TvuReturnType};
|
||||
use crate::vote_signer_proxy::VoteSignerProxy;
|
||||
@ -232,11 +233,14 @@ impl Fullnode {
|
||||
entrypoint_drone_addr
|
||||
};
|
||||
|
||||
let storage_state = StorageState::new();
|
||||
|
||||
let rpc_service = JsonRpcService::new(
|
||||
&bank,
|
||||
&cluster_info,
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_addr.port()),
|
||||
drone_addr,
|
||||
storage_state.clone(),
|
||||
);
|
||||
|
||||
let rpc_pubsub_service = PubSubService::new(
|
||||
@ -304,6 +308,7 @@ impl Fullnode {
|
||||
db_ledger.clone(),
|
||||
storage_rotate_count,
|
||||
to_leader_sender,
|
||||
&storage_state,
|
||||
);
|
||||
let max_tick_height = {
|
||||
let ls_lock = bank.leader_scheduler.read().unwrap();
|
||||
|
44
src/rpc.rs
44
src/rpc.rs
@ -7,6 +7,7 @@ use crate::jsonrpc_http_server::*;
|
||||
use crate::packet::PACKET_DATA_SIZE;
|
||||
use crate::service::Service;
|
||||
use crate::status_deque::Status;
|
||||
use crate::storage_stage::StorageState;
|
||||
use bincode::{deserialize, serialize};
|
||||
use bs58;
|
||||
use solana_drone::drone::request_airdrop_transaction;
|
||||
@ -36,10 +37,14 @@ impl JsonRpcService {
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
rpc_addr: SocketAddr,
|
||||
drone_addr: SocketAddr,
|
||||
storage_state: StorageState,
|
||||
) -> Self {
|
||||
info!("rpc bound to {:?}", rpc_addr);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(bank.clone())));
|
||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
||||
bank.clone(),
|
||||
storage_state,
|
||||
)));
|
||||
request_processor.write().unwrap().bank = bank.clone();
|
||||
let request_processor_ = request_processor.clone();
|
||||
|
||||
@ -355,11 +360,15 @@ impl RpcSol for RpcSolImpl {
|
||||
#[derive(Clone)]
|
||||
pub struct JsonRpcRequestProcessor {
|
||||
bank: Arc<Bank>,
|
||||
storage_state: StorageState,
|
||||
}
|
||||
impl JsonRpcRequestProcessor {
|
||||
/// Create a new request processor that wraps the given Bank.
|
||||
pub fn new(bank: Arc<Bank>) -> Self {
|
||||
JsonRpcRequestProcessor { bank }
|
||||
pub fn new(bank: Arc<Bank>, storage_state: StorageState) -> Self {
|
||||
JsonRpcRequestProcessor {
|
||||
bank,
|
||||
storage_state,
|
||||
}
|
||||
}
|
||||
|
||||
/// Process JSON-RPC request items sent via JSON-RPC.
|
||||
@ -386,15 +395,17 @@ impl JsonRpcRequestProcessor {
|
||||
Ok(self.bank.transaction_count() as u64)
|
||||
}
|
||||
fn get_storage_mining_last_id(&self) -> Result<String> {
|
||||
let id = self.bank.storage_state.get_last_id();
|
||||
let id = self.storage_state.get_last_id();
|
||||
Ok(bs58::encode(id).into_string())
|
||||
}
|
||||
fn get_storage_mining_entry_height(&self) -> Result<u64> {
|
||||
let entry_height = self.bank.storage_state.get_entry_height();
|
||||
let entry_height = self.storage_state.get_entry_height();
|
||||
Ok(entry_height)
|
||||
}
|
||||
fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result<Vec<Pubkey>> {
|
||||
Ok(self.bank.get_pubkeys_for_entry_height(entry_height))
|
||||
Ok(self
|
||||
.storage_state
|
||||
.get_pubkeys_for_entry_height(entry_height))
|
||||
}
|
||||
}
|
||||
|
||||
@ -463,7 +474,10 @@ mod tests {
|
||||
let tx = Transaction::system_move(&alice, pubkey, 20, last_id, 0);
|
||||
bank.process_transaction(&tx).expect("process transaction");
|
||||
|
||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank))));
|
||||
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
||||
Arc::new(bank),
|
||||
StorageState::default(),
|
||||
)));
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
||||
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
|
||||
|
||||
@ -497,7 +511,13 @@ mod tests {
|
||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
|
||||
);
|
||||
let rpc_service = JsonRpcService::new(&Arc::new(bank), &cluster_info, rpc_addr, drone_addr);
|
||||
let rpc_service = JsonRpcService::new(
|
||||
&Arc::new(bank),
|
||||
&cluster_info,
|
||||
rpc_addr,
|
||||
drone_addr,
|
||||
StorageState::default(),
|
||||
);
|
||||
let thread = rpc_service.thread_hdl.thread();
|
||||
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
|
||||
|
||||
@ -520,7 +540,8 @@ mod tests {
|
||||
let bob_pubkey = Keypair::new().pubkey();
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let arc_bank = Arc::new(bank);
|
||||
let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone());
|
||||
let request_processor =
|
||||
JsonRpcRequestProcessor::new(arc_bank.clone(), StorageState::default());
|
||||
thread::spawn(move || {
|
||||
let last_id = arc_bank.last_id();
|
||||
let tx = Transaction::system_move(&alice, bob_pubkey, 20, last_id, 0);
|
||||
@ -705,7 +726,10 @@ mod tests {
|
||||
let rpc = RpcSolImpl;
|
||||
io.extend_with(rpc.to_delegate());
|
||||
let meta = Meta {
|
||||
request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank)))),
|
||||
request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
|
||||
Arc::new(bank),
|
||||
StorageState::default(),
|
||||
))),
|
||||
cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))),
|
||||
drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
||||
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
||||
|
@ -48,7 +48,7 @@ pub struct StorageStateInner {
|
||||
entry_height: u64,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct StorageState {
|
||||
state: Arc<RwLock<StorageStateInner>>,
|
||||
}
|
||||
|
@ -20,7 +20,7 @@ use crate::fullnode::TvuRotationSender;
|
||||
use crate::replay_stage::ReplayStage;
|
||||
use crate::retransmit_stage::RetransmitStage;
|
||||
use crate::service::Service;
|
||||
use crate::storage_stage::StorageStage;
|
||||
use crate::storage_stage::{StorageStage, StorageState};
|
||||
use crate::vote_signer_proxy::VoteSignerProxy;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::Keypair;
|
||||
@ -60,6 +60,7 @@ impl Tvu {
|
||||
/// * `cluster_info` - The cluster_info state.
|
||||
/// * `sockets` - My fetch, repair, and restransmit sockets
|
||||
/// * `db_ledger` - the ledger itself
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
vote_signer: Option<Arc<VoteSignerProxy>>,
|
||||
bank: &Arc<Bank>,
|
||||
@ -70,6 +71,7 @@ impl Tvu {
|
||||
db_ledger: Arc<DbLedger>,
|
||||
storage_rotate_count: u64,
|
||||
to_leader_sender: TvuRotationSender,
|
||||
storage_state: &StorageState,
|
||||
) -> Self {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let keypair: Arc<Keypair> = cluster_info
|
||||
@ -122,7 +124,7 @@ impl Tvu {
|
||||
);
|
||||
|
||||
let storage_stage = StorageStage::new(
|
||||
&bank.storage_state,
|
||||
storage_state,
|
||||
ledger_entry_receiver,
|
||||
Some(db_ledger),
|
||||
&keypair,
|
||||
@ -190,7 +192,7 @@ pub mod tests {
|
||||
use crate::genesis_block::GenesisBlock;
|
||||
use crate::packet::SharedBlob;
|
||||
use crate::service::Service;
|
||||
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
|
||||
use crate::storage_stage::{StorageState, STORAGE_ROTATE_TEST_COUNT};
|
||||
use crate::streamer;
|
||||
use crate::tvu::{Sockets, Tvu};
|
||||
use crate::vote_signer_proxy::VoteSignerProxy;
|
||||
@ -297,6 +299,7 @@ pub mod tests {
|
||||
Arc::new(db_ledger),
|
||||
STORAGE_ROTATE_TEST_COUNT,
|
||||
sender,
|
||||
&StorageState::default(),
|
||||
);
|
||||
|
||||
let mut alice_ref_balance = starting_balance;
|
||||
|
Loading…
x
Reference in New Issue
Block a user