bench-streamer improvements (#22945)

* get the num-recv-sockets working again
* make num producers configurable
This commit is contained in:
sakridge
2022-02-05 14:13:49 +01:00
committed by GitHub
parent ba215e94f6
commit 9548ea61e5

View File

@ -1,6 +1,6 @@
#![allow(clippy::integer_arithmetic)] #![allow(clippy::integer_arithmetic)]
use { use {
clap::{crate_description, crate_name, App, Arg}, clap::{crate_description, crate_name, value_t, App, Arg},
crossbeam_channel::unbounded, crossbeam_channel::unbounded,
solana_streamer::{ solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
@ -67,13 +67,22 @@ fn main() -> Result<()> {
.takes_value(true) .takes_value(true)
.help("Use NUM receive sockets"), .help("Use NUM receive sockets"),
) )
.arg(
Arg::with_name("num-producers")
.long("num-producers")
.value_name("NUM")
.takes_value(true)
.help("Use this many producer threads."),
)
.get_matches(); .get_matches();
if let Some(n) = matches.value_of("num-recv-sockets") { if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer")); num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));
} }
let mut port = 0; let num_producers = value_t!(matches, "num_producers", u64).unwrap_or(4);
let port = 0;
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let mut addr = SocketAddr::new(ip_addr, 0); let mut addr = SocketAddr::new(ip_addr, 0);
@ -82,13 +91,16 @@ fn main() -> Result<()> {
let mut read_channels = Vec::new(); let mut read_channels = Vec::new();
let mut read_threads = Vec::new(); let mut read_threads = Vec::new();
let recycler = PacketBatchRecycler::default(); let recycler = PacketBatchRecycler::default();
for _ in 0..num_sockets { let (_port, read_sockets) = solana_net_utils::multi_bind_in_range(
let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap(); ip_addr,
(port, port + num_sockets as u16),
num_sockets,
)
.unwrap();
for read in read_sockets {
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
addr = read.local_addr().unwrap(); addr = read.local_addr().unwrap();
port = addr.port();
let (s_reader, r_reader) = unbounded(); let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader); read_channels.push(r_reader);
read_threads.push(receiver( read_threads.push(receiver(
@ -102,9 +114,10 @@ fn main() -> Result<()> {
)); ));
} }
let t_producer1 = producer(&addr, exit.clone()); let producer_threads: Vec<_> = (0..num_producers)
let t_producer2 = producer(&addr, exit.clone()); .into_iter()
let t_producer3 = producer(&addr, exit.clone()); .map(|_| producer(&addr, exit.clone()))
.collect();
let rvs = Arc::new(AtomicUsize::new(0)); let rvs = Arc::new(AtomicUsize::new(0));
let sink_threads: Vec<_> = read_channels let sink_threads: Vec<_> = read_channels
@ -124,9 +137,9 @@ fn main() -> Result<()> {
for t_reader in read_threads { for t_reader in read_threads {
t_reader.join()?; t_reader.join()?;
} }
t_producer1.join()?; for t_producer in producer_threads {
t_producer2.join()?; t_producer.join()?;
t_producer3.join()?; }
for t_sink in sink_threads { for t_sink in sink_threads {
t_sink.join()?; t_sink.join()?;
} }