Start leader based on Poh tick height. (#3084)

* Start leader based on poh and test

* Equalize validator and leader stakes in LocalCluster

* Clear WorkingBank on poh_recorder reset
anatoly yakovenko
2019-03-05 17:56:51 -08:00
committed by carllin
parent 9491999a95
commit 1c0cfb17a3
7 changed files with 160 additions and 50 deletions
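The heart of this change is the mapping from the PoH recorder's tick height to a slot: the new ReplayStage::start_leader (in the replay_stage.rs hunk below) computes poh_slot = tick_height_to_slot(parent, poh_tick_height + 1) and only sets up a TPU bank if the leader schedule assigns that slot to this node. A minimal standalone sketch of that mapping, using a plain integer in place of the crate's Bank API and illustrative tick heights:

// Sketch only: mirrors leader_schedule_utils::tick_height_to_slot from this commit,
// with `ticks_per_slot` passed directly instead of read from bank.ticks_per_slot().
fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
    tick_height / ticks_per_slot
}

fn main() {
    let ticks_per_slot = 16; // DEFAULT_TICKS_PER_SLOT after this commit
    for &poh_tick_height in [0u64, 15, 16, 31].iter() {
        // start_leader checks the slot of the *next* tick (poh_tick_height + 1)
        let poh_slot = tick_height_to_slot(ticks_per_slot, poh_tick_height + 1);
        println!("tick height {} -> candidate leader slot {}", poh_tick_height, poh_slot);
    }
}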

View File

@@ -7,9 +7,12 @@ use crate::contact_info::ContactInfo;
 use crate::gossip_service::discover;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use solana_sdk::system_transaction::SystemTransaction;
+use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND};
 use std::thread::sleep;
 use std::time::Duration;
 
+const SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / NUM_TICKS_PER_SECOND;
+
 /// Spend and verify from every node in the network
 pub fn spend_and_verify_all_nodes(
     entry_point_info: &ContactInfo,
@@ -49,9 +52,56 @@ pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) {
         let mut client = mk_client(&node);
         assert!(client.fullnode_exit().unwrap());
     }
-    sleep(Duration::from_millis(250));
+    sleep(Duration::from_millis(SLOT_MILLIS));
     for node in &cluster_nodes {
         let mut client = mk_client(&node);
         assert!(client.fullnode_exit().is_err());
     }
 }
+
+pub fn kill_entry_and_spend_and_verify_rest(
+    entry_point_info: &ContactInfo,
+    funding_keypair: &Keypair,
+    nodes: usize,
+) {
+    solana_logger::setup();
+    let cluster_nodes = discover(&entry_point_info, nodes);
+    assert!(cluster_nodes.len() >= nodes);
+    let mut client = mk_client(&entry_point_info);
+    info!("sleeping for an epoch");
+    sleep(Duration::from_millis(SLOT_MILLIS * DEFAULT_SLOTS_PER_EPOCH));
+    info!("done sleeping for an epoch");
+    info!("killing entry point");
+    assert!(client.fullnode_exit().unwrap());
+    info!("sleeping for a slot");
+    sleep(Duration::from_millis(SLOT_MILLIS));
+    info!("done sleeping for a slot");
+    for ingress_node in &cluster_nodes {
+        if ingress_node.id == entry_point_info.id {
+            continue;
+        }
+        let random_keypair = Keypair::new();
+        let mut client = mk_client(&ingress_node);
+        let bal = client
+            .poll_get_balance(&funding_keypair.pubkey())
+            .expect("balance in source");
+        assert!(bal > 0);
+        let mut transaction = SystemTransaction::new_move(
+            &funding_keypair,
+            random_keypair.pubkey(),
+            1,
+            client.get_recent_blockhash(),
+            0,
+        );
+        let sig = client
+            .retry_transfer(&funding_keypair, &mut transaction, 5)
+            .unwrap();
+        for validator in &cluster_nodes {
+            if validator.id == entry_point_info.id {
+                continue;
+            }
+            let mut client = mk_client(&validator);
+            client.poll_for_signature(&sig).unwrap();
+        }
+    }
+}

View File

@@ -45,6 +45,10 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 {
     bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1
 }
 
+pub fn tick_height_to_slot(bank: &Bank, tick_height: u64) -> u64 {
+    tick_height / bank.ticks_per_slot()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

View File

@@ -41,6 +41,8 @@ impl LocalCluster {
         lamports_per_node: u64,
         fullnode_config: &FullnodeConfig,
     ) -> Self {
+        // Must have enough tokens to fund vote account and set delegate
+        assert!(lamports_per_node > 2);
         let leader_keypair = Arc::new(Keypair::new());
         let leader_pubkey = leader_keypair.pubkey();
         let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
@@ -83,7 +85,12 @@ impl LocalCluster {
                 validator_pubkey, validator_balance
             );
-            Self::create_and_fund_vote_account(&mut client, &voting_keypair, &validator_keypair, 1)
+            Self::create_and_fund_vote_account(
+                &mut client,
+                &voting_keypair,
+                &validator_keypair,
+                lamports_per_node - 1,
+            )
             .unwrap();
             let validator_server = Fullnode::new(
                 validator_node,

View File

@@ -61,6 +61,7 @@ impl PohRecorder {
     }
     // synchronize PoH with a bank
     pub fn reset(&mut self, tick_height: u64, blockhash: Hash) {
+        self.clear_bank();
         let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| {
             if entry.hash == blockhash {
                 assert_eq!(*entry_tick_height, tick_height);
@@ -489,4 +490,19 @@ mod tests {
         poh_recorder.tick();
         assert_eq!(poh_recorder.poh.tick_height, 2);
     }
+
+    #[test]
+    fn test_reset_clear_bank() {
+        let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
+        let bank = Arc::new(Bank::new(&genesis_block));
+        let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default());
+        let working_bank = WorkingBank {
+            bank,
+            min_tick_height: 2,
+            max_tick_height: 3,
+        };
+        poh_recorder.set_working_bank(working_bank);
+        poh_recorder.reset(1, hash(b"hello"));
+        assert!(poh_recorder.working_bank.is_none());
+    }
 }

View File

@@ -79,9 +79,7 @@ impl ReplayStage {
             .name("solana-replay-stage".to_string())
             .spawn(move || {
                 let _exit = Finalizer::new(exit_.clone());
-                let mut first_block = false;
                 let mut progress = HashMap::new();
                 loop {
                     let now = Instant::now();
                     // Stop getting entries if we get exit signal
@@ -115,16 +113,6 @@
                             }
                         }
                     }
-                    let frozen = bank_forks.read().unwrap().frozen_banks();
-                    if votable.is_empty()
-                        && frozen.len() == 1
-                        && active_banks.is_empty()
-                        && !first_block
-                    {
-                        first_block = true;
-                        votable.extend(frozen.keys());
-                        info!("voting on first block {:?}", votable);
-                    }
                     // TODO: fork selection
                     // vote on the latest one for now
                     votable.sort();
@@ -136,10 +124,6 @@
                             .get(*latest_slot_vote)
                             .unwrap()
                             .clone();
-                        let next_slot = *latest_slot_vote + 1;
-                        let next_leader_id =
-                            leader_schedule_utils::slot_leader_at(next_slot, &parent);
-                        cluster_info.write().unwrap().set_leader(next_leader_id);
                         subscriptions.notify_subscribers(&parent);
@@ -157,31 +141,9 @@
                             .lock()
                             .unwrap()
                             .reset(parent.tick_height(), parent.last_blockhash());
-
-                        if next_leader_id == my_id {
-                            let frozen = bank_forks.read().unwrap().frozen_banks();
-                            assert!(frozen.get(&next_slot).is_none());
-                            assert!(bank_forks.read().unwrap().get(next_slot).is_none());
-                            let tpu_bank = Bank::new_from_parent(&parent, my_id, next_slot);
-                            bank_forks.write().unwrap().insert(next_slot, tpu_bank);
-                            if let Some(tpu_bank) =
-                                bank_forks.read().unwrap().get(next_slot).cloned()
-                            {
-                                assert_eq!(
-                                    bank_forks.read().unwrap().working_bank().slot(),
-                                    tpu_bank.slot()
-                                );
-                                debug!(
-                                    "new working bank: me: {} next_slot: {} next_leader: {}",
-                                    my_id,
-                                    tpu_bank.slot(),
-                                    next_leader_id
-                                );
-                                poh_recorder.lock().unwrap().set_bank(&tpu_bank);
-                            }
-                        }
-                    }
+                    }
+                    Self::start_leader(my_id, &bank_forks, &poh_recorder, &cluster_info);
 
                     inc_new_counter_info!(
                         "replicate_stage-duration",
                         duration_as_ms(&now.elapsed()) as usize
@@ -203,7 +165,52 @@ impl ReplayStage {
             forward_entry_receiver,
         )
     }
 
+    pub fn start_leader(
+        my_id: Pubkey,
+        bank_forks: &Arc<RwLock<BankForks>>,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+    ) {
+        let frozen = bank_forks.read().unwrap().frozen_banks();
+        // TODO: fork selection
+        let mut newest_frozen: Vec<(&u64, &Arc<Bank>)> = frozen.iter().collect();
+        newest_frozen.sort_by_key(|x| *x.0);
+        if let Some((_, parent)) = newest_frozen.last() {
+            let poh_tick_height = poh_recorder.lock().unwrap().tick_height();
+            let poh_slot = leader_schedule_utils::tick_height_to_slot(parent, poh_tick_height + 1);
+            assert!(frozen.get(&poh_slot).is_none());
+            trace!("checking poh slot for leader {}", poh_slot);
+            if bank_forks.read().unwrap().get(poh_slot).is_none() {
+                let next_leader = leader_schedule_utils::slot_leader_at(poh_slot, parent);
+                debug!(
+                    "me: {} leader {} at poh slot {}",
+                    my_id, next_leader, poh_slot
+                );
+                cluster_info.write().unwrap().set_leader(next_leader);
+                if next_leader == my_id {
+                    debug!("starting tpu for slot {}", poh_slot);
+                    let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot);
+                    bank_forks.write().unwrap().insert(poh_slot, tpu_bank);
+                    if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
+                        assert_eq!(
+                            bank_forks.read().unwrap().working_bank().slot(),
+                            tpu_bank.slot()
+                        );
+                        debug!(
+                            "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
+                            my_id,
+                            tpu_bank.slot(),
+                            next_leader
+                        );
+                        poh_recorder.lock().unwrap().set_bank(&tpu_bank);
+                    }
+                }
+            }
+        } else {
+            error!("No frozen banks available!");
+        }
+    }
     pub fn replay_blocktree_into_bank(
         bank: &Bank,
         blocktree: &Blocktree,

View File

@@ -6,8 +6,8 @@ pub const NUM_TICKS_PER_SECOND: u64 = 10;
 
 // At 10 ticks/s, 8 ticks per slot implies that leader rotation and voting will happen
 // every 800 ms. A fast voting cadence ensures faster finality and convergence
-pub const DEFAULT_TICKS_PER_SLOT: u64 = 80;
-pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 64;
+pub const DEFAULT_TICKS_PER_SLOT: u64 = 16;
+pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 16;
 
 /// The time window of recent block hash values that the bank will track the signatures
 /// of over. Once the bank discards a block hash, it will reject any transactions that use
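For a rough sense of scale (nothing here beyond arithmetic on the constants in this hunk): with NUM_TICKS_PER_SECOND = 10, the new DEFAULT_TICKS_PER_SLOT of 16 gives a 1,600 ms slot, and DEFAULT_SLOTS_PER_EPOCH of 16 gives a 25,600 ms epoch. That is the same arithmetic SLOT_MILLIS uses in the cluster_tests hunk above, and it is why kill_entry_and_spend_and_verify_rest sleeps for SLOT_MILLIS * DEFAULT_SLOTS_PER_EPOCH before killing the entry point. A quick check:

// Back-of-the-envelope check of the new test timings; constants copied from this diff.
const NUM_TICKS_PER_SECOND: u64 = 10;
const DEFAULT_TICKS_PER_SLOT: u64 = 16;
const DEFAULT_SLOTS_PER_EPOCH: u64 = 16;

fn main() {
    let slot_millis = (DEFAULT_TICKS_PER_SLOT * 1000) / NUM_TICKS_PER_SECOND;
    let epoch_millis = slot_millis * DEFAULT_SLOTS_PER_EPOCH;
    assert_eq!(slot_millis, 1_600);
    assert_eq!(epoch_millis, 25_600);
    println!("slot: {} ms, epoch: {} ms", slot_millis, epoch_millis);
}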

View File

@@ -6,7 +6,7 @@ use solana::local_cluster::LocalCluster;
 use solana::rpc::JsonRpcConfig;
 
 #[test]
-fn test_spend_and_verify_all_nodes_1() -> () {
+fn test_spend_and_verify_all_nodes_1() {
     solana_logger::setup();
     let num_nodes = 1;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -18,7 +18,7 @@ fn test_spend_and_verify_all_nodes_1() -> () {
 }
 
 #[test]
-fn test_spend_and_verify_all_nodes_2() -> () {
+fn test_spend_and_verify_all_nodes_2() {
     solana_logger::setup();
     let num_nodes = 2;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -30,7 +30,7 @@ fn test_spend_and_verify_all_nodes_2() -> () {
 }
 
 #[test]
-fn test_spend_and_verify_all_nodes_3() -> () {
+fn test_spend_and_verify_all_nodes_3() {
     solana_logger::setup();
     let num_nodes = 3;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -43,7 +43,7 @@ fn test_spend_and_verify_all_nodes_3() -> () {
 #[test]
 #[should_panic]
-fn test_fullnode_exit_safe_config_should_panic_2() -> () {
+fn test_fullnode_exit_default_config_should_panic() {
     solana_logger::setup();
     let num_nodes = 2;
     let local = LocalCluster::new(num_nodes, 10_000, 100);
@@ -51,7 +51,7 @@ fn test_fullnode_exit_safe_config_should_panic_2() -> () {
 }
 
 #[test]
-fn test_fullnode_exit_unsafe_config_2() -> () {
+fn test_fullnode_exit_2() {
     solana_logger::setup();
     let num_nodes = 2;
     let mut fullnode_exit = FullnodeConfig::default();
@@ -59,3 +59,29 @@ fn test_fullnode_exit_unsafe_config_2() -> () {
     let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_exit);
     cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
 }
+
+#[test]
+fn test_leader_failure_2() {
+    let num_nodes = 2;
+    let mut fullnode_exit = FullnodeConfig::default();
+    fullnode_exit.rpc_config = JsonRpcConfig::TestOnlyAllowRpcFullnodeExit;
+    let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_exit);
+    cluster_tests::kill_entry_and_spend_and_verify_rest(
+        &local.entry_point_info,
+        &local.funding_keypair,
+        num_nodes,
+    );
+}
+
+#[test]
+fn test_leader_failure_3() {
+    let num_nodes = 3;
+    let mut fullnode_exit = FullnodeConfig::default();
+    fullnode_exit.rpc_config = JsonRpcConfig::TestOnlyAllowRpcFullnodeExit;
+    let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_exit);
+    cluster_tests::kill_entry_and_spend_and_verify_rest(
+        &local.entry_point_info,
+        &local.funding_keypair,
+        num_nodes,
+    );
+}