diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index eba249311c..99435dd920 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,6 +1,6 @@ #![allow(clippy::integer_arithmetic)] use { - clap::{crate_description, crate_name, App, Arg}, + clap::{crate_description, crate_name, value_t, App, Arg}, crossbeam_channel::unbounded, solana_streamer::{ packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, @@ -67,13 +67,22 @@ fn main() -> Result<()> { .takes_value(true) .help("Use NUM receive sockets"), ) + .arg( + Arg::with_name("num-producers") + .long("num-producers") + .value_name("NUM") + .takes_value(true) + .help("Use this many producer threads."), + ) .get_matches(); if let Some(n) = matches.value_of("num-recv-sockets") { num_sockets = max(num_sockets, n.to_string().parse().expect("integer")); } - let mut port = 0; + let num_producers = value_t!(matches, "num_producers", u64).unwrap_or(4); + + let port = 0; let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); let mut addr = SocketAddr::new(ip_addr, 0); @@ -82,13 +91,16 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); let recycler = PacketBatchRecycler::default(); - for _ in 0..num_sockets { - let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap(); + let (_port, read_sockets) = solana_net_utils::multi_bind_in_range( + ip_addr, + (port, port + num_sockets as u16), + num_sockets, + ) + .unwrap(); + for read in read_sockets { read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); addr = read.local_addr().unwrap(); - port = addr.port(); - let (s_reader, r_reader) = unbounded(); read_channels.push(r_reader); read_threads.push(receiver( @@ -102,9 +114,10 @@ fn main() -> Result<()> { )); } - let t_producer1 = producer(&addr, exit.clone()); - let t_producer2 = producer(&addr, exit.clone()); - let t_producer3 = producer(&addr, exit.clone()); + let producer_threads: Vec<_> = (0..num_producers) + .into_iter() + .map(|_| producer(&addr, exit.clone())) + .collect(); let rvs = Arc::new(AtomicUsize::new(0)); let sink_threads: Vec<_> = read_channels @@ -124,9 +137,9 @@ fn main() -> Result<()> { for t_reader in read_threads { t_reader.join()?; } - t_producer1.join()?; - t_producer2.join()?; - t_producer3.join()?; + for t_producer in producer_threads { + t_producer.join()?; + } for t_sink in sink_threads { t_sink.join()?; }