diff --git a/src/crdt.rs b/src/crdt.rs old mode 100755 new mode 100644 index 5e714f8054..ad332f4152 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -33,7 +33,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; -use streamer::{BlobReceiver, BlobSender, Window, WindowIndex}; +use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex}; use timing::timestamp; use transaction::Vote; @@ -553,7 +553,7 @@ impl Crdt { pub fn broadcast( me: &NodeInfo, broadcast_table: &[NodeInfo], - window: &Window, + window: &SharedWindow, s: &UdpSocket, transmit_index: &mut WindowIndex, received_index: u64, @@ -944,7 +944,7 @@ impl Crdt { .unwrap() } fn run_window_request( - window: &Window, + window: &SharedWindow, me: &NodeInfo, from: &NodeInfo, ix: u64, @@ -1010,7 +1010,7 @@ impl Crdt { //TODO we should first coalesce all the requests fn handle_blob( obj: &Arc>, - window: &Window, + window: &SharedWindow, blob_recycler: &BlobRecycler, blob: &Blob, ) -> Option { @@ -1026,7 +1026,7 @@ impl Crdt { fn handle_protocol( request: Protocol, obj: &Arc>, - window: &Window, + window: &SharedWindow, blob_recycler: &BlobRecycler, ) -> Option { match request { @@ -1122,7 +1122,7 @@ impl Crdt { /// Process messages from the network fn run_listen( obj: &Arc>, - window: &Window, + window: &SharedWindow, blob_recycler: &BlobRecycler, requests_receiver: &BlobReceiver, response_sender: &BlobSender, @@ -1144,7 +1144,7 @@ impl Crdt { } pub fn listen( obj: Arc>, - window: Window, + window: SharedWindow, blob_recycler: BlobRecycler, requests_receiver: BlobReceiver, response_sender: BlobSender, diff --git a/src/fullnode.rs b/src/fullnode.rs index 3ddf20f11b..15b6ea5d2d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -144,7 +144,7 @@ impl FullNode { entry_height: u64, node_info: &NodeInfo, blob_recycler: &BlobRecycler, - ) -> streamer::Window { + ) -> streamer::SharedWindow { match ledger_tail { Some(ledger_tail) => { // convert to blobs diff --git a/src/ncp.rs b/src/ncp.rs index 2bcf624a98..c4ff31df72 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -19,7 +19,7 @@ pub struct Ncp { impl Ncp { pub fn new( crdt: &Arc>, - window: streamer::Window, + window: streamer::SharedWindow, gossip_listen_socket: UdpSocket, gossip_send_socket: UdpSocket, exit: Arc, diff --git a/src/streamer.rs b/src/streamer.rs old mode 100755 new mode 100644 index 952d19625b..495370727a --- a/src/streamer.rs +++ b/src/streamer.rs @@ -31,7 +31,7 @@ pub struct WindowSlot { pub coding: Option, } -pub type Window = Arc>>; +pub type SharedWindow = Arc>>; #[derive(Debug, PartialEq, Eq)] pub enum WindowError { @@ -171,7 +171,7 @@ pub fn blob_receiver( } fn find_next_missing( - window: &Window, + window: &SharedWindow, crdt: &Arc>, consumed: u64, received: u64, @@ -197,7 +197,7 @@ fn find_next_missing( fn repair_window( debug_id: u64, - window: &Window, + window: &SharedWindow, crdt: &Arc>, last: &mut u64, times: &mut usize, @@ -319,7 +319,7 @@ fn process_blob( blob: SharedBlob, pix: u64, consume_queue: &mut SharedBlobs, - window: &Window, + window: &SharedWindow, recycler: &BlobRecycler, consumed: &mut u64, ) { @@ -419,7 +419,7 @@ fn process_blob( fn recv_window( debug_id: u64, - window: &Window, + window: &SharedWindow, crdt: &Arc>, recycler: &BlobRecycler, consumed: &mut u64, @@ -515,7 +515,7 @@ fn recv_window( Ok(()) } -fn print_window(debug_id: u64, window: &Window, consumed: u64) -> String { +fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String { let pointer: Vec<_> = window .read() .unwrap() @@ -559,7 +559,7 @@ fn print_window(debug_id: u64, window: &Window, consumed: u64) -> String { ) } -pub fn default_window() -> Window { +pub fn default_window() -> SharedWindow { Arc::new(RwLock::new(vec![ WindowSlot::default(); WINDOW_SIZE as usize @@ -594,7 +594,7 @@ pub fn initialized_window( node_info: &NodeInfo, blobs: Vec, entry_height: u64, -) -> Window { +) -> SharedWindow { let window = default_window(); let debug_id = node_info.debug_id(); @@ -628,7 +628,7 @@ pub fn initialized_window( pub fn window( crdt: Arc>, - window: Window, + window: SharedWindow, entry_height: u64, recycler: BlobRecycler, r: BlobReceiver, @@ -676,7 +676,7 @@ pub fn window( fn broadcast( node_info: &NodeInfo, broadcast_table: &[NodeInfo], - window: &Window, + window: &SharedWindow, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -786,7 +786,7 @@ fn broadcast( pub fn broadcaster( sock: UdpSocket, crdt: Arc>, - window: Window, + window: SharedWindow, entry_height: u64, recycler: BlobRecycler, r: BlobReceiver, diff --git a/src/tvu.rs b/src/tvu.rs index 8221430dc6..7d2e134eec 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -29,7 +29,7 @@ //! //! 1. Fetch Stage //! - Incoming blobs are picked up from the replicate socket and repair socket. -//! 2. Window Stage +//! 2. SharedWindow Stage //! - Blobs are windowed until a contiguous chunk is available. This stage also repairs and //! retransmits blobs that are in the queue. //! 3. Replicate Stage @@ -47,7 +47,7 @@ use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::thread::{self, JoinHandle}; -use streamer::Window; +use streamer::SharedWindow; use window_stage::WindowStage; pub struct Tvu { @@ -73,7 +73,7 @@ impl Tvu { bank: &Arc, entry_height: u64, crdt: Arc>, - window: Window, + window: SharedWindow, replicate_socket: UdpSocket, repair_socket: UdpSocket, retransmit_socket: UdpSocket, @@ -156,7 +156,7 @@ pub mod tests { use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::time::Duration; - use streamer::{self, Window}; + use streamer::{self, SharedWindow}; use transaction::Transaction; use tvu::Tvu; @@ -164,7 +164,7 @@ pub mod tests { crdt: Arc>, listen: UdpSocket, exit: Arc, - ) -> Result<(Ncp, Window)> { + ) -> Result<(Ncp, SharedWindow)> { let window = streamer::default_window(); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?; diff --git a/src/window_stage.rs b/src/window_stage.rs index 244740d6ca..dd7d68f8db 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -7,7 +7,7 @@ use std::net::UdpSocket; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::{self, JoinHandle}; -use streamer::{self, BlobReceiver, Window}; +use streamer::{self, BlobReceiver, SharedWindow}; pub struct WindowStage { thread_hdls: Vec>, @@ -16,7 +16,7 @@ pub struct WindowStage { impl WindowStage { pub fn new( crdt: &Arc>, - window: Window, + window: SharedWindow, entry_height: u64, retransmit_socket: UdpSocket, blob_recycler: &BlobRecycler,