Setup ReplayStage confirmation scaffolding for duplicate slots (#9698)

This commit is contained in:
carllin
2021-03-24 23:41:52 -07:00
committed by GitHub
parent 6d5c6c17c5
commit 52703badfa
16 changed files with 2412 additions and 224 deletions

View File

@ -26,7 +26,7 @@ use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
use solana_perf::packet::Packets;
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
use solana_streamer::streamer::PacketSender;
use std::{
net::{SocketAddr, UdpSocket},
@ -36,6 +36,9 @@ use std::{
time::{Duration, Instant},
};
pub type DuplicateSlotSender = CrossbeamSender<Slot>;
pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
if shred.is_data() {
// Only data shreds have parent information
@ -86,21 +89,25 @@ fn run_check_duplicate(
cluster_info: &ClusterInfo,
blockstore: &Blockstore,
shred_receiver: &CrossbeamReceiver<Shred>,
duplicate_slot_sender: &DuplicateSlotSender,
) -> Result<()> {
let check_duplicate = |shred: Shred| -> Result<()> {
if !blockstore.has_duplicate_shreds_in_slot(shred.slot()) {
let shred_slot = shred.slot();
if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(
shred.slot(),
shred_slot,
shred.index(),
&shred.payload,
shred.is_data(),
) {
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
blockstore.store_duplicate_slot(
shred.slot(),
shred_slot,
existing_shred_payload,
shred.payload,
)?;
duplicate_slot_sender.send(shred_slot)?;
}
}
@ -319,6 +326,7 @@ impl WindowService {
cluster_slots: Arc<ClusterSlots>,
verified_vote_receiver: VerifiedVoteReceiver,
completed_data_sets_sender: CompletedDataSetsSender,
duplicate_slots_sender: DuplicateSlotSender,
) -> WindowService
where
F: 'static
@ -346,6 +354,7 @@ impl WindowService {
exit.clone(),
blockstore.clone(),
duplicate_receiver,
duplicate_slots_sender,
);
let t_insert = Self::start_window_insert_thread(
@ -381,6 +390,7 @@ impl WindowService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
duplicate_receiver: CrossbeamReceiver<Shred>,
duplicate_slot_sender: DuplicateSlotSender,
) -> JoinHandle<()> {
let handle_error = || {
inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
@ -393,8 +403,12 @@ impl WindowService {
}
let mut noop = || {};
if let Err(e) = run_check_duplicate(&cluster_info, &blockstore, &duplicate_receiver)
{
if let Err(e) = run_check_duplicate(
&cluster_info,
&blockstore,
&duplicate_receiver,
&duplicate_slot_sender,
) {
if Self::should_exit_on_error(e, &mut noop, &handle_error) {
break;
}
@ -408,7 +422,7 @@ impl WindowService {
blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
duplicate_sender: CrossbeamSender<Shred>,
check_duplicate_sender: CrossbeamSender<Shred>,
completed_data_sets_sender: CompletedDataSetsSender,
) -> JoinHandle<()> {
let exit = exit.clone();
@ -423,7 +437,7 @@ impl WindowService {
.name("solana-window-insert".to_string())
.spawn(move || {
let handle_duplicate = |shred| {
let _ = duplicate_sender.send(shred);
let _ = check_duplicate_sender.send(shred);
};
let mut metrics = BlockstoreInsertionMetrics::default();
let mut last_print = Instant::now();
@ -538,6 +552,7 @@ impl WindowService {
handle_timeout();
false
}
Error::CrossbeamSendError => true,
_ => {
handle_error();
error!("thread {:?} error {:?}", thread::current().name(), e);
@ -566,7 +581,6 @@ mod test {
shred::{DataShredHeader, Shredder},
};
use solana_sdk::{
clock::Slot,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
hash::Hash,
signature::{Keypair, Signer},
@ -680,6 +694,7 @@ mod test {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let (sender, receiver) = unbounded();
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
let (shreds, _) = make_many_slot_entries(5, 5, 10);
blockstore
.insert_shreds(shreds.clone(), None, false)
@ -692,7 +707,17 @@ mod test {
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair));
run_check_duplicate(&cluster_info, &blockstore, &receiver).unwrap();
run_check_duplicate(
&cluster_info,
&blockstore,
&receiver,
&duplicate_slot_sender,
)
.unwrap();
assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
assert_eq!(
duplicate_slot_receiver.try_recv().unwrap(),
duplicate_shred_slot
);
}
}