Make Fullnode do less work on rotation, ReplayStage can just pass along more details

Michael Vines
2019-02-21 09:15:30 -08:00
parent 40977fa99f
commit dcf1200d2a
5 changed files with 110 additions and 143 deletions
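The gist of the change: the rotation channel used to carry only a tick height, which forced Fullnode to wait for the bank to catch up and re-derive the next leader itself (the get_next_leader helper deleted below). After this commit the sender passes along the bank id, slot, and leader directly. The following is a minimal, self-contained sketch of that channel shape; the Pubkey alias and the standalone main are illustrative stand-ins, not the crate's actual definitions.

    use std::sync::mpsc::channel;

    // Stand-in for the real Pubkey type (a 32-byte key); illustrative only.
    type Pubkey = [u8; 32];

    // After this commit the rotation channel carries (bank_id, slot, leader),
    // so Fullnode no longer has to look the next leader up itself.
    type RotationMessage = (u64, u64, Pubkey);

    fn main() {
        let (rotation_sender, rotation_receiver) = channel::<RotationMessage>();

        // Sender side (conceptually ReplayStage): announce a rotation into slot 1.
        let leader: Pubkey = [0u8; 32];
        rotation_sender.send((0, 1, leader)).unwrap();

        // Receiver side (the Fullnode rotation loop): everything needed arrives directly.
        let (bank_id, slot, new_leader) = rotation_receiver.recv().unwrap();
        println!(
            "rotate for slot={} to leader={:?} using bank={}",
            slot,
            &new_leader[..4],
            bank_id
        );
    }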


@@ -28,7 +28,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
-use std::thread::{sleep, spawn, Result};
+use std::thread::{spawn, Result};
use std::time::Duration;
struct NodeServices {
@@ -127,19 +127,9 @@ impl Fullnode {
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new(
&config.leader_scheduler_config,
)));
-let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
+let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
new_banks_from_blocktree(ledger_path, &config.ledger_config(), &leader_scheduler);
-if bank_forks_info.len() != 1 {
-warn!("TODO: figure out what to do with multiple bank forks");
-}
-bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
-let (bank, entry_height, last_entry_id) = (
-bank_forks.working_bank(),
-bank_forks_info[0].entry_height,
-bank_forks_info[0].last_entry_id,
-);
info!("node info: {:?}", node.info);
info!("node entrypoint_info: {:?}", entrypoint_info_option);
info!(
@@ -206,35 +196,6 @@ impl Fullnode {
.insert_info(entrypoint_info.clone());
}
-// Figure which node should generate the next tick
-let (scheduled_leader, max_tick_height, blob_index) = {
-let next_tick = bank.tick_height() + 1;
-let leader_scheduler = leader_scheduler.read().unwrap();
-let slot_at_next_tick = leader_scheduler.tick_height_to_slot(next_tick);
-let scheduled_leader = leader_scheduler
-.get_leader_for_slot(slot_at_next_tick)
-.expect("Leader not known after processing bank");
-let max_tick_height = next_tick + leader_scheduler.num_ticks_left_in_slot(next_tick);
-let blob_index =
-if let Some(meta) = blocktree.meta(slot_at_next_tick).expect("Database error") {
-meta.consumed
-} else {
-0
-};
-trace!(
-"node {:?} scheduled as leader for ticks ({},{}), starting blob_index={}",
-scheduled_leader,
-next_tick,
-max_tick_height,
-blob_index,
-);
-(scheduled_leader, max_tick_height, blob_index)
-};
let sockets = Sockets {
repair: node
.sockets
@@ -263,6 +224,48 @@ impl Fullnode {
// Setup channel for rotation indications
let (rotation_sender, rotation_receiver) = channel();
+// TODO: All this into Tpu/ReplayStage....
+if bank_forks_info.len() != 1 {
+warn!("TODO: figure out what to do with multiple bank forks");
+}
+let (bank, entry_height, last_entry_id) = {
+let mut bank_forks = bank_forks.write().unwrap();
+bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
+(
+bank_forks.working_bank(),
+bank_forks_info[0].entry_height,
+bank_forks_info[0].last_entry_id,
+)
+};
+// Figure which node should generate the next tick
+let (next_leader, next_slot, blob_index) = {
+let next_tick = bank.tick_height() + 1;
+let leader_scheduler = leader_scheduler.read().unwrap();
+let next_slot = leader_scheduler.tick_height_to_slot(next_tick);
+let next_leader = leader_scheduler
+.get_leader_for_slot(next_slot)
+.expect("Leader not known after processing bank");
+let blob_index = if let Some(meta) = blocktree.meta(next_slot).expect("Database error")
+{
+meta.consumed
+} else {
+0
+};
+trace!(
+"node {:?} scheduled as leader for slot {}, starting blob_index={}",
+next_leader,
+next_slot,
+blob_index,
+);
+(next_leader, next_slot, blob_index)
+};
+// END TODO
let tvu = Tvu::new(
voting_keypair_option,
&bank,
@@ -298,67 +301,21 @@ impl Fullnode {
leader_scheduler,
};
-fullnode.rotate(
-&bank,
-scheduled_leader,
-max_tick_height,
-blob_index,
-&last_entry_id,
-);
+// TODO: This first rotate should come from the Tvu/ReplayStage
+fullnode.rotate(&bank, next_leader, next_slot, blob_index, &last_entry_id);
inc_new_counter_info!("fullnode-new", 1);
fullnode
}
-fn get_next_leader(&self, bank: &Arc<Bank>, tick_height: u64) -> (Pubkey, u64) {
-loop {
-let bank_tick_height = bank.tick_height();
-if bank_tick_height >= tick_height {
-break;
-}
-trace!(
-"{:?}: Waiting for bank tick_height to catch up from {} to {}",
-self.id,
-bank_tick_height,
-tick_height
-);
-sleep(Duration::from_millis(10));
-}
-let (scheduled_leader, max_tick_height) = {
-let mut leader_scheduler = self.leader_scheduler.write().unwrap();
-// A transition is only permitted on the final tick of a slot
-assert_eq!(leader_scheduler.num_ticks_left_in_slot(tick_height), 0);
-let first_tick_of_next_slot = tick_height + 1;
-leader_scheduler.update_tick_height(first_tick_of_next_slot, bank);
-let slot = leader_scheduler.tick_height_to_slot(first_tick_of_next_slot);
-(
-leader_scheduler.get_leader_for_slot(slot).unwrap(),
-first_tick_of_next_slot
-+ leader_scheduler.num_ticks_left_in_slot(first_tick_of_next_slot),
-)
-};
-debug!(
-"node {:?} scheduled as leader for ticks [{}, {})",
-scheduled_leader,
-tick_height + 1,
-max_tick_height
-);
-(scheduled_leader, max_tick_height)
-}
fn rotate(
&mut self,
bank: &Arc<Bank>,
-next_leader: Pubkey,
-max_tick_height: u64,
+leader: Pubkey,
+slot: u64,
blob_index: u64,
last_entry_id: &Hash,
) -> FullnodeReturnType {
-if next_leader == self.id {
+if leader == self.id {
let transition = match self.node_services.tpu.is_leader() {
Some(was_leader) => {
if was_leader {
@@ -371,6 +328,7 @@ impl Fullnode {
}
None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here...
};
+let tpu_bank = Arc::new(Bank::new_from_parent(bank));
self.node_services.tpu.switch_to_leader(
&tpu_bank,
@@ -383,7 +341,7 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone broadcast socket"),
self.sigverify_disabled,
-max_tick_height,
+slot,
blob_index,
last_entry_id,
&self.blocktree,
@@ -393,7 +351,7 @@ impl Fullnode {
} else {
debug!("{:?} rotating to validator role", self.id);
self.node_services.tpu.switch_to_forwarder(
-next_leader,
+leader,
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
@@ -421,17 +379,23 @@ impl Fullnode {
}
match self.rotation_receiver.recv_timeout(timeout) {
-Ok(tick_height) => {
-trace!("{:?}: rotate at tick_height={}", self.id, tick_height);
+Ok((bank_id, slot, leader)) => {
+trace!(
+"{:?}: rotate for slot={} to leader={:?} using bank={}",
+self.id,
+slot,
+leader,
+bank_id
+);
+// TODO: Uncomment next line once `bank_id` has a valid id in it
+//self.bank_forks.write().set_working_bank_id(bank_id);
let bank = self.bank_forks.read().unwrap().working_bank();
-let (next_leader, max_tick_height) = self.get_next_leader(&bank, tick_height);
-let transition =
-self.rotate(&bank, next_leader, max_tick_height, 0, &bank.last_id());
+let transition = self.rotate(&bank, leader, slot, 0, &bank.last_id());
debug!("role transition complete: {:?}", transition);
if let Some(ref rotation_notifier) = rotation_notifier {
-rotation_notifier
-.send((transition, tick_height + 1))
-.unwrap();
+rotation_notifier.send((transition, slot)).unwrap();
}
}
Err(RecvTimeoutError::Timeout) => continue,
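As an aside on the tick/slot arithmetic this diff leans on (tick_height_to_slot and num_ticks_left_in_slot, used both in the TODO block added to the constructor and in the deleted get_next_leader): a slot is a fixed-length run of ticks, so the mapping is plain integer arithmetic. A simplified sketch, assuming a constant ticks_per_slot; these are hypothetical stand-ins, not the real LeaderScheduler methods, which may differ in details such as the bootstrap slot.

    // Hypothetical stand-ins for the LeaderScheduler helpers referenced in this diff.
    fn tick_height_to_slot(tick_height: u64, ticks_per_slot: u64) -> u64 {
        tick_height / ticks_per_slot
    }

    fn num_ticks_left_in_slot(tick_height: u64, ticks_per_slot: u64) -> u64 {
        ticks_per_slot - 1 - (tick_height % ticks_per_slot)
    }

    fn main() {
        let ticks_per_slot = 8;
        // The deleted get_next_leader asserted a transition only happens on a slot's final tick:
        let tick_height = 7; // last tick of slot 0 under this numbering
        assert_eq!(num_ticks_left_in_slot(tick_height, ticks_per_slot), 0);
        // The first tick after that lands in slot 1:
        assert_eq!(tick_height_to_slot(tick_height + 1, ticks_per_slot), 1);
    }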
@@ -684,7 +648,7 @@ mod tests {
// cluster it will continue to be the leader
assert_eq!(
rotation_receiver.recv().unwrap(),
-(FullnodeReturnType::LeaderToLeaderRotation, ticks_per_slot)
+(FullnodeReturnType::LeaderToLeaderRotation, 1)
);
bootstrap_leader_exit();
}
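A consequence visible in the test above: the rotation notifier now reports the slot being rotated into rather than the first tick height of the new slot, so the expected notification for the bootstrap leader's first self-rotation changes from (FullnodeReturnType::LeaderToLeaderRotation, ticks_per_slot) to (FullnodeReturnType::LeaderToLeaderRotation, 1).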