diff --git a/src/compute_leader_finality_service.rs b/src/compute_leader_finality_service.rs index 36f64197af..709ac3fdbe 100644 --- a/src/compute_leader_finality_service.rs +++ b/src/compute_leader_finality_service.rs @@ -21,7 +21,7 @@ pub enum FinalityError { NoValidSupermajority, } -pub const COMPUTE_FINALITY_MS: u64 = 1000; +pub const COMPUTE_FINALITY_MS: u64 = 100; pub struct ComputeLeaderFinalityService { compute_finality_thread: JoinHandle<()>, diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 72d3c94a88..d56be6b35e 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -25,6 +25,9 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; +pub const BLOCK_TICK_COUNT: u64 = 8; +pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; + #[derive(Debug, PartialEq, Eq, Clone)] pub enum ReplayStageReturnType { LeaderRotation(u64, u64, Hash), @@ -71,6 +74,9 @@ impl ReplayStage { let mut entries = window_receiver.recv_timeout(timer)?; while let Ok(mut more) = window_receiver.try_recv() { entries.append(&mut more); + if entries.len() >= MAX_ENTRY_RECV_PER_ITER { + break; + } } submit( @@ -94,9 +100,25 @@ impl ReplayStage { let (current_leader, _) = bank .get_current_leader() .expect("Scheduled leader should be calculated by this point"); + let my_id = keypair.pubkey(); for (i, entry) in entries.iter().enumerate() { res = bank.process_entry(&entry); - let my_id = keypair.pubkey(); + if res.is_err() { + // TODO: This will return early from the first entry that has an erroneous + // transaction, instead of processing the rest of the entries in the vector + // of received entries. This is in line with previous behavior when + // bank.process_entries() was used to process the entries, but doesn't solve the + // issue that the bank state was still changed, leading to inconsistencies with the + // leader as the leader currently should not be publishing erroneous transactions + break; + } + + if bank.tick_height() % BLOCK_TICK_COUNT == 0 { + if let Some(sender) = vote_blob_sender { + send_validator_vote(bank, vote_account_keypair, &cluster_info, sender).unwrap(); + } + } + let (scheduled_leader, _) = bank .get_current_leader() .expect("Scheduled leader should be calculated by this point"); @@ -105,20 +127,11 @@ impl ReplayStage { if scheduled_leader != current_leader { cluster_info.write().unwrap().set_leader(scheduled_leader); } + if my_id == scheduled_leader { num_entries_to_write = i + 1; break; } - - if res.is_err() { - // TODO: This will return early from the first entry that has an erroneous - // transaction, instead of processing the rest of the entries in the vector - // of received entries. This is in line with previous behavior when - // bank.process_entries() was used to process the entries, but doesn't solve the - // issue that the bank state was still changed, leading to inconsistencies with the - // leader as the leader currently should not be publishing erroneous transactions - break; - } } // If leader rotation happened, only write the entries up to leader rotation. @@ -134,7 +147,6 @@ impl ReplayStage { ); let entries_len = entries.len() as u64; - // TODO: move this to another stage? // TODO: In line with previous behavior, this will write all the entries even if // an error occurred processing one of the entries (causing the rest of the entries to // not be processed). @@ -144,9 +156,10 @@ impl ReplayStage { *entry_height += entries_len; res?; - if let Some(sender) = vote_blob_sender { - send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?; - } + inc_new_counter_info!( + "replicate_stage-duration", + duration_as_ms(&now.elapsed()) as usize + ); Ok(()) } @@ -173,8 +186,6 @@ impl ReplayStage { .name("solana-replay-stage".to_string()) .spawn(move || { let _exit = Finalizer::new(exit); - let now = Instant::now(); - let mut next_vote_secs = 1; let mut entry_height_ = entry_height; let mut last_entry_id = last_entry_id; loop { @@ -194,21 +205,13 @@ impl ReplayStage { )); } - // Only vote once a second. - let vote_sender = if now.elapsed().as_secs() > next_vote_secs { - next_vote_secs += 1; - Some(&vote_blob_sender) - } else { - None - }; - match Self::process_entries( &bank, &cluster_info, &window_receiver, &keypair, &vote_account_keypair, - vote_sender, + Some(&vote_blob_sender), &ledger_entry_sender, &mut entry_height_, &mut last_entry_id, @@ -258,7 +261,7 @@ mod test { use crate::replay_stage::{ReplayStage, ReplayStageReturnType}; use crate::result::Error; use crate::service::Service; - use crate::vote_stage::{send_validator_vote, VoteError}; + use crate::vote_stage::send_validator_vote; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; @@ -437,13 +440,8 @@ mod test { // Vote sender should error because no leader contact info is found in the // ClusterInfo let (mock_sender, _mock_receiver) = channel(); - let vote_err = + let _vote_err = send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender); - if let Err(Error::VoteError(vote_error)) = vote_err { - assert_eq!(vote_error, VoteError::LeaderInfoNotFound); - } else { - panic!("Expected validator vote to fail with LeaderInfoNotFound"); - } // Send ReplayStage an entry, should see it on the ledger writer receiver let next_tick = create_ticks( @@ -549,13 +547,8 @@ mod test { // Vote sender should error because no leader contact info is found in the // ClusterInfo let (mock_sender, _mock_receiver) = channel(); - let vote_err = + let _vote_err = send_validator_vote(&bank, &vote_account_keypair, &cluster_info_me, &mock_sender); - if let Err(Error::VoteError(vote_error)) = vote_err { - assert_eq!(vote_error, VoteError::LeaderInfoNotFound); - } else { - panic!("Expected validator vote to fail with LeaderInfoNotFound"); - } // Send enough ticks to trigger leader rotation let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize; diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 2a1584bc20..b3845f52df 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -74,9 +74,10 @@ pub fn send_validator_vote( ) -> Result<()> { let last_id = bank.last_id(); - let shared_blob = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)?; - inc_new_counter_info!("validator-vote_sent", 1); - vote_blob_sender.send(vec![shared_blob])?; - + if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info) + { + inc_new_counter_info!("validator-vote_sent", 1); + vote_blob_sender.send(vec![shared_blob])?; + } Ok(()) }