Don't verify blobs that are less than root in window service (#5901)

This commit is contained in:
carllin
2019-09-16 13:13:53 -07:00
committed by GitHub
parent 167adff22c
commit d5ba90d375
4 changed files with 88 additions and 46 deletions

View File

@ -751,33 +751,7 @@ impl Blocktree {
} }
let last_root = *last_root.read().unwrap(); let last_root = *last_root.read().unwrap();
if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) { verify_shred_slots(slot, slot_meta.parent_slot, last_root);
// Check that the parent_slot < slot
if slot_meta.parent_slot >= slot {
datapoint_error!(
"blocktree_error",
(
"error",
format!(
"Received blob with parent_slot {} >= slot {}",
slot_meta.parent_slot, slot
),
String
)
);
return false;
}
// Check that the blob is for a slot that is past the root
if slot <= last_root {
return false;
}
// Ignore blobs that chain to slots before the last root
if slot_meta.parent_slot < last_root {
return false;
}
}
true true
} }
@ -1699,6 +1673,24 @@ macro_rules! create_new_tmp_ledger {
}; };
} }
pub fn verify_shred_slots(slot: u64, parent_slot: u64, last_root: u64) -> bool {
if !is_valid_write_to_slot_0(slot, parent_slot, last_root) {
// Check that the parent_slot < slot
if parent_slot >= slot {
return false;
}
// Ignore blobs that chain to slots before the last root
if parent_slot < last_root {
return false;
}
// Above two checks guarantee that by this point, slot > last_root
}
true
}
// Same as `create_new_ledger()` but use a temporary ledger name based on the provided `name` // Same as `create_new_ledger()` but use a temporary ledger name based on the provided `name`
// //
// Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only // Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only

View File

@ -477,7 +477,7 @@ impl Replicator {
&exit, &exit,
RepairStrategy::RepairRange(repair_slot_range), RepairStrategy::RepairRange(repair_slot_range),
&Arc::new(LeaderScheduleCache::default()), &Arc::new(LeaderScheduleCache::default()),
|_, _, _, _| true, |_, _, _, _, _| true,
); );
info!("waiting for ledger download"); info!("waiting for ledger download");
Self::wait_for_segment_download( Self::wait_for_segment_download(

View File

@ -151,13 +151,14 @@ impl RetransmitStage {
exit, exit,
repair_strategy, repair_strategy,
&leader_schedule_cache.clone(), &leader_schedule_cache.clone(),
move |id, shred, shred_buf, working_bank| { move |id, shred, shred_buf, working_bank, last_root| {
should_retransmit_and_persist( should_retransmit_and_persist(
shred, shred,
shred_buf, shred_buf,
working_bank, working_bank,
&leader_schedule_cache, &leader_schedule_cache,
id, id,
last_root,
) )
}, },
); );

View File

@ -1,7 +1,7 @@
//! `window_service` handles the data plane incoming blobs, storing them in //! `window_service` handles the data plane incoming blobs, storing them in
//! blocktree and retransmitting where required //! blocktree and retransmitting where required
//! //!
use crate::blocktree::Blocktree; use crate::blocktree::{self, Blocktree};
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::leader_schedule_cache::LeaderScheduleCache; use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::repair_service::{RepairService, RepairStrategy}; use crate::repair_service::{RepairService, RepairStrategy};
@ -33,16 +33,19 @@ pub fn should_retransmit_and_persist(
bank: Option<Arc<Bank>>, bank: Option<Arc<Bank>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
my_pubkey: &Pubkey, my_pubkey: &Pubkey,
root: u64,
) -> bool { ) -> bool {
let slot_leader_pubkey = match bank { let slot_leader_pubkey = match bank {
None => leader_schedule_cache.slot_leader_at(shred.slot(), None), None => leader_schedule_cache.slot_leader_at(shred.slot(), None),
Some(bank) => leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank)), Some(bank) => leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank)),
}; };
if let Some(leader_id) = slot_leader_pubkey { if let Some(leader_id) = slot_leader_pubkey {
if leader_id == *my_pubkey { if leader_id == *my_pubkey {
inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1); inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
false false
} else if !blocktree::verify_shred_slots(shred.slot(), shred.parent(), root) {
inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
false
} else if !shred.fast_verify(&shred_buf, &leader_id) { } else if !shred.fast_verify(&shred_buf, &leader_id) {
inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
false false
@ -65,7 +68,7 @@ fn recv_window<F>(
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> Result<()> ) -> Result<()>
where where
F: Fn(&Shred, &[u8]) -> bool, F: Fn(&Shred, &[u8], u64) -> bool,
F: Sync, F: Sync,
{ {
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
@ -77,6 +80,7 @@ where
let now = Instant::now(); let now = Instant::now();
inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len());
let last_root = blocktree.last_root();
let (shreds, packets_ix): (Vec<_>, Vec<_>) = thread_pool.install(|| { let (shreds, packets_ix): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packets packets
.packets .packets
@ -85,7 +89,7 @@ where
.filter_map(|(i, packet)| { .filter_map(|(i, packet)| {
if let Ok(s) = bincode::deserialize(&packet.data) { if let Ok(s) = bincode::deserialize(&packet.data) {
let shred: Shred = s; let shred: Shred = s;
if shred_filter(&shred, &packet.data) { if shred_filter(&shred, &packet.data, last_root) {
packet.meta.slot = shred.slot(); packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed(); packet.meta.seed = shred.seed();
Some((shred, i)) Some((shred, i))
@ -175,7 +179,7 @@ impl WindowService {
) -> WindowService ) -> WindowService
where where
F: 'static F: 'static
+ Fn(&Pubkey, &Shred, &[u8], Option<Arc<Bank>>) -> bool + Fn(&Pubkey, &Shred, &[u8], Option<Arc<Bank>>, u64) -> bool
+ std::marker::Send + std::marker::Send
+ std::marker::Sync, + std::marker::Sync,
{ {
@ -219,7 +223,7 @@ impl WindowService {
&id, &id,
&r, &r,
&retransmit, &retransmit,
|shred, shred_buf| { |shred, shred_buf, last_root| {
shred_filter( shred_filter(
&id, &id,
shred, shred,
@ -227,6 +231,7 @@ impl WindowService {
bank_forks bank_forks
.as_ref() .as_ref()
.map(|bank_forks| bank_forks.read().unwrap().working_bank()), .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
last_root,
) )
}, },
&thread_pool, &thread_pool,
@ -297,9 +302,14 @@ mod test {
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
fn local_entries_to_shred(entries: Vec<Entry>, keypair: &Arc<Keypair>) -> Vec<Shred> { fn local_entries_to_shred(
entries: Vec<Entry>,
slot: u64,
parent: u64,
keypair: &Arc<Keypair>,
) -> Vec<Shred> {
let mut shredder = let mut shredder =
Shredder::new(0, 0, 0.0, keypair, 0).expect("Failed to create entry shredder"); Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder");
bincode::serialize_into(&mut shredder, &entries) bincode::serialize_into(&mut shredder, &entries)
.expect("Expect to write all entries to shreds"); .expect("Expect to write all entries to shreds");
shredder.finalize_slot(); shredder.finalize_slot();
@ -313,7 +323,7 @@ mod test {
let num_entries = 10; let num_entries = 10;
let original_entries = make_tiny_test_entries(num_entries); let original_entries = make_tiny_test_entries(num_entries);
let mut shreds = let mut shreds =
local_entries_to_shred(original_entries.clone(), &Arc::new(Keypair::new())); local_entries_to_shred(original_entries.clone(), 0, 0, &Arc::new(Keypair::new()));
shreds.reverse(); shreds.reverse();
blocktree blocktree
.insert_shreds(shreds, None) .insert_shreds(shreds, None)
@ -331,15 +341,14 @@ mod test {
#[test] #[test]
fn test_should_retransmit_and_persist() { fn test_should_retransmit_and_persist() {
let me_id = Pubkey::new_rand(); let me_id = Pubkey::new_rand();
let leader_keypair = Keypair::new(); let leader_keypair = Arc::new(Keypair::new());
let leader_pubkey = leader_keypair.pubkey(); let leader_pubkey = leader_keypair.pubkey();
let bank = Arc::new(Bank::new( let bank = Arc::new(Bank::new(
&create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block, &create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block,
)); ));
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let entry = Entry::default(); let mut shreds = local_entries_to_shred(vec![Entry::default()], 0, 0, &leader_keypair);
let mut shreds = local_entries_to_shred(vec![entry], &Arc::new(leader_keypair));
let shred_bufs: Vec<_> = shreds let shred_bufs: Vec<_> = shreds
.iter() .iter()
.map(|s| bincode::serialize(s).unwrap()) .map(|s| bincode::serialize(s).unwrap())
@ -352,7 +361,8 @@ mod test {
&shred_bufs[0], &shred_bufs[0],
Some(bank.clone()), Some(bank.clone()),
&cache, &cache,
&me_id &me_id,
0,
), ),
true true
); );
@ -368,7 +378,46 @@ mod test {
// with a Bank and no idea who leader is, blob gets thrown out // with a Bank and no idea who leader is, blob gets thrown out
shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3); shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
assert_eq!( assert_eq!(
should_retransmit_and_persist(&shreds[0], &shred_bufs[0], Some(bank), &cache, &me_id), should_retransmit_and_persist(
&shreds[0],
&shred_bufs[0],
Some(bank.clone()),
&cache,
&me_id,
0
),
false
);
// with a shred where shred.slot() == root, blob gets thrown out
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds =
local_entries_to_shred(vec![Entry::default()], slot, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(
&shreds[0],
&shred_bufs[0],
Some(bank.clone()),
&cache,
&me_id,
slot
),
false
);
// with a shred where shred.parent() < root, blob gets thrown out
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds =
local_entries_to_shred(vec![Entry::default()], slot + 1, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(
&shreds[0],
&shred_bufs[0],
Some(bank.clone()),
&cache,
&me_id,
slot
),
false false
); );
@ -429,7 +478,7 @@ mod test {
&exit, &exit,
repair_strategy, repair_strategy,
&Arc::new(LeaderScheduleCache::default()), &Arc::new(LeaderScheduleCache::default()),
|_, _, _, _| true, |_, _, _, _, _| true,
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
@ -519,7 +568,7 @@ mod test {
&exit, &exit,
repair_strategy, repair_strategy,
&Arc::new(LeaderScheduleCache::default()), &Arc::new(LeaderScheduleCache::default()),
|_, _, _, _| true, |_, _, _, _, _| true,
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
@ -580,7 +629,7 @@ mod test {
&exit, &exit,
RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }), RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }),
&Arc::new(LeaderScheduleCache::default()), &Arc::new(LeaderScheduleCache::default()),
|_, _, _, _| true, |_, _, _, _, _| true,
); );
window window
} }