diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 2b949a552c..35fdd5c103 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -17,11 +17,10 @@ use { shred::Shredder, }, solana_measure::measure::Measure, - solana_perf::packet::{Packet, Packets}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ hash::Hash, - pubkey, + pubkey::Pubkey, signature::{Keypair, Signer}, system_transaction, timing::timestamp, @@ -40,6 +39,13 @@ use { test::Bencher, }; +// TODO: The benchmark is ignored as it currently may indefinitely block. +// The code incorrectly expects that the node receiving the shred on tvu socket +// retransmits that to other nodes in its neighborhood. But that is no longer +// the case since https://github.com/solana-labs/solana/pull/17716. +// So depending on shred seed, peers may not receive packets and the receive +// threads loop indefinitely. +#[ignore] #[bench] #[allow(clippy::same_item_push)] fn bench_retransmitter(bencher: &mut Bencher) { @@ -52,12 +58,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let mut peer_sockets = Vec::new(); for _ in 0..NUM_PEERS { - // This ensures that cluster_info.id() is the root of turbine - // retransmit tree and so the shreds are retransmited to all other - // nodes in the cluster. - let id = std::iter::repeat_with(pubkey::new_rand) - .find(|pk| cluster_info.id() < *pk) - .unwrap(); + let id = Pubkey::new_unique(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); contact_info.tvu = socket.local_addr().unwrap(); @@ -76,8 +77,8 @@ fn bench_retransmitter(bencher: &mut Bencher) { let bank_forks = BankForks::new(bank0); let bank = bank_forks.working_bank(); let bank_forks = Arc::new(RwLock::new(bank_forks)); - let (packet_sender, packet_receiver) = channel(); - let packet_receiver = Arc::new(Mutex::new(packet_receiver)); + let (shreds_sender, shreds_receiver) = channel(); + let shreds_receiver = Arc::new(Mutex::new(shreds_receiver)); const NUM_THREADS: usize = 2; let sockets = (0..NUM_THREADS) .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap()) @@ -107,9 +108,9 @@ fn bench_retransmitter(bencher: &mut Bencher) { let retransmitter_handles = retransmitter( Arc::new(sockets), bank_forks, - &leader_schedule_cache, + leader_schedule_cache, cluster_info, - packet_receiver, + shreds_receiver, Arc::default(), // solana_rpc::max_slots::MaxSlots None, ); @@ -148,9 +149,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { shred.set_index(index); index += 1; index %= 200; - let mut p = Packet::default(); - shred.copy_to_packet(&mut p); - let _ = packet_sender.send(Packets::new(vec![p])); + let _ = shreds_sender.send(vec![shred.clone()]); } slot += 1; diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs index 77612428c5..575c9733fd 100644 --- a/core/src/packet_hasher.rs +++ b/core/src/packet_hasher.rs @@ -1,10 +1,13 @@ // Get a unique hash value for a packet // Used in retransmit and shred fetch to prevent dos with same packet data. 
-use ahash::AHasher; -use rand::{thread_rng, Rng}; -use solana_perf::packet::Packet; -use std::hash::Hasher; +use { + ahash::AHasher, + rand::{thread_rng, Rng}, + solana_ledger::shred::Shred, + solana_perf::packet::Packet, + std::hash::Hasher, +}; #[derive(Clone)] pub struct PacketHasher { @@ -22,9 +25,18 @@ impl Default for PacketHasher { } impl PacketHasher { - pub fn hash_packet(&self, packet: &Packet) -> u64 { + pub(crate) fn hash_packet(&self, packet: &Packet) -> u64 { + let size = packet.data.len().min(packet.meta.size); + self.hash_data(&packet.data[..size]) + } + + pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 { + self.hash_data(&shred.payload) + } + + fn hash_data(&self, data: &[u8]) -> u64 { let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2); - hasher.write(&packet.data[0..packet.meta.size]); + hasher.write(data); hasher.finish() } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index c81185cdcc..e092f6b8e3 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -18,7 +18,7 @@ use { solana_client::rpc_response::SlotUpdate, solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, solana_ledger::{ - shred::{get_shred_slot_index_type, ShredFetchStats}, + shred::Shred, { blockstore::{Blockstore, CompletedSlotsReceiver}, leader_schedule_cache::LeaderScheduleCache, @@ -26,7 +26,7 @@ use { }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, - solana_perf::packet::{Packet, Packets}, + solana_perf::packet::Packets, solana_rpc::{ max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService, rpc_subscriptions::RpcSubscriptions, @@ -38,17 +38,13 @@ use { pubkey::Pubkey, timing::{timestamp, AtomicInterval}, }, - solana_streamer::streamer::PacketReceiver, std::{ - collections::{ - hash_set::HashSet, - {BTreeMap, BTreeSet, HashMap}, - }, + collections::{BTreeSet, HashSet}, net::UdpSocket, ops::DerefMut, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::{channel, RecvTimeoutError}, + mpsc::{self, channel, RecvTimeoutError}, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -59,64 +55,48 @@ use { const MAX_DUPLICATE_COUNT: usize = 2; const DEFAULT_LRU_SIZE: usize = 10_000; -// Limit a given thread to consume about this many packets so that +// Limit a given thread to consume about this many shreds so that // it doesn't pull up too much work. 
-const MAX_PACKET_BATCH_SIZE: usize = 100; +const MAX_SHREDS_BATCH_SIZE: usize = 100; const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); #[derive(Default)] struct RetransmitStats { - total_packets: AtomicU64, + num_shreds: AtomicU64, + num_shreds_skipped: AtomicU64, total_batches: AtomicU64, total_time: AtomicU64, epoch_fetch: AtomicU64, epoch_cache_update: AtomicU64, - repair_total: AtomicU64, - discard_total: AtomicU64, - duplicate_retransmit: AtomicU64, retransmit_total: AtomicU64, last_ts: AtomicInterval, compute_turbine_peers_total: AtomicU64, - retransmit_tree_mismatch: AtomicU64, - packets_by_slot: Mutex>, - packets_by_source: Mutex>, } #[allow(clippy::too_many_arguments)] fn update_retransmit_stats( stats: &RetransmitStats, total_time: u64, - total_packets: usize, + num_shreds: usize, + num_shreds_skipped: usize, retransmit_total: u64, - discard_total: u64, - repair_total: u64, - duplicate_retransmit: u64, compute_turbine_peers_total: u64, peers_len: usize, - packets_by_slot: HashMap, - packets_by_source: HashMap, epoch_fetch: u64, epoch_cach_update: u64, - retransmit_tree_mismatch: u64, ) { stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats - .total_packets - .fetch_add(total_packets as u64, Ordering::Relaxed); + .num_shreds + .fetch_add(num_shreds as u64, Ordering::Relaxed); + stats + .num_shreds_skipped + .fetch_add(num_shreds_skipped as u64, Ordering::Relaxed); stats .retransmit_total .fetch_add(retransmit_total, Ordering::Relaxed); - stats - .repair_total - .fetch_add(repair_total, Ordering::Relaxed); - stats - .discard_total - .fetch_add(discard_total, Ordering::Relaxed); - stats - .duplicate_retransmit - .fetch_add(duplicate_retransmit, Ordering::Relaxed); stats .compute_turbine_peers_total .fetch_add(compute_turbine_peers_total, Ordering::Relaxed); @@ -125,22 +105,6 @@ fn update_retransmit_stats( stats .epoch_cache_update .fetch_add(epoch_cach_update, Ordering::Relaxed); - stats - .retransmit_tree_mismatch - .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed); - { - let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); - for (slot, count) in packets_by_slot { - *stats_packets_by_slot.entry(slot).or_insert(0) += count; - } - } - { - let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap(); - for (source, count) in packets_by_source { - *stats_packets_by_source.entry(source).or_insert(0) += count; - } - } - if stats.last_ts.should_update(2000) { datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64)); datapoint_info!( @@ -166,8 +130,13 @@ fn update_retransmit_stats( i64 ), ( - "total_packets", - stats.total_packets.swap(0, Ordering::Relaxed) as i64, + "num_shreds", + stats.num_shreds.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_shreds_skipped", + stats.num_shreds_skipped.swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -175,94 +144,40 @@ fn update_retransmit_stats( stats.retransmit_total.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "retransmit_tree_mismatch", - stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "compute_turbine", stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "repair_total", - stats.repair_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "discard_total", - stats.discard_total.swap(0, Ordering::Relaxed) as i64, - i64 - ), - ( - "duplicate_retransmit", - stats.duplicate_retransmit.swap(0, Ordering::Relaxed) as i64, - i64 - ), ); - let mut 
packets_by_slot = stats.packets_by_slot.lock().unwrap(); - let old_packets_by_slot = std::mem::take(&mut *packets_by_slot); - drop(packets_by_slot); - - for (slot, num_shreds) in old_packets_by_slot { - datapoint_info!( - "retransmit-slot-num-packets", - ("slot", slot, i64), - ("num_shreds", num_shreds, i64) - ); - } - let mut packets_by_source = stats.packets_by_source.lock().unwrap(); - let mut top = BTreeMap::new(); - let mut max = 0; - for (source, num) in packets_by_source.iter() { - if *num > max { - top.insert(*num, source.clone()); - if top.len() > 5 { - let last = *top.iter().next().unwrap().0; - top.remove(&last); - } - max = *top.iter().next().unwrap().0; - } - } - info!( - "retransmit: top packets_by_source: {:?} len: {}", - top, - packets_by_source.len() - ); - packets_by_source.clear(); } } // Map of shred (slot, index, is_data) => list of hash values seen for that key. -pub type ShredFilter = LruCache<(Slot, u32, bool), Vec>; +type ShredFilter = LruCache<(Slot, u32, bool), Vec>; -pub type ShredFilterAndHasher = (ShredFilter, PacketHasher); +type ShredFilterAndHasher = (ShredFilter, PacketHasher); -// Returns None if shred is already received and should skip retransmit. -// Otherwise returns shred's slot and whether the shred is a data shred. -fn check_if_already_received( - packet: &Packet, - shreds_received: &Mutex, -) -> Option { - let shred = get_shred_slot_index_type(packet, &mut ShredFetchStats::default())?; +// Returns true if shred is already received and should skip retransmit. +fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex) -> bool { + let key = (shred.slot(), shred.index(), shred.is_data()); let mut shreds_received = shreds_received.lock().unwrap(); let (cache, hasher) = shreds_received.deref_mut(); - match cache.get_mut(&shred) { - Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => None, + match cache.get_mut(&key) { + Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true, Some(sent) => { - let hash = hasher.hash_packet(packet); + let hash = hasher.hash_shred(shred); if sent.contains(&hash) { - None + true } else { sent.push(hash); - Some(shred.0) + false } } None => { - let hash = hasher.hash_packet(packet); - cache.put(shred, vec![hash]); - Some(shred.0) + let hash = hasher.hash_shred(shred); + cache.put(key, vec![hash]); + false } } } @@ -317,7 +232,7 @@ fn retransmit( bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, cluster_info: &ClusterInfo, - r: &Mutex, + shreds_receiver: &Mutex>>, sock: &UdpSocket, id: u32, stats: &RetransmitStats, @@ -326,22 +241,19 @@ fn retransmit( shreds_received: &Mutex, max_slots: &MaxSlots, first_shreds_received: &Mutex>, - rpc_subscriptions: &Option>, + rpc_subscriptions: Option<&RpcSubscriptions>, ) -> Result<()> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let r_lock = r.lock().unwrap(); - let packets = r_lock.recv_timeout(RECV_TIMEOUT)?; + let shreds_receiver = shreds_receiver.lock().unwrap(); + let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; let mut timer_start = Measure::start("retransmit"); - let mut total_packets = packets.packets.len(); - let mut packets = vec![packets]; - while let Ok(nq) = r_lock.try_recv() { - total_packets += nq.packets.len(); - packets.push(nq); - if total_packets >= MAX_PACKET_BATCH_SIZE { + while let Ok(more_shreds) = shreds_receiver.try_recv() { + shreds.extend(more_shreds); + if shreds.len() >= MAX_SHREDS_BATCH_SIZE { break; } } - drop(r_lock); + drop(shreds_receiver); let mut epoch_fetch = Measure::start("retransmit_epoch_fetch"); 
let (working_bank, root_bank) = { @@ -354,37 +266,19 @@ fn retransmit( maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts); epoch_cache_update.stop(); + let num_shreds = shreds.len(); let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); - let mut discard_total = 0; - let mut repair_total = 0; - let mut duplicate_retransmit = 0; let mut retransmit_total = 0; + let mut num_shreds_skipped = 0; let mut compute_turbine_peers_total = 0; - let mut retransmit_tree_mismatch = 0; - let mut packets_by_slot: HashMap = HashMap::new(); - let mut packets_by_source: HashMap = HashMap::new(); let mut max_slot = 0; - for packet in packets.iter().flat_map(|p| p.packets.iter()) { - // skip discarded packets and repair packets - if packet.meta.discard { - total_packets -= 1; - discard_total += 1; + for shred in shreds { + if should_skip_retransmit(&shred, shreds_received) { + num_shreds_skipped += 1; continue; } - if packet.meta.repair { - total_packets -= 1; - repair_total += 1; - continue; - } - let shred_slot = match check_if_already_received(packet, shreds_received) { - Some(slot) => slot, - None => { - total_packets -= 1; - duplicate_retransmit += 1; - continue; - } - }; + let shred_slot = shred.slot(); max_slot = max_slot.max(shred_slot); if let Some(rpc_subscriptions) = rpc_subscriptions { @@ -397,28 +291,17 @@ fn retransmit( } let mut compute_turbine_peers = Measure::start("turbine_start"); + // TODO: consider using root-bank here for leader lookup! let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)); let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); + let shred_seed = shred.seed(slot_leader, &root_bank); let (neighbors, children) = - cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader); - // If the node is on the critical path (i.e. the first node in each - // neighborhood), then we expect that the packet arrives at tvu socket - // as opposed to tvu-forwards. If this is not the case, then the - // turbine broadcast/retransmit tree is mismatched across nodes. + cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader); let anchor_node = neighbors[0].id == my_id; - if packet.meta.forward == anchor_node { - // TODO: Consider forwarding the packet to the root node here. - retransmit_tree_mismatch += 1; - } compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); - *packets_by_slot.entry(packet.meta.slot).or_default() += 1; - *packets_by_source - .entry(packet.meta.addr().to_string()) - .or_default() += 1; - let mut retransmit_time = Measure::start("retransmit_to"); // If the node is on the critical path (i.e. the first node in each // neighborhood), it should send the packet to tvu socket of its @@ -428,15 +311,15 @@ fn retransmit( // First neighbor is this node itself, so skip it. ClusterInfo::retransmit_to( &neighbors[1..], - packet, + &shred.payload, sock, - /*forward socket=*/ true, + true, // forward socket socket_addr_space, ); } ClusterInfo::retransmit_to( &children, - packet, + &shred.payload, sock, !anchor_node, // send to forward socket! 
socket_addr_space, @@ -447,8 +330,8 @@ fn retransmit( max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed); timer_start.stop(); debug!( - "retransmitted {} packets in {}ms retransmit_time: {}ms id: {}", - total_packets, + "retransmitted {} shreds in {}ms retransmit_time: {}ms id: {}", + num_shreds, timer_start.as_ms(), retransmit_total, id, @@ -458,18 +341,13 @@ fn retransmit( update_retransmit_stats( stats, timer_start.as_us(), - total_packets, + num_shreds, + num_shreds_skipped, retransmit_total, - discard_total, - repair_total, - duplicate_retransmit, compute_turbine_peers_total, cluster_nodes.num_peers(), - packets_by_slot, - packets_by_source, epoch_fetch.as_us(), epoch_cache_update.as_us(), - retransmit_tree_mismatch, ); Ok(()) @@ -486,9 +364,9 @@ fn retransmit( pub fn retransmitter( sockets: Arc>, bank_forks: Arc>, - leader_schedule_cache: &Arc, + leader_schedule_cache: Arc, cluster_info: Arc, - r: Arc>, + shreds_receiver: Arc>>>, max_slots: Arc, rpc_subscriptions: Option>, ) -> Vec> { @@ -508,7 +386,7 @@ pub fn retransmitter( let sockets = sockets.clone(); let bank_forks = bank_forks.clone(); let leader_schedule_cache = leader_schedule_cache.clone(); - let r = r.clone(); + let shreds_receiver = shreds_receiver.clone(); let cluster_info = cluster_info.clone(); let stats = stats.clone(); let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache); @@ -527,7 +405,7 @@ pub fn retransmitter( &bank_forks, &leader_schedule_cache, &cluster_info, - &r, + &shreds_receiver, &sockets[s], s as u32, &stats, @@ -536,7 +414,7 @@ pub fn retransmitter( &shreds_received, &max_slots, &first_shreds_received, - &rpc_subscriptions, + rpc_subscriptions.as_deref(), ) { match e { Error::RecvTimeout(RecvTimeoutError::Disconnected) => break, @@ -554,7 +432,7 @@ pub fn retransmitter( .collect() } -pub struct RetransmitStage { +pub(crate) struct RetransmitStage { thread_hdls: Vec>, window_service: WindowService, cluster_slots_service: ClusterSlotsService, @@ -563,15 +441,15 @@ pub struct RetransmitStage { impl RetransmitStage { #[allow(clippy::new_ret_no_self)] #[allow(clippy::too_many_arguments)] - pub fn new( + pub(crate) fn new( bank_forks: Arc>, - leader_schedule_cache: &Arc, + leader_schedule_cache: Arc, blockstore: Arc, - cluster_info: &Arc, + cluster_info: Arc, retransmit_sockets: Arc>, repair_socket: Arc, verified_receiver: Receiver>, - exit: &Arc, + exit: Arc, rpc_completed_slots_receiver: CompletedSlotsReceiver, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, @@ -582,20 +460,22 @@ impl RetransmitStage { verified_vote_receiver: VerifiedVoteReceiver, repair_validators: Option>, completed_data_sets_sender: CompletedDataSetsSender, - max_slots: &Arc, + max_slots: Arc, rpc_subscriptions: Option>, duplicate_slots_sender: Sender, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); + // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136 + let _retransmit_sender = retransmit_sender.clone(); let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver)); let t_retransmit = retransmitter( retransmit_sockets, bank_forks.clone(), - leader_schedule_cache, + leader_schedule_cache.clone(), cluster_info.clone(), retransmit_receiver, - Arc::clone(max_slots), + max_slots, rpc_subscriptions.clone(), ); @@ -619,13 +499,13 @@ impl RetransmitStage { }; let window_service = WindowService::new( blockstore, - cluster_info.clone(), + cluster_info, verified_receiver, retransmit_sender, repair_socket, - exit.clone(), + exit, repair_info, - 
leader_schedule_cache.clone(), + leader_schedule_cache, move |id, shred, working_bank, last_root| { let is_connected = cfg .as_ref() @@ -659,7 +539,7 @@ impl RetransmitStage { } } - pub fn join(self) -> thread::Result<()> { + pub(crate) fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } @@ -671,17 +551,19 @@ impl RetransmitStage { #[cfg(test)] mod tests { - use super::*; - use solana_gossip::contact_info::ContactInfo; - use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions}; - use solana_ledger::create_new_tmp_ledger; - use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_ledger::shred::Shred; - use solana_net_utils::find_available_port_in_range; - use solana_perf::packet::{Packet, Packets}; - use solana_sdk::signature::Keypair; - use solana_streamer::socket::SocketAddrSpace; - use std::net::{IpAddr, Ipv4Addr}; + use { + super::*, + solana_gossip::contact_info::ContactInfo, + solana_ledger::{ + blockstore_processor::{process_blockstore, ProcessOptions}, + create_new_tmp_ledger, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + solana_net_utils::find_available_port_in_range, + solana_sdk::signature::Keypair, + solana_streamer::socket::SocketAddrSpace, + std::net::{IpAddr, Ipv4Addr}, + }; #[test] fn test_skip_repair() { @@ -729,37 +611,20 @@ mod tests { let cluster_info = Arc::new(cluster_info); let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = retransmitter( + let _retransmit_sender = retransmit_sender.clone(); + let _t_retransmit = retransmitter( retransmit_socket, bank_forks, - &leader_schedule_cache, + leader_schedule_cache, cluster_info, Arc::new(Mutex::new(retransmit_receiver)), Arc::default(), // MaxSlots None, ); - let _thread_hdls = vec![t_retransmit]; - let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); - let mut packet = Packet::default(); - shred.copy_to_packet(&mut packet); - - let packets = Packets::new(vec![packet.clone()]); + let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); // it should send this over the sockets. 
- retransmit_sender.send(packets).unwrap(); - let mut packets = Packets::new(vec![]); - solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); - assert_eq!(packets.packets.len(), 1); - assert!(!packets.packets[0].meta.repair); - - let mut repair = packet.clone(); - repair.meta.repair = true; - - shred.set_slot(1); - shred.copy_to_packet(&mut packet); - // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from - let packets = Packets::new(vec![repair, packet]); - retransmit_sender.send(packets).unwrap(); + retransmit_sender.send(vec![shred]).unwrap(); let mut packets = Packets::new(vec![]); solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); assert_eq!(packets.packets.len(), 1); @@ -768,61 +633,42 @@ mod tests { #[test] fn test_already_received() { - let mut packet = Packet::default(); let slot = 1; let index = 5; let version = 0x40; let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default()))); // unique shred for (1, 5) should pass - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // duplicate shred for (1, 5) blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); // first duplicate shred for (1, 5) passed - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // then blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); // 2nd duplicate shred for (1, 5) blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, version); - shred.copy_to_packet(&mut packet); // Coding at (1, 5) passes - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // then blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, version); - shred.copy_to_packet(&mut packet); // 2nd unique coding at (1, 5) passes - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // same again is blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, version); - shred.copy_to_packet(&mut packet); // Another unique coding at (1, 5) always blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); - 
assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); + assert!(should_skip_retransmit(&shred, &shreds_received)); } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d543e6a841..d83c9051e8 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -172,13 +172,13 @@ impl Tvu { let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded(); let retransmit_stage = RetransmitStage::new( bank_forks.clone(), - leader_schedule_cache, + leader_schedule_cache.clone(), blockstore.clone(), - cluster_info, + cluster_info.clone(), Arc::new(retransmit_sockets), repair_socket, verified_receiver, - &exit, + exit.clone(), completed_slots_receiver, cluster_slots_update_receiver, *bank_forks.read().unwrap().working_bank().epoch_schedule(), @@ -189,7 +189,7 @@ impl Tvu { verified_vote_receiver, tvu_config.repair_validators, completed_data_sets_sender, - max_slots, + max_slots.clone(), Some(rpc_subscriptions.clone()), duplicate_slots_sender, ); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index cfd1d4ed52..22da8affc8 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -25,14 +25,15 @@ use { solana_perf::packet::{Packet, Packets}, solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms}, - solana_streamer::streamer::PacketSender, + solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey}, std::collections::HashSet, std::{ + cmp::Reverse, + collections::HashMap, net::{SocketAddr, UdpSocket}, - ops::Deref, sync::{ atomic::{AtomicBool, Ordering}, + mpsc::Sender, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -71,6 +72,58 @@ impl WindowServiceMetrics { } } +#[derive(Default)] +struct ReceiveWindowStats { + num_packets: usize, + num_shreds: usize, // num_discards: num_packets - num_shreds + num_repairs: usize, + elapsed: Duration, // excludes waiting time on the receiver channel. 
+ slots: HashMap, + addrs: HashMap, + since: Option, +} + +impl ReceiveWindowStats { + fn maybe_submit(&mut self) { + const MAX_NUM_ADDRS: usize = 5; + const SUBMIT_CADENCE: Duration = Duration::from_secs(2); + let elapsed = self.since.as_ref().map(Instant::elapsed); + if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default() { + return; + } + datapoint_info!( + "receive_window_stats", + ("num_packets", self.num_packets, i64), + ("num_shreds", self.num_shreds, i64), + ("num_repairs", self.num_repairs, i64), + ("elapsed_micros", self.elapsed.as_micros(), i64), + ); + for (slot, num_shreds) in &self.slots { + datapoint_info!( + "receive_window_num_slot_shreds", + ("slot", *slot, i64), + ("num_shreds", *num_shreds, i64) + ); + } + let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect(); + let reverse_count = |(_addr, count): &_| Reverse(*count); + if addrs.len() > MAX_NUM_ADDRS { + addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count); + addrs.truncate(MAX_NUM_ADDRS); + } + addrs.sort_unstable_by_key(reverse_count); + info!( + "num addresses: {}, top packets by source: {:?}", + self.addrs.len(), + addrs + ); + *self = Self { + since: Some(Instant::now()), + ..Self::default() + }; + } +} + fn verify_shred_slot(shred: &Shred, root: u64) -> bool { if shred.is_data() { // Only data shreds have parent information @@ -204,6 +257,7 @@ fn run_insert( metrics: &mut BlockstoreInsertionMetrics, ws_metrics: &mut WindowServiceMetrics, completed_data_sets_sender: &CompletedDataSetsSender, + retransmit_sender: &Sender>, outstanding_requests: &RwLock, ) -> Result<()> where @@ -231,7 +285,8 @@ where shreds, repairs, Some(leader_schedule_cache), - false, + false, // is_trusted + Some(retransmit_sender), &handle_duplicate, metrics, )?; @@ -252,14 +307,13 @@ where fn recv_window( blockstore: &Blockstore, - leader_schedule_cache: &LeaderScheduleCache, bank_forks: &RwLock, insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, - my_pubkey: &Pubkey, verified_receiver: &CrossbeamReceiver>, - retransmit: &PacketSender, + retransmit_sender: &Sender>, shred_filter: F, thread_pool: &ThreadPool, + stats: &mut ReceiveWindowStats, ) -> Result<()> where F: Fn(&Shred, Arc, /*last root:*/ Slot) -> bool + Sync, @@ -267,16 +321,10 @@ where let timer = Duration::from_millis(200); let mut packets = verified_receiver.recv_timeout(timer)?; packets.extend(verified_receiver.try_iter().flatten()); - let total_packets: usize = packets.iter().map(|p| p.packets.len()).sum(); let now = Instant::now(); - inc_new_counter_debug!("streamer-recv_window-recv", total_packets); - - let (root_bank, working_bank) = { - let bank_forks = bank_forks.read().unwrap(); - (bank_forks.root_bank(), bank_forks.working_bank()) - }; let last_root = blockstore.last_root(); - let handle_packet = |packet: &mut Packet| { + let working_bank = bank_forks.read().unwrap().working_bank(); + let handle_packet = |packet: &Packet| { if packet.meta.discard { inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1); return None; @@ -286,20 +334,10 @@ where // call to `new_from_serialized_shred` is safe. 
assert_eq!(packet.data.len(), PACKET_DATA_SIZE); let serialized_shred = packet.data.to_vec(); - let working_bank = Arc::clone(&working_bank); - let shred = match Shred::new_from_serialized_shred(serialized_shred) { - Ok(shred) if shred_filter(&shred, working_bank, last_root) => { - let leader_pubkey = - leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref())); - packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(leader_pubkey, root_bank.deref()); - shred - } - Ok(_) | Err(_) => { - packet.meta.discard = true; - return None; - } - }; + let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?; + if !shred_filter(&shred, working_bank.clone(), last_root) { + return None; + } if packet.meta.repair { let repair_info = RepairMeta { _from_addr: packet.meta.addr(), @@ -313,29 +351,32 @@ where }; let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets - .par_iter_mut() - .flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet)) + .par_iter() + .flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet)) .unzip() }); - - trace!("{:?} shreds from packets", shreds.len()); - - trace!("{} num total shreds received: {}", my_pubkey, total_packets); - - for packets in packets.into_iter() { - if !packets.is_empty() { - // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) - let _ = retransmit.send(packets); - } + // Exclude repair packets from retransmit. + let _ = retransmit_sender.send( + shreds + .iter() + .zip(&repair_infos) + .filter(|(_, repair_info)| repair_info.is_none()) + .map(|(shred, _)| shred) + .cloned() + .collect(), + ); + stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); + stats.num_shreds += shreds.len(); + for shred in &shreds { + *stats.slots.entry(shred.slot()).or_default() += 1; } - insert_shred_sender.send((shreds, repair_infos))?; - trace!( - "Elapsed processing time in recv_window(): {}", - duration_as_ms(&now.elapsed()) - ); - + stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::(); + for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) { + *stats.addrs.entry(packet.meta.addr()).or_default() += 1; + } + stats.elapsed += now.elapsed(); Ok(()) } @@ -375,7 +416,7 @@ impl WindowService { blockstore: Arc, cluster_info: Arc, verified_receiver: CrossbeamReceiver>, - retransmit: PacketSender, + retransmit_sender: Sender>, repair_socket: Arc, exit: Arc, repair_info: RepairInfo, @@ -421,10 +462,11 @@ impl WindowService { let t_insert = Self::start_window_insert_thread( exit.clone(), blockstore.clone(), - leader_schedule_cache.clone(), + leader_schedule_cache, insert_receiver, duplicate_sender, completed_data_sets_sender, + retransmit_sender.clone(), outstanding_requests, ); @@ -435,9 +477,8 @@ impl WindowService { insert_sender, verified_receiver, shred_filter, - leader_schedule_cache, bank_forks, - retransmit, + retransmit_sender, ); WindowService { @@ -487,6 +528,7 @@ impl WindowService { insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, check_duplicate_sender: CrossbeamSender, completed_data_sets_sender: CompletedDataSetsSender, + retransmit_sender: Sender>, outstanding_requests: Arc>, ) -> JoinHandle<()> { let mut handle_timeout = || {}; @@ -516,6 +558,7 @@ impl WindowService { &mut metrics, &mut ws_metrics, &completed_data_sets_sender, + &retransmit_sender, &outstanding_requests, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { @@ -543,9 +586,8 @@ impl WindowService { insert_sender: 
CrossbeamSender<(Vec, Vec>)>, verified_receiver: CrossbeamReceiver>, shred_filter: F, - leader_schedule_cache: Arc, bank_forks: Arc>, - retransmit: PacketSender, + retransmit_sender: Sender>, ) -> JoinHandle<()> where F: 'static @@ -553,6 +595,7 @@ impl WindowService { + std::marker::Send + std::marker::Sync, { + let mut stats = ReceiveWindowStats::default(); Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -567,34 +610,25 @@ impl WindowService { inc_new_counter_error!("solana-window-error", 1, 1); }; - loop { - if exit.load(Ordering::Relaxed) { - break; - } - + while !exit.load(Ordering::Relaxed) { let mut handle_timeout = || { if now.elapsed() > Duration::from_secs(30) { - warn!("Window does not seem to be receiving data. Ensure port configuration is correct..."); + warn!( + "Window does not seem to be receiving data. \ + Ensure port configuration is correct..." + ); now = Instant::now(); } }; if let Err(e) = recv_window( &blockstore, - &leader_schedule_cache, &bank_forks, &insert_sender, - &id, &verified_receiver, - &retransmit, - |shred, bank, last_root| { - shred_filter( - &id, - shred, - Some(bank), - last_root, - ) - }, + &retransmit_sender, + |shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root), &thread_pool, + &mut stats, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { break; @@ -602,6 +636,7 @@ impl WindowService { } else { now = Instant::now(); } + stats.maybe_submit(); } }) .unwrap() diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 4f81701101..ef5624885e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1272,7 +1272,7 @@ impl ClusterInfo { /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( peers: &[&ContactInfo], - packet: &Packet, + data: &[u8], s: &UdpSocket, forwarded: bool, socket_addr_space: &SocketAddrSpace, @@ -1291,8 +1291,6 @@ impl ClusterInfo { .filter(|addr| socket_addr_space.check(addr)) .collect() }; - let data = &packet.data[..packet.meta.size]; - if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(s, data, &dests) { inc_new_counter_info!("cluster_info-retransmit-packets", dests.len(), 1); inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index b565a1e04c..b6fa2ed44d 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -220,7 +220,7 @@ pub fn cluster_info_retransmit() { let retransmit_peers: Vec<_> = peers.iter().collect(); ClusterInfo::retransmit_to( &retransmit_peers, - &p, + &p.data[..p.meta.size], &tn1, false, &SocketAddrSpace::Unspecified, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 9a60a74797..7c1138dac8 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1,65 +1,67 @@ //! The `blockstore` module provides functions for parallel verification of the //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. 
-use crate::{ - ancestor_iterator::AncestorIterator, - blockstore_db::{ - columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection, - IteratorMode, LedgerColumn, Result, WriteBatch, - }, - blockstore_meta::*, - entry::{create_ticks, Entry}, - erasure::ErasureConfig, - leader_schedule_cache::LeaderScheduleCache, - next_slots_iterator::NextSlotsIterator, - shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK}, -}; pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta}; -use bincode::deserialize; -use log::*; -use rayon::{ - iter::{IntoParallelRefIterator, ParallelIterator}, - ThreadPool, -}; -use rocksdb::DBRawIterator; -use solana_measure::measure::Measure; -use solana_metrics::{datapoint_debug, datapoint_error}; -use solana_rayon_threadlimit::get_thread_count; -use solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}; -use solana_sdk::{ - clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK}, - genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, - hash::Hash, - pubkey::Pubkey, - sanitize::Sanitize, - signature::{Keypair, Signature, Signer}, - timing::timestamp, - transaction::Transaction, -}; -use solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta}; -use solana_transaction_status::{ - ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards, - TransactionStatusMeta, TransactionWithStatusMeta, -}; -use std::{ - borrow::Cow, - cell::RefCell, - cmp, - collections::{BTreeMap, HashMap, HashSet}, - convert::TryInto, - fs, - io::{Error as IoError, ErrorKind}, - path::{Path, PathBuf}, - rc::Rc, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{sync_channel, Receiver, SyncSender, TrySendError}, - Arc, Mutex, RwLock, RwLockWriteGuard, +use { + crate::{ + ancestor_iterator::AncestorIterator, + blockstore_db::{ + columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection, + IteratorMode, LedgerColumn, Result, WriteBatch, + }, + blockstore_meta::*, + entry::{create_ticks, Entry}, + erasure::ErasureConfig, + leader_schedule_cache::LeaderScheduleCache, + next_slots_iterator::NextSlotsIterator, + shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK}, }, - time::Instant, + bincode::deserialize, + log::*, + rayon::{ + iter::{IntoParallelRefIterator, ParallelIterator}, + ThreadPool, + }, + rocksdb::DBRawIterator, + solana_measure::measure::Measure, + solana_metrics::{datapoint_debug, datapoint_error}, + solana_rayon_threadlimit::get_thread_count, + solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, + solana_sdk::{ + clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK}, + genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, + hash::Hash, + pubkey::Pubkey, + sanitize::Sanitize, + signature::{Keypair, Signature, Signer}, + timing::timestamp, + transaction::Transaction, + }, + solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta}, + solana_transaction_status::{ + ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards, + TransactionStatusMeta, TransactionWithStatusMeta, + }, + std::{ + borrow::Cow, + cell::RefCell, + cmp, + collections::{BTreeMap, HashMap, HashSet}, + convert::TryInto, + fs, + io::{Error as IoError, ErrorKind}, + path::{Path, PathBuf}, + rc::Rc, + sync::{ + atomic::{AtomicBool, Ordering}, + 
mpsc::{sync_channel, Receiver, Sender, SyncSender, TrySendError}, + Arc, Mutex, RwLock, RwLockWriteGuard, + }, + time::Instant, + }, + thiserror::Error, + trees::{Tree, TreeWalk}, }; -use thiserror::Error; -use trees::{Tree, TreeWalk}; pub mod blockstore_purge; @@ -804,6 +806,7 @@ impl Blockstore { is_repaired: Vec, leader_schedule: Option<&LeaderScheduleCache>, is_trusted: bool, + retransmit_sender: Option<&Sender>>, handle_duplicate: &F, metrics: &mut BlockstoreInsertionMetrics, ) -> Result<(Vec, Vec)> @@ -815,7 +818,7 @@ impl Blockstore { let mut start = Measure::start("Blockstore lock"); let _lock = self.insert_shreds_lock.lock().unwrap(); start.stop(); - let insert_lock_elapsed = start.as_us(); + metrics.insert_lock_elapsed += start.as_us(); let db = &*self.db; let mut write_batch = db.batch()?; @@ -826,73 +829,56 @@ impl Blockstore { let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); - let num_shreds = shreds.len(); + metrics.num_shreds += shreds.len(); let mut start = Measure::start("Shred insertion"); - let mut num_inserted = 0; let mut index_meta_time = 0; let mut newly_completed_data_sets: Vec = vec![]; let mut inserted_indices = Vec::new(); - shreds - .into_iter() - .zip(is_repaired.into_iter()) - .enumerate() - .for_each(|(i, (shred, is_repaired))| { - if shred.is_data() { - let shred_slot = shred.slot(); - let shred_source = if is_repaired { - ShredSource::Repaired - } else { - ShredSource::Turbine - }; - if let Ok(completed_data_sets) = self.check_insert_data_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_inserted_data_shreds, - &mut index_meta_time, - is_trusted, - handle_duplicate, - leader_schedule, - shred_source, - ) { - newly_completed_data_sets.extend(completed_data_sets.into_iter().map( - |(start_index, end_index)| CompletedDataSetInfo { - slot: shred_slot, - start_index, - end_index, - }, - )); - inserted_indices.push(i); - num_inserted += 1; - } - } else if shred.is_code() { - self.check_cache_coding_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut just_inserted_coding_shreds, - &mut index_meta_time, - handle_duplicate, - is_trusted, - is_repaired, - ); + for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() { + if shred.is_data() { + let shred_source = if is_repaired { + ShredSource::Repaired } else { - panic!("There should be no other case"); + ShredSource::Turbine + }; + if let Ok(completed_data_sets) = self.check_insert_data_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + &mut index_meta_time, + is_trusted, + handle_duplicate, + leader_schedule, + shred_source, + ) { + newly_completed_data_sets.extend(completed_data_sets); + inserted_indices.push(i); + metrics.num_inserted += 1; } - }); + } else if shred.is_code() { + self.check_cache_coding_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut just_inserted_coding_shreds, + &mut index_meta_time, + handle_duplicate, + is_trusted, + is_repaired, + ); + } else { + panic!("There should be no other case"); + } + } start.stop(); - let insert_shreds_elapsed = start.as_us(); + metrics.insert_shreds_elapsed += start.as_us(); let mut start = Measure::start("Shred recovery"); - let mut num_recovered = 0; - let mut num_recovered_inserted = 0; - let mut num_recovered_failed_sig = 0; - let mut num_recovered_failed_invalid = 0; - let mut 
num_recovered_exists = 0; if let Some(leader_schedule_cache) = leader_schedule { - let recovered_data = Self::try_shred_recovery( + let recovered_data_shreds = Self::try_shred_recovery( db, &erasure_metas, &mut index_working_set, @@ -900,71 +886,73 @@ impl Blockstore { &mut just_inserted_coding_shreds, ); - num_recovered = recovered_data.len(); - recovered_data.into_iter().for_each(|shred| { - if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) { - let shred_slot = shred.slot(); - if shred.verify(&leader) { - match self.check_insert_data_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_inserted_data_shreds, - &mut index_meta_time, - is_trusted, - &handle_duplicate, - leader_schedule, - ShredSource::Recovered, - ) { - Err(InsertDataShredError::Exists) => { - num_recovered_exists += 1; - } - Err(InsertDataShredError::InvalidShred) => { - num_recovered_failed_invalid += 1; - } - Err(InsertDataShredError::BlockstoreError(_)) => {} - Ok(completed_data_sets) => { - newly_completed_data_sets.extend( - completed_data_sets.into_iter().map( - |(start_index, end_index)| CompletedDataSetInfo { - slot: shred_slot, - start_index, - end_index, - }, - ), - ); - num_recovered_inserted += 1; - } - } - } else { - num_recovered_failed_sig += 1; + metrics.num_recovered += recovered_data_shreds.len(); + let recovered_data_shreds: Vec<_> = recovered_data_shreds + .into_iter() + .filter_map(|shred| { + let leader = + leader_schedule_cache.slot_leader_at(shred.slot(), /*bank=*/ None)?; + if !shred.verify(&leader) { + metrics.num_recovered_failed_sig += 1; + return None; } + match self.check_insert_data_shred( + shred.clone(), + &mut erasure_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + &mut index_meta_time, + is_trusted, + &handle_duplicate, + leader_schedule, + ShredSource::Recovered, + ) { + Err(InsertDataShredError::Exists) => { + metrics.num_recovered_exists += 1; + None + } + Err(InsertDataShredError::InvalidShred) => { + metrics.num_recovered_failed_invalid += 1; + None + } + Err(InsertDataShredError::BlockstoreError(_)) => None, + Ok(completed_data_sets) => { + newly_completed_data_sets.extend(completed_data_sets); + metrics.num_recovered_inserted += 1; + Some(shred) + } + } + }) + // Always collect recovered-shreds so that above insert code is + // executed even if retransmit-sender is None. 
+ .collect(); + if !recovered_data_shreds.is_empty() { + if let Some(retransmit_sender) = retransmit_sender { + let _ = retransmit_sender.send(recovered_data_shreds); } - }); + } } start.stop(); - let shred_recovery_elapsed = start.as_us(); + metrics.shred_recovery_elapsed += start.as_us(); - just_inserted_coding_shreds - .into_iter() - .for_each(|((_, _), shred)| { - self.check_insert_coding_shred( - shred, - &mut index_working_set, - &mut write_batch, - &mut index_meta_time, - ); - num_inserted += 1; - }); + metrics.num_inserted += just_inserted_coding_shreds.len() as u64; + for (_, shred) in just_inserted_coding_shreds.into_iter() { + self.check_insert_coding_shred( + shred, + &mut index_working_set, + &mut write_batch, + &mut index_meta_time, + ); + } let mut start = Measure::start("Shred recovery"); // Handle chaining for the members of the slot_meta_working_set that were inserted into, // drop the others handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?; start.stop(); - let chaining_elapsed = start.as_us(); + metrics.chaining_elapsed += start.as_us(); let mut start = Measure::start("Commit Working Sets"); let (should_signal, newly_completed_slots) = commit_slot_meta_working_set( @@ -983,12 +971,12 @@ impl Blockstore { } } start.stop(); - let commit_working_sets_elapsed = start.as_us(); + metrics.commit_working_sets_elapsed += start.as_us(); let mut start = Measure::start("Write Batch"); self.db.write(write_batch)?; start.stop(); - let write_batch_elapsed = start.as_us(); + metrics.write_batch_elapsed += start.as_us(); send_signals( &self.new_shreds_signals, @@ -999,20 +987,7 @@ impl Blockstore { total_start.stop(); - metrics.num_shreds += num_shreds; metrics.total_elapsed += total_start.as_us(); - metrics.insert_lock_elapsed += insert_lock_elapsed; - metrics.insert_shreds_elapsed += insert_shreds_elapsed; - metrics.shred_recovery_elapsed += shred_recovery_elapsed; - metrics.chaining_elapsed += chaining_elapsed; - metrics.commit_working_sets_elapsed += commit_working_sets_elapsed; - metrics.write_batch_elapsed += write_batch_elapsed; - metrics.num_inserted += num_inserted; - metrics.num_recovered += num_recovered; - metrics.num_recovered_inserted += num_recovered_inserted; - metrics.num_recovered_failed_sig += num_recovered_failed_sig; - metrics.num_recovered_failed_invalid = num_recovered_failed_invalid; - metrics.num_recovered_exists = num_recovered_exists; metrics.index_meta_time += index_meta_time; Ok((newly_completed_data_sets, inserted_indices)) @@ -1054,7 +1029,8 @@ impl Blockstore { vec![false; shreds_len], leader_schedule, is_trusted, - &|_| {}, + None, // retransmit-sender + &|_| {}, // handle-duplicates &mut BlockstoreInsertionMetrics::default(), ) } @@ -1230,7 +1206,7 @@ impl Blockstore { handle_duplicate: &F, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, - ) -> std::result::Result, InsertDataShredError> + ) -> std::result::Result, InsertDataShredError> where F: Fn(Shred), { @@ -1496,7 +1472,7 @@ impl Blockstore { shred: &Shred, write_batch: &mut WriteBatch, shred_source: ShredSource, - ) -> Result> { + ) -> Result> { let slot = shred.slot(); let index = u64::from(shred.index()); @@ -1545,7 +1521,14 @@ impl Blockstore { new_consumed, shred.reference_tick(), data_index, - ); + ) + .into_iter() + .map(|(start_index, end_index)| CompletedDataSetInfo { + slot, + start_index, + end_index, + }) + .collect(); if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered { let mut slots_stats = 
self.slots_stats.lock().unwrap(); let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();
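For reviewers: the central behavioral change in this patch is that retransmit now deduplicates whole shreds, keyed by (slot, index, is_data) with up to MAX_DUPLICATE_COUNT payload hashes remembered per key, instead of filtering raw packets. The sketch below illustrates only that filtering rule and is not code from the tree: it substitutes std's HashMap and DefaultHasher for the lru::LruCache and the randomly seeded AHasher used in packet_hasher.rs, and the ShredDedup type and its should_skip method are hypothetical names introduced for this example.

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

const MAX_DUPLICATE_COUNT: usize = 2;

// Mirrors the cache key used in retransmit_stage.rs: (slot, shred index, is_data).
type ShredKey = (u64, u32, bool);

// Hypothetical stand-in for ShredFilterAndHasher: a plain HashMap instead of the
// LRU cache, and an unseeded DefaultHasher instead of AHasher::new_with_keys.
struct ShredDedup {
    seen: HashMap<ShredKey, Vec<u64>>,
}

impl ShredDedup {
    fn new() -> Self {
        Self { seen: HashMap::new() }
    }

    fn hash_payload(payload: &[u8]) -> u64 {
        let mut hasher = DefaultHasher::new();
        payload.hash(&mut hasher);
        hasher.finish()
    }

    // Same decision rule as should_skip_retransmit in the diff: skip once
    // MAX_DUPLICATE_COUNT distinct payload hashes have been recorded for the key,
    // or when this exact payload hash has already been seen.
    fn should_skip(&mut self, key: ShredKey, payload: &[u8]) -> bool {
        let hash = Self::hash_payload(payload);
        let sent = self.seen.entry(key).or_default();
        if sent.len() >= MAX_DUPLICATE_COUNT || sent.contains(&hash) {
            return true;
        }
        sent.push(hash);
        false
    }
}

fn main() {
    let mut dedup = ShredDedup::new();
    let key = (1, 5, true);
    assert!(!dedup.should_skip(key, b"payload-a")); // first copy passes
    assert!(dedup.should_skip(key, b"payload-a")); // exact duplicate skipped
    assert!(!dedup.should_skip(key, b"payload-b")); // first conflicting version passes
    assert!(dedup.should_skip(key, b"payload-c")); // further versions skipped
}

The asserts follow the same expectations as the updated test_already_received: the first copy of a shred passes, an exact duplicate is skipped, one conflicting payload for the same (slot, index) still passes, and any further versions are skipped once MAX_DUPLICATE_COUNT hashes have been recorded, which bounds retransmit amplification while still letting a pair of conflicting shred versions propagate.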