v1.1 gossip lock optimizations (#10459)

* Skip gossip requests with different shred version and split lock (#10240)


(cherry picked from commit 3f508b37fd)
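
As an illustration of the shred-version skip this change introduces (the `filter_by_shred_version` helper further down in the diff), here is a minimal, self-contained sketch using stand-in `Pubkey`/`CrdsValue` types rather than the real `solana_core` ones: values gossiped by a sender on a different shred version are dropped, except the sender's own ContactInfo, so a node can still announce a version change.

```rust
// Stand-in types; the real code filters solana_core CrdsValue/CrdsData entries.
#[derive(Clone, Copy, PartialEq, Eq)]
struct Pubkey(u64);

enum CrdsData {
    ContactInfo { id: Pubkey },
    Other,
}

struct CrdsValue {
    data: CrdsData,
}

/// Drop values gossiped by a node on a different shred version, keeping only
/// the sender's own ContactInfo so it can still announce a version change.
fn filter_by_shred_version(
    from: &Pubkey,
    crds_values: &mut Vec<CrdsValue>,
    shred_version: u16,
    my_shred_version: u16,
) {
    // A shred version of 0 on either side means "not yet known", so nothing is skipped.
    if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version {
        crds_values.retain(|value| match &value.data {
            CrdsData::ContactInfo { id } => id == from,
            _ => false,
        });
    }
}

fn main() {
    let from = Pubkey(1);
    let mut values = vec![
        CrdsValue { data: CrdsData::ContactInfo { id: from } },
        CrdsValue { data: CrdsData::Other },
    ];
    // Sender is on shred version 2, we are on 1: only its ContactInfo survives.
    filter_by_shred_version(&from, &mut values, 2, 1);
    assert_eq!(values.len(), 1);
}
```

The "split lock" half of the change is visible below as well: building pull responses moves into a new `generate_pull_responses` that runs under the gossip read lock, while `process_pull_requests` keeps the write lock only to record the callers, so response generation no longer serializes behind the writer.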

* More cluster stats and add epoch stakes cache in retransmit stage (#10345)

* More cluster info metrics for push request/response counts

* Cache staked peers for the epoch

(cherry picked from commit ef37b82ffa)
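
A minimal sketch, with simplified stand-in types, of the double-checked read/write-lock pattern the retransmit stage now uses to cache staked nodes per epoch (the real `EpochStakesCache` also holds the sorted peer list and stake/index pairs, and stores stakes as an `Option<Arc<HashMap<..>>>`):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

type Epoch = u64;
type NodeId = u64; // stand-in for Pubkey

#[derive(Default)]
struct EpochStakesCache {
    epoch: Epoch,
    stakes: HashMap<NodeId, u64>,
}

fn stakes_for_epoch(
    cache: &RwLock<EpochStakesCache>,
    bank_epoch: Epoch,
    load_stakes: impl Fn(Epoch) -> HashMap<NodeId, u64>, // e.g. staked_nodes_at_epoch
) -> HashMap<NodeId, u64> {
    {
        // Fast path: the cache already holds this epoch, take only the read lock.
        let r = cache.read().unwrap();
        if r.epoch == bank_epoch {
            return r.stakes.clone();
        }
    }
    // Slow path: re-check under the write lock so racing threads reload only once.
    let mut w = cache.write().unwrap();
    if w.epoch != bank_epoch {
        w.stakes = load_stakes(bank_epoch);
        w.epoch = bank_epoch;
    }
    w.stakes.clone()
}

fn main() {
    let cache = RwLock::new(EpochStakesCache::default());
    let stakes = stakes_for_epoch(&cache, 3, |_epoch| HashMap::from([(42u64, 1_000u64)]));
    assert_eq!(stakes.len(), 1);
    // A second call for the same epoch is served from the cache.
    assert_eq!(stakes_for_epoch(&cache, 3, |_| HashMap::new()).len(), 1);
}
```

The retransmit code additionally re-sorts the cached peer list at most once a second, gated by the same atomic-timestamp guard sketched under the broadcast change below.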

* Cache tvu peers for broadcast (#10373)


(cherry picked from commit 2cf719ac2c)
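
The broadcast peer cache below is refreshed at most once per `BROADCAST_PEER_UPDATE_INTERVAL_MS`, with an `AtomicU64` timestamp acting as the gate so only one sender rebuilds the list. A rough sketch of that guard, using stand-in peer types and `compare_exchange` in place of the now-deprecated `compare_and_swap` the diff uses:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};

const PEER_UPDATE_INTERVAL_MS: u64 = 1_000;

fn timestamp_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

/// Refresh the cached peer list at most once per interval; only the caller
/// that wins the compare-exchange takes the write lock and refetches.
fn maybe_refresh_peers(
    last_peer_update: &AtomicU64,
    peer_cache: &RwLock<Vec<String>>, // stand-in for the cached Vec<ContactInfo>
    fetch_peers: impl Fn() -> Vec<String>,
) {
    let now = timestamp_ms();
    let last = last_peer_update.load(Ordering::Relaxed);
    // Compare against the previously observed `last` and store `now`.
    if now.saturating_sub(last) > PEER_UPDATE_INTERVAL_MS
        && last_peer_update
            .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    {
        *peer_cache.write().unwrap() = fetch_peers();
    }
}

fn main() {
    let last_peer_update = AtomicU64::new(0);
    let peer_cache = RwLock::new(Vec::new());
    maybe_refresh_peers(&last_peer_update, &peer_cache, || vec!["tvu-peer-1".to_string()]);
    assert_eq!(peer_cache.read().unwrap().len(), 1);
    // A second call inside the interval leaves the cache untouched.
    maybe_refresh_peers(&last_peer_update, &peer_cache, || Vec::new());
    assert_eq!(peer_cache.read().unwrap().len(), 1);
}
```

Note the argument order of the swap: the guard compares against the previously loaded `last` and stores `now`; with the two operands reversed the gate never updates correctly.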

* Add pull request count metrics (#10421)


(cherry picked from commit 3d2230f1a9)
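
The new pull/push counters follow the same pattern as the existing `GossipStats` fields: a relaxed atomic that hot paths bump with `add_relaxed` and the periodic stats report drains with `clear`. A minimal stand-in for that counter type (the real `Counter` used by `GossipStats` is a private helper; this is only an illustration of the pattern):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct Counter(AtomicU64);

impl Counter {
    /// Bump from the hot path; Relaxed is enough because the value is only
    /// read for reporting, never for synchronization.
    fn add_relaxed(&self, x: u64) {
        self.0.fetch_add(x, Ordering::Relaxed);
    }
    /// Drain the counter when the periodic report fires.
    fn clear(&self) -> u64 {
        self.0.swap(0, Ordering::Relaxed)
    }
}

fn main() {
    let new_pull_requests_count = Counter::default();
    new_pull_requests_count.add_relaxed(3); // e.g. pulls.len() as u64
    new_pull_requests_count.add_relaxed(2);
    // The value the datapoint report would emit, then reset for the next interval.
    assert_eq!(new_pull_requests_count.clear(), 5);
    assert_eq!(new_pull_requests_count.clear(), 0);
}
```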
sakridge
2020-06-08 17:05:55 -07:00
committed by GitHub
parent 3110def6c3
commit a705764ca7
10 changed files with 472 additions and 111 deletions

View File

@ -3,6 +3,7 @@
extern crate test;
use rand::{thread_rng, Rng};
use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats;
use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::contact_info::ContactInfo;
@ -48,7 +49,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
&peers_and_stakes,
&peers,
&last_datapoint,
&mut 0,
&mut TransmitShredsStats::default(),
)
.unwrap();
});

View File

@ -35,7 +35,7 @@ use std::{
};
mod broadcast_fake_shreds_run;
pub(crate) mod broadcast_metrics;
pub mod broadcast_metrics;
pub(crate) mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
mod standard_broadcast_run;
@ -374,13 +374,14 @@ pub fn broadcast_shreds(
peers_and_stakes: &[(u64, usize)],
peers: &[ContactInfo],
last_datapoint_submit: &Arc<AtomicU64>,
send_mmsg_total: &mut u64,
transmit_stats: &mut TransmitShredsStats,
) -> Result<()> {
let broadcast_len = peers_and_stakes.len();
if broadcast_len == 0 {
update_peer_stats(1, 1, last_datapoint_submit);
return Ok(());
}
let mut shred_select = Measure::start("shred_select");
let packets: Vec<_> = shreds
.iter()
.map(|shred| {
@ -389,6 +390,8 @@ pub fn broadcast_shreds(
(&shred.payload, &peers[broadcast_index].tvu)
})
.collect();
shred_select.stop();
transmit_stats.shred_select += shred_select.as_us();
let mut sent = 0;
let mut send_mmsg_time = Measure::start("send_mmsg");
@ -401,7 +404,7 @@ pub fn broadcast_shreds(
}
}
send_mmsg_time.stop();
*send_mmsg_total += send_mmsg_time.as_us();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
let num_live_peers = num_live_peers(&peers);
update_peer_stats(

View File

@ -29,11 +29,12 @@ impl ProcessShredsStats {
}
#[derive(Default, Clone)]
pub(crate) struct TransmitShredsStats {
pub(crate) transmit_elapsed: u64,
pub(crate) send_mmsg_elapsed: u64,
pub(crate) get_peers_elapsed: u64,
pub(crate) num_shreds: usize,
pub struct TransmitShredsStats {
pub transmit_elapsed: u64,
pub send_mmsg_elapsed: u64,
pub get_peers_elapsed: u64,
pub shred_select: u64,
pub num_shreds: usize,
}
impl BroadcastStats for TransmitShredsStats {
@ -42,6 +43,7 @@ impl BroadcastStats for TransmitShredsStats {
self.send_mmsg_elapsed += new_stats.send_mmsg_elapsed;
self.get_peers_elapsed += new_stats.get_peers_elapsed;
self.num_shreds += new_stats.num_shreds;
self.shred_select += new_stats.shred_select;
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
datapoint_info!(
@ -58,6 +60,7 @@ impl BroadcastStats for TransmitShredsStats {
("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
);
}
}
@ -176,15 +179,16 @@ mod test {
}
#[test]
fn test_update() {
fn test_update_broadcast() {
let start = Instant::now();
let mut slot_broadcast_stats = SlotBroadcastStats::default();
slot_broadcast_stats.update(
&TransmitShredsStats {
transmit_elapsed: 1,
get_peers_elapsed: 1,
send_mmsg_elapsed: 1,
num_shreds: 1,
get_peers_elapsed: 2,
send_mmsg_elapsed: 3,
shred_select: 4,
num_shreds: 5,
},
&Some(BroadcastShredBatchInfo {
slot: 0,
@ -198,16 +202,18 @@ mod test {
assert_eq!(slot_0_stats.num_batches, 1);
assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2);
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
slot_broadcast_stats.update(
&TransmitShredsStats {
transmit_elapsed: 1,
get_peers_elapsed: 1,
send_mmsg_elapsed: 1,
num_shreds: 1,
transmit_elapsed: 7,
get_peers_elapsed: 8,
send_mmsg_elapsed: 9,
shred_select: 10,
num_shreds: 11,
},
&None,
);
@ -217,9 +223,10 @@ mod test {
assert_eq!(slot_0_stats.num_batches, 1);
assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2);
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);
// If another batch is given, then total number of batches == num_expected_batches == 2,
// so the batch should be purged from the HashMap
@ -228,6 +235,7 @@ mod test {
transmit_elapsed: 1,
get_peers_elapsed: 1,
send_mmsg_elapsed: 1,
shred_select: 1,
num_shreds: 1,
},
&Some(BroadcastShredBatchInfo {

View File

@ -81,14 +81,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
// Broadcast data
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
let mut send_mmsg_total = 0;
broadcast_shreds(
sock,
&shreds,
&peers_and_stakes,
&peers,
&Arc::new(AtomicU64::new(0)),
&mut send_mmsg_total,
&mut TransmitShredsStats::default(),
)?;
Ok(())

View File

@ -9,6 +9,7 @@ use solana_ledger::{
};
use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
use std::collections::HashMap;
use std::sync::RwLock;
use std::time::Duration;
#[derive(Clone)]
@ -23,6 +24,14 @@ pub struct StandardBroadcastRun {
shred_version: u16,
last_datapoint_submit: Arc<AtomicU64>,
num_batches: usize,
broadcast_peer_cache: Arc<RwLock<BroadcastPeerCache>>,
last_peer_update: Arc<AtomicU64>,
}
#[derive(Default)]
struct BroadcastPeerCache {
peers: Vec<ContactInfo>,
peers_and_stakes: Vec<(u64, usize)>,
}
impl StandardBroadcastRun {
@ -38,6 +47,8 @@ impl StandardBroadcastRun {
shred_version,
last_datapoint_submit: Arc::new(AtomicU64::new(0)),
num_batches: 0,
broadcast_peer_cache: Arc::new(RwLock::new(BroadcastPeerCache::default())),
last_peer_update: Arc::new(AtomicU64::new(0)),
}
}
@ -293,33 +304,46 @@ impl StandardBroadcastRun {
shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
) -> Result<()> {
const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
trace!("Broadcasting {:?} shreds", shreds.len());
// Get the list of peers to broadcast to
let get_peers_start = Instant::now();
let mut get_peers_time = Measure::start("broadcast::get_peers");
let now = timestamp();
let last = self.last_peer_update.load(Ordering::Relaxed);
if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
&& self
.last_peer_update
.compare_and_swap(last, now, Ordering::Relaxed)
== last
{
let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap();
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
let get_peers_elapsed = get_peers_start.elapsed();
w_broadcast_peer_cache.peers = peers;
w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes;
}
get_peers_time.stop();
let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap();
let mut transmit_stats = TransmitShredsStats::default();
// Broadcast the shreds
let transmit_start = Instant::now();
let mut send_mmsg_total = 0;
let mut transmit_time = Measure::start("broadcast_shreds");
broadcast_shreds(
sock,
&shreds,
&peers_and_stakes,
&peers,
&r_broadcast_peer_cache.peers_and_stakes,
&r_broadcast_peer_cache.peers,
&self.last_datapoint_submit,
&mut send_mmsg_total,
&mut transmit_stats,
)?;
let transmit_elapsed = transmit_start.elapsed();
let new_transmit_shreds_stats = TransmitShredsStats {
transmit_elapsed: duration_as_us(&transmit_elapsed),
get_peers_elapsed: duration_as_us(&get_peers_elapsed),
send_mmsg_elapsed: send_mmsg_total,
num_shreds: shreds.len(),
};
drop(r_broadcast_peer_cache);
transmit_time.stop();
transmit_stats.transmit_elapsed = transmit_time.as_us();
transmit_stats.get_peers_elapsed = get_peers_time.as_us();
transmit_stats.num_shreds = shreds.len();
// Process metrics
self.update_transmit_metrics(&new_transmit_shreds_stats, &broadcast_shred_batch_info);
self.update_transmit_metrics(&transmit_stats, &broadcast_shred_batch_info);
Ok(())
}

View File

@ -214,11 +214,15 @@ struct GossipStats {
repair_peers: Counter,
new_push_requests: Counter,
new_push_requests2: Counter,
new_push_requests_num: Counter,
process_pull_response: Counter,
process_pull_response_count: Counter,
process_pull_response_len: Counter,
process_pull_response_timeout: Counter,
process_pull_response_fail: Counter,
process_pull_response_success: Counter,
process_pull_requests: Counter,
generate_pull_responses: Counter,
process_prune: Counter,
process_push_message: Counter,
prune_received_cache: Counter,
@ -227,7 +231,14 @@ struct GossipStats {
epoch_slots_push: Counter,
push_message: Counter,
new_pull_requests: Counter,
new_pull_requests_count: Counter,
mark_pull_request: Counter,
skip_pull_response_shred_version: Counter,
skip_pull_shred_version: Counter,
skip_push_message_shred_version: Counter,
push_message_count: Counter,
push_message_value_count: Counter,
push_response_count: Counter,
}
pub struct ClusterInfo {
@ -1397,6 +1408,9 @@ impl ClusterInfo {
.collect()
};
self.append_entrypoint_to_pulls(&mut pulls);
self.stats
.new_pull_requests_count
.add_relaxed(pulls.len() as u64);
pulls
.into_iter()
.map(|(peer, filter, gossip, self_info)| {
@ -1411,7 +1425,7 @@ impl ClusterInfo {
let (_, push_messages) = self
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
.new_push_messages(timestamp());
push_messages
let messages: Vec<_> = push_messages
.into_iter()
.filter_map(|(peer, messages)| {
let peer_label = CrdsValueLabel::ContactInfo(peer);
@ -1426,7 +1440,11 @@ impl ClusterInfo {
.into_iter()
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
})
.collect()
.collect();
self.stats
.new_push_requests_num
.add_relaxed(messages.len() as u64);
messages
}
fn gossip_request(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
@ -1583,12 +1601,17 @@ impl ClusterInfo {
if contact_info.id == me.id() {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
} else {
} else if contact_info.shred_version == 0
|| contact_info.shred_version == me.my_shred_version()
|| me.my_shred_version() == 0
{
gossip_pull_data.push(PullData {
from_addr,
caller,
filter,
});
} else {
me.stats.skip_pull_shred_version.add_relaxed(1);
}
}
datapoint_debug!(
@ -1677,20 +1700,8 @@ impl ClusterInfo {
}
}
// Pull requests take an incoming bloom filter of the entries a node already holds
// and try to send back the values detected as missing.
fn handle_pull_requests(
me: &Self,
recycler: &PacketsRecycler,
requests: Vec<PullData>,
stakes: &HashMap<Pubkey, u64>,
) -> Option<Packets> {
// split the requests into addrs and filters
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests");
{
let mut w_outbound_budget = me.outbound_budget.write().unwrap();
fn update_data_budget(&self, stakes: &HashMap<Pubkey, u64>) {
let mut w_outbound_budget = self.outbound_budget.write().unwrap();
let now = timestamp();
const INTERVAL_MS: u64 = 100;
@ -1708,14 +1719,32 @@ impl ClusterInfo {
w_outbound_budget.last_timestamp_ms = now;
}
}
// Pull requests take an incoming bloom filter of the entries a node already holds
// and try to send back the values detected as missing.
fn handle_pull_requests(
me: &Self,
recycler: &PacketsRecycler,
requests: Vec<PullData>,
stakes: &HashMap<Pubkey, u64>,
) -> Option<Packets> {
// split the requests into addrs and filters
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests");
me.update_data_budget(stakes);
for pull_data in requests {
caller_and_filters.push((pull_data.caller, pull_data.filter));
addrs.push(pull_data.from_addr);
}
let now = timestamp();
let self_id = me.id();
let pull_responses = me
.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
.time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses)
.generate_pull_responses(&caller_and_filters);
me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
.process_pull_requests(caller_and_filters, now);
// Filter bad to addresses
@ -1812,37 +1841,94 @@ impl ClusterInfo {
Some(packets)
}
// Returns (failed, timeout, success)
fn handle_pull_response(
me: &Self,
from: &Pubkey,
data: Vec<CrdsValue>,
mut crds_values: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
) {
let len = data.len();
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", me.id, from, len);
let (_fail, timeout_count) = me
if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
me.my_shred_version(),
);
}
let filtered_len = crds_values.len();
let (fail, timeout_count, success) = me
.time_gossip_write_lock("process_pull", &me.stats.process_pull_response)
.process_pull_response(from, timeouts, data, timestamp());
.process_pull_response(from, timeouts, crds_values, timestamp());
me.stats
.skip_pull_response_shred_version
.add_relaxed((len - filtered_len) as u64);
me.stats.process_pull_response_count.add_relaxed(1);
me.stats.process_pull_response_len.add_relaxed(len as u64);
me.stats
.process_pull_response_len
.add_relaxed(filtered_len as u64);
me.stats
.process_pull_response_timeout
.add_relaxed(timeout_count as u64);
me.stats.process_pull_response_fail.add_relaxed(fail as u64);
me.stats
.process_pull_response_success
.add_relaxed(success as u64);
(fail, timeout_count, success)
}
fn filter_by_shred_version(
from: &Pubkey,
crds_values: &mut Vec<CrdsValue>,
shred_version: u16,
my_shred_version: u16,
) {
if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version {
// Allow someone to update their own ContactInfo so they
// can change shred versions if needed.
crds_values.retain(|crds_value| match &crds_value.data {
CrdsData::ContactInfo(contact_info) => contact_info.id == *from,
_ => false,
});
}
}
fn handle_push_message(
me: &Self,
recycler: &PacketsRecycler,
from: &Pubkey,
data: Vec<CrdsValue>,
mut crds_values: Vec<CrdsValue>,
stakes: &HashMap<Pubkey, u64>,
) -> Option<Packets> {
let self_id = me.id();
inc_new_counter_debug!("cluster_info-push_message", 1);
me.stats.push_message_count.add_relaxed(1);
let len = crds_values.len();
if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) {
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
me.my_shred_version(),
);
}
let filtered_len = crds_values.len();
me.stats
.push_message_value_count
.add_relaxed(filtered_len as u64);
me.stats
.skip_push_message_shred_version
.add_relaxed((len - filtered_len) as u64);
let updated: Vec<_> = me
.time_gossip_write_lock("process_push", &me.stats.process_push_message)
.process_push_message(from, data, timestamp());
.process_push_message(from, crds_values, timestamp());
let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = me
@ -1872,6 +1958,9 @@ impl ClusterInfo {
return None;
}
let mut packets = to_packets_with_destination(recycler.clone(), &rsp);
me.stats
.push_response_count
.add_relaxed(packets.packets.len() as u64);
if !packets.is_empty() {
let pushes: Vec<_> = me.new_push_requests();
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
@ -1963,6 +2052,11 @@ impl ClusterInfo {
),
("all_tvu_peers", self.stats.all_tvu_peers.clear(), i64),
("tvu_peers", self.stats.tvu_peers.clear(), i64),
(
"new_push_requests_num",
self.stats.new_push_requests2.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats2",
@ -1989,6 +2083,26 @@ impl ClusterInfo {
self.stats.process_pull_response_count.clear(),
i64
),
(
"process_pull_resp_success",
self.stats.process_pull_response_success.clear(),
i64
),
(
"process_pull_resp_timeout",
self.stats.process_pull_response_timeout.clear(),
i64
),
(
"process_pull_resp_fail",
self.stats.process_pull_response_fail.clear(),
i64
),
(
"push_response_count",
self.stats.push_response_count.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats3",
@ -2002,6 +2116,11 @@ impl ClusterInfo {
self.stats.process_pull_requests.clear(),
i64
),
(
"generate_pull_responses",
self.stats.generate_pull_responses.clear(),
i64
),
("process_prune", self.stats.process_prune.clear(), i64),
(
"process_push_message",
@ -2031,6 +2150,39 @@ impl ClusterInfo {
i64
),
);
datapoint_info!(
"cluster_info_stats4",
(
"skip_push_message_shred_version",
self.stats.skip_push_message_shred_version.clear(),
i64
),
(
"skip_pull_response_shred_version",
self.stats.skip_pull_response_shred_version.clear(),
i64
),
(
"skip_pull_shred_version",
self.stats.skip_pull_shred_version.clear(),
i64
),
(
"push_message_count",
self.stats.push_message_count.clear(),
i64
),
(
"push_message_value_count",
self.stats.push_message_value_count.clear(),
i64
),
(
"new_pull_requests_count",
self.stats.new_pull_requests_count.clear(),
i64
),
);
*last_print = Instant::now();
}
@ -2412,6 +2564,91 @@ mod tests {
assert!(ClusterInfo::is_spy_node(&node));
}
#[test]
fn test_handle_pull() {
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
let entrypoint_pubkey = Pubkey::new_rand();
let data = test_crds_values(entrypoint_pubkey);
let timeouts = HashMap::new();
assert_eq!(
(0, 0, 1),
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
data.clone(),
&timeouts
)
);
let entrypoint_pubkey2 = Pubkey::new_rand();
assert_eq!(
(1, 0, 0),
ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts)
);
}
fn test_crds_values(pubkey: Pubkey) -> Vec<CrdsValue> {
let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp());
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
vec![entrypoint_crdsvalue]
}
#[test]
fn test_filter_shred_version() {
let from = Pubkey::new_rand();
let my_shred_version = 1;
let other_shred_version = 1;
// Allow same shred_version
let mut values = test_crds_values(from);
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
// Allow shred_version=0.
let other_shred_version = 0;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
from: Pubkey::new_rand(),
hashes: vec![],
wallclock: 0,
}));
values.push(snapshot_hash_data);
// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
}
#[test]
fn test_cluster_spy_gossip() {
//check that gossip doesn't try to push to invalid addresses

View File

@ -158,14 +158,18 @@ impl CrdsGossip {
self.pull.mark_pull_request_creation_time(from, now)
}
/// process a pull request and create a response
pub fn process_pull_requests(
&mut self,
filters: Vec<(CrdsValue, CrdsFilter)>,
now: u64,
) -> Vec<Vec<CrdsValue>> {
pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) {
self.pull
.process_pull_requests(&mut self.crds, filters, now)
.process_pull_requests(&mut self.crds, filters, now);
}
pub fn generate_pull_responses(
&self,
filters: &[(CrdsValue, CrdsFilter)],
) -> Vec<Vec<CrdsValue>> {
self.pull.generate_pull_responses(&self.crds, filters)
}
/// process a pull response
pub fn process_pull_response(
&mut self,
@ -173,7 +177,7 @@ impl CrdsGossip {
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> (usize, usize) {
) -> (usize, usize, usize) {
self.pull
.process_pull_response(&mut self.crds, from, timeouts, response, now)
}

View File

@ -204,14 +204,13 @@ impl CrdsGossipPull {
self.purged_values.push_back((hash, timestamp))
}
/// process a pull request and create a response
/// process a pull request
pub fn process_pull_requests(
&mut self,
crds: &mut Crds,
requests: Vec<(CrdsValue, CrdsFilter)>,
now: u64,
) -> Vec<Vec<CrdsValue>> {
let rv = self.filter_crds_values(crds, &requests);
) {
requests.into_iter().for_each(|(caller, _)| {
let key = caller.label().pubkey();
let old = crds.insert(caller, now);
@ -221,8 +220,17 @@ impl CrdsGossipPull {
}
crds.update_record_timestamp(&key, now);
});
rv
}
/// Create gossip responses to pull requests
pub fn generate_pull_responses(
&self,
crds: &Crds,
requests: &[(CrdsValue, CrdsFilter)],
) -> Vec<Vec<CrdsValue>> {
self.filter_crds_values(crds, requests)
}
/// process a pull response
pub fn process_pull_response(
&mut self,
@ -231,9 +239,10 @@ impl CrdsGossipPull {
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> (usize, usize) {
) -> (usize, usize, usize) {
let mut failed = 0;
let mut timeout_count = 0;
let mut success = 0;
for r in response {
let owner = r.label().pubkey();
// Check if the crds value is older than the msg_timeout
@ -274,7 +283,11 @@ impl CrdsGossipPull {
}
}
let old = crds.insert(r, now);
failed += old.is_err() as usize;
if old.is_err() {
failed += 1;
} else {
success += 1;
}
old.ok().map(|opt| {
crds.update_record_timestamp(&owner, now);
opt.map(|val| {
@ -284,7 +297,7 @@ impl CrdsGossipPull {
});
}
crds.update_record_timestamp(from, now);
(failed, timeout_count)
(failed, timeout_count, success)
}
// build a set of filters of the current crds table
// num_filters - used to increase the likelihood of a value in crds being added to some filter
@ -573,8 +586,9 @@ mod test {
let mut dest_crds = Crds::default();
let mut dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap();
let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.process_pull_requests(&mut dest_crds, filters, 1);
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses(&dest_crds, &filters);
dest.process_pull_requests(&mut dest_crds, filters, 1);
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
assert!(dest_crds.lookup(&caller.label()).is_some());
assert_eq!(
@ -643,8 +657,9 @@ mod test {
PACKET_DATA_SIZE,
);
let (_, filters, caller) = req.unwrap();
let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.process_pull_requests(&mut dest_crds, filters, 0);
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters);
dest.process_pull_requests(&mut dest_crds, filters, 0);
// if there is a false positive this is empty
// prob should be around 0.1 per iteration
if rsp.is_empty() {

View File

@ -3,6 +3,7 @@
use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
cluster_slots::ClusterSlots,
contact_info::ContactInfo,
repair_service::RepairStrategy,
result::{Error, Result},
window_service::{should_retransmit_and_persist, WindowService},
@ -17,8 +18,9 @@ use solana_ledger::{
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_error;
use solana_perf::packet::Packets;
use solana_sdk::clock::Slot;
use solana_sdk::clock::{Epoch, Slot};
use solana_sdk::epoch_schedule::EpochSchedule;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use solana_streamer::streamer::PacketReceiver;
use std::{
@ -43,6 +45,8 @@ struct RetransmitStats {
total_packets: AtomicU64,
total_batches: AtomicU64,
total_time: AtomicU64,
epoch_fetch: AtomicU64,
epoch_cache_update: AtomicU64,
repair_total: AtomicU64,
discard_total: AtomicU64,
retransmit_total: AtomicU64,
@ -64,6 +68,8 @@ fn update_retransmit_stats(
peers_len: usize,
packets_by_slot: HashMap<Slot, usize>,
packets_by_source: HashMap<String, usize>,
epoch_fetch: u64,
epoch_cach_update: u64,
) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
stats
@ -82,6 +88,10 @@ fn update_retransmit_stats(
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
stats.total_batches.fetch_add(1, Ordering::Relaxed);
stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed);
stats
.epoch_cache_update
.fetch_add(epoch_cach_update, Ordering::Relaxed);
{
let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
for (slot, count) in packets_by_slot {
@ -106,6 +116,16 @@ fn update_retransmit_stats(
stats.total_time.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_fetch",
stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"epoch_cache_update",
stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_batches",
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
@ -147,6 +167,14 @@ fn update_retransmit_stats(
}
}
#[derive(Default)]
struct EpochStakesCache {
epoch: Epoch,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
peers: Vec<ContactInfo>,
stakes_and_index: Vec<(u64, usize)>,
}
fn retransmit(
bank_forks: &Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
@ -155,6 +183,8 @@ fn retransmit(
sock: &UdpSocket,
id: u32,
stats: &Arc<RetransmitStats>,
epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
last_peer_update: &Arc<AtomicU64>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let r_lock = r.lock().unwrap();
@ -171,12 +201,42 @@ fn retransmit(
}
drop(r_lock);
let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
let r_bank = bank_forks.read().unwrap().working_bank();
let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
let mut peers_len = 0;
epoch_fetch.stop();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
let mut r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
if r_epoch_stakes_cache.epoch != bank_epoch {
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
if w_epoch_stakes_cache.epoch != bank_epoch {
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
let stakes = stakes.map(Arc::new);
let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes);
w_epoch_stakes_cache.stakes = stakes;
w_epoch_stakes_cache.epoch = bank_epoch;
}
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
}
let now = timestamp();
let last = last_peer_update.load(Ordering::Relaxed);
if now - last > 1000 && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
{
drop(r_epoch_stakes_cache);
let mut w_epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
let (peers, stakes_and_index) =
cluster_info.sorted_retransmit_peers_and_stakes(w_epoch_stakes_cache.stakes.clone());
w_epoch_stakes_cache.peers = peers;
w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
drop(w_epoch_stakes_cache);
r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
}
let mut peers_len = 0;
epoch_cache_update.stop();
let my_id = cluster_info.id();
let mut discard_total = 0;
let mut repair_total = 0;
@ -201,8 +261,8 @@ fn retransmit(
let mut compute_turbine_peers = Measure::start("turbine_start");
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
&my_id,
&peers,
&stakes_and_index,
&r_epoch_stakes_cache.peers,
&r_epoch_stakes_cache.stakes_and_index,
packet.meta.seed,
);
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
@ -215,8 +275,14 @@ fn retransmit(
let (neighbors, children) =
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
let neighbors: Vec<_> = neighbors
.into_iter()
.map(|index| &r_epoch_stakes_cache.peers[index])
.collect();
let children: Vec<_> = children
.into_iter()
.map(|index| &r_epoch_stakes_cache.peers[index])
.collect();
compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_us();
@ -257,6 +323,8 @@ fn retransmit(
peers_len,
packets_by_slot,
packets_by_source,
epoch_fetch.as_us(),
epoch_cache_update.as_us(),
);
Ok(())
@ -286,6 +354,8 @@ pub fn retransmitter(
let r = r.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
let last_peer_update = Arc::new(AtomicU64::new(0));
Builder::new()
.name("solana-retransmitter".to_string())
@ -300,6 +370,8 @@ pub fn retransmitter(
&sockets[s],
s as u32,
&stats,
&epoch_stakes_cache,
&last_peer_update,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,

View File

@ -426,23 +426,21 @@ fn network_run_pull(
.map(|f| f.filter.bits.len() as usize / 8)
.sum::<usize>();
bytes += serialized_size(&caller_info).unwrap() as usize;
let filters = filters
let filters: Vec<_> = filters
.into_iter()
.map(|f| (caller_info.clone(), f))
.collect();
let rsp = network
let rsp: Vec<_> = network
.get(&to)
.map(|node| {
let mut rsp = vec![];
rsp.append(
&mut node
let rsp = node
.lock()
.unwrap()
.process_pull_requests(filters, now)
.generate_pull_responses(&filters)
.into_iter()
.flatten()
.collect(),
);
.collect();
node.lock().unwrap().process_pull_requests(filters, now);
rsp
})
.unwrap();