diff --git a/src/fullnode.rs b/src/fullnode.rs index d2cd35634b..2718846365 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -177,49 +177,45 @@ impl Fullnode { let bank = Arc::new(bank); let mut thread_hdls = vec![]; + + let rpu = Rpu::new( + &bank, + node.sockets.requests, + node.sockets.respond, + exit.clone(), + ); + thread_hdls.extend(rpu.thread_hdls()); + + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); + let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone()); + thread_hdls.extend(rpc_service.thread_hdls()); + + let blob_recycler = BlobRecycler::default(); + let window = + window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler); + + let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); + + let ncp = Ncp::new( + &crdt, + window.clone(), + ledger_path, + node.sockets.gossip, + node.sockets.gossip_send, + exit.clone(), + ).expect("Ncp::new"); + thread_hdls.extend(ncp.thread_hdls()); + match leader_info { - Some(leader_info) => { + Some(ref leader_info) => { // Start in validator mode. - let rpu = Rpu::new( - &bank, - node.sockets.requests, - node.sockets.respond, - exit.clone(), - ); - thread_hdls.extend(rpu.thread_hdls()); - - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); - let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone()); - thread_hdls.extend(rpc_service.thread_hdls()); - - let blob_recycler = BlobRecycler::default(); - let window = window::new_window_from_entries( - ledger_tail, - entry_height, - &node.data, - &blob_recycler, - ); - - let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); - crdt.write() - .expect("'crdt' write lock before insert() in pub fn replicate") - .insert(&leader_info); - - let ncp = Ncp::new( - &crdt, - window.clone(), - ledger_path, - node.sockets.gossip, - node.sockets.gossip_send, - exit.clone(), - ).expect("Ncp::new"); - + crdt.write().unwrap().insert(leader_info); let tvu = Tvu::new( keypair, &bank, entry_height, - crdt.clone(), - window.clone(), + crdt, + window, node.sockets.replicate, node.sockets.repair, node.sockets.retransmit, @@ -227,7 +223,6 @@ impl Fullnode { exit.clone(), ); thread_hdls.extend(tvu.thread_hdls()); - thread_hdls.extend(ncp.thread_hdls()); } None => { // Start in leader mode. @@ -236,28 +231,6 @@ impl Fullnode { // TODO: To light up PoH, uncomment the following line: //let tick_duration = Some(Duration::from_millis(1000)); - let rpu = Rpu::new( - &bank, - node.sockets.requests, - node.sockets.respond, - exit.clone(), - ); - thread_hdls.extend(rpu.thread_hdls()); - - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT); - let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone()); - thread_hdls.extend(rpc_service.thread_hdls()); - - let blob_recycler = BlobRecycler::default(); - let window = window::new_window_from_entries( - ledger_tail, - entry_height, - &node.data, - &blob_recycler, - ); - - let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); - let (tpu, blob_receiver) = Tpu::new( keypair, &bank, @@ -270,15 +243,6 @@ impl Fullnode { sigverify_disabled, ); thread_hdls.extend(tpu.thread_hdls()); - let ncp = Ncp::new( - &crdt, - window.clone(), - Some(ledger_path), - node.sockets.gossip, - node.sockets.gossip_send, - exit.clone(), - ).expect("Ncp::new"); - thread_hdls.extend(ncp.thread_hdls()); let broadcast_stage = BroadcastStage::new( node.sockets.broadcast,