persists repair-peers cache across repair service loops (backport #18400) (#19526)

* persists repair-peers cache across repair service loops (#18400)

The repair-peers cache is reset each time the repair service loop runs,
so it is recomputed repeatedly for the same slots:
https://github.com/solana-labs/solana/blob/d2b07dca9/core/src/repair_service.rs#L275

This commit uses an LRU cache to persist repair-peers for each slot.
In addition to the LRU eviction rules, each entry also carries a
10-second TTL so that outdated data is not re-used.
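
For reference, below is a minimal sketch of the TTL-gated LRU lookup the patch
introduces. The Entry struct, get_or_refresh helper, and compute closure are
illustrative stand-ins rather than code from this patch; it assumes the same
lru crate API used in the diff (LruCache::new taking a plain usize capacity,
plus get/pop/put).

use std::time::{Duration, Instant};

use lru::LruCache;

const CACHE_CAPACITY: usize = 128;
const CACHE_TTL: Duration = Duration::from_secs(10);

// Illustrative stand-in for the per-slot data cached by the patch
// (repair peers plus their sampling weights).
struct Entry {
    asof: Instant,
    peers: Vec<u64>,
}

// Returns the cached entry for `slot`, recomputing it when the entry is
// missing or older than the TTL; an expired LRU hit is evicted first.
fn get_or_refresh(
    cache: &mut LruCache<u64, Entry>,
    slot: u64,
    compute: impl FnOnce(u64) -> Vec<u64>,
) -> &Entry {
    let expired = match cache.get(&slot) {
        Some(entry) => entry.asof.elapsed() >= CACHE_TTL,
        None => true,
    };
    if expired {
        cache.pop(&slot); // drop the outdated entry, if any
        let entry = Entry {
            asof: Instant::now(),
            peers: compute(slot),
        };
        cache.put(slot, entry);
    }
    cache.get(&slot).unwrap()
}

fn main() {
    let mut cache = LruCache::new(CACHE_CAPACITY);
    let entry = get_or_refresh(&mut cache, 42, |_slot| vec![1, 2, 3]);
    println!("peers for slot 42: {:?}", entry.peers);
}

Checking asof before using a hit means that even a slot which stays in the LRU
is refreshed every 10 seconds rather than reused indefinitely.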

(cherry picked from commit a0551b4054)

# Conflicts:
#	core/src/repair_service.rs
#	core/src/serve_repair.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
mergify[bot] authored 2021-08-31 21:17:22 +00:00, committed by GitHub
parent 70bdcf06b4, commit 9a005855dc
3 changed files with 68 additions and 33 deletions

core/src/cluster_slots.rs

@@ -136,6 +136,9 @@ impl ClusterSlots {
}
pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<u64> {
if repair_peers.is_empty() {
return Vec::default();
}
let stakes = {
let validator_stakes = self.validator_stakes.read().unwrap();
repair_peers

core/src/repair_service.rs

@@ -6,9 +6,10 @@ use crate::{
cluster_slots::ClusterSlots,
repair_weight::RepairWeight,
result::Result,
serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE, REPAIR_PEERS_CACHE_CAPACITY},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use lru::LruCache;
use solana_ledger::{
blockstore::{Blockstore, SlotMeta},
shred::Nonce,
@@ -181,6 +182,7 @@ impl RepairService {
let mut last_stats = Instant::now();
let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
HashMap::new();
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
loop {
if exit.load(Ordering::Relaxed) {
@@ -259,13 +261,12 @@
)
};
let mut cache = HashMap::new();
let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
repairs.into_iter().for_each(|repair_request| {
if let Ok((to, req)) = serve_repair.repair_request(
&cluster_slots,
repair_request,
&mut cache,
&mut peers_cache,
&mut repair_stats,
&repair_info.repair_validators,
) {

core/src/serve_repair.rs

@@ -8,7 +8,11 @@ use crate::{
weighted_shuffle::weighted_best,
};
use bincode::serialize;
use rand::distributions::{Distribution, WeightedIndex};
use lru::LruCache;
use rand::{
distributions::{Distribution, WeightedError, WeightedIndex},
Rng,
};
use solana_ledger::{blockstore::Blockstore, shred::Nonce};
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
@@ -22,7 +26,7 @@ use solana_sdk::{
};
use solana_streamer::streamer::{PacketReceiver, PacketSender};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::HashSet,
net::SocketAddr,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
@@ -34,6 +38,11 @@ use std::{
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
pub const DEFAULT_NONCE: u32 = 42;
// Number of slots to cache their respective repair peers and sampling weights.
pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
// Limit cache entries ttl in order to avoid re-using outdated data.
const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum RepairType {
Orphan(Slot),
@@ -81,7 +90,38 @@ pub struct ServeRepair {
cluster_info: Arc<ClusterInfo>,
}
type RepairCache = HashMap<Slot, (Vec<ContactInfo>, WeightedIndex<u64>)>;
// Cache entry for repair peers for a slot.
pub(crate) struct RepairPeers {
asof: Instant,
peers: Vec<(Pubkey, /*ContactInfo.serve_repair:*/ SocketAddr)>,
weighted_index: WeightedIndex<u64>,
}
impl RepairPeers {
fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
if peers.is_empty() {
return Err(Error::from(ClusterInfoError::NoPeers));
}
if peers.len() != weights.len() {
return Err(Error::from(WeightedError::InvalidWeight));
}
let weighted_index = WeightedIndex::new(weights)?;
let peers = peers
.iter()
.map(|peer| (peer.id, peer.serve_repair))
.collect();
Ok(Self {
asof,
peers,
weighted_index,
})
}
fn sample<R: Rng>(&self, rng: &mut R) -> (Pubkey, SocketAddr) {
let index = self.weighted_index.sample(rng);
self.peers[index]
}
}
impl ServeRepair {
/// Without a valid keypair gossip will not function. Only useful for tests.
@@ -377,39 +417,30 @@ impl ServeRepair {
Ok(out)
}
pub fn repair_request(
pub(crate) fn repair_request(
&self,
cluster_slots: &ClusterSlots,
repair_request: RepairType,
cache: &mut RepairCache,
peers_cache: &mut LruCache<Slot, RepairPeers>,
repair_stats: &mut RepairStats,
repair_validators: &Option<HashSet<Pubkey>>,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
let slot = repair_request.slot();
let (repair_peers, weighted_index) = match cache.entry(slot) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let repair_peers = self.repair_peers(&repair_validators, slot);
if repair_peers.is_empty() {
return Err(Error::from(ClusterInfoError::NoPeers));
}
let repair_peers = match peers_cache.get(&slot) {
Some(entry) if entry.asof.elapsed() < REPAIR_PEERS_CACHE_TTL => entry,
_ => {
peers_cache.pop(&slot);
let repair_peers = self.repair_peers(repair_validators, slot);
let weights = cluster_slots.compute_weights(slot, &repair_peers);
debug_assert_eq!(weights.len(), repair_peers.len());
let weighted_index = WeightedIndex::new(weights)?;
entry.insert((repair_peers, weighted_index))
let repair_peers = RepairPeers::new(Instant::now(), &repair_peers, &weights)?;
peers_cache.put(slot, repair_peers);
peers_cache.get(&slot).unwrap()
}
};
let n = weighted_index.sample(&mut rand::thread_rng());
let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
let repair_peer_id = repair_peers[n].id;
let out = self.map_repair_request(
&repair_request,
&repair_peer_id,
repair_stats,
DEFAULT_NONCE,
)?;
let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
let out = self.map_repair_request(&repair_request, &peer, repair_stats, DEFAULT_NONCE)?;
Ok((addr, out))
}
@@ -755,7 +786,7 @@ mod tests {
let rv = serve_repair.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
);
@@ -782,7 +813,7 @@
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
)
@@ -815,7 +846,7 @@
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
)
@@ -991,7 +1022,7 @@
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&trusted_validators,
)
@@ -1007,7 +1038,7 @@
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&trusted_validators,
)
@@ -1027,7 +1058,7 @@
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
)