diff --git a/src/fullnode.rs b/src/fullnode.rs index 98077a6ad5..c87cd2bbc8 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -110,54 +110,7 @@ impl Fullnode { server } - pub fn new_with_bank( - keypair: Keypair, - bank: Bank, - entry_height: u64, - ledger_tail: &[Entry], - mut node: TestNode, - leader_info: Option, - exit: Arc, - ledger_path: Option<&str>, - sigverify_disabled: bool, - ) -> Self { - let bank = Arc::new(bank); - let thread_hdls = match leader_info { - Some(leader_info) => { - // Start in validator mode. - Self::create_validator_threads( - keypair, - &bank, - entry_height, - &ledger_tail, - node, - &leader_info, - exit.clone(), - ledger_path, - sigverify_disabled, - ) - } - None => { - // Start in leader mode. - node.data.leader_id = node.data.id; - - Self::create_leader_threads( - keypair, - &bank, - entry_height, - &ledger_tail, - node, - exit.clone(), - ledger_path.expect("ledger path"), - sigverify_disabled, - ) - } - }; - - Fullnode { exit, thread_hdls } - } - - /// Create a server instance acting as a leader. + /// Create a fullnode instance acting as a leader or validator. /// /// ```text /// .---------------------. @@ -180,86 +133,7 @@ impl Fullnode { /// | `-----` `-----` | | | /// | | `------------` /// `---------------------` - /// ``` - fn create_leader_threads( - keypair: Keypair, - bank: &Arc, - entry_height: u64, - ledger_tail: &[Entry], - node: TestNode, - exit: Arc, - ledger_path: &str, - sigverify_disabled: bool, - ) -> Vec> { - let tick_duration = None; - // TODO: To light up PoH, uncomment the following line: - //let tick_duration = Some(Duration::from_millis(1000)); - - let mut thread_hdls = vec![]; - let rpu = Rpu::new( - &bank, - node.sockets.requests, - node.sockets.respond, - exit.clone(), - ); - thread_hdls.extend(rpu.thread_hdls()); - - let mut drone_addr = node.data.contact_info.tpu; - drone_addr.set_port(DRONE_PORT); - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); - let rpc_service = JsonRpcService::new( - &bank, - node.data.contact_info.tpu, - drone_addr, - rpc_addr, - exit.clone(), - ); - thread_hdls.extend(rpc_service.thread_hdls()); - - let blob_recycler = BlobRecycler::default(); - let window = - window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler); - - let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); - - let (tpu, blob_receiver) = Tpu::new( - keypair, - &bank, - &crdt, - tick_duration, - node.sockets.transaction, - &blob_recycler, - exit.clone(), - ledger_path, - sigverify_disabled, - ); - thread_hdls.extend(tpu.thread_hdls()); - let ncp = Ncp::new( - &crdt, - window.clone(), - Some(ledger_path), - node.sockets.gossip, - node.sockets.gossip_send, - exit.clone(), - ).expect("Ncp::new"); - thread_hdls.extend(ncp.thread_hdls()); - - let broadcast_stage = BroadcastStage::new( - node.sockets.broadcast, - crdt, - window, - entry_height, - blob_recycler.clone(), - blob_receiver, - ); - thread_hdls.extend(broadcast_stage.thread_hdls()); - - thread_hdls - } - - /// Create a server instance acting as a validator. /// - /// ```text /// .-------------------------------. /// | Validator | /// | | @@ -286,71 +160,143 @@ impl Fullnode { /// `--------` | | `------------` /// `-------------------------------` /// ``` - fn create_validator_threads( + pub fn new_with_bank( keypair: Keypair, - bank: &Arc, + bank: Bank, entry_height: u64, ledger_tail: &[Entry], - node: TestNode, - entry_point: &NodeInfo, + mut node: TestNode, + leader_info: Option, exit: Arc, ledger_path: Option<&str>, - _sigverify_disabled: bool, - ) -> Vec> { - let mut thread_hdls = vec![]; - let rpu = Rpu::new( - &bank, - node.sockets.requests, - node.sockets.respond, - exit.clone(), - ); - thread_hdls.extend(rpu.thread_hdls()); + sigverify_disabled: bool, + ) -> Self { + if leader_info.is_none() { + node.data.leader_id = node.data.id; + } - let mut drone_addr = entry_point.contact_info.ncp; - drone_addr.set_port(DRONE_PORT); - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); - let rpc_service = JsonRpcService::new( - &bank, - node.data.contact_info.tpu, - drone_addr, - rpc_addr, - exit.clone(), - ); - thread_hdls.extend(rpc_service.thread_hdls()); + let bank = Arc::new(bank); + let thread_hdls = match leader_info { + Some(leader_info) => { + // Start in validator mode. + let mut thread_hdls = vec![]; + let rpu = Rpu::new( + &bank, + node.sockets.requests, + node.sockets.respond, + exit.clone(), + ); + thread_hdls.extend(rpu.thread_hdls()); - let blob_recycler = BlobRecycler::default(); - let window = - window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler); + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); + let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone()); + thread_hdls.extend(rpc_service.thread_hdls()); - let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); - crdt.write() - .expect("'crdt' write lock before insert() in pub fn replicate") - .insert(&entry_point); + let blob_recycler = BlobRecycler::default(); + let window = window::new_window_from_entries( + ledger_tail, + entry_height, + &node.data, + &blob_recycler, + ); - let ncp = Ncp::new( - &crdt, - window.clone(), - ledger_path, - node.sockets.gossip, - node.sockets.gossip_send, - exit.clone(), - ).expect("Ncp::new"); + let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); + crdt.write() + .expect("'crdt' write lock before insert() in pub fn replicate") + .insert(&leader_info); - let tvu = Tvu::new( - keypair, - &bank, - entry_height, - crdt.clone(), - window.clone(), - node.sockets.replicate, - node.sockets.repair, - node.sockets.retransmit, - ledger_path, - exit.clone(), - ); - thread_hdls.extend(tvu.thread_hdls()); - thread_hdls.extend(ncp.thread_hdls()); - thread_hdls + let ncp = Ncp::new( + &crdt, + window.clone(), + ledger_path, + node.sockets.gossip, + node.sockets.gossip_send, + exit.clone(), + ).expect("Ncp::new"); + + let tvu = Tvu::new( + keypair, + &bank, + entry_height, + crdt.clone(), + window.clone(), + node.sockets.replicate, + node.sockets.repair, + node.sockets.retransmit, + ledger_path, + exit.clone(), + ); + thread_hdls.extend(tvu.thread_hdls()); + thread_hdls.extend(ncp.thread_hdls()); + thread_hdls + } + None => { + // Start in leader mode. + let ledger_path = ledger_path.expect("ledger path"); + let tick_duration = None; + // TODO: To light up PoH, uncomment the following line: + //let tick_duration = Some(Duration::from_millis(1000)); + + let mut thread_hdls = vec![]; + let rpu = Rpu::new( + &bank, + node.sockets.requests, + node.sockets.respond, + exit.clone(), + ); + thread_hdls.extend(rpu.thread_hdls()); + + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); + let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone()); + thread_hdls.extend(rpc_service.thread_hdls()); + + let blob_recycler = BlobRecycler::default(); + let window = window::new_window_from_entries( + ledger_tail, + entry_height, + &node.data, + &blob_recycler, + ); + + let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); + + let (tpu, blob_receiver) = Tpu::new( + keypair, + &bank, + &crdt, + tick_duration, + node.sockets.transaction, + &blob_recycler, + exit.clone(), + ledger_path, + sigverify_disabled, + ); + thread_hdls.extend(tpu.thread_hdls()); + let ncp = Ncp::new( + &crdt, + window.clone(), + Some(ledger_path), + node.sockets.gossip, + node.sockets.gossip_send, + exit.clone(), + ).expect("Ncp::new"); + thread_hdls.extend(ncp.thread_hdls()); + + let broadcast_stage = BroadcastStage::new( + node.sockets.broadcast, + crdt, + window, + entry_height, + blob_recycler.clone(), + blob_receiver, + ); + thread_hdls.extend(broadcast_stage.thread_hdls()); + + thread_hdls + } + }; + + Fullnode { exit, thread_hdls } } //used for notifying many nodes in parallel to exit