Remove Box for RPC pubsub subscriptions

This commit is contained in:
Pankaj Garg
2019-02-11 15:25:26 -08:00
committed by Grimes
parent d41dec9395
commit 144d321193
2 changed files with 16 additions and 29 deletions

View File

@ -12,6 +12,7 @@ use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig}; use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig};
use crate::poh_recorder::{PohRecorder, PohRecorderError}; use crate::poh_recorder::{PohRecorder, PohRecorderError};
use crate::result::Error; use crate::result::Error;
use crate::rpc_pubsub::RpcSubscriptions;
use crate::status_cache::StatusCache; use crate::status_cache::StatusCache;
use bincode::deserialize; use bincode::deserialize;
use itertools::Itertools; use itertools::Itertools;
@ -93,18 +94,6 @@ pub trait BankSubscriptions {
fn check_signature(&self, signature: &Signature, status: &Result<()>); fn check_signature(&self, signature: &Signature, status: &Result<()>);
} }
struct LocalSubscriptions {}
impl Default for LocalSubscriptions {
fn default() -> Self {
LocalSubscriptions {}
}
}
impl BankSubscriptions for LocalSubscriptions {
fn check_account(&self, _pubkey: &Pubkey, _account: &Account) {}
fn check_signature(&self, _signature: &Signature, _status: &Result<()>) {}
}
type BankStatusCache = StatusCache<BankError>; type BankStatusCache = StatusCache<BankError>;
/// Manager for the state of all accounts and programs after processing its entries. /// Manager for the state of all accounts and programs after processing its entries.
@ -124,7 +113,7 @@ pub struct Bank {
/// processed by the bank /// processed by the bank
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>, pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
subscriptions: RwLock<Box<Arc<BankSubscriptions + Send + Sync>>>, subscriptions: RwLock<Option<Arc<RpcSubscriptions>>>,
} }
impl Default for Bank { impl Default for Bank {
@ -135,7 +124,7 @@ impl Default for Bank {
status_cache: RwLock::new(BankStatusCache::default()), status_cache: RwLock::new(BankStatusCache::default()),
confirmation_time: AtomicUsize::new(std::usize::MAX), confirmation_time: AtomicUsize::new(std::usize::MAX),
leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())), leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))), subscriptions: RwLock::new(None),
} }
} }
} }
@ -157,9 +146,9 @@ impl Bank {
Self::new_with_leader_scheduler_config(genesis_block, &LeaderSchedulerConfig::default()) Self::new_with_leader_scheduler_config(genesis_block, &LeaderSchedulerConfig::default())
} }
pub fn set_subscriptions(&self, subscriptions: Box<Arc<BankSubscriptions + Send + Sync>>) { pub fn set_subscriptions(&self, subscriptions: Arc<RpcSubscriptions>) {
let mut sub = self.subscriptions.write().unwrap(); let mut sub = self.subscriptions.write().unwrap();
*sub = subscriptions *sub = Some(subscriptions)
} }
pub fn copy_for_tpu(&self) -> Self { pub fn copy_for_tpu(&self) -> Self {
@ -171,7 +160,7 @@ impl Bank {
last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()), last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()),
confirmation_time: AtomicUsize::new(self.confirmation_time()), confirmation_time: AtomicUsize::new(self.confirmation_time()),
leader_scheduler: self.leader_scheduler.clone(), leader_scheduler: self.leader_scheduler.clone(),
subscriptions: RwLock::new(Box::new(Arc::new(LocalSubscriptions::default()))), subscriptions: RwLock::new(None),
} }
} }
@ -352,10 +341,9 @@ impl Bank {
fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) { fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) {
for (i, tx) in txs.iter().enumerate() { for (i, tx) in txs.iter().enumerate() {
self.subscriptions if let Some(ref subs) = *self.subscriptions.read().unwrap() {
.read() subs.check_signature(&tx.signatures[0], &res[i]);
.unwrap() }
.check_signature(&tx.signatures[0], &res[i]);
} }
} }
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
@ -932,10 +920,9 @@ impl Bank {
let tx = &txs[i]; let tx = &txs[i];
let accs = raccs.as_ref().unwrap(); let accs = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) { for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) {
self.subscriptions if let Some(ref subs) = *self.subscriptions.read().unwrap() {
.read() subs.check_account(&key, account)
.unwrap() }
.check_account(&key, account);
} }
} }
} }

View File

@ -43,7 +43,7 @@ impl PubSubService {
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone()))); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone(); let subscription = rpc.subscription.clone();
bank.set_subscriptions(Box::new(subscription.clone())); bank.set_subscriptions(subscription.clone());
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let exit_ = exit.clone(); let exit_ = exit.clone();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
@ -82,7 +82,7 @@ impl PubSubService {
pub fn set_bank(&self, bank: &Arc<Bank>) { pub fn set_bank(&self, bank: &Arc<Bank>) {
self.rpc_bank.write().unwrap().bank = bank.clone(); self.rpc_bank.write().unwrap().bank = bank.clone();
bank.set_subscriptions(Box::new(self.subscription.clone())); bank.set_subscriptions(self.subscription.clone());
} }
pub fn exit(&self) { pub fn exit(&self) {
@ -429,7 +429,7 @@ mod tests {
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone(); let subscription = rpc.subscription.clone();
arc_bank.set_subscriptions(Box::new(subscription)); arc_bank.set_subscriptions(subscription);
// Test signature subscription // Test signature subscription
let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0); let tx = SystemTransaction::new_move(&alice, bob_pubkey, 20, last_id, 0);
@ -516,7 +516,7 @@ mod tests {
let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone()))); let rpc_bank = Arc::new(RwLock::new(RpcPubSubBank::new(arc_bank.clone())));
let rpc = RpcSolPubSubImpl::new(rpc_bank.clone()); let rpc = RpcSolPubSubImpl::new(rpc_bank.clone());
let subscription = rpc.subscription.clone(); let subscription = rpc.subscription.clone();
arc_bank.set_subscriptions(Box::new(subscription)); arc_bank.set_subscriptions(subscription);
let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification"); let (subscriber, _id_receiver, mut receiver) = Subscriber::new_test("accountNotification");
rpc.subscribe_to_account_updates(subscriber, contract_state.pubkey().to_string()); rpc.subscribe_to_account_updates(subscriber, contract_state.pubkey().to_string());