@@ -18,7 +18,7 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec<Entry>, ledger_path: &Pa
|
||||
let blocktree =
|
||||
Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
|
||||
bench.iter(move || {
|
||||
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
|
||||
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true, 0);
|
||||
blocktree.insert_shreds(shreds, None, false).unwrap();
|
||||
});
|
||||
|
||||
@@ -36,7 +36,7 @@ fn setup_read_bench(
|
||||
let entries = create_ticks(num_large_shreds * 4 + num_small_shreds * 2, Hash::default());
|
||||
|
||||
// Convert the entries to shreds, write the shreds to the ledger
|
||||
let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true);
|
||||
let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true, 0);
|
||||
blocktree
|
||||
.insert_shreds(shreds, None, false)
|
||||
.expect("Expectd successful insertion of shreds into ledger");
|
||||
@@ -129,7 +129,7 @@ fn bench_insert_data_shred_small(bench: &mut Bencher) {
|
||||
let num_entries = 32 * 1024;
|
||||
let entries = create_ticks(num_entries, Hash::default());
|
||||
bench.iter(move || {
|
||||
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
|
||||
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true, 0);
|
||||
blocktree.insert_shreds(shreds, None, false).unwrap();
|
||||
});
|
||||
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
@@ -144,7 +144,7 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) {
|
||||
let num_entries = 32 * 1024;
|
||||
let entries = create_ticks(num_entries, Hash::default());
|
||||
bench.iter(move || {
|
||||
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
|
||||
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true, 0);
|
||||
blocktree.insert_shreds(shreds, None, false).unwrap();
|
||||
});
|
||||
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
|
||||
|
@@ -35,7 +35,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
|
||||
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
|
||||
let entries = create_ticks(num_ticks, Hash::default());
|
||||
bencher.iter(|| {
|
||||
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0).unwrap();
|
||||
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
|
||||
shredder.entries_to_shreds(&entries, true, 0);
|
||||
})
|
||||
}
|
||||
@@ -50,7 +50,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
|
||||
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
|
||||
// 1Mb
|
||||
bencher.iter(|| {
|
||||
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0).unwrap();
|
||||
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
|
||||
shredder.entries_to_shreds(&entries, true, 0);
|
||||
})
|
||||
}
|
||||
@@ -63,7 +63,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
|
||||
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
|
||||
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
|
||||
let entries = create_ticks(num_ticks, Hash::default());
|
||||
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0).unwrap();
|
||||
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0, 0).unwrap();
|
||||
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
|
||||
bencher.iter(|| {
|
||||
let raw = &mut Shredder::deshred(&data_shreds).unwrap();
|
||||
@@ -75,7 +75,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
|
||||
fn bench_deserialize_hdr(bencher: &mut Bencher) {
|
||||
let data = vec![0; SIZE_OF_DATA_SHRED_PAYLOAD];
|
||||
|
||||
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0);
|
||||
let shred = Shred::new_from_data(2, 1, 1, Some(&data), true, true, 0, 0);
|
||||
|
||||
bencher.iter(|| {
|
||||
let payload = shred.payload.clone();
|
||||
|
@@ -158,6 +158,7 @@ mod test {
|
||||
true,
|
||||
&Arc::new(Keypair::new()),
|
||||
entries,
|
||||
0,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@@ -43,6 +43,7 @@ impl BroadcastStageType {
|
||||
receiver: Receiver<WorkingBankEntry>,
|
||||
exit_sender: &Arc<AtomicBool>,
|
||||
blocktree: &Arc<Blocktree>,
|
||||
shred_version: u16,
|
||||
) -> BroadcastStage {
|
||||
match self {
|
||||
BroadcastStageType::Standard => {
|
||||
@@ -53,7 +54,7 @@ impl BroadcastStageType {
|
||||
receiver,
|
||||
exit_sender,
|
||||
blocktree,
|
||||
StandardBroadcastRun::new(keypair),
|
||||
StandardBroadcastRun::new(keypair, shred_version),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -63,7 +64,7 @@ impl BroadcastStageType {
|
||||
receiver,
|
||||
exit_sender,
|
||||
blocktree,
|
||||
FailEntryVerificationBroadcastRun::new(),
|
||||
FailEntryVerificationBroadcastRun::new(shred_version),
|
||||
),
|
||||
|
||||
BroadcastStageType::BroadcastFakeBlobs => BroadcastStage::new(
|
||||
@@ -72,7 +73,7 @@ impl BroadcastStageType {
|
||||
receiver,
|
||||
exit_sender,
|
||||
blocktree,
|
||||
BroadcastFakeBlobsRun::new(0),
|
||||
BroadcastFakeBlobsRun::new(0, shred_version),
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -246,7 +247,7 @@ mod test {
|
||||
entry_receiver,
|
||||
&exit_sender,
|
||||
&blocktree,
|
||||
StandardBroadcastRun::new(leader_keypair),
|
||||
StandardBroadcastRun::new(leader_keypair, 0),
|
||||
);
|
||||
|
||||
MockBroadcastStage {
|
||||
|
@@ -6,13 +6,15 @@ use solana_sdk::hash::Hash;
|
||||
pub(super) struct BroadcastFakeBlobsRun {
|
||||
last_blockhash: Hash,
|
||||
partition: usize,
|
||||
shred_version: u16,
|
||||
}
|
||||
|
||||
impl BroadcastFakeBlobsRun {
|
||||
pub(super) fn new(partition: usize) -> Self {
|
||||
pub(super) fn new(partition: usize, shred_version: u16) -> Self {
|
||||
Self {
|
||||
last_blockhash: Hash::default(),
|
||||
partition,
|
||||
shred_version,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -45,6 +47,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
|
||||
RECOMMENDED_FEC_RATE,
|
||||
keypair.clone(),
|
||||
(bank.tick_height() % bank.ticks_per_slot()) as u8,
|
||||
self.shred_version,
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
|
@@ -2,11 +2,13 @@ use super::*;
|
||||
use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE};
|
||||
use solana_sdk::hash::Hash;
|
||||
|
||||
pub(super) struct FailEntryVerificationBroadcastRun {}
|
||||
pub(super) struct FailEntryVerificationBroadcastRun {
|
||||
shred_version: u16,
|
||||
}
|
||||
|
||||
impl FailEntryVerificationBroadcastRun {
|
||||
pub(super) fn new() -> Self {
|
||||
Self {}
|
||||
pub(super) fn new(shred_version: u16) -> Self {
|
||||
Self { shred_version }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +45,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
||||
RECOMMENDED_FEC_RATE,
|
||||
keypair.clone(),
|
||||
(bank.tick_height() % bank.ticks_per_slot()) as u8,
|
||||
self.shred_version,
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
|
@@ -35,16 +35,18 @@ pub(super) struct StandardBroadcastRun {
|
||||
current_slot_and_parent: Option<(u64, u64)>,
|
||||
slot_broadcast_start: Option<Instant>,
|
||||
keypair: Arc<Keypair>,
|
||||
shred_version: u16,
|
||||
}
|
||||
|
||||
impl StandardBroadcastRun {
|
||||
pub(super) fn new(keypair: Arc<Keypair>) -> Self {
|
||||
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
|
||||
Self {
|
||||
stats: BroadcastStats::default(),
|
||||
unfinished_slot: None,
|
||||
current_slot_and_parent: None,
|
||||
slot_broadcast_start: None,
|
||||
keypair,
|
||||
shred_version,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,6 +65,7 @@ impl StandardBroadcastRun {
|
||||
true,
|
||||
true,
|
||||
max_ticks_in_slot & SHRED_TICK_REFERENCE_MASK,
|
||||
self.shred_version,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
@@ -93,6 +96,7 @@ impl StandardBroadcastRun {
|
||||
RECOMMENDED_FEC_RATE,
|
||||
self.keypair.clone(),
|
||||
reference_tick,
|
||||
self.shred_version,
|
||||
)
|
||||
.expect("Expected to create a new shredder");
|
||||
|
||||
@@ -346,7 +350,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_interrupted_slot_last_shred() {
|
||||
let keypair = Arc::new(Keypair::new());
|
||||
let mut run = StandardBroadcastRun::new(keypair.clone());
|
||||
let mut run = StandardBroadcastRun::new(keypair.clone(), 0);
|
||||
|
||||
// Set up the slot to be interrupted
|
||||
let next_shred_index = 10;
|
||||
@@ -392,7 +396,7 @@ mod test {
|
||||
};
|
||||
|
||||
// Step 1: Make an incomplete transmission for slot 0
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone());
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone(), 0);
|
||||
standard_broadcast_run
|
||||
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
|
||||
.unwrap();
|
||||
@@ -468,7 +472,7 @@ mod test {
|
||||
last_tick_height: ticks.len() as u64,
|
||||
};
|
||||
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair);
|
||||
let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0);
|
||||
standard_broadcast_run
|
||||
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
|
||||
.unwrap();
|
||||
|
@@ -149,6 +149,7 @@ mod tests {
|
||||
true,
|
||||
&Arc::new(keypair),
|
||||
entries,
|
||||
0,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -164,8 +165,8 @@ mod tests {
|
||||
let mut hasher = Hasher::default();
|
||||
hasher.hash(&buf[..size]);
|
||||
|
||||
// golden needs to be updated if blob stuff changes....
|
||||
let golden: Hash = "HLzH7Nrh4q2K5WTh3e9vPNFZ1QVYhVDRMN9u5v51GqpJ"
|
||||
// golden needs to be updated if blob structure changes....
|
||||
let golden: Hash = "9K6NR4cazo7Jzk2CpyXmNaZMGqvfXG83JzyJipkoHare"
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
|
@@ -147,6 +147,7 @@ mod tests {
|
||||
true,
|
||||
&Arc::new(Keypair::new()),
|
||||
entries,
|
||||
0,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -208,6 +209,7 @@ mod tests {
|
||||
true,
|
||||
&Arc::new(Keypair::new()),
|
||||
entries,
|
||||
0,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@@ -1087,7 +1087,7 @@ mod test {
|
||||
), // should cause AccountNotFound error
|
||||
],
|
||||
);
|
||||
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false)
|
||||
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false, 0)
|
||||
});
|
||||
|
||||
assert_matches!(
|
||||
@@ -1112,7 +1112,7 @@ mod test {
|
||||
*blockhash,
|
||||
)],
|
||||
);
|
||||
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false)
|
||||
entries_to_test_shreds(vec![entry], slot, slot.saturating_sub(1), false, 0)
|
||||
});
|
||||
|
||||
assert_matches!(res, Err(Error::BlobError(BlobError::VerificationFailed)));
|
||||
|
@@ -214,6 +214,7 @@ impl RetransmitStage {
|
||||
completed_slots_receiver: CompletedSlotsReceiver,
|
||||
epoch_schedule: EpochSchedule,
|
||||
cfg: Option<PartitionCfg>,
|
||||
shred_version: u16,
|
||||
) -> Self {
|
||||
let (retransmit_sender, retransmit_receiver) = channel();
|
||||
|
||||
@@ -252,6 +253,7 @@ impl RetransmitStage {
|
||||
&leader_schedule_cache,
|
||||
id,
|
||||
last_root,
|
||||
shred_version,
|
||||
);
|
||||
rv && is_connected
|
||||
},
|
||||
|
@@ -38,6 +38,7 @@ impl Tpu {
|
||||
blocktree: &Arc<Blocktree>,
|
||||
broadcast_type: &BroadcastStageType,
|
||||
exit: &Arc<AtomicBool>,
|
||||
shred_version: u16,
|
||||
) -> Self {
|
||||
let (packet_sender, packet_receiver) = channel();
|
||||
let fetch_stage = FetchStage::new_with_sender(
|
||||
@@ -74,6 +75,7 @@ impl Tpu {
|
||||
entry_receiver,
|
||||
&exit,
|
||||
blocktree,
|
||||
shred_version,
|
||||
);
|
||||
|
||||
Self {
|
||||
|
@@ -81,6 +81,7 @@ impl Tvu {
|
||||
completed_slots_receiver: CompletedSlotsReceiver,
|
||||
fork_confidence_cache: Arc<RwLock<ForkConfidenceCache>>,
|
||||
cfg: Option<PartitionCfg>,
|
||||
shred_version: u16,
|
||||
) -> Self
|
||||
where
|
||||
T: 'static + KeypairUtil + Sync + Send,
|
||||
@@ -127,6 +128,7 @@ impl Tvu {
|
||||
completed_slots_receiver,
|
||||
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
|
||||
cfg,
|
||||
shred_version,
|
||||
);
|
||||
|
||||
let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
|
||||
@@ -290,6 +292,7 @@ pub mod tests {
|
||||
completed_slots_receiver,
|
||||
fork_confidence_cache,
|
||||
None,
|
||||
0,
|
||||
);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
tvu.join().unwrap();
|
||||
|
@@ -37,6 +37,7 @@ use solana_sdk::{
|
||||
timing::timestamp,
|
||||
};
|
||||
|
||||
use solana_ledger::shred::Shred;
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
path::{Path, PathBuf},
|
||||
@@ -184,6 +185,8 @@ impl Validator {
|
||||
let bank = bank_forks[bank_info.bank_slot].clone();
|
||||
let bank_forks = Arc::new(RwLock::new(bank_forks));
|
||||
let fork_confidence_cache = Arc::new(RwLock::new(ForkConfidenceCache::default()));
|
||||
// The version used by shreds, derived from genesis
|
||||
let shred_version = Shred::version_from_hash(&genesis_blockhash);
|
||||
|
||||
let mut validator_exit = ValidatorExit::default();
|
||||
let exit_ = exit.clone();
|
||||
@@ -344,6 +347,7 @@ impl Validator {
|
||||
completed_slots_receiver,
|
||||
fork_confidence_cache,
|
||||
config.partition_cfg.clone(),
|
||||
shred_version,
|
||||
);
|
||||
|
||||
if config.dev_sigverify_disabled {
|
||||
@@ -361,6 +365,7 @@ impl Validator {
|
||||
&blocktree,
|
||||
&config.broadcast_stage_type,
|
||||
&exit,
|
||||
shred_version,
|
||||
);
|
||||
|
||||
datapoint_info!("validator-new", ("id", id.to_string(), String));
|
||||
|
@@ -41,6 +41,7 @@ pub fn should_retransmit_and_persist(
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
my_pubkey: &Pubkey,
|
||||
root: u64,
|
||||
shred_version: u16,
|
||||
) -> bool {
|
||||
let slot_leader_pubkey = match bank {
|
||||
None => leader_schedule_cache.slot_leader_at(shred.slot(), None),
|
||||
@@ -56,6 +57,9 @@ pub fn should_retransmit_and_persist(
|
||||
} else if !shred.verify(&leader_id) {
|
||||
inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
|
||||
false
|
||||
} else if shred.version() != shred_version {
|
||||
inc_new_counter_debug!("streamer-recv_window-incorrect_shred_version", 1);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
@@ -309,7 +313,7 @@ mod test {
|
||||
parent: u64,
|
||||
keypair: &Arc<Keypair>,
|
||||
) -> Vec<Shred> {
|
||||
let shredder = Shredder::new(slot, parent, 0.0, keypair.clone(), 0)
|
||||
let shredder = Shredder::new(slot, parent, 0.0, keypair.clone(), 0, 0)
|
||||
.expect("Failed to create entry shredder");
|
||||
shredder.entries_to_shreds(&entries, true, 0).0
|
||||
}
|
||||
@@ -349,25 +353,30 @@ mod test {
|
||||
|
||||
// with a Bank for slot 0, blob continues
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0,),
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0, 0),
|
||||
true
|
||||
);
|
||||
// with the wrong shred_version, shred gets thrown out
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0, 1),
|
||||
false
|
||||
);
|
||||
|
||||
// If it's a coding shred, test that slot >= root
|
||||
let (common, coding) = Shredder::new_coding_shred_header(5, 5, 6, 6, 0);
|
||||
let (common, coding) = Shredder::new_coding_shred_header(5, 5, 6, 6, 0, 0);
|
||||
let mut coding_shred =
|
||||
Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
|
||||
Shredder::sign_shred(&leader_keypair, &mut coding_shred);
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0),
|
||||
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0, 0),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 5),
|
||||
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 5, 0),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 6),
|
||||
should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 6, 0),
|
||||
false
|
||||
);
|
||||
|
||||
@@ -384,7 +393,8 @@ mod test {
|
||||
Some(wrong_bank.clone()),
|
||||
&wrong_cache,
|
||||
&me_id,
|
||||
0
|
||||
0,
|
||||
0,
|
||||
),
|
||||
false
|
||||
);
|
||||
@@ -392,7 +402,7 @@ mod test {
|
||||
// with a Bank and no idea who leader is, blob gets thrown out
|
||||
shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0),
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0, 0),
|
||||
false
|
||||
);
|
||||
|
||||
@@ -400,7 +410,7 @@ mod test {
|
||||
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
|
||||
let shreds = local_entries_to_shred(&[Entry::default()], slot, slot - 1, &leader_keypair);
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot, 0),
|
||||
false
|
||||
);
|
||||
|
||||
@@ -409,13 +419,13 @@ mod test {
|
||||
let shreds =
|
||||
local_entries_to_shred(&[Entry::default()], slot + 1, slot - 1, &leader_keypair);
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot),
|
||||
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, slot, 0),
|
||||
false
|
||||
);
|
||||
|
||||
// if the blob came back from me, it doesn't continue, whether or not I have a bank
|
||||
assert_eq!(
|
||||
should_retransmit_and_persist(&shreds[0], None, &cache, &me_id, 0),
|
||||
should_retransmit_and_persist(&shreds[0], None, &cache, &me_id, 0, 0),
|
||||
false
|
||||
);
|
||||
}
|
||||
|
Reference in New Issue
Block a user