Skip leader slots until a vote lands (#15607)
This commit is contained in:
@ -1455,6 +1455,8 @@ pub mod test {
|
||||
None,
|
||||
&mut self.heaviest_subtree_fork_choice,
|
||||
&mut BTreeMap::new(),
|
||||
&mut true,
|
||||
&mut Vec::new(),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -41,6 +41,7 @@ use solana_sdk::{
|
||||
genesis_config::ClusterType,
|
||||
hash::Hash,
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
signature::{Keypair, Signer},
|
||||
timing::timestamp,
|
||||
transaction::Transaction,
|
||||
@ -63,6 +64,7 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
|
||||
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
|
||||
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
|
||||
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
|
||||
const MAX_VOTE_SIGNATURES: usize = 200;
|
||||
|
||||
#[derive(PartialEq, Debug)]
|
||||
pub(crate) enum HeaviestForkFailures {
|
||||
@ -111,6 +113,7 @@ pub struct ReplayStageConfig {
|
||||
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
|
||||
pub cache_block_time_sender: Option<CacheBlockTimeSender>,
|
||||
pub bank_notification_sender: Option<BankNotificationSender>,
|
||||
pub wait_for_vote_to_start_leader: bool,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@ -282,6 +285,7 @@ impl ReplayStage {
|
||||
rewards_recorder_sender,
|
||||
cache_block_time_sender,
|
||||
bank_notification_sender,
|
||||
wait_for_vote_to_start_leader,
|
||||
} = config;
|
||||
|
||||
trace!("replay stage");
|
||||
@ -312,6 +316,8 @@ impl ReplayStage {
|
||||
let mut skipped_slots_info = SkippedSlotsInfo::default();
|
||||
let mut replay_timing = ReplayTiming::default();
|
||||
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new();
|
||||
let mut voted_signatures = Vec::new();
|
||||
let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
|
||||
loop {
|
||||
let allocated = thread_mem_usage::Allocatedp::default();
|
||||
|
||||
@ -523,6 +529,8 @@ impl ReplayStage {
|
||||
&cache_block_time_sender,
|
||||
&bank_notification_sender,
|
||||
&mut gossip_duplicate_confirmed_slots,
|
||||
&mut voted_signatures,
|
||||
&mut has_new_vote_been_rooted,
|
||||
);
|
||||
};
|
||||
voting_time.stop();
|
||||
@ -614,6 +622,7 @@ impl ReplayStage {
|
||||
&progress,
|
||||
&retransmit_slots_sender,
|
||||
&mut skipped_slots_info,
|
||||
has_new_vote_been_rooted,
|
||||
);
|
||||
|
||||
let poh_bank = poh_recorder.lock().unwrap().bank();
|
||||
@ -1020,7 +1029,12 @@ impl ReplayStage {
|
||||
progress_map: &ProgressMap,
|
||||
retransmit_slots_sender: &RetransmitSlotsSender,
|
||||
skipped_slots_info: &mut SkippedSlotsInfo,
|
||||
has_new_vote_been_rooted: bool,
|
||||
) {
|
||||
if !has_new_vote_been_rooted {
|
||||
info!("Haven't landed a vote, so skipping my leader slot");
|
||||
return;
|
||||
}
|
||||
// all the individual calls to poh_recorder.lock() are designed to
|
||||
// increase granularity, decrease contention
|
||||
|
||||
@ -1238,6 +1252,8 @@ impl ReplayStage {
|
||||
cache_block_time_sender: &Option<CacheBlockTimeSender>,
|
||||
bank_notification_sender: &Option<BankNotificationSender>,
|
||||
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
|
||||
vote_signatures: &mut Vec<Signature>,
|
||||
has_new_vote_been_rooted: &mut bool,
|
||||
) {
|
||||
if bank.is_empty() {
|
||||
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
||||
@ -1290,6 +1306,8 @@ impl ReplayStage {
|
||||
highest_confirmed_root,
|
||||
heaviest_subtree_fork_choice,
|
||||
gossip_duplicate_confirmed_slots,
|
||||
has_new_vote_been_rooted,
|
||||
vote_signatures,
|
||||
);
|
||||
subscriptions.notify_roots(rooted_slots);
|
||||
if let Some(sender) = bank_notification_sender {
|
||||
@ -1319,6 +1337,8 @@ impl ReplayStage {
|
||||
last_vote,
|
||||
&tower_slots,
|
||||
switch_fork_decision,
|
||||
vote_signatures,
|
||||
*has_new_vote_been_rooted,
|
||||
);
|
||||
}
|
||||
|
||||
@ -1330,6 +1350,8 @@ impl ReplayStage {
|
||||
vote: Vote,
|
||||
tower: &[Slot],
|
||||
switch_fork_decision: &SwitchForkDecision,
|
||||
vote_signatures: &mut Vec<Signature>,
|
||||
has_new_vote_been_rooted: bool,
|
||||
) {
|
||||
if authorized_voter_keypairs.is_empty() {
|
||||
return;
|
||||
@ -1399,6 +1421,14 @@ impl ReplayStage {
|
||||
|
||||
let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey()));
|
||||
|
||||
if !has_new_vote_been_rooted {
|
||||
vote_signatures.push(vote_tx.signatures[0]);
|
||||
if vote_signatures.len() > MAX_VOTE_SIGNATURES {
|
||||
vote_signatures.remove(0);
|
||||
}
|
||||
} else {
|
||||
vote_signatures.clear();
|
||||
}
|
||||
let blockhash = bank.last_blockhash();
|
||||
vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
|
||||
vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);
|
||||
@ -2125,6 +2155,8 @@ impl ReplayStage {
|
||||
highest_confirmed_root: Option<Slot>,
|
||||
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
|
||||
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
|
||||
has_new_vote_been_rooted: &mut bool,
|
||||
voted_signatures: &mut Vec<Signature>,
|
||||
) {
|
||||
bank_forks.write().unwrap().set_root(
|
||||
new_root,
|
||||
@ -2132,6 +2164,18 @@ impl ReplayStage {
|
||||
highest_confirmed_root,
|
||||
);
|
||||
let r_bank_forks = bank_forks.read().unwrap();
|
||||
let new_root_bank = &r_bank_forks[new_root];
|
||||
if !*has_new_vote_been_rooted {
|
||||
for signature in voted_signatures.iter() {
|
||||
if new_root_bank.get_signature_status(signature).is_some() {
|
||||
*has_new_vote_been_rooted = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if *has_new_vote_been_rooted {
|
||||
std::mem::take(voted_signatures);
|
||||
}
|
||||
}
|
||||
progress.handle_new_root(&r_bank_forks);
|
||||
heaviest_subtree_fork_choice.set_root(new_root);
|
||||
let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root);
|
||||
@ -2553,6 +2597,8 @@ pub(crate) mod tests {
|
||||
None,
|
||||
&mut heaviest_subtree_fork_choice,
|
||||
&mut gossip_duplicate_confirmed_slots,
|
||||
&mut true,
|
||||
&mut Vec::new(),
|
||||
);
|
||||
assert_eq!(bank_forks.read().unwrap().root(), root);
|
||||
assert_eq!(progress.len(), 1);
|
||||
@ -2609,6 +2655,8 @@ pub(crate) mod tests {
|
||||
Some(confirmed_root),
|
||||
&mut heaviest_subtree_fork_choice,
|
||||
&mut BTreeMap::new(),
|
||||
&mut true,
|
||||
&mut Vec::new(),
|
||||
);
|
||||
assert_eq!(bank_forks.read().unwrap().root(), root);
|
||||
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
|
||||
|
@ -422,6 +422,7 @@ impl TestValidator {
|
||||
warp_slot: config.warp_slot,
|
||||
bpf_jit: !config.no_bpf_jit,
|
||||
validator_exit: config.validator_exit.clone(),
|
||||
no_wait_for_vote_to_start_leader: true,
|
||||
..ValidatorConfig::default()
|
||||
};
|
||||
|
||||
|
@ -86,6 +86,7 @@ pub struct TvuConfig {
|
||||
pub use_index_hash_calculation: bool,
|
||||
pub rocksdb_compaction_interval: Option<u64>,
|
||||
pub rocksdb_max_compaction_jitter: Option<u64>,
|
||||
pub wait_for_vote_to_start_leader: bool,
|
||||
}
|
||||
|
||||
impl Tvu {
|
||||
@ -259,6 +260,7 @@ impl Tvu {
|
||||
rewards_recorder_sender,
|
||||
cache_block_time_sender,
|
||||
bank_notification_sender,
|
||||
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
|
||||
};
|
||||
|
||||
let replay_stage = ReplayStage::new(
|
||||
|
@ -130,6 +130,7 @@ pub struct ValidatorConfig {
|
||||
pub accounts_db_use_index_hash_calculation: bool,
|
||||
pub tpu_coalesce_ms: u64,
|
||||
pub validator_exit: Arc<RwLock<ValidatorExit>>,
|
||||
pub no_wait_for_vote_to_start_leader: bool,
|
||||
}
|
||||
|
||||
impl Default for ValidatorConfig {
|
||||
@ -184,6 +185,7 @@ impl Default for ValidatorConfig {
|
||||
accounts_db_use_index_hash_calculation: true,
|
||||
tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS,
|
||||
validator_exit: Arc::new(RwLock::new(ValidatorExit::default())),
|
||||
no_wait_for_vote_to_start_leader: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -629,15 +631,20 @@ impl Validator {
|
||||
check_poh_speed(&genesis_config, None);
|
||||
}
|
||||
|
||||
if wait_for_supermajority(
|
||||
let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(
|
||||
config,
|
||||
&bank,
|
||||
&cluster_info,
|
||||
rpc_override_health_check,
|
||||
&start_progress,
|
||||
) {
|
||||
waited
|
||||
} else {
|
||||
abort();
|
||||
}
|
||||
};
|
||||
|
||||
let wait_for_vote_to_start_leader =
|
||||
!waited_for_supermajority && !config.no_wait_for_vote_to_start_leader;
|
||||
|
||||
let poh_service = PohService::new(
|
||||
poh_recorder.clone(),
|
||||
@ -725,6 +732,7 @@ impl Validator {
|
||||
use_index_hash_calculation: config.accounts_db_use_index_hash_calculation,
|
||||
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
|
||||
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
|
||||
wait_for_vote_to_start_leader,
|
||||
},
|
||||
&max_slots,
|
||||
);
|
||||
@ -1292,17 +1300,28 @@ fn initialize_rpc_transaction_history_services(
|
||||
}
|
||||
}
|
||||
|
||||
// Return true on error, indicating the validator should exit.
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum ValidatorError {
|
||||
BadExpectedBankHash,
|
||||
NotEnoughLedgerData,
|
||||
}
|
||||
|
||||
// Return if the validator waited on other nodes to start. In this case
|
||||
// it should not wait for one of it's votes to land to produce blocks
|
||||
// because if the whole network is waiting, then it will stall.
|
||||
//
|
||||
// Error indicates that a bad hash was encountered or another condition
|
||||
// that is unrecoverable and the validator should exit.
|
||||
fn wait_for_supermajority(
|
||||
config: &ValidatorConfig,
|
||||
bank: &Bank,
|
||||
cluster_info: &ClusterInfo,
|
||||
rpc_override_health_check: Arc<AtomicBool>,
|
||||
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
|
||||
) -> bool {
|
||||
) -> Result<bool, ValidatorError> {
|
||||
if let Some(wait_for_supermajority) = config.wait_for_supermajority {
|
||||
match wait_for_supermajority.cmp(&bank.slot()) {
|
||||
std::cmp::Ordering::Less => return false,
|
||||
std::cmp::Ordering::Less => return Ok(false),
|
||||
std::cmp::Ordering::Greater => {
|
||||
error!(
|
||||
"Ledger does not have enough data to wait for supermajority, \
|
||||
@ -1310,12 +1329,12 @@ fn wait_for_supermajority(
|
||||
bank.slot(),
|
||||
wait_for_supermajority
|
||||
);
|
||||
return true;
|
||||
return Err(ValidatorError::NotEnoughLedgerData);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if let Some(expected_bank_hash) = config.expected_bank_hash {
|
||||
@ -1325,7 +1344,7 @@ fn wait_for_supermajority(
|
||||
bank.hash(),
|
||||
expected_bank_hash
|
||||
);
|
||||
return true;
|
||||
return Err(ValidatorError::BadExpectedBankHash);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1350,7 +1369,7 @@ fn wait_for_supermajority(
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
rpc_override_health_check.store(false, Ordering::Relaxed);
|
||||
false
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn report_target_features() {
|
||||
@ -1641,17 +1660,21 @@ mod tests {
|
||||
&cluster_info,
|
||||
rpc_override_health_check.clone(),
|
||||
&start_progress,
|
||||
));
|
||||
)
|
||||
.unwrap());
|
||||
|
||||
// bank=0, wait=1, should fail
|
||||
config.wait_for_supermajority = Some(1);
|
||||
assert!(wait_for_supermajority(
|
||||
&config,
|
||||
&bank,
|
||||
&cluster_info,
|
||||
rpc_override_health_check.clone(),
|
||||
&start_progress,
|
||||
));
|
||||
assert_eq!(
|
||||
wait_for_supermajority(
|
||||
&config,
|
||||
&bank,
|
||||
&cluster_info,
|
||||
rpc_override_health_check.clone(),
|
||||
&start_progress,
|
||||
),
|
||||
Err(ValidatorError::NotEnoughLedgerData)
|
||||
);
|
||||
|
||||
// bank=1, wait=0, should pass, bank is past the wait slot
|
||||
let bank = Bank::new_from_parent(&bank, &Pubkey::default(), 1);
|
||||
@ -1662,18 +1685,22 @@ mod tests {
|
||||
&cluster_info,
|
||||
rpc_override_health_check.clone(),
|
||||
&start_progress,
|
||||
));
|
||||
)
|
||||
.unwrap());
|
||||
|
||||
// bank=1, wait=1, equal, but bad hash provided
|
||||
config.wait_for_supermajority = Some(1);
|
||||
config.expected_bank_hash = Some(hash(&[1]));
|
||||
assert!(wait_for_supermajority(
|
||||
&config,
|
||||
&bank,
|
||||
&cluster_info,
|
||||
rpc_override_health_check,
|
||||
&start_progress,
|
||||
));
|
||||
assert_eq!(
|
||||
wait_for_supermajority(
|
||||
&config,
|
||||
&bank,
|
||||
&cluster_info,
|
||||
rpc_override_health_check,
|
||||
&start_progress,
|
||||
),
|
||||
Err(ValidatorError::BadExpectedBankHash)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Reference in New Issue
Block a user