retransmits shreds recovered from erasure codes (backport #19233) (#20249)

* removes packet-count metrics from retransmit stage

Working towards sending shreds (instead of packets) to the retransmit stage
so that shreds recovered from erasure codes are retransmitted as well.

Following commit will add these metrics back to window-service, earlier
in the pipeline.

(cherry picked from commit bf437b0336)

# Conflicts:
#	core/src/retransmit_stage.rs

* adds packet/shred count stats to window-service

Adding back these metrics from the earlier commit which removed them
from retransmit stage.

(cherry picked from commit 8198a7eae1)

* removes erroneous uses of Arc<...> from retransmit stage

(cherry picked from commit 6e413331b5)

# Conflicts:
#	core/src/retransmit_stage.rs
#	core/src/tvu.rs

* sends shreds (instead of packets) to retransmit stage

Working towards channelling through shreds recovered from erasure codes
to retransmit stage.

(cherry picked from commit 3efccbffab)

# Conflicts:
#	core/src/retransmit_stage.rs

* returns completed-data-set-info from insert_data_shred

instead of opaque (u32, u32) which are then converted to
CompletedDataSetInfo at the call-site.

(cherry picked from commit 3c71670bd9)

# Conflicts:
#	ledger/src/blockstore.rs

* retransmits shreds recovered from erasure codes

Shreds recovered from erasure codes have not been received from turbine
and have not been retransmitted to other nodes downstream. This results
in more repairs across the cluster, which is slower.

This commit channels through recovered shreds to retransmit stage in
order to further broadcast the shreds to downstream nodes in the tree.

(cherry picked from commit 7a8807b8bb)

# Conflicts:
#	core/src/retransmit_stage.rs
#	core/src/window_service.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2021-09-27 18:11:37 +00:00
committed by GitHub
parent e20fdde0a4
commit a1a0c63862
8 changed files with 421 additions and 548 deletions

View File

@ -17,11 +17,10 @@ use {
shred::Shredder,
},
solana_measure::measure::Measure,
solana_perf::packet::{Packet, Packets},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
pubkey,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_transaction,
timing::timestamp,
@ -40,6 +39,13 @@ use {
test::Bencher,
};
// TODO: The benchmark is ignored as it currently may indefinitely block.
// The code incorrectly expects that the node receiving the shred on tvu socket
// retransmits that to other nodes in its neighborhood. But that is no longer
// the case since https://github.com/solana-labs/solana/pull/17716.
// So depending on shred seed, peers may not receive packets and the receive
// threads loop indefinitely.
#[ignore]
#[bench]
#[allow(clippy::same_item_push)]
fn bench_retransmitter(bencher: &mut Bencher) {
@ -52,12 +58,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
const NUM_PEERS: usize = 4;
let mut peer_sockets = Vec::new();
for _ in 0..NUM_PEERS {
// This ensures that cluster_info.id() is the root of turbine
// retransmit tree and so the shreds are retransmitted to all other
// nodes in the cluster.
let id = std::iter::repeat_with(pubkey::new_rand)
.find(|pk| cluster_info.id() < *pk)
.unwrap();
let id = Pubkey::new_unique();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut contact_info = ContactInfo::new_localhost(&id, timestamp());
contact_info.tvu = socket.local_addr().unwrap();
@ -76,8 +77,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let bank_forks = BankForks::new(bank0);
let bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let (packet_sender, packet_receiver) = channel();
let packet_receiver = Arc::new(Mutex::new(packet_receiver));
let (shreds_sender, shreds_receiver) = channel();
let shreds_receiver = Arc::new(Mutex::new(shreds_receiver));
const NUM_THREADS: usize = 2;
let sockets = (0..NUM_THREADS)
.map(|_| UdpSocket::bind("0.0.0.0:0").unwrap())
@ -107,9 +108,9 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let retransmitter_handles = retransmitter(
Arc::new(sockets),
bank_forks,
&leader_schedule_cache,
leader_schedule_cache,
cluster_info,
packet_receiver,
shreds_receiver,
Arc::default(), // solana_rpc::max_slots::MaxSlots
None,
);
@ -148,9 +149,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
shred.set_index(index);
index += 1;
index %= 200;
let mut p = Packet::default();
shred.copy_to_packet(&mut p);
let _ = packet_sender.send(Packets::new(vec![p]));
let _ = shreds_sender.send(vec![shred.clone()]);
}
slot += 1;

View File

@ -1,10 +1,13 @@
// Get a unique hash value for a packet
// Used in retransmit and shred fetch to prevent dos with same packet data.
use ahash::AHasher;
use rand::{thread_rng, Rng};
use solana_perf::packet::Packet;
use std::hash::Hasher;
use {
ahash::AHasher,
rand::{thread_rng, Rng},
solana_ledger::shred::Shred,
solana_perf::packet::Packet,
std::hash::Hasher,
};
#[derive(Clone)]
pub struct PacketHasher {
@ -22,9 +25,18 @@ impl Default for PacketHasher {
}
impl PacketHasher {
pub fn hash_packet(&self, packet: &Packet) -> u64 {
pub(crate) fn hash_packet(&self, packet: &Packet) -> u64 {
let size = packet.data.len().min(packet.meta.size);
self.hash_data(&packet.data[..size])
}
pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
self.hash_data(&shred.payload)
}
fn hash_data(&self, data: &[u8]) -> u64 {
let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
hasher.write(&packet.data[0..packet.meta.size]);
hasher.write(data);
hasher.finish()
}

View File

@ -18,7 +18,7 @@ use {
solana_client::rpc_response::SlotUpdate,
solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
solana_ledger::{
shred::{get_shred_slot_index_type, ShredFetchStats},
shred::Shred,
{
blockstore::{Blockstore, CompletedSlotsReceiver},
leader_schedule_cache::LeaderScheduleCache,
@ -26,7 +26,7 @@ use {
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, Packets},
solana_perf::packet::Packets,
solana_rpc::{
max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService,
rpc_subscriptions::RpcSubscriptions,
@ -38,17 +38,13 @@ use {
pubkey::Pubkey,
timing::{timestamp, AtomicInterval},
},
solana_streamer::streamer::PacketReceiver,
std::{
collections::{
hash_set::HashSet,
{BTreeMap, BTreeSet, HashMap},
},
collections::{BTreeSet, HashSet},
net::UdpSocket,
ops::DerefMut,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc::{channel, RecvTimeoutError},
mpsc::{self, channel, RecvTimeoutError},
Arc, Mutex, RwLock,
},
thread::{self, Builder, JoinHandle},
@ -59,64 +55,48 @@ use {
const MAX_DUPLICATE_COUNT: usize = 2;
const DEFAULT_LRU_SIZE: usize = 10_000;
// Limit a given thread to consume about this many packets so that
// Limit a given thread to consume about this many shreds so that
// it doesn't pull up too much work.
const MAX_PACKET_BATCH_SIZE: usize = 100;
const MAX_SHREDS_BATCH_SIZE: usize = 100;
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
#[derive(Default)]
struct RetransmitStats {
total_packets: AtomicU64,
num_shreds: AtomicU64,
num_shreds_skipped: AtomicU64,
total_batches: AtomicU64,
total_time: AtomicU64,
epoch_fetch: AtomicU64,
epoch_cache_update: AtomicU64,
repair_total: AtomicU64,
discard_total: AtomicU64,
duplicate_retransmit: AtomicU64,
retransmit_total: AtomicU64,
last_ts: AtomicInterval,
compute_turbine_peers_total: AtomicU64,
retransmit_tree_mismatch: AtomicU64,
packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
packets_by_source: Mutex<BTreeMap<String, usize>>,
}
#[allow(clippy::too_many_arguments)]
fn update_retransmit_stats(
stats: &RetransmitStats,
total_time: u64,
total_packets: usize,
num_shreds: usize,
num_shreds_skipped: usize,
retransmit_total: u64,
discard_total: u64,
repair_total: u64,
duplicate_retransmit: u64,
compute_turbine_peers_total: u64,
peers_len: usize,
packets_by_slot: HashMap<Slot, usize>,
packets_by_source: HashMap<String, usize>,
epoch_fetch: u64,
epoch_cach_update: u64,
retransmit_tree_mismatch: u64,
) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
stats
.total_packets
.fetch_add(total_packets as u64, Ordering::Relaxed);
.num_shreds
.fetch_add(num_shreds as u64, Ordering::Relaxed);
stats
.num_shreds_skipped
.fetch_add(num_shreds_skipped as u64, Ordering::Relaxed);
stats
.retransmit_total
.fetch_add(retransmit_total, Ordering::Relaxed);
stats
.repair_total
.fetch_add(repair_total, Ordering::Relaxed);
stats
.discard_total
.fetch_add(discard_total, Ordering::Relaxed);
stats
.duplicate_retransmit
.fetch_add(duplicate_retransmit, Ordering::Relaxed);
stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
@ -125,22 +105,6 @@ fn update_retransmit_stats(
stats
.epoch_cache_update
.fetch_add(epoch_cach_update, Ordering::Relaxed);
stats
.retransmit_tree_mismatch
.fetch_add(retransmit_tree_mismatch, Ordering::Relaxed);
{
let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
for (slot, count) in packets_by_slot {
*stats_packets_by_slot.entry(slot).or_insert(0) += count;
}
}
{
let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap();
for (source, count) in packets_by_source {
*stats_packets_by_source.entry(source).or_insert(0) += count;
}
}
if stats.last_ts.should_update(2000) {
datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
datapoint_info!(
@ -166,8 +130,13 @@ fn update_retransmit_stats(
i64
),
(
"total_packets",
stats.total_packets.swap(0, Ordering::Relaxed) as i64,
"num_shreds",
stats.num_shreds.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"num_shreds_skipped",
stats.num_shreds_skipped.swap(0, Ordering::Relaxed) as i64,
i64
),
(
@ -175,94 +144,40 @@ fn update_retransmit_stats(
stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"retransmit_tree_mismatch",
stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"compute_turbine",
stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"repair_total",
stats.repair_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"discard_total",
stats.discard_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"duplicate_retransmit",
stats.duplicate_retransmit.swap(0, Ordering::Relaxed) as i64,
i64
),
);
let mut packets_by_slot = stats.packets_by_slot.lock().unwrap();
let old_packets_by_slot = std::mem::take(&mut *packets_by_slot);
drop(packets_by_slot);
for (slot, num_shreds) in old_packets_by_slot {
datapoint_info!(
"retransmit-slot-num-packets",
("slot", slot, i64),
("num_shreds", num_shreds, i64)
);
}
let mut packets_by_source = stats.packets_by_source.lock().unwrap();
let mut top = BTreeMap::new();
let mut max = 0;
for (source, num) in packets_by_source.iter() {
if *num > max {
top.insert(*num, source.clone());
if top.len() > 5 {
let last = *top.iter().next().unwrap().0;
top.remove(&last);
}
max = *top.iter().next().unwrap().0;
}
}
info!(
"retransmit: top packets_by_source: {:?} len: {}",
top,
packets_by_source.len()
);
packets_by_source.clear();
}
}
// Map of shred (slot, index, is_data) => list of hash values seen for that key.
pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
type ShredFilterAndHasher = (ShredFilter, PacketHasher);
// Returns None if shred is already received and should skip retransmit.
// Otherwise returns shred's slot and whether the shred is a data shred.
fn check_if_already_received(
packet: &Packet,
shreds_received: &Mutex<ShredFilterAndHasher>,
) -> Option<Slot> {
let shred = get_shred_slot_index_type(packet, &mut ShredFetchStats::default())?;
// Returns true if shred is already received and should skip retransmit.
fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndHasher>) -> bool {
let key = (shred.slot(), shred.index(), shred.is_data());
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
match cache.get_mut(&shred) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => None,
match cache.get_mut(&key) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
Some(sent) => {
let hash = hasher.hash_packet(packet);
let hash = hasher.hash_shred(shred);
if sent.contains(&hash) {
None
true
} else {
sent.push(hash);
Some(shred.0)
false
}
}
None => {
let hash = hasher.hash_packet(packet);
cache.put(shred, vec![hash]);
Some(shred.0)
let hash = hasher.hash_shred(shred);
cache.put(key, vec![hash]);
false
}
}
}
@ -317,7 +232,7 @@ fn retransmit(
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
cluster_info: &ClusterInfo,
r: &Mutex<PacketReceiver>,
shreds_receiver: &Mutex<mpsc::Receiver<Vec<Shred>>>,
sock: &UdpSocket,
id: u32,
stats: &RetransmitStats,
@ -326,22 +241,19 @@ fn retransmit(
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: &Option<Arc<RpcSubscriptions>>,
rpc_subscriptions: Option<&RpcSubscriptions>,
) -> Result<()> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let r_lock = r.lock().unwrap();
let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
let shreds_receiver = shreds_receiver.lock().unwrap();
let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?;
let mut timer_start = Measure::start("retransmit");
let mut total_packets = packets.packets.len();
let mut packets = vec![packets];
while let Ok(nq) = r_lock.try_recv() {
total_packets += nq.packets.len();
packets.push(nq);
if total_packets >= MAX_PACKET_BATCH_SIZE {
while let Ok(more_shreds) = shreds_receiver.try_recv() {
shreds.extend(more_shreds);
if shreds.len() >= MAX_SHREDS_BATCH_SIZE {
break;
}
}
drop(r_lock);
drop(shreds_receiver);
let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
let (working_bank, root_bank) = {
@ -354,37 +266,19 @@ fn retransmit(
maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
epoch_cache_update.stop();
let num_shreds = shreds.len();
let my_id = cluster_info.id();
let socket_addr_space = cluster_info.socket_addr_space();
let mut discard_total = 0;
let mut repair_total = 0;
let mut duplicate_retransmit = 0;
let mut retransmit_total = 0;
let mut num_shreds_skipped = 0;
let mut compute_turbine_peers_total = 0;
let mut retransmit_tree_mismatch = 0;
let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
let mut packets_by_source: HashMap<String, usize> = HashMap::new();
let mut max_slot = 0;
for packet in packets.iter().flat_map(|p| p.packets.iter()) {
// skip discarded packets and repair packets
if packet.meta.discard {
total_packets -= 1;
discard_total += 1;
for shred in shreds {
if should_skip_retransmit(&shred, shreds_received) {
num_shreds_skipped += 1;
continue;
}
if packet.meta.repair {
total_packets -= 1;
repair_total += 1;
continue;
}
let shred_slot = match check_if_already_received(packet, shreds_received) {
Some(slot) => slot,
None => {
total_packets -= 1;
duplicate_retransmit += 1;
continue;
}
};
let shred_slot = shred.slot();
max_slot = max_slot.max(shred_slot);
if let Some(rpc_subscriptions) = rpc_subscriptions {
@ -397,28 +291,17 @@ fn retransmit(
}
let mut compute_turbine_peers = Measure::start("turbine_start");
// TODO: consider using root-bank here for leader lookup!
let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
let cluster_nodes =
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
let shred_seed = shred.seed(slot_leader, &root_bank);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
// If the node is on the critical path (i.e. the first node in each
// neighborhood), then we expect that the packet arrives at tvu socket
// as opposed to tvu-forwards. If this is not the case, then the
// turbine broadcast/retransmit tree is mismatched across nodes.
cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader);
let anchor_node = neighbors[0].id == my_id;
if packet.meta.forward == anchor_node {
// TODO: Consider forwarding the packet to the root node here.
retransmit_tree_mismatch += 1;
}
compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_us();
*packets_by_slot.entry(packet.meta.slot).or_default() += 1;
*packets_by_source
.entry(packet.meta.addr().to_string())
.or_default() += 1;
let mut retransmit_time = Measure::start("retransmit_to");
// If the node is on the critical path (i.e. the first node in each
// neighborhood), it should send the packet to tvu socket of its
@ -428,15 +311,15 @@ fn retransmit(
// First neighbor is this node itself, so skip it.
ClusterInfo::retransmit_to(
&neighbors[1..],
packet,
&shred.payload,
sock,
/*forward socket=*/ true,
true, // forward socket
socket_addr_space,
);
}
ClusterInfo::retransmit_to(
&children,
packet,
&shred.payload,
sock,
!anchor_node, // send to forward socket!
socket_addr_space,
@ -447,8 +330,8 @@ fn retransmit(
max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
timer_start.stop();
debug!(
"retransmitted {} packets in {}ms retransmit_time: {}ms id: {}",
total_packets,
"retransmitted {} shreds in {}ms retransmit_time: {}ms id: {}",
num_shreds,
timer_start.as_ms(),
retransmit_total,
id,
@ -458,18 +341,13 @@ fn retransmit(
update_retransmit_stats(
stats,
timer_start.as_us(),
total_packets,
num_shreds,
num_shreds_skipped,
retransmit_total,
discard_total,
repair_total,
duplicate_retransmit,
compute_turbine_peers_total,
cluster_nodes.num_peers(),
packets_by_slot,
packets_by_source,
epoch_fetch.as_us(),
epoch_cache_update.as_us(),
retransmit_tree_mismatch,
);
Ok(())
@ -486,9 +364,9 @@ fn retransmit(
pub fn retransmitter(
sockets: Arc<Vec<UdpSocket>>,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
cluster_info: Arc<ClusterInfo>,
r: Arc<Mutex<PacketReceiver>>,
shreds_receiver: Arc<Mutex<mpsc::Receiver<Vec<Shred>>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Vec<JoinHandle<()>> {
@ -508,7 +386,7 @@ pub fn retransmitter(
let sockets = sockets.clone();
let bank_forks = bank_forks.clone();
let leader_schedule_cache = leader_schedule_cache.clone();
let r = r.clone();
let shreds_receiver = shreds_receiver.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
@ -527,7 +405,7 @@ pub fn retransmitter(
&bank_forks,
&leader_schedule_cache,
&cluster_info,
&r,
&shreds_receiver,
&sockets[s],
s as u32,
&stats,
@ -536,7 +414,7 @@ pub fn retransmitter(
&shreds_received,
&max_slots,
&first_shreds_received,
&rpc_subscriptions,
rpc_subscriptions.as_deref(),
) {
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected) => break,
@ -554,7 +432,7 @@ pub fn retransmitter(
.collect()
}
pub struct RetransmitStage {
pub(crate) struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
@ -563,15 +441,15 @@ pub struct RetransmitStage {
impl RetransmitStage {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::too_many_arguments)]
pub fn new(
pub(crate) fn new(
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
blockstore: Arc<Blockstore>,
cluster_info: &Arc<ClusterInfo>,
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<Packets>>,
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
rpc_completed_slots_receiver: CompletedSlotsReceiver,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
@ -582,20 +460,22 @@ impl RetransmitStage {
verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>,
completed_data_sets_sender: CompletedDataSetsSender,
max_slots: &Arc<MaxSlots>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
duplicate_slots_sender: Sender<Slot>,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
// https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
let _retransmit_sender = retransmit_sender.clone();
let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
let t_retransmit = retransmitter(
retransmit_sockets,
bank_forks.clone(),
leader_schedule_cache,
leader_schedule_cache.clone(),
cluster_info.clone(),
retransmit_receiver,
Arc::clone(max_slots),
max_slots,
rpc_subscriptions.clone(),
);
@ -619,13 +499,13 @@ impl RetransmitStage {
};
let window_service = WindowService::new(
blockstore,
cluster_info.clone(),
cluster_info,
verified_receiver,
retransmit_sender,
repair_socket,
exit.clone(),
exit,
repair_info,
leader_schedule_cache.clone(),
leader_schedule_cache,
move |id, shred, working_bank, last_root| {
let is_connected = cfg
.as_ref()
@ -659,7 +539,7 @@ impl RetransmitStage {
}
}
pub fn join(self) -> thread::Result<()> {
pub(crate) fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
@ -671,17 +551,19 @@ impl RetransmitStage {
#[cfg(test)]
mod tests {
use super::*;
use solana_gossip::contact_info::ContactInfo;
use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions};
use solana_ledger::create_new_tmp_ledger;
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_ledger::shred::Shred;
use solana_net_utils::find_available_port_in_range;
use solana_perf::packet::{Packet, Packets};
use solana_sdk::signature::Keypair;
use solana_streamer::socket::SocketAddrSpace;
use std::net::{IpAddr, Ipv4Addr};
use {
super::*,
solana_gossip::contact_info::ContactInfo,
solana_ledger::{
blockstore_processor::{process_blockstore, ProcessOptions},
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
solana_net_utils::find_available_port_in_range,
solana_sdk::signature::Keypair,
solana_streamer::socket::SocketAddrSpace,
std::net::{IpAddr, Ipv4Addr},
};
#[test]
fn test_skip_repair() {
@ -729,37 +611,20 @@ mod tests {
let cluster_info = Arc::new(cluster_info);
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(
let _retransmit_sender = retransmit_sender.clone();
let _t_retransmit = retransmitter(
retransmit_socket,
bank_forks,
&leader_schedule_cache,
leader_schedule_cache,
cluster_info,
Arc::new(Mutex::new(retransmit_receiver)),
Arc::default(), // MaxSlots
None,
);
let _thread_hdls = vec![t_retransmit];
let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
let mut packet = Packet::default();
shred.copy_to_packet(&mut packet);
let packets = Packets::new(vec![packet.clone()]);
let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
// it should send this over the sockets.
retransmit_sender.send(packets).unwrap();
let mut packets = Packets::new(vec![]);
solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
assert_eq!(packets.packets.len(), 1);
assert!(!packets.packets[0].meta.repair);
let mut repair = packet.clone();
repair.meta.repair = true;
shred.set_slot(1);
shred.copy_to_packet(&mut packet);
// send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
let packets = Packets::new(vec![repair, packet]);
retransmit_sender.send(packets).unwrap();
retransmit_sender.send(vec![shred]).unwrap();
let mut packets = Packets::new(vec![]);
solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
assert_eq!(packets.packets.len(), 1);
@ -768,61 +633,42 @@ mod tests {
#[test]
fn test_already_received() {
let mut packet = Packet::default();
let slot = 1;
let index = 5;
let version = 0x40;
let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0);
shred.copy_to_packet(&mut packet);
let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
// unique shred for (1, 5) should pass
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
assert!(!should_skip_retransmit(&shred, &shreds_received));
// duplicate shred for (1, 5) blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert!(should_skip_retransmit(&shred, &shreds_received));
let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
shred.copy_to_packet(&mut packet);
// first duplicate shred for (1, 5) passed
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
assert!(!should_skip_retransmit(&shred, &shreds_received));
// then blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert!(should_skip_retransmit(&shred, &shreds_received));
let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
shred.copy_to_packet(&mut packet);
// 2nd duplicate shred for (1, 5) blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(&shred, &shreds_received));
let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, version);
shred.copy_to_packet(&mut packet);
// Coding at (1, 5) passes
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
assert!(!should_skip_retransmit(&shred, &shreds_received));
// then blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert!(should_skip_retransmit(&shred, &shreds_received));
let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, version);
shred.copy_to_packet(&mut packet);
// 2nd unique coding at (1, 5) passes
assert_eq!(
check_if_already_received(&packet, &shreds_received),
Some(slot)
);
assert!(!should_skip_retransmit(&shred, &shreds_received));
// same again is blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert!(should_skip_retransmit(&shred, &shreds_received));
let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, version);
shred.copy_to_packet(&mut packet);
// Another unique coding at (1, 5) always blocked
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert_eq!(check_if_already_received(&packet, &shreds_received), None);
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(&shred, &shreds_received));
}
}

View File

@ -172,13 +172,13 @@ impl Tvu {
let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
let retransmit_stage = RetransmitStage::new(
bank_forks.clone(),
leader_schedule_cache,
leader_schedule_cache.clone(),
blockstore.clone(),
cluster_info,
cluster_info.clone(),
Arc::new(retransmit_sockets),
repair_socket,
verified_receiver,
&exit,
exit.clone(),
completed_slots_receiver,
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
@ -189,7 +189,7 @@ impl Tvu {
verified_vote_receiver,
tvu_config.repair_validators,
completed_data_sets_sender,
max_slots,
max_slots.clone(),
Some(rpc_subscriptions.clone()),
duplicate_slots_sender,
);

View File

@ -25,14 +25,15 @@ use {
solana_perf::packet::{Packet, Packets},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms},
solana_streamer::streamer::PacketSender,
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey},
std::collections::HashSet,
std::{
cmp::Reverse,
collections::HashMap,
net::{SocketAddr, UdpSocket},
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Sender,
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
@ -71,6 +72,58 @@ impl WindowServiceMetrics {
}
}
#[derive(Default)]
struct ReceiveWindowStats {
num_packets: usize,
num_shreds: usize, // num_discards: num_packets - num_shreds
num_repairs: usize,
elapsed: Duration, // excludes waiting time on the receiver channel.
slots: HashMap<Slot, /*num shreds:*/ usize>,
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
since: Option<Instant>,
}
impl ReceiveWindowStats {
fn maybe_submit(&mut self) {
const MAX_NUM_ADDRS: usize = 5;
const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
let elapsed = self.since.as_ref().map(Instant::elapsed);
if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default() {
return;
}
datapoint_info!(
"receive_window_stats",
("num_packets", self.num_packets, i64),
("num_shreds", self.num_shreds, i64),
("num_repairs", self.num_repairs, i64),
("elapsed_micros", self.elapsed.as_micros(), i64),
);
for (slot, num_shreds) in &self.slots {
datapoint_info!(
"receive_window_num_slot_shreds",
("slot", *slot, i64),
("num_shreds", *num_shreds, i64)
);
}
let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect();
let reverse_count = |(_addr, count): &_| Reverse(*count);
if addrs.len() > MAX_NUM_ADDRS {
addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
addrs.truncate(MAX_NUM_ADDRS);
}
addrs.sort_unstable_by_key(reverse_count);
info!(
"num addresses: {}, top packets by source: {:?}",
self.addrs.len(),
addrs
);
*self = Self {
since: Some(Instant::now()),
..Self::default()
};
}
}
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
if shred.is_data() {
// Only data shreds have parent information
@ -204,6 +257,7 @@ fn run_insert<F>(
metrics: &mut BlockstoreInsertionMetrics,
ws_metrics: &mut WindowServiceMetrics,
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<Shred>>,
outstanding_requests: &RwLock<OutstandingRepairs>,
) -> Result<()>
where
@ -231,7 +285,8 @@ where
shreds,
repairs,
Some(leader_schedule_cache),
false,
false, // is_trusted
Some(retransmit_sender),
&handle_duplicate,
metrics,
)?;
@ -252,14 +307,13 @@ where
fn recv_window<F>(
blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache,
bank_forks: &RwLock<BankForks>,
insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
retransmit: &PacketSender,
retransmit_sender: &Sender<Vec<Shred>>,
shred_filter: F,
thread_pool: &ThreadPool,
stats: &mut ReceiveWindowStats,
) -> Result<()>
where
F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync,
@ -267,16 +321,10 @@ where
let timer = Duration::from_millis(200);
let mut packets = verified_receiver.recv_timeout(timer)?;
packets.extend(verified_receiver.try_iter().flatten());
let total_packets: usize = packets.iter().map(|p| p.packets.len()).sum();
let now = Instant::now();
inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
let last_root = blockstore.last_root();
let handle_packet = |packet: &mut Packet| {
let working_bank = bank_forks.read().unwrap().working_bank();
let handle_packet = |packet: &Packet| {
if packet.meta.discard {
inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
return None;
@ -286,20 +334,10 @@ where
// call to `new_from_serialized_shred` is safe.
assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
let serialized_shred = packet.data.to_vec();
let working_bank = Arc::clone(&working_bank);
let shred = match Shred::new_from_serialized_shred(serialized_shred) {
Ok(shred) if shred_filter(&shred, working_bank, last_root) => {
let leader_pubkey =
leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref()));
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed(leader_pubkey, root_bank.deref());
shred
}
Ok(_) | Err(_) => {
packet.meta.discard = true;
return None;
}
};
let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?;
if !shred_filter(&shred, working_bank.clone(), last_root) {
return None;
}
if packet.meta.repair {
let repair_info = RepairMeta {
_from_addr: packet.meta.addr(),
@ -313,29 +351,32 @@ where
};
let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packets
.par_iter_mut()
.flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet))
.par_iter()
.flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet))
.unzip()
});
trace!("{:?} shreds from packets", shreds.len());
trace!("{} num total shreds received: {}", my_pubkey, total_packets);
for packets in packets.into_iter() {
if !packets.is_empty() {
// Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit)
let _ = retransmit.send(packets);
}
// Exclude repair packets from retransmit.
let _ = retransmit_sender.send(
shreds
.iter()
.zip(&repair_infos)
.filter(|(_, repair_info)| repair_info.is_none())
.map(|(shred, _)| shred)
.cloned()
.collect(),
);
stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
stats.num_shreds += shreds.len();
for shred in &shreds {
*stats.slots.entry(shred.slot()).or_default() += 1;
}
insert_shred_sender.send((shreds, repair_infos))?;
trace!(
"Elapsed processing time in recv_window(): {}",
duration_as_ms(&now.elapsed())
);
stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::<usize>();
for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) {
*stats.addrs.entry(packet.meta.addr()).or_default() += 1;
}
stats.elapsed += now.elapsed();
Ok(())
}
@ -375,7 +416,7 @@ impl WindowService {
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
retransmit: PacketSender,
retransmit_sender: Sender<Vec<Shred>>,
repair_socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
@ -421,10 +462,11 @@ impl WindowService {
let t_insert = Self::start_window_insert_thread(
exit.clone(),
blockstore.clone(),
leader_schedule_cache.clone(),
leader_schedule_cache,
insert_receiver,
duplicate_sender,
completed_data_sets_sender,
retransmit_sender.clone(),
outstanding_requests,
);
@ -435,9 +477,8 @@ impl WindowService {
insert_sender,
verified_receiver,
shred_filter,
leader_schedule_cache,
bank_forks,
retransmit,
retransmit_sender,
);
WindowService {
@ -487,6 +528,7 @@ impl WindowService {
insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
check_duplicate_sender: CrossbeamSender<Shred>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<Shred>>,
outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
) -> JoinHandle<()> {
let mut handle_timeout = || {};
@ -516,6 +558,7 @@ impl WindowService {
&mut metrics,
&mut ws_metrics,
&completed_data_sets_sender,
&retransmit_sender,
&outstanding_requests,
) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
@ -543,9 +586,8 @@ impl WindowService {
insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
shred_filter: F,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
retransmit: PacketSender,
retransmit_sender: Sender<Vec<Shred>>,
) -> JoinHandle<()>
where
F: 'static
@ -553,6 +595,7 @@ impl WindowService {
+ std::marker::Send
+ std::marker::Sync,
{
let mut stats = ReceiveWindowStats::default();
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
@ -567,34 +610,25 @@ impl WindowService {
inc_new_counter_error!("solana-window-error", 1, 1);
};
loop {
if exit.load(Ordering::Relaxed) {
break;
}
while !exit.load(Ordering::Relaxed) {
let mut handle_timeout = || {
if now.elapsed() > Duration::from_secs(30) {
warn!("Window does not seem to be receiving data. Ensure port configuration is correct...");
warn!(
"Window does not seem to be receiving data. \
Ensure port configuration is correct..."
);
now = Instant::now();
}
};
if let Err(e) = recv_window(
&blockstore,
&leader_schedule_cache,
&bank_forks,
&insert_sender,
&id,
&verified_receiver,
&retransmit,
|shred, bank, last_root| {
shred_filter(
&id,
shred,
Some(bank),
last_root,
)
},
&retransmit_sender,
|shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root),
&thread_pool,
&mut stats,
) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
break;
@ -602,6 +636,7 @@ impl WindowService {
} else {
now = Instant::now();
}
stats.maybe_submit();
}
})
.unwrap()

View File

@ -1272,7 +1272,7 @@ impl ClusterInfo {
/// We need to avoid having obj locked while doing a io, such as the `send_to`
pub fn retransmit_to(
peers: &[&ContactInfo],
packet: &Packet,
data: &[u8],
s: &UdpSocket,
forwarded: bool,
socket_addr_space: &SocketAddrSpace,
@ -1291,8 +1291,6 @@ impl ClusterInfo {
.filter(|addr| socket_addr_space.check(addr))
.collect()
};
let data = &packet.data[..packet.meta.size];
if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(s, data, &dests) {
inc_new_counter_info!("cluster_info-retransmit-packets", dests.len(), 1);
inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1);

View File

@ -220,7 +220,7 @@ pub fn cluster_info_retransmit() {
let retransmit_peers: Vec<_> = peers.iter().collect();
ClusterInfo::retransmit_to(
&retransmit_peers,
&p,
&p.data[..p.meta.size],
&tn1,
false,
&SocketAddrSpace::Unspecified,

View File

@ -1,65 +1,67 @@
//! The `blockstore` module provides functions for parallel verification of the
//! Proof of History ledger as well as iterative read, append write, and random
//! access read to a persistent file-based ledger.
use crate::{
ancestor_iterator::AncestorIterator,
blockstore_db::{
columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
erasure::ErasureConfig,
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
};
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use bincode::deserialize;
use log::*;
use rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
ThreadPool,
};
use rocksdb::DBRawIterator;
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, datapoint_error};
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE};
use solana_sdk::{
clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
hash::Hash,
pubkey::Pubkey,
sanitize::Sanitize,
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
};
use solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta};
use solana_transaction_status::{
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
TransactionStatusMeta, TransactionWithStatusMeta,
};
use std::{
borrow::Cow,
cell::RefCell,
cmp,
collections::{BTreeMap, HashMap, HashSet},
convert::TryInto,
fs,
io::{Error as IoError, ErrorKind},
path::{Path, PathBuf},
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
Arc, Mutex, RwLock, RwLockWriteGuard,
use {
crate::{
ancestor_iterator::AncestorIterator,
blockstore_db::{
columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
erasure::ErasureConfig,
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
},
time::Instant,
bincode::deserialize,
log::*,
rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
ThreadPool,
},
rocksdb::DBRawIterator,
solana_measure::measure::Measure,
solana_metrics::{datapoint_debug, datapoint_error},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
solana_sdk::{
clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
hash::Hash,
pubkey::Pubkey,
sanitize::Sanitize,
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
},
solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta},
solana_transaction_status::{
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
TransactionStatusMeta, TransactionWithStatusMeta,
},
std::{
borrow::Cow,
cell::RefCell,
cmp,
collections::{BTreeMap, HashMap, HashSet},
convert::TryInto,
fs,
io::{Error as IoError, ErrorKind},
path::{Path, PathBuf},
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, Sender, SyncSender, TrySendError},
Arc, Mutex, RwLock, RwLockWriteGuard,
},
time::Instant,
},
thiserror::Error,
trees::{Tree, TreeWalk},
};
use thiserror::Error;
use trees::{Tree, TreeWalk};
pub mod blockstore_purge;
@ -804,6 +806,7 @@ impl Blockstore {
is_repaired: Vec<bool>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec<Shred>>>,
handle_duplicate: &F,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
@ -815,7 +818,7 @@ impl Blockstore {
let mut start = Measure::start("Blockstore lock");
let _lock = self.insert_shreds_lock.lock().unwrap();
start.stop();
let insert_lock_elapsed = start.as_us();
metrics.insert_lock_elapsed += start.as_us();
let db = &*self.db;
let mut write_batch = db.batch()?;
@ -826,73 +829,56 @@ impl Blockstore {
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
let num_shreds = shreds.len();
metrics.num_shreds += shreds.len();
let mut start = Measure::start("Shred insertion");
let mut num_inserted = 0;
let mut index_meta_time = 0;
let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
let mut inserted_indices = Vec::new();
shreds
.into_iter()
.zip(is_repaired.into_iter())
.enumerate()
.for_each(|(i, (shred, is_repaired))| {
if shred.is_data() {
let shred_slot = shred.slot();
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
ShredSource::Turbine
};
if let Ok(completed_data_sets) = self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
handle_duplicate,
leader_schedule,
shred_source,
) {
newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
|(start_index, end_index)| CompletedDataSetInfo {
slot: shred_slot,
start_index,
end_index,
},
));
inserted_indices.push(i);
num_inserted += 1;
}
} else if shred.is_code() {
self.check_cache_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
handle_duplicate,
is_trusted,
is_repaired,
);
for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
if shred.is_data() {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
panic!("There should be no other case");
ShredSource::Turbine
};
if let Ok(completed_data_sets) = self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
handle_duplicate,
leader_schedule,
shred_source,
) {
newly_completed_data_sets.extend(completed_data_sets);
inserted_indices.push(i);
metrics.num_inserted += 1;
}
});
} else if shred.is_code() {
self.check_cache_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
handle_duplicate,
is_trusted,
is_repaired,
);
} else {
panic!("There should be no other case");
}
}
start.stop();
let insert_shreds_elapsed = start.as_us();
metrics.insert_shreds_elapsed += start.as_us();
let mut start = Measure::start("Shred recovery");
let mut num_recovered = 0;
let mut num_recovered_inserted = 0;
let mut num_recovered_failed_sig = 0;
let mut num_recovered_failed_invalid = 0;
let mut num_recovered_exists = 0;
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data = Self::try_shred_recovery(
let recovered_data_shreds = Self::try_shred_recovery(
db,
&erasure_metas,
&mut index_working_set,
@ -900,71 +886,73 @@ impl Blockstore {
&mut just_inserted_coding_shreds,
);
num_recovered = recovered_data.len();
recovered_data.into_iter().for_each(|shred| {
if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
let shred_slot = shred.slot();
if shred.verify(&leader) {
match self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
&handle_duplicate,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => {
num_recovered_exists += 1;
}
Err(InsertDataShredError::InvalidShred) => {
num_recovered_failed_invalid += 1;
}
Err(InsertDataShredError::BlockstoreError(_)) => {}
Ok(completed_data_sets) => {
newly_completed_data_sets.extend(
completed_data_sets.into_iter().map(
|(start_index, end_index)| CompletedDataSetInfo {
slot: shred_slot,
start_index,
end_index,
},
),
);
num_recovered_inserted += 1;
}
}
} else {
num_recovered_failed_sig += 1;
metrics.num_recovered += recovered_data_shreds.len();
let recovered_data_shreds: Vec<_> = recovered_data_shreds
.into_iter()
.filter_map(|shred| {
let leader =
leader_schedule_cache.slot_leader_at(shred.slot(), /*bank=*/ None)?;
if !shred.verify(&leader) {
metrics.num_recovered_failed_sig += 1;
return None;
}
match self.check_insert_data_shred(
shred.clone(),
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
&handle_duplicate,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => {
metrics.num_recovered_exists += 1;
None
}
Err(InsertDataShredError::InvalidShred) => {
metrics.num_recovered_failed_invalid += 1;
None
}
Err(InsertDataShredError::BlockstoreError(_)) => None,
Ok(completed_data_sets) => {
newly_completed_data_sets.extend(completed_data_sets);
metrics.num_recovered_inserted += 1;
Some(shred)
}
}
})
// Always collect recovered-shreds so that above insert code is
// executed even if retransmit-sender is None.
.collect();
if !recovered_data_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(recovered_data_shreds);
}
});
}
}
start.stop();
let shred_recovery_elapsed = start.as_us();
metrics.shred_recovery_elapsed += start.as_us();
just_inserted_coding_shreds
.into_iter()
.for_each(|((_, _), shred)| {
self.check_insert_coding_shred(
shred,
&mut index_working_set,
&mut write_batch,
&mut index_meta_time,
);
num_inserted += 1;
});
metrics.num_inserted += just_inserted_coding_shreds.len() as u64;
for (_, shred) in just_inserted_coding_shreds.into_iter() {
self.check_insert_coding_shred(
shred,
&mut index_working_set,
&mut write_batch,
&mut index_meta_time,
);
}
let mut start = Measure::start("Shred recovery");
// Handle chaining for the members of the slot_meta_working_set that were inserted into,
// drop the others
handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
start.stop();
let chaining_elapsed = start.as_us();
metrics.chaining_elapsed += start.as_us();
let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
@ -983,12 +971,12 @@ impl Blockstore {
}
}
start.stop();
let commit_working_sets_elapsed = start.as_us();
metrics.commit_working_sets_elapsed += start.as_us();
let mut start = Measure::start("Write Batch");
self.db.write(write_batch)?;
start.stop();
let write_batch_elapsed = start.as_us();
metrics.write_batch_elapsed += start.as_us();
send_signals(
&self.new_shreds_signals,
@ -999,20 +987,7 @@ impl Blockstore {
total_start.stop();
metrics.num_shreds += num_shreds;
metrics.total_elapsed += total_start.as_us();
metrics.insert_lock_elapsed += insert_lock_elapsed;
metrics.insert_shreds_elapsed += insert_shreds_elapsed;
metrics.shred_recovery_elapsed += shred_recovery_elapsed;
metrics.chaining_elapsed += chaining_elapsed;
metrics.commit_working_sets_elapsed += commit_working_sets_elapsed;
metrics.write_batch_elapsed += write_batch_elapsed;
metrics.num_inserted += num_inserted;
metrics.num_recovered += num_recovered;
metrics.num_recovered_inserted += num_recovered_inserted;
metrics.num_recovered_failed_sig += num_recovered_failed_sig;
metrics.num_recovered_failed_invalid = num_recovered_failed_invalid;
metrics.num_recovered_exists = num_recovered_exists;
metrics.index_meta_time += index_meta_time;
Ok((newly_completed_data_sets, inserted_indices))
@ -1054,7 +1029,8 @@ impl Blockstore {
vec![false; shreds_len],
leader_schedule,
is_trusted,
&|_| {},
None, // retransmit-sender
&|_| {}, // handle-duplicates
&mut BlockstoreInsertionMetrics::default(),
)
}
@ -1230,7 +1206,7 @@ impl Blockstore {
handle_duplicate: &F,
leader_schedule: Option<&LeaderScheduleCache>,
shred_source: ShredSource,
) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError>
where
F: Fn(Shred),
{
@ -1496,7 +1472,7 @@ impl Blockstore {
shred: &Shred,
write_batch: &mut WriteBatch,
shred_source: ShredSource,
) -> Result<Vec<(u32, u32)>> {
) -> Result<Vec<CompletedDataSetInfo>> {
let slot = shred.slot();
let index = u64::from(shred.index());
@ -1545,7 +1521,14 @@ impl Blockstore {
new_consumed,
shred.reference_tick(),
data_index,
);
)
.into_iter()
.map(|(start_index, end_index)| CompletedDataSetInfo {
slot,
start_index,
end_index,
})
.collect();
if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered {
let mut slots_stats = self.slots_stats.lock().unwrap();
let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();