Cleanup type aliases and imports

This commit is contained in:
Greg Fitzgerald
2018-06-27 12:33:56 -06:00
committed by Greg Fitzgerald
parent bb89d6f54d
commit 4aedd3f1b6
16 changed files with 80 additions and 91 deletions

View File

@ -5,8 +5,7 @@
use bank::Bank; use bank::Bank;
use bincode::deserialize; use bincode::deserialize;
use counter::Counter; use counter::Counter;
use packet; use packet::{PacketRecycler, Packets, SharedPackets};
use packet::SharedPackets;
use rayon::prelude::*; use rayon::prelude::*;
use record_stage::Signal; use record_stage::Signal;
use result::Result; use result::Result;
@ -38,7 +37,7 @@ impl BankingStage {
bank: Arc<Bank>, bank: Arc<Bank>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: packet::PacketRecycler, packet_recycler: PacketRecycler,
) -> Self { ) -> Self {
let (signal_sender, signal_receiver) = channel(); let (signal_sender, signal_receiver) = channel();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
@ -65,7 +64,7 @@ impl BankingStage {
/// Convert the transactions from a blob of binary data to a vector of transactions and /// Convert the transactions from a blob of binary data to a vector of transactions and
/// an unused `SocketAddr` that could be used to send a response. /// an unused `SocketAddr` that could be used to send a response.
fn deserialize_transactions(p: &packet::Packets) -> Vec<Option<(Transaction, SocketAddr)>> { fn deserialize_transactions(p: &Packets) -> Vec<Option<(Transaction, SocketAddr)>> {
p.packets p.packets
.par_iter() .par_iter()
.map(|x| { .map(|x| {
@ -82,7 +81,7 @@ impl BankingStage {
bank: Arc<Bank>, bank: Arc<Bank>,
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>, verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
signal_sender: &Sender<Signal>, signal_sender: &Sender<Signal>,
packet_recycler: &packet::PacketRecycler, packet_recycler: &PacketRecycler,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let recv_start = Instant::now(); let recv_start = Instant::now();

View File

@ -1,30 +1,26 @@
//! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. //! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel.
use packet; use packet::BlobRecycler;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use streamer; use streamer::{self, BlobReceiver};
pub struct BlobFetchStage { pub struct BlobFetchStage {
pub blob_receiver: streamer::BlobReceiver, pub blob_receiver: BlobReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl BlobFetchStage { impl BlobFetchStage {
pub fn new( pub fn new(socket: UdpSocket, exit: Arc<AtomicBool>, blob_recycler: BlobRecycler) -> Self {
socket: UdpSocket,
exit: Arc<AtomicBool>,
blob_recycler: packet::BlobRecycler,
) -> Self {
Self::new_multi_socket(vec![socket], exit, blob_recycler) Self::new_multi_socket(vec![socket], exit, blob_recycler)
} }
pub fn new_multi_socket( pub fn new_multi_socket(
sockets: Vec<UdpSocket>, sockets: Vec<UdpSocket>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
blob_recycler: packet::BlobRecycler, blob_recycler: BlobRecycler,
) -> Self { ) -> Self {
let (blob_sender, blob_receiver) = channel(); let (blob_sender, blob_receiver) = channel();
let thread_hdls: Vec<_> = sockets let thread_hdls: Vec<_> = sockets

View File

@ -5,7 +5,7 @@
use bank::Bank; use bank::Bank;
use entry::Entry; use entry::Entry;
use ledger::Block; use ledger::Block;
use packet; use packet::BlobRecycler;
use result::Result; use result::Result;
use serde_json; use serde_json;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -14,7 +14,7 @@ use std::io::Write;
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer::BlobSender;
pub struct EntryWriter<'a> { pub struct EntryWriter<'a> {
bank: &'a Bank, bank: &'a Bank,
@ -57,8 +57,8 @@ impl<'a> EntryWriter<'a> {
/// continuosly broadcast blobs of entries out /// continuosly broadcast blobs of entries out
pub fn write_and_send_entries<W: Write>( pub fn write_and_send_entries<W: Write>(
&self, &self,
broadcast: &streamer::BlobSender, broadcast: &BlobSender,
blob_recycler: &packet::BlobRecycler, blob_recycler: &BlobRecycler,
writer: &Mutex<W>, writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>, entry_receiver: &Receiver<Entry>,
) -> Result<()> { ) -> Result<()> {

View File

@ -1,30 +1,26 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel. //! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
use packet; use packet::PacketRecycler;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use streamer; use streamer::{self, PacketReceiver};
pub struct FetchStage { pub struct FetchStage {
pub packet_receiver: streamer::PacketReceiver, pub packet_receiver: PacketReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl FetchStage { impl FetchStage {
pub fn new( pub fn new(socket: UdpSocket, exit: Arc<AtomicBool>, packet_recycler: PacketRecycler) -> Self {
socket: UdpSocket,
exit: Arc<AtomicBool>,
packet_recycler: packet::PacketRecycler,
) -> Self {
Self::new_multi_socket(vec![socket], exit, packet_recycler) Self::new_multi_socket(vec![socket], exit, packet_recycler)
} }
pub fn new_multi_socket( pub fn new_multi_socket(
sockets: Vec<UdpSocket>, sockets: Vec<UdpSocket>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
packet_recycler: packet::PacketRecycler, packet_recycler: PacketRecycler,
) -> Self { ) -> Self {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let thread_hdls: Vec<_> = sockets let thread_hdls: Vec<_> = sockets

View File

@ -1,7 +1,7 @@
//! The `ncp` module implements the network control plane. //! The `ncp` module implements the network control plane.
use crdt; use crdt::Crdt;
use packet; use packet::{BlobRecycler, SharedBlob};
use result::Result; use result::Result;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -16,13 +16,13 @@ pub struct Ncp {
impl Ncp { impl Ncp {
pub fn new( pub fn new(
crdt: Arc<RwLock<crdt::Crdt>>, crdt: Arc<RwLock<Crdt>>,
window: Arc<RwLock<Vec<Option<packet::SharedBlob>>>>, window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
gossip_listen_socket: UdpSocket, gossip_listen_socket: UdpSocket,
gossip_send_socket: UdpSocket, gossip_send_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<Ncp> { ) -> Result<Ncp> {
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let (request_sender, request_receiver) = channel(); let (request_sender, request_receiver) = channel();
trace!( trace!(
"Ncp: id: {:?}, listening on: {:?}", "Ncp: id: {:?}, listening on: {:?}",
@ -42,7 +42,7 @@ impl Ncp {
blob_recycler.clone(), blob_recycler.clone(),
response_receiver, response_receiver,
); );
let t_listen = crdt::Crdt::listen( let t_listen = Crdt::listen(
crdt.clone(), crdt.clone(),
window, window,
blob_recycler.clone(), blob_recycler.clone(),
@ -50,7 +50,7 @@ impl Ncp {
response_sender.clone(), response_sender.clone(),
exit.clone(), exit.clone(),
); );
let t_gossip = crdt::Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit); let t_gossip = Crdt::gossip(crdt.clone(), blob_recycler, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Ok(Ncp { thread_hdls }) Ok(Ncp { thread_hdls })
} }

View File

@ -16,6 +16,7 @@ use std::time::Instant;
pub type SharedPackets = Arc<RwLock<Packets>>; pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>; pub type SharedBlob = Arc<RwLock<Blob>>;
pub type SharedBlobs = VecDeque<SharedBlob>;
pub type PacketRecycler = Recycler<Packets>; pub type PacketRecycler = Recycler<Packets>;
pub type BlobRecycler = Recycler<Blob>; pub type BlobRecycler = Recycler<Blob>;
@ -274,7 +275,7 @@ pub fn to_blob<T: Serialize>(
pub fn to_blobs<T: Serialize>( pub fn to_blobs<T: Serialize>(
rsps: Vec<(T, SocketAddr)>, rsps: Vec<(T, SocketAddr)>,
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
) -> Result<VecDeque<SharedBlob>> { ) -> Result<SharedBlobs> {
let mut blobs = VecDeque::new(); let mut blobs = VecDeque::new();
for (resp, rsp_addr) in rsps { for (resp, rsp_addr) in rsps {
blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?); blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?);
@ -367,7 +368,7 @@ impl Blob {
self.meta.size = new_size; self.meta.size = new_size;
self.set_data_size(new_size as u64).unwrap(); self.set_data_size(new_size as u64).unwrap();
} }
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> { pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<SharedBlobs> {
let mut v = VecDeque::new(); let mut v = VecDeque::new();
//DOCUMENTED SIDE-EFFECT //DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll //Performance out of the IO without poll
@ -405,11 +406,7 @@ impl Blob {
} }
Ok(v) Ok(v)
} }
pub fn send_to( pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: &mut SharedBlobs) -> Result<()> {
re: &BlobRecycler,
socket: &UdpSocket,
v: &mut VecDeque<SharedBlob>,
) -> Result<()> {
while let Some(r) = v.pop_front() { while let Some(r) = v.pop_front() {
{ {
let p = r.read().expect("'r' read lock in pub fn send_to"); let p = r.read().expect("'r' read lock in pub fn send_to");

View File

@ -3,7 +3,7 @@
use entry::Entry; use entry::Entry;
use hash::{hash, Hash}; use hash::{hash, Hash};
use ledger::next_entries_mut; use ledger;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use transaction::Transaction; use transaction::Transaction;
@ -28,7 +28,7 @@ impl Recorder {
} }
pub fn record(&mut self, transactions: Vec<Transaction>) -> Vec<Entry> { pub fn record(&mut self, transactions: Vec<Transaction>) -> Vec<Entry> {
next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions) ledger::next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions)
} }
pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option<Entry> { pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option<Entry> {

View File

@ -2,13 +2,13 @@
use bank::Bank; use bank::Bank;
use ledger; use ledger;
use packet; use packet::BlobRecycler;
use result::Result; use result::Result;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer::BlobReceiver;
pub struct ReplicateStage { pub struct ReplicateStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
@ -18,8 +18,8 @@ impl ReplicateStage {
/// Process entry blobs, already in order /// Process entry blobs, already in order
fn replicate_requests( fn replicate_requests(
bank: &Arc<Bank>, bank: &Arc<Bank>,
blob_receiver: &streamer::BlobReceiver, blob_receiver: &BlobReceiver,
blob_recycler: &packet::BlobRecycler, blob_recycler: &BlobRecycler,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let blobs = blob_receiver.recv_timeout(timer)?; let blobs = blob_receiver.recv_timeout(timer)?;
@ -36,8 +36,8 @@ impl ReplicateStage {
pub fn new( pub fn new(
bank: Arc<Bank>, bank: Arc<Bank>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
window_receiver: streamer::BlobReceiver, window_receiver: BlobReceiver,
blob_recycler: packet::BlobRecycler, blob_recycler: BlobRecycler,
) -> Self { ) -> Self {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-replicate-stage".to_string()) .name("solana-replicate-stage".to_string())

View File

@ -1,8 +1,7 @@
//! The `request_stage` processes thin client Request messages. //! The `request_stage` processes thin client Request messages.
use bincode::deserialize; use bincode::deserialize;
use packet; use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets};
use packet::SharedPackets;
use rayon::prelude::*; use rayon::prelude::*;
use request::Request; use request::Request;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
@ -13,17 +12,17 @@ use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Instant; use std::time::Instant;
use streamer; use streamer::{self, BlobReceiver, BlobSender};
use timing; use timing;
pub struct RequestStage { pub struct RequestStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub blob_receiver: streamer::BlobReceiver, pub blob_receiver: BlobReceiver,
pub request_processor: Arc<RequestProcessor>, pub request_processor: Arc<RequestProcessor>,
} }
impl RequestStage { impl RequestStage {
pub fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> { pub fn deserialize_requests(p: &Packets) -> Vec<Option<(Request, SocketAddr)>> {
p.packets p.packets
.par_iter() .par_iter()
.map(|x| { .map(|x| {
@ -37,9 +36,9 @@ impl RequestStage {
pub fn process_request_packets( pub fn process_request_packets(
request_processor: &RequestProcessor, request_processor: &RequestProcessor,
packet_receiver: &Receiver<SharedPackets>, packet_receiver: &Receiver<SharedPackets>,
blob_sender: &streamer::BlobSender, blob_sender: &BlobSender,
packet_recycler: &packet::PacketRecycler, packet_recycler: &PacketRecycler,
blob_recycler: &packet::BlobRecycler, blob_recycler: &BlobRecycler,
) -> Result<()> { ) -> Result<()> {
let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
@ -60,7 +59,7 @@ impl RequestStage {
let rsps = request_processor.process_requests(reqs); let rsps = request_processor.process_requests(reqs);
let blobs = packet::to_blobs(rsps, blob_recycler)?; let blobs = to_blobs(rsps, blob_recycler)?;
if !blobs.is_empty() { if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len()); info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing //don't wake up the other side if there is nothing
@ -84,8 +83,8 @@ impl RequestStage {
request_processor: RequestProcessor, request_processor: RequestProcessor,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
packet_receiver: Receiver<SharedPackets>, packet_receiver: Receiver<SharedPackets>,
packet_recycler: packet::PacketRecycler, packet_recycler: PacketRecycler,
blob_recycler: packet::BlobRecycler, blob_recycler: BlobRecycler,
) -> Self { ) -> Self {
let request_processor = Arc::new(request_processor); let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone(); let request_processor_ = request_processor.clone();

View File

@ -24,7 +24,7 @@
//! ``` //! ```
use bank::Bank; use bank::Bank;
use packet; use packet::{BlobRecycler, PacketRecycler};
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use request_stage::RequestStage; use request_stage::RequestStage;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -45,7 +45,7 @@ impl Rpu {
respond_socket: UdpSocket, respond_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let packet_recycler = packet::PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver( let t_receiver = streamer::receiver(
requests_socket, requests_socket,
@ -54,7 +54,7 @@ impl Rpu {
packet_sender, packet_sender,
); );
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let request_processor = RequestProcessor::new(bank.clone()); let request_processor = RequestProcessor::new(bank.clone());
let request_stage = RequestStage::new( let request_stage = RequestStage::new(
request_processor, request_processor,

View File

@ -3,7 +3,7 @@
use bank::Bank; use bank::Bank;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use ncp::Ncp; use ncp::Ncp;
use packet; use packet::BlobRecycler;
use rpu::Rpu; use rpu::Rpu;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -61,7 +61,7 @@ impl Server {
let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone());
thread_hdls.extend(rpu.thread_hdls); thread_hdls.extend(rpu.thread_hdls);
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let tpu = Tpu::new( let tpu = Tpu::new(
bank.clone(), bank.clone(),
tick_duration, tick_duration,

View File

@ -14,7 +14,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Instant; use std::time::Instant;
use streamer; use streamer::{self, PacketReceiver};
use timing; use timing;
pub struct SigVerifyStage { pub struct SigVerifyStage {
@ -38,7 +38,7 @@ impl SigVerifyStage {
} }
fn verifier( fn verifier(
recvr: &Arc<Mutex<streamer::PacketReceiver>>, recvr: &Arc<Mutex<PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>, sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> { ) -> Result<()> {
let (batch, len) = let (batch, len) =
@ -76,7 +76,7 @@ impl SigVerifyStage {
fn verifier_service( fn verifier_service(
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
packet_receiver: Arc<Mutex<streamer::PacketReceiver>>, packet_receiver: Arc<Mutex<PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>, verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
@ -89,7 +89,7 @@ impl SigVerifyStage {
fn verifier_services( fn verifier_services(
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
packet_receiver: streamer::PacketReceiver, packet_receiver: PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>, verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> { ) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender)); let sender = Arc::new(Mutex::new(verified_sender));

View File

@ -3,22 +3,24 @@
use crdt::Crdt; use crdt::Crdt;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, BLOB_SIZE}; use packet::{
Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
};
use result::{Error, Result}; use result::{Error, Result};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::mem; use std::mem;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
pub const WINDOW_SIZE: usize = 2 * 1024; pub const WINDOW_SIZE: usize = 2 * 1024;
pub type PacketReceiver = mpsc::Receiver<SharedPackets>; pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = mpsc::Sender<SharedPackets>; pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = mpsc::Sender<VecDeque<SharedBlob>>; pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = mpsc::Receiver<VecDeque<SharedBlob>>; pub type BlobReceiver = Receiver<SharedBlobs>;
pub type Window = Arc<RwLock<Vec<Option<SharedBlob>>>>; pub type Window = Arc<RwLock<Vec<Option<SharedBlob>>>>;
fn recv_loop( fn recv_loop(

View File

@ -37,13 +37,13 @@
use bank::Bank; use bank::Bank;
use blob_fetch_stage::BlobFetchStage; use blob_fetch_stage::BlobFetchStage;
use crdt::Crdt; use crdt::Crdt;
use packet; use packet::BlobRecycler;
use replicate_stage::ReplicateStage; use replicate_stage::ReplicateStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::JoinHandle;
use streamer; use streamer::Window;
use window_stage::WindowStage; use window_stage::WindowStage;
pub struct Tvu { pub struct Tvu {
@ -64,13 +64,13 @@ impl Tvu {
pub fn new( pub fn new(
bank: Arc<Bank>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
window: streamer::Window, window: Window,
replicate_socket: UdpSocket, replicate_socket: UdpSocket,
repair_socket: UdpSocket, repair_socket: UdpSocket,
retransmit_socket: UdpSocket, retransmit_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let fetch_stage = BlobFetchStage::new_multi_socket( let fetch_stage = BlobFetchStage::new_multi_socket(
vec![replicate_socket, repair_socket], vec![replicate_socket, repair_socket],
exit.clone(), exit.clone(),
@ -120,7 +120,7 @@ pub mod tests {
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer::{self, Window};
use transaction::Transaction; use transaction::Transaction;
use tvu::Tvu; use tvu::Tvu;
@ -128,7 +128,7 @@ pub mod tests {
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
listen: UdpSocket, listen: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<(Ncp, streamer::Window)> { ) -> Result<(Ncp, Window)> {
let window = streamer::default_window(); let window = streamer::default_window();
let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let ncp = Ncp::new(crdt, window.clone(), listen, send_sock, exit)?; let ncp = Ncp::new(crdt, window.clone(), listen, send_sock, exit)?;

View File

@ -1,27 +1,27 @@
//! The `window_stage` maintains the blob window //! The `window_stage` maintains the blob window
use crdt::Crdt; use crdt::Crdt;
use packet; use packet::BlobRecycler;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::JoinHandle; use std::thread::JoinHandle;
use streamer; use streamer::{self, BlobReceiver, Window};
pub struct WindowStage { pub struct WindowStage {
pub blob_receiver: streamer::BlobReceiver, pub blob_receiver: BlobReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
impl WindowStage { impl WindowStage {
pub fn new( pub fn new(
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
window: streamer::Window, window: Window,
retransmit_socket: UdpSocket, retransmit_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
blob_recycler: packet::BlobRecycler, blob_recycler: BlobRecycler,
fetch_stage_receiver: streamer::BlobReceiver, fetch_stage_receiver: BlobReceiver,
entry_count: usize, entry_count: usize,
) -> Self { ) -> Self {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();

View File

@ -5,17 +5,17 @@
use bank::Bank; use bank::Bank;
use entry::Entry; use entry::Entry;
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
use packet; use packet::BlobRecycler;
use std::io::Write; use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use streamer; use streamer::BlobReceiver;
pub struct WriteStage { pub struct WriteStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
pub blob_receiver: streamer::BlobReceiver, pub blob_receiver: BlobReceiver,
} }
impl WriteStage { impl WriteStage {
@ -23,7 +23,7 @@ impl WriteStage {
pub fn new<W: Write + Send + 'static>( pub fn new<W: Write + Send + 'static>(
bank: Arc<Bank>, bank: Arc<Bank>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
blob_recycler: packet::BlobRecycler, blob_recycler: BlobRecycler,
writer: Mutex<W>, writer: Mutex<W>,
entry_receiver: Receiver<Entry>, entry_receiver: Receiver<Entry>,
) -> Self { ) -> Self {