diff --git a/Cargo.lock b/Cargo.lock index 2dc077f08c..f85587b76f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3625,6 +3625,7 @@ dependencies = [ "solana-perf 1.1.0", "solana-sdk 1.1.0", "solana-storage-program 1.1.0", + "solana-streamer 1.1.0", "thiserror 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3655,8 +3656,10 @@ dependencies = [ "solana-ledger 1.1.0", "solana-logger 1.1.0", "solana-measure 1.1.0", + "solana-perf 1.1.0", "solana-runtime 1.1.0", "solana-sdk 1.1.0", + "solana-streamer 1.1.0", ] [[package]] @@ -3692,9 +3695,9 @@ version = "1.1.0" dependencies = [ "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "solana-clap-utils 1.1.0", - "solana-core 1.1.0", "solana-logger 1.1.0", "solana-net-utils 1.1.0", + "solana-streamer 1.1.0", ] [[package]] @@ -3936,10 +3939,8 @@ dependencies = [ "jsonrpc-http-server 14.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-pubsub 14.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-ws-server 14.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "nix 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3969,6 +3970,7 @@ dependencies = [ "solana-sdk 1.1.0", "solana-stake-program 1.1.0", "solana-storage-program 1.1.0", + "solana-streamer 1.1.0", "solana-sys-tuner 1.1.0", "solana-vote-program 1.1.0", "solana-vote-signer 1.1.0", @@ -4595,6 +4597,21 @@ dependencies = [ "solana-sdk 1.1.0", ] +[[package]] +name = "solana-streamer" +version = "1.1.0" +dependencies = [ + "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "nix 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-logger 1.1.0", + "solana-measure 1.1.0", + "solana-metrics 1.1.0", + "solana-perf 1.1.0", + "solana-sdk 1.1.0", + "thiserror 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "solana-sys-tuner" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 1763e84eeb..f2700287ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "logger", "log-analyzer", "merkle-tree", + "streamer", "measure", "metrics", "net-shaper", diff --git a/archiver-lib/Cargo.toml b/archiver-lib/Cargo.toml index e659deae65..e6c032f2ca 100644 --- a/archiver-lib/Cargo.toml +++ b/archiver-lib/Cargo.toml @@ -29,6 +29,7 @@ solana-logger = { path = "../logger", version = "1.1.0" } solana-perf = { path = "../perf", version = "1.1.0" } solana-sdk = { path = "../sdk", version = "1.1.0" } solana-core = { path = "../core", version = "1.1.0" } +solana-streamer = { path = "../streamer", version = "1.1.0" } solana-archiver-utils = { path = "../archiver-utils", version = "1.1.0" } solana-metrics = { path = "../metrics", version = "1.1.0" } diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index 26a1ed030f..caa59d5cb7 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -12,14 +12,12 @@ use solana_core::{ cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE}, contact_info::ContactInfo, gossip_service::GossipService, - packet::{limited_deserialize, PACKET_DATA_SIZE}, repair_service, repair_service::{RepairService, RepairSlotRange, RepairStrategy}, serve_repair::ServeRepair, shred_fetch_stage::ShredFetchStage, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, storage_stage::NUM_STORAGE_SAMPLES, - streamer::{receiver, responder, PacketReceiver}, window_service::WindowService, }; use solana_ledger::{ @@ -27,6 +25,7 @@ use solana_ledger::{ }; use solana_net_utils::bind_in_range; use solana_perf::packet::Packets; +use solana_perf::packet::{limited_deserialize, PACKET_DATA_SIZE}; use solana_perf::recycler::Recycler; use solana_sdk::packet::Packet; use solana_sdk::{ @@ -45,6 +44,7 @@ use solana_storage_program::{ storage_contract::StorageContract, storage_instruction::{self, StorageAccountType}, }; +use solana_streamer::streamer::{receiver, responder, PacketReceiver}; use std::{ io::{self, ErrorKind}, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, diff --git a/banking-bench/Cargo.toml b/banking-bench/Cargo.toml index 08df7c9bb2..90125f01fc 100644 --- a/banking-bench/Cargo.toml +++ b/banking-bench/Cargo.toml @@ -11,6 +11,8 @@ homepage = "https://solana.com/" log = "0.4.6" rayon = "1.3.0" solana-core = { path = "../core", version = "1.1.0" } +solana-streamer = { path = "../streamer", version = "1.1.0" } +solana-perf = { path = "../perf", version = "1.1.0" } solana-ledger = { path = "../ledger", version = "1.1.0" } solana-logger = { path = "../logger", version = "1.1.0" } solana-runtime = { path = "../runtime", version = "1.1.0" } diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index c793f0d5a1..d8b777fdb8 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -6,12 +6,12 @@ use solana_core::banking_stage::{create_test_recorder, BankingStage}; use solana_core::cluster_info::ClusterInfo; use solana_core::cluster_info::Node; use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; -use solana_core::packet::to_packets_chunked; use solana_core::poh_recorder::PohRecorder; use solana_core::poh_recorder::WorkingBankEntry; use solana_ledger::bank_forks::BankForks; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; use solana_measure::measure::Measure; +use solana_perf::packet::to_packets_chunked; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; diff --git a/bench-streamer/Cargo.toml b/bench-streamer/Cargo.toml index db89bb56c3..5e76c5515f 100644 --- a/bench-streamer/Cargo.toml +++ b/bench-streamer/Cargo.toml @@ -10,6 +10,6 @@ homepage = "https://solana.com/" [dependencies] clap = "2.33.0" solana-clap-utils = { path = "../clap-utils", version = "1.1.0" } -solana-core = { path = "../core", version = "1.1.0" } +solana-streamer = { path = "../streamer", version = "1.1.0" } solana-logger = { path = "../logger", version = "1.1.0" } solana-net-utils = { path = "../net-utils", version = "1.1.0" } diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 60eca32425..708e14d6fd 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,6 +1,6 @@ use clap::{crate_description, crate_name, App, Arg}; -use solana_core::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}; -use solana_core::streamer::{receiver, PacketReceiver}; +use solana_streamer::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}; +use solana_streamer::streamer::{receiver, PacketReceiver}; use std::cmp::max; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; diff --git a/core/Cargo.toml b/core/Cargo.toml index 869a14eeb6..4b760d7957 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -31,9 +31,7 @@ jsonrpc-derive = "14.0.5" jsonrpc-http-server = "14.0.6" jsonrpc-pubsub = "14.0.6" jsonrpc-ws-server = "14.0.6" -libc = "0.2.67" log = "0.4.8" -nix = "0.17.0" num_cpus = "1.0.0" num-traits = "0.2" rand = "0.6.5" @@ -60,6 +58,7 @@ solana-runtime = { path = "../runtime", version = "1.1.0" } solana-sdk = { path = "../sdk", version = "1.1.0" } solana-stake-program = { path = "../programs/stake", version = "1.1.0" } solana-storage-program = { path = "../programs/storage", version = "1.1.0" } +solana-streamer = { path = "../streamer", version = "1.1.0" } solana-vote-program = { path = "../programs/vote", version = "1.1.0" } solana-vote-signer = { path = "../vote-signer", version = "1.1.0" } solana-sys-tuner = { path = "../sys-tuner", version = "1.1.0" } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 03850085d1..e4ae27df96 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -10,11 +10,11 @@ use solana_core::banking_stage::{create_test_recorder, BankingStage}; use solana_core::cluster_info::ClusterInfo; use solana_core::cluster_info::Node; use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; -use solana_core::packet::to_packets_chunked; use solana_core::poh_recorder::WorkingBankEntry; use solana_ledger::blockstore_processor::process_entries; use solana_ledger::entry::{next_hash, Entry}; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; +use solana_perf::packet::to_packets_chunked; use solana_perf::test_tx::test_tx; use solana_runtime::bank::Bank; use solana_sdk::genesis_config::GenesisConfig; diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index b4dac48247..14f034da41 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -7,11 +7,11 @@ use log::*; use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::contact_info::ContactInfo; use solana_core::genesis_utils::{create_genesis_config, GenesisConfigInfo}; -use solana_core::packet::to_packets_chunked; use solana_core::retransmit_stage::retransmitter; use solana_ledger::bank_forks::BankForks; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_measure::measure::Measure; +use solana_perf::packet::to_packets_chunked; use solana_perf::test_tx::test_tx; use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 252179dc35..2974f2a754 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -6,9 +6,9 @@ extern crate test; use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; -use solana_core::packet::to_packets_chunked; use solana_core::sigverify::TransactionSigVerifier; use solana_core::sigverify_stage::SigVerifyStage; +use solana_perf::packet::to_packets_chunked; use solana_perf::test_tx::test_tx; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, Signer}; diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a16de62a4f..bd93423ad4 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3,7 +3,6 @@ //! can do its processing in parallel with signature verification on the GPU. use crate::{ cluster_info::ClusterInfo, - packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, poh_service::PohService, }; @@ -17,7 +16,11 @@ use solana_ledger::{ }; use solana_measure::{measure::Measure, thread_mem_usage}; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn}; -use solana_perf::{cuda_runtime::PinnedVec, perf_libs}; +use solana_perf::{ + cuda_runtime::PinnedVec, + packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH}, + perf_libs, +}; use solana_runtime::{ accounts_db::ErrorCounters, bank::{Bank, TransactionBalancesSet, TransactionProcessResult}, @@ -1011,7 +1014,6 @@ mod tests { use crate::{ cluster_info::Node, genesis_utils::{create_genesis_config, GenesisConfigInfo}, - packet::to_packets, poh_recorder::WorkingBank, transaction_status_service::TransactionStatusService, }; @@ -1023,6 +1025,7 @@ mod tests { entry::{next_entry, Entry, EntrySlice}, get_tmp_ledger_path, }; + use solana_perf::packet::to_packets; use solana_runtime::bank::HashAgeKind; use solana_sdk::{ instruction::InstructionError, diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 42db2e4b11..543ccacb5a 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,8 +12,6 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight -use crate::packet::limited_deserialize; -use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ contact_info::ContactInfo, crds_gossip::CrdsGossip, @@ -23,9 +21,7 @@ use crate::{ self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash, Vote, }, epoch_slots::EpochSlots, - packet::{Packet, PACKET_DATA_SIZE}, result::{Error, Result}, - sendmmsg::{multicast, send_mmsg}, weighted_shuffle::{weighted_best, weighted_shuffle}, }; use bincode::{serialize, serialized_size}; @@ -41,7 +37,10 @@ use solana_net_utils::{ bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range, multi_bind_in_range, PortRange, }; -use solana_perf::packet::{to_packets_with_destination, Packets, PacketsRecycler}; +use solana_perf::packet::{ + limited_deserialize, to_packets_with_destination, Packet, Packets, PacketsRecycler, + PACKET_DATA_SIZE, +}; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::hash::Hash; use solana_sdk::timing::duration_as_s; @@ -52,6 +51,8 @@ use solana_sdk::{ timing::{duration_as_ms, timestamp}, transaction::Transaction, }; +use solana_streamer::sendmmsg::{multicast, send_mmsg}; +use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ borrow::Cow, cmp::min, diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 02e81cf832..d312ad2632 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,7 +1,6 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, crds_value::CrdsValueLabel, - packet::{self, Packets}, poh_recorder::PohRecorder, result::{Error, Result}, sigverify, @@ -14,6 +13,7 @@ use itertools::izip; use log::*; use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; +use solana_perf::packet::{self, Packets}; use solana_runtime::bank::Bank; use solana_sdk::{ account::Account, @@ -668,7 +668,7 @@ impl ClusterInfoVoteListener { #[cfg(test)] mod tests { use super::*; - use crate::packet; + use solana_perf::packet; use solana_runtime::{ bank::Bank, genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index aeff431aa6..0966a2b091 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -1,14 +1,14 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; -use crate::packet::PacketsRecycler; use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; -use crate::streamer::{self, PacketReceiver, PacketSender}; use solana_measure::thread_mem_usage; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info}; +use solana_perf::packet::PacketsRecycler; use solana_perf::recycler::Recycler; use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; +use solana_streamer::streamer::{self, PacketReceiver, PacketSender}; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, RecvTimeoutError}; diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index d6a087b293..08e9add236 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -2,13 +2,13 @@ use crate::cluster_info::{ClusterInfo, VALIDATOR_PORT_RANGE}; use crate::contact_info::ContactInfo; -use crate::streamer; use rand::{thread_rng, Rng}; use solana_client::thin_client::{create_client, ThinClient}; use solana_ledger::bank_forks::BankForks; use solana_perf::recycler::Recycler; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signer}; +use solana_streamer::streamer; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; diff --git a/core/src/lib.rs b/core/src/lib.rs index 75e3552193..b5cdf468d8 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -32,10 +32,8 @@ pub mod genesis_utils; pub mod gossip_service; pub mod ledger_cleanup_service; pub mod local_vote_signer_service; -pub mod packet; pub mod poh_recorder; pub mod poh_service; -pub mod recvmmsg; pub mod repair_service; pub mod replay_stage; mod result; @@ -46,7 +44,6 @@ pub mod rpc_pubsub; pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscriptions; -pub mod sendmmsg; pub mod serve_repair; pub mod serve_repair_service; pub mod sigverify; @@ -54,7 +51,6 @@ pub mod sigverify_shreds; pub mod sigverify_stage; pub mod snapshot_packager_service; pub mod storage_stage; -pub mod streamer; pub mod tpu; pub mod transaction_status_service; pub mod tvu; diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 5c79f889fb..794572a8c9 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -2,10 +2,8 @@ use crate::{ cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, - packet::Packets, repair_service::RepairStrategy, result::{Error, Result}, - streamer::PacketReceiver, window_service::{should_retransmit_and_persist, WindowService}, }; use crossbeam_channel::Receiver as CrossbeamReceiver; @@ -17,7 +15,9 @@ use solana_ledger::{ }; use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_error; +use solana_perf::packet::Packets; use solana_sdk::epoch_schedule::EpochSchedule; +use solana_streamer::streamer::PacketReceiver; use std::{ cmp, net::UdpSocket, @@ -279,10 +279,10 @@ mod tests { use super::*; use crate::contact_info::ContactInfo; use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use crate::packet::{self, Meta, Packet, Packets}; use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions}; use solana_ledger::create_new_tmp_ledger; use solana_net_utils::find_available_port_in_range; + use solana_perf::packet::{Meta, Packet, Packets}; use solana_sdk::pubkey::Pubkey; use std::net::{IpAddr, Ipv4Addr}; @@ -333,7 +333,7 @@ mod tests { // it should send this over the sockets. retransmit_sender.send(packets).unwrap(); let mut packets = Packets::new(vec![]); - packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); + solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); assert_eq!(packets.packets.len(), 1); assert_eq!(packets.packets[0].meta.repair, false); @@ -349,7 +349,7 @@ mod tests { let packets = Packets::new(vec![repair, Packet::default()]); retransmit_sender.send(packets).unwrap(); let mut packets = Packets::new(vec![]); - packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); + solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); assert_eq!(packets.packets.len(), 1); assert_eq!(packets.packets[0].meta.repair, false); } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 5a21aaa50d..9ae6039cf1 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -2,7 +2,7 @@ use crate::{ cluster_info::ClusterInfo, commitment::BlockCommitmentCache, contact_info::ContactInfo, - packet::PACKET_DATA_SIZE, storage_stage::StorageState, validator::ValidatorExit, + storage_stage::StorageState, validator::ValidatorExit, }; use bincode::serialize; use jsonrpc_core::{Error, Metadata, Result}; @@ -18,6 +18,7 @@ use solana_faucet::faucet::request_airdrop_transaction; use solana_ledger::{ bank_forks::BankForks, blockstore::Blockstore, rooted_slot_iterator::RootedSlotIterator, }; +use solana_perf::packet::PACKET_DATA_SIZE; use solana_runtime::bank::Bank; use solana_sdk::{ clock::{Slot, UnixTimestamp}, diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 0e107aeba2..7f7b7fb268 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1,10 +1,7 @@ -use crate::packet::limited_deserialize; -use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ cluster_info::{ClusterInfo, ClusterInfoError}, cluster_slots::ClusterSlots, contact_info::ContactInfo, - packet::Packet, result::{Error, Result}, weighted_shuffle::weighted_best, }; @@ -12,13 +9,14 @@ use bincode::serialize; use solana_ledger::blockstore::Blockstore; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug}; -use solana_perf::packet::{Packets, PacketsRecycler}; +use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler}; use solana_sdk::{ clock::Slot, pubkey::Pubkey, signature::{Keypair, Signer}, timing::duration_as_ms, }; +use solana_streamer::streamer::{PacketReceiver, PacketSender}; use std::{ collections::HashMap, net::SocketAddr, diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index 2e883fac77..f252ab1e4b 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -1,7 +1,7 @@ use crate::serve_repair::ServeRepair; -use crate::streamer; use solana_ledger::blockstore::Blockstore; use solana_perf::recycler::Recycler; +use solana_streamer::streamer; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 6bdd813fd4..ef21d65196 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,12 +1,11 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. -use crate::packet::{Packet, PacketsRecycler}; -use crate::streamer::{self, PacketReceiver, PacketSender}; use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT; use solana_ledger::shred::{OFFSET_OF_SHRED_INDEX, SIZE_OF_SHRED_INDEX}; use solana_perf::cuda_runtime::PinnedVec; -use solana_perf::packet::limited_deserialize; +use solana_perf::packet::{limited_deserialize, Packet, PacketsRecycler}; use solana_perf::recycler::Recycler; +use solana_streamer::streamer::{self, PacketReceiver, PacketSender}; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 78cc8827ee..c939e9b4f4 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -1,11 +1,11 @@ #![allow(clippy::implicit_hasher)] -use crate::packet::{limited_deserialize, Packets}; use crate::sigverify; use crate::sigverify_stage::SigVerifier; use solana_ledger::bank_forks::BankForks; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::shred::{OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_SLOT}; use solana_ledger::sigverify_shreds::verify_shreds_gpu; +use solana_perf::packet::{limited_deserialize, Packets}; use solana_perf::recycler_cache::RecyclerCache; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; @@ -73,8 +73,8 @@ impl SigVerifier for ShredSigVerifier { pub mod tests { use super::*; use crate::genesis_utils::create_genesis_config_with_leader; - use crate::packet::Packet; use solana_ledger::shred::{Shred, Shredder}; + use solana_perf::packet::Packet; use solana_runtime::bank::Bank; use solana_sdk::signature::{Keypair, Signer}; diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 31091ce15a..cd67e54661 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -5,14 +5,14 @@ //! transaction. All processing is done on the CPU by default and on a GPU //! if perf-libs are available -use crate::packet::Packets; use crate::sigverify; -use crate::streamer::{self, PacketReceiver, StreamerError}; use crossbeam_channel::{SendError, Sender as CrossbeamSender}; use solana_measure::measure::Measure; use solana_metrics::datapoint_debug; +use solana_perf::packet::Packets; use solana_perf::perf_libs; use solana_sdk::timing; +use solana_streamer::streamer::{self, PacketReceiver, StreamerError}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 8178f55956..019e29ac3a 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -1,7 +1,8 @@ use crate::{ cluster_info_vote_listener::VerifiedVotePacketsReceiver, crds_value::CrdsValueLabel, - packet::Packets, result::Result, + result::Result, }; +use solana_perf::packet::Packets; use std::{collections::HashMap, ops::Deref, time::Duration}; #[derive(Default)] @@ -55,11 +56,9 @@ impl VerifiedVotePackets { #[cfg(test)] mod tests { use super::*; - use crate::{ - packet::{Meta, Packet}, - result::Error, - }; + use crate::result::Error; use crossbeam_channel::{unbounded, RecvTimeoutError}; + use solana_perf::packet::{Meta, Packet}; use solana_sdk::pubkey::Pubkey; #[test] diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 3fa88f91ad..f2c534b597 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -2,10 +2,8 @@ //! blockstore and retransmitting where required //! use crate::cluster_info::ClusterInfo; -use crate::packet::Packets; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; -use crate::streamer::PacketSender; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, }; @@ -17,10 +15,12 @@ use solana_ledger::blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::shred::Shred; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; +use solana_perf::packet::Packets; use solana_rayon_threadlimit::get_thread_count; use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; +use solana_streamer::streamer::PacketSender; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -476,11 +476,8 @@ impl WindowService { mod test { use super::*; use crate::{ - cluster_info::ClusterInfo, - contact_info::ContactInfo, - genesis_utils::create_genesis_config_with_leader, - packet::{Packet, Packets}, - repair_service::RepairSlotRange, + cluster_info::ClusterInfo, contact_info::ContactInfo, + genesis_utils::create_genesis_config_with_leader, repair_service::RepairSlotRange, }; use rand::thread_rng; use solana_ledger::shred::DataShredHeader; @@ -490,6 +487,7 @@ mod test { get_tmp_ledger_path, shred::Shredder, }; + use solana_perf::packet::Packet; use solana_sdk::{ clock::Slot, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 33edff165a..14bfe424a8 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -5,7 +5,7 @@ use rayon::iter::*; use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::gossip_service::GossipService; -use solana_core::packet::Packet; +use solana_perf::packet::Packet; use solana_sdk::signature::{Keypair, Signer}; use solana_sdk::timing::timestamp; use std::net::UdpSocket; diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml new file mode 100644 index 0000000000..3f80c6fc17 --- /dev/null +++ b/streamer/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "solana-streamer" +version = "1.1.0" +description = "Solana Streamer" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +log = "0.4.8" +solana-metrics = { path = "../metrics", version = "1.1.0" } +solana-sdk = { path = "../sdk", version = "1.1.0" } +thiserror = "1.0" +solana-measure = { path = "../measure", version = "1.1.0" } +solana-logger = { path = "../logger", version = "1.1.0" } +libc = "0.2.67" +nix = "0.17.0" +solana-perf = { path = "../perf", version = "1.1.0" } + +[dev-dependencies] + +[lib] +crate-type = ["lib"] +name = "solana_streamer" diff --git a/streamer/src/lib.rs b/streamer/src/lib.rs new file mode 100644 index 0000000000..f2c74d9efb --- /dev/null +++ b/streamer/src/lib.rs @@ -0,0 +1,10 @@ +pub mod packet; +pub mod recvmmsg; +pub mod sendmmsg; +pub mod streamer; + +#[macro_use] +extern crate log; + +#[macro_use] +extern crate solana_metrics; diff --git a/core/src/packet.rs b/streamer/src/packet.rs similarity index 98% rename from core/src/packet.rs rename to streamer/src/packet.rs index e36223712c..21ddb08383 100644 --- a/core/src/packet.rs +++ b/streamer/src/packet.rs @@ -72,7 +72,7 @@ mod tests { #[test] fn test_packets_set_addr() { // test that the address is actually being updated - let send_addr = socketaddr!([127, 0, 0, 1], 123); + let send_addr: SocketAddr = "127.0.0.1:123".parse().unwrap(); let packets = vec![Packet::default()]; let mut msgs = Packets::new(packets); msgs.set_addr(&send_addr); diff --git a/core/src/recvmmsg.rs b/streamer/src/recvmmsg.rs similarity index 100% rename from core/src/recvmmsg.rs rename to streamer/src/recvmmsg.rs diff --git a/core/src/sendmmsg.rs b/streamer/src/sendmmsg.rs similarity index 96% rename from core/src/sendmmsg.rs rename to streamer/src/sendmmsg.rs index e43a4adc1c..4ad5b327e4 100644 --- a/core/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -164,8 +164,8 @@ mod tests { .map(|_| (vec![0u8; PACKET_DATA_SIZE], &addr)) .collect(); - let sent = send_mmsg(&sender, &mut packets); - assert_matches!(sent, Ok(32)); + let sent = send_mmsg(&sender, &mut packets).ok(); + assert_eq!(sent, Some(32)); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; @@ -192,8 +192,8 @@ mod tests { }) .collect(); - let sent = send_mmsg(&sender, &mut packets); - assert_matches!(sent, Ok(32)); + let sent = send_mmsg(&sender, &mut packets).ok(); + assert_eq!(sent, Some(32)); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; @@ -226,8 +226,9 @@ mod tests { &sender, &mut packet.data[..packet.meta.size], &[&addr, &addr2, &addr3, &addr4], - ); - assert_matches!(sent, Ok(4)); + ) + .ok(); + assert_eq!(sent, Some(4)); let mut packets = vec![Packet::default(); 32]; let recv = recv_mmsg(&reader, &mut packets[..]).unwrap().1; diff --git a/core/src/streamer.rs b/streamer/src/streamer.rs similarity index 100% rename from core/src/streamer.rs rename to streamer/src/streamer.rs