Co-authored-by: sakridge <sakridge@gmail.com>
This commit is contained in:
@@ -36,7 +36,7 @@ use solana_sdk::{
|
||||
pubkey::Pubkey,
|
||||
short_vec::decode_shortu16_len,
|
||||
signature::Signature,
|
||||
timing::{duration_as_ms, timestamp},
|
||||
timing::{duration_as_ms, timestamp, AtomicInterval},
|
||||
transaction::{self, Transaction, TransactionError},
|
||||
};
|
||||
use solana_transaction_status::token_balances::{
|
||||
@@ -78,7 +78,7 @@ const DEFAULT_LRU_SIZE: usize = 200_000;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BankingStageStats {
|
||||
last_report: AtomicU64,
|
||||
last_report: AtomicInterval,
|
||||
id: u32,
|
||||
process_packets_count: AtomicUsize,
|
||||
new_tx_count: AtomicUsize,
|
||||
@@ -107,19 +107,7 @@ impl BankingStageStats {
|
||||
}
|
||||
|
||||
fn report(&self, report_interval_ms: u64) {
|
||||
let should_report = {
|
||||
let last = self.last_report.load(Ordering::Relaxed);
|
||||
let now = solana_sdk::timing::timestamp();
|
||||
now.saturating_sub(last) > report_interval_ms
|
||||
&& self.last_report.compare_exchange(
|
||||
last,
|
||||
now,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) == Ok(last)
|
||||
};
|
||||
|
||||
if should_report {
|
||||
if self.last_report.should_update(report_interval_ms) {
|
||||
datapoint_info!(
|
||||
"banking_stage-loop-stats",
|
||||
("id", self.id as i64, i64),
|
||||
|
@@ -20,10 +20,9 @@ use solana_measure::measure::Measure;
|
||||
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
||||
use solana_poh::poh_recorder::WorkingBankEntry;
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::timing::timestamp;
|
||||
use solana_sdk::timing::{timestamp, AtomicInterval};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||
use solana_streamer::{sendmmsg::send_mmsg, socket::SocketAddrSpace};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::UdpSocket,
|
||||
@@ -362,14 +361,9 @@ impl BroadcastStage {
|
||||
fn update_peer_stats(
|
||||
num_live_peers: i64,
|
||||
broadcast_len: i64,
|
||||
last_datapoint_submit: &Arc<AtomicU64>,
|
||||
last_datapoint_submit: &Arc<AtomicInterval>,
|
||||
) {
|
||||
let now = timestamp();
|
||||
let last = last_datapoint_submit.load(Ordering::Relaxed);
|
||||
#[allow(deprecated)]
|
||||
if now.saturating_sub(last) > 1000
|
||||
&& last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last
|
||||
{
|
||||
if last_datapoint_submit.should_update(1000) {
|
||||
datapoint_info!(
|
||||
"cluster_info-num_nodes",
|
||||
("live_count", num_live_peers, i64),
|
||||
@@ -384,7 +378,7 @@ pub fn broadcast_shreds(
|
||||
s: &UdpSocket,
|
||||
shreds: &[Shred],
|
||||
cluster_nodes: &ClusterNodes<BroadcastStage>,
|
||||
last_datapoint_submit: &Arc<AtomicU64>,
|
||||
last_datapoint_submit: &Arc<AtomicInterval>,
|
||||
transmit_stats: &mut TransmitShredsStats,
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
) -> Result<()> {
|
||||
|
@@ -143,7 +143,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
sock,
|
||||
&shreds,
|
||||
&cluster_nodes,
|
||||
&Arc::new(AtomicU64::new(0)),
|
||||
&Arc::new(AtomicInterval::default()),
|
||||
&mut TransmitShredsStats::default(),
|
||||
cluster_info.socket_addr_space(),
|
||||
)?;
|
||||
|
@@ -12,7 +12,11 @@ use solana_ledger::{
|
||||
SHRED_TICK_REFERENCE_MASK,
|
||||
},
|
||||
};
|
||||
use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
|
||||
use solana_sdk::{
|
||||
pubkey::Pubkey,
|
||||
signature::Keypair,
|
||||
timing::{duration_as_us, AtomicInterval},
|
||||
};
|
||||
use std::{collections::HashMap, ops::Deref, sync::RwLock, time::Duration};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -25,10 +29,10 @@ pub struct StandardBroadcastRun {
|
||||
slot_broadcast_start: Option<Instant>,
|
||||
keypair: Arc<Keypair>,
|
||||
shred_version: u16,
|
||||
last_datapoint_submit: Arc<AtomicU64>,
|
||||
last_datapoint_submit: Arc<AtomicInterval>,
|
||||
num_batches: usize,
|
||||
cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
|
||||
last_peer_update: Arc<AtomicU64>,
|
||||
last_peer_update: Arc<AtomicInterval>,
|
||||
}
|
||||
|
||||
impl StandardBroadcastRun {
|
||||
@@ -45,7 +49,7 @@ impl StandardBroadcastRun {
|
||||
last_datapoint_submit: Arc::default(),
|
||||
num_batches: 0,
|
||||
cluster_nodes: Arc::default(),
|
||||
last_peer_update: Arc::default(),
|
||||
last_peer_update: Arc::new(AtomicInterval::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,14 +343,9 @@ impl StandardBroadcastRun {
|
||||
trace!("Broadcasting {:?} shreds", shreds.len());
|
||||
// Get the list of peers to broadcast to
|
||||
let mut get_peers_time = Measure::start("broadcast::get_peers");
|
||||
let now = timestamp();
|
||||
let last = self.last_peer_update.load(Ordering::Relaxed);
|
||||
#[allow(deprecated)]
|
||||
if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
|
||||
&& self
|
||||
.last_peer_update
|
||||
.compare_and_swap(last, now, Ordering::Relaxed)
|
||||
== last
|
||||
if self
|
||||
.last_peer_update
|
||||
.should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
|
||||
{
|
||||
*self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
|
||||
cluster_info,
|
||||
|
@@ -28,7 +28,12 @@ use solana_rpc::{
|
||||
rpc_subscriptions::RpcSubscriptions,
|
||||
};
|
||||
use solana_runtime::{bank::Bank, bank_forks::BankForks};
|
||||
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
|
||||
use solana_sdk::{
|
||||
clock::Slot,
|
||||
epoch_schedule::EpochSchedule,
|
||||
pubkey::Pubkey,
|
||||
timing::{timestamp, AtomicInterval},
|
||||
};
|
||||
use solana_streamer::streamer::PacketReceiver;
|
||||
use std::{
|
||||
collections::hash_set::HashSet,
|
||||
@@ -61,7 +66,7 @@ struct RetransmitStats {
|
||||
repair_total: AtomicU64,
|
||||
discard_total: AtomicU64,
|
||||
retransmit_total: AtomicU64,
|
||||
last_ts: AtomicU64,
|
||||
last_ts: AtomicInterval,
|
||||
compute_turbine_peers_total: AtomicU64,
|
||||
retransmit_tree_mismatch: AtomicU64,
|
||||
packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
|
||||
@@ -121,12 +126,7 @@ fn update_retransmit_stats(
|
||||
}
|
||||
}
|
||||
|
||||
let now = timestamp();
|
||||
let last = stats.last_ts.load(Ordering::Relaxed);
|
||||
#[allow(deprecated)]
|
||||
if now.saturating_sub(last) > 2000
|
||||
&& stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last
|
||||
{
|
||||
if stats.last_ts.should_update(2000) {
|
||||
datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
|
||||
datapoint_info!(
|
||||
"retransmit-stage",
|
||||
@@ -284,7 +284,7 @@ fn retransmit(
|
||||
id: u32,
|
||||
stats: &RetransmitStats,
|
||||
cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
|
||||
last_peer_update: &AtomicU64,
|
||||
last_peer_update: &AtomicInterval,
|
||||
shreds_received: &Mutex<ShredFilterAndHasher>,
|
||||
max_slots: &MaxSlots,
|
||||
first_shreds_received: &Mutex<BTreeSet<Slot>>,
|
||||
@@ -314,12 +314,7 @@ fn retransmit(
|
||||
epoch_fetch.stop();
|
||||
|
||||
let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
|
||||
let now = timestamp();
|
||||
let last = last_peer_update.load(Ordering::Relaxed);
|
||||
#[allow(deprecated)]
|
||||
if now.saturating_sub(last) > 1000
|
||||
&& last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
|
||||
{
|
||||
if last_peer_update.should_update_ext(1000, false) {
|
||||
let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
|
||||
*cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
|
||||
cluster_info,
|
||||
@@ -478,7 +473,7 @@ pub fn retransmitter(
|
||||
let cluster_info = cluster_info.clone();
|
||||
let stats = stats.clone();
|
||||
let cluster_nodes = Arc::default();
|
||||
let last_peer_update = Arc::new(AtomicU64::new(0));
|
||||
let last_peer_update = Arc::new(AtomicInterval::default());
|
||||
let shreds_received = shreds_received.clone();
|
||||
let max_slots = max_slots.clone();
|
||||
let first_shreds_received = first_shreds_received.clone();
|
||||
@@ -671,6 +666,7 @@ mod tests {
|
||||
let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
|
||||
// need to make sure tvu and tpu are valid addresses
|
||||
me.tvu_forwards = me_retransmit.local_addr().unwrap();
|
||||
|
||||
let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap();
|
||||
me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
|
||||
.unwrap()
|
||||
|
Reference in New Issue
Block a user