diff --git a/Cargo.lock b/Cargo.lock index 7ea2575f17..e1e616bfbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4503,6 +4503,7 @@ name = "solana-banks-server" version = "1.10.0" dependencies = [ "bincode", + "crossbeam-channel", "futures 0.3.19", "solana-banks-interface", "solana-runtime", @@ -4519,6 +4520,7 @@ name = "solana-bench-streamer" version = "1.10.0" dependencies = [ "clap 2.33.3", + "crossbeam-channel", "solana-net-utils", "solana-streamer", "solana-version", @@ -4529,6 +4531,7 @@ name = "solana-bench-tps" version = "1.10.0" dependencies = [ "clap 2.33.3", + "crossbeam-channel", "log 0.4.14", "rayon", "serde_json", @@ -4633,6 +4636,7 @@ dependencies = [ "console", "const_format", "criterion-stats", + "crossbeam-channel", "ctrlc", "humantime", "log 0.4.14", @@ -4711,6 +4715,7 @@ dependencies = [ "bincode", "bs58 0.4.0", "clap 2.33.3", + "crossbeam-channel", "indicatif", "jsonrpc-core", "jsonrpc-http-server", @@ -4885,6 +4890,7 @@ name = "solana-entry" version = "1.10.0" dependencies = [ "bincode", + "crossbeam-channel", "dlopen", "dlopen_derive", "log 0.4.14", @@ -4908,6 +4914,7 @@ dependencies = [ "bincode", "byteorder", "clap 2.33.3", + "crossbeam-channel", "log 0.4.14", "serde", "serde_derive", @@ -5020,6 +5027,7 @@ dependencies = [ "bincode", "bv", "clap 2.33.3", + "crossbeam-channel", "flate2", "indexmap", "itertools 0.10.3", @@ -5066,6 +5074,7 @@ dependencies = [ "chrono", "clap 2.33.3", "console", + "crossbeam-channel", "ctrlc", "dirs-next", "indicatif", @@ -5166,6 +5175,7 @@ dependencies = [ "bs58 0.4.0", "bytecount", "clap 2.33.3", + "crossbeam-channel", "csv", "dashmap", "histogram", @@ -5289,6 +5299,7 @@ dependencies = [ name = "solana-metrics" version = "1.10.0" dependencies = [ + "crossbeam-channel", "env_logger 0.9.0", "gethostname", "lazy_static", @@ -5316,6 +5327,7 @@ version = "1.10.0" dependencies = [ "bincode", "clap 2.33.3", + "crossbeam-channel", "log 0.4.14", "nix", "rand 0.7.3", @@ -5655,6 +5667,7 @@ version = "1.10.0" dependencies = [ "bincode", "bs58 0.4.0", + "crossbeam-channel", "jsonrpc-core", "jsonrpc-core-client", "log 0.4.14", @@ -5806,6 +5819,7 @@ dependencies = [ name = "solana-send-transaction-service" version = "1.10.0" dependencies = [ + "crossbeam-channel", "log 0.4.14", "solana-logger 1.10.0", "solana-metrics", @@ -5906,6 +5920,7 @@ dependencies = [ name = "solana-streamer" version = "1.10.0" dependencies = [ + "crossbeam-channel", "histogram", "itertools 0.10.3", "libc", @@ -6053,6 +6068,7 @@ dependencies = [ "clap 2.33.3", "console", "core_affinity", + "crossbeam-channel", "fd-lock", "indicatif", "jsonrpc-core", diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 874b2e2c90..24181c666d 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -1,7 +1,7 @@ #![allow(clippy::integer_arithmetic)] use { clap::{crate_description, crate_name, value_t, App, Arg}, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Receiver}, log::*, rand::{thread_rng, Rng}, rayon::prelude::*, @@ -28,7 +28,7 @@ use { }, solana_streamer::socket::SocketAddrSpace, std::{ - sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock}, + sync::{atomic::Ordering, Arc, Mutex, RwLock}, thread::sleep, time::{Duration, Instant}, }, diff --git a/banks-server/Cargo.toml b/banks-server/Cargo.toml index 8e52027051..7d93dcee2d 100644 --- a/banks-server/Cargo.toml +++ b/banks-server/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" +crossbeam-channel = "0.5" futures = "0.3" solana-banks-interface = { path = "../banks-interface", version = "=1.10.0" } solana-runtime = { path = "../runtime", version = "=1.10.0" } diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index 6e94d24401..bb98125672 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -1,5 +1,6 @@ use { bincode::{deserialize, serialize}, + crossbeam_channel::{unbounded, Receiver, Sender}, futures::{future, prelude::stream::StreamExt}, solana_banks_interface::{ Banks, BanksRequest, BanksResponse, BanksTransactionResultWithSimulation, @@ -30,10 +31,7 @@ use { convert::TryFrom, io, net::{Ipv4Addr, SocketAddr}, - sync::{ - mpsc::{channel, Receiver, Sender}, - Arc, RwLock, - }, + sync::{Arc, RwLock}, thread::Builder, time::Duration, }, @@ -96,7 +94,7 @@ impl BanksServer { block_commitment_cache: Arc>, poll_signature_status_sleep_duration: Duration, ) -> Self { - let (transaction_sender, transaction_receiver) = channel(); + let (transaction_sender, transaction_receiver) = unbounded(); let bank = bank_forks.read().unwrap().working_bank(); let slot = bank.slot(); { @@ -392,7 +390,7 @@ pub async fn start_tcp_server( // serve is generated by the service attribute. It takes as input any type implementing // the generated Banks trait. .map(move |chan| { - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); SendTransactionService::new::( tpu_addr, diff --git a/bench-streamer/Cargo.toml b/bench-streamer/Cargo.toml index 923022cacf..b669aebc0f 100644 --- a/bench-streamer/Cargo.toml +++ b/bench-streamer/Cargo.toml @@ -9,6 +9,7 @@ homepage = "https://solana.com/" publish = false [dependencies] +crossbeam-channel = "0.5" clap = "2.33.1" solana-streamer = { path = "../streamer", version = "=1.10.0" } solana-net-utils = { path = "../net-utils", version = "=1.10.0" } diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 46eeeb7613..eba249311c 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,6 +1,7 @@ #![allow(clippy::integer_arithmetic)] use { clap::{crate_description, crate_name, App, Arg}, + crossbeam_channel::unbounded, solana_streamer::{ packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, streamer::{receiver, PacketBatchReceiver}, @@ -10,7 +11,6 @@ use { net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - mpsc::channel, Arc, }, thread::{sleep, spawn, JoinHandle, Result}, @@ -89,7 +89,7 @@ fn main() -> Result<()> { addr = read.local_addr().unwrap(); port = addr.port(); - let (s_reader, r_reader) = channel(); + let (s_reader, r_reader) = unbounded(); read_channels.push(r_reader); read_threads.push(receiver( Arc::new(read), diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index a9f954a582..e1559e90d3 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -10,6 +10,7 @@ publish = false [dependencies] clap = "2.33.1" +crossbeam-channel = "0.5" log = "0.4.14" rayon = "1.5.1" serde_json = "1.0.74" diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 8b40aab0a8..7411903e6a 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -1,5 +1,6 @@ #![allow(clippy::integer_arithmetic)] use { + crossbeam_channel::unbounded, serial_test::serial, solana_bench_tps::{ bench::{do_bench_tps, generate_and_fund_keypairs}, @@ -15,10 +16,7 @@ use { }, solana_sdk::signature::{Keypair, Signer}, solana_streamer::socket::SocketAddrSpace, - std::{ - sync::{mpsc::channel, Arc}, - time::Duration, - }, + std::{sync::Arc, time::Duration}, }; fn test_bench_tps_local_cluster(config: Config) { @@ -52,7 +50,7 @@ fn test_bench_tps_local_cluster(config: Config) { VALIDATOR_PORT_RANGE, )); - let (addr_sender, addr_receiver) = channel(); + let (addr_sender, addr_receiver) = unbounded(); run_local_faucet_with_port(faucet_keypair, addr_sender, None, 0); let faucet_addr = addr_receiver .recv_timeout(Duration::from_secs(2)) diff --git a/cli/Cargo.toml b/cli/Cargo.toml index ad3536e232..5386c0c008 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -17,6 +17,7 @@ criterion-stats = "0.3.0" ctrlc = { version = "3.2.1", features = ["termination"] } console = "0.15.0" const_format = "0.2.22" +crossbeam-channel = "0.5" log = "0.4.14" humantime = "2.0.1" num-traits = "0.2" diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index f72f651ed9..8f06b5d603 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -5,6 +5,7 @@ use { }, clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand}, console::{style, Emoji}, + crossbeam_channel::unbounded, serde::{Deserialize, Serialize}, solana_clap_utils::{ input_parsers::*, @@ -1368,7 +1369,7 @@ pub fn process_ping( println_name_value("Source Account:", &config.signers[0].pubkey().to_string()); println!(); - let (signal_sender, signal_receiver) = std::sync::mpsc::channel(); + let (signal_sender, signal_receiver) = unbounded(); ctrlc::set_handler(move || { let _ = signal_sender.send(()); }) diff --git a/client/Cargo.toml b/client/Cargo.toml index a0a7824464..f98d05b558 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -14,6 +14,7 @@ base64 = "0.13.0" bincode = "1.3.3" bs58 = "0.4.0" clap = "2.33.0" +crossbeam-channel = "0.5" indicatif = "0.16.2" jsonrpc-core = "18.0.0" log = "0.4.14" diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 79b33b6e94..5a6abd2850 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -10,6 +10,7 @@ use { RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, }, }, + crossbeam_channel::{unbounded, Receiver, Sender}, log::*, serde::de::DeserializeOwned, serde_json::{ @@ -24,7 +25,6 @@ use { net::TcpStream, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::{sleep, JoinHandle}, @@ -242,7 +242,7 @@ impl PubsubClient { ) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -283,7 +283,7 @@ impl PubsubClient { ) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -322,7 +322,7 @@ impl PubsubClient { ) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -361,7 +361,7 @@ impl PubsubClient { ) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -398,7 +398,7 @@ impl PubsubClient { pub fn vote_subscribe(url: &str) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -431,7 +431,7 @@ impl PubsubClient { pub fn root_subscribe(url: &str) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -468,7 +468,7 @@ impl PubsubClient { ) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -506,7 +506,7 @@ impl PubsubClient { pub fn slot_subscribe(url: &str) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel::(); + let (sender, receiver) = unbounded::(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index e7c1e82a13..5b447ada8b 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -4940,6 +4940,7 @@ mod tests { super::*, crate::{client_error::ClientErrorKind, mock_sender::PUBKEY}, assert_matches::assert_matches, + crossbeam_channel::unbounded, jsonrpc_core::{futures::prelude::*, Error, IoHandler, Params}, jsonrpc_http_server::{AccessControlAllowOrigin, DomainsValidation, ServerBuilder}, serde_json::Number, @@ -4949,7 +4950,7 @@ mod tests { system_transaction, transaction::TransactionError, }, - std::{io, sync::mpsc::channel, thread}, + std::{io, thread}, }; #[test] @@ -4969,7 +4970,7 @@ mod tests { } fn _test_send() { - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); thread::spawn(move || { let rpc_addr = "0.0.0.0:0".parse().unwrap(); let mut io = IoHandler::default(); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 25c5bcf540..ef2ada5710 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -4,7 +4,7 @@ extern crate test; use { - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Receiver}, log::*, rand::{thread_rng, Rng}, rayon::prelude::*, @@ -37,7 +37,7 @@ use { solana_streamer::socket::SocketAddrSpace, std::{ collections::VecDeque, - sync::{atomic::Ordering, mpsc::Receiver, Arc, RwLock}, + sync::{atomic::Ordering, Arc, RwLock}, time::{Duration, Instant}, }, test::Bencher, diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 3ae1b60598..5fbb66b225 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -4,6 +4,7 @@ extern crate solana_core; extern crate test; use { + crossbeam_channel::unbounded, log::*, solana_core::retransmit_stage::retransmitter, solana_entry::entry::Entry, @@ -30,7 +31,6 @@ use { net::UdpSocket, sync::{ atomic::{AtomicUsize, Ordering}, - mpsc::channel, Arc, RwLock, }, thread::{sleep, Builder}, @@ -77,7 +77,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { let bank_forks = BankForks::new(bank0); let bank = bank_forks.working_bank(); let bank_forks = Arc::new(RwLock::new(bank_forks)); - let (shreds_sender, shreds_receiver) = channel(); + let (shreds_sender, shreds_receiver) = unbounded(); const NUM_THREADS: usize = 2; let sockets = (0..NUM_THREADS) .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap()) diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 6755db61eb..4ab9cc5713 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -15,10 +15,7 @@ use { system_transaction, timing::duration_as_ms, }, - std::{ - sync::mpsc::channel, - time::{Duration, Instant}, - }, + std::time::{Duration, Instant}, test::Bencher, }; @@ -63,7 +60,7 @@ fn bench_packet_discard(bencher: &mut Bencher) { #[bench] fn bench_sigverify_stage(bencher: &mut Bencher) { solana_logger::setup(); - let (packet_s, packet_r) = channel(); + let (packet_s, packet_r) = unbounded(); let (verified_s, verified_r) = unbounded(); let verifier = TransactionSigVerifier::default(); let stage = SigVerifyStage::new(packet_r, verified_s, verifier); diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index ff8647f64d..59761633d8 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -5,6 +5,7 @@ // set and halt the node if a mismatch is detected. use { + crossbeam_channel::RecvTimeoutError, rayon::ThreadPool, solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}, solana_measure::measure::Measure, @@ -24,7 +25,6 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::RecvTimeoutError, Arc, }, thread::{self, Builder, JoinHandle}, diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 9ce9a5efb2..db06cec05b 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -29,7 +29,6 @@ use { net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::channel, Arc, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, @@ -147,7 +146,7 @@ impl AncestorHashesService { ) -> Self { let outstanding_requests: Arc> = Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default())); - let (response_sender, response_receiver) = channel(); + let (response_sender, response_receiver) = unbounded(); let t_receiver = streamer::receiver( ancestor_hashes_request_socket.clone(), &exit, @@ -703,7 +702,7 @@ mod test { solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks}, solana_sdk::{hash::Hash, signature::Keypair}, solana_streamer::socket::SocketAddrSpace, - std::{collections::HashMap, sync::mpsc::channel}, + std::collections::HashMap, trees::tr, }; @@ -896,8 +895,8 @@ mod test { // Set up thread to give us responses let ledger_path = get_tmp_ledger_path!(); let exit = Arc::new(AtomicBool::new(false)); - let (requests_sender, requests_receiver) = channel(); - let (response_sender, response_receiver) = channel(); + let (requests_sender, requests_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); // Set up blockstore for responses let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f42088e4f9..cf3784ccd0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1475,7 +1475,7 @@ where mod tests { use { super::*, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_entry::entry::{next_entry, Entry, EntrySlice}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, @@ -1507,10 +1507,7 @@ mod tests { std::{ net::SocketAddr, path::Path, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::Receiver, - }, + sync::atomic::{AtomicBool, Ordering}, thread::sleep, }, }; diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index e05b7384bb..84cdb9fa22 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -12,10 +12,7 @@ use { cluster_nodes::{ClusterNodes, ClusterNodesCache}, result::{Error, Result}, }, - crossbeam_channel::{ - Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError, - Sender as CrossbeamSender, - }, + crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender}, itertools::Itertools, solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}, solana_ledger::{blockstore::Blockstore, shred::Shred}, @@ -39,7 +36,6 @@ use { net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -58,8 +54,8 @@ const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); pub(crate) const NUM_INSERT_THREADS: usize = 2; -pub(crate) type RetransmitSlotsSender = CrossbeamSender; -pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver; +pub(crate) type RetransmitSlotsSender = Sender; +pub(crate) type RetransmitSlotsReceiver = Receiver; pub(crate) type RecordReceiver = Receiver<(Arc>, Option)>; pub(crate) type TransmitReceiver = Receiver<(Arc>, Option)>; @@ -211,12 +207,10 @@ impl BroadcastStage { match e { Error::RecvTimeout(RecvTimeoutError::Disconnected) | Error::Send - | Error::Recv(RecvError) - | Error::CrossbeamRecvTimeout(CrossbeamRecvTimeoutError::Disconnected) => { + | Error::Recv(RecvError) => { return Some(BroadcastStageReturnType::ChannelDisconnected); } Error::RecvTimeout(RecvTimeoutError::Timeout) - | Error::CrossbeamRecvTimeout(CrossbeamRecvTimeoutError::Timeout) | Error::ClusterInfo(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? _ => { inc_new_counter_error!("streamer-broadcaster-error", 1, 1); @@ -256,8 +250,8 @@ impl BroadcastStage { ) -> Self { let btree = blockstore.clone(); let exit = exit_sender.clone(); - let (socket_sender, socket_receiver) = channel(); - let (blockstore_sender, blockstore_receiver) = channel(); + let (socket_sender, socket_receiver) = unbounded(); + let (blockstore_sender, blockstore_receiver) = unbounded(); let bs_run = broadcast_stage_run.clone(); let socket_sender_ = socket_sender.clone(); @@ -474,7 +468,7 @@ pub mod test { }, std::{ path::Path, - sync::{atomic::AtomicBool, mpsc::channel, Arc}, + sync::{atomic::AtomicBool, Arc}, thread::sleep, }, }; @@ -546,7 +540,7 @@ pub mod test { // Setup let ledger_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let (transmit_sender, transmit_receiver) = channel(); + let (transmit_sender, transmit_receiver) = unbounded(); let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); // Make some shreds @@ -651,7 +645,7 @@ pub mod test { // Create the leader scheduler let leader_keypair = Keypair::new(); - let (entry_sender, entry_receiver) = channel(); + let (entry_sender, entry_receiver) = unbounded(); let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let broadcast_service = setup_dummy_broadcast_service( &leader_keypair.pubkey(), diff --git a/core/src/broadcast_stage/broadcast_metrics.rs b/core/src/broadcast_stage/broadcast_metrics.rs index a05a3d9384..cfa49b01eb 100644 --- a/core/src/broadcast_stage/broadcast_metrics.rs +++ b/core/src/broadcast_stage/broadcast_metrics.rs @@ -287,7 +287,7 @@ mod test { let slot_broadcast_stats = Arc::new(Mutex::new(SlotBroadcastStats::default())); let num_threads = 5; let slot = 0; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let thread_handles: Vec<_> = (0..num_threads) .map(|i| { let slot_broadcast_stats = slot_broadcast_stats.clone(); diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index 902b5672d7..c9135d0a60 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -1,12 +1,13 @@ use { crate::result::Result, + crossbeam_channel::Receiver, solana_entry::entry::Entry, solana_ledger::shred::Shred, solana_poh::poh_recorder::WorkingBankEntry, solana_runtime::bank::Bank, solana_sdk::clock::Slot, std::{ - sync::{mpsc::Receiver, Arc}, + sync::Arc, time::{Duration, Instant}, }, }; @@ -84,12 +85,12 @@ pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result mod tests { use { super::*, + crossbeam_channel::unbounded, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, solana_sdk::{ genesis_config::GenesisConfig, pubkey::Pubkey, system_transaction, transaction::Transaction, }, - std::sync::mpsc::channel, }; fn setup_test() -> (GenesisConfig, Arc, Transaction) { @@ -114,7 +115,7 @@ mod tests { let (genesis_config, bank0, tx) = setup_test(); let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); - let (s, r) = channel(); + let (s, r) = unbounded(); let mut last_hash = genesis_config.hash(); assert!(bank1.max_tick_height() > 1); @@ -144,7 +145,7 @@ mod tests { let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); let bank2 = Arc::new(Bank::new_from_parent(&bank1, &Pubkey::default(), 2)); - let (s, r) = channel(); + let (s, r) = unbounded(); let mut last_hash = genesis_config.hash(); assert!(bank1.max_tick_height() > 1); diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 1788b19e86..017c28a13f 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -166,8 +166,8 @@ impl StandardBroadcastRun { receive_results: ReceiveResults, bank_forks: &Arc>, ) -> Result<()> { - let (bsend, brecv) = channel(); - let (ssend, srecv) = channel(); + let (bsend, brecv) = unbounded(); + let (ssend, srecv) = unbounded(); self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?; let srecv = Arc::new(Mutex::new(srecv)); let brecv = Arc::new(Mutex::new(brecv)); @@ -763,8 +763,8 @@ mod test { let num_shreds_per_slot = 2; let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket, _bank_forks) = setup(num_shreds_per_slot); - let (bsend, brecv) = channel(); - let (ssend, _srecv) = channel(); + let (bsend, brecv) = unbounded(); + let (ssend, _srecv) = unbounded(); let mut last_tick_height = 0; let mut standard_broadcast_run = StandardBroadcastRun::new(0); let mut process_ticks = |num_ticks| { diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 728cc50800..eb137ef69c 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -9,10 +9,7 @@ use { }, vote_stake_tracker::VoteStakeTracker, }, - crossbeam_channel::{ - unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select, - Sender as CrossbeamSender, - }, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Select, Sender}, log::*, solana_gossip::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, @@ -57,16 +54,16 @@ use { // Map from a vote account to the authorized voter for an epoch pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>; -pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; -pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; -pub type VerifiedVoteTransactionsSender = CrossbeamSender>; -pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; -pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec)>; -pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec)>; -pub type GossipVerifiedVoteHashSender = CrossbeamSender<(Pubkey, Slot, Hash)>; -pub type GossipVerifiedVoteHashReceiver = CrossbeamReceiver<(Pubkey, Slot, Hash)>; -pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender; -pub type GossipDuplicateConfirmedSlotsReceiver = CrossbeamReceiver; +pub type VerifiedLabelVotePacketsSender = Sender>; +pub type VerifiedLabelVotePacketsReceiver = Receiver>; +pub type VerifiedVoteTransactionsSender = Sender>; +pub type VerifiedVoteTransactionsReceiver = Receiver>; +pub type VerifiedVoteSender = Sender<(Pubkey, Vec)>; +pub type VerifiedVoteReceiver = Receiver<(Pubkey, Vec)>; +pub type GossipVerifiedVoteHashSender = Sender<(Pubkey, Slot, Hash)>; +pub type GossipVerifiedVoteHashReceiver = Receiver<(Pubkey, Slot, Hash)>; +pub type GossipDuplicateConfirmedSlotsSender = Sender; +pub type GossipDuplicateConfirmedSlotsReceiver = Receiver; const THRESHOLDS_TO_CHECK: [f64; 2] = [DUPLICATE_THRESHOLD, VOTE_THRESHOLD_SIZE]; const BANK_SEND_VOTES_LOOP_SLEEP_MS: u128 = 10; @@ -198,7 +195,7 @@ impl ClusterInfoVoteListener { pub fn new( exit: Arc, cluster_info: Arc, - verified_packets_sender: CrossbeamSender>, + verified_packets_sender: Sender>, poh_recorder: Arc>, vote_tracker: Arc, bank_forks: Arc>, @@ -337,7 +334,7 @@ impl ClusterInfoVoteListener { exit: Arc, verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver, poh_recorder: Arc>, - verified_packets_sender: &CrossbeamSender>, + verified_packets_sender: &Sender>, ) -> Result<()> { let mut verified_vote_packets = VerifiedVotePackets::default(); let mut time_since_lock = Instant::now(); @@ -358,8 +355,8 @@ impl ClusterInfoVoteListener { would_be_leader, ) { match e { - Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) - | Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout) => (), + Error::RecvTimeout(RecvTimeoutError::Disconnected) + | Error::RecvTimeout(RecvTimeoutError::Timeout) => (), _ => { error!("thread {:?} error {:?}", thread::current().name(), e); } @@ -385,7 +382,7 @@ impl ClusterInfoVoteListener { fn check_for_leader_bank_and_send_votes( bank_vote_sender_state_option: &mut Option, current_working_bank: Arc, - verified_packets_sender: &CrossbeamSender>, + verified_packets_sender: &Sender>, verified_vote_packets: &VerifiedVotePackets, ) -> Result<()> { // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` @@ -489,7 +486,7 @@ impl ClusterInfoVoteListener { .add_new_optimistic_confirmed_slots(confirmed_slots.clone()); } Err(e) => match e { - Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) => { + Error::RecvTimeout(RecvTimeoutError::Disconnected) => { return Ok(()); } Error::ReadyTimeout => (), diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index dcdc769f78..1a27d00981 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -1,5 +1,6 @@ use { crate::consensus::Stake, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, solana_measure::measure::Measure, solana_metrics::datapoint_info, solana_rpc::rpc_subscriptions::RpcSubscriptions, @@ -14,7 +15,6 @@ use { collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, RecvTimeoutError, Sender}, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -63,7 +63,7 @@ impl AggregateCommitmentService { let (sender, receiver): ( Sender, Receiver, - ) = channel(); + ) = unbounded(); let exit_ = exit.clone(); ( sender, diff --git a/core/src/cost_update_service.rs b/core/src/cost_update_service.rs index 55fb0e738d..662348b949 100644 --- a/core/src/cost_update_service.rs +++ b/core/src/cost_update_service.rs @@ -4,6 +4,7 @@ //! table to blockstore. use { + crossbeam_channel::Receiver, solana_ledger::blockstore::Blockstore, solana_measure::measure::Measure, solana_program_runtime::timings::ExecuteTimings, @@ -12,7 +13,6 @@ use { std::{ sync::{ atomic::{AtomicBool, Ordering}, - mpsc::Receiver, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, diff --git a/core/src/drop_bank_service.rs b/core/src/drop_bank_service.rs index 8359bf1e63..aac1a02ee0 100644 --- a/core/src/drop_bank_service.rs +++ b/core/src/drop_bank_service.rs @@ -1,8 +1,9 @@ use { + crossbeam_channel::Receiver, solana_measure::measure::Measure, solana_runtime::bank::Bank, std::{ - sync::{mpsc::Receiver, Arc}, + sync::Arc, thread::{self, Builder, JoinHandle}, }, }; diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index a560a5c281..f70be823fc 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -5,6 +5,7 @@ use { banking_stage::HOLD_TRANSACTIONS_SLOT_OFFSET, result::{Error, Result}, }, + crossbeam_channel::{unbounded, RecvTimeoutError}, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, @@ -15,11 +16,7 @@ use { solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, std::{ net::UdpSocket, - sync::{ - atomic::AtomicBool, - mpsc::{channel, RecvTimeoutError}, - Arc, Mutex, - }, + sync::{atomic::AtomicBool, Arc, Mutex}, thread::{self, Builder, JoinHandle}, }, }; @@ -38,8 +35,8 @@ impl FetchStage { poh_recorder: &Arc>, coalesce_ms: u64, ) -> (Self, PacketBatchReceiver, PacketBatchReceiver) { - let (sender, receiver) = channel(); - let (vote_sender, vote_receiver) = channel(); + let (sender, receiver) = unbounded(); + let (vote_sender, vote_receiver) = unbounded(); ( Self::new_with_sender( sockets, @@ -147,7 +144,7 @@ impl FetchStage { ) }); - let (forward_sender, forward_receiver) = channel(); + let (forward_sender, forward_receiver) = unbounded(); let tpu_forwards_threads = tpu_forwards_sockets.into_iter().map(|socket| { streamer::receiver( socket, diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 841609d3cc..d3347ee31d 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -1,6 +1,7 @@ //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage use { + crossbeam_channel::{Receiver, RecvTimeoutError}, rand::{thread_rng, Rng}, solana_ledger::{ blockstore::{Blockstore, PurgeType}, @@ -12,7 +13,6 @@ use { string::ToString, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::{Receiver, RecvTimeoutError}, Arc, }, thread::{self, sleep, Builder, JoinHandle}, @@ -312,8 +312,8 @@ impl LedgerCleanupService { mod tests { use { super::*, + crossbeam_channel::unbounded, solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path}, - std::sync::mpsc::channel, }; #[test] @@ -324,7 +324,7 @@ mod tests { let (shreds, _) = make_many_slot_entries(0, 50, 5); blockstore.insert_shreds(shreds, None, false).unwrap(); let blockstore = Arc::new(blockstore); - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); //send a signal to kill all but 5 shreds, which will be in the newest slots let mut last_purge_slot = 0; @@ -371,7 +371,7 @@ mod tests { let mut blockstore = Blockstore::open(&blockstore_path).unwrap(); blockstore.set_no_compaction(true); let blockstore = Arc::new(blockstore); - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let mut first_insert = Measure::start("first_insert"); let initial_slots = 50; diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index eef711d1db..40903fc5bd 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -3,6 +3,7 @@ //! how transactions are included in blocks, and optimize those blocks. //! use { + crossbeam_channel::{unbounded, Receiver, Sender}, solana_measure::measure::Measure, solana_runtime::{ bank::Bank, @@ -16,7 +17,6 @@ use { std::{ sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -59,7 +59,7 @@ impl Drop for QosService { impl QosService { pub fn new(cost_model: Arc>, id: u32) -> Self { - let (report_sender, report_receiver) = channel(); + let (report_sender, report_receiver) = unbounded(); let running_flag = Arc::new(AtomicBool::new(true)); let metrics = Arc::new(QosServiceMetrics::new(id)); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index aa6d364241..40784497ac 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -26,6 +26,7 @@ use { voting_service::VoteOp, window_service::DuplicateSlotReceiver, }, + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, solana_accountsdb_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, solana_client::rpc_response::SlotUpdate, solana_entry::entry::VerifyRecyclers, @@ -67,7 +68,6 @@ use { result, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{Receiver, RecvTimeoutError, Sender}, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -3011,7 +3011,7 @@ pub mod tests { std::{ fs::remove_dir_all, iter, - sync::{atomic::AtomicU64, mpsc::channel, Arc, RwLock}, + sync::{atomic::AtomicU64, Arc, RwLock}, }, trees::{tr, Tree}, }; @@ -3289,7 +3289,7 @@ pub mod tests { .into_iter() .map(|slot| (slot, Hash::default())) .collect(); - let (bank_drop_sender, _bank_drop_receiver) = channel(); + let (bank_drop_sender, _bank_drop_receiver) = unbounded(); ReplayStage::handle_new_root( root, &bank_forks, @@ -3370,7 +3370,7 @@ pub mod tests { for i in 0..=root { progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); } - let (bank_drop_sender, _bank_drop_receiver) = channel(); + let (bank_drop_sender, _bank_drop_receiver) = unbounded(); ReplayStage::handle_new_root( root, &bank_forks, @@ -5707,7 +5707,7 @@ pub mod tests { let my_vote_pubkey = my_vote_keypair[0].pubkey(); let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); - let (voting_sender, voting_receiver) = channel(); + let (voting_sender, voting_receiver) = unbounded(); // Simulate landing a vote for slot 0 landing in slot 1 let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); diff --git a/core/src/result.rs b/core/src/result.rs index dbe32850c9..6c9b66b6d4 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -8,12 +8,10 @@ use { #[derive(Debug)] pub enum Error { Io(std::io::Error), - Recv(std::sync::mpsc::RecvError), - CrossbeamRecvTimeout(crossbeam_channel::RecvTimeoutError), + Recv(crossbeam_channel::RecvError), ReadyTimeout, - RecvTimeout(std::sync::mpsc::RecvTimeoutError), - CrossbeamSend, - TryCrossbeamSend, + RecvTimeout(crossbeam_channel::RecvTimeoutError), + TrySend, Serialize(std::boxed::Box), ClusterInfo(cluster_info::ClusterInfoError), Send, @@ -32,23 +30,18 @@ impl std::fmt::Display for Error { impl std::error::Error for Error {} -impl std::convert::From for Error { - fn from(e: std::sync::mpsc::RecvError) -> Error { +impl std::convert::From for Error { + fn from(e: crossbeam_channel::RecvError) -> Error { Error::Recv(e) } } -impl std::convert::From for Error { - fn from(e: crossbeam_channel::RecvTimeoutError) -> Error { - Error::CrossbeamRecvTimeout(e) - } -} impl std::convert::From for Error { fn from(_e: crossbeam_channel::ReadyTimeoutError) -> Error { Error::ReadyTimeout } } -impl std::convert::From for Error { - fn from(e: std::sync::mpsc::RecvTimeoutError) -> Error { +impl std::convert::From for Error { + fn from(e: crossbeam_channel::RecvTimeoutError) -> Error { Error::RecvTimeout(e) } } @@ -57,18 +50,13 @@ impl std::convert::From for Error { Error::ClusterInfo(e) } } -impl std::convert::From> for Error { - fn from(_e: crossbeam_channel::SendError) -> Error { - Error::CrossbeamSend - } -} impl std::convert::From> for Error { fn from(_e: crossbeam_channel::TrySendError) -> Error { - Error::TryCrossbeamSend + Error::TrySend } } -impl std::convert::From> for Error { - fn from(_e: std::sync::mpsc::SendError) -> Error { +impl std::convert::From> for Error { + fn from(_e: crossbeam_channel::SendError) -> Error { Error::Send } } @@ -102,16 +90,12 @@ impl std::convert::From for Error { mod tests { use { crate::result::{Error, Result}, - std::{ - io, - io::Write, - panic, - sync::mpsc::{channel, RecvError, RecvTimeoutError}, - }, + crossbeam_channel::{unbounded, RecvError, RecvTimeoutError}, + std::{io, io::Write, panic}, }; fn send_error() -> Result<()> { - let (s, r) = channel(); + let (s, r) = unbounded(); drop(r); s.send(())?; Ok(()) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index da678dac8e..383e4908ec 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -13,7 +13,7 @@ use { repair_service::{DuplicateSlotsResetSender, RepairInfo}, window_service::{should_retransmit_and_persist, WindowService}, }, - crossbeam_channel::{Receiver, Sender}, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, lru::LruCache, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, solana_client::rpc_response::SlotUpdate, @@ -39,7 +39,6 @@ use { ops::{AddAssign, DerefMut}, sync::{ atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - mpsc::{self, channel, RecvTimeoutError}, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -216,7 +215,7 @@ fn retransmit( bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, cluster_info: &ClusterInfo, - shreds_receiver: &mpsc::Receiver>, + shreds_receiver: &Receiver>, sockets: &[UdpSocket], stats: &mut RetransmitStats, cluster_nodes_cache: &ClusterNodesCache, @@ -369,7 +368,7 @@ pub fn retransmitter( bank_forks: Arc>, leader_schedule_cache: Arc, cluster_info: Arc, - shreds_receiver: mpsc::Receiver>, + shreds_receiver: Receiver>, max_slots: Arc, rpc_subscriptions: Option>, ) -> JoinHandle<()> { @@ -450,9 +449,7 @@ impl RetransmitStage { duplicate_slots_sender: Sender, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { - let (retransmit_sender, retransmit_receiver) = channel(); - // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 - let _retransmit_sender = retransmit_sender.clone(); + let (retransmit_sender, retransmit_receiver) = unbounded(); let retransmit_thread_handle = retransmitter( retransmit_sockets, @@ -553,7 +550,7 @@ mod tests { full_leader_cache: true, ..ProcessOptions::default() }; - let (accounts_package_sender, _) = channel(); + let (accounts_package_sender, _) = unbounded(); let (bank_forks, cached_leader_schedule, _) = process_blockstore( &genesis_config, &blockstore, @@ -597,7 +594,7 @@ mod tests { let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]); let cluster_info = Arc::new(cluster_info); - let (retransmit_sender, retransmit_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = unbounded(); let _retransmit_sender = retransmit_sender.clone(); let _t_retransmit = retransmitter( retransmit_socket, diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index 25a9156099..329d9e9e0f 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -1,15 +1,12 @@ use { crate::serve_repair::ServeRepair, + crossbeam_channel::{unbounded, Sender}, solana_ledger::blockstore::Blockstore, solana_perf::recycler::Recycler, solana_streamer::{socket::SocketAddrSpace, streamer}, std::{ net::UdpSocket, - sync::{ - atomic::AtomicBool, - mpsc::{channel, Sender}, - Arc, RwLock, - }, + sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, JoinHandle}, }, }; @@ -27,7 +24,7 @@ impl ServeRepairService { stats_reporter_sender: Sender>, exit: &Arc, ) -> Self { - let (request_sender, request_receiver) = channel(); + let (request_sender, request_receiver) = unbounded(); let serve_repair_socket = Arc::new(serve_repair_socket); trace!( "ServeRepairService: id: {}, listening on: {:?}", @@ -43,7 +40,7 @@ impl ServeRepairService { 1, false, ); - let (response_sender, response_receiver) = channel(); + let (response_sender, response_receiver) = unbounded(); let t_responder = streamer::responder( "serve-repairs", serve_repair_socket, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index d231ae8b93..ff3db68b13 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -2,6 +2,7 @@ use { crate::packet_hasher::PacketHasher, + crossbeam_channel::unbounded, lru::LruCache, solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, solana_perf::{ @@ -14,7 +15,7 @@ use { solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, std::{ net::UdpSocket, - sync::{atomic::AtomicBool, mpsc::channel, Arc, RwLock}, + sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, Builder, JoinHandle}, time::Instant, }, @@ -142,7 +143,7 @@ impl ShredFetchStage { where F: Fn(&mut Packet) + Send + 'static, { - let (packet_sender, packet_receiver) = channel(); + let (packet_sender, packet_receiver) = unbounded(); let streamers = sockets .into_iter() .map(|s| { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 62f190de8a..a899595a64 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -7,14 +7,13 @@ use { crate::sigverify, - crossbeam_channel::{SendError, Sender as CrossbeamSender}, + crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, solana_sdk::timing, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ collections::HashMap, - sync::mpsc::{Receiver, RecvTimeoutError}, thread::{self, Builder, JoinHandle}, time::Instant, }, @@ -132,7 +131,7 @@ impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( packet_receiver: Receiver, - verified_sender: CrossbeamSender>, + verified_sender: Sender>, verifier: T, ) -> Self { let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier); @@ -172,7 +171,7 @@ impl SigVerifyStage { fn verifier( recvr: &PacketBatchReceiver, - sendr: &CrossbeamSender>, + sendr: &Sender>, verifier: &T, stats: &mut SigVerifierStats, ) -> Result<()> { @@ -219,7 +218,7 @@ impl SigVerifyStage { fn verifier_service( packet_receiver: PacketBatchReceiver, - verified_sender: CrossbeamSender>, + verified_sender: Sender>, verifier: &T, ) -> JoinHandle<()> { let verifier = verifier.clone(); @@ -255,7 +254,7 @@ impl SigVerifyStage { fn verifier_services( packet_receiver: PacketBatchReceiver, - verified_sender: CrossbeamSender>, + verified_sender: Sender>, verifier: T, ) -> JoinHandle<()> { Self::verifier_service(packet_receiver, verified_sender, &verifier) diff --git a/core/src/stats_reporter_service.rs b/core/src/stats_reporter_service.rs index 453b7c0d6e..b6f23e4162 100644 --- a/core/src/stats_reporter_service.rs +++ b/core/src/stats_reporter_service.rs @@ -1,12 +1,14 @@ -use std::{ - result::Result, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{Receiver, RecvTimeoutError}, - Arc, +use { + crossbeam_channel::{Receiver, RecvTimeoutError}, + std::{ + result::Result, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, }, - thread::{self, Builder, JoinHandle}, - time::Duration, }; pub struct StatsReporterService { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index d2d3b7e732..0309212f68 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -14,7 +14,7 @@ use { sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Receiver}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, @@ -29,11 +29,7 @@ use { }, std::{ net::UdpSocket, - sync::{ - atomic::AtomicBool, - mpsc::{channel, Receiver}, - Arc, Mutex, RwLock, - }, + sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, thread, }, }; @@ -88,8 +84,8 @@ impl Tpu { broadcast: broadcast_sockets, } = sockets; - let (packet_sender, packet_receiver) = channel(); - let (vote_packet_sender, vote_packet_receiver) = channel(); + let (packet_sender, packet_receiver) = unbounded(); + let (vote_packet_sender, vote_packet_receiver) = unbounded(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, tpu_forwards_sockets, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index b78150f44c..b9029d3bcd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -25,7 +25,7 @@ use { tower_storage::TowerStorage, voting_service::VotingService, }, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Receiver}, solana_accountsdb_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ @@ -56,11 +56,7 @@ use { boxed::Box, collections::HashSet, net::UdpSocket, - sync::{ - atomic::AtomicBool, - mpsc::{channel, Receiver}, - Arc, Mutex, RwLock, - }, + sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, thread, }, }; @@ -154,7 +150,7 @@ impl Tvu { ancestor_hashes_requests: ancestor_hashes_socket, } = sockets; - let (fetch_sender, fetch_receiver) = channel(); + let (fetch_sender, fetch_receiver) = unbounded(); let repair_socket = Arc::new(repair_socket); let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket); @@ -210,7 +206,7 @@ impl Tvu { ancestor_hashes_replay_update_receiver, ); - let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); + let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = unbounded(); let snapshot_interval_slots = { if let Some(config) = bank_forks.read().unwrap().snapshot_config() { @@ -295,7 +291,7 @@ impl Tvu { tower_storage: tower_storage.clone(), }; - let (voting_sender, voting_receiver) = channel(); + let (voting_sender, voting_receiver) = unbounded(); let voting_service = VotingService::new( voting_receiver, cluster_info.clone(), @@ -304,7 +300,7 @@ impl Tvu { bank_forks.clone(), ); - let (cost_update_sender, cost_update_receiver) = channel(); + let (cost_update_sender, cost_update_receiver) = unbounded(); let cost_update_service = CostUpdateService::new( exit.clone(), blockstore.clone(), @@ -312,7 +308,7 @@ impl Tvu { cost_update_receiver, ); - let (drop_bank_sender, drop_bank_receiver) = channel(); + let (drop_bank_sender, drop_bank_receiver) = unbounded(); let drop_bank_service = DropBankService::new(drop_bank_receiver); let replay_stage = ReplayStage::new( @@ -456,7 +452,7 @@ pub mod tests { let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let tower = Tower::default(); - let accounts_package_channel = channel(); + let accounts_package_channel = unbounded(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let tvu = Tvu::new( &vote_keypair.pubkey(), diff --git a/core/src/validator.rs b/core/src/validator.rs index 98fdfc2b17..f1eee5326f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -20,7 +20,7 @@ use { tpu::{Tpu, TpuSockets, DEFAULT_TPU_COALESCE_MS}, tvu::{Tvu, TvuConfig, TvuSockets}, }, - crossbeam_channel::{bounded, unbounded}, + crossbeam_channel::{bounded, unbounded, Receiver}, rand::{thread_rng, Rng}, solana_accountsdb_plugin_manager::accountsdb_plugin_service::AccountsDbPluginService, solana_entry::poh::compute_hash_time_ns, @@ -100,7 +100,6 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::{channel, Receiver}, Arc, Mutex, RwLock, }, thread::{sleep, Builder, JoinHandle}, @@ -411,7 +410,7 @@ impl Validator { .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed))); } - let accounts_package_channel = channel(); + let accounts_package_channel = unbounded(); let accounts_update_notifier = accountsdb_plugin_service @@ -707,9 +706,7 @@ impl Validator { )), }; - let (stats_reporter_sender, stats_reporter_receiver) = channel(); - // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 - let _stats_reporter_sender = stats_reporter_sender.clone(); + let (stats_reporter_sender, stats_reporter_receiver) = unbounded(); let stats_reporter_service = StatsReporterService::new(stats_reporter_receiver, &exit); diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 191d23039e..ff8eefe48b 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -283,7 +283,7 @@ mod tests { // No new messages, should time out assert_matches!( verified_vote_packets.receive_and_process_vote_packets(&r, true), - Err(Error::CrossbeamRecvTimeout(_)) + Err(Error::RecvTimeout(_)) ); } diff --git a/core/src/vote_simulator.rs b/core/src/vote_simulator.rs index c3cb9286a9..54cb86de4c 100644 --- a/core/src/vote_simulator.rs +++ b/core/src/vote_simulator.rs @@ -13,6 +13,7 @@ use { replay_stage::{HeaviestForkFailures, ReplayStage}, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, }, + crossbeam_channel::unbounded, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, @@ -204,7 +205,7 @@ impl VoteSimulator { } pub fn set_root(&mut self, new_root: Slot) { - let (drop_bank_sender, _drop_bank_receiver) = std::sync::mpsc::channel(); + let (drop_bank_sender, _drop_bank_receiver) = unbounded(); ReplayStage::handle_new_root( new_root, &self.bank_forks, diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 647612484f..b2b3974807 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,12 +1,13 @@ use { crate::tower_storage::{SavedTower, TowerStorage}, + crossbeam_channel::Receiver, solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, solana_poh::poh_recorder::PohRecorder, solana_runtime::bank_forks::BankForks, solana_sdk::{clock::Slot, transaction::Transaction}, std::{ - sync::{mpsc::Receiver, Arc, Mutex, RwLock}, + sync::{Arc, Mutex, RwLock}, thread::{self, Builder, JoinHandle}, }, }; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 7a3e2c5cb0..39576d269c 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -10,9 +10,7 @@ use { repair_service::{OutstandingShredRepairs, RepairInfo, RepairService}, result::{Error, Result}, }, - crossbeam_channel::{ - unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, - }, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, rayon::{prelude::*, ThreadPool}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ @@ -32,7 +30,6 @@ use { net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::Sender, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -40,8 +37,8 @@ use { }, }; -type DuplicateSlotSender = CrossbeamSender; -pub(crate) type DuplicateSlotReceiver = CrossbeamReceiver; +type DuplicateSlotSender = Sender; +pub(crate) type DuplicateSlotReceiver = Receiver; #[derive(Default)] struct WindowServiceMetrics { @@ -97,8 +94,8 @@ impl WindowServiceMetrics { fn record_error(&mut self, err: &Error) { self.num_errors += 1; match err { - Error::TryCrossbeamSend => self.num_errors_try_crossbeam_send += 1, - Error::CrossbeamRecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1, + Error::TrySend => self.num_errors_try_crossbeam_send += 1, + Error::RecvTimeout(_) => self.num_errors_cross_beam_recv_timeout += 1, Error::Blockstore(err) => { self.num_errors_blockstore += 1; error!("blockstore error: {}", err); @@ -211,7 +208,7 @@ pub(crate) fn should_retransmit_and_persist( fn run_check_duplicate( cluster_info: &ClusterInfo, blockstore: &Blockstore, - shred_receiver: &CrossbeamReceiver, + shred_receiver: &Receiver, duplicate_slot_sender: &DuplicateSlotSender, ) -> Result<()> { let check_duplicate = |shred: Shred| -> Result<()> { @@ -287,7 +284,7 @@ fn prune_shreds_invalid_repair( } fn run_insert( - shred_receiver: &CrossbeamReceiver<(Vec, Vec>)>, + shred_receiver: &Receiver<(Vec, Vec>)>, blockstore: &Blockstore, leader_schedule_cache: &LeaderScheduleCache, handle_duplicate: F, @@ -345,8 +342,8 @@ where fn recv_window( blockstore: &Blockstore, bank_forks: &RwLock, - insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, - verified_receiver: &CrossbeamReceiver>, + insert_shred_sender: &Sender<(Vec, Vec>)>, + verified_receiver: &Receiver>, retransmit_sender: &Sender>, shred_filter: F, thread_pool: &ThreadPool, @@ -451,7 +448,7 @@ impl WindowService { #[allow(clippy::too_many_arguments)] pub(crate) fn new( blockstore: Arc, - verified_receiver: CrossbeamReceiver>, + verified_receiver: Receiver>, retransmit_sender: Sender>, repair_socket: Arc, ancestor_hashes_socket: Arc, @@ -532,7 +529,7 @@ impl WindowService { cluster_info: Arc, exit: Arc, blockstore: Arc, - duplicate_receiver: CrossbeamReceiver, + duplicate_receiver: Receiver, duplicate_slot_sender: DuplicateSlotSender, ) -> JoinHandle<()> { let handle_error = || { @@ -564,8 +561,8 @@ impl WindowService { exit: Arc, blockstore: Arc, leader_schedule_cache: Arc, - insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, - check_duplicate_sender: CrossbeamSender, + insert_receiver: Receiver<(Vec, Vec>)>, + check_duplicate_sender: Sender, completed_data_sets_sender: CompletedDataSetsSender, retransmit_sender: Sender>, outstanding_requests: Arc>, @@ -623,8 +620,8 @@ impl WindowService { id: Pubkey, exit: Arc, blockstore: Arc, - insert_sender: CrossbeamSender<(Vec, Vec>)>, - verified_receiver: CrossbeamReceiver>, + insert_sender: Sender<(Vec, Vec>)>, + verified_receiver: Receiver>, shred_filter: F, bank_forks: Arc>, retransmit_sender: Sender>, @@ -688,12 +685,12 @@ impl WindowService { H: Fn(), { match e { - Error::CrossbeamRecvTimeout(RecvTimeoutError::Disconnected) => true, - Error::CrossbeamRecvTimeout(RecvTimeoutError::Timeout) => { + Error::RecvTimeout(RecvTimeoutError::Disconnected) => true, + Error::RecvTimeout(RecvTimeoutError::Timeout) => { handle_timeout(); false } - Error::CrossbeamSend => true, + Error::Send => true, _ => { handle_error(); error!("thread {:?} error {:?}", thread::current().name(), e); diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index 24e127341b..785ab07128 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -4,6 +4,7 @@ #[cfg(test)] mod tests { use { + crossbeam_channel::unbounded, log::*, solana_core::ledger_cleanup_service::LedgerCleanupService, solana_ledger::{ @@ -17,7 +18,6 @@ mod tests { str::FromStr, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::channel, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -309,7 +309,7 @@ mod tests { let num_batches = benchmark_slots / batch_size_slots; let num_shreds_total = benchmark_slots * shreds_per_slot; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let exit = Arc::new(AtomicBool::new(false)); let cleaner = if cleanup_service { @@ -587,7 +587,7 @@ mod tests { let u1 = blockstore.storage_size().unwrap() as f64; // send signal to cleanup slots - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); sender.send(n).unwrap(); let mut last_purge_slot = 0; let highest_compact_slot = Arc::new(AtomicU64::new(0)); diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index cab0f0887b..9f9af147ff 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -91,7 +91,6 @@ mod tests { path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::channel, Arc, RwLock, }, time::Duration, @@ -248,7 +247,7 @@ mod tests { let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; let (s, snapshot_request_receiver) = unbounded(); - let (accounts_package_sender, _r) = channel(); + let (accounts_package_sender, _r) = unbounded(); let request_sender = AbsRequestSender::new(Some(s)); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), @@ -368,8 +367,8 @@ mod tests { .unwrap(); // Set up snapshotting channels - let (sender, receiver) = channel(); - let (fake_sender, _fake_receiver) = channel(); + let (sender, receiver) = unbounded(); + let (fake_sender, _fake_receiver) = unbounded(); // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted // and the snapshot purging logic will run on every snapshot taken. This means the three @@ -671,7 +670,7 @@ mod tests { let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); - let (accounts_package_sender, _accounts_package_receiver) = channel(); + let (accounts_package_sender, _accounts_package_receiver) = unbounded(); let request_sender = AbsRequestSender::new(Some(snapshot_request_sender)); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), @@ -895,7 +894,7 @@ mod tests { let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); - let (accounts_package_sender, accounts_package_receiver) = channel(); + let (accounts_package_sender, accounts_package_receiver) = unbounded(); let pending_snapshot_package = PendingSnapshotPackage::default(); let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks)); diff --git a/entry/Cargo.toml b/entry/Cargo.toml index f86823e1dc..7d401d752a 100644 --- a/entry/Cargo.toml +++ b/entry/Cargo.toml @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-poh" edition = "2021" [dependencies] +crossbeam-channel = "0.5" dlopen = "0.1.8" dlopen_derive = "0.1.4" log = "0.4.11" diff --git a/entry/src/entry.rs b/entry/src/entry.rs index f02354225d..1a45e86023 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -4,6 +4,7 @@ //! represents an approximate amount of time since the last Entry was created. use { crate::poh::Poh, + crossbeam_channel::{Receiver, Sender}, dlopen::symbor::{Container, SymBorApi, Symbol}, dlopen_derive::SymBorApi, log::*, @@ -34,10 +35,7 @@ use { cell::RefCell, cmp, ffi::OsStr, - sync::{ - mpsc::{Receiver, Sender}, - Arc, Mutex, Once, - }, + sync::{Arc, Mutex, Once}, thread::{self, JoinHandle}, time::Instant, }, diff --git a/faucet/Cargo.toml b/faucet/Cargo.toml index 8c662bf665..a8cf021c46 100644 --- a/faucet/Cargo.toml +++ b/faucet/Cargo.toml @@ -13,6 +13,7 @@ edition = "2021" bincode = "1.3.3" byteorder = "1.4.3" clap = "2.33" +crossbeam-channel = "0.5" log = "0.4.14" serde = "1.0.133" serde_derive = "1.0.103" diff --git a/faucet/src/faucet.rs b/faucet/src/faucet.rs index d57567357d..d440d34fff 100644 --- a/faucet/src/faucet.rs +++ b/faucet/src/faucet.rs @@ -7,6 +7,7 @@ use { bincode::{deserialize, serialize, serialized_size}, byteorder::{ByteOrder, LittleEndian}, + crossbeam_channel::{unbounded, Sender}, log::*, serde_derive::{Deserialize, Serialize}, solana_metrics::datapoint_info, @@ -25,7 +26,7 @@ use { collections::{HashMap, HashSet}, io::{Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, - sync::{mpsc::Sender, Arc, Mutex}, + sync::{Arc, Mutex}, thread, time::Duration, }, @@ -347,7 +348,7 @@ pub fn run_local_faucet_with_port( // For integration tests. Listens on random open port and reports port to Sender. pub fn run_local_faucet(faucet_keypair: Keypair, per_time_cap: Option) -> SocketAddr { - let (sender, receiver) = std::sync::mpsc::channel(); + let (sender, receiver) = unbounded(); run_local_faucet_with_port(faucet_keypair, sender, per_time_cap, 0); receiver .recv() diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index ea7a9759bc..2aadb98f76 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -13,6 +13,7 @@ documentation = "https://docs.rs/solana-gossip" bincode = "1.3.3" bv = { version = "0.11.1", features = ["serde"] } clap = "2.33.1" +crossbeam-channel = "0.5" flate2 = "1.0" indexmap = { version = "1.8", features = ["rayon"] } itertools = "0.10.3" diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 860e553f53..99372a5b8a 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -33,6 +33,7 @@ use { weighted_shuffle::WeightedShuffle, }, bincode::{serialize, serialized_size}, + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, itertools::Itertools, rand::{seq::SliceRandom, thread_rng, CryptoRng, Rng}, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, @@ -85,7 +86,6 @@ use { result::Result, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{Receiver, RecvTimeoutError, Sender}, Arc, Mutex, RwLock, RwLockReadGuard, }, thread::{sleep, Builder, JoinHandle}, diff --git a/gossip/src/gossip_error.rs b/gossip/src/gossip_error.rs index df2be3a9c7..87db5ccfad 100644 --- a/gossip/src/gossip_error.rs +++ b/gossip/src/gossip_error.rs @@ -1,6 +1,7 @@ use { crate::duplicate_shred, - std::{io, sync}, + crossbeam_channel::{RecvTimeoutError, SendError}, + std::io, thiserror::Error, }; @@ -13,15 +14,15 @@ pub enum GossipError { #[error(transparent)] Io(#[from] io::Error), #[error(transparent)] - RecvTimeoutError(#[from] sync::mpsc::RecvTimeoutError), + RecvTimeoutError(#[from] RecvTimeoutError), #[error("send error")] SendError, #[error("serialization error")] Serialize(#[from] Box), } -impl std::convert::From> for GossipError { - fn from(_e: sync::mpsc::SendError) -> GossipError { +impl std::convert::From> for GossipError { + fn from(_e: SendError) -> GossipError { GossipError::SendError } } diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index db9e295b72..42b2ee2363 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -5,6 +5,7 @@ use { cluster_info::{ClusterInfo, VALIDATOR_PORT_RANGE}, contact_info::ContactInfo, }, + crossbeam_channel::{unbounded, Sender}, rand::{thread_rng, Rng}, solana_client::thin_client::{create_client, ThinClient}, solana_perf::recycler::Recycler, @@ -19,7 +20,6 @@ use { net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Sender}, Arc, RwLock, }, thread::{self, sleep, JoinHandle}, @@ -41,7 +41,7 @@ impl GossipService { stats_reporter_sender: Option>>, exit: &Arc, ) -> Self { - let (request_sender, request_receiver) = channel(); + let (request_sender, request_receiver) = unbounded(); let gossip_socket = Arc::new(gossip_socket); trace!( "GossipService: id: {}, listening on: {:?}", @@ -58,15 +58,13 @@ impl GossipService { 1, false, ); - let (consume_sender, listen_receiver) = channel(); - // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 - let _consume_sender = consume_sender.clone(); + let (consume_sender, listen_receiver) = unbounded(); let t_socket_consume = cluster_info.clone().start_socket_consume_thread( request_receiver, consume_sender, exit.clone(), ); - let (response_sender, response_receiver) = channel(); + let (response_sender, response_receiver) = unbounded(); let t_listen = cluster_info.clone().listen( bank_forks.clone(), listen_receiver, @@ -80,10 +78,6 @@ impl GossipService { gossip_validators, exit.clone(), ); - // To work around: - // https://github.com/rust-lang/rust/issues/54267 - // responder thread should start after response_sender.clone(). see: - // https://github.com/rust-lang/rust/issues/39364#issuecomment-381446873 let t_responder = streamer::responder( "gossip", gossip_socket, diff --git a/gossip/tests/cluster_info.rs b/gossip/tests/cluster_info.rs index 231c2399cd..9bdd9e88d8 100644 --- a/gossip/tests/cluster_info.rs +++ b/gossip/tests/cluster_info.rs @@ -1,5 +1,6 @@ #![allow(clippy::integer_arithmetic)] use { + crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError}, rayon::{iter::ParallelIterator, prelude::*}, serial_test::serial, solana_gossip::{ @@ -11,10 +12,7 @@ use { solana_streamer::socket::SocketAddrSpace, std::{ collections::{HashMap, HashSet}, - sync::{ - mpsc::{channel, Receiver, Sender, TryRecvError}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, time::Instant, }, }; @@ -90,7 +88,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) { let mut staked_nodes = HashMap::new(); // setup accounts for all nodes (leader has 0 bal) - let (s, r) = channel(); + let (s, r) = unbounded(); let senders: Arc>>> = Arc::new(Mutex::new(HashMap::new())); senders.lock().unwrap().insert(leader_info.id, s); @@ -109,7 +107,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) { let node = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); staked_nodes.insert(node.id, stakes[*i - 1]); cluster_info.insert_info(node.clone()); - let (s, r) = channel(); + let (s, r) = unbounded(); batches .get_mut(batch_ix) .unwrap() diff --git a/install/Cargo.toml b/install/Cargo.toml index ce32e042dc..e22f1598b5 100644 --- a/install/Cargo.toml +++ b/install/Cargo.toml @@ -17,6 +17,7 @@ chrono = { version = "0.4.11", features = ["serde"] } clap = { version = "2.33.1" } console = "0.15.0" ctrlc = { version = "3.2.1", features = ["termination"] } +crossbeam-channel = "0.5" dirs-next = "2.0.0" indicatif = "0.16.2" lazy_static = "1.4.0" diff --git a/install/src/command.rs b/install/src/command.rs index ad4b0341f2..3993a731eb 100644 --- a/install/src/command.rs +++ b/install/src/command.rs @@ -6,6 +6,7 @@ use { }, chrono::{Local, TimeZone}, console::{style, Emoji}, + crossbeam_channel::unbounded, indicatif::{ProgressBar, ProgressStyle}, serde::{Deserialize, Serialize}, solana_client::rpc_client::RpcClient, @@ -21,7 +22,6 @@ use { fs::{self, File}, io::{self, BufReader, Read}, path::{Path, PathBuf}, - sync::mpsc, time::{Duration, Instant, SystemTime}, }, tempfile::TempDir, @@ -1196,7 +1196,7 @@ pub fn run( let mut child_option: Option = None; let mut now = Instant::now(); - let (signal_sender, signal_receiver) = mpsc::channel(); + let (signal_sender, signal_receiver) = unbounded(); ctrlc::set_handler(move || { let _ = signal_sender.send(()); }) diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml index 0ce2e8b648..20469c136a 100644 --- a/ledger-tool/Cargo.toml +++ b/ledger-tool/Cargo.toml @@ -12,6 +12,7 @@ documentation = "https://docs.rs/solana-ledger-tool" [dependencies] bs58 = "0.4.0" clap = "2.33.1" +crossbeam-channel = "0.5" csv = "1.1.6" dashmap = "5.0.0" histogram = "*" diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index c4e05e24af..5129a15a5b 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -4,6 +4,7 @@ use { crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand, }, + crossbeam_channel::unbounded, dashmap::DashMap, itertools::Itertools, log::*, @@ -72,7 +73,6 @@ use { str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::channel, Arc, RwLock, }, }, @@ -741,7 +741,7 @@ fn load_bank_forks( vec![non_primary_accounts_path] }; - let (accounts_package_sender, _) = channel(); + let (accounts_package_sender, _) = unbounded(); bank_forks_utils::load( genesis_config, blockstore, diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index 860a6348bf..f7ae973b86 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -1,5 +1,6 @@ use { crate::blockstore::Blockstore, + crossbeam_channel::bounded, log::*, solana_measure::measure::Measure, solana_sdk::clock::Slot, @@ -130,7 +131,7 @@ pub async fn upload_confirmed_blocks( let (_loader_thread, receiver) = { let exit = exit.clone(); - let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH); + let (sender, receiver) = bounded(BLOCK_READ_AHEAD_DEPTH); ( std::thread::spawn(move || { let mut measure = Measure::start("block loader thread"); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 2bf567e3ee..52965fe2f6 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -18,6 +18,7 @@ use { }, }, bincode::deserialize, + crossbeam_channel::{bounded, Receiver, Sender, TrySendError}, log::*, rayon::{ iter::{IntoParallelRefIterator, ParallelIterator}, @@ -57,7 +58,6 @@ use { rc::Rc, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{sync_channel, Receiver, Sender, SyncSender, TrySendError}, Arc, Mutex, RwLock, RwLockWriteGuard, }, time::Instant, @@ -92,7 +92,7 @@ pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_P // (32K shreds per slot * 4 TX per shred * 2.5 slots per sec) pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768; -pub type CompletedSlotsSender = SyncSender>; +pub type CompletedSlotsSender = Sender>; pub type CompletedSlotsReceiver = Receiver>; type CompletedRanges = Vec<(u32, u32)>; @@ -160,7 +160,7 @@ pub struct Blockstore { bank_hash_cf: LedgerColumn, last_root: Arc>, insert_shreds_lock: Arc>, - pub new_shreds_signals: Vec>, + pub new_shreds_signals: Vec>, pub completed_slots_senders: Vec, pub lowest_cleanup_slot: Arc>, no_compaction: bool, @@ -455,9 +455,9 @@ impl Blockstore { options: BlockstoreOptions, ) -> Result { let mut blockstore = Self::open_with_options(ledger_path, options)?; - let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1); + let (ledger_signal_sender, ledger_signal_receiver) = bounded(1); let (completed_slots_sender, completed_slots_receiver) = - sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL); + bounded(MAX_COMPLETED_SLOTS_IN_CHANNEL); blockstore.new_shreds_signals = vec![ledger_signal_sender]; blockstore.completed_slots_senders = vec![completed_slots_sender]; @@ -3377,8 +3377,8 @@ fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: Slot, last_root: u6 } fn send_signals( - new_shreds_signals: &[SyncSender], - completed_slots_senders: &[SyncSender>], + new_shreds_signals: &[Sender], + completed_slots_senders: &[Sender>], should_signal: bool, newly_completed_slots: Vec, ) { @@ -3430,7 +3430,7 @@ fn send_signals( /// newly completed. fn commit_slot_meta_working_set( slot_meta_working_set: &HashMap, - completed_slots_senders: &[SyncSender>], + completed_slots_senders: &[Sender>], write_batch: &mut WriteBatch, ) -> Result<(bool, Vec)> { let mut should_signal = false; @@ -4137,6 +4137,7 @@ pub mod tests { }, assert_matches::assert_matches, bincode::serialize, + crossbeam_channel::unbounded, itertools::Itertools, rand::{seq::SliceRandom, thread_rng}, solana_account_decoder::parse_token::UiTokenAmount, @@ -4152,7 +4153,7 @@ pub mod tests { }, solana_storage_proto::convert::generated, solana_transaction_status::{InnerInstructions, Reward, Rewards, TransactionTokenBalance}, - std::{sync::mpsc::channel, thread::Builder, time::Duration}, + std::{thread::Builder, time::Duration}, }; // used for tests only @@ -8958,9 +8959,9 @@ pub mod tests { let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); - let (slot_sender, slot_receiver) = channel(); - let (shred_sender, shred_receiver) = channel::>(); - let (signal_sender, signal_receiver) = channel(); + let (slot_sender, slot_receiver) = unbounded(); + let (shred_sender, shred_receiver) = unbounded::>(); + let (signal_sender, signal_receiver) = unbounded(); let t_entry_getter = { let blockstore = blockstore.clone(); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index a643649219..ef78f2c6dc 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1530,10 +1530,7 @@ pub mod tests { vote_state::{VoteState, VoteStateVersions, MAX_LOCKOUT_HISTORY}, vote_transaction, }, - std::{ - collections::BTreeSet, - sync::{mpsc::channel, RwLock}, - }, + std::{collections::BTreeSet, sync::RwLock}, tempfile::TempDir, trees::tr, }; @@ -1543,7 +1540,7 @@ pub mod tests { blockstore: &Blockstore, opts: ProcessOptions, ) -> BlockstoreProcessorInner { - let (accounts_package_sender, _) = channel(); + let (accounts_package_sender, _) = unbounded(); process_blockstore( genesis_config, blockstore, @@ -3119,7 +3116,7 @@ pub mod tests { bank1.squash(); // Test process_blockstore_from_root() from slot 1 onwards - let (accounts_package_sender, _) = channel(); + let (accounts_package_sender, _) = unbounded(); let (bank_forks, ..) = do_process_blockstore_from_root( &blockstore, bank1, @@ -3223,7 +3220,7 @@ pub mod tests { ..SnapshotConfig::default() }; - let (accounts_package_sender, accounts_package_receiver) = channel(); + let (accounts_package_sender, accounts_package_receiver) = unbounded(); do_process_blockstore_from_root( &blockstore, diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs index 390969e58b..4560ab7cb8 100644 --- a/ledger/src/leader_schedule_cache.rs +++ b/ledger/src/leader_schedule_cache.rs @@ -261,6 +261,7 @@ mod tests { get_tmp_ledger_path_auto_delete, staking_utils::tests::setup_vote_and_stake_accounts, }, + crossbeam_channel::unbounded, solana_runtime::bank::Bank, solana_sdk::{ clock::NUM_CONSECUTIVE_LEADER_SLOTS, @@ -270,10 +271,7 @@ mod tests { }, signature::{Keypair, Signer}, }, - std::{ - sync::{mpsc::channel, Arc}, - thread::Builder, - }, + std::{sync::Arc, thread::Builder}, }; #[test] @@ -350,7 +348,7 @@ mod tests { .map(|_| { let cache = cache.clone(); let bank = bank.clone(); - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); ( Builder::new() .name("test_thread_race_leader_schedule_cache".to_string()) diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index ce0ea54d2c..1ca62f9d64 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-metrics" edition = "2021" [dependencies] +crossbeam-channel = "0.5" gethostname = "0.2.1" lazy_static = "1.4.0" log = "0.4.14" diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs index db9bea65e3..4c334749b2 100644 --- a/metrics/src/metrics.rs +++ b/metrics/src/metrics.rs @@ -2,6 +2,7 @@ use { crate::{counter::CounterPoint, datapoint::DataPoint}, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, gethostname::gethostname, lazy_static::lazy_static, log::*, @@ -11,10 +12,7 @@ use { collections::HashMap, convert::Into, env, - sync::{ - mpsc::{channel, Receiver, RecvTimeoutError, Sender}, - Arc, Barrier, Mutex, Once, RwLock, - }, + sync::{Arc, Barrier, Mutex, Once, RwLock}, thread, time::{Duration, Instant, UNIX_EPOCH}, }, @@ -155,7 +153,7 @@ impl MetricsAgent { write_frequency: Duration, max_points_per_sec: usize, ) -> Self { - let (sender, receiver) = channel::(); + let (sender, receiver) = unbounded::(); thread::spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec)); Self { sender } diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index cd3989d173..c3b29173fc 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -12,6 +12,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" clap = "2.33.1" +crossbeam-channel = "0.5" log = "0.4.14" nix = "0.23.1" rand = "0.7.0" diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 67efad8aee..b63ab8534f 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -1,6 +1,7 @@ //! The `net_utils` module assists with networking #![allow(clippy::integer_arithmetic)] use { + crossbeam_channel::unbounded, log::*, rand::{thread_rng, Rng}, socket2::{Domain, SockAddr, Socket, Type}, @@ -8,7 +9,7 @@ use { collections::{BTreeMap, HashSet}, io::{self, Read, Write}, net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}, - sync::{mpsc::channel, Arc, RwLock}, + sync::{Arc, RwLock}, time::{Duration, Instant}, }, url::Url, @@ -138,7 +139,7 @@ fn do_verify_reachable_ports( // Wait for a connection to open on each TCP port for (port, tcp_listener) in tcp_listeners { - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let listening_addr = tcp_listener.local_addr().unwrap(); let thread_handle = std::thread::spawn(move || { debug!("Waiting for incoming connection on tcp/{}", port); diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 611eaa5cce..49d07b1beb 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -13,9 +13,7 @@ pub use solana_sdk::clock::Slot; use { crate::poh_service::PohService, - crossbeam_channel::{ - unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, - }, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender}, log::*, solana_entry::{entry::Entry, poh::Poh}, solana_ledger::{ @@ -32,7 +30,6 @@ use { cmp, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, SendError, Sender, SyncSender}, Arc, Mutex, }, time::{Duration, Instant}, @@ -84,14 +81,14 @@ pub struct Record { pub mixin: Hash, pub transactions: Vec, pub slot: Slot, - pub sender: CrossbeamSender>, + pub sender: Sender>, } impl Record { pub fn new( mixin: Hash, transactions: Vec, slot: Slot, - sender: CrossbeamSender>, + sender: Sender>, ) -> Self { Self { mixin, @@ -104,7 +101,7 @@ impl Record { pub struct TransactionRecorder { // shared by all users of PohRecorder - pub record_sender: CrossbeamSender, + pub record_sender: Sender, pub is_exited: Arc, } @@ -115,7 +112,7 @@ impl Clone for TransactionRecorder { } impl TransactionRecorder { - pub fn new(record_sender: CrossbeamSender, is_exited: Arc) -> Self { + pub fn new(record_sender: Sender, is_exited: Arc) -> Self { Self { // shared record_sender, @@ -196,7 +193,7 @@ pub struct WorkingBank { pub struct PohRecorder { pub poh: Arc>, tick_height: u64, - clear_bank_signal: Option>, + clear_bank_signal: Option>, start_bank: Arc, // parent slot start_tick_height: u64, // first tick_height this recorder will observe tick_cache: Vec<(Entry, u64)>, // cache of entry and its tick_height @@ -222,7 +219,7 @@ pub struct PohRecorder { record_us: u64, ticks_from_record: u64, last_metric: Instant, - record_sender: CrossbeamSender, + record_sender: Sender, pub is_exited: Arc, } @@ -663,11 +660,11 @@ impl PohRecorder { ticks_per_slot: u64, id: &Pubkey, blockstore: &Arc, - clear_bank_signal: Option>, + clear_bank_signal: Option>, leader_schedule_cache: &Arc, poh_config: &Arc, is_exited: Arc, - ) -> (Self, Receiver, CrossbeamReceiver) { + ) -> (Self, Receiver, Receiver) { let tick_number = 0; let poh = Arc::new(Mutex::new(Poh::new_with_slot_info( last_entry_hash, @@ -680,7 +677,7 @@ impl PohRecorder { ticks_per_slot, poh_config.target_tick_duration.as_nanos() as u64, ); - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let (record_sender, record_receiver) = unbounded(); let (leader_first_tick_height, leader_last_tick_height, grace_ticks) = Self::compute_leader_slot_tick_heights(next_leader_slot, ticks_per_slot); @@ -737,7 +734,7 @@ impl PohRecorder { leader_schedule_cache: &Arc, poh_config: &Arc, is_exited: Arc, - ) -> (Self, Receiver, CrossbeamReceiver) { + ) -> (Self, Receiver, Receiver) { Self::new_with_clear_signal( tick_height, last_entry_hash, @@ -825,10 +822,10 @@ mod tests { use { super::*, bincode::serialize, + crossbeam_channel::bounded, solana_ledger::{blockstore::Blockstore, blockstore_meta::SlotMeta, get_tmp_ledger_path}, solana_perf::test_tx::test_tx, solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, hash::hash}, - std::sync::mpsc::sync_channel, }; #[test] @@ -1431,7 +1428,7 @@ mod tests { .expect("Expected to be able to open database ledger"); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); - let (sender, receiver) = sync_channel(1); + let (sender, receiver) = bounded(1); let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new_with_clear_signal( 0, diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index a1e64a761b..6c9fbbaf52 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2639,6 +2639,7 @@ name = "solana-banks-server" version = "1.10.0" dependencies = [ "bincode", + "crossbeam-channel", "futures", "solana-banks-interface", "solana-runtime", @@ -3081,6 +3082,7 @@ dependencies = [ "bincode", "bs58 0.4.0", "clap", + "crossbeam-channel", "indicatif", "jsonrpc-core", "log", @@ -3132,6 +3134,7 @@ dependencies = [ "bincode", "byteorder 1.4.3", "clap", + "crossbeam-channel", "log", "serde", "serde_derive", @@ -3237,6 +3240,7 @@ dependencies = [ name = "solana-metrics" version = "1.10.0" dependencies = [ + "crossbeam-channel", "gethostname", "lazy_static", "log", @@ -3250,6 +3254,7 @@ version = "1.10.0" dependencies = [ "bincode", "clap", + "crossbeam-channel", "log", "nix", "rand 0.7.3", @@ -3555,6 +3560,7 @@ dependencies = [ name = "solana-send-transaction-service" version = "1.10.0" dependencies = [ + "crossbeam-channel", "log", "solana-metrics", "solana-runtime", diff --git a/rpc-test/Cargo.toml b/rpc-test/Cargo.toml index 92f29bbea5..1acebd603f 100644 --- a/rpc-test/Cargo.toml +++ b/rpc-test/Cargo.toml @@ -12,6 +12,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" bs58 = "0.4.0" +crossbeam-channel = "0.5" jsonrpc-core = "18.0.0" jsonrpc-core-client = { version = "18.0.0", features = ["ipc", "ws"] } log = "0.4.11" diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 1c9fb9fdcb..1b68e5a6d9 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -1,5 +1,6 @@ use { bincode::serialize, + crossbeam_channel::unbounded, jsonrpc_core::futures::StreamExt, jsonrpc_core_client::transports::ws, log::*, @@ -29,7 +30,7 @@ use { std::{ collections::HashSet, net::UdpSocket, - sync::{mpsc::channel, Arc}, + sync::Arc, thread::sleep, time::{Duration, Instant}, }, @@ -167,7 +168,7 @@ fn test_rpc_slot_updates() { // Create the pub sub runtime let rt = Runtime::new().unwrap(); let rpc_pubsub_url = test_validator.rpc_pubsub_url(); - let (update_sender, update_receiver) = channel::>(); + let (update_sender, update_receiver) = unbounded::>(); // Subscribe to slot updates rt.spawn(async move { @@ -257,11 +258,11 @@ fn test_rpc_subscriptions() { .collect(); // Track when subscriptions are ready - let (ready_sender, ready_receiver) = channel::<()>(); + let (ready_sender, ready_receiver) = unbounded::<()>(); // Track account notifications are received - let (account_sender, account_receiver) = channel::>(); + let (account_sender, account_receiver) = unbounded::>(); // Track when status notifications are received - let (status_sender, status_receiver) = channel::<(String, RpcResponse)>(); + let (status_sender, status_receiver) = unbounded::<(String, RpcResponse)>(); // Create the pub sub runtime let rt = Runtime::new().unwrap(); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 987980cb53..91746a46d7 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -6,6 +6,7 @@ use { parsed_token_accounts::*, rpc_health::*, }, bincode::{config::Options, serialize}, + crossbeam_channel::{unbounded, Receiver, Sender}, jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result}, jsonrpc_derive::rpc, serde::{Deserialize, Serialize}, @@ -94,7 +95,6 @@ use { str::FromStr, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::{channel, Receiver, Sender}, Arc, Mutex, RwLock, }, time::Duration, @@ -262,7 +262,7 @@ impl JsonRpcRequestProcessor { leader_schedule_cache: Arc, max_complete_transaction_status_slot: Arc, ) -> (Self, Receiver) { - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); ( Self { config, @@ -301,7 +301,7 @@ impl JsonRpcRequestProcessor { socket_addr_space, )); let tpu_address = cluster_info.my_contact_info().tpu; - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); SendTransactionService::new::( tpu_address, &bank_forks, @@ -4300,8 +4300,8 @@ pub fn create_test_transactions_and_populate_blockstore( blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.set_roots(std::iter::once(&slot)).unwrap(); - let (transaction_status_sender, transaction_status_receiver) = crossbeam_channel::unbounded(); - let (replay_vote_sender, _replay_vote_receiver) = crossbeam_channel::unbounded(); + let (transaction_status_sender, transaction_status_receiver) = unbounded(); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let transaction_status_service = crate::transaction_status_service::TransactionStatusService::new( transaction_status_receiver, diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 2a3cdce90c..287b175963 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -11,6 +11,7 @@ use { }, rpc_health::*, }, + crossbeam_channel::unbounded, jsonrpc_core::{futures::prelude::*, MetaIoHandler}, jsonrpc_http_server::{ hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware, @@ -42,7 +43,6 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::channel, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -415,7 +415,7 @@ impl JsonRpcService { let ledger_path = ledger_path.to_path_buf(); - let (close_handle_sender, close_handle_receiver) = channel(); + let (close_handle_sender, close_handle_receiver) = unbounded(); let thread_hdl = Builder::new() .name("solana-jsonrpc".to_string()) .spawn(move || { diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index 8a8731810c..554fe59d46 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -1,5 +1,6 @@ use { crate::bank::Bank, + crossbeam_channel::{unbounded, Receiver, Sender}, solana_sdk::{ account::Account, client::{AsyncClient, Client, SyncClient}, @@ -19,10 +20,7 @@ use { std::{ convert::TryFrom, io, - sync::{ - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, thread::{sleep, Builder}, time::{Duration, Instant}, }, @@ -340,7 +338,7 @@ impl BankClient { } pub fn new_shared(bank: &Arc) -> Self { - let (transaction_sender, transaction_receiver) = channel(); + let (transaction_sender, transaction_receiver) = unbounded(); let transaction_sender = Mutex::new(transaction_sender); let thread_bank = bank.clone(); let bank = bank.clone(); diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs index 59ffdbb64e..0547f554be 100644 --- a/runtime/src/snapshot_package.rs +++ b/runtime/src/snapshot_package.rs @@ -8,15 +8,13 @@ use { TMP_BANK_SNAPSHOT_PREFIX, }, }, + crossbeam_channel::{Receiver, SendError, Sender}, log::*, solana_sdk::{clock::Slot, genesis_config::ClusterType, hash::Hash}, std::{ fs, path::{Path, PathBuf}, - sync::{ - mpsc::{Receiver, SendError, Sender}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, }, tempfile::TempDir, }; diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index d99c14910b..72ee3e03ae 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -10,6 +10,7 @@ license = "Apache-2.0" edition = "2021" [dependencies] +crossbeam-channel = "0.5" log = "0.4.14" solana-metrics = { path = "../metrics", version = "=1.10.0" } solana-runtime = { path = "../runtime", version = "=1.10.0" } diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index ae498e8f9a..86508297d9 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -1,5 +1,6 @@ use { crate::tpu_info::TpuInfo, + crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, solana_metrics::{datapoint_warn, inc_new_counter_info}, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -7,10 +8,7 @@ use { std::{ collections::HashMap, net::{SocketAddr, UdpSocket}, - sync::{ - mpsc::{Receiver, RecvTimeoutError}, - Arc, RwLock, - }, + sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }, @@ -327,11 +325,11 @@ mod test { use { super::*, crate::tpu_info::NullTpuInfo, + crossbeam_channel::unbounded, solana_sdk::{ account::AccountSharedData, genesis_config::create_genesis_config, nonce, pubkey::Pubkey, signature::Signer, system_program, system_transaction, }, - std::sync::mpsc::channel, }; #[test] @@ -339,7 +337,7 @@ mod test { let tpu_address = "127.0.0.1:0".parse().unwrap(); let bank = Bank::default_for_tests(); let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); let send_tranaction_service = SendTransactionService::new::( tpu_address, diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index bce936e59c..e371b035ce 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-streamer" edition = "2021" [dependencies] +crossbeam-channel = "0.5" histogram = "0.6.9" itertools = "0.10.3" log = "0.4.14" diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 2328ff784b..def53c6eb9 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -7,6 +7,7 @@ use { recvmmsg::NUM_RCVMMSGS, socket::SocketAddrSpace, }, + crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, histogram::Histogram, solana_sdk::{packet::Packet, timing::timestamp}, std::{ @@ -15,7 +16,6 @@ use { net::{IpAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, Arc, }, thread::{Builder, JoinHandle}, @@ -324,6 +324,7 @@ mod test { packet::{Packet, PacketBatch, PACKET_DATA_SIZE}, streamer::{receiver, responder}, }, + crossbeam_channel::unbounded, solana_perf::recycler::Recycler, std::{ io, @@ -331,7 +332,6 @@ mod test { net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::channel, Arc, }, time::Duration, @@ -366,7 +366,7 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let (s_reader, r_reader) = channel(); + let (s_reader, r_reader) = unbounded(); let t_receiver = receiver( Arc::new(read), &exit, @@ -377,7 +377,7 @@ mod test { true, ); let t_responder = { - let (s_responder, r_responder) = channel(); + let (s_responder, r_responder) = unbounded(); let t_responder = responder( "streamer_send_test", Arc::new(send), diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 4c65322181..97194e572b 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -15,6 +15,7 @@ chrono = { version = "0.4.11", features = ["serde"] } clap = "2.33.1" console = "0.15.0" core_affinity = "0.5.10" +crossbeam-channel = "0.5" fd-lock = "3.0.2" indicatif = "0.16.2" jsonrpc-core = "18.0.0" diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 34bc7ec740..b019b08af5 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -1,5 +1,6 @@ use { clap::{crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg}, + crossbeam_channel::unbounded, log::*, solana_clap_utils::{ input_parsers::{pubkey_of, pubkeys_of, value_of}, @@ -35,7 +36,7 @@ use { net::{IpAddr, Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, process::exit, - sync::{mpsc::channel, Arc, RwLock}, + sync::{Arc, RwLock}, time::{Duration, SystemTime, UNIX_EPOCH}, }, }; @@ -524,7 +525,7 @@ fn main() { let faucet_pubkey = faucet_keypair.pubkey(); if let Some(faucet_addr) = &faucet_addr { - let (sender, receiver) = channel(); + let (sender, receiver) = unbounded(); run_local_faucet_with_port(faucet_keypair, sender, None, faucet_addr.port()); let _ = receiver.recv().expect("run faucet").unwrap_or_else(|err| { println!("Error: failed to start faucet: {}", err);