diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index b9c3325402..7f79962ca8 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -731,14 +731,8 @@ fn converge( spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); - let window = default_window(); - let ncp = Ncp::new( - &spy_ref, - window.clone(), - None, - gossip_socket, - exit_signal.clone(), - ); + let window = Arc::new(RwLock::new(default_window())); + let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone()); let mut v: Vec = vec![]; //wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { diff --git a/src/crdt.rs b/src/crdt.rs index 8b556d3ec9..2eb04b867d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1684,7 +1684,7 @@ mod tests { #[test] fn run_window_request() { logger::setup(); - let window = default_window(); + let window = Arc::new(RwLock::new(default_window())); let me = NodeInfo::new( Keypair::new().pubkey(), socketaddr!("127.0.0.1:1234"), @@ -1767,7 +1767,7 @@ mod tests { /// test window requests respond with the right blob, and do not overrun #[test] fn run_window_request_with_backoff() { - let window = default_window(); + let window = Arc::new(RwLock::new(default_window())); let mut me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); me.leader_id = me.id; @@ -1874,7 +1874,7 @@ mod tests { #[test] fn protocol_requestupdate_alive() { logger::setup(); - let window = default_window(); + let window = Arc::new(RwLock::new(default_window())); let recycler = BlobRecycler::default(); let node = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); diff --git a/src/fullnode.rs b/src/fullnode.rs index 9ee57fa290..eaeb7f3523 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -202,12 +202,13 @@ impl Fullnode { let blob_recycler = BlobRecycler::default(); let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); + let shared_window = Arc::new(RwLock::new(window)); let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new"))); let ncp = Ncp::new( &crdt, - window.clone(), + shared_window.clone(), ledger_path, node.sockets.gossip, exit.clone(), @@ -224,7 +225,7 @@ impl Fullnode { &bank, entry_height, crdt, - window, + shared_window, node.sockets.replicate, node.sockets.repair, node.sockets.retransmit, @@ -256,7 +257,7 @@ impl Fullnode { let broadcast_stage = BroadcastStage::new( node.sockets.broadcast, crdt, - window, + shared_window, entry_height, blob_recycler.clone(), blob_receiver, diff --git a/src/tvu.rs b/src/tvu.rs index a84648f699..002bc0d814 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -169,7 +169,7 @@ pub mod tests { gossip: UdpSocket, exit: Arc, ) -> (Ncp, SharedWindow) { - let window = window::default_window(); + let window = Arc::new(RwLock::new(window::default_window())); let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit); (ncp, window) } diff --git a/src/window.rs b/src/window.rs index 992d7de9fa..11fd1dd2b7 100644 --- a/src/window.rs +++ b/src/window.rs @@ -326,11 +326,8 @@ pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u } } -pub fn default_window() -> SharedWindow { - Arc::new(RwLock::new(vec![ - WindowSlot::default(); - WINDOW_SIZE as usize - ])) +pub fn default_window() -> Window { + vec![WindowSlot::default(); WINDOW_SIZE as usize] } pub fn index_blobs( @@ -361,33 +358,29 @@ pub fn initialized_window( node_info: &NodeInfo, blobs: Vec, entry_height: u64, -) -> SharedWindow { - let window = default_window(); +) -> Window { + let mut window = default_window(); let id = node_info.id; - { - let mut win = window.write().unwrap(); + trace!( + "{} initialized window entry_height:{} blobs_len:{}", + id, + entry_height, + blobs.len() + ); - trace!( - "{} initialized window entry_height:{} blobs_len:{}", - id, - entry_height, - blobs.len() - ); + // Index the blobs + let mut received = entry_height - blobs.len() as u64; + index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window"); - // Index the blobs - let mut received = entry_height - blobs.len() as u64; - index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window"); - - // populate the window, offset by implied index - let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize; - for b in blobs.into_iter().skip(diff) { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix % WINDOW_SIZE) as usize; - trace!("{} caching {} at {}", id, ix, pos); - assert!(win[pos].data.is_none()); - win[pos].data = Some(b); - } + // populate the window, offset by implied index + let diff = cmp::max(blobs.len() as isize - window.len() as isize, 0) as usize; + for b in blobs.into_iter().skip(diff) { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix % WINDOW_SIZE) as usize; + trace!("{} caching {} at {}", id, ix, pos); + assert!(window[pos].data.is_none()); + window[pos].data = Some(b); } window @@ -398,7 +391,7 @@ pub fn new_window_from_entries( entry_height: u64, node_info: &NodeInfo, blob_recycler: &BlobRecycler, -) -> SharedWindow { +) -> Window { // convert to blobs let blobs = ledger_tail.to_blobs(&blob_recycler); initialized_window(&node_info, blobs, entry_height) @@ -406,8 +399,6 @@ pub fn new_window_from_entries( #[cfg(test)] mod test { - use crdt::{Crdt, Node}; - use logger; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; use signature::Pubkey; use std::io; @@ -415,12 +406,10 @@ mod test { use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; use std::time::Duration; - use streamer::{blob_receiver, receiver, responder, BlobReceiver, PacketReceiver}; - use window::{ - blob_idx_in_window, calculate_highest_lost_blob_index, default_window, WINDOW_SIZE, - }; + use streamer::{receiver, responder, PacketReceiver}; + use window::{blob_idx_in_window, calculate_highest_lost_blob_index, WINDOW_SIZE}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { diff --git a/src/window_service.rs b/src/window_service.rs index cdeadbfad1..0cfff352c2 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -303,16 +303,13 @@ pub fn window_service( mod test { use crdt::{Crdt, Node}; use logger; - use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; - use signature::Pubkey; - use std::io; - use std::net::UdpSocket; + use packet::{BlobRecycler, PACKET_DATA_SIZE}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; - use streamer::{blob_receiver, receiver, responder, BlobReceiver, PacketReceiver}; - use window::{default_window, WINDOW_SIZE}; + use streamer::{blob_receiver, responder, BlobReceiver}; + use window::default_window; use window_service::{repair_backoff, window_service}; fn get_blobs(r: BlobReceiver, num: &mut usize) { @@ -353,7 +350,7 @@ mod test { ); let (s_window, r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); - let win = default_window(); + let win = Arc::new(RwLock::new(default_window())); let t_window = window_service( subs, win, @@ -423,7 +420,7 @@ mod test { ); let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); - let win = default_window(); + let win = Arc::new(RwLock::new(default_window())); let t_window = window_service( subs.clone(), win, @@ -486,7 +483,7 @@ mod test { ); let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); - let win = default_window(); + let win = Arc::new(RwLock::new(default_window())); let t_window = window_service( subs.clone(), win, diff --git a/tests/multinode.rs b/tests/multinode.rs index f4472bee2e..e4bf1de67b 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -41,7 +41,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_crdt)); - let spy_window = default_window(); + let spy_window = Arc::new(RwLock::new(default_window())); let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone()); //wait for the network to converge let mut converged = false; @@ -439,7 +439,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { let (alice, ledger_path) = genesis( "leader_restart_validator_start_from_old_ledger", - 100_000 + 500 * solana::window::MAX_REPAIR_BACKOFF as i64, + 100_000 + 500 * solana::window_service::MAX_REPAIR_BACKOFF as i64, ); let bob_pubkey = Keypair::new().pubkey(); @@ -487,7 +487,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { // send requests so the validator eventually sees a gap and requests a repair let mut expected = 1500; let mut client = mk_client(&validator_data); - for _ in 0..solana::window::MAX_REPAIR_BACKOFF { + for _ in 0..solana::window_service::MAX_REPAIR_BACKOFF { let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected)) .unwrap();