automerge
This commit is contained in:
@ -214,6 +214,7 @@ struct GossipStats {
|
|||||||
repair_peers: Counter,
|
repair_peers: Counter,
|
||||||
new_push_requests: Counter,
|
new_push_requests: Counter,
|
||||||
new_push_requests2: Counter,
|
new_push_requests2: Counter,
|
||||||
|
new_push_requests_num: Counter,
|
||||||
process_pull_response: Counter,
|
process_pull_response: Counter,
|
||||||
process_pull_response_count: Counter,
|
process_pull_response_count: Counter,
|
||||||
process_pull_response_len: Counter,
|
process_pull_response_len: Counter,
|
||||||
@ -236,6 +237,7 @@ struct GossipStats {
|
|||||||
skip_push_message_shred_version: Counter,
|
skip_push_message_shred_version: Counter,
|
||||||
push_message_count: Counter,
|
push_message_count: Counter,
|
||||||
push_message_value_count: Counter,
|
push_message_value_count: Counter,
|
||||||
|
push_response_count: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ClusterInfo {
|
pub struct ClusterInfo {
|
||||||
@ -1362,7 +1364,7 @@ impl ClusterInfo {
|
|||||||
let (_, push_messages) = self
|
let (_, push_messages) = self
|
||||||
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
|
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
|
||||||
.new_push_messages(timestamp());
|
.new_push_messages(timestamp());
|
||||||
push_messages
|
let messages: Vec<_> = push_messages
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|(peer, messages)| {
|
.filter_map(|(peer, messages)| {
|
||||||
let peer_label = CrdsValueLabel::ContactInfo(peer);
|
let peer_label = CrdsValueLabel::ContactInfo(peer);
|
||||||
@ -1377,7 +1379,11 @@ impl ClusterInfo {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
|
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
|
||||||
})
|
})
|
||||||
.collect()
|
.collect();
|
||||||
|
self.stats
|
||||||
|
.new_push_requests_num
|
||||||
|
.add_relaxed(messages.len() as u64);
|
||||||
|
messages
|
||||||
}
|
}
|
||||||
|
|
||||||
fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
|
fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
|
||||||
@ -1891,6 +1897,9 @@ impl ClusterInfo {
|
|||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
|
let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
|
||||||
|
me.stats
|
||||||
|
.push_response_count
|
||||||
|
.add_relaxed(packets.packets.len() as u64);
|
||||||
if !packets.is_empty() {
|
if !packets.is_empty() {
|
||||||
let pushes: Vec<_> = me.new_push_requests();
|
let pushes: Vec<_> = me.new_push_requests();
|
||||||
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
|
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
|
||||||
@ -1982,6 +1991,11 @@ impl ClusterInfo {
|
|||||||
),
|
),
|
||||||
("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
|
("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
|
||||||
("tvu_peers", self.stats.tvu_peers.clear(), i64),
|
("tvu_peers", self.stats.tvu_peers.clear(), i64),
|
||||||
|
(
|
||||||
|
"new_push_requests_num",
|
||||||
|
self.stats.new_push_requests2.clear(),
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
datapoint_info!(
|
datapoint_info!(
|
||||||
"cluster_info_stats2",
|
"cluster_info_stats2",
|
||||||
@ -2008,6 +2022,26 @@ impl ClusterInfo {
|
|||||||
self.stats.process_pull_response_count.clear(),
|
self.stats.process_pull_response_count.clear(),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"process_pull_resp_success",
|
||||||
|
self.stats.process_pull_response_success.clear(),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"process_pull_resp_timeout",
|
||||||
|
self.stats.process_pull_response_timeout.clear(),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"process_pull_resp_fail",
|
||||||
|
self.stats.process_pull_response_fail.clear(),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"push_response_count",
|
||||||
|
self.stats.push_response_count.clear(),
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
datapoint_info!(
|
datapoint_info!(
|
||||||
"cluster_info_stats3",
|
"cluster_info_stats3",
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
|
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
|
||||||
cluster_slots::ClusterSlots,
|
cluster_slots::ClusterSlots,
|
||||||
|
contact_info::ContactInfo,
|
||||||
repair_service::DuplicateSlotsResetSender,
|
repair_service::DuplicateSlotsResetSender,
|
||||||
repair_service::RepairInfo,
|
repair_service::RepairInfo,
|
||||||
result::{Error, Result},
|
result::{Error, Result},
|
||||||
@ -18,8 +19,9 @@ use solana_ledger::{
|
|||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_metrics::inc_new_counter_error;
|
use solana_metrics::inc_new_counter_error;
|
||||||
use solana_perf::packet::Packets;
|
use solana_perf::packet::Packets;
|
||||||
use solana_sdk::clock::Slot;
|
use solana_sdk::clock::{Epoch, Slot};
|
||||||
use solana_sdk::epoch_schedule::EpochSchedule;
|
use solana_sdk::epoch_schedule::EpochSchedule;
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
use solana_streamer::streamer::PacketReceiver;
|
use solana_streamer::streamer::PacketReceiver;
|
||||||
use std::{
|
use std::{
|
||||||
@ -44,6 +46,8 @@ struct RetransmitStats {
|
|||||||
total_packets: AtomicU64,
|
total_packets: AtomicU64,
|
||||||
total_batches: AtomicU64,
|
total_batches: AtomicU64,
|
||||||
total_time: AtomicU64,
|
total_time: AtomicU64,
|
||||||
|
epoch_fetch: AtomicU64,
|
||||||
|
epoch_cache_update: AtomicU64,
|
||||||
repair_total: AtomicU64,
|
repair_total: AtomicU64,
|
||||||
discard_total: AtomicU64,
|
discard_total: AtomicU64,
|
||||||
retransmit_total: AtomicU64,
|
retransmit_total: AtomicU64,
|
||||||
@ -65,6 +69,8 @@ fn update_retransmit_stats(
|
|||||||
peers_len: usize,
|
peers_len: usize,
|
||||||
packets_by_slot: HashMap<Slot, usize>,
|
packets_by_slot: HashMap<Slot, usize>,
|
||||||
packets_by_source: HashMap<String, usize>,
|
packets_by_source: HashMap<String, usize>,
|
||||||
|
epoch_fetch: u64,
|
||||||
|
epoch_cach_update: u64,
|
||||||
) {
|
) {
|
||||||
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
|
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
|
||||||
stats
|
stats
|
||||||
@ -83,6 +89,10 @@ fn update_retransmit_stats(
|
|||||||
.compute_turbine_peers_total
|
.compute_turbine_peers_total
|
||||||
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
|
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
|
||||||
stats.total_batches.fetch_add(1, Ordering::Relaxed);
|
stats.total_batches.fetch_add(1, Ordering::Relaxed);
|
||||||
|
stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed);
|
||||||
|
stats
|
||||||
|
.epoch_cache_update
|
||||||
|
.fetch_add(epoch_cach_update, Ordering::Relaxed);
|
||||||
{
|
{
|
||||||
let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
|
let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
|
||||||
for (slot, count) in packets_by_slot {
|
for (slot, count) in packets_by_slot {
|
||||||
@ -107,6 +117,16 @@ fn update_retransmit_stats(
|
|||||||
stats.total_time.swap(0, Ordering::Relaxed) as i64,
|
stats.total_time.swap(0, Ordering::Relaxed) as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"epoch_fetch",
|
||||||
|
stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"epoch_cache_update",
|
||||||
|
stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"total_batches",
|
"total_batches",
|
||||||
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
|
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
|
||||||
@ -148,6 +168,14 @@ fn update_retransmit_stats(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct EpochStakesCache {
|
||||||
|
epoch: Epoch,
|
||||||
|
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
|
||||||
|
peers: Vec<ContactInfo>,
|
||||||
|
stakes_and_index: Vec<(u64, usize)>,
|
||||||
|
}
|
||||||
|
|
||||||
fn retransmit(
|
fn retransmit(
|
||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||||
@ -156,6 +184,8 @@ fn retransmit(
|
|||||||
sock: &UdpSocket,
|
sock: &UdpSocket,
|
||||||
id: u32,
|
id: u32,
|
||||||
stats: &Arc<RetransmitStats>,
|
stats: &Arc<RetransmitStats>,
|
||||||
|
epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
|
||||||
|
last_peer_update: &Arc<AtomicU64>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let r_lock = r.lock().unwrap();
|
let r_lock = r.lock().unwrap();
|
||||||
@ -172,12 +202,42 @@ fn retransmit(
|
|||||||
}
|
}
|
||||||
drop(r_lock);
|
drop(r_lock);
|
||||||
|
|
||||||
|
let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
|
||||||
let r_bank = bank_forks.read().unwrap().working_bank();
|
let r_bank = bank_forks.read().unwrap().working_bank();
|
||||||
let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
|
let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
|
||||||
let mut peers_len = 0;
|
epoch_fetch.stop();
|
||||||
|
|
||||||
|
let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
|
||||||
|
let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
|
||||||
|
if r_epoch_stakes_cache.epoch != bank_epoch {
|
||||||
|
drop(r_epoch_stakes_cache);
|
||||||
|
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
|
||||||
|
if w_epoch_stakes_cache.epoch != bank_epoch {
|
||||||
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
|
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
|
||||||
let stakes = stakes.map(Arc::new);
|
let stakes = stakes.map(Arc::new);
|
||||||
let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes);
|
w_epoch_stakes_cache.stakes = stakes;
|
||||||
|
w_epoch_stakes_cache.epoch = bank_epoch;
|
||||||
|
}
|
||||||
|
drop(w_epoch_stakes_cache);
|
||||||
|
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = timestamp();
|
||||||
|
let last = last_peer_update.load(Ordering::Relaxed);
|
||||||
|
if now - last > 1000 && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
|
||||||
|
{
|
||||||
|
drop(r_epoch_stakes_cache);
|
||||||
|
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
|
||||||
|
let (peers, stakes_and_index) =
|
||||||
|
cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone());
|
||||||
|
w_epoch_stakes_cache.peers = peers;
|
||||||
|
w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
|
||||||
|
drop(w_epoch_stakes_cache);
|
||||||
|
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
|
||||||
|
}
|
||||||
|
let mut peers_len = 0;
|
||||||
|
epoch_cache_update.stop();
|
||||||
|
|
||||||
let my_id = cluster_info.id();
|
let my_id = cluster_info.id();
|
||||||
let mut discard_total = 0;
|
let mut discard_total = 0;
|
||||||
let mut repair_total = 0;
|
let mut repair_total = 0;
|
||||||
@ -202,8 +262,8 @@ fn retransmit(
|
|||||||
let mut compute_turbine_peers = Measure::start("turbine_start");
|
let mut compute_turbine_peers = Measure::start("turbine_start");
|
||||||
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
|
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
|
||||||
&my_id,
|
&my_id,
|
||||||
&peers,
|
&r_epoch_stakes_cache.peers,
|
||||||
&stakes_and_index,
|
&r_epoch_stakes_cache.stakes_and_index,
|
||||||
packet.meta.seed,
|
packet.meta.seed,
|
||||||
);
|
);
|
||||||
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
|
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
|
||||||
@ -216,8 +276,14 @@ fn retransmit(
|
|||||||
|
|
||||||
let (neighbors, children) =
|
let (neighbors, children) =
|
||||||
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
|
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
|
||||||
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
|
let neighbors: Vec<_> = neighbors
|
||||||
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
|
.into_iter()
|
||||||
|
.map(|index| &r_epoch_stakes_cache.peers[index])
|
||||||
|
.collect();
|
||||||
|
let children: Vec<_> = children
|
||||||
|
.into_iter()
|
||||||
|
.map(|index| &r_epoch_stakes_cache.peers[index])
|
||||||
|
.collect();
|
||||||
compute_turbine_peers.stop();
|
compute_turbine_peers.stop();
|
||||||
compute_turbine_peers_total += compute_turbine_peers.as_us();
|
compute_turbine_peers_total += compute_turbine_peers.as_us();
|
||||||
|
|
||||||
@ -258,6 +324,8 @@ fn retransmit(
|
|||||||
peers_len,
|
peers_len,
|
||||||
packets_by_slot,
|
packets_by_slot,
|
||||||
packets_by_source,
|
packets_by_source,
|
||||||
|
epoch_fetch.as_us(),
|
||||||
|
epoch_cache_update.as_us(),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -287,6 +355,8 @@ pub fn retransmitter(
|
|||||||
let r = r.clone();
|
let r = r.clone();
|
||||||
let cluster_info = cluster_info.clone();
|
let cluster_info = cluster_info.clone();
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
|
let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
|
||||||
|
let last_peer_update = Arc::new(AtomicU64::new(0));
|
||||||
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-retransmitter".to_string())
|
.name("solana-retransmitter".to_string())
|
||||||
@ -301,6 +371,8 @@ pub fn retransmitter(
|
|||||||
&sockets[s],
|
&sockets[s],
|
||||||
s as u32,
|
s as u32,
|
||||||
&stats,
|
&stats,
|
||||||
|
&epoch_stakes_cache,
|
||||||
|
&last_peer_update,
|
||||||
) {
|
) {
|
||||||
match e {
|
match e {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
|
Reference in New Issue
Block a user