Add separate thread to check for and store duplicate slot proofs (#7834)

This commit is contained in:
carllin
2020-01-16 15:27:54 -08:00
committed by GitHub
parent f5e797e3aa
commit 76e20015a4
2 changed files with 132 additions and 19 deletions

View File

@ -73,11 +73,44 @@ pub fn should_retransmit_and_persist(
} }
} }
fn run_insert( fn run_check_duplicate(
blockstore: &Arc<Blockstore>,
shred_receiver: &CrossbeamReceiver<Shred>,
) -> Result<()> {
let check_duplicate = |shred: Shred| -> Result<()> {
if !blockstore.has_duplicate_shreds_in_slot(shred.slot()) {
if let Some(existing_shred_payload) =
blockstore.is_shred_duplicate(shred.slot(), shred.index(), &shred.payload)
{
blockstore.store_duplicate_slot(
shred.slot(),
existing_shred_payload,
shred.payload,
)?;
}
}
Ok(())
};
let timer = Duration::from_millis(200);
let shred = shred_receiver.recv_timeout(timer)?;
check_duplicate(shred)?;
while let Ok(shred) = shred_receiver.try_recv() {
check_duplicate(shred)?;
}
Ok(())
}
fn run_insert<F>(
shred_receiver: &CrossbeamReceiver<Vec<Shred>>, shred_receiver: &CrossbeamReceiver<Vec<Shred>>,
blockstore: &Arc<Blockstore>, blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> Result<()> { handle_duplicate: F,
) -> Result<()>
where
F: Fn(Shred) -> (),
{
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
let mut shreds = shred_receiver.recv_timeout(timer)?; let mut shreds = shred_receiver.recv_timeout(timer)?;
@ -85,8 +118,12 @@ fn run_insert(
shreds.append(&mut more_shreds) shreds.append(&mut more_shreds)
} }
let blockstore_insert_metrics = let blockstore_insert_metrics = blockstore.insert_shreds_handle_duplicate(
blockstore.insert_shreds(shreds, Some(leader_schedule_cache), false)?; shreds,
Some(leader_schedule_cache),
false,
&handle_duplicate,
)?;
blockstore_insert_metrics.report_metrics("recv-window-insert-shreds"); blockstore_insert_metrics.report_metrics("recv-window-insert-shreds");
Ok(()) Ok(())
@ -199,6 +236,7 @@ impl Drop for Finalizer {
pub struct WindowService { pub struct WindowService {
t_window: JoinHandle<()>, t_window: JoinHandle<()>,
t_insert: JoinHandle<()>, t_insert: JoinHandle<()>,
t_check_duplicate: JoinHandle<()>,
repair_service: RepairService, repair_service: RepairService,
} }
@ -235,12 +273,17 @@ impl WindowService {
); );
let (insert_sender, insert_receiver) = unbounded(); let (insert_sender, insert_receiver) = unbounded();
let (duplicate_sender, duplicate_receiver) = unbounded();
let t_check_duplicate =
Self::start_check_duplicate_thread(exit, &blockstore, duplicate_receiver);
let t_insert = Self::start_window_insert_thread( let t_insert = Self::start_window_insert_thread(
exit, exit,
&blockstore, &blockstore,
leader_schedule_cache, leader_schedule_cache,
insert_receiver, insert_receiver,
duplicate_sender,
); );
let t_window = Self::start_recv_window_thread( let t_window = Self::start_recv_window_thread(
@ -257,15 +300,44 @@ impl WindowService {
WindowService { WindowService {
t_window, t_window,
t_insert, t_insert,
t_check_duplicate,
repair_service, repair_service,
} }
} }
fn start_check_duplicate_thread(
exit: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
duplicate_receiver: CrossbeamReceiver<Shred>,
) -> JoinHandle<()> {
let exit = exit.clone();
let blockstore = blockstore.clone();
let handle_error = || {
inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
};
Builder::new()
.name("solana-check-duplicate".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let mut noop = || {};
if let Err(e) = run_check_duplicate(&blockstore, &duplicate_receiver) {
if Self::should_exit_on_error(e, &mut noop, &handle_error) {
break;
}
}
})
.unwrap()
}
fn start_window_insert_thread( fn start_window_insert_thread(
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>, blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
insert_receiver: CrossbeamReceiver<Vec<Shred>>, insert_receiver: CrossbeamReceiver<Vec<Shred>>,
duplicate_sender: CrossbeamSender<Shred>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone(); let exit = exit.clone();
let blockstore = blockstore.clone(); let blockstore = blockstore.clone();
@ -274,18 +346,29 @@ impl WindowService {
let handle_error = || { let handle_error = || {
inc_new_counter_error!("solana-window-insert-error", 1, 1); inc_new_counter_error!("solana-window-insert-error", 1, 1);
}; };
Builder::new() Builder::new()
.name("solana-window-insert".to_string()) .name("solana-window-insert".to_string())
.spawn(move || loop { .spawn(move || {
let handle_duplicate = |shred| {
let _ = duplicate_sender.send(shred);
};
loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }
if let Err(e) = run_insert(&insert_receiver, &blockstore, &leader_schedule_cache) { if let Err(e) = run_insert(
&insert_receiver,
&blockstore,
&leader_schedule_cache,
&handle_duplicate,
) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
break; break;
} }
} }
}
}) })
.unwrap() .unwrap()
} }
@ -384,6 +467,7 @@ impl WindowService {
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
self.t_window.join()?; self.t_window.join()?;
self.t_insert.join()?; self.t_insert.join()?;
self.t_check_duplicate.join()?;
self.repair_service.join() self.repair_service.join()
} }
} }
@ -587,4 +671,22 @@ mod test {
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
window.join().unwrap(); window.join().unwrap();
} }
#[test]
fn test_run_check_duplicate() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let (sender, receiver) = unbounded();
let (shreds, _) = make_many_slot_entries(5, 5, 10);
blockstore
.insert_shreds(shreds.clone(), None, false)
.unwrap();
let mut duplicate_shred = shreds[1].clone();
duplicate_shred.set_slot(shreds[0].slot());
let duplicate_shred_slot = duplicate_shred.slot();
sender.send(duplicate_shred).unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
run_check_duplicate(&blockstore, &receiver).unwrap();
assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
}
} }

View File

@ -1668,16 +1668,22 @@ impl Blockstore {
} }
// `new_shred` is asssumed to have slot and index equal to the given slot and index. // `new_shred` is asssumed to have slot and index equal to the given slot and index.
// Returns true if `new_shred` is not equal to the existing shred at the given // Returns the existing shred if `new_shred` is not equal to the existing shred at the
// slot and index as this implies the leader generated two different shreds with // given slot and index as this implies the leader generated two different shreds with
// the same slot and index // the same slot and index
pub fn is_shred_duplicate(&self, slot: u64, index: u64, new_shred: &[u8]) -> bool { pub fn is_shred_duplicate(&self, slot: u64, index: u32, new_shred: &[u8]) -> Option<Vec<u8>> {
let res = self let res = self
.get_data_shred(slot, index) .get_data_shred(slot, index as u64)
.expect("fetch from DuplicateSlots column family failed"); .expect("fetch from DuplicateSlots column family failed");
res.map(|existing_shred| existing_shred != new_shred) res.map(|existing_shred| {
.unwrap_or(false) if existing_shred != new_shred {
Some(existing_shred)
} else {
None
}
})
.unwrap_or(None)
} }
pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool { pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool {
@ -5286,8 +5292,13 @@ pub mod tests {
assert!(!blockstore.has_duplicate_shreds_in_slot(slot)); assert!(!blockstore.has_duplicate_shreds_in_slot(slot));
// Check if shreds are duplicated // Check if shreds are duplicated
assert!(blockstore.is_shred_duplicate(slot, 0, &duplicate_shred.payload)); assert_eq!(
assert!(!blockstore.is_shred_duplicate(slot, 0, &non_duplicate_shred.payload)); blockstore.is_shred_duplicate(slot, 0, &duplicate_shred.payload),
Some(shred.payload.clone())
);
assert!(blockstore
.is_shred_duplicate(slot, 0, &non_duplicate_shred.payload)
.is_none());
// Store a duplicate shred // Store a duplicate shred
blockstore blockstore