Add a version field to shreds (#7023)

* Add a version field to shreds

* Clippy

* Fix Chacha Golden

* Fix shredder bench compile

* Fix blocktree bench compile
This commit is contained in:
Sagar Dhawan
2019-11-18 18:05:02 -08:00
committed by GitHub
parent bfa2535ea1
commit 6bfe0fca1f
21 changed files with 235 additions and 81 deletions

View File

@ -21,7 +21,7 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec<Entry>, ledger_path: &Pa
let blocktree =
Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
bench.iter(move || {
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true, 0);
blocktree.insert_shreds(shreds, None, false).unwrap();
});
@ -43,7 +43,7 @@ fn setup_read_bench(
);
// Convert the entries to shreds, write the shreds to the ledger
let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true);
let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true, 0);
blocktree
.insert_shreds(shreds, None, false)
.expect("Expectd successful insertion of shreds into ledger");
@ -136,7 +136,7 @@ fn bench_insert_data_shred_small(bench: &mut Bencher) {
let num_entries = 32 * 1024;
let entries = create_ticks(num_entries, 0, Hash::default());
bench.iter(move || {
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true, 0);
blocktree.insert_shreds(shreds, None, false).unwrap();
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
@ -151,7 +151,7 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) {
let num_entries = 32 * 1024;
let entries = create_ticks(num_entries, 0, Hash::default());
bench.iter(move || {
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true, 0);
blocktree.insert_shreds(shreds, None, false).unwrap();
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");

View File

@ -35,7 +35,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
bencher.iter(|| {
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0).unwrap();
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
shredder.entries_to_shreds(&entries, true, 0);
})
}
@ -50,7 +50,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
// 1Mb
bencher.iter(|| {
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0).unwrap();
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
shredder.entries_to_shreds(&entries, true, 0);
})
}
@ -63,7 +63,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0).unwrap();
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0, 0).unwrap();
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
bencher.iter(|| {
let raw = &mut Shredder::deshred(&data_shreds).unwrap();
@ -75,7 +75,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
fn bench_deserialize_hdr(bencher: &mut Bencher) {
let data = vec![0; SIZE_OF_DATA_SHRED_PAYLOAD];
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0);
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0);
bencher.iter(|| {
let payload = shred.payload.clone();

View File

@ -153,6 +153,7 @@ mod test {
true,
&Arc::new(Keypair::new()),
entries,
0,
)
.unwrap();

View File

@ -42,6 +42,7 @@ impl BroadcastStageType {
receiver: Receiver<WorkingBankEntry>,
exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
shred_version: u16,
) -> BroadcastStage {
match self {
BroadcastStageType::Standard => {
@ -52,7 +53,7 @@ impl BroadcastStageType {
receiver,
exit_sender,
blocktree,
StandardBroadcastRun::new(keypair),
StandardBroadcastRun::new(keypair, shred_version),
)
}
@ -62,7 +63,7 @@ impl BroadcastStageType {
receiver,
exit_sender,
blocktree,
FailEntryVerificationBroadcastRun::new(),
FailEntryVerificationBroadcastRun::new(shred_version),
),
BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
@ -71,7 +72,7 @@ impl BroadcastStageType {
receiver,
exit_sender,
blocktree,
BroadcastFakeShredsRun::new(0),
BroadcastFakeShredsRun::new(0, shred_version),
),
}
}
@ -240,7 +241,7 @@ mod test {
entry_receiver,
&exit_sender,
&blocktree,
StandardBroadcastRun::new(leader_keypair),
StandardBroadcastRun::new(leader_keypair, 0),
);
MockBroadcastStage {

View File

@ -6,13 +6,15 @@ use solana_sdk::hash::Hash;
pub(super) struct BroadcastFakeShredsRun {
last_blockhash: Hash,
partition: usize,
shred_version: u16,
}
impl BroadcastFakeShredsRun {
pub(super) fn new(partition: usize) -> Self {
pub(super) fn new(partition: usize, shred_version: u16) -> Self {
Self {
last_blockhash: Hash::default(),
partition,
shred_version,
}
}
}
@ -45,6 +47,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
RECOMMENDED_FEC_RATE,
keypair.clone(),
(bank.tick_height() % bank.ticks_per_slot()) as u8,
self.shred_version,
)
.expect("Expected to create a new shredder");

View File

@ -2,11 +2,13 @@ use super::*;
use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::hash::Hash;
pub(super) struct FailEntryVerificationBroadcastRun {}
pub(super) struct FailEntryVerificationBroadcastRun {
shred_version: u16,
}
impl FailEntryVerificationBroadcastRun {
pub(super) fn new() -> Self {
Self {}
pub(super) fn new(shred_version: u16) -> Self {
Self { shred_version }
}
}
@ -43,6 +45,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
RECOMMENDED_FEC_RATE,
keypair.clone(),
(bank.tick_height() % bank.ticks_per_slot()) as u8,
self.shred_version,
)
.expect("Expected to create a new shredder");

View File

@ -35,16 +35,18 @@ pub(super) struct StandardBroadcastRun {
current_slot_and_parent: Option<(u64, u64)>,
slot_broadcast_start: Option<Instant>,
keypair: Arc<Keypair>,
shred_version: u16,
}
impl StandardBroadcastRun {
pub(super) fn new(keypair: Arc<Keypair>) -> Self {
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
Self {
stats: BroadcastStats::default(),
unfinished_slot: None,
current_slot_and_parent: None,
slot_broadcast_start: None,
keypair,
shred_version,
}
}
@ -63,6 +65,7 @@ impl StandardBroadcastRun {
true,
true,
max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK,
self.shred_version,
))
} else {
None
@ -93,6 +96,7 @@ impl StandardBroadcastRun {
RECOMMENDED_FEC_RATE,
self.keypair.clone(),
reference_tick,
self.shred_version,
)
.expect("Expected to create a new shredder");
@ -350,7 +354,7 @@ mod test {
#[test]
fn test_interrupted_slot_last_shred() {
let keypair = Arc::new(Keypair::new());
let mut run = StandardBroadcastRun::new(keypair.clone());
let mut run = StandardBroadcastRun::new(keypair.clone(), 0);
// Set up the slot to be interrupted
let next_shred_index = 10;
@ -396,7 +400,7 @@ mod test {
};
// Step 1: Make an incomplete transmission for slot 0
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone());
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone(), 0);
standard_broadcast_run
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
.unwrap();
@ -472,7 +476,7 @@ mod test {
last_tick_height: ticks.len() as u64,
};
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair);
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0);
standard_broadcast_run
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
.unwrap();

View File

@ -149,6 +149,7 @@ mod tests {
true,
&Arc::new(keypair),
entries,
0,
)
.unwrap();
@ -165,7 +166,7 @@ mod tests {
hasher.hash(&buf[..size]);
// golden needs to be updated if shred structure changes....
let golden: Hash = "HLzH7Nrh4q2K5WTh3e9vPNFZ1QVYhVDRMN9u5v51GqpJ"
let golden: Hash = "9K6NR4cazo7Jzk2CpyXmNaZMGqvfXG83JzyJipkoHare"
.parse()
.unwrap();

View File

@ -146,6 +146,7 @@ mod tests {
true,
&Arc::new(Keypair::new()),
entries,
0,
)
.unwrap();
@ -206,6 +207,7 @@ mod tests {
true,
&Arc::new(Keypair::new()),
entries,
0,
)
.unwrap();

View File

@ -1151,7 +1151,7 @@ mod test {
), // should cause AccountNotFound error
],
);
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false)
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false, 0)
});
assert_matches!(
@ -1179,7 +1179,7 @@ mod test {
blockhash,
)],
);
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false)
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false, 0)
});
if let Err(Error::BlockError(block_error)) = res {
@ -1203,6 +1203,7 @@ mod test {
slot,
slot.saturating_sub(1),
false,
0,
)
});
@ -1225,6 +1226,7 @@ mod test {
slot,
slot.saturating_sub(1),
false,
0,
)
});
@ -1244,6 +1246,7 @@ mod test {
slot,
slot.saturating_sub(1),
true,
0,
)
});
@ -1265,6 +1268,7 @@ mod test {
slot,
slot.saturating_sub(1),
false,
0,
)
});
@ -1289,7 +1293,7 @@ mod test {
system_transaction::transfer(&genesis_keypair, &keypair.pubkey(), 2, blockhash);
let trailing_entry = entry::next_entry(&last_entry_hash, 1, vec![tx]);
entries.push(trailing_entry);
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true)
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true, 0)
});
if let Err(Error::BlockError(block_error)) = res {

View File

@ -213,6 +213,7 @@ impl RetransmitStage {
completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<PartitionCfg>,
shred_version: u16,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@ -251,6 +252,7 @@ impl RetransmitStage {
&leader_schedule_cache,
id,
last_root,
shred_version,
);
rv && is_connected
},

View File

@ -91,6 +91,7 @@ pub mod tests {
true,
true,
0,
0,
);
let mut batch = [Packets::default(), Packets::default()];
@ -108,6 +109,7 @@ pub mod tests {
true,
true,
0,
0,
);
Shredder::sign_shred(&keypair, &mut shred);
batch[1].packets.resize(1, Packet::default());
@ -131,12 +133,14 @@ pub mod tests {
let mut batch = vec![Packets::default()];
batch[0].packets.resize(2, Packet::default());
let mut shred = Shred::new_from_data(0, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true, 0);
let mut shred =
Shred::new_from_data(0, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true, 0, 0);
Shredder::sign_shred(&leader_keypair, &mut shred);
batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
batch[0].packets[0].meta.size = shred.payload.len();
let mut shred = Shred::new_from_data(0, 0xbeef, 0xc0de, Some(&[1, 2, 3, 4]), true, true, 0);
let mut shred =
Shred::new_from_data(0, 0xbeef, 0xc0de, Some(&[1, 2, 3, 4]), true, true, 0, 0);
let wrong_keypair = Keypair::new();
Shredder::sign_shred(&wrong_keypair, &mut shred);
batch[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload);

View File

@ -38,6 +38,7 @@ impl Tpu {
blocktree: &Arc<Blocktree>,
broadcast_type: &BroadcastStageType,
exit: &Arc<AtomicBool>,
shred_version: u16,
) -> Self {
let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
@ -79,6 +80,7 @@ impl Tpu {
entry_receiver,
&exit,
blocktree,
shred_version,
);
Self {

View File

@ -74,6 +74,7 @@ impl Tvu {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
sigverify_disabled: bool,
cfg: Option<PartitionCfg>,
shred_version: u16,
) -> Self
where
T: 'static + KeypairUtil + Sync + Send,
@ -132,6 +133,7 @@ impl Tvu {
completed_slots_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg,
shred_version,
);
let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
@ -294,6 +296,7 @@ pub mod tests {
block_commitment_cache,
false,
None,
0,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -37,6 +37,7 @@ use solana_sdk::{
timing::timestamp,
};
use solana_ledger::shred::Shred;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
@ -184,6 +185,8 @@ impl Validator {
let bank = bank_forks[bank_info.bank_slot].clone();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
// The version used by shreds, derived from genesis
let shred_version = Shred::version_from_hash(&genesis_hash);
let mut validator_exit = ValidatorExit::default();
let exit_ = exit.clone();
@ -346,6 +349,7 @@ impl Validator {
block_commitment_cache,
config.dev_sigverify_disabled,
config.partition_cfg.clone(),
shred_version,
);
if config.dev_sigverify_disabled {
@ -363,6 +367,7 @@ impl Validator {
&blocktree,
&config.broadcast_stage_type,
&exit,
shred_version,
);
datapoint_info!("validator-new", ("id", id.to_string(), String));

View File

@ -42,6 +42,7 @@ pub fn should_retransmit_and_persist(
leader_schedule_cache: &Arc<LeaderScheduleCache>,
my_pubkey: &Pubkey,
root: u64,
shred_version: u16,
) -> bool {
let slot_leader_pubkey = match bank {
None => leader_schedule_cache.slot_leader_at(shred.slot(), None),
@ -54,6 +55,9 @@ pub fn should_retransmit_and_persist(
} else if !verify_shred_slot(shred, root) {
inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
false
} else if shred.version() != shred_version {
inc_new_counter_debug!("streamer-recv_window-incorrect_shred_version", 1);
false
} else {
true
}
@ -309,7 +313,7 @@ mod test {
parent: Slot,
keypair: &Arc<Keypair>,
) -> Vec<Shred> {
let shredder = Shredder::new(slot, parent, 0.0, keypair.clone(), 0)
let shredder = Shredder::new(slot, parent, 0.0, keypair.clone(), 0, 0)
.expect("Failed to create entry shredder");
shredder.entries_to_shreds(&entries, true, 0).0
}
@ -349,32 +353,37 @@ mod test {
// with a Bank for slot 0, shred continues
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0,),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0, 0),
true
);
// with the wrong shred_version, shred gets thrown out
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0, 1),
false
);
// If it's a coding shred, test that slot >= root
let (common, coding) = Shredder::new_coding_shred_header(5, 5, 6, 6, 0);
let (common, coding) = Shredder::new_coding_shred_header(5, 5, 6, 6, 0, 0);
let mut coding_shred =
Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
Shredder::sign_shred(&leader_keypair, &mut coding_shred);
assert_eq!(
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0),
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0, 0),
true
);
assert_eq!(
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 5),
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 5, 0),
true
);
assert_eq!(
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 6),
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 6, 0),
false
);
// with a Bank and no idea who leader is, shred gets thrown out
shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0, 0),
false
);
@ -382,7 +391,7 @@ mod test {
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds = local_entries_to_shred(&[Entry::default()], slot, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot, 0),
false
);
@ -391,13 +400,13 @@ mod test {
let shreds =
local_entries_to_shred(&[Entry::default()], slot + 1, slot - 1, &leader_keypair);
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot, 0),
false
);
// if the shred came back from me, it doesn't continue, whether or not I have a bank
assert_eq!(
should_retransmit_and_persist(&shreds[0], None, &cache, &me_id, 0),
should_retransmit_and_persist(&shreds[0], None, &cache, &me_id, 0, 0),
false
);
}