From a1a0c638620d8f33ed3d29e606c0103ecde770bd Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 27 Sep 2021 18:11:37 +0000 Subject: [PATCH] retransmits shreds recovered from erasure codes (backport #19233) (#20249) * removes packet-count metrics from retransmit stage Working towards sending shreds (instead of packets) to retransmit stage so that shreds recovered from erasure codes are as well retransmitted. Following commit will add these metrics back to window-service, earlier in the pipeline. (cherry picked from commit bf437b033692dfb058130ffc0382be810a6b10ed) # Conflicts: # core/src/retransmit_stage.rs * adds packet/shred count stats to window-service Adding back these metrics from the earlier commit which removed them from retransmit stage. (cherry picked from commit 8198a7eae1e64e3b713921b68032bf45d7e9df62) * removes erroneous uses of Arc<...> from retransmit stage (cherry picked from commit 6e413331b57ee3b095519dcdd79f9b8d8aa17489) # Conflicts: # core/src/retransmit_stage.rs # core/src/tvu.rs * sends shreds (instead of packets) to retransmit stage Working towards channelling through shreds recovered from erasure codes to retransmit stage. (cherry picked from commit 3efccbffab6393358ad3728742d9e70d40d4e560) # Conflicts: # core/src/retransmit_stage.rs * returns completed-data-set-info from insert_data_shred instead of opaque (u32, u32) which are then converted to CompletedDataSetInfo at the call-site. (cherry picked from commit 3c71670bd9e3d3d8ef1ae3a05c71dc8b09b4f5cd) # Conflicts: # ledger/src/blockstore.rs * retransmits shreds recovered from erasure codes Shreds recovered from erasure codes have not been received from turbine and have not been retransmitted to other nodes downstream. This results in more repairs across the cluster which is slower. This commit channels through recovered shreds to retransmit stage in order to further broadcast the shreds to downstream nodes in the tree. (cherry picked from commit 7a8807b8bba2a0bd41f696d3309487d6423a0b4b) # Conflicts: # core/src/retransmit_stage.rs # core/src/window_service.rs * removes backport merge conflicts Co-authored-by: behzad nouri --- core/benches/retransmit_stage.rs | 29 ++- core/src/packet_hasher.rs | 24 +- core/src/retransmit_stage.rs | 370 +++++++++---------------------- core/src/tvu.rs | 8 +- core/src/window_service.rs | 175 +++++++++------ gossip/src/cluster_info.rs | 4 +- gossip/tests/gossip.rs | 2 +- ledger/src/blockstore.rs | 357 ++++++++++++++--------------- 8 files changed, 421 insertions(+), 548 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 2b949a552c..35fdd5c103 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -17,11 +17,10 @@ use { shred::Shredder, }, solana_measure::measure::Measure, - solana_perf::packet::{Packet, Packets}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ hash::Hash, - pubkey, + pubkey::Pubkey, signature::{Keypair, Signer}, system_transaction, timing::timestamp, @@ -40,6 +39,13 @@ use { test::Bencher, }; +// TODO: The benchmark is ignored as it currently may indefinitely block. +// The code incorrectly expects that the node receiving the shred on tvu socket +// retransmits that to other nodes in its neighborhood. But that is no longer +// the case since https://github.com/solana-labs/solana/pull/17716. +// So depending on shred seed, peers may not receive packets and the receive +// threads loop indefinitely. +#[ignore] #[bench] #[allow(clippy::same_item_push)] fn bench_retransmitter(bencher: &mut Bencher) { @@ -52,12 +58,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let mut peer_sockets = Vec::new(); for _ in 0..NUM_PEERS { - // This ensures that cluster_info.id() is the root of turbine - // retransmit tree and so the shreds are retransmited to all other - // nodes in the cluster. - let id = std::iter::repeat_with(pubkey::new_rand) - .find(|pk| cluster_info.id() < *pk) - .unwrap(); + let id = Pubkey::new_unique(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); contact_info.tvu = socket.local_addr().unwrap(); @@ -76,8 +77,8 @@ fn bench_retransmitter(bencher: &mut Bencher) { let bank_forks = BankForks::new(bank0); let bank = bank_forks.working_bank(); let bank_forks = Arc::new(RwLock::new(bank_forks)); - let (packet_sender, packet_receiver) = channel(); - let packet_receiver = Arc::new(Mutex::new(packet_receiver)); + let (shreds_sender, shreds_receiver) = channel(); + let shreds_receiver = Arc::new(Mutex::new(shreds_receiver)); const NUM_THREADS: usize = 2; let sockets = (0..NUM_THREADS) .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap()) @@ -107,9 +108,9 @@ fn bench_retransmitter(bencher: &mut Bencher) { let retransmitter_handles = retransmitter( Arc::new(sockets), bank_forks, - &leader_schedule_cache, + leader_schedule_cache, cluster_info, - packet_receiver, + shreds_receiver, Arc::default(), // solana_rpc::max_slots::MaxSlots None, ); @@ -148,9 +149,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { shred.set_index(index); index += 1; index %= 200; - let mut p = Packet::default(); - shred.copy_to_packet(&mut p); - let _ = packet_sender.send(Packets::new(vec![p])); + let _ = shreds_sender.send(vec![shred.clone()]); } slot += 1; diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs index 77612428c5..575c9733fd 100644 --- a/core/src/packet_hasher.rs +++ b/core/src/packet_hasher.rs @@ -1,10 +1,13 @@ // Get a unique hash value for a packet // Used in retransmit and shred fetch to prevent dos with same packet data. -use ahash::AHasher; -use rand::{thread_rng, Rng}; -use solana_perf::packet::Packet; -use std::hash::Hasher; +use { + ahash::AHasher, + rand::{thread_rng, Rng}, + solana_ledger::shred::Shred, + solana_perf::packet::Packet, + std::hash::Hasher, +}; #[derive(Clone)] pub struct PacketHasher { @@ -22,9 +25,18 @@ impl Default for PacketHasher { } impl PacketHasher { - pub fn hash_packet(&self, packet: &Packet) -> u64 { + pub(crate) fn hash_packet(&self, packet: &Packet) -> u64 { + let size = packet.data.len().min(packet.meta.size); + self.hash_data(&packet.data[..size]) + } + + pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 { + self.hash_data(&shred.payload) + } + + fn hash_data(&self, data: &[u8]) -> u64 { let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2); - hasher.write(&packet.data[0..packet.meta.size]); + hasher.write(data); hasher.finish() } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index c81185cdcc..e092f6b8e3 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -18,7 +18,7 @@ use { solana_client::rpc_response::SlotUpdate, solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, solana_ledger::{ - shred::{get_shred_slot_index_type, ShredFetchStats}, + shred::Shred, { blockstore::{Blockstore, CompletedSlotsReceiver}, leader_schedule_cache::LeaderScheduleCache, @@ -26,7 +26,7 @@ use { }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, - solana_perf::packet::{Packet, Packets}, + solana_perf::packet::Packets, solana_rpc::{ max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService, rpc_subscriptions::RpcSubscriptions, @@ -38,17 +38,13 @@ use { pubkey::Pubkey, timing::{timestamp, AtomicInterval}, }, - solana_streamer::streamer::PacketReceiver, std::{ - collections::{ - hash_set::HashSet, - {BTreeMap, BTreeSet, HashMap}, - }, + collections::{BTreeSet, HashSet}, net::UdpSocket, ops::DerefMut, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::{channel, RecvTimeoutError}, + mpsc::{self, channel, RecvTimeoutError}, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -59,64 +55,48 @@ use { const MAX_DUPLICATE_COUNT: usize = 2; const DEFAULT_LRU_SIZE: usize = 10_000; -// Limit a given thread to consume about this many packets so that +// Limit a given thread to consume about this many shreds so that // it doesn't pull up too much work. -const MAX_PACKET_BATCH_SIZE: usize = 100; +const MAX_SHREDS_BATCH_SIZE: usize = 100; const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); #[derive(Default)] struct RetransmitStats { - total_packets: AtomicU64, + num_shreds: AtomicU64, + num_shreds_skipped: AtomicU64, total_batches: AtomicU64, total_time: AtomicU64, epoch_fetch: AtomicU64, epoch_cache_update: AtomicU64, - repair_total: AtomicU64, - discard_total: AtomicU64, - duplicate_retransmit: AtomicU64, retransmit_total: AtomicU64, last_ts: AtomicInterval, compute_turbine_peers_total: AtomicU64, - retransmit_tree_mismatch: AtomicU64, - packets_by_slot: Mutex>, - packets_by_source: Mutex>, } #[allow(clippy::too_many_arguments)] fn update_retransmit_stats( stats: &RetransmitStats, total_time: u64, - total_packets: usize, + num_shreds: usize, + num_shreds_skipped: usize, retransmit_total: u64, - discard_total: u64, - repair_total: u64, - duplicate_retransmit: u64, compute_turbine_peers_total: u64, peers_len: usize, - packets_by_slot: HashMap, - packets_by_source: HashMap, epoch_fetch: u64, epoch_cach_update: u64, - retransmit_tree_mismatch: u64, ) { stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats - .total_packets - .fetch_add(total_packets as u64, Ordering::Relaxed); + .num_shreds + .fetch_add(num_shreds as u64, Ordering::Relaxed); + stats + .num_shreds_skipped + .fetch_add(num_shreds_skipped as u64, Ordering::Relaxed); stats .retransmit_total .fetch_add(retransmit_total, Ordering::Relaxed); - stats - .repair_total - .fetch_add(repair_total, Ordering::Relaxed); - stats - .discard_total - .fetch_add(discard_total, Ordering::Relaxed); - stats - .duplicate_retransmit - .fetch_add(duplicate_retransmit, Ordering::Relaxed); stats .compute_turbine_peers_total .fetch_add(compute_turbine_peers_total, Ordering::Relaxed); @@ -125,22 +105,6 @@ fn update_retransmit_stats( stats .epoch_cache_update .fetch_add(epoch_cach_update, Ordering::Relaxed); - stats - .retransmit_tree_mismatch - .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed); - { - let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); - for (slot, count) in packets_by_slot { - *stats_packets_by_slot.entry(slot).or_insert(0) += count; - } - } - { - let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap(); - for (source, count) in packets_by_source { - *stats_packets_by_source.entry(source).or_insert(0) += count; - } - } - if stats.last_ts.should_update(2000) { datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64)); datapoint_info!( @@ -166,8 +130,13 @@ fn update_retransmit_stats( i64 ), ( - "total_packets", - stats.total_packets.swap(0, Ordering::Relaxed) as i64, + "num_shreds", + stats.num_shreds.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_shreds_skipped", + stats.num_shreds_skipped.swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -175,94 +144,40 @@ fn update_retransmit_stats( stats.retransmit_total.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "retransmit_tree_mismatch", - stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "compute_turbine", stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "repair_total", - stats.repair_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "discard_total", - stats.discard_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "duplicate_retransmit", - stats.duplicate_retransmit.swap(0, Ordering::Relaxed) as i64, - i64 - ), ); - let mut packets_by_slot = stats.packets_by_slot.lock().unwrap(); - let old_packets_by_slot = std::mem::take(&mut *packets_by_slot); - drop(packets_by_slot); - - for (slot, num_shreds) in old_packets_by_slot { - datapoint_info!( - "retransmit-slot-num-packets", - ("slot", slot, i64), - ("num_shreds", num_shreds, i64) - ); - } - let mut packets_by_source = stats.packets_by_source.lock().unwrap(); - let mut top = BTreeMap::new(); - let mut max = 0; - for (source, num) in packets_by_source.iter() { - if *num > max { - top.insert(*num, source.clone()); - if top.len() > 5 { - let last = *top.iter().next().unwrap().0; - top.remove(&last); - } - max = *top.iter().next().unwrap().0; - } - } - info!( - "retransmit: top packets_by_source: {:?} len: {}", - top, - packets_by_source.len() - ); - packets_by_source.clear(); } } // Map of shred (slot, index, is_data) => list of hash values seen for that key. -pub type ShredFilter = LruCache<(Slot, u32, bool), Vec>; +type ShredFilter = LruCache<(Slot, u32, bool), Vec>; -pub type ShredFilterAndHasher = (ShredFilter, PacketHasher); +type ShredFilterAndHasher = (ShredFilter, PacketHasher); -// Returns None if shred is already received and should skip retransmit. -// Otherwise returns shred's slot and whether the shred is a data shred. -fn check_if_already_received( - packet: &Packet, - shreds_received: &Mutex, -) -> Option { - let shred = get_shred_slot_index_type(packet, &mut ShredFetchStats::default())?; +// Returns true if shred is already received and should skip retransmit. +fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex) -> bool { + let key = (shred.slot(), shred.index(), shred.is_data()); let mut shreds_received = shreds_received.lock().unwrap(); let (cache, hasher) = shreds_received.deref_mut(); - match cache.get_mut(&shred) { - Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => None, + match cache.get_mut(&key) { + Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true, Some(sent) => { - let hash = hasher.hash_packet(packet); + let hash = hasher.hash_shred(shred); if sent.contains(&hash) { - None + true } else { sent.push(hash); - Some(shred.0) + false } } None => { - let hash = hasher.hash_packet(packet); - cache.put(shred, vec![hash]); - Some(shred.0) + let hash = hasher.hash_shred(shred); + cache.put(key, vec![hash]); + false } } } @@ -317,7 +232,7 @@ fn retransmit( bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, cluster_info: &ClusterInfo, - r: &Mutex, + shreds_receiver: &Mutex>>, sock: &UdpSocket, id: u32, stats: &RetransmitStats, @@ -326,22 +241,19 @@ fn retransmit( shreds_received: &Mutex, max_slots: &MaxSlots, first_shreds_received: &Mutex>, - rpc_subscriptions: &Option>, + rpc_subscriptions: Option<&RpcSubscriptions>, ) -> Result<()> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let r_lock = r.lock().unwrap(); - let packets = r_lock.recv_timeout(RECV_TIMEOUT)?; + let shreds_receiver = shreds_receiver.lock().unwrap(); + let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; let mut timer_start = Measure::start("retransmit"); - let mut total_packets = packets.packets.len(); - let mut packets = vec![packets]; - while let Ok(nq) = r_lock.try_recv() { - total_packets += nq.packets.len(); - packets.push(nq); - if total_packets >= MAX_PACKET_BATCH_SIZE { + while let Ok(more_shreds) = shreds_receiver.try_recv() { + shreds.extend(more_shreds); + if shreds.len() >= MAX_SHREDS_BATCH_SIZE { break; } } - drop(r_lock); + drop(shreds_receiver); let mut epoch_fetch = Measure::start("retransmit_epoch_fetch"); let (working_bank, root_bank) = { @@ -354,37 +266,19 @@ fn retransmit( maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts); epoch_cache_update.stop(); + let num_shreds = shreds.len(); let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); - let mut discard_total = 0; - let mut repair_total = 0; - let mut duplicate_retransmit = 0; let mut retransmit_total = 0; + let mut num_shreds_skipped = 0; let mut compute_turbine_peers_total = 0; - let mut retransmit_tree_mismatch = 0; - let mut packets_by_slot: HashMap = HashMap::new(); - let mut packets_by_source: HashMap = HashMap::new(); let mut max_slot = 0; - for packet in packets.iter().flat_map(|p| p.packets.iter()) { - // skip discarded packets and repair packets - if packet.meta.discard { - total_packets -= 1; - discard_total += 1; + for shred in shreds { + if should_skip_retransmit(&shred, shreds_received) { + num_shreds_skipped += 1; continue; } - if packet.meta.repair { - total_packets -= 1; - repair_total += 1; - continue; - } - let shred_slot = match check_if_already_received(packet, shreds_received) { - Some(slot) => slot, - None => { - total_packets -= 1; - duplicate_retransmit += 1; - continue; - } - }; + let shred_slot = shred.slot(); max_slot = max_slot.max(shred_slot); if let Some(rpc_subscriptions) = rpc_subscriptions { @@ -397,28 +291,17 @@ fn retransmit( } let mut compute_turbine_peers = Measure::start("turbine_start"); + // TODO: consider using root-bank here for leader lookup! let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)); let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); + let shred_seed = shred.seed(slot_leader, &root_bank); let (neighbors, children) = - cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader); - // If the node is on the critical path (i.e. the first node in each - // neighborhood), then we expect that the packet arrives at tvu socket - // as opposed to tvu-forwards. If this is not the case, then the - // turbine broadcast/retransmit tree is mismatched across nodes. + cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader); let anchor_node = neighbors[0].id == my_id; - if packet.meta.forward == anchor_node { - // TODO: Consider forwarding the packet to the root node here. - retransmit_tree_mismatch += 1; - } compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); - *packets_by_slot.entry(packet.meta.slot).or_default() += 1; - *packets_by_source - .entry(packet.meta.addr().to_string()) - .or_default() += 1; - let mut retransmit_time = Measure::start("retransmit_to"); // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its @@ -428,15 +311,15 @@ fn retransmit( // First neighbor is this node itself, so skip it. ClusterInfo::retransmit_to( &neighbors[1..], - packet, + &shred.payload, sock, - /*forward socket=*/ true, + true, // forward socket socket_addr_space, ); } ClusterInfo::retransmit_to( &children, - packet, + &shred.payload, sock, !anchor_node, // send to forward socket! socket_addr_space, @@ -447,8 +330,8 @@ fn retransmit( max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed); timer_start.stop(); debug!( - "retransmitted {} packets in {}ms retransmit_time: {}ms id: {}", - total_packets, + "retransmitted {} shreds in {}ms retransmit_time: {}ms id: {}", + num_shreds, timer_start.as_ms(), retransmit_total, id, @@ -458,18 +341,13 @@ fn retransmit( update_retransmit_stats( stats, timer_start.as_us(), - total_packets, + num_shreds, + num_shreds_skipped, retransmit_total, - discard_total, - repair_total, - duplicate_retransmit, compute_turbine_peers_total, cluster_nodes.num_peers(), - packets_by_slot, - packets_by_source, epoch_fetch.as_us(), epoch_cache_update.as_us(), - retransmit_tree_mismatch, ); Ok(()) @@ -486,9 +364,9 @@ fn retransmit( pub fn retransmitter( sockets: Arc>, bank_forks: Arc>, - leader_schedule_cache: &Arc, + leader_schedule_cache: Arc, cluster_info: Arc, - r: Arc>, + shreds_receiver: Arc>>>, max_slots: Arc, rpc_subscriptions: Option>, ) -> Vec> { @@ -508,7 +386,7 @@ pub fn retransmitter( let sockets = sockets.clone(); let bank_forks = bank_forks.clone(); let leader_schedule_cache = leader_schedule_cache.clone(); - let r = r.clone(); + let shreds_receiver = shreds_receiver.clone(); let cluster_info = cluster_info.clone(); let stats = stats.clone(); let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache); @@ -527,7 +405,7 @@ pub fn retransmitter( &bank_forks, &leader_schedule_cache, &cluster_info, - &r, + &shreds_receiver, &sockets[s], s as u32, &stats, @@ -536,7 +414,7 @@ pub fn retransmitter( &shreds_received, &max_slots, &first_shreds_received, - &rpc_subscriptions, + rpc_subscriptions.as_deref(), ) { match e { Error::RecvTimeout(RecvTimeoutError::Disconnected) => break, @@ -554,7 +432,7 @@ pub fn retransmitter( .collect() } -pub struct RetransmitStage { +pub(crate) struct RetransmitStage { thread_hdls: Vec>, window_service: WindowService, cluster_slots_service: ClusterSlotsService, @@ -563,15 +441,15 @@ pub struct RetransmitStage { impl RetransmitStage { #[allow(clippy::new_ret_no_self)] #[allow(clippy::too_many_arguments)] - pub fn new( + pub(crate) fn new( bank_forks: Arc>, - leader_schedule_cache: &Arc, + leader_schedule_cache: Arc, blockstore: Arc, - cluster_info: &Arc, + cluster_info: Arc, retransmit_sockets: Arc>, repair_socket: Arc, verified_receiver: Receiver>, - exit: &Arc, + exit: Arc, rpc_completed_slots_receiver: CompletedSlotsReceiver, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, @@ -582,20 +460,22 @@ impl RetransmitStage { verified_vote_receiver: VerifiedVoteReceiver, repair_validators: Option>, completed_data_sets_sender: CompletedDataSetsSender, - max_slots: &Arc, + max_slots: Arc, rpc_subscriptions: Option>, duplicate_slots_sender: Sender, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); + // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 + let _retransmit_sender = retransmit_sender.clone(); let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver)); let t_retransmit = retransmitter( retransmit_sockets, bank_forks.clone(), - leader_schedule_cache, + leader_schedule_cache.clone(), cluster_info.clone(), retransmit_receiver, - Arc::clone(max_slots), + max_slots, rpc_subscriptions.clone(), ); @@ -619,13 +499,13 @@ impl RetransmitStage { }; let window_service = WindowService::new( blockstore, - cluster_info.clone(), + cluster_info, verified_receiver, retransmit_sender, repair_socket, - exit.clone(), + exit, repair_info, - leader_schedule_cache.clone(), + leader_schedule_cache, move |id, shred, working_bank, last_root| { let is_connected = cfg .as_ref() @@ -659,7 +539,7 @@ impl RetransmitStage { } } - pub fn join(self) -> thread::Result<()> { + pub(crate) fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } @@ -671,17 +551,19 @@ impl RetransmitStage { #[cfg(test)] mod tests { - use super::*; - use solana_gossip::contact_info::ContactInfo; - use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions}; - use solana_ledger::create_new_tmp_ledger; - use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_ledger::shred::Shred; - use solana_net_utils::find_available_port_in_range; - use solana_perf::packet::{Packet, Packets}; - use solana_sdk::signature::Keypair; - use solana_streamer::socket::SocketAddrSpace; - use std::net::{IpAddr, Ipv4Addr}; + use { + super::*, + solana_gossip::contact_info::ContactInfo, + solana_ledger::{ + blockstore_processor::{process_blockstore, ProcessOptions}, + create_new_tmp_ledger, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + solana_net_utils::find_available_port_in_range, + solana_sdk::signature::Keypair, + solana_streamer::socket::SocketAddrSpace, + std::net::{IpAddr, Ipv4Addr}, + }; #[test] fn test_skip_repair() { @@ -729,37 +611,20 @@ mod tests { let cluster_info = Arc::new(cluster_info); let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = retransmitter( + let _retransmit_sender = retransmit_sender.clone(); + let _t_retransmit = retransmitter( retransmit_socket, bank_forks, - &leader_schedule_cache, + leader_schedule_cache, cluster_info, Arc::new(Mutex::new(retransmit_receiver)), Arc::default(), // MaxSlots None, ); - let _thread_hdls = vec![t_retransmit]; - let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); - let mut packet = Packet::default(); - shred.copy_to_packet(&mut packet); - - let packets = Packets::new(vec![packet.clone()]); + let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); // it should send this over the sockets. - retransmit_sender.send(packets).unwrap(); - let mut packets = Packets::new(vec![]); - solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); - assert_eq!(packets.packets.len(), 1); - assert!(!packets.packets[0].meta.repair); - - let mut repair = packet.clone(); - repair.meta.repair = true; - - shred.set_slot(1); - shred.copy_to_packet(&mut packet); - // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from - let packets = Packets::new(vec![repair, packet]); - retransmit_sender.send(packets).unwrap(); + retransmit_sender.send(vec![shred]).unwrap(); let mut packets = Packets::new(vec![]); solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); assert_eq!(packets.packets.len(), 1); @@ -768,61 +633,42 @@ mod tests { #[test] fn test_already_received() { - let mut packet = Packet::default(); let slot = 1; let index = 5; let version = 0x40; let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default()))); // unique shred for (1, 5) should pass - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // duplicate shred for (1, 5) blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); // first duplicate shred for (1, 5) passed - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // then blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); // 2nd duplicate shred for (1, 5) blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, version); - shred.copy_to_packet(&mut packet); // Coding at (1, 5) passes - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // then blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, version); - shred.copy_to_packet(&mut packet); // 2nd unique coding at (1, 5) passes - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // same again is blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, version); - shred.copy_to_packet(&mut packet); // Another unique coding at (1, 5) always blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); + assert!(should_skip_retransmit(&shred, &shreds_received)); } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d543e6a841..d83c9051e8 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -172,13 +172,13 @@ impl Tvu { let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded(); let retransmit_stage = RetransmitStage::new( bank_forks.clone(), - leader_schedule_cache, + leader_schedule_cache.clone(), blockstore.clone(), - cluster_info, + cluster_info.clone(), Arc::new(retransmit_sockets), repair_socket, verified_receiver, - &exit, + exit.clone(), completed_slots_receiver, cluster_slots_update_receiver, *bank_forks.read().unwrap().working_bank().epoch_schedule(), @@ -189,7 +189,7 @@ impl Tvu { verified_vote_receiver, tvu_config.repair_validators, completed_data_sets_sender, - max_slots, + max_slots.clone(), Some(rpc_subscriptions.clone()), duplicate_slots_sender, ); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index cfd1d4ed52..22da8affc8 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -25,14 +25,15 @@ use { solana_perf::packet::{Packet, Packets}, solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms}, - solana_streamer::streamer::PacketSender, + solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey}, std::collections::HashSet, std::{ + cmp::Reverse, + collections::HashMap, net::{SocketAddr, UdpSocket}, - ops::Deref, sync::{ atomic::{AtomicBool, Ordering}, + mpsc::Sender, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -71,6 +72,58 @@ impl WindowServiceMetrics { } } +#[derive(Default)] +struct ReceiveWindowStats { + num_packets: usize, + num_shreds: usize, // num_discards: num_packets - num_shreds + num_repairs: usize, + elapsed: Duration, // excludes waiting time on the receiver channel. + slots: HashMap, + addrs: HashMap, + since: Option, +} + +impl ReceiveWindowStats { + fn maybe_submit(&mut self) { + const MAX_NUM_ADDRS: usize = 5; + const SUBMIT_CADENCE: Duration = Duration::from_secs(2); + let elapsed = self.since.as_ref().map(Instant::elapsed); + if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default() { + return; + } + datapoint_info!( + "receive_window_stats", + ("num_packets", self.num_packets, i64), + ("num_shreds", self.num_shreds, i64), + ("num_repairs", self.num_repairs, i64), + ("elapsed_micros", self.elapsed.as_micros(), i64), + ); + for (slot, num_shreds) in &self.slots { + datapoint_info!( + "receive_window_num_slot_shreds", + ("slot", *slot, i64), + ("num_shreds", *num_shreds, i64) + ); + } + let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect(); + let reverse_count = |(_addr, count): &_| Reverse(*count); + if addrs.len() > MAX_NUM_ADDRS { + addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count); + addrs.truncate(MAX_NUM_ADDRS); + } + addrs.sort_unstable_by_key(reverse_count); + info!( + "num addresses: {}, top packets by source: {:?}", + self.addrs.len(), + addrs + ); + *self = Self { + since: Some(Instant::now()), + ..Self::default() + }; + } +} + fn verify_shred_slot(shred: &Shred, root: u64) -> bool { if shred.is_data() { // Only data shreds have parent information @@ -204,6 +257,7 @@ fn run_insert( metrics: &mut BlockstoreInsertionMetrics, ws_metrics: &mut WindowServiceMetrics, completed_data_sets_sender: &CompletedDataSetsSender, + retransmit_sender: &Sender>, outstanding_requests: &RwLock, ) -> Result<()> where @@ -231,7 +285,8 @@ where shreds, repairs, Some(leader_schedule_cache), - false, + false, // is_trusted + Some(retransmit_sender), &handle_duplicate, metrics, )?; @@ -252,14 +307,13 @@ where fn recv_window( blockstore: &Blockstore, - leader_schedule_cache: &LeaderScheduleCache, bank_forks: &RwLock, insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, - my_pubkey: &Pubkey, verified_receiver: &CrossbeamReceiver>, - retransmit: &PacketSender, + retransmit_sender: &Sender>, shred_filter: F, thread_pool: &ThreadPool, + stats: &mut ReceiveWindowStats, ) -> Result<()> where F: Fn(&Shred, Arc, /*last root:*/ Slot) -> bool + Sync, @@ -267,16 +321,10 @@ where let timer = Duration::from_millis(200); let mut packets = verified_receiver.recv_timeout(timer)?; packets.extend(verified_receiver.try_iter().flatten()); - let total_packets: usize = packets.iter().map(|p| p.packets.len()).sum(); let now = Instant::now(); - inc_new_counter_debug!("streamer-recv_window-recv", total_packets); - - let (root_bank, working_bank) = { - let bank_forks = bank_forks.read().unwrap(); - (bank_forks.root_bank(), bank_forks.working_bank()) - }; let last_root = blockstore.last_root(); - let handle_packet = |packet: &mut Packet| { + let working_bank = bank_forks.read().unwrap().working_bank(); + let handle_packet = |packet: &Packet| { if packet.meta.discard { inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1); return None; @@ -286,20 +334,10 @@ where // call to `new_from_serialized_shred` is safe. assert_eq!(packet.data.len(), PACKET_DATA_SIZE); let serialized_shred = packet.data.to_vec(); - let working_bank = Arc::clone(&working_bank); - let shred = match Shred::new_from_serialized_shred(serialized_shred) { - Ok(shred) if shred_filter(&shred, working_bank, last_root) => { - let leader_pubkey = - leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref())); - packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(leader_pubkey, root_bank.deref()); - shred - } - Ok(_) | Err(_) => { - packet.meta.discard = true; - return None; - } - }; + let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?; + if !shred_filter(&shred, working_bank.clone(), last_root) { + return None; + } if packet.meta.repair { let repair_info = RepairMeta { _from_addr: packet.meta.addr(), @@ -313,29 +351,32 @@ where }; let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets - .par_iter_mut() - .flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet)) + .par_iter() + .flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet)) .unzip() }); - - trace!("{:?} shreds from packets", shreds.len()); - - trace!("{} num total shreds received: {}", my_pubkey, total_packets); - - for packets in packets.into_iter() { - if !packets.is_empty() { - // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) - let _ = retransmit.send(packets); - } + // Exclude repair packets from retransmit. + let _ = retransmit_sender.send( + shreds + .iter() + .zip(&repair_infos) + .filter(|(_, repair_info)| repair_info.is_none()) + .map(|(shred, _)| shred) + .cloned() + .collect(), + ); + stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); + stats.num_shreds += shreds.len(); + for shred in &shreds { + *stats.slots.entry(shred.slot()).or_default() += 1; } - insert_shred_sender.send((shreds, repair_infos))?; - trace!( - "Elapsed processing time in recv_window(): {}", - duration_as_ms(&now.elapsed()) - ); - + stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::(); + for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) { + *stats.addrs.entry(packet.meta.addr()).or_default() += 1; + } + stats.elapsed += now.elapsed(); Ok(()) } @@ -375,7 +416,7 @@ impl WindowService { blockstore: Arc, cluster_info: Arc, verified_receiver: CrossbeamReceiver>, - retransmit: PacketSender, + retransmit_sender: Sender>, repair_socket: Arc, exit: Arc, repair_info: RepairInfo, @@ -421,10 +462,11 @@ impl WindowService { let t_insert = Self::start_window_insert_thread( exit.clone(), blockstore.clone(), - leader_schedule_cache.clone(), + leader_schedule_cache, insert_receiver, duplicate_sender, completed_data_sets_sender, + retransmit_sender.clone(), outstanding_requests, ); @@ -435,9 +477,8 @@ impl WindowService { insert_sender, verified_receiver, shred_filter, - leader_schedule_cache, bank_forks, - retransmit, + retransmit_sender, ); WindowService { @@ -487,6 +528,7 @@ impl WindowService { insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, check_duplicate_sender: CrossbeamSender, completed_data_sets_sender: CompletedDataSetsSender, + retransmit_sender: Sender>, outstanding_requests: Arc>, ) -> JoinHandle<()> { let mut handle_timeout = || {}; @@ -516,6 +558,7 @@ impl WindowService { &mut metrics, &mut ws_metrics, &completed_data_sets_sender, + &retransmit_sender, &outstanding_requests, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { @@ -543,9 +586,8 @@ impl WindowService { insert_sender: CrossbeamSender<(Vec, Vec>)>, verified_receiver: CrossbeamReceiver>, shred_filter: F, - leader_schedule_cache: Arc, bank_forks: Arc>, - retransmit: PacketSender, + retransmit_sender: Sender>, ) -> JoinHandle<()> where F: 'static @@ -553,6 +595,7 @@ impl WindowService { + std::marker::Send + std::marker::Sync, { + let mut stats = ReceiveWindowStats::default(); Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -567,34 +610,25 @@ impl WindowService { inc_new_counter_error!("solana-window-error", 1, 1); }; - loop { - if exit.load(Ordering::Relaxed) { - break; - } - + while !exit.load(Ordering::Relaxed) { let mut handle_timeout = || { if now.elapsed() > Duration::from_secs(30) { - warn!("Window does not seem to be receiving data. Ensure port configuration is correct..."); + warn!( + "Window does not seem to be receiving data. \ + Ensure port configuration is correct..." + ); now = Instant::now(); } }; if let Err(e) = recv_window( &blockstore, - &leader_schedule_cache, &bank_forks, &insert_sender, - &id, &verified_receiver, - &retransmit, - |shred, bank, last_root| { - shred_filter( - &id, - shred, - Some(bank), - last_root, - ) - }, + &retransmit_sender, + |shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root), &thread_pool, + &mut stats, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { break; @@ -602,6 +636,7 @@ impl WindowService { } else { now = Instant::now(); } + stats.maybe_submit(); } }) .unwrap() diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 4f81701101..ef5624885e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1272,7 +1272,7 @@ impl ClusterInfo { /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( peers: &[&ContactInfo], - packet: &Packet, + data: &[u8], s: &UdpSocket, forwarded: bool, socket_addr_space: &SocketAddrSpace, @@ -1291,8 +1291,6 @@ impl ClusterInfo { .filter(|addr| socket_addr_space.check(addr)) .collect() }; - let data = &packet.data[..packet.meta.size]; - if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(s, data, &dests) { inc_new_counter_info!("cluster_info-retransmit-packets", dests.len(), 1); inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index b565a1e04c..b6fa2ed44d 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -220,7 +220,7 @@ pub fn cluster_info_retransmit() { let retransmit_peers: Vec<_> = peers.iter().collect(); ClusterInfo::retransmit_to( &retransmit_peers, - &p, + &p.data[..p.meta.size], &tn1, false, &SocketAddrSpace::Unspecified, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 9a60a74797..7c1138dac8 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1,65 +1,67 @@ //! The `blockstore` module provides functions for parallel verification of the //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. -use crate::{ - ancestor_iterator::AncestorIterator, - blockstore_db::{ - columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection, - IteratorMode, LedgerColumn, Result, WriteBatch, - }, - blockstore_meta::*, - entry::{create_ticks, Entry}, - erasure::ErasureConfig, - leader_schedule_cache::LeaderScheduleCache, - next_slots_iterator::NextSlotsIterator, - shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK}, -}; pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta}; -use bincode::deserialize; -use log::*; -use rayon::{ - iter::{IntoParallelRefIterator, ParallelIterator}, - ThreadPool, -}; -use rocksdb::DBRawIterator; -use solana_measure::measure::Measure; -use solana_metrics::{datapoint_debug, datapoint_error}; -use solana_rayon_threadlimit::get_thread_count; -use solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}; -use solana_sdk::{ - clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK}, - genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, - hash::Hash, - pubkey::Pubkey, - sanitize::Sanitize, - signature::{Keypair, Signature, Signer}, - timing::timestamp, - transaction::Transaction, -}; -use solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta}; -use solana_transaction_status::{ - ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards, - TransactionStatusMeta, TransactionWithStatusMeta, -}; -use std::{ - borrow::Cow, - cell::RefCell, - cmp, - collections::{BTreeMap, HashMap, HashSet}, - convert::TryInto, - fs, - io::{Error as IoError, ErrorKind}, - path::{Path, PathBuf}, - rc::Rc, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{sync_channel, Receiver, SyncSender, TrySendError}, - Arc, Mutex, RwLock, RwLockWriteGuard, +use { + crate::{ + ancestor_iterator::AncestorIterator, + blockstore_db::{ + columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection, + IteratorMode, LedgerColumn, Result, WriteBatch, + }, + blockstore_meta::*, + entry::{create_ticks, Entry}, + erasure::ErasureConfig, + leader_schedule_cache::LeaderScheduleCache, + next_slots_iterator::NextSlotsIterator, + shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK}, }, - time::Instant, + bincode::deserialize, + log::*, + rayon::{ + iter::{IntoParallelRefIterator, ParallelIterator}, + ThreadPool, + }, + rocksdb::DBRawIterator, + solana_measure::measure::Measure, + solana_metrics::{datapoint_debug, datapoint_error}, + solana_rayon_threadlimit::get_thread_count, + solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, + solana_sdk::{ + clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK}, + genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, + hash::Hash, + pubkey::Pubkey, + sanitize::Sanitize, + signature::{Keypair, Signature, Signer}, + timing::timestamp, + transaction::Transaction, + }, + solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta}, + solana_transaction_status::{ + ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards, + TransactionStatusMeta, TransactionWithStatusMeta, + }, + std::{ + borrow::Cow, + cell::RefCell, + cmp, + collections::{BTreeMap, HashMap, HashSet}, + convert::TryInto, + fs, + io::{Error as IoError, ErrorKind}, + path::{Path, PathBuf}, + rc::Rc, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{sync_channel, Receiver, Sender, SyncSender, TrySendError}, + Arc, Mutex, RwLock, RwLockWriteGuard, + }, + time::Instant, + }, + thiserror::Error, + trees::{Tree, TreeWalk}, }; -use thiserror::Error; -use trees::{Tree, TreeWalk}; pub mod blockstore_purge; @@ -804,6 +806,7 @@ impl Blockstore { is_repaired: Vec, leader_schedule: Option<&LeaderScheduleCache>, is_trusted: bool, + retransmit_sender: Option<&Sender>>, handle_duplicate: &F, metrics: &mut BlockstoreInsertionMetrics, ) -> Result<(Vec, Vec)> @@ -815,7 +818,7 @@ impl Blockstore { let mut start = Measure::start("Blockstore lock"); let _lock = self.insert_shreds_lock.lock().unwrap(); start.stop(); - let insert_lock_elapsed = start.as_us(); + metrics.insert_lock_elapsed += start.as_us(); let db = &*self.db; let mut write_batch = db.batch()?; @@ -826,73 +829,56 @@ impl Blockstore { let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); - let num_shreds = shreds.len(); + metrics.num_shreds += shreds.len(); let mut start = Measure::start("Shred insertion"); - let mut num_inserted = 0; let mut index_meta_time = 0; let mut newly_completed_data_sets: Vec = vec![]; let mut inserted_indices = Vec::new(); - shreds - .into_iter() - .zip(is_repaired.into_iter()) - .enumerate() - .for_each(|(i, (shred, is_repaired))| { - if shred.is_data() { - let shred_slot = shred.slot(); - let shred_source = if is_repaired { - ShredSource::Repaired - } else { - ShredSource::Turbine - }; - if let Ok(completed_data_sets) = self.check_insert_data_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_inserted_data_shreds, - &mut index_meta_time, - is_trusted, - handle_duplicate, - leader_schedule, - shred_source, - ) { - newly_completed_data_sets.extend(completed_data_sets.into_iter().map( - |(start_index, end_index)| CompletedDataSetInfo { - slot: shred_slot, - start_index, - end_index, - }, - )); - inserted_indices.push(i); - num_inserted += 1; - } - } else if shred.is_code() { - self.check_cache_coding_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut just_inserted_coding_shreds, - &mut index_meta_time, - handle_duplicate, - is_trusted, - is_repaired, - ); + for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() { + if shred.is_data() { + let shred_source = if is_repaired { + ShredSource::Repaired } else { - panic!("There should be no other case"); + ShredSource::Turbine + }; + if let Ok(completed_data_sets) = self.check_insert_data_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + &mut index_meta_time, + is_trusted, + handle_duplicate, + leader_schedule, + shred_source, + ) { + newly_completed_data_sets.extend(completed_data_sets); + inserted_indices.push(i); + metrics.num_inserted += 1; } - }); + } else if shred.is_code() { + self.check_cache_coding_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut just_inserted_coding_shreds, + &mut index_meta_time, + handle_duplicate, + is_trusted, + is_repaired, + ); + } else { + panic!("There should be no other case"); + } + } start.stop(); - let insert_shreds_elapsed = start.as_us(); + metrics.insert_shreds_elapsed += start.as_us(); let mut start = Measure::start("Shred recovery"); - let mut num_recovered = 0; - let mut num_recovered_inserted = 0; - let mut num_recovered_failed_sig = 0; - let mut num_recovered_failed_invalid = 0; - let mut num_recovered_exists = 0; if let Some(leader_schedule_cache) = leader_schedule { - let recovered_data = Self::try_shred_recovery( + let recovered_data_shreds = Self::try_shred_recovery( db, &erasure_metas, &mut index_working_set, @@ -900,71 +886,73 @@ impl Blockstore { &mut just_inserted_coding_shreds, ); - num_recovered = recovered_data.len(); - recovered_data.into_iter().for_each(|shred| { - if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) { - let shred_slot = shred.slot(); - if shred.verify(&leader) { - match self.check_insert_data_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_inserted_data_shreds, - &mut index_meta_time, - is_trusted, - &handle_duplicate, - leader_schedule, - ShredSource::Recovered, - ) { - Err(InsertDataShredError::Exists) => { - num_recovered_exists += 1; - } - Err(InsertDataShredError::InvalidShred) => { - num_recovered_failed_invalid += 1; - } - Err(InsertDataShredError::BlockstoreError(_)) => {} - Ok(completed_data_sets) => { - newly_completed_data_sets.extend( - completed_data_sets.into_iter().map( - |(start_index, end_index)| CompletedDataSetInfo { - slot: shred_slot, - start_index, - end_index, - }, - ), - ); - num_recovered_inserted += 1; - } - } - } else { - num_recovered_failed_sig += 1; + metrics.num_recovered += recovered_data_shreds.len(); + let recovered_data_shreds: Vec<_> = recovered_data_shreds + .into_iter() + .filter_map(|shred| { + let leader = + leader_schedule_cache.slot_leader_at(shred.slot(), /*bank=*/ None)?; + if !shred.verify(&leader) { + metrics.num_recovered_failed_sig += 1; + return None; } + match self.check_insert_data_shred( + shred.clone(), + &mut erasure_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + &mut index_meta_time, + is_trusted, + &handle_duplicate, + leader_schedule, + ShredSource::Recovered, + ) { + Err(InsertDataShredError::Exists) => { + metrics.num_recovered_exists += 1; + None + } + Err(InsertDataShredError::InvalidShred) => { + metrics.num_recovered_failed_invalid += 1; + None + } + Err(InsertDataShredError::BlockstoreError(_)) => None, + Ok(completed_data_sets) => { + newly_completed_data_sets.extend(completed_data_sets); + metrics.num_recovered_inserted += 1; + Some(shred) + } + } + }) + // Always collect recovered-shreds so that above insert code is + // executed even if retransmit-sender is None. + .collect(); + if !recovered_data_shreds.is_empty() { + if let Some(retransmit_sender) = retransmit_sender { + let _ = retransmit_sender.send(recovered_data_shreds); } - }); + } } start.stop(); - let shred_recovery_elapsed = start.as_us(); + metrics.shred_recovery_elapsed += start.as_us(); - just_inserted_coding_shreds - .into_iter() - .for_each(|((_, _), shred)| { - self.check_insert_coding_shred( - shred, - &mut index_working_set, - &mut write_batch, - &mut index_meta_time, - ); - num_inserted += 1; - }); + metrics.num_inserted += just_inserted_coding_shreds.len() as u64; + for (_, shred) in just_inserted_coding_shreds.into_iter() { + self.check_insert_coding_shred( + shred, + &mut index_working_set, + &mut write_batch, + &mut index_meta_time, + ); + } let mut start = Measure::start("Shred recovery"); // Handle chaining for the members of the slot_meta_working_set that were inserted into, // drop the others handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?; start.stop(); - let chaining_elapsed = start.as_us(); + metrics.chaining_elapsed += start.as_us(); let mut start = Measure::start("Commit Working Sets"); let (should_signal, newly_completed_slots) = commit_slot_meta_working_set( @@ -983,12 +971,12 @@ impl Blockstore { } } start.stop(); - let commit_working_sets_elapsed = start.as_us(); + metrics.commit_working_sets_elapsed += start.as_us(); let mut start = Measure::start("Write Batch"); self.db.write(write_batch)?; start.stop(); - let write_batch_elapsed = start.as_us(); + metrics.write_batch_elapsed += start.as_us(); send_signals( &self.new_shreds_signals, @@ -999,20 +987,7 @@ impl Blockstore { total_start.stop(); - metrics.num_shreds += num_shreds; metrics.total_elapsed += total_start.as_us(); - metrics.insert_lock_elapsed += insert_lock_elapsed; - metrics.insert_shreds_elapsed += insert_shreds_elapsed; - metrics.shred_recovery_elapsed += shred_recovery_elapsed; - metrics.chaining_elapsed += chaining_elapsed; - metrics.commit_working_sets_elapsed += commit_working_sets_elapsed; - metrics.write_batch_elapsed += write_batch_elapsed; - metrics.num_inserted += num_inserted; - metrics.num_recovered += num_recovered; - metrics.num_recovered_inserted += num_recovered_inserted; - metrics.num_recovered_failed_sig += num_recovered_failed_sig; - metrics.num_recovered_failed_invalid = num_recovered_failed_invalid; - metrics.num_recovered_exists = num_recovered_exists; metrics.index_meta_time += index_meta_time; Ok((newly_completed_data_sets, inserted_indices)) @@ -1054,7 +1029,8 @@ impl Blockstore { vec![false; shreds_len], leader_schedule, is_trusted, - &|_| {}, + None, // retransmit-sender + &|_| {}, // handle-duplicates &mut BlockstoreInsertionMetrics::default(), ) } @@ -1230,7 +1206,7 @@ impl Blockstore { handle_duplicate: &F, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, - ) -> std::result::Result, InsertDataShredError> + ) -> std::result::Result, InsertDataShredError> where F: Fn(Shred), { @@ -1496,7 +1472,7 @@ impl Blockstore { shred: &Shred, write_batch: &mut WriteBatch, shred_source: ShredSource, - ) -> Result> { + ) -> Result> { let slot = shred.slot(); let index = u64::from(shred.index()); @@ -1545,7 +1521,14 @@ impl Blockstore { new_consumed, shred.reference_tick(), data_index, - ); + ) + .into_iter() + .map(|(start_index, end_index)| CompletedDataSetInfo { + slot, + start_index, + end_index, + }) + .collect(); if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered { let mut slots_stats = self.slots_stats.lock().unwrap(); let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();