diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs
index 6ae5d7962b..8ac89e9dc4 100644
--- a/core/src/cluster_info_repair_listener.rs
+++ b/core/src/cluster_info_repair_listener.rs
@@ -958,8 +958,8 @@ mod tests {
         // 3) There should only be repairmen who are not responsible for repairing this slot
         // if we have more repairman than `num_blobs_in_slot * repair_redundancy`. In this case the
-        // first `num_blobs_in_slot * repair_redundancy` repairmen woudl send one blob, and the rest
-        // would noe be responsible for sending any repairs
+        // first `num_blobs_in_slot * repair_redundancy` repairmen would send one blob, and the rest
+        // would not be responsible for sending any repairs
         assert_eq!(
             none_results,
             num_repairmen.saturating_sub(num_blobs_in_slot * repair_redundancy)
diff --git a/core/src/leader_schedule_cache.rs b/core/src/leader_schedule_cache.rs
index cf59f9288d..e1bc6eb18c 100644
--- a/core/src/leader_schedule_cache.rs
+++ b/core/src/leader_schedule_cache.rs
@@ -103,7 +103,7 @@ impl LeaderScheduleCache {
         // Forbid asking for slots in an unconfirmed epoch
         let bank_epoch = self.epoch_schedule.get_epoch_and_slot_index(slot).0;
         if bank_epoch > *self.max_epoch.read().unwrap() {
-            error!(
+            debug!(
                 "Requested leader in slot: {} of unconfirmed epoch: {}",
                 slot, bank_epoch
             );
diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs
index f4b8a10142..ebfbd6e097 100644
--- a/core/src/local_cluster.rs
+++ b/core/src/local_cluster.rs
@@ -64,6 +64,7 @@ pub struct ClusterConfig {
     pub cluster_lamports: u64,
     pub ticks_per_slot: u64,
     pub slots_per_epoch: u64,
+    pub stakers_slot_offset: u64,
     pub native_instruction_processors: Vec<(String, Pubkey)>,
     pub poh_config: PohConfig,
 }
@@ -78,6 +79,7 @@ impl Default for ClusterConfig {
             cluster_lamports: 0,
             ticks_per_slot: DEFAULT_TICKS_PER_SLOT,
             slots_per_epoch: DEFAULT_SLOTS_PER_EPOCH,
+            stakers_slot_offset: DEFAULT_SLOTS_PER_EPOCH,
             native_instruction_processors: vec![],
             poh_config: PohConfig::default(),
         }
@@ -130,6 +132,7 @@ impl LocalCluster {
         );
         genesis_block.ticks_per_slot = config.ticks_per_slot;
         genesis_block.slots_per_epoch = config.slots_per_epoch;
+        genesis_block.stakers_slot_offset = config.stakers_slot_offset;
         genesis_block.poh_config = config.poh_config.clone();
         genesis_block
             .native_instruction_processors
@@ -223,7 +226,7 @@ impl LocalCluster {
         }
     }

-    fn add_validator(&mut self, validator_config: &ValidatorConfig, stake: u64) {
+    pub fn add_validator(&mut self, validator_config: &ValidatorConfig, stake: u64) {
         let client = create_client(
             self.entry_point_info.client_facing_addr(),
             FULLNODE_PORT_RANGE,
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 57186d56a6..524bbd1632 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -4,6 +4,7 @@
 use crate::bank_forks::BankForks;
 use crate::blocktree::{Blocktree, CompletedSlotsReceiver, SlotMeta};
 use crate::cluster_info::ClusterInfo;
+use crate::cluster_info_repair_listener::ClusterInfoRepairListener;
 use crate::result::Result;
 use crate::service::Service;
 use solana_metrics::datapoint_info;
@@ -56,23 +57,36 @@ impl Default for RepairSlotRange {

 pub struct RepairService {
     t_repair: JoinHandle<()>,
+    cluster_info_repair_listener: Option<ClusterInfoRepairListener>,
 }

 impl RepairService {
     pub fn new(
         blocktree: Arc<Blocktree>,
-        exit: &Arc<AtomicBool>,
+        exit: Arc<AtomicBool>,
         repair_socket: Arc<UdpSocket>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         repair_strategy: RepairStrategy,
     ) -> Self {
-        let exit = exit.clone();
+        let cluster_info_repair_listener = match repair_strategy {
+            RepairStrategy::RepairAll {
+                ref epoch_schedule, ..
+            } => Some(ClusterInfoRepairListener::new(
+                &blocktree,
+                &exit,
+                cluster_info.clone(),
+                *epoch_schedule,
+            )),
+
+            _ => None,
+        };
+
         let t_repair = Builder::new()
             .name("solana-repair-service".to_string())
             .spawn(move || {
                 Self::run(
                     &blocktree,
-                    exit,
+                    &exit,
                     &repair_socket,
                     &cluster_info,
                     repair_strategy,
@@ -80,12 +94,15 @@ impl RepairService {
             })
             .unwrap();

-        RepairService { t_repair }
+        RepairService {
+            t_repair,
+            cluster_info_repair_listener,
+        }
     }

     fn run(
         blocktree: &Arc<Blocktree>,
-        exit: Arc<AtomicBool>,
+        exit: &Arc<AtomicBool>,
         repair_socket: &Arc<UdpSocket>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         repair_strategy: RepairStrategy,
@@ -373,7 +390,14 @@ impl Service for RepairService {
     type JoinReturnType = ();

     fn join(self) -> thread::Result<()> {
-        self.t_repair.join()
+        let mut results = vec![self.t_repair.join()];
+        if let Some(cluster_info_repair_listener) = self.cluster_info_repair_listener {
+            results.push(cluster_info_repair_listener.join());
+        }
+        for r in results {
+            r?;
+        }
+        Ok(())
     }
 }
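Note: with this change RepairService owns an optional second worker (the ClusterInfoRepairListener is only spawned for RepairStrategy::RepairAll), and join() now drains every handle before propagating the first error. A minimal standalone sketch of that join pattern, using illustrative names rather than the actual service types:

    use std::thread::{self, JoinHandle};

    // Illustrative only: join every handle first, then surface the first error,
    // so an early `?` cannot leave the second thread un-joined.
    struct TwoThreadService {
        primary: JoinHandle<()>,
        optional: Option<JoinHandle<()>>,
    }

    impl TwoThreadService {
        fn join(self) -> thread::Result<()> {
            let mut results = vec![self.primary.join()];
            if let Some(handle) = self.optional {
                results.push(handle.join());
            }
            for r in results {
                r?;
            }
            Ok(())
        }
    }
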
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 33acef566b..abc9dcbb6f 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -317,9 +317,14 @@ impl ReplayStage {
                     .collect::<Vec<_>>();
                 rooted_slots.push(root_bank.slot());
                 let old_root = bank_forks.read().unwrap().root();
-                bank_forks.write().unwrap().set_root(new_root);
+                blocktree
+                    .set_root(new_root, old_root)
+                    .expect("Ledger set root failed");
+                // Set root first in leader_schedule_cache before bank_forks because bank_forks.root
+                // is consumed by repair_service to update gossip, so we don't want to get blobs for
+                // repair on gossip before we update the leader schedule, otherwise they may get dropped.
                 leader_schedule_cache.set_root(new_root);
-                blocktree.set_root(new_root, old_root)?;
+                bank_forks.write().unwrap().set_root(new_root);
                 Self::handle_new_root(&bank_forks, progress);
                 root_slot_sender.send(rooted_slots)?;
             }
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 39c92471e9..eeccd3c87d 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -92,11 +92,13 @@ pub fn should_retransmit_and_persist(
         false
     } else if slot_leader_pubkey == None {
         inc_new_counter_debug!("streamer-recv_window-unknown_leader", 1);
-        true
+        false
     } else if slot_leader_pubkey != Some(blob.id()) {
         inc_new_counter_debug!("streamer-recv_window-wrong_leader", 1);
         false
     } else {
+        // At this point, slot_leader_id == blob.id() && blob.id() != *my_id, so
+        // the blob is valid to process
         true
     }
 }
@@ -190,7 +192,7 @@ impl WindowService {

         let repair_service = RepairService::new(
             blocktree.clone(),
-            exit,
+            exit.clone(),
             repair_socket,
             cluster_info.clone(),
             repair_strategy,
@@ -303,10 +305,10 @@ mod test {
         let mut blob = Blob::default();
         blob.set_id(&leader_pubkey);

-        // without a Bank and blobs not from me, blob continues
+        // without a Bank and blobs not from me, blob gets thrown out
         assert_eq!(
             should_retransmit_and_persist(&blob, None, &cache, &me_id),
-            true
+            false
         );

         // with a Bank for slot 0, blob continues
@@ -322,12 +324,11 @@ mod test {
             false
         );

-        // with a Bank and no idea who leader is, we keep the blobs (for now)
-        // TODO: persist in blocktree that we didn't know who the leader was at the time?
+        // with a Bank and no idea who leader is, blob gets thrown out
         blob.set_slot(MINIMUM_SLOT_LENGTH as u64 * 3);
         assert_eq!(
             should_retransmit_and_persist(&blob, Some(bank), &cache, &me_id),
-            true
+            false
         );

         // if the blob came back from me, it doesn't continue, whether or not I have a bank
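Note: should_retransmit_and_persist now drops blobs whose slot leader is unknown instead of letting them through. A simplified restatement of the resulting accept/drop rules; keep_blob and the plain u64 ids are illustrative, not the real signature:

    // A blob is processed only when the slot leader is known, the blob actually
    // came from that leader, and it is not an echo of our own blob.
    fn keep_blob(blob_sender: u64, my_id: u64, slot_leader: Option<u64>) -> bool {
        match slot_leader {
            _ if blob_sender == my_id => false, // circular transmission: our own blob came back
            None => false,                      // unknown slot leader: now dropped
            Some(leader) => leader == blob_sender,
        }
    }

    fn main() {
        assert!(keep_blob(7, 1, Some(7)));  // from the slot leader: kept
        assert!(!keep_blob(7, 1, None));    // unknown leader: dropped
        assert!(!keep_blob(1, 1, Some(7))); // our own blob: dropped
    }
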
diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs
index 3dd106793d..16900cb9cf 100644
--- a/core/tests/local_cluster.rs
+++ b/core/tests/local_cluster.rs
@@ -1,11 +1,14 @@
 extern crate solana;

+use crate::solana::blocktree::Blocktree;
+use hashbrown::HashSet;
+use log::*;
 use solana::cluster::Cluster;
 use solana::cluster_tests;
 use solana::gossip_service::discover_cluster;
 use solana::local_cluster::{ClusterConfig, LocalCluster};
 use solana::validator::ValidatorConfig;
-use solana_runtime::epoch_schedule::MINIMUM_SLOT_LENGTH;
+use solana_runtime::epoch_schedule::{EpochSchedule, MINIMUM_SLOT_LENGTH};
 use solana_sdk::poh_config::PohConfig;
 use solana_sdk::timing;
 use std::time::Duration;
@@ -207,3 +210,106 @@ fn test_listener_startup() {
     let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 4).unwrap();
     assert_eq!(cluster_nodes.len(), 4);
 }
+
+#[test]
+fn test_repairman_catchup() {
+    run_repairman_catchup(5);
+}
+
+fn run_repairman_catchup(num_repairmen: u64) {
+    let mut validator_config = ValidatorConfig::default();
+    let num_ticks_per_second = 100;
+    let num_ticks_per_slot = 40;
+    let num_slots_per_epoch = MINIMUM_SLOT_LENGTH as u64;
+    let num_root_buffer_slots = 10;
+    // Calculate the leader schedule num_root_buffer_slots ahead. Otherwise, if stakers_slot_offset ==
+    // num_slots_per_epoch, and num_slots_per_epoch == MINIMUM_SLOT_LENGTH, then repairmen
+    // will stop sending repairs after the last slot in epoch 1 (0-indexed), because the root
+    // is at most in the first epoch.
+    //
+    // For example:
+    // Assume:
+    // 1) num_slots_per_epoch = 32
+    // 2) stakers_slot_offset = 32
+    // 3) MINIMUM_SLOT_LENGTH = 32
+    //
+    // Then the last slot in epoch 1 is slot 63. After completing slots 0 to 63, the root on the
+    // repairee is at most 31. Because the stakers_slot_offset == 32, the max confirmed epoch
+    // on the repairee is epoch 1.
+    // Thus the repairmen won't send any slots past epoch 1, slot 63 to this repairee until the repairee
+    // updates their root, and the repairee can't update their root until they get slot 64, so no progress
+    // is made. This is also not accounting for the fact that the repairee may not vote on every slot, so
+    // their root could actually be much less than 31. This is why we give a num_root_buffer_slots buffer.
+    let stakers_slot_offset = num_slots_per_epoch + num_root_buffer_slots;
+
+    validator_config.rpc_config.enable_fullnode_exit = true;
+
+    let lamports_per_repairman = 1000;
+
+    // Make the repairee_stake small relative to the repairmen stake so that the repairee doesn't
+    // get included in the leader schedule, causing slots to get skipped while it's still trying
+    // to catch up
+    let repairee_stake = 3;
+    let cluster_lamports = 2 * lamports_per_repairman * num_repairmen + repairee_stake;
+    let node_stakes: Vec<_> = (0..num_repairmen).map(|_| lamports_per_repairman).collect();
+    let mut cluster = LocalCluster::new(&ClusterConfig {
+        node_stakes,
+        cluster_lamports,
+        validator_config: validator_config.clone(),
+        ticks_per_slot: num_ticks_per_slot,
+        slots_per_epoch: num_slots_per_epoch,
+        stakers_slot_offset,
+        poh_config: PohConfig::new_sleep(Duration::from_millis(1000 / num_ticks_per_second)),
+        ..ClusterConfig::default()
+    });
+
+    let repairman_pubkeys: HashSet<_> = cluster.get_node_pubkeys().into_iter().collect();
+    let epoch_schedule = EpochSchedule::new(num_slots_per_epoch, stakers_slot_offset, true);
+    let num_warmup_epochs = (epoch_schedule.get_stakers_epoch(0) + 1) as f64;
+
+    // Sleep for longer than the first N warmup epochs, with a one epoch buffer for timing issues
+    cluster_tests::sleep_n_epochs(
+        num_warmup_epochs + 1.0,
+        &cluster.genesis_block.poh_config,
+        num_ticks_per_slot,
+        num_slots_per_epoch,
+    );
+
+    // Start up a new node, wait for catchup. Backwards repair won't be sufficient because the
+    // leader is sending blobs past this validator's first two confirmed epochs. Thus, the repairman
+    // protocol will have to kick in for this validator to repair.
+
+    cluster.add_validator(&validator_config, repairee_stake);
+
+    let all_pubkeys = cluster.get_node_pubkeys();
+    let repairee_id = all_pubkeys
+        .into_iter()
+        .find(|x| !repairman_pubkeys.contains(x))
+        .unwrap();
+
+    // Wait for repairman protocol to catch this validator up
+    cluster_tests::sleep_n_epochs(
+        num_warmup_epochs + 1.0,
+        &cluster.genesis_block.poh_config,
+        num_ticks_per_slot,
+        num_slots_per_epoch,
+    );
+
+    cluster.close_preserve_ledgers();
+    let validator_ledger_path = cluster.fullnode_infos[&repairee_id].ledger_path.clone();
+
+    // Expect at least the first two epochs to have been rooted after waiting 3 epochs.
+    let num_expected_slots = num_slots_per_epoch * 2;
+    let validator_ledger = Blocktree::open(&validator_ledger_path).unwrap();
+    let validator_rooted_slots: Vec<_> =
+        validator_ledger.rooted_slot_iterator(0).unwrap().collect();
+
+    if validator_rooted_slots.len() as u64 <= num_expected_slots {
+        error!(
+            "Num expected slots: {}, number of rooted slots: {}",
+            num_expected_slots,
+            validator_rooted_slots.len()
+        );
+    }
+    assert!(validator_rooted_slots.len() as u64 > num_expected_slots);
+}
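Note: the stakers_slot_offset buffer in run_repairman_catchup can be checked with the numbers from the comment above, assuming (as that comment implies) that a repairee with root R has a confirmed leader schedule through the epoch containing slot R + stakers_slot_offset. Purely illustrative arithmetic, not code from the test:

    fn main() {
        let slots_per_epoch: u64 = 32; // MINIMUM_SLOT_LENGTH in this test
        let epoch_of = |slot: u64| slot / slots_per_epoch;

        // Best case, the repairee roots all of epoch 0 before stalling.
        let best_root = slots_per_epoch - 1; // 31

        // No buffer: the schedule is only confirmed through epoch 1 (slot 63),
        // but advancing the root requires slot 64, so repair stalls.
        assert_eq!(epoch_of(best_root + slots_per_epoch), 1);

        // With the 10-slot buffer the same root already confirms epoch 2,
        // so repairmen keep serving slots 64.. and the repairee catches up.
        let stakers_slot_offset = slots_per_epoch + 10;
        assert_eq!(epoch_of(best_root + stakers_slot_offset), 2);
    }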