From 6967cf7f86d39e84d0af4e9c50a3601ff0b03657 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Wed, 9 May 2018 11:15:14 -0600 Subject: [PATCH] Boot sync_channel() This is less useful now that we send Vec instead of Event. --- src/accounting_stage.rs | 14 +++++++------- src/bin/testnode.rs | 4 ++-- src/historian.rs | 12 ++++++------ src/recorder.rs | 12 ++++++------ src/thin_client.rs | 8 ++++---- src/tpu.rs | 9 ++++----- 6 files changed, 29 insertions(+), 30 deletions(-) diff --git a/src/accounting_stage.rs b/src/accounting_stage.rs index 6c6f7225f4..6837c6ad96 100644 --- a/src/accounting_stage.rs +++ b/src/accounting_stage.rs @@ -9,19 +9,19 @@ use recorder::Signal; use result::Result; use signature::PublicKey; use std::net::{SocketAddr, UdpSocket}; -use std::sync::mpsc::SyncSender; +use std::sync::mpsc::Sender; use std::sync::Mutex; use transaction::Transaction; pub struct AccountingStage { pub acc: Mutex, - historian_input: Mutex>, + historian_input: Mutex>, entry_info_subscribers: Mutex>, } impl AccountingStage { /// Create a new Tpu that wraps the given Accountant. - pub fn new(acc: Accountant, historian_input: SyncSender) -> Self { + pub fn new(acc: Accountant, historian_input: Sender) -> Self { AccountingStage { acc: Mutex::new(acc), entry_info_subscribers: Mutex::new(vec![]), @@ -144,7 +144,7 @@ mod tests { use historian::Historian; use mint::Mint; use signature::{KeyPair, KeyPairUtil}; - use std::sync::mpsc::sync_channel; + use std::sync::mpsc::channel; use transaction::Transaction; #[test] @@ -154,7 +154,7 @@ mod tests { // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let acc = Accountant::new(&mint); - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &mint.last_id(), None); let stage = AccountingStage::new(acc, input); @@ -201,7 +201,7 @@ mod bench { use rayon::prelude::*; use signature::{KeyPair, KeyPairUtil}; use std::collections::HashSet; - use std::sync::mpsc::sync_channel; + use std::sync::mpsc::channel; use std::time::Instant; use transaction::Transaction; @@ -245,7 +245,7 @@ mod bench { .map(|tr| Event::Transaction(tr)) .collect(); - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &mint.last_id(), None); let stage = AccountingStage::new(acc, input); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index cc4ad246ad..b2bfd17c44 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -18,7 +18,7 @@ use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; use std::process::exit; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::sync_channel; +use std::sync::mpsc::channel; use std::sync::Arc; fn print_usage(program: &str, opts: Options) { @@ -116,7 +116,7 @@ fn main() { eprintln!("creating networking stack..."); - let (input, event_receiver) = sync_channel(10_000); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); let tpu = Arc::new(Tpu::new(acc, input, historian)); diff --git a/src/historian.rs b/src/historian.rs index 7d2478bf15..12aa760537 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -4,7 +4,7 @@ use entry::Entry; use hash::Hash; use recorder::{ExitReason, Recorder, Signal}; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; +use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; use std::time::Instant; @@ -20,7 +20,7 @@ impl Historian { start_hash: &Hash, ms_per_tick: Option, ) -> Self { - let (entry_sender, output) = sync_channel(10_000); + let (entry_sender, output) = channel(); let thread_hdl = Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); let loutput = Arc::new(Mutex::new(output)); @@ -36,7 +36,7 @@ impl Historian { start_hash: Hash, ms_per_tick: Option, receiver: Receiver, - sender: SyncSender, + sender: Sender, ) -> JoinHandle { spawn(move || { let mut recorder = Recorder::new(receiver, sender, start_hash); @@ -66,7 +66,7 @@ mod tests { #[test] fn test_historian() { - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let zero = Hash::default(); let hist = Historian::new(event_receiver, &zero, None); @@ -95,7 +95,7 @@ mod tests { #[test] fn test_historian_closed_sender() { - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let zero = Hash::default(); let hist = Historian::new(event_receiver, &zero, None); drop(hist.output); @@ -108,7 +108,7 @@ mod tests { #[test] fn test_ticking_historian() { - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let zero = Hash::default(); let hist = Historian::new(event_receiver, &zero, Some(20)); sleep(Duration::from_millis(300)); diff --git a/src/recorder.rs b/src/recorder.rs index 1b1309053d..2ec50aea28 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -8,7 +8,7 @@ use entry::{create_entry_mut, Entry}; use event::Event; use hash::{hash, Hash}; -use std::sync::mpsc::{Receiver, SyncSender, TryRecvError}; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::{Duration, Instant}; #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] @@ -24,7 +24,7 @@ pub enum ExitReason { } pub struct Recorder { - sender: SyncSender, + sender: Sender, receiver: Receiver, last_hash: Hash, num_hashes: u64, @@ -32,7 +32,7 @@ pub struct Recorder { } impl Recorder { - pub fn new(receiver: Receiver, sender: SyncSender, last_hash: Hash) -> Self { + pub fn new(receiver: Receiver, sender: Sender, last_hash: Hash) -> Self { Recorder { receiver, sender, @@ -88,13 +88,13 @@ impl Recorder { mod tests { use super::*; use signature::{KeyPair, KeyPairUtil}; - use std::sync::mpsc::sync_channel; + use std::sync::mpsc::channel; use transaction::Transaction; #[test] fn test_events() { - let (signal_sender, signal_receiver) = sync_channel(500); - let (entry_sender, entry_receiver) = sync_channel(10); + let (signal_sender, signal_receiver) = channel(); + let (entry_sender, entry_receiver) = channel(); let zero = Hash::default(); let mut recorder = Recorder::new(signal_receiver, entry_sender, zero); let alice_keypair = KeyPair::new(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 3622c155ff..096b43eaad 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -156,7 +156,7 @@ mod tests { use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::sync_channel; + use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -183,7 +183,7 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Arc::new(Tpu::new(acc, input, historian)); let threads = Tpu::serve(&acc, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); @@ -240,14 +240,14 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let leader_acc = { - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); Arc::new(Tpu::new(acc, input, historian)) }; let replicant_acc = { - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let acc = Accountant::new(&alice); Arc::new(Tpu::new(acc, input, historian)) diff --git a/src/tpu.rs b/src/tpu.rs index ff651ed9a2..579a9359f2 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -22,7 +22,7 @@ use std::io::{Cursor, Write}; use std::mem::size_of; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; @@ -39,7 +39,7 @@ type SharedTpu = Arc; impl Tpu { /// Create a new Tpu that wraps the given Accountant. - pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { + pub fn new(acc: Accountant, historian_input: Sender, historian: Historian) -> Self { let accounting = AccountingStage::new(acc, historian_input); Tpu { accounting, @@ -697,7 +697,6 @@ mod tests { use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; - use std::sync::mpsc::sync_channel; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -739,7 +738,7 @@ mod tests { let acc = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let tpu = Arc::new(Tpu::new(acc, input, historian)); let serve_addr = leader_serve.local_addr().unwrap(); @@ -848,7 +847,7 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let acc = Accountant::new(&alice); - let (input, event_receiver) = sync_channel(10); + let (input, event_receiver) = channel(); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); let tpu = Arc::new(Tpu::new(acc, input, historian)); let replicate_addr = target1_data.replicate_addr;