//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use crate::{
    packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH},
    recvmmsg::NUM_RCVMMSGS,
    socket::SocketAddrSpace,
};
use solana_sdk::timing::timestamp;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use thiserror::Error;
/// Receiving end of a channel carrying batches of packets out of a `receiver` thread.
pub type PacketReceiver = Receiver<Packets>;

/// Sending end of a channel carrying batches of packets into downstream stages.
pub type PacketSender = Sender<Packets>;
/// Errors produced by the streamer services.
///
/// Every variant wraps its underlying error via `#[from]`, so the `?`
/// operator converts automatically in functions returning [`Result`].
#[derive(Error, Debug)]
pub enum StreamerError {
    /// Underlying socket I/O failure.
    #[error("I/O error")]
    Io(#[from] std::io::Error),

    /// Blocking channel receive timed out or the sender disconnected.
    #[error("receive timeout error")]
    RecvTimeout(#[from] RecvTimeoutError),

    /// Sending a packet batch failed because the receiving end was dropped.
    #[error("send packets error")]
    Send(#[from] SendError<Packets>),
}

/// Module-wide result alias using [`StreamerError`].
pub type Result<T> = std::result::Result<T, StreamerError>;
fn recv_loop(
|
|
|
|
sock: &UdpSocket,
|
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
channel: &PacketSender,
|
|
|
|
recycler: &PacketsRecycler,
|
|
|
|
name: &'static str,
|
2021-02-26 09:15:45 -08:00
|
|
|
coalesce_ms: u64,
|
2021-06-14 16:30:51 +00:00
|
|
|
use_pinned_memory: bool,
|
2019-06-27 09:32:32 +02:00
|
|
|
) -> Result<()> {
|
2019-10-08 09:54:49 -07:00
|
|
|
let mut recv_count = 0;
|
|
|
|
let mut call_count = 0;
|
|
|
|
let mut now = Instant::now();
|
|
|
|
let mut num_max_received = 0; // Number of times maximum packets were received
|
2018-03-07 13:47:13 -08:00
|
|
|
loop {
|
2021-06-14 16:30:51 +00:00
|
|
|
let mut msgs = if use_pinned_memory {
|
|
|
|
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
|
|
|
|
} else {
|
|
|
|
Packets::with_capacity(PACKETS_PER_BATCH)
|
|
|
|
};
|
2018-03-07 13:47:13 -08:00
|
|
|
loop {
|
2018-09-25 15:41:29 -07:00
|
|
|
// Check for exit signal, even if socket is busy
|
2019-11-14 10:24:53 -08:00
|
|
|
// (for instance the leader transaction socket)
|
2018-09-25 15:41:29 -07:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
return Ok(());
|
|
|
|
}
|
2021-02-26 09:15:45 -08:00
|
|
|
if let Ok(len) = packet::recv_from(&mut msgs, sock, coalesce_ms) {
|
2019-10-08 09:54:49 -07:00
|
|
|
if len == NUM_RCVMMSGS {
|
|
|
|
num_max_received += 1;
|
|
|
|
}
|
|
|
|
recv_count += len;
|
|
|
|
call_count += 1;
|
2021-04-07 11:15:38 -04:00
|
|
|
if len > 0 {
|
2019-12-10 11:28:07 -08:00
|
|
|
channel.send(msgs)?;
|
|
|
|
}
|
2018-09-27 14:49:50 -06:00
|
|
|
break;
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
}
|
2019-10-08 09:54:49 -07:00
|
|
|
if recv_count > 1024 {
|
2019-10-15 11:43:52 -07:00
|
|
|
datapoint_debug!(
|
2019-10-24 19:27:19 -07:00
|
|
|
name,
|
2019-10-08 09:54:49 -07:00
|
|
|
("received", recv_count as i64, i64),
|
|
|
|
("call_count", i64::from(call_count), i64),
|
|
|
|
("elapsed", now.elapsed().as_millis() as i64, i64),
|
|
|
|
("max_received", i64::from(num_max_received), i64),
|
|
|
|
);
|
|
|
|
recv_count = 0;
|
|
|
|
call_count = 0;
|
|
|
|
num_max_received = 0;
|
|
|
|
}
|
2019-12-10 11:28:07 -08:00
|
|
|
now = Instant::now();
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn receiver(
|
2018-08-28 16:32:40 -07:00
|
|
|
sock: Arc<UdpSocket>,
|
2019-03-04 20:50:02 -08:00
|
|
|
exit: &Arc<AtomicBool>,
|
2018-05-15 09:53:51 -06:00
|
|
|
packet_sender: PacketSender,
|
2019-06-27 09:32:32 +02:00
|
|
|
recycler: PacketsRecycler,
|
|
|
|
name: &'static str,
|
2021-02-26 09:15:45 -08:00
|
|
|
coalesce_ms: u64,
|
2021-06-14 16:30:51 +00:00
|
|
|
use_pinned_memory: bool,
|
2018-05-15 09:53:51 -06:00
|
|
|
) -> JoinHandle<()> {
|
2018-05-22 09:46:52 -07:00
|
|
|
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
|
|
|
|
if res.is_err() {
|
|
|
|
panic!("streamer::receiver set_read_timeout error");
|
|
|
|
}
|
2019-03-04 20:50:02 -08:00
|
|
|
let exit = exit.clone();
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2018-05-30 13:20:58 -07:00
|
|
|
.name("solana-receiver".to_string())
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || {
|
2021-02-26 09:15:45 -08:00
|
|
|
let _ = recv_loop(
|
|
|
|
&sock,
|
|
|
|
exit,
|
|
|
|
&packet_sender,
|
|
|
|
&recycler.clone(),
|
|
|
|
name,
|
|
|
|
coalesce_ms,
|
2021-06-14 16:30:51 +00:00
|
|
|
use_pinned_memory,
|
2021-02-26 09:15:45 -08:00
|
|
|
);
|
2018-12-07 20:01:28 -07:00
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2021-07-29 21:43:24 +00:00
|
|
|
fn recv_send(
|
|
|
|
sock: &UdpSocket,
|
|
|
|
r: &PacketReceiver,
|
|
|
|
socket_addr_space: &SocketAddrSpace,
|
|
|
|
) -> Result<()> {
|
2018-03-07 13:47:13 -08:00
|
|
|
let timer = Duration::new(1, 0);
|
2018-09-03 00:22:47 -10:00
|
|
|
let msgs = r.recv_timeout(timer)?;
|
2021-07-29 21:43:24 +00:00
|
|
|
send_to(&msgs, sock, socket_addr_space)?;
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-09-29 20:40:48 -07:00
|
|
|
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<Packets>, usize, u64)> {
|
2018-05-10 15:47:42 -06:00
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
let msgs = recvr.recv_timeout(timer)?;
|
2018-09-24 17:13:49 -07:00
|
|
|
let recv_start = Instant::now();
|
2018-05-12 19:00:22 -07:00
|
|
|
trace!("got msgs");
|
2019-04-17 18:15:50 -07:00
|
|
|
let mut len = msgs.packets.len();
|
2018-05-10 15:47:42 -06:00
|
|
|
let mut batch = vec![msgs];
|
|
|
|
while let Ok(more) = recvr.try_recv() {
|
|
|
|
trace!("got more msgs");
|
2019-04-17 18:15:50 -07:00
|
|
|
len += more.packets.len();
|
2018-05-10 15:47:42 -06:00
|
|
|
batch.push(more);
|
|
|
|
}
|
Cost model 1.7 (#20188)
* Cost Model to limit transactions which are not parallelizeable (#16694)
* * Add following to banking_stage:
1. CostModel as immutable ref shared between threads, to provide estimated cost for transactions.
2. CostTracker which is shared between threads, tracks transaction costs for each block.
* replace hard coded program ID with id() calls
* Add Account Access Cost as part of TransactionCost. Account Access cost are weighted differently between read and write, signed and non-signed.
* Establish instruction_execution_cost_table, add function to update or insert instruction cost, unit tested. It is read-only for now; it allows Replay to insert realtime instruction execution costs to the table.
* add test for cost_tracker atomically try_add operation, serves as safety guard for future changes
* check cost against local copy of cost_tracker, return transactions that would exceed limit as unprocessed transaction to be buffered; only apply bank processed transactions cost to tracker;
* bencher to new banking_stage with max cost limit to allow cost model being hit consistently during bench iterations
* replay stage feed back program cost (#17731)
* replay stage feeds back realtime per-program execution cost to cost model;
* program cost execution table is initialized into empty table, no longer populated with hardcoded numbers;
* changed cost unit to microsecond, using value collected from mainnet;
* add ExecuteCostTable with fixed capacity for security concern, when its limit is reached, programs with old age AND less occurrence will be pushed out to make room for new programs.
* investigate system performance test degradation (#17919)
* Add stats and counter around cost model ops, mainly:
- calculate transaction cost
- check transaction can fit in a block
- update block cost tracker after transactions are added to block
- replay_stage to update/insert execution cost to table
* Change mutex on cost_tracker to RwLock
* removed cloning cost_tracker for local use, as the metrics show clone is very expensive.
* acquire and hold locks for block of TXs, instead of acquire and release per transaction;
* remove redundant would_fit check from cost_tracker update execution path
* refactor cost checking with less frequent lock acquiring
* avoid many Transaction_cost heap allocation when calculate cost, which
is in the hot path - executed per transaction.
* create hashmap with new_capacity to reduce runtime heap realloc.
* code review changes: categorize stats, replace explicit drop calls, concisely initiate to default
* address potential deadlock by acquiring locks one at time
* Persist cost table to blockstore (#18123)
* Add `ProgramCosts` Column Family to blockstore, implement LedgerColumn; add `delete_cf` to Rocks
* Add ProgramCosts to compaction excluding list alone side with TransactionStatusIndex in one place: `excludes_from_compaction()`
* Write cost table to blockstore after `replay_stage` replayed active banks; add stats to measure persist time
* Deletes program from `ProgramCosts` in blockstore when they are removed from cost_table in memory
* Only try to persist to blockstore when cost_table is changed.
* Restore cost table during validator startup
* Offload `cost_model` related operations from replay main thread to dedicated service thread, add channel to send execute_timings between these threads;
* Move `cost_update_service` to its own module; replay_stage is now decoupled from cost_model.
* log warning when channel send fails (#18391)
* Aggregate cost_model into cost_tracker (#18374)
* * aggregate cost_model into cost_tracker, decouple it from banking_stage to prevent accidental deadlock. * Simplified code, removed unused functions
* review fixes
* update ledger tool to restore cost table from blockstore (#18489)
* update ledger tool to restore cost model from blockstore when compute-slot-cost
* Move initialize_cost_table into cost_model, so the function can be tested and shared between validator and ledger-tool
* refactor and simplify a test
* manually fix merge conflicts
* Per-program id timings (#17554)
* more manual fixing
* solve a merge conflict
* featurize cost model
* more merge fix
* cost model uses compute_unit to replace microsecond as cost unit
(#18934)
* Reject blocks for costs above the max block cost (#18994)
* Update block max cost limit to fix performance regession (#19276)
* replace function with const var for better readability (#19285)
* Add few more metrics data points (#19624)
* periodically report sigverify_stage stats (#19674)
* manual merge
* cost model nits (#18528)
* Accumulate consumed units (#18714)
* tx wide compute budget (#18631)
* more manual merge
* ignore zerorize drop security
* - update const cost values with data collected by #19627
- update cost calculation to closely proposed fee schedule #16984
* add transaction cost histogram metrics (#20350)
* rebase to 1.7.15
* add tx count and thread id to stats (#20451)
each stat reports and resets when slot changes
* remove cost_model feature_set
* ignore vote transactions from cost model
Co-authored-by: sakridge <sakridge@gmail.com>
Co-authored-by: Jeff Biseda <jbiseda@gmail.com>
Co-authored-by: Jack May <jack@solana.com>
2021-10-06 15:11:41 -05:00
|
|
|
let recv_duration = recv_start.elapsed();
|
2018-06-28 14:51:53 -07:00
|
|
|
trace!("batch len {}", batch.len());
|
Cost model 1.7 (#20188)
* Cost Model to limit transactions which are not parallelizeable (#16694)
* * Add following to banking_stage:
1. CostModel as immutable ref shared between threads, to provide estimated cost for transactions.
2. CostTracker which is shared between threads, tracks transaction costs for each block.
* replace hard coded program ID with id() calls
* Add Account Access Cost as part of TransactionCost. Account Access cost are weighted differently between read and write, signed and non-signed.
* Establish instruction_execution_cost_table, add function to update or insert instruction cost, unit tested. It is read-only for now; it allows Replay to insert realtime instruction execution costs to the table.
* add test for cost_tracker atomically try_add operation, serves as safety guard for future changes
* check cost against local copy of cost_tracker, return transactions that would exceed limit as unprocessed transaction to be buffered; only apply bank processed transactions cost to tracker;
* bencher to new banking_stage with max cost limit to allow cost model being hit consistently during bench iterations
* replay stage feed back program cost (#17731)
* replay stage feeds back realtime per-program execution cost to cost model;
* program cost execution table is initialized into empty table, no longer populated with hardcoded numbers;
* changed cost unit to microsecond, using value collected from mainnet;
* add ExecuteCostTable with fixed capacity for security concern, when its limit is reached, programs with old age AND less occurrence will be pushed out to make room for new programs.
* investigate system performance test degradation (#17919)
* Add stats and counter around cost model ops, mainly:
- calculate transaction cost
- check transaction can fit in a block
- update block cost tracker after transactions are added to block
- replay_stage to update/insert execution cost to table
* Change mutex on cost_tracker to RwLock
* removed cloning cost_tracker for local use, as the metrics show clone is very expensive.
* acquire and hold locks for block of TXs, instead of acquire and release per transaction;
* remove redundant would_fit check from cost_tracker update execution path
* refactor cost checking with less frequent lock acquiring
* avoid many Transaction_cost heap allocation when calculate cost, which
is in the hot path - executed per transaction.
* create hashmap with new_capacity to reduce runtime heap realloc.
* code review changes: categorize stats, replace explicit drop calls, concisely initiate to default
* address potential deadlock by acquiring locks one at time
* Persist cost table to blockstore (#18123)
* Add `ProgramCosts` Column Family to blockstore, implement LedgerColumn; add `delete_cf` to Rocks
* Add ProgramCosts to compaction excluding list alone side with TransactionStatusIndex in one place: `excludes_from_compaction()`
* Write cost table to blockstore after `replay_stage` replayed active banks; add stats to measure persist time
* Deletes program from `ProgramCosts` in blockstore when they are removed from cost_table in memory
* Only try to persist to blockstore when cost_table is changed.
* Restore cost table during validator startup
* Offload `cost_model` related operations from replay main thread to dedicated service thread, add channel to send execute_timings between these threads;
* Move `cost_update_service` to its own module; replay_stage is now decoupled from cost_model.
* log warning when channel send fails (#18391)
* Aggregate cost_model into cost_tracker (#18374)
* * aggregate cost_model into cost_tracker, decouple it from banking_stage to prevent accidental deadlock. * Simplified code, removed unused functions
* review fixes
* update ledger tool to restore cost table from blockstore (#18489)
* update ledger tool to restore cost model from blockstore when compute-slot-cost
* Move initialize_cost_table into cost_model, so the function can be tested and shared between validator and ledger-tool
* refactor and simplify a test
* manually fix merge conflicts
* Per-program id timings (#17554)
* more manual fixing
* solve a merge conflict
* featurize cost model
* more merge fix
* cost model uses compute_unit to replace microsecond as cost unit
(#18934)
* Reject blocks for costs above the max block cost (#18994)
* Update block max cost limit to fix performance regession (#19276)
* replace function with const var for better readability (#19285)
* Add few more metrics data points (#19624)
* periodically report sigverify_stage stats (#19674)
* manual merge
* cost model nits (#18528)
* Accumulate consumed units (#18714)
* tx wide compute budget (#18631)
* more manual merge
* ignore zerorize drop security
* - update const cost values with data collected by #19627
- update cost calculation to closely proposed fee schedule #16984
* add transaction cost histogram metrics (#20350)
* rebase to 1.7.15
* add tx count and thread id to stats (#20451)
each stat reports and resets when slot changes
* remove cost_model feature_set
* ignore vote transactions from cost model
Co-authored-by: sakridge <sakridge@gmail.com>
Co-authored-by: Jeff Biseda <jbiseda@gmail.com>
Co-authored-by: Jack May <jack@solana.com>
2021-10-06 15:11:41 -05:00
|
|
|
Ok((
|
|
|
|
batch,
|
|
|
|
len,
|
|
|
|
solana_sdk::timing::duration_as_ms(&recv_duration),
|
|
|
|
))
|
2018-05-10 15:47:42 -06:00
|
|
|
}
|
|
|
|
|
2021-07-29 21:43:24 +00:00
|
|
|
pub fn responder(
|
|
|
|
name: &'static str,
|
|
|
|
sock: Arc<UdpSocket>,
|
|
|
|
r: PacketReceiver,
|
|
|
|
socket_addr_space: SocketAddrSpace,
|
|
|
|
) -> JoinHandle<()> {
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2018-07-11 07:38:57 -07:00
|
|
|
.name(format!("solana-responder-{}", name))
|
2020-06-18 13:30:55 -07:00
|
|
|
.spawn(move || {
|
|
|
|
let mut errors = 0;
|
|
|
|
let mut last_error = None;
|
|
|
|
let mut last_print = 0;
|
|
|
|
loop {
|
2021-07-29 21:43:24 +00:00
|
|
|
if let Err(e) = recv_send(&sock, &r, &socket_addr_space) {
|
2020-06-18 13:30:55 -07:00
|
|
|
match e {
|
2021-06-18 11:47:40 -07:00
|
|
|
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
|
|
|
|
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
|
2020-06-18 13:30:55 -07:00
|
|
|
_ => {
|
|
|
|
errors += 1;
|
|
|
|
last_error = Some(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let now = timestamp();
|
|
|
|
if now - last_print > 1000 && errors != 0 {
|
|
|
|
datapoint_info!(name, ("errors", errors, i64),);
|
|
|
|
info!("{} last-error: {:?} count: {}", name, last_error, errors);
|
|
|
|
last_print = now;
|
|
|
|
errors = 0;
|
2018-07-05 16:41:03 -06:00
|
|
|
}
|
2018-05-30 13:13:14 -07:00
|
|
|
}
|
2018-12-07 20:01:28 -07:00
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2018-03-19 17:09:47 -06:00
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use crate::packet::{Packet, Packets, PACKET_DATA_SIZE};
    use crate::streamer::{receiver, responder};
    use solana_perf::recycler::Recycler;
    use std::io;
    use std::io::Write;
    use std::net::UdpSocket;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::time::Duration;

    // Drain packets from `r`, decrementing `*num` by each batch's packet
    // count; gives up after 10 one-second timeouts so the test can't hang.
    fn get_msgs(r: PacketReceiver, num: &mut usize) {
        for _ in 0..10 {
            let m = r.recv_timeout(Duration::new(1, 0));
            if m.is_err() {
                // Timeout — retry until the attempt budget is exhausted.
                continue;
            }

            *num -= m.unwrap().packets.len();

            if *num == 0 {
                break;
            }
        }
    }

    // Smoke test: Packet and Packets are Debug-formattable without panicking.
    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
    }

    // End-to-end: a responder thread sends 5 packets over loopback UDP and a
    // receiver thread picks them all up; then both threads are shut down.
    #[test]
    fn streamer_send_test() {
        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = channel();
        let t_receiver = receiver(
            Arc::new(read),
            &exit,
            s_reader,
            Recycler::default(),
            "test",
            1, // coalesce_ms
            true, // use_pinned_memory
        );
        let t_responder = {
            let (s_responder, r_responder) = channel();
            let t_responder = responder(
                "streamer_send_test",
                Arc::new(send),
                r_responder,
                SocketAddrSpace::Unspecified,
            );
            // Build a batch of 5 full-size packets addressed at the reader.
            let mut msgs = Packets::default();
            for i in 0..5 {
                let mut b = Packet::default();
                {
                    b.data[0] = i as u8;
                    b.meta.size = PACKET_DATA_SIZE;
                    b.meta.set_addr(&addr);
                }
                msgs.packets.push(b);
            }
            s_responder.send(msgs).expect("send");
            t_responder
        };

        // Expect all 5 packets to arrive; get_msgs decrements to zero.
        let mut num = 5;
        get_msgs(r_reader, &mut num);
        assert_eq!(num, 0);
        exit.store(true, Ordering::Relaxed);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}
|