Notify subscribers from ReplayStage

This commit is contained in:
Greg Fitzgerald
2019-02-18 19:08:54 -07:00
parent 5916177dc8
commit ad9cd23202
4 changed files with 18 additions and 0 deletions

View File

@ -266,6 +266,7 @@ impl Fullnode {
config.entry_stream.as_ref(), config.entry_stream.as_ref(),
ledger_signal_receiver, ledger_signal_receiver,
leader_scheduler.clone(), leader_scheduler.clone(),
&subscriptions,
); );
let tpu = Tpu::new(id, &cluster_info); let tpu = Tpu::new(id, &cluster_info);

View File

@ -9,6 +9,7 @@ use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_scheduler::LeaderScheduler; use crate::leader_scheduler::LeaderScheduler;
use crate::packet::BlobError; use crate::packet::BlobError;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service; use crate::service::Service;
use crate::tvu::TvuRotationSender; use crate::tvu::TvuRotationSender;
use crate::voting_keypair::VotingKeypair; use crate::voting_keypair::VotingKeypair;
@ -63,6 +64,7 @@ impl ReplayStage {
current_blob_index: &mut u64, current_blob_index: &mut u64,
last_entry_id: &Arc<RwLock<Hash>>, last_entry_id: &Arc<RwLock<Hash>>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>,
) -> Result<()> { ) -> Result<()> {
// Coalesce all the available entries into a single vote // Coalesce all the available entries into a single vote
submit( submit(
@ -120,6 +122,7 @@ impl ReplayStage {
} }
if 0 == num_ticks_to_next_vote { if 0 == num_ticks_to_next_vote {
subscriptions.notify_subscribers(&bank);
if let Some(voting_keypair) = voting_keypair { if let Some(voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref(); let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote( let vote = VoteTransaction::new_vote(
@ -178,11 +181,13 @@ impl ReplayStage {
to_leader_sender: &TvuRotationSender, to_leader_sender: &TvuRotationSender,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>,
) -> (Self, EntryReceiver) { ) -> (Self, EntryReceiver) {
let (ledger_entry_sender, ledger_entry_receiver) = channel(); let (ledger_entry_sender, ledger_entry_receiver) = channel();
let exit_ = exit.clone(); let exit_ = exit.clone();
let leader_scheduler_ = leader_scheduler.clone(); let leader_scheduler_ = leader_scheduler.clone();
let to_leader_sender = to_leader_sender.clone(); let to_leader_sender = to_leader_sender.clone();
let subscriptions_ = subscriptions.clone();
let t_replay = Builder::new() let t_replay = Builder::new()
.name("solana-replay-stage".to_string()) .name("solana-replay-stage".to_string())
.spawn(move || { .spawn(move || {
@ -262,6 +267,7 @@ impl ReplayStage {
&mut current_blob_index, &mut current_blob_index,
&last_entry_id, &last_entry_id,
&leader_scheduler_, &leader_scheduler_,
&subscriptions_,
) { ) {
error!("process_entries failed: {:?}", e); error!("process_entries failed: {:?}", e);
} }
@ -445,6 +451,7 @@ mod test {
&rotation_sender, &rotation_sender,
l_receiver, l_receiver,
&leader_scheduler, &leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
); );
let total_entries_to_send = 2 * ticks_per_slot as usize - 2; let total_entries_to_send = 2 * ticks_per_slot as usize - 2;
@ -548,6 +555,7 @@ mod test {
&to_leader_sender, &to_leader_sender,
l_receiver, l_receiver,
&leader_scheduler, &leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
); );
let keypair = voting_keypair.as_ref(); let keypair = voting_keypair.as_ref();
@ -673,6 +681,7 @@ mod test {
&rotation_tx, &rotation_tx,
l_receiver, l_receiver,
&leader_scheduler, &leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
); );
let keypair = voting_keypair.as_ref(); let keypair = voting_keypair.as_ref();
@ -757,6 +766,7 @@ mod test {
&mut current_blob_index, &mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)), &Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler, &leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
); );
match res { match res {
@ -782,6 +792,7 @@ mod test {
&mut current_blob_index, &mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)), &Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler, &leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
); );
match res { match res {

View File

@ -20,6 +20,7 @@ use crate::entry_stream_stage::EntryStreamStage;
use crate::leader_scheduler::LeaderScheduler; use crate::leader_scheduler::LeaderScheduler;
use crate::replay_stage::ReplayStage; use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage; use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::{StorageStage, StorageState}; use crate::storage_stage::{StorageStage, StorageState};
use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender}; use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender};
@ -79,6 +80,7 @@ impl Tvu {
entry_stream: Option<&String>, entry_stream: Option<&String>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>, leader_scheduler: Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>,
) -> Self { ) -> Self {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let keypair: Arc<Keypair> = cluster_info let keypair: Arc<Keypair> = cluster_info
@ -130,6 +132,7 @@ impl Tvu {
to_leader_sender, to_leader_sender,
ledger_signal_receiver, ledger_signal_receiver,
&leader_scheduler, &leader_scheduler,
subscriptions,
); );
let entry_stream_stage = if entry_stream.is_some() { let entry_stream_stage = if entry_stream.is_some() {
@ -261,6 +264,7 @@ pub mod tests {
None, None,
l_receiver, l_receiver,
leader_scheduler, leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
); );
tvu.close().expect("close"); tvu.close().expect("close");
} }

View File

@ -10,6 +10,7 @@ use solana::gossip_service::GossipService;
use solana::leader_scheduler::LeaderScheduler; use solana::leader_scheduler::LeaderScheduler;
use solana::leader_scheduler::LeaderSchedulerConfig; use solana::leader_scheduler::LeaderSchedulerConfig;
use solana::packet::index_blobs; use solana::packet::index_blobs;
use solana::rpc_subscriptions::RpcSubscriptions;
use solana::service::Service; use solana::service::Service;
use solana::storage_stage::StorageState; use solana::storage_stage::StorageState;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
@ -127,6 +128,7 @@ fn test_replay() {
None, None,
l_receiver, l_receiver,
leader_scheduler, leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
); );
let mut alice_ref_balance = starting_balance; let mut alice_ref_balance = starting_balance;