diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 4c93e2ec4e..e710a91012 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -11,12 +11,11 @@ use clap::{App, Arg}; use solana::client::mk_client; use solana::crdt::Node; use solana::drone::DRONE_PORT; -use solana::fullnode::{Config, Fullnode, NodeRole}; +use solana::fullnode::{Config, Fullnode, FullnodeReturnType}; use solana::logger; use solana::metrics::set_panic_hook; use solana::signature::{Keypair, KeypairUtil}; use solana::thin_client::poll_gossip_for_leader; -use solana::tpu::TpuReturnType; use solana::wallet::request_airdrop; use std::fs::File; use std::net::{Ipv4Addr, SocketAddr}; @@ -125,23 +124,14 @@ fn main() -> () { } loop { - let node_role = fullnode.node_role.take(); - match node_role { - Some(NodeRole::Leader(leader_services)) => { - match leader_services.join() { - Ok(Some(TpuReturnType::LeaderRotation)) => (), - //fullnode.start_tvu(); - Err(e) => { - eprintln!("Leader returned error: {:?}", e); - exit(1); - } - _ => (), - } + let status = fullnode.handle_role_transition(); + match status { + Ok(Some(FullnodeReturnType::LeaderRotation)) => (), + _ => { + // Fullnode tpu/tvu exited for some unexpected + // reason, so exit + exit(1); } - Some(NodeRole::Validator(validator_services)) => { - let _ = validator_services.join(); - } - _ => (), } } } diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 98817a8faf..bafad3486c 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -328,8 +328,15 @@ mod tests { #[test] fn test_broadcast_stage_leader_rotation_exit() { - let (id, buddy_id, broadcast_stage, shared_window, entry_sender, crdt, entries) = - setup_dummy_broadcast_stage(); + let ( + id, + buddy_id, + broadcast_stage, + shared_window, + entry_sender, + crdt, + entries, + ) = setup_dummy_broadcast_stage(); { let mut wcrdt = crdt.write().unwrap(); // Set leader to myself diff --git a/src/drone.rs b/src/drone.rs index 697395f56e..e111d19906 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -335,7 +335,7 @@ mod tests { &[], leader, None, - Some(&ledger_path), + &ledger_path, false, ); diff --git a/src/fullnode.rs b/src/fullnode.rs index 4774e615bf..08b17e929a 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -12,6 +12,7 @@ use rpc::{JsonRpcService, RPC_PORT}; use rpu::Rpu; use service::Service; use signature::{Keypair, KeypairUtil}; +use std::net::UdpSocket; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -64,11 +65,23 @@ pub enum FullnodeReturnType { } pub struct Fullnode { + pub node_role: Option, + keypair: Arc, exit: Arc, rpu: Rpu, rpc_service: JsonRpcService, ncp: Ncp, - pub node_role: Option, + bank: Arc, + crdt: Arc>, + ledger_path: String, + sigverify_disabled: bool, + shared_window: window::SharedWindow, + replicate_socket: Vec, + repair_socket: UdpSocket, + retransmit_socket: UdpSocket, + transaction_sockets: Vec, + broadcast_socket: UdpSocket, + blob_recycler: BlobRecycler, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -102,22 +115,11 @@ impl Fullnode { sigverify_disabled: bool, ) -> Self { info!("creating bank..."); - let bank = Bank::new_default(leader_addr.is_none()); - - let entries = read_ledger(ledger_path, true).expect("opening ledger"); - - let entries = entries - .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); - - info!("processing ledger..."); - let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger"); - // entry_height is the network-wide agreed height of the ledger. - // initialize it from the input ledger - info!("processed {} ledger...", entry_height); + let (bank, entry_height, ledger_tail) = Self::new_bank_from_ledger(ledger_path); info!("creating networking stack..."); - let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); + info!( "starting... local gossip address: {} (advertising {})", local_gossip_addr, node.info.contact_info.ncp @@ -133,7 +135,7 @@ impl Fullnode { &ledger_tail, node, leader_info.as_ref(), - Some(ledger_path), + ledger_path, sigverify_disabled, ); @@ -212,7 +214,7 @@ impl Fullnode { ledger_tail: &[Entry], mut node: Node, leader_info: Option<&NodeInfo>, - ledger_path: Option<&str>, + ledger_path: &str, sigverify_disabled: bool, ) -> Self { if leader_info.is_none() { @@ -253,11 +255,12 @@ impl Fullnode { &crdt, shared_window.clone(), blob_recycler.clone(), - ledger_path, + Some(ledger_path), node.sockets.gossip, exit.clone(), ); + let keypair = Arc::new(keypair); let node_role; match leader_info { Some(leader_info) => { @@ -265,16 +268,26 @@ impl Fullnode { // TODO: let Crdt get that data from the network? crdt.write().unwrap().insert(leader_info); let tvu = Tvu::new( - keypair, + keypair.clone(), &bank, entry_height, - crdt, - shared_window, + crdt.clone(), + shared_window.clone(), blob_recycler.clone(), - node.sockets.replicate, - node.sockets.repair, - node.sockets.retransmit, - ledger_path, + node.sockets + .replicate + .iter() + .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) + .collect(), + node.sockets + .repair + .try_clone() + .expect("Failed to clone repair socket"), + node.sockets + .retransmit + .try_clone() + .expect("Failed to clone retransmit socket"), + Some(ledger_path), exit.clone(), ); let validator_state = ValidatorServices::new(tvu); @@ -282,17 +295,20 @@ impl Fullnode { } None => { // Start in leader mode. - let ledger_path = ledger_path.expect("ledger path"); let tick_duration = None; // TODO: To light up PoH, uncomment the following line: //let tick_duration = Some(Duration::from_millis(1000)); let (tpu, entry_receiver) = Tpu::new( - keypair, + keypair.clone(), &bank, &crdt, tick_duration, - node.sockets.transaction, + node.sockets + .transaction + .iter() + .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) + .collect(), &blob_recycler, exit.clone(), ledger_path, @@ -301,9 +317,12 @@ impl Fullnode { ); let broadcast_stage = BroadcastStage::new( - node.sockets.broadcast, - crdt, - shared_window, + node.sockets + .broadcast + .try_clone() + .expect("Failed to clone broadcast socket"), + crdt.clone(), + shared_window.clone(), entry_height, blob_recycler.clone(), entry_receiver, @@ -314,11 +333,81 @@ impl Fullnode { } Fullnode { + keypair, + crdt, + shared_window, + bank, + sigverify_disabled, rpu, ncp, rpc_service, node_role, + blob_recycler: blob_recycler.clone(), + ledger_path: ledger_path.to_owned(), exit, + replicate_socket: node.sockets.replicate, + repair_socket: node.sockets.repair, + retransmit_socket: node.sockets.retransmit, + transaction_sockets: node.sockets.transaction, + broadcast_socket: node.sockets.broadcast, + } + } + + fn leader_to_validator(&mut self) { + // TODO: We can avoid building the bank again once RecordStage is + // integrated with BankingStage + let (bank, entry_height, _) = Self::new_bank_from_ledger(&self.ledger_path); + self.bank = Arc::new(bank); + + { + let mut wcrdt = self.crdt.write().unwrap(); + let scheduled_leader = wcrdt.get_scheduled_leader(entry_height); + match scheduled_leader { + //TODO: Handle the case where we don't know who the next + //scheduled leader is + None => (), + Some(leader_id) => wcrdt.set_leader(leader_id), + } + } + + let tvu = Tvu::new( + self.keypair.clone(), + &self.bank, + entry_height, + self.crdt.clone(), + self.shared_window.clone(), + self.blob_recycler.clone(), + self.replicate_socket + .iter() + .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) + .collect(), + self.repair_socket + .try_clone() + .expect("Failed to clone repair socket"), + self.retransmit_socket + .try_clone() + .expect("Failed to clone retransmit socket"), + Some(&self.ledger_path), + self.exit.clone(), + ); + let validator_state = ValidatorServices::new(tvu); + self.node_role = Some(NodeRole::Validator(validator_state)); + } + + pub fn handle_role_transition(&mut self) -> Result> { + let node_role = self.node_role.take(); + match node_role { + Some(NodeRole::Leader(leader_services)) => match leader_services.join()? { + Some(TpuReturnType::LeaderRotation) => { + self.leader_to_validator(); + Ok(Some(FullnodeReturnType::LeaderRotation)) + } + _ => Ok(None), + }, + Some(NodeRole::Validator(validator_services)) => match validator_services.join()? { + _ => Ok(None), + }, + None => Ok(None), } } @@ -331,6 +420,19 @@ impl Fullnode { self.exit(); self.join() } + + fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec) { + let bank = Bank::new_default(false); + let entries = read_ledger(ledger_path, true).expect("opening ledger"); + let entries = entries + .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); + info!("processing ledger..."); + let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger"); + // entry_height is the network-wide agreed height of the ledger. + // initialize it from the input ledger + info!("processed {} ledger...", entry_height); + (bank, entry_height, ledger_tail) + } } impl Service for Fullnode { @@ -365,31 +467,56 @@ mod tests { use bank::Bank; use crdt::Node; use fullnode::Fullnode; - use mint::Mint; + use ledger::genesis; use service::Service; use signature::{Keypair, KeypairUtil}; + use std::fs::remove_dir_all; #[test] fn validator_exit() { let keypair = Keypair::new(); let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); - let alice = Mint::new(10_000); + let (alice, validator_ledger_path) = genesis("validator_exit", 10_000); let bank = Bank::new(&alice); let entry = tn.info.clone(); - let v = Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false); + let v = Fullnode::new_with_bank( + keypair, + bank, + 0, + &[], + tn, + Some(&entry), + &validator_ledger_path, + false, + ); v.close().unwrap(); + remove_dir_all(validator_ledger_path).unwrap(); } #[test] fn validator_parallel_exit() { + let mut ledger_paths = vec![]; let vals: Vec = (0..2) - .map(|_| { + .map(|i| { let keypair = Keypair::new(); let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); - let alice = Mint::new(10_000); + let (alice, validator_ledger_path) = + genesis(&format!("validator_parallel_exit_{}", i), 10_000); + ledger_paths.push(validator_ledger_path.clone()); let bank = Bank::new(&alice); let entry = tn.info.clone(); - Fullnode::new_with_bank(keypair, bank, 0, &[], tn, Some(&entry), None, false) - }).collect(); + Fullnode::new_with_bank( + keypair, + bank, + 0, + &[], + tn, + Some(&entry), + &validator_ledger_path, + false, + ) + }) + .collect(); + //each validator can exit in parallel to speed many sequential calls to `join` vals.iter().for_each(|v| v.exit()); //while join is called sequentially, the above exit call notified all the @@ -397,5 +524,9 @@ mod tests { vals.into_iter().for_each(|v| { v.join().unwrap(); }); + + for path in ledger_paths { + remove_dir_all(path).unwrap(); + } } } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 39610802b4..d147cad2d0 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -67,7 +67,7 @@ impl ReplicateStage { Ok(()) } pub fn new( - keypair: Keypair, + keypair: Arc, bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, @@ -85,7 +85,7 @@ impl ReplicateStage { ); let vote_stage = VoteStage::new( - Arc::new(keypair), + keypair, bank.clone(), crdt.clone(), blob_recycler.clone(), diff --git a/src/thin_client.rs b/src/thin_client.rs index db353f8612..d3a3def4d9 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -470,7 +470,7 @@ mod tests { &[], leader, None, - Some(&ledger_path), + &ledger_path, false, ); sleep(Duration::from_millis(900)); @@ -515,7 +515,7 @@ mod tests { &[], leader, None, - Some(&ledger_path), + &ledger_path, false, ); //TODO: remove this sleep, or add a retry so CI is stable @@ -573,7 +573,7 @@ mod tests { &[], leader, None, - Some(&ledger_path), + &ledger_path, false, ); sleep(Duration::from_millis(300)); @@ -632,7 +632,7 @@ mod tests { &[], leader, None, - Some(&ledger_path), + &ledger_path, false, ); sleep(Duration::from_millis(900)); diff --git a/src/tpu.rs b/src/tpu.rs index 32ae4ad6ee..4dd6555f8f 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -57,7 +57,7 @@ pub struct Tpu { impl Tpu { pub fn new( - keypair: Keypair, + keypair: Arc, bank: &Arc, crdt: &Arc>, tick_duration: Option, diff --git a/src/tvu.rs b/src/tvu.rs index 1998be18d0..7d1431cd07 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -70,7 +70,7 @@ impl Tvu { /// * `exit` - The exit signal. #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new( - keypair: Keypair, + keypair: Arc, bank: &Arc, entry_height: u64, crdt: Arc>, @@ -236,7 +236,7 @@ pub mod tests { let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); let tvu = Tvu::new( - target1_keypair, + Arc::new(target1_keypair), &bank, 0, cref1, diff --git a/src/wallet.rs b/src/wallet.rs index acc8aabdbb..b12046a13b 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -385,7 +385,7 @@ mod tests { &[], leader, None, - Some(&ledger_path), + &ledger_path, false, ); sleep(Duration::from_millis(200)); @@ -451,7 +451,7 @@ mod tests { &[], leader, None, - Some(&ledger_path), + &ledger_path, false, ); sleep(Duration::from_millis(200)); diff --git a/src/write_stage.rs b/src/write_stage.rs index bc10d4521b..890a694d11 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -170,7 +170,7 @@ impl WriteStage { /// Create a new WriteStage for writing and broadcasting entries. pub fn new( - keypair: Keypair, + keypair: Arc, bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, @@ -315,7 +315,7 @@ mod tests { Vec, ) { // Setup leader info - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let id = leader_keypair.pubkey(); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());