diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index e710a91012..49a3dc7740 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -83,7 +83,7 @@ fn main() -> () { let node_info = node.info.clone(); let pubkey = keypair.pubkey(); - let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false); + let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false, None); // airdrop stuff, probably goes away at some point let leader = match network { diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 4324bc7160..c9417c402c 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -1,7 +1,7 @@ //! The `broadcast_stage` broadcasts data from a leader node to validators //! use counter::Counter; -use crdt::{Crdt, CrdtError, NodeInfo, LEADER_ROTATION_INTERVAL}; +use crdt::{Crdt, CrdtError, NodeInfo}; use entry::Entry; #[cfg(feature = "erasure")] use erasure; @@ -184,9 +184,16 @@ impl BroadcastStage { coding: entry_height, }; let mut receive_index = entry_height; - let me = crdt.read().unwrap().my_data().clone(); + let me; + let leader_rotation_interval; + { + let rcrdt = crdt.read().unwrap(); + me = rcrdt.my_data().clone(); + leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + } + loop { - if transmit_index.data % (LEADER_ROTATION_INTERVAL as u64) == 0 { + if transmit_index.data % (leader_rotation_interval as u64) == 0 { let rcrdt = crdt.read().unwrap(); let my_id = rcrdt.my_data().id; match rcrdt.get_scheduled_leader(transmit_index.data) { @@ -272,7 +279,7 @@ impl Service for BroadcastStage { #[cfg(test)] mod tests { use broadcast_stage::{BroadcastStage, BroadcastStageReturnType}; - use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL}; + use crdt::{Crdt, Node}; use entry::Entry; use mint::Mint; use packet::BlobRecycler; @@ -361,11 +368,12 @@ mod tests { #[test] fn test_broadcast_stage_leader_rotation_exit() { let broadcast_info = setup_dummy_broadcast_stage(); - + let leader_rotation_interval = 10; { let mut wcrdt = broadcast_info.crdt.write().unwrap(); + wcrdt.set_leader_rotation_interval(leader_rotation_interval); // Set the leader for the next rotation to be myself - wcrdt.set_scheduled_leader(LEADER_ROTATION_INTERVAL, broadcast_info.my_id); + wcrdt.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id); } let genesis_len = broadcast_info.entries.len() as u64; @@ -375,12 +383,12 @@ mod tests { .expect("Ledger should not be empty") .id; - // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will + // Input enough entries to make exactly leader_rotation_interval entries, which will // trigger a check for leader rotation. Because the next scheduled leader // is ourselves, we won't exit let mut recorder = Recorder::new(last_entry_hash); - for _ in genesis_len..LEADER_ROTATION_INTERVAL { + for _ in genesis_len..leader_rotation_interval { let new_entry = recorder.record(vec![]); broadcast_info.entry_sender.send(new_entry).unwrap(); } @@ -390,12 +398,12 @@ mod tests { .crdt .write() .unwrap() - .set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, broadcast_info.buddy_id); + .set_scheduled_leader(2 * leader_rotation_interval, broadcast_info.buddy_id); - // Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us + // Input another leader_rotation_interval dummy entries, which will take us // past the point of the leader rotation. The write_stage will see that // it's no longer the leader after checking the crdt, and exit - for _ in 0..LEADER_ROTATION_INTERVAL { + for _ in 0..leader_rotation_interval { let new_entry = recorder.record(vec![]); match broadcast_info.entry_sender.send(new_entry) { // We disconnected, break out of loop and check the results @@ -413,6 +421,6 @@ mod tests { let highest_index = find_highest_window_index(&broadcast_info.shared_window); // The blob index is zero indexed, so it will always be one behind the entry height // which starts at one. - assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1); + assert_eq!(highest_index, 2 * leader_rotation_interval - 1); } } diff --git a/src/crdt.rs b/src/crdt.rs index dd55f93177..c132ed83c0 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -37,10 +37,6 @@ use timing::{duration_as_ms, timestamp}; use window::{SharedWindow, WindowIndex}; pub const FULLNODE_PORT_RANGE: (u16, u16) = (8000, 10_000); -#[cfg(test)] -pub const LEADER_ROTATION_INTERVAL: u64 = 10; -#[cfg(not(test))] -pub const LEADER_ROTATION_INTERVAL: u64 = 100; /// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; @@ -213,7 +209,11 @@ pub struct Crdt { /// TODO: Clearly not the correct implementation of this, but a temporary abstraction /// for testing pub scheduled_leaders: HashMap, + // TODO: Is there a better way to do this? We didn't make this a constant because + // we want to be able to set it in integration tests so that the tests don't time out. + pub leader_rotation_interval: u64, } + // TODO These messages should be signed, and go through the gpu pipeline for spam filtering #[derive(Serialize, Deserialize, Debug)] enum Protocol { @@ -244,6 +244,7 @@ impl Crdt { id: node_info.id, update_index: 1, scheduled_leaders: HashMap::new(), + leader_rotation_interval: 100, }; me.local.insert(node_info.id, me.update_index); me.table.insert(node_info.id, node_info); @@ -314,6 +315,14 @@ impl Crdt { } } + pub fn set_leader_rotation_interval(&mut self, leader_rotation_interval: u64) { + self.leader_rotation_interval = leader_rotation_interval; + } + + pub fn get_leader_rotation_interval(&self) -> u64 { + self.leader_rotation_interval + } + // TODO: Dummy leader schedule setter, need to implement actual leader scheduling. pub fn set_scheduled_leader(&mut self, entry_height: u64, new_leader_id: Pubkey) -> () { self.scheduled_leaders.insert(entry_height, new_leader_id); diff --git a/src/drone.rs b/src/drone.rs index e111d19906..d5112c8bb1 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -337,6 +337,7 @@ mod tests { None, &ledger_path, false, + None, ); let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket"); @@ -373,7 +374,7 @@ mod tests { let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); - let server = Fullnode::new(leader, &ledger_path, leader_keypair, None, false); + let server = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None); let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket"); let transactions_socket = diff --git a/src/fullnode.rs b/src/fullnode.rs index 9e04f54c49..ff1bf3855e 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -121,6 +121,7 @@ impl Fullnode { keypair: Keypair, leader_addr: Option, sigverify_disabled: bool, + leader_rotation_interval: Option, ) -> Self { info!("creating bank..."); let (bank, entry_height, ledger_tail) = Self::new_bank_from_ledger(ledger_path); @@ -145,6 +146,7 @@ impl Fullnode { leader_info.as_ref(), ledger_path, sigverify_disabled, + leader_rotation_interval, ); match leader_addr { @@ -224,6 +226,7 @@ impl Fullnode { leader_info: Option<&NodeInfo>, ledger_path: &str, sigverify_disabled: bool, + leader_rotation_interval: Option, ) -> Self { if leader_info.is_none() { node.info.leader_id = node.info.id; @@ -257,7 +260,11 @@ impl Fullnode { window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); let shared_window = Arc::new(RwLock::new(window)); - let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new"))); + let mut crdt = Crdt::new(node.info).expect("Crdt::new"); + if let Some(interval) = leader_rotation_interval { + crdt.set_leader_rotation_interval(interval); + } + let crdt = Arc::new(RwLock::new(crdt)); let ncp = Ncp::new( &crdt, @@ -440,8 +447,10 @@ impl Fullnode { // TODO: only used for testing, get rid of this once we have actual // leader scheduling pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) { - let mut wcrdt = self.crdt.write().unwrap(); - wcrdt.set_scheduled_leader(entry_height, leader_id); + self.crdt + .write() + .unwrap() + .set_scheduled_leader(entry_height, leader_id); } fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec) { @@ -508,6 +517,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, + None, ); v.close().unwrap(); remove_dir_all(validator_ledger_path).unwrap(); @@ -533,6 +543,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, + None, ) }).collect(); diff --git a/src/thin_client.rs b/src/thin_client.rs index d3a3def4d9..0ac399db89 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -472,6 +472,7 @@ mod tests { None, &ledger_path, false, + None, ); sleep(Duration::from_millis(900)); @@ -517,6 +518,7 @@ mod tests { None, &ledger_path, false, + None, ); //TODO: remove this sleep, or add a retry so CI is stable sleep(Duration::from_millis(300)); @@ -575,6 +577,7 @@ mod tests { None, &ledger_path, false, + None, ); sleep(Duration::from_millis(300)); @@ -634,6 +637,7 @@ mod tests { None, &ledger_path, false, + None, ); sleep(Duration::from_millis(900)); diff --git a/src/wallet.rs b/src/wallet.rs index b12046a13b..132520fdcd 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -387,6 +387,7 @@ mod tests { None, &ledger_path, false, + None, ); sleep(Duration::from_millis(200)); @@ -453,6 +454,7 @@ mod tests { None, &ledger_path, false, + None, ); sleep(Duration::from_millis(200)); diff --git a/src/write_stage.rs b/src/write_stage.rs index 98baa90119..5c3c045f46 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -4,7 +4,7 @@ use bank::Bank; use counter::Counter; -use crdt::{Crdt, LEADER_ROTATION_INTERVAL}; +use crdt::Crdt; use entry::Entry; use ledger::{Block, LedgerWriter}; use log::Level; @@ -40,13 +40,14 @@ impl WriteStage { // reflecting whether we actually hit an entry height for leader rotation. fn find_leader_rotation_index( crdt: &Arc>, + leader_rotation_interval: u64, entry_height: u64, mut new_entries: Vec, ) -> (Vec, bool) { // Find out how many more entries we can squeeze in until the next leader // rotation let entries_until_leader_rotation = - LEADER_ROTATION_INTERVAL - (entry_height % LEADER_ROTATION_INTERVAL); + leader_rotation_interval - (entry_height % leader_rotation_interval); let new_entries_length = new_entries.len(); @@ -55,7 +56,7 @@ impl WriteStage { let mut is_leader_rotation = false; loop { - if (entry_height + i as u64) % LEADER_ROTATION_INTERVAL == 0 { + if (entry_height + i as u64) % leader_rotation_interval == 0 { let rcrdt = crdt.read().unwrap(); let my_id = rcrdt.my_data().id; let next_leader = rcrdt.get_scheduled_leader(entry_height + i as u64); @@ -69,7 +70,7 @@ impl WriteStage { break; } - i += cmp::min(LEADER_ROTATION_INTERVAL as usize, new_entries_length - i); + i += cmp::min(leader_rotation_interval as usize, new_entries_length - i); } new_entries.truncate(i as usize); @@ -85,6 +86,7 @@ impl WriteStage { entry_sender: &Sender>, entry_receiver: &Receiver>, entry_height: &mut u64, + leader_rotation_interval: u64, ) -> Result<()> { let mut ventries = Vec::new(); let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; @@ -96,6 +98,7 @@ impl WriteStage { // rotation let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index( crdt, + leader_rotation_interval, *entry_height + num_new_entries as u64, received_entries, ); @@ -194,14 +197,20 @@ impl WriteStage { .spawn(move || { let mut last_vote = 0; let mut last_valid_validator_timestamp = 0; - let id = crdt.read().unwrap().id; + let id; + let leader_rotation_interval; + { + let rcrdt = crdt.read().unwrap(); + id = crdt.read().unwrap().id; + leader_rotation_interval = rcrdt.get_leader_rotation_interval(); + } let mut entry_height = entry_height; loop { // Note that entry height is not zero indexed, it starts at 1, so the // old leader is in power up to and including entry height - // n * LEADER_ROTATION_INTERVAL for some "n". Once we've forwarded + // n * leader_rotation_interval for some "n". Once we've forwarded // that last block, check for the next scheduled leader. - if entry_height % (LEADER_ROTATION_INTERVAL as u64) == 0 { + if entry_height % (leader_rotation_interval as u64) == 0 { let rcrdt = crdt.read().unwrap(); let my_id = rcrdt.my_data().id; let scheduled_leader = rcrdt.get_scheduled_leader(entry_height); @@ -225,6 +234,7 @@ impl WriteStage { &entry_sender, &entry_receiver, &mut entry_height, + leader_rotation_interval, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { @@ -282,7 +292,7 @@ impl Service for WriteStage { #[cfg(test)] mod tests { use bank::Bank; - use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL}; + use crdt::{Crdt, Node}; use entry::Entry; use ledger::{genesis, read_ledger}; use packet::BlobRecycler; @@ -360,12 +370,14 @@ mod tests { #[test] fn test_write_stage_leader_rotation_exit() { let write_stage_info = setup_dummy_write_stage(); + let leader_rotation_interval = 10; - write_stage_info - .crdt - .write() - .unwrap() - .set_scheduled_leader(LEADER_ROTATION_INTERVAL, write_stage_info.my_id); + { + let mut wcrdt = write_stage_info.crdt.write().unwrap(); + + wcrdt.set_leader_rotation_interval(leader_rotation_interval); + wcrdt.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id); + } let last_entry_hash = write_stage_info .ledger_tail @@ -375,11 +387,11 @@ mod tests { let genesis_entry_height = write_stage_info.ledger_tail.len() as u64; - // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will + // Input enough entries to make exactly leader_rotation_interval entries, which will // trigger a check for leader rotation. Because the next scheduled leader // is ourselves, we won't exit let mut recorder = Recorder::new(last_entry_hash); - for _ in genesis_entry_height..LEADER_ROTATION_INTERVAL { + for _ in genesis_entry_height..leader_rotation_interval { let new_entry = recorder.record(vec![]); write_stage_info.entry_sender.send(new_entry).unwrap(); } @@ -391,14 +403,14 @@ mod tests { { let mut wcrdt = write_stage_info.crdt.write().unwrap(); wcrdt.insert(&leader2_info.info); - wcrdt.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, leader2_keypair.pubkey()); + wcrdt.set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey()); } - // Input another LEADER_ROTATION_INTERVAL dummy entries one at a time, + // Input another leader_rotation_interval dummy entries one at a time, // which will take us past the point of the leader rotation. // The write_stage will see that it's no longer the leader after // checking the schedule, and exit - for _ in 0..LEADER_ROTATION_INTERVAL { + for _ in 0..leader_rotation_interval { let new_entry = recorder.record(vec![]); write_stage_info.entry_sender.send(new_entry).unwrap(); } @@ -408,10 +420,10 @@ mod tests { WriteStageReturnType::LeaderRotation ); - // Make sure the ledger contains exactly LEADER_ROTATION_INTERVAL entries + // Make sure the ledger contains exactly leader_rotation_interval entries let (entry_height, _) = process_ledger(&write_stage_info.leader_ledger_path, &write_stage_info.bank); remove_dir_all(write_stage_info.leader_ledger_path).unwrap(); - assert_eq!(entry_height, 2 * LEADER_ROTATION_INTERVAL); + assert_eq!(entry_height, 2 * leader_rotation_interval); } } diff --git a/tests/multinode.rs b/tests/multinode.rs index 3e75f2f76d..c3cdff1b41 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -5,7 +5,7 @@ extern crate chrono; extern crate serde_json; extern crate solana; -use solana::crdt::{Crdt, Node, NodeInfo, LEADER_ROTATION_INTERVAL}; +use solana::crdt::{Crdt, Node, NodeInfo}; use solana::entry::Entry; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::hash::Hash; @@ -145,7 +145,14 @@ fn test_multi_node_ledger_window() -> result::Result<()> { writer.write_entries(entries).unwrap(); } - let leader = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false); + let leader = Fullnode::new( + leader, + &leader_ledger_path, + leader_keypair, + None, + false, + None, + ); // Send leader some tokens to vote let leader_balance = @@ -163,6 +170,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, + None, ); // contains the leader and new node @@ -218,7 +226,14 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { ); ledger_paths.push(zero_ledger_path.clone()); - let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false); + let server = Fullnode::new( + leader, + &leader_ledger_path, + leader_keypair, + None, + false, + None, + ); // Send leader some tokens to vote let leader_balance = @@ -241,6 +256,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, + None, ); nodes.push(val); } @@ -276,6 +292,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, + None, ); nodes.push(val); //contains the leader and new node @@ -335,7 +352,14 @@ fn test_multi_node_basic() { let (alice, leader_ledger_path, _) = genesis("multi_node_basic", 10_000); ledger_paths.push(leader_ledger_path.clone()); - let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false); + let server = Fullnode::new( + leader, + &leader_ledger_path, + leader_keypair, + None, + false, + None, + ); // Send leader some tokens to vote let leader_balance = @@ -354,6 +378,7 @@ fn test_multi_node_basic() { keypair, Some(leader_data.contact_info.ncp), false, + None, ); nodes.push(val); } @@ -396,7 +421,14 @@ fn test_boot_validator_from_file() -> result::Result<()> { ledger_paths.push(leader_ledger_path.clone()); let leader_data = leader.info.clone(); - let leader_fullnode = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false); + let leader_fullnode = Fullnode::new( + leader, + &leader_ledger_path, + leader_keypair, + None, + false, + None, + ); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap(); assert_eq!(leader_balance, 500); @@ -415,6 +447,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, + None, ); let mut client = mk_client(&validator_data); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); @@ -433,7 +466,7 @@ fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) { let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); - let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false); + let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None); (leader_data, leader_fullnode) } @@ -487,6 +520,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, + None, ); // trigger broadcast, validator should catch up from leader, whose window contains @@ -544,7 +578,14 @@ fn test_multi_node_dynamic_network() { let alice_arc = Arc::new(RwLock::new(alice)); let leader_data = leader.info.clone(); - let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, true); + let server = Fullnode::new( + leader, + &leader_ledger_path, + leader_keypair, + None, + true, + None, + ); // Send leader some tokens to vote let leader_balance = send_tx_and_retry_get_balance( @@ -617,6 +658,7 @@ fn test_multi_node_dynamic_network() { keypair, Some(leader_data.contact_info.ncp), true, + None, ); (rd, val) }).unwrap() @@ -712,6 +754,7 @@ fn test_multi_node_dynamic_network() { #[test] fn test_leader_to_validator_transition() { logger::setup(); + let leader_rotation_interval = 20; // Make a dummy address to be the sink for this test's mock transactions let bob_pubkey = Keypair::new().pubkey(); @@ -720,7 +763,7 @@ fn test_leader_to_validator_transition() { // in the leader ledger let (mint, leader_ledger_path, entries) = genesis( "test_leader_to_validator_transition", - (3 * LEADER_ROTATION_INTERVAL) as i64, + (3 * leader_rotation_interval) as i64, ); let genesis_height = entries.len() as u64; @@ -737,10 +780,11 @@ fn test_leader_to_validator_transition() { leader_keypair, None, false, + Some(leader_rotation_interval), ); // Set the next leader to be Bob - leader.set_scheduled_leader(bob_pubkey, LEADER_ROTATION_INTERVAL); + leader.set_scheduled_leader(bob_pubkey, leader_rotation_interval); // Make an extra node for our leader to broadcast to, // who won't vote and mess with our leader's entry count @@ -762,14 +806,14 @@ fn test_leader_to_validator_transition() { assert!(converged); - let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 20, 1); + let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1); - // Push leader "extra_transactions" past LEADER_ROTATION_INTERVAL entry height, + // Push leader "extra_transactions" past leader_rotation_interval entry height, // make sure the leader stops. - assert!(genesis_height < LEADER_ROTATION_INTERVAL); - for i in genesis_height..(LEADER_ROTATION_INTERVAL + extra_transactions) { + assert!(genesis_height < leader_rotation_interval); + for i in genesis_height..(leader_rotation_interval + extra_transactions) { let expected_balance = std::cmp::min( - LEADER_ROTATION_INTERVAL - genesis_height, + leader_rotation_interval - genesis_height, i - genesis_height, ); @@ -793,7 +837,7 @@ fn test_leader_to_validator_transition() { // transactions earlier let mut leader_client = mk_client(&leader_info); - let expected_bal = LEADER_ROTATION_INTERVAL - genesis_height; + let expected_bal = leader_rotation_interval - genesis_height; let bal = leader_client .poll_get_balance(&bob_pubkey) .expect("Expected success when polling newly transitioned validator for balance")