@ -14,7 +14,7 @@ use solana_core::{
|
||||
gossip_service::GossipService,
|
||||
packet::{limited_deserialize, PACKET_DATA_SIZE},
|
||||
repair_service,
|
||||
repair_service::{RepairService, RepairSlotRange, RepairStrategy},
|
||||
repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy},
|
||||
serve_repair::ServeRepair,
|
||||
shred_fetch_stage::ShredFetchStage,
|
||||
sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
|
||||
@ -839,13 +839,14 @@ impl Archiver {
|
||||
repair_service::MAX_REPAIR_LENGTH,
|
||||
&repair_slot_range,
|
||||
);
|
||||
let mut repair_stats = RepairStats::default();
|
||||
//iter over the repairs and send them
|
||||
if let Ok(repairs) = repairs {
|
||||
let reqs: Vec<_> = repairs
|
||||
.into_iter()
|
||||
.filter_map(|repair_request| {
|
||||
serve_repair
|
||||
.map_repair_request(&repair_request)
|
||||
.map_repair_request(&repair_request, &mut repair_stats)
|
||||
.map(|result| ((archiver_info.gossip, result), repair_request))
|
||||
.ok()
|
||||
})
|
||||
|
@ -20,9 +20,31 @@ use std::{
|
||||
sync::{Arc, RwLock},
|
||||
thread::sleep,
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct RepairStatsGroup {
|
||||
pub count: u64,
|
||||
pub min: u64,
|
||||
pub max: u64,
|
||||
}
|
||||
|
||||
impl RepairStatsGroup {
|
||||
pub fn update(&mut self, slot: u64) {
|
||||
self.count += 1;
|
||||
self.min = std::cmp::min(self.min, slot);
|
||||
self.max = std::cmp::max(self.max, slot);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct RepairStats {
|
||||
pub shred: RepairStatsGroup,
|
||||
pub highest_shred: RepairStatsGroup,
|
||||
pub orphan: RepairStatsGroup,
|
||||
}
|
||||
|
||||
pub const MAX_REPAIR_LENGTH: usize = 512;
|
||||
pub const REPAIR_MS: u64 = 100;
|
||||
pub const MAX_ORPHANS: usize = 5;
|
||||
@ -107,6 +129,8 @@ impl RepairService {
|
||||
cluster_info,
|
||||
);
|
||||
}
|
||||
let mut repair_stats = RepairStats::default();
|
||||
let mut last_stats = Instant::now();
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
@ -148,7 +172,7 @@ impl RepairService {
|
||||
.into_iter()
|
||||
.filter_map(|repair_request| {
|
||||
serve_repair
|
||||
.repair_request(&repair_request)
|
||||
.repair_request(&repair_request, &mut repair_stats)
|
||||
.map(|result| (result, repair_request))
|
||||
.ok()
|
||||
})
|
||||
@ -161,6 +185,24 @@ impl RepairService {
|
||||
});
|
||||
}
|
||||
}
|
||||
if last_stats.elapsed().as_secs() > 1 {
|
||||
let repair_total = repair_stats.shred.count
|
||||
+ repair_stats.highest_shred.count
|
||||
+ repair_stats.orphan.count;
|
||||
if repair_total > 0 {
|
||||
datapoint_info!(
|
||||
"serve_repair-repair",
|
||||
("repair-total", repair_total, i64),
|
||||
("shred-count", repair_stats.shred.count, i64),
|
||||
("highest-shred-count", repair_stats.highest_shred.count, i64),
|
||||
("orphan-count", repair_stats.orphan.count, i64),
|
||||
("repair-highest-slot", repair_stats.highest_shred.max, i64),
|
||||
("repair-orphan", repair_stats.orphan.max, i64),
|
||||
);
|
||||
}
|
||||
repair_stats = RepairStats::default();
|
||||
last_stats = Instant::now();
|
||||
}
|
||||
sleep(Duration::from_millis(REPAIR_MS));
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ use crate::{
|
||||
cluster_info::{ClusterInfo, ClusterInfoError},
|
||||
contact_info::ContactInfo,
|
||||
packet::Packet,
|
||||
repair_service::RepairStats,
|
||||
result::{Error, Result},
|
||||
};
|
||||
use bincode::serialize;
|
||||
@ -46,6 +47,16 @@ impl RepairType {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ServeRepairStats {
|
||||
pub total_packets: usize,
|
||||
pub processed: usize,
|
||||
pub self_repair: usize,
|
||||
pub window_index: usize,
|
||||
pub highest_window_index: usize,
|
||||
pub orphan: usize,
|
||||
}
|
||||
|
||||
/// Window protocol messages
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
enum RepairProtocol {
|
||||
@ -104,6 +115,7 @@ impl ServeRepair {
|
||||
from_addr: &SocketAddr,
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
request: RepairProtocol,
|
||||
stats: &mut ServeRepairStats,
|
||||
) -> Option<Packets> {
|
||||
let now = Instant::now();
|
||||
|
||||
@ -111,18 +123,14 @@ impl ServeRepair {
|
||||
let my_id = me.read().unwrap().keypair.pubkey();
|
||||
let from = Self::get_repair_sender(&request);
|
||||
if from.id == my_id {
|
||||
warn!(
|
||||
"{}: Ignored received repair request from ME {}",
|
||||
my_id, from.id,
|
||||
);
|
||||
inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
|
||||
stats.self_repair += 1;
|
||||
return None;
|
||||
}
|
||||
|
||||
let (res, label) = {
|
||||
match &request {
|
||||
RepairProtocol::WindowIndex(from, slot, shred_index) => {
|
||||
inc_new_counter_debug!("serve_repair-request-window-index", 1);
|
||||
stats.window_index += 1;
|
||||
(
|
||||
Self::run_window_request(
|
||||
recycler,
|
||||
@ -138,7 +146,7 @@ impl ServeRepair {
|
||||
}
|
||||
|
||||
RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
|
||||
inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
|
||||
stats.highest_window_index += 1;
|
||||
(
|
||||
Self::run_highest_window_request(
|
||||
recycler,
|
||||
@ -151,7 +159,7 @@ impl ServeRepair {
|
||||
)
|
||||
}
|
||||
RepairProtocol::Orphan(_, slot) => {
|
||||
inc_new_counter_debug!("serve_repair-request-orphan", 1);
|
||||
stats.orphan += 1;
|
||||
(
|
||||
Self::run_orphan(
|
||||
recycler,
|
||||
@ -186,6 +194,7 @@ impl ServeRepair {
|
||||
requests_receiver: &PacketReceiver,
|
||||
response_sender: &PacketSender,
|
||||
max_packets: &mut usize,
|
||||
stats: &mut ServeRepairStats,
|
||||
) -> Result<()> {
|
||||
//TODO cache connections
|
||||
let timeout = Duration::new(1, 0);
|
||||
@ -202,7 +211,7 @@ impl ServeRepair {
|
||||
|
||||
let mut time = Measure::start("repair::handle_packets");
|
||||
for reqs in reqs_v {
|
||||
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
|
||||
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
|
||||
}
|
||||
time.stop();
|
||||
if total_packets >= *max_packets {
|
||||
@ -215,6 +224,31 @@ impl ServeRepair {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn report_reset_stats(me: &Arc<RwLock<Self>>, stats: &mut ServeRepairStats) {
|
||||
if stats.self_repair > 0 {
|
||||
let my_id = me.read().unwrap().keypair.pubkey();
|
||||
warn!(
|
||||
"{}: Ignored received repair requests from ME: {}",
|
||||
my_id, stats.self_repair,
|
||||
);
|
||||
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"repair_listener: total_packets: {} passed: {}",
|
||||
stats.total_packets, stats.processed
|
||||
);
|
||||
|
||||
inc_new_counter_debug!("serve_repair-request-window-index", stats.window_index);
|
||||
inc_new_counter_debug!(
|
||||
"serve_repair-request-highest-window-index",
|
||||
stats.highest_window_index
|
||||
);
|
||||
inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan);
|
||||
|
||||
*stats = ServeRepairStats::default();
|
||||
}
|
||||
|
||||
pub fn listen(
|
||||
me: Arc<RwLock<Self>>,
|
||||
blockstore: Option<Arc<Blockstore>>,
|
||||
@ -228,6 +262,8 @@ impl ServeRepair {
|
||||
.name("solana-repair-listen".to_string())
|
||||
.spawn(move || {
|
||||
let mut max_packets = 1024;
|
||||
let mut last_print = Instant::now();
|
||||
let mut stats = ServeRepairStats::default();
|
||||
loop {
|
||||
let result = Self::run_listen(
|
||||
&me,
|
||||
@ -236,6 +272,7 @@ impl ServeRepair {
|
||||
&requests_receiver,
|
||||
&response_sender,
|
||||
&mut max_packets,
|
||||
&mut stats,
|
||||
);
|
||||
match result {
|
||||
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
|
||||
@ -244,6 +281,10 @@ impl ServeRepair {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
if last_print.elapsed().as_secs() > 2 {
|
||||
Self::report_reset_stats(&me, &mut stats);
|
||||
last_print = Instant::now();
|
||||
}
|
||||
thread_mem_usage::datapoint("solana-repair-listen");
|
||||
}
|
||||
})
|
||||
@ -256,6 +297,7 @@ impl ServeRepair {
|
||||
blockstore: Option<&Arc<Blockstore>>,
|
||||
packets: Packets,
|
||||
response_sender: &PacketSender,
|
||||
stats: &mut ServeRepairStats,
|
||||
) {
|
||||
// iter over the packets, collect pulls separately and process everything else
|
||||
let allocated = thread_mem_usage::Allocatedp::default();
|
||||
@ -265,7 +307,9 @@ impl ServeRepair {
|
||||
limited_deserialize(&packet.data[..packet.meta.size])
|
||||
.into_iter()
|
||||
.for_each(|request| {
|
||||
let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
|
||||
stats.processed += 1;
|
||||
let rsp =
|
||||
Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
|
||||
if let Some(rsp) = rsp {
|
||||
let _ignore_disconnect = response_sender.send(rsp);
|
||||
}
|
||||
@ -295,7 +339,11 @@ impl ServeRepair {
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
|
||||
pub fn repair_request(
|
||||
&self,
|
||||
repair_request: &RepairType,
|
||||
repair_stats: &mut RepairStats,
|
||||
) -> Result<(SocketAddr, Vec<u8>)> {
|
||||
// find a peer that appears to be accepting replication and has the desired slot, as indicated
|
||||
// by a valid tvu port location
|
||||
let valid: Vec<_> = self
|
||||
@ -308,31 +356,27 @@ impl ServeRepair {
|
||||
}
|
||||
let n = thread_rng().gen::<usize>() % valid.len();
|
||||
let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
|
||||
let out = self.map_repair_request(repair_request)?;
|
||||
let out = self.map_repair_request(repair_request, repair_stats)?;
|
||||
|
||||
Ok((addr, out))
|
||||
}
|
||||
|
||||
pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
|
||||
pub fn map_repair_request(
|
||||
&self,
|
||||
repair_request: &RepairType,
|
||||
repair_stats: &mut RepairStats,
|
||||
) -> Result<Vec<u8>> {
|
||||
match repair_request {
|
||||
RepairType::Shred(slot, shred_index) => {
|
||||
datapoint_debug!(
|
||||
"serve_repair-repair",
|
||||
("repair-slot", *slot, i64),
|
||||
("repair-ix", *shred_index, i64)
|
||||
);
|
||||
repair_stats.shred.update(*slot);
|
||||
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
|
||||
}
|
||||
RepairType::HighestShred(slot, shred_index) => {
|
||||
datapoint_info!(
|
||||
"serve_repair-repair_highest",
|
||||
("repair-highest-slot", *slot, i64),
|
||||
("repair-highest-ix", *shred_index, i64)
|
||||
);
|
||||
repair_stats.highest_shred.update(*slot);
|
||||
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
|
||||
}
|
||||
RepairType::Orphan(slot) => {
|
||||
datapoint_info!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
|
||||
repair_stats.orphan.update(*slot);
|
||||
Ok(self.orphan_bytes(*slot)?)
|
||||
}
|
||||
}
|
||||
@ -592,7 +636,7 @@ mod tests {
|
||||
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
|
||||
let serve_repair = ServeRepair::new(cluster_info.clone());
|
||||
let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
|
||||
let rv = serve_repair.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default());
|
||||
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
|
||||
|
||||
let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
|
||||
@ -613,7 +657,7 @@ mod tests {
|
||||
};
|
||||
cluster_info.write().unwrap().insert_info(nxt.clone());
|
||||
let rv = serve_repair
|
||||
.repair_request(&RepairType::Shred(0, 0))
|
||||
.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
|
||||
.unwrap();
|
||||
assert_eq!(nxt.serve_repair, serve_repair_addr);
|
||||
assert_eq!(rv.0, nxt.serve_repair);
|
||||
@ -640,7 +684,7 @@ mod tests {
|
||||
while !one || !two {
|
||||
//this randomly picks an option, so eventually it should pick both
|
||||
let rv = serve_repair
|
||||
.repair_request(&RepairType::Shred(0, 0))
|
||||
.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
|
||||
.unwrap();
|
||||
if rv.0 == serve_repair_addr {
|
||||
one = true;
|
||||
|
Reference in New Issue
Block a user