diff --git a/src/fullnode.rs b/src/fullnode.rs index 9eb4a18432..3464dea4f2 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -266,6 +266,7 @@ impl Fullnode { config.entry_stream.as_ref(), ledger_signal_receiver, leader_scheduler.clone(), + &subscriptions, ); let tpu = Tpu::new(id, &cluster_info); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 5c708371cc..fba3804840 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -9,6 +9,7 @@ use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_scheduler::LeaderScheduler; use crate::packet::BlobError; use crate::result::{Error, Result}; +use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::tvu::TvuRotationSender; use crate::voting_keypair::VotingKeypair; @@ -63,6 +64,7 @@ impl ReplayStage { current_blob_index: &mut u64, last_entry_id: &Arc>, leader_scheduler: &Arc>, + subscriptions: &Arc, ) -> Result<()> { // Coalesce all the available entries into a single vote submit( @@ -120,6 +122,7 @@ impl ReplayStage { } if 0 == num_ticks_to_next_vote { + subscriptions.notify_subscribers(&bank); if let Some(voting_keypair) = voting_keypair { let keypair = voting_keypair.as_ref(); let vote = VoteTransaction::new_vote( @@ -178,11 +181,13 @@ impl ReplayStage { to_leader_sender: &TvuRotationSender, ledger_signal_receiver: Receiver, leader_scheduler: &Arc>, + subscriptions: &Arc, ) -> (Self, EntryReceiver) { let (ledger_entry_sender, ledger_entry_receiver) = channel(); let exit_ = exit.clone(); let leader_scheduler_ = leader_scheduler.clone(); let to_leader_sender = to_leader_sender.clone(); + let subscriptions_ = subscriptions.clone(); let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { @@ -262,6 +267,7 @@ impl ReplayStage { &mut current_blob_index, &last_entry_id, &leader_scheduler_, + &subscriptions_, ) { error!("process_entries failed: {:?}", e); } @@ -445,6 +451,7 @@ mod test { &rotation_sender, l_receiver, &leader_scheduler, + &Arc::new(RpcSubscriptions::default()), ); let total_entries_to_send = 2 * ticks_per_slot as usize - 2; @@ -548,6 +555,7 @@ mod test { &to_leader_sender, l_receiver, &leader_scheduler, + &Arc::new(RpcSubscriptions::default()), ); let keypair = voting_keypair.as_ref(); @@ -673,6 +681,7 @@ mod test { &rotation_tx, l_receiver, &leader_scheduler, + &Arc::new(RpcSubscriptions::default()), ); let keypair = voting_keypair.as_ref(); @@ -757,6 +766,7 @@ mod test { &mut current_blob_index, &Arc::new(RwLock::new(last_entry_id)), &leader_scheduler, + &Arc::new(RpcSubscriptions::default()), ); match res { @@ -782,6 +792,7 @@ mod test { &mut current_blob_index, &Arc::new(RwLock::new(last_entry_id)), &leader_scheduler, + &Arc::new(RpcSubscriptions::default()), ); match res { diff --git a/src/tvu.rs b/src/tvu.rs index 848e8ecae2..780fa76e3d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -20,6 +20,7 @@ use crate::entry_stream_stage::EntryStreamStage; use crate::leader_scheduler::LeaderScheduler; use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; +use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender}; @@ -79,6 +80,7 @@ impl Tvu { entry_stream: Option<&String>, ledger_signal_receiver: Receiver, leader_scheduler: Arc>, + subscriptions: &Arc, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); let keypair: Arc = cluster_info @@ -130,6 +132,7 @@ impl Tvu { to_leader_sender, ledger_signal_receiver, &leader_scheduler, + subscriptions, ); let entry_stream_stage = if entry_stream.is_some() { @@ -261,6 +264,7 @@ pub mod tests { None, l_receiver, leader_scheduler, + &Arc::new(RpcSubscriptions::default()), ); tvu.close().expect("close"); } diff --git a/tests/tvu.rs b/tests/tvu.rs index 76028edd5a..a55135bff9 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -10,6 +10,7 @@ use solana::gossip_service::GossipService; use solana::leader_scheduler::LeaderScheduler; use solana::leader_scheduler::LeaderSchedulerConfig; use solana::packet::index_blobs; +use solana::rpc_subscriptions::RpcSubscriptions; use solana::service::Service; use solana::storage_stage::StorageState; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; @@ -127,6 +128,7 @@ fn test_replay() { None, l_receiver, leader_scheduler, + &Arc::new(RpcSubscriptions::default()), ); let mut alice_ref_balance = starting_balance;