From 3ce9a16e7f4564dde3bd0d2dc8fed0dbb878b397 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 19 May 2020 12:34:26 -0700 Subject: [PATCH] v1.0: Add nonce to shreds repairs, add shred data size to header (#10110) * Add nonce to shreds/repairs * Add data shred size to header * Align with future epoch Co-authored-by: Carl --- archiver-lib/src/archiver.rs | 5 +- core/benches/cluster_info.rs | 4 +- core/benches/shredder.rs | 23 +- .../broadcast_stage/standard_broadcast_run.rs | 8 +- core/src/lib.rs | 1 + core/src/repair_response.rs | 129 +++++++ core/src/repair_service.rs | 28 +- core/src/replay_stage.rs | 1 + core/src/serve_repair.rs | 335 +++++++++++++----- core/src/window_service.rs | 148 +++++--- ledger/src/blockstore.rs | 30 +- ledger/src/shred.rs | 257 +++++++++++--- ledger/src/sigverify_shreds.rs | 101 ++++-- ledger/tests/shred.rs | 18 +- 14 files changed, 831 insertions(+), 257 deletions(-) create mode 100644 core/src/repair_response.rs diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index ed3e412731..6a2641457a 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -13,8 +13,7 @@ use solana_core::{ contact_info::ContactInfo, gossip_service::GossipService, packet::{limited_deserialize, PACKET_DATA_SIZE}, - repair_service, - repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy}, + repair_service::{self, RepairService, RepairSlotRange, RepairStats, RepairStrategy}, serve_repair::ServeRepair, shred_fetch_stage::ShredFetchStage, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, @@ -846,7 +845,7 @@ impl Archiver { .into_iter() .filter_map(|repair_request| { serve_repair - .map_repair_request(&repair_request, &mut repair_stats) + .map_repair_request(&repair_request, &mut repair_stats, Some(0)) .map(|result| ((archiver_info.gossip, result), repair_request)) .ok() }) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 3ad6ac803a..f58040e102 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -5,6 +5,7 @@ extern crate test; use rand::{thread_rng, Rng}; use solana_core::cluster_info::{ClusterInfo, Node}; use solana_core::contact_info::ContactInfo; +use solana_ledger::shred::{Shred, NONCE_SHRED_PAYLOAD_SIZE}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::collections::HashMap; @@ -20,9 +21,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone()); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - const SHRED_SIZE: usize = 1024; const NUM_SHREDS: usize = 32; - let shreds = vec![vec![0; SHRED_SIZE]; NUM_SHREDS]; + let shreds = vec![vec![0; NONCE_SHRED_PAYLOAD_SIZE]; NUM_SHREDS]; let seeds = vec![[0u8; 32]; NUM_SHREDS]; let mut stakes = HashMap::new(); const NUM_PEERS: usize = 200; diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index ec157bf98b..c3ba2c780b 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -5,11 +5,11 @@ extern crate test; use solana_ledger::entry::{create_ticks, Entry}; use solana_ledger::shred::{ max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder, RECOMMENDED_FEC_RATE, - SIZE_OF_DATA_SHRED_PAYLOAD, + SIZE_OF_NONCE_DATA_SHRED_PAYLOAD, }; use solana_perf::test_tx; use solana_sdk::hash::Hash; -use solana_sdk::signature::{Keypair, Signer}; +use solana_sdk::signature::Keypair; use std::sync::Arc; use test::Bencher; @@ -29,10 +29,11 @@ fn make_large_unchained_entries(txs_per_entry: u64, num_entries: u64) -> Vec, +) -> Option { + if Shred::is_nonce_unlocked(slot) && nonce.is_none() + || !Shred::is_nonce_unlocked(slot) && nonce.is_some() + { + return None; + } + let shred = blockstore + .get_data_shred(slot, shred_index) + .expect("Blockstore could not get data shred"); + shred.map(|shred| repair_response_packet_from_shred(slot, shred, dest, nonce)) +} + +pub fn repair_response_packet_from_shred( + slot: Slot, + shred: Vec, + dest: &SocketAddr, + nonce: Option, +) -> Packet { + let size_of_nonce = { + if Shred::is_nonce_unlocked(slot) { + assert!(nonce.is_some()); + SIZE_OF_NONCE + } else { + assert!(nonce.is_none()); + 0 + } + }; + let mut packet = Packet::default(); + packet.meta.size = shred.len() + size_of_nonce; + packet.meta.set_addr(dest); + packet.data[..shred.len()].copy_from_slice(&shred); + let mut wr = io::Cursor::new(&mut packet.data[shred.len()..]); + if let Some(nonce) = nonce { + bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce"); + } + packet +} + +pub fn nonce(buf: &[u8]) -> Option { + if buf.len() < SIZE_OF_NONCE { + None + } else { + limited_deserialize(&buf[buf.len() - SIZE_OF_NONCE..]).ok() + } +} + +#[cfg(test)] +mod test { + use super::*; + use solana_ledger::{ + shred::{Shred, Shredder, UNLOCK_NONCE_SLOT}, + sigverify_shreds::verify_shred_cpu, + }; + use solana_sdk::signature::{Keypair, Signer}; + use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr}, + }; + + fn run_test_sigverify_shred_cpu_repair(slot: Slot) { + solana_logger::setup(); + let mut shred = Shred::new_from_data( + slot, + 0xc0de, + 0xdead, + Some(&[1, 2, 3, 4]), + true, + true, + 0, + 0, + 0xc0de, + ); + assert_eq!(shred.slot(), slot); + let keypair = Keypair::new(); + Shredder::sign_shred(&keypair, &mut shred); + trace!("signature {}", shred.common_header.signature); + let nonce = if Shred::is_nonce_unlocked(slot) { + Some(9) + } else { + None + }; + let mut packet = repair_response_packet_from_shred( + slot, + shred.payload, + &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080), + nonce, + ); + packet.meta.repair = true; + + let leader_slots = [(slot, keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, Some(1)); + + let wrong_keypair = Keypair::new(); + let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, Some(0)); + + let leader_slots = HashMap::new(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, None); + } + + #[test] + fn test_sigverify_shred_cpu_repair() { + run_test_sigverify_shred_cpu_repair(UNLOCK_NONCE_SLOT); + run_test_sigverify_shred_cpu_repair(UNLOCK_NONCE_SLOT + 1); + } +} diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index da223a3233..52ea0ecf9a 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -168,22 +168,16 @@ impl RepairService { }; if let Ok(repairs) = repairs { - let reqs: Vec<_> = repairs - .into_iter() - .filter_map(|repair_request| { - serve_repair - .repair_request(&repair_request, &mut repair_stats) - .map(|result| (result, repair_request)) - .ok() - }) - .collect(); - - for ((to, req), _) in reqs { - repair_socket.send_to(&req, to).unwrap_or_else(|e| { - info!("{} repair req send_to({}) error {:?}", id, to, e); - 0 - }); - } + repairs.into_iter().for_each(|repair_request| { + if let Ok((to, req)) = + serve_repair.repair_request(&repair_request, &mut repair_stats) + { + repair_socket.send_to(&req, to).unwrap_or_else(|e| { + info!("{} repair req send_to({}) error {:?}", id, to, e); + 0 + }); + } + }); } if last_stats.elapsed().as_secs() > 1 { let repair_total = repair_stats.shred.count @@ -607,7 +601,7 @@ mod test { let blockstore = Blockstore::open(&blockstore_path).unwrap(); let slots: Vec = vec![1, 3, 5, 7, 8]; - let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); for (mut slot_shreds, _) in shreds.into_iter() { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 893e1aaca4..ca7de49a8d 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1738,6 +1738,7 @@ pub(crate) mod tests { ShredCommonHeader::default(), data_header, CodingShredHeader::default(), + PACKET_DATA_SIZE, ); bincode::serialize_into( &mut shred.payload[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER..], diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index b81104804f..0dcb426fb5 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1,19 +1,21 @@ -use crate::packet::limited_deserialize; use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ cluster_info::{ClusterInfo, ClusterInfoError}, contact_info::ContactInfo, - packet::Packet, + repair_response, repair_service::RepairStats, result::{Error, Result}, }; use bincode::serialize; use rand::{thread_rng, Rng}; -use solana_ledger::blockstore::Blockstore; +use solana_ledger::{ + blockstore::Blockstore, + shred::{Nonce, Shred}, +}; use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug}; -use solana_perf::packet::{Packets, PacketsRecycler}; +use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler}; use solana_sdk::{ clock::Slot, signature::{Keypair, Signer}, @@ -29,6 +31,7 @@ use std::{ /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; +pub const DEFAULT_NONCE: u32 = 42; #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] pub enum RepairType { @@ -63,6 +66,9 @@ enum RepairProtocol { WindowIndex(ContactInfo, u64, u64), HighestWindowIndex(ContactInfo, u64, u64), Orphan(ContactInfo, u64), + WindowIndexWithNonce(ContactInfo, u64, u64, Nonce), + HighestWindowIndexWithNonce(ContactInfo, u64, u64, Nonce), + OrphanWithNonce(ContactInfo, u64, Nonce), } #[derive(Clone)] @@ -106,6 +112,9 @@ impl ServeRepair { RepairProtocol::WindowIndex(ref from, _, _) => from, RepairProtocol::HighestWindowIndex(ref from, _, _) => from, RepairProtocol::Orphan(ref from, _) => from, + RepairProtocol::WindowIndexWithNonce(ref from, _, _, _) => from, + RepairProtocol::HighestWindowIndexWithNonce(ref from, _, _, _) => from, + RepairProtocol::OrphanWithNonce(ref from, _, _) => from, } } @@ -140,6 +149,7 @@ impl ServeRepair { &me.read().unwrap().my_info, *slot, *shred_index, + None, ), "WindowIndex", ) @@ -154,6 +164,7 @@ impl ServeRepair { blockstore, *slot, *highest_index, + None, ), "HighestWindowIndex", ) @@ -167,10 +178,55 @@ impl ServeRepair { blockstore, *slot, MAX_ORPHAN_REPAIR_RESPONSES, + None, ), "Orphan", ) } + RepairProtocol::WindowIndexWithNonce(_, slot, shred_index, nonce) => { + stats.window_index += 1; + ( + Self::run_window_request( + recycler, + from, + &from_addr, + blockstore, + &me.read().unwrap().my_info, + *slot, + *shred_index, + Some(*nonce), + ), + "WindowIndexWithNonce", + ) + } + RepairProtocol::HighestWindowIndexWithNonce(_, slot, highest_index, nonce) => { + stats.highest_window_index += 1; + ( + Self::run_highest_window_request( + recycler, + &from_addr, + blockstore, + *slot, + *highest_index, + Some(*nonce), + ), + "HighestWindowIndexWithNonce", + ) + } + RepairProtocol::OrphanWithNonce(_, slot, nonce) => { + stats.orphan += 1; + ( + Self::run_orphan( + recycler, + &from_addr, + blockstore, + *slot, + MAX_ORPHAN_REPAIR_RESPONSES, + Some(*nonce), + ), + "OrphanWithNonce", + ) + } } }; @@ -321,20 +377,47 @@ impl ServeRepair { }); } - fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result> { - let req = RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index); + fn window_index_request_bytes( + &self, + slot: Slot, + shred_index: u64, + nonce: Option, + ) -> Result> { + let req = if let Some(nonce) = nonce { + RepairProtocol::WindowIndexWithNonce(self.my_info.clone(), slot, shred_index, nonce) + } else { + RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index) + }; let out = serialize(&req)?; Ok(out) } - fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result> { - let req = RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index); + fn window_highest_index_request_bytes( + &self, + slot: Slot, + shred_index: u64, + nonce: Option, + ) -> Result> { + let req = if let Some(nonce) = nonce { + RepairProtocol::HighestWindowIndexWithNonce( + self.my_info.clone(), + slot, + shred_index, + nonce, + ) + } else { + RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index) + }; let out = serialize(&req)?; Ok(out) } - fn orphan_bytes(&self, slot: Slot) -> Result> { - let req = RepairProtocol::Orphan(self.my_info.clone(), slot); + fn orphan_bytes(&self, slot: Slot, nonce: Option) -> Result> { + let req = if let Some(nonce) = nonce { + RepairProtocol::OrphanWithNonce(self.my_info.clone(), slot, nonce) + } else { + RepairProtocol::Orphan(self.my_info.clone(), slot) + }; let out = serialize(&req)?; Ok(out) } @@ -346,6 +429,7 @@ impl ServeRepair { ) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication and has the desired slot, as indicated // by a valid tvu port location + let slot = repair_request.slot(); let valid: Vec<_> = self .cluster_info .read() @@ -356,7 +440,12 @@ impl ServeRepair { } let n = thread_rng().gen::() % valid.len(); let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port - let out = self.map_repair_request(repair_request, repair_stats)?; + let nonce = if Shred::is_nonce_unlocked(slot) { + Some(DEFAULT_NONCE) + } else { + None + }; + let out = self.map_repair_request(&repair_request, repair_stats, nonce)?; Ok((addr, out)) } @@ -365,19 +454,24 @@ impl ServeRepair { &self, repair_request: &RepairType, repair_stats: &mut RepairStats, + nonce: Option, ) -> Result> { + let slot = repair_request.slot(); + if Shred::is_nonce_unlocked(slot) { + assert!(nonce.is_some()); + } match repair_request { RepairType::Shred(slot, shred_index) => { repair_stats.shred.update(*slot); - Ok(self.window_index_request_bytes(*slot, *shred_index)?) + Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?) } RepairType::HighestShred(slot, shred_index) => { repair_stats.highest_shred.update(*slot); - Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?) + Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?) } RepairType::Orphan(slot) => { repair_stats.orphan.update(*slot); - Ok(self.orphan_bytes(*slot)?) + Ok(self.orphan_bytes(*slot, nonce)?) } } } @@ -390,12 +484,19 @@ impl ServeRepair { me: &ContactInfo, slot: Slot, shred_index: u64, + nonce: Option, ) -> Option { if let Some(blockstore) = blockstore { // Try to find the requested index in one of the slots - let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr); + let packet = repair_response::repair_response_packet( + blockstore, + slot, + shred_index, + from_addr, + nonce, + ); - if let Ok(Some(packet)) = packet { + if let Some(packet) = packet { inc_new_counter_debug!("serve_repair-window-request-ledger", 1); return Some(Packets::new_with_recycler_data( recycler, @@ -423,15 +524,20 @@ impl ServeRepair { blockstore: Option<&Arc>, slot: Slot, highest_index: u64, + nonce: Option, ) -> Option { let blockstore = blockstore?; // Try to find the requested index in one of the slots let meta = blockstore.meta(slot).ok()??; if meta.received > highest_index { // meta.received must be at least 1 by this point - let packet = - Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr) - .ok()??; + let packet = repair_response::repair_response_packet( + blockstore, + slot, + meta.received - 1, + from_addr, + nonce, + )?; return Some(Packets::new_with_recycler_data( recycler, "run_highest_window_request", @@ -447,6 +553,7 @@ impl ServeRepair { blockstore: Option<&Arc>, mut slot: Slot, max_responses: usize, + nonce: Option, ) -> Option { let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); if let Some(blockstore) = blockstore { @@ -455,9 +562,19 @@ impl ServeRepair { if meta.received == 0 { break; } - let packet = - Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr); - if let Ok(Some(packet)) = packet { + let nonce = if Shred::is_nonce_unlocked(slot) { + nonce + } else { + None + }; + let packet = repair_response::repair_response_packet( + blockstore, + slot, + meta.received - 1, + from_addr, + nonce, + ); + if let Some(packet) = packet { res.packets.push(packet); } if meta.is_parent_set() && res.packets.len() <= max_responses { @@ -472,41 +589,31 @@ impl ServeRepair { } Some(res) } - - fn get_data_shred_as_packet( - blockstore: &Arc, - slot: Slot, - shred_index: u64, - dest: &SocketAddr, - ) -> Result> { - let data = blockstore.get_data_shred(slot, shred_index)?; - Ok(data.map(|data| { - let mut packet = Packet::default(); - packet.meta.size = data.len(); - packet.meta.set_addr(dest); - packet.data.copy_from_slice(&data); - packet - })) - } } #[cfg(test)] mod tests { use super::*; - use crate::result::Error; + use crate::{repair_response, result::Error}; use solana_ledger::get_tmp_ledger_path; use solana_ledger::{ blockstore::make_many_slot_entries, blockstore_processor::fill_blockstore_slot_with_ticks, shred::{ max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, + NONCE_SHRED_PAYLOAD_SIZE, UNLOCK_NONCE_SLOT, }, }; use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp}; - /// test run_window_requestwindow requests respond with the right shred, and do not overrun #[test] - fn run_highest_window_request() { + fn test_run_highest_window_request() { + run_highest_window_request(UNLOCK_NONCE_SLOT + 3, 3, Some(9)); + run_highest_window_request(UNLOCK_NONCE_SLOT, 3, None); + } + + /// test run_window_request responds with the right shred, and do not overrun + fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Option) { let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); @@ -518,41 +625,51 @@ mod tests { Some(&blockstore), 0, 0, + nonce, ); assert!(rv.is_none()); let _ = fill_blockstore_slot_with_ticks( &blockstore, - max_ticks_per_n_shreds(1) + 1, - 2, - 1, + max_ticks_per_n_shreds(1, None) + 1, + slot, + slot - num_slots + 1, Hash::default(), ); + let index = 1; let rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), Some(&blockstore), - 2, - 1, - ); + slot, + index, + nonce, + ) + .expect("packets"); + let rv: Vec = rv - .expect("packets") .packets .into_iter() - .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) + .filter_map(|b| { + if nonce.is_some() { + assert_eq!(repair_response::nonce(&b.data[..]), nonce); + } + Shred::new_from_serialized_shred(b.data.to_vec()).ok() + }) .collect(); assert!(!rv.is_empty()); - let index = blockstore.meta(2).unwrap().unwrap().received - 1; + let index = blockstore.meta(slot).unwrap().unwrap().received - 1; assert_eq!(rv[0].index(), index as u32); - assert_eq!(rv[0].slot(), 2); + assert_eq!(rv[0].slot(), slot); let rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), Some(&blockstore), - 2, + slot, index + 1, + nonce, ); assert!(rv.is_none()); } @@ -560,9 +677,14 @@ mod tests { Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); } - /// test window requests respond with the right shred, and do not overrun #[test] - fn run_window_request() { + fn test_run_window_request() { + run_window_request(UNLOCK_NONCE_SLOT + 1, Some(9)); + run_window_request(UNLOCK_NONCE_SLOT - 3, None); + } + + /// test window requests respond with the right shred, and do not overrun + fn run_window_request(slot: Slot, nonce: Option) { let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); @@ -589,12 +711,13 @@ mod tests { &socketaddr_any!(), Some(&blockstore), &me, + slot, 0, - 0, + nonce, ); assert!(rv.is_none()); let mut common_header = ShredCommonHeader::default(); - common_header.slot = 2; + common_header.slot = slot; common_header.index = 1; let mut data_header = DataShredHeader::default(); data_header.parent_offset = 1; @@ -602,30 +725,37 @@ mod tests { common_header, data_header, CodingShredHeader::default(), + NONCE_SHRED_PAYLOAD_SIZE, ); blockstore .insert_shreds(vec![shred_info], None, false) .expect("Expect successful ledger write"); + let index = 1; let rv = ServeRepair::run_window_request( &recycler, &me, &socketaddr_any!(), Some(&blockstore), &me, - 2, - 1, - ); - assert!(!rv.is_none()); + slot, + index, + nonce, + ) + .expect("packets"); let rv: Vec = rv - .expect("packets") .packets .into_iter() - .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) + .filter_map(|b| { + if nonce.is_some() { + assert_eq!(repair_response::nonce(&b.data[..]), nonce); + } + Shred::new_from_serialized_shred(b.data.to_vec()).ok() + }) .collect(); assert_eq!(rv[0].index(), 1); - assert_eq!(rv[0].slot(), 2); + assert_eq!(rv[0].slot(), slot); } Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); @@ -697,52 +827,85 @@ mod tests { } #[test] - fn run_orphan() { + fn test_run_orphan() { + run_orphan(UNLOCK_NONCE_SLOT + 1, 3, Some(9)); + // Test where the response will be for some slots <= UNLOCK_NONCE_SLOT, + // and some of the response will be for some slots > UNLOCK_NONCE_SLOT. + // Should not panic. + run_orphan(UNLOCK_NONCE_SLOT, 3, None); + run_orphan(UNLOCK_NONCE_SLOT, 3, Some(9)); + } + + fn run_orphan(slot: Slot, num_slots: u64, nonce: Option) { solana_logger::setup(); let recycler = PacketsRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let rv = - ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0); + let rv = ServeRepair::run_orphan( + &recycler, + &socketaddr_any!(), + Some(&blockstore), + slot, + 0, + nonce, + ); assert!(rv.is_none()); - // Create slots 1, 2, 3 with 5 shreds apiece - let (shreds, _) = make_many_slot_entries(1, 3, 5); + // Create slots [slot, slot + num_slots) with 5 shreds apiece + let (shreds, _) = make_many_slot_entries(slot, num_slots, 5); blockstore .insert_shreds(shreds, None, false) .expect("Expect successful ledger write"); - // We don't have slot 4, so we don't know how to service this requeset - let rv = - ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5); + // We don't have slot `slot + num_slots`, so we don't know how to service this request + let rv = ServeRepair::run_orphan( + &recycler, + &socketaddr_any!(), + Some(&blockstore), + slot + num_slots, + 5, + nonce, + ); assert!(rv.is_none()); - // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively - // for this request - let rv: Vec<_> = - ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5) - .expect("run_orphan packets") - .packets - .iter() - .map(|b| b.clone()) - .collect(); - let expected: Vec<_> = (1..=3) + // For a orphan request for `slot + num_slots - 1`, we should return the highest shreds + // from slots in the range [slot, slot + num_slots - 1] + let rv: Vec<_> = ServeRepair::run_orphan( + &recycler, + &socketaddr_any!(), + Some(&blockstore), + slot + num_slots - 1, + 5, + nonce, + ) + .expect("run_orphan packets") + .packets + .iter() + .map(|b| b.clone()) + .collect(); + + // Verify responses + let expected: Vec<_> = (slot..slot + num_slots) .rev() - .map(|slot| { + .filter_map(|slot| { + let nonce = if Shred::is_nonce_unlocked(slot) { + nonce + } else { + None + }; let index = blockstore.meta(slot).unwrap().unwrap().received - 1; - ServeRepair::get_data_shred_as_packet( + repair_response::repair_response_packet( &blockstore, slot, index, &socketaddr_any!(), + nonce, ) - .unwrap() - .unwrap() }) .collect(); - assert_eq!(rv, expected) + assert_eq!(rv, expected); } Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 3fa88f91ad..ff0988ea37 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -1,31 +1,37 @@ //! `window_service` handles the data plane incoming shreds, storing them in //! blockstore and retransmitting where required -//! -use crate::cluster_info::ClusterInfo; -use crate::packet::Packets; -use crate::repair_service::{RepairService, RepairStrategy}; -use crate::result::{Error, Result}; -use crate::streamer::PacketSender; +use crate::{ + cluster_info::ClusterInfo, + packet::Packets, + repair_response, + repair_service::{RepairService, RepairStrategy}, + result::{Error, Result}, + serve_repair::DEFAULT_NONCE, + streamer::PacketSender, +}; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, }; use rayon::iter::IntoParallelRefMutIterator; use rayon::iter::ParallelIterator; use rayon::ThreadPool; -use solana_ledger::bank_forks::BankForks; -use solana_ledger::blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT}; -use solana_ledger::leader_schedule_cache::LeaderScheduleCache; -use solana_ledger::shred::Shred; +use solana_ledger::{ + bank_forks::BankForks, + blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT}, + leader_schedule_cache::LeaderScheduleCache, + shred::{Nonce, Shred}, +}; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; use solana_rayon_threadlimit::get_thread_count; use solana_runtime::bank::Bank; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::timing::duration_as_ms; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread::{self, Builder, JoinHandle}; -use std::time::{Duration, Instant}; +use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms}; +use std::{ + net::{SocketAddr, UdpSocket}, + sync::atomic::{AtomicBool, Ordering}, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, +}; fn verify_shred_slot(shred: &Shred, root: u64) -> bool { if shred.is_data() { @@ -102,8 +108,15 @@ fn run_check_duplicate( Ok(()) } +fn verify_repair(_shred: &Shred, repair_info: &Option) -> bool { + repair_info + .as_ref() + .map(|repair_info| repair_info.nonce == DEFAULT_NONCE) + .unwrap_or(true) +} + fn run_insert( - shred_receiver: &CrossbeamReceiver>, + shred_receiver: &CrossbeamReceiver<(Vec, Vec>)>, blockstore: &Arc, leader_schedule_cache: &Arc, handle_duplicate: F, @@ -112,12 +125,16 @@ where F: Fn(Shred) -> (), { let timer = Duration::from_millis(200); - let mut shreds = shred_receiver.recv_timeout(timer)?; - - while let Ok(mut more_shreds) = shred_receiver.try_recv() { - shreds.append(&mut more_shreds) + let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?; + while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() { + shreds.extend(more_shreds); + repair_infos.extend(more_repair_infos); } + assert_eq!(shreds.len(), repair_infos.len()); + let mut i = 0; + shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0); + let blockstore_insert_metrics = blockstore.insert_shreds_handle_duplicate( shreds, Some(leader_schedule_cache), @@ -131,7 +148,7 @@ where fn recv_window( blockstore: &Arc, - insert_shred_sender: &CrossbeamSender>, + insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, my_pubkey: &Pubkey, verified_receiver: &CrossbeamReceiver>, retransmit: &PacketSender, @@ -155,7 +172,7 @@ where inc_new_counter_debug!("streamer-recv_window-recv", total_packets); let last_root = blockstore.last_root(); - let shreds: Vec<_> = thread_pool.install(|| { + let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets .par_iter_mut() .flat_map(|packets| { @@ -164,34 +181,59 @@ where .iter_mut() .filter_map(|packet| { if packet.meta.discard { - inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); + inc_new_counter_debug!( + "streamer-recv_window-invalid_or_unnecessary_packet", + 1 + ); None - } else if let Ok(shred) = - Shred::new_from_serialized_shred(packet.data.to_vec()) - { - if shred_filter(&shred, last_root) { - // Mark slot as dead if the current shred is on the boundary - // of max shreds per slot. However, let the current shred - // get retransmitted. It'll allow peer nodes to see this shred - // and trigger them to mark the slot as dead. - if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 { - let _ = blockstore.set_dead_slot(shred.slot()); + } else { + // shred fetch stage should be sending packets + // with sufficiently large buffers. Needed to ensure + // call to `new_from_serialized_shred` is safe. + assert_eq!(packet.data.len(), PACKET_DATA_SIZE); + let serialized_shred = packet.data.to_vec(); + if let Ok(shred) = Shred::new_from_serialized_shred(serialized_shred) { + let repair_info = { + if packet.meta.repair && Shred::is_nonce_unlocked(shred.slot()) + { + if let Some(nonce) = repair_response::nonce(&packet.data) { + let repair_info = RepairMeta { + _from_addr: packet.meta.addr(), + nonce, + }; + Some(repair_info) + } else { + // If can't parse the nonce, dump the packet + return None; + } + } else { + None + } + }; + if shred_filter(&shred, last_root) { + // Mark slot as dead if the current shred is on the boundary + // of max shreds per slot. However, let the current shred + // get retransmitted. It'll allow peer nodes to see this shred + // and trigger them to mark the slot as dead. + if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 { + let _ = blockstore.set_dead_slot(shred.slot()); + } + packet.meta.slot = shred.slot(); + packet.meta.seed = shred.seed(); + Some((shred, repair_info)) + } else { + packet.meta.discard = true; + None } - packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(); - Some(shred) } else { packet.meta.discard = true; None } - } else { - packet.meta.discard = true; - None } }) .collect::>() }) - .collect() + .unzip() }); trace!("{:?} shreds from packets", shreds.len()); @@ -205,7 +247,7 @@ where } } - insert_shred_sender.send(shreds)?; + insert_shred_sender.send((shreds, repair_infos))?; trace!( "Elapsed processing time in recv_window(): {}", @@ -215,6 +257,11 @@ where Ok(()) } +struct RepairMeta { + _from_addr: SocketAddr, + nonce: Nonce, +} + // Implement a destructor for the window_service thread to signal it exited // even on panics struct Finalizer { @@ -336,7 +383,7 @@ impl WindowService { exit: &Arc, blockstore: &Arc, leader_schedule_cache: &Arc, - insert_receiver: CrossbeamReceiver>, + insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, duplicate_sender: CrossbeamSender, ) -> JoinHandle<()> { let exit = exit.clone(); @@ -377,7 +424,7 @@ impl WindowService { id: Pubkey, exit: &Arc, blockstore: &Arc, - insert_sender: CrossbeamSender>, + insert_sender: CrossbeamSender<(Vec, Vec>)>, verified_receiver: CrossbeamReceiver>, shred_filter: F, bank_forks: Option>>, @@ -483,12 +530,11 @@ mod test { repair_service::RepairSlotRange, }; use rand::thread_rng; - use solana_ledger::shred::DataShredHeader; use solana_ledger::{ blockstore::{make_many_slot_entries, Blockstore}, entry::{create_ticks, Entry}, get_tmp_ledger_path, - shred::Shredder, + shred::{DataShredHeader, Shredder, NONCE_SHRED_PAYLOAD_SIZE}, }; use solana_sdk::{ clock::Slot, @@ -562,8 +608,12 @@ mod test { // If it's a coding shred, test that slot >= root let (common, coding) = Shredder::new_coding_shred_header(5, 5, 5, 6, 6, 0, 0); - let mut coding_shred = - Shred::new_empty_from_header(common, DataShredHeader::default(), coding); + let mut coding_shred = Shred::new_empty_from_header( + common, + DataShredHeader::default(), + coding, + NONCE_SHRED_PAYLOAD_SIZE, + ); Shredder::sign_shred(&leader_keypair, &mut coding_shred); assert_eq!( should_retransmit_and_persist(&coding_shred, Some(bank.clone()), &cache, &me_id, 0, 0), diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 5cf48f184c..e89d3a0727 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1982,10 +1982,11 @@ impl Blockstore { let data_shreds = data_shreds?; assert!(data_shreds.last().unwrap().data_complete()); - let deshred_payload = Shredder::deshred(&data_shreds).map_err(|_| { - BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom( - "Could not reconstruct data block from constituent shreds".to_string(), - ))) + let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| { + BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!( + "Could not reconstruct data block from constituent shreds, error: {:?}", + e + )))) })?; debug!("{:?} shreds in last FEC set", data_shreds.len(),); @@ -2830,7 +2831,7 @@ pub mod tests { entry::{next_entry, next_entry_mut}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, leader_schedule::{FixedSchedule, LeaderSchedule}, - shred::{max_ticks_per_n_shreds, DataShredHeader}, + shred::{max_ticks_per_n_shreds, DataShredHeader, NONCE_SHRED_PAYLOAD_SIZE}, }; use assert_matches::assert_matches; use bincode::serialize; @@ -2980,7 +2981,7 @@ pub mod tests { #[test] fn test_insert_get_bytes() { // Create enough entries to ensure there are at least two shreds created - let num_entries = max_ticks_per_n_shreds(1) + 1; + let num_entries = max_ticks_per_n_shreds(1, None) + 1; assert!(num_entries > 1); let (mut shreds, _) = make_slot_entries(0, 0, num_entries); @@ -3220,7 +3221,7 @@ pub mod tests { #[test] fn test_insert_data_shreds_basic() { // Create enough entries to ensure there are at least two shreds created - let num_entries = max_ticks_per_n_shreds(1) + 1; + let num_entries = max_ticks_per_n_shreds(1, None) + 1; assert!(num_entries > 1); let (mut shreds, entries) = make_slot_entries(0, 0, num_entries); @@ -3267,7 +3268,7 @@ pub mod tests { #[test] fn test_insert_data_shreds_reverse() { let num_shreds = 10; - let num_entries = max_ticks_per_n_shreds(num_shreds); + let num_entries = max_ticks_per_n_shreds(num_shreds, None); let (mut shreds, entries) = make_slot_entries(0, 0, num_entries); let num_shreds = shreds.len() as u64; @@ -3444,7 +3445,7 @@ pub mod tests { { let blockstore = Blockstore::open(&blockstore_path).unwrap(); // Create enough entries to ensure there are at least two shreds created - let min_entries = max_ticks_per_n_shreds(1) + 1; + let min_entries = max_ticks_per_n_shreds(1, None) + 1; for i in 0..4 { let slot = i; let parent_slot = if i == 0 { 0 } else { i - 1 }; @@ -3871,7 +3872,7 @@ pub mod tests { let blockstore = Blockstore::open(&blockstore_path).unwrap(); let num_slots = 15; // Create enough entries to ensure there are at least two shreds created - let entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; assert!(entries_per_slot > 1); let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot); @@ -4241,7 +4242,7 @@ pub mod tests { let gap: u64 = 10; assert!(gap > 3); // Create enough entries to ensure there are at least two shreds created - let num_entries = max_ticks_per_n_shreds(1) + 1; + let num_entries = max_ticks_per_n_shreds(1, None) + 1; let entries = create_ticks(num_entries, 0, Hash::default()); let mut shreds = entries_to_test_shreds(entries, slot, 0, true, 0); let num_shreds = shreds.len(); @@ -4553,6 +4554,7 @@ pub mod tests { shred.clone(), DataShredHeader::default(), coding.clone(), + NONCE_SHRED_PAYLOAD_SIZE, ); // Insert a good coding shred @@ -4585,6 +4587,7 @@ pub mod tests { shred.clone(), DataShredHeader::default(), coding.clone(), + NONCE_SHRED_PAYLOAD_SIZE, ); let index = index_cf.get(shred.slot).unwrap().unwrap(); assert!(Blockstore::should_insert_coding_shred( @@ -4600,6 +4603,7 @@ pub mod tests { shred.clone(), DataShredHeader::default(), coding.clone(), + NONCE_SHRED_PAYLOAD_SIZE, ); let index = coding_shred.coding_header.position - 1; coding_shred.set_index(index as u32); @@ -4618,6 +4622,7 @@ pub mod tests { shred.clone(), DataShredHeader::default(), coding.clone(), + NONCE_SHRED_PAYLOAD_SIZE, ); coding_shred.coding_header.num_coding_shreds = 0; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); @@ -4634,6 +4639,7 @@ pub mod tests { shred.clone(), DataShredHeader::default(), coding.clone(), + NONCE_SHRED_PAYLOAD_SIZE, ); coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position; let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); @@ -4651,6 +4657,7 @@ pub mod tests { shred.clone(), DataShredHeader::default(), coding.clone(), + NONCE_SHRED_PAYLOAD_SIZE, ); coding_shred.common_header.fec_set_index = std::u32::MAX - 1; coding_shred.coding_header.num_coding_shreds = 3; @@ -4683,6 +4690,7 @@ pub mod tests { shred.clone(), DataShredHeader::default(), coding.clone(), + NONCE_SHRED_PAYLOAD_SIZE, ); let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); coding_shred.set_slot(*last_root.read().unwrap()); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 0895568623..4093896d4e 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -9,7 +9,7 @@ use rayon::{ slice::ParallelSlice, ThreadPool, }; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, Serializer}; use solana_metrics::datapoint_debug; use solana_perf::packet::Packet; use solana_rayon_threadlimit::get_thread_count; @@ -24,25 +24,33 @@ use std::mem::size_of; use std::{sync::Arc, time::Instant}; use thiserror::Error; +pub type Nonce = u32; + /// The following constants are computed by hand, and hardcoded. /// `test_shred_constants` ensures that the values are correct. /// Constants are used over lazy_static for performance reasons. pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; pub const SIZE_OF_DATA_SHRED_HEADER: usize = 3; +pub const SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD: usize = 2; pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6; pub const SIZE_OF_SIGNATURE: usize = 64; pub const SIZE_OF_SHRED_TYPE: usize = 1; pub const SIZE_OF_SHRED_SLOT: usize = 8; pub const SIZE_OF_SHRED_INDEX: usize = 4; +pub const SIZE_OF_NONCE: usize = 4; pub const SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER; pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE - SIZE_OF_COMMON_SHRED_HEADER - SIZE_OF_DATA_SHRED_HEADER - SIZE_OF_DATA_SHRED_IGNORED_TAIL; +pub const SIZE_OF_NONCE_DATA_SHRED_PAYLOAD: usize = + SIZE_OF_DATA_SHRED_PAYLOAD - SIZE_OF_NONCE - SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD; pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE; pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; +pub const NONCE_SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; +pub const UNLOCK_NONCE_SLOT: Slot = 13_115_515; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) @@ -107,6 +115,20 @@ pub struct ShredCommonHeader { pub struct DataShredHeader { pub parent_offset: u16, pub flags: u8, + #[serde(skip_deserializing)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "option_as_u16_serialize")] + pub size: Option, +} + +#[allow(clippy::trivially_copy_pass_by_ref)] +fn option_as_u16_serialize(x: &Option, s: S) -> std::result::Result +where + S: Serializer, +{ + assert!(x.is_some()); + let num = x.unwrap(); + s.serialize_u16(num) } /// The coding shred header has FEC information @@ -168,7 +190,8 @@ impl Shred { version: u16, fec_set_index: u32, ) -> Self { - let mut payload = vec![0; PACKET_DATA_SIZE]; + let payload_size = Self::get_expected_payload_size_from_slot(slot); + let mut payload = vec![0; payload_size]; let common_header = ShredCommonHeader { slot, index, @@ -177,9 +200,20 @@ impl Shred { ..ShredCommonHeader::default() }; + let size = if Self::is_nonce_unlocked(slot) { + Some( + (data.map(|d| d.len()).unwrap_or(0) + + SIZE_OF_DATA_SHRED_HEADER + + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD + + SIZE_OF_COMMON_SHRED_HEADER) as u16, + ) + } else { + None + }; let mut data_header = DataShredHeader { parent_offset, flags: reference_tick.min(SHRED_TICK_REFERENCE_MASK), + size, }; if is_last_data { @@ -198,9 +232,10 @@ impl Shred { &common_header, ) .expect("Failed to write header into shred buffer"); + let size_of_data_shred_header = Shredder::get_expected_data_header_size_from_slot(slot); Self::serialize_obj_into( &mut start, - SIZE_OF_DATA_SHRED_HEADER, + size_of_data_shred_header, &mut payload, &data_header, ) @@ -218,11 +253,21 @@ impl Shred { } } - pub fn new_from_serialized_shred(payload: Vec) -> Result { + pub fn new_from_serialized_shred(mut payload: Vec) -> Result { let mut start = 0; let common_header: ShredCommonHeader = Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?; + let slot = common_header.slot; + let expected_data_size = Self::get_expected_payload_size_from_slot(slot); + // Safe because any payload from the network must have passed through + // window service, which implies payload wll be of size + // PACKET_DATA_SIZE, and `expected_data_size` <= PACKET_DATA_SIZE. + // + // On the other hand, if this function is called locally, the payload size should match + // the `expected_data_size`. + assert!(payload.len() >= expected_data_size); + payload.truncate(expected_data_size); let shred = if common_header.shred_type == ShredType(CODING_SHRED) { let coding_header: CodingShredHeader = Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?; @@ -233,11 +278,14 @@ impl Shred { payload, } } else if common_header.shred_type == ShredType(DATA_SHRED) { + // This doesn't need to change since we skip deserialization of the + // "size" field in the header for now + let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER; let data_header: DataShredHeader = - Self::deserialize_obj(&mut start, SIZE_OF_DATA_SHRED_HEADER, &payload)?; + Self::deserialize_obj(&mut start, size_of_data_shred_header, &payload)?; if u64::from(data_header.parent_offset) > common_header.slot { return Err(ShredError::InvalidParentOffset { - slot: common_header.slot, + slot, parent_offset: data_header.parent_offset, }); } @@ -258,8 +306,10 @@ impl Shred { common_header: ShredCommonHeader, data_header: DataShredHeader, coding_header: CodingShredHeader, + payload_size: usize, ) -> Self { - let mut payload = vec![0; PACKET_DATA_SIZE]; + assert!(payload_size == NONCE_SHRED_PAYLOAD_SIZE || payload_size == PACKET_DATA_SIZE); + let mut payload = vec![0; payload_size]; let mut start = 0; Self::serialize_obj_into( &mut start, @@ -268,10 +318,15 @@ impl Shred { &common_header, ) .expect("Failed to write header into shred buffer"); + let expected_data_header_size = if payload_size == NONCE_SHRED_PAYLOAD_SIZE { + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD + } else { + SIZE_OF_DATA_SHRED_HEADER + }; if common_header.shred_type == ShredType(DATA_SHRED) { Self::serialize_obj_into( &mut start, - SIZE_OF_DATA_SHRED_HEADER, + expected_data_header_size, &mut payload, &data_header, ) @@ -293,11 +348,13 @@ impl Shred { } } - pub fn new_empty_data_shred() -> Self { + pub fn new_empty_data_shred(payload_size: usize) -> Self { + assert!(payload_size == NONCE_SHRED_PAYLOAD_SIZE || payload_size == PACKET_DATA_SIZE); Self::new_empty_from_header( ShredCommonHeader::default(), DataShredHeader::default(), CodingShredHeader::default(), + payload_size, ) } @@ -403,6 +460,18 @@ impl Shred { self.signature() .verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..]) } + + pub fn is_nonce_unlocked(slot: Slot) -> bool { + slot > UNLOCK_NONCE_SLOT + } + + fn get_expected_payload_size_from_slot(slot: Slot) -> usize { + if Self::is_nonce_unlocked(slot) { + NONCE_SHRED_PAYLOAD_SIZE + } else { + PACKET_DATA_SIZE + } + } } #[derive(Debug)] @@ -467,7 +536,7 @@ impl Shredder { let now = Instant::now(); - let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD; + let no_header_size = Self::get_expected_data_shred_payload_size_from_slot(self.slot); let num_shreds = (serialized_shreds.len() + no_header_size - 1) / no_header_size; let last_shred_index = next_shred_index + num_shreds as u32 - 1; @@ -628,7 +697,8 @@ impl Shredder { let start_index = data_shred_batch[0].common_header.index; // All information after coding shred field in a data shred is encoded - let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; + let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot); + let valid_data_len = expected_payload_size - SIZE_OF_DATA_SHRED_IGNORED_TAIL; let data_ptrs: Vec<_> = data_shred_batch .iter() .map(|data| &data.payload[..valid_data_len]) @@ -646,8 +716,12 @@ impl Shredder { i, version, ); - let shred = - Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header); + let shred = Shred::new_empty_from_header( + header, + DataShredHeader::default(), + coding_header, + expected_payload_size, + ); coding_shreds.push(shred.payload); }); @@ -701,7 +775,10 @@ impl Shredder { expected_index: usize, index_found: usize, present: &mut [bool], + payload_size: usize, ) -> Vec> { + // Safe to assert because `new_from_serialized_shred` guarantees the size + assert!(payload_size == NONCE_SHRED_PAYLOAD_SIZE || payload_size == PACKET_DATA_SIZE); let end_index = index_found.saturating_sub(1); // The index of current shred must be within the range of shreds that are being // recovered @@ -715,9 +792,9 @@ impl Shredder { .map(|missing| { present[missing.saturating_sub(first_index_in_fec_set)] = false; if missing < first_index_in_fec_set + num_data { - Shred::new_empty_data_shred().payload + Shred::new_empty_data_shred(payload_size).payload } else { - vec![0; PACKET_DATA_SIZE] + vec![0; payload_size] } }) .collect(); @@ -732,6 +809,8 @@ impl Shredder { first_code_index: usize, slot: Slot, ) -> std::result::Result, reed_solomon_erasure::Error> { + let expected_payload_size = + Self::verify_consistent_shred_payload_sizes(&"try_recovery()", &shreds)?; let mut recovered_data = vec![]; let fec_set_size = num_data + num_coding; @@ -751,6 +830,7 @@ impl Shredder { next_expected_index, index, &mut present, + expected_payload_size, ); blocks.push(shred.payload); next_expected_index = index + 1; @@ -767,6 +847,7 @@ impl Shredder { next_expected_index, first_index + fec_set_size, &mut present, + expected_payload_size, ); shred_bufs.append(&mut pending_shreds); @@ -777,7 +858,7 @@ impl Shredder { let session = Session::new(num_data, num_coding)?; - let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; + let valid_data_len = expected_payload_size - SIZE_OF_DATA_SHRED_IGNORED_TAIL; let coding_block_offset = SIZE_OF_CODING_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER; let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs .iter_mut() @@ -822,8 +903,11 @@ impl Shredder { /// Combines all shreds to recreate the original buffer pub fn deshred(shreds: &[Shred]) -> std::result::Result, reed_solomon_erasure::Error> { let num_data = shreds.len(); - let data_shred_bufs = { + let expected_payload_size = + Self::verify_consistent_shred_payload_sizes(&"deshred()", shreds)?; + let (data_shred_bufs, slot) = { let first_index = shreds.first().unwrap().index() as usize; + let slot = shreds.first().unwrap().slot(); let last_shred = shreds.last().unwrap(); let last_index = if last_shred.data_complete() || last_shred.last_in_slot() { last_shred.index() as usize @@ -835,10 +919,32 @@ impl Shredder { return Err(reed_solomon_erasure::Error::TooFewDataShards); } - shreds.iter().map(|shred| &shred.payload).collect() + (shreds.iter().map(|shred| &shred.payload).collect(), slot) }; - Ok(Self::reassemble_payload(num_data, data_shred_bufs)) + let expected_data_header_size = Self::get_expected_data_header_size_from_slot(slot); + Ok(Self::reassemble_payload( + num_data, + data_shred_bufs, + expected_payload_size, + expected_data_header_size, + )) + } + + pub fn get_expected_data_shred_payload_size_from_slot(slot: Slot) -> usize { + if Shred::is_nonce_unlocked(slot) { + SIZE_OF_NONCE_DATA_SHRED_PAYLOAD + } else { + SIZE_OF_DATA_SHRED_PAYLOAD + } + } + + pub fn get_expected_data_header_size_from_slot(slot: Slot) -> usize { + if Shred::is_nonce_unlocked(slot) { + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD + } else { + SIZE_OF_DATA_SHRED_HEADER + } } fn get_shred_index( @@ -854,26 +960,60 @@ impl Shredder { } } - fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec>) -> Vec { - let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; + fn reassemble_payload( + num_data: usize, + data_shred_bufs: Vec<&Vec>, + expected_payload_size: usize, + expected_data_header_size: usize, + ) -> Vec { + let valid_data_len = expected_payload_size - SIZE_OF_DATA_SHRED_IGNORED_TAIL; data_shred_bufs[..num_data] .iter() .flat_map(|data| { - let offset = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER; + let offset = SIZE_OF_COMMON_SHRED_HEADER + expected_data_header_size; data[offset..valid_data_len].iter() }) .cloned() .collect() } + + fn verify_consistent_shred_payload_sizes( + caller: &str, + shreds: &[Shred], + ) -> std::result::Result { + if shreds.is_empty() { + return Err(reed_solomon_erasure::Error::TooFewShardsPresent); + } + let slot = shreds[0].slot(); + let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot); + for shred in shreds { + if shred.payload.len() != expected_payload_size { + error!( + "{} Shreds for slot: {} are inconsistent sizes. One shred: {} Another shred: {}", + caller, + slot, + expected_payload_size, + shred.payload.len() + ); + return Err(reed_solomon_erasure::Error::IncorrectShardSize); + } + } + + Ok(expected_payload_size) + } } -pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 { +pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option) -> u64 { let ticks = create_ticks(1, 0, Hash::default()); - max_entries_per_n_shred(&ticks[0], num_shreds) + max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size) } -pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 { - let shred_data_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64; +pub fn max_entries_per_n_shred( + entry: &Entry, + num_shreds: u64, + shred_data_size: Option, +) -> u64 { + let shred_data_size = shred_data_size.unwrap_or(SIZE_OF_NONCE_DATA_SHRED_PAYLOAD) as u64; let vec_size = bincode::serialized_size(&vec![entry]).unwrap(); let entry_size = bincode::serialized_size(entry).unwrap(); let count_size = vec_size - entry_size; @@ -891,7 +1031,8 @@ pub fn verify_test_data_shred( is_last_in_slot: bool, is_last_in_fec_set: bool, ) { - assert_eq!(shred.payload.len(), PACKET_DATA_SIZE); + let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot); + assert_eq!(shred.payload.len(), expected_payload_size); assert!(shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); @@ -932,6 +1073,14 @@ pub mod tests { SIZE_OF_DATA_SHRED_HEADER, serialized_size(&DataShredHeader::default()).unwrap() as usize ); + let data_shred_header_with_size = DataShredHeader { + size: Some(1000), + ..DataShredHeader::default() + }; + assert_eq!( + SIZE_OF_DATA_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER_SIZE_FIELD, + serialized_size(&data_shred_header_with_size).unwrap() as usize + ); assert_eq!( SIZE_OF_SIGNATURE, bincode::serialized_size(&Signature::default()).unwrap() as usize @@ -951,17 +1100,16 @@ pub mod tests { } fn verify_test_code_shred(shred: &Shred, index: u32, slot: Slot, pk: &Pubkey, verify: bool) { - assert_eq!(shred.payload.len(), PACKET_DATA_SIZE); + let expected_payload_size = Shred::get_expected_payload_size_from_slot(slot); + assert_eq!(shred.payload.len(), expected_payload_size); assert!(!shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); assert_eq!(verify, shred.verify(pk)); } - #[test] - fn test_data_shredder() { + fn run_test_data_shredder(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x123456789abcdef0; // Test that parent cannot be > current slot assert_matches!( @@ -996,7 +1144,7 @@ pub mod tests { .collect(); let size = serialized_size(&entries).unwrap(); - let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64; + let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot) as u64; let num_expected_data_shreds = (size + no_header_size - 1) / no_header_size; let num_expected_coding_shreds = Shredder::calculate_num_coding_shreds(num_expected_data_shreds as f32, fec_rate); @@ -1051,6 +1199,12 @@ pub mod tests { assert_eq!(entries, deshred_entries); } + #[test] + fn test_data_shredder() { + run_test_data_shredder(UNLOCK_NONCE_SLOT); + run_test_data_shredder(UNLOCK_NONCE_SLOT + 1); + } + #[test] fn test_deserialize_shred_payload() { let keypair = Arc::new(Keypair::new()); @@ -1077,12 +1231,10 @@ pub mod tests { assert_eq!(deserialized_shred, *data_shreds.last().unwrap()); } - #[test] - fn test_shred_reference_tick() { + fn run_test_shred_reference_tick(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 1; - let parent_slot = 0; + let parent_slot = slot - 1; let shredder = Shredder::new(slot, parent_slot, 0.0, keypair.clone(), 5, 0) .expect("Failed in creating shredder"); @@ -1107,6 +1259,12 @@ pub mod tests { assert_eq!(deserialized_shred.reference_tick(), 5); } + #[test] + fn test_shred_reference_tick() { + run_test_shred_reference_tick(UNLOCK_NONCE_SLOT); + run_test_shred_reference_tick(UNLOCK_NONCE_SLOT + 1); + } + #[test] fn test_shred_reference_tick_overflow() { let keypair = Arc::new(Keypair::new()); @@ -1143,22 +1301,21 @@ pub mod tests { ); } - #[test] - fn test_data_and_code_shredder() { + fn run_test_data_and_code_shredder(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x123456789abcdef0; // Test that FEC rate cannot be > 1.0 assert_matches!( Shredder::new(slot, slot - 5, 1.001, keypair.clone(), 0, 0), Err(ShredError::InvalidFecRate(_)) ); - let shredder = Shredder::new(0x123456789abcdef0, slot - 5, 1.0, keypair.clone(), 0, 0) + let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) .expect("Failed in creating shredder"); // Create enough entries to make > 1 shred - let num_entries = max_ticks_per_n_shreds(1) + 1; + let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot); + let num_entries = max_ticks_per_n_shreds(1, Some(no_header_size)) + 1; let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); @@ -1190,9 +1347,13 @@ pub mod tests { } #[test] - fn test_recovery_and_reassembly() { + fn test_data_and_code_shredder() { + run_test_data_and_code_shredder(UNLOCK_NONCE_SLOT); + run_test_data_and_code_shredder(UNLOCK_NONCE_SLOT + 1); + } + + fn run_test_recovery_and_reassembly(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x123456789abcdef0; let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) .expect("Failed in creating shredder"); @@ -1202,7 +1363,9 @@ pub mod tests { let entry = Entry::new(&Hash::default(), 1, vec![tx0]); let num_data_shreds: usize = 5; - let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64); + let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot); + let num_entries = + max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size)); let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); @@ -1441,6 +1604,12 @@ pub mod tests { ); } + #[test] + fn test_recovery_and_reassembly() { + run_test_recovery_and_reassembly(UNLOCK_NONCE_SLOT); + run_test_recovery_and_reassembly(UNLOCK_NONCE_SLOT + 1); + } + #[test] fn test_shred_version() { let keypair = Arc::new(Keypair::new()); diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 7753054fe7..f6c8f73e29 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -1,5 +1,5 @@ #![allow(clippy::implicit_hasher)] -use crate::shred::ShredType; +use crate::shred::{Shred, ShredType, SIZE_OF_NONCE}; use rayon::{ iter::{ IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator, @@ -16,9 +16,12 @@ use solana_perf::{ sigverify::{self, batch_size, TxOffset}, }; use solana_rayon_threadlimit::get_thread_count; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Signature; -use solana_sdk::signature::{Keypair, Signer}; +use solana_sdk::{ + clock::Slot, + pubkey::Pubkey, + signature::Signature, + signature::{Keypair, Signer}, +}; use std::sync::Arc; use std::{collections::HashMap, mem::size_of}; @@ -40,13 +43,12 @@ lazy_static! { /// ... /// } /// Signature is the first thing in the packet, and slot is the first thing in the signed message. -fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> Option { +pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> Option { let sig_start = 0; let sig_end = size_of::(); let slot_start = sig_end + size_of::(); let slot_end = slot_start + size_of::(); let msg_start = sig_end; - let msg_end = packet.meta.size; if packet.meta.discard { return Some(0); } @@ -55,6 +57,11 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> O return Some(0); } let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?; + let msg_end = if packet.meta.repair && Shred::is_nonce_unlocked(slot) { + packet.meta.size.saturating_sub(SIZE_OF_NONCE) + } else { + packet.meta.size + }; trace!("slot {}", slot); let pubkey = slot_leaders.get(&slot)?; if packet.meta.size < sig_end { @@ -94,10 +101,10 @@ fn slot_key_data_for_gpu< batches: &[Packets], slot_keys: &HashMap, recycler_cache: &RecyclerCache, -) -> (PinnedVec, TxOffset, usize) { +) -> (PinnedVec, TxOffset, usize, Vec>) { //TODO: mark Pubkey::default shreds as failed after the GPU returns assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default())); - let slots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { + let slots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { batches .into_par_iter() .map(|p| { @@ -157,7 +164,7 @@ fn slot_key_data_for_gpu< trace!("keyvec.len: {}", keyvec.len()); trace!("keyvec: {:?}", keyvec); trace!("offsets: {:?}", offsets); - (keyvec, offsets, num_in_packets) + (keyvec, offsets, num_in_packets, slots) } fn vec_size_in_packets(keyvec: &PinnedVec) -> usize { @@ -177,6 +184,7 @@ fn shred_gpu_offsets( mut pubkeys_end: usize, batches: &[Packets], recycler_cache: &RecyclerCache, + slots: Option>>, ) -> (TxOffset, TxOffset, TxOffset, Vec>) { let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures"); signature_offsets.set_pinnable(); @@ -185,13 +193,30 @@ fn shred_gpu_offsets( let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes"); msg_sizes.set_pinnable(); let mut v_sig_lens = vec![]; - for batch in batches { + let mut slots_iter; + let mut slots_iter_ref: &mut dyn Iterator> = &mut std::iter::repeat(vec![]); + if let Some(slots) = slots { + slots_iter = slots.into_iter(); + slots_iter_ref = &mut slots_iter; + } + for (batch, slots) in batches.iter().zip(slots_iter_ref) { let mut sig_lens = Vec::new(); - for packet in &batch.packets { + let mut inner_slot_iter; + let mut inner_slot_iter_ref: &mut dyn Iterator = &mut std::iter::repeat(0); + if !slots.is_empty() { + inner_slot_iter = slots.into_iter(); + inner_slot_iter_ref = &mut inner_slot_iter; + }; + + for (packet, slot) in batch.packets.iter().zip(inner_slot_iter_ref) { let sig_start = pubkeys_end; let sig_end = sig_start + size_of::(); let msg_start = sig_end; - let msg_end = sig_start + packet.meta.size; + let msg_end = if packet.meta.repair && Shred::is_nonce_unlocked(slot) { + sig_start + packet.meta.size.saturating_sub(SIZE_OF_NONCE) + } else { + sig_start + packet.meta.size + }; signature_offsets.push(sig_start as u32); msg_start_offsets.push(msg_start as u32); let msg_size = if msg_end < msg_start { @@ -222,7 +247,7 @@ pub fn verify_shreds_gpu( let mut elems = Vec::new(); let mut rvs = Vec::new(); let count = batch_size(batches); - let (pubkeys, pubkey_offsets, mut num_packets) = + let (pubkeys, pubkey_offsets, mut num_packets, slots) = slot_key_data_for_gpu(0, batches, slot_leaders, recycler_cache); //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data @@ -230,7 +255,7 @@ pub fn verify_shreds_gpu( trace!("num_packets: {}", num_packets); trace!("pubkeys_len: {}", pubkeys_len); let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) = - shred_gpu_offsets(pubkeys_len, batches, recycler_cache); + shred_gpu_offsets(pubkeys_len, batches, recycler_cache, Some(slots)); let mut out = recycler_cache.buffer().allocate("out_buffer"); out.set_pinnable(); elems.push( @@ -367,7 +392,7 @@ pub fn sign_shreds_gpu( trace!("offset: {}", offset); let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = - shred_gpu_offsets(offset, batches, recycler_cache); + shred_gpu_offsets(offset, batches, recycler_cache, None); let total_sigs = signature_offsets.len(); let mut signatures_out = recycler_cache.buffer().allocate("ed25519 signatures"); signatures_out.set_pinnable(); @@ -445,14 +470,12 @@ pub fn sign_shreds_gpu( #[cfg(test)] pub mod tests { use super::*; - use crate::shred::SIZE_OF_DATA_SHRED_PAYLOAD; - use crate::shred::{Shred, Shredder}; + use crate::shred::{Shred, Shredder, SIZE_OF_DATA_SHRED_PAYLOAD, UNLOCK_NONCE_SLOT}; use solana_sdk::signature::{Keypair, Signer}; - #[test] - fn test_sigverify_shred_cpu() { + + fn run_test_sigverify_shred_cpu(slot: Slot) { solana_logger::setup(); let mut packet = Packet::default(); - let slot = 0xdeadc0de; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -492,10 +515,14 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_cpu() { + fn test_sigverify_shred_cpu() { + run_test_sigverify_shred_cpu(UNLOCK_NONCE_SLOT); + run_test_sigverify_shred_cpu(UNLOCK_NONCE_SLOT + 1); + } + + fn run_test_sigverify_shreds_cpu(slot: Slot) { solana_logger::setup(); let mut batch = [Packets::default()]; - let slot = 0xdeadc0de; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -542,12 +569,16 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_gpu() { + fn test_sigverify_shreds_cpu() { + run_test_sigverify_shreds_cpu(UNLOCK_NONCE_SLOT); + run_test_sigverify_shreds_cpu(UNLOCK_NONCE_SLOT + 1); + } + + fn run_test_sigverify_shreds_gpu(slot: Slot) { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); let mut batch = [Packets::default()]; - let slot = 0xdeadc0de; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -603,14 +634,18 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_sign_gpu() { + fn test_sigverify_shreds_gpu() { + run_test_sigverify_shreds_gpu(UNLOCK_NONCE_SLOT); + run_test_sigverify_shreds_gpu(UNLOCK_NONCE_SLOT + 1); + } + + fn run_test_sigverify_shreds_sign_gpu(slot: Slot) { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); let mut packets = Packets::default(); let num_packets = 32; let num_batches = 100; - let slot = 0xdeadc0de; packets.packets.resize(num_packets, Packet::default()); for (i, p) in packets.packets.iter_mut().enumerate() { let shred = Shred::new_from_data( @@ -650,11 +685,15 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_sign_cpu() { + fn test_sigverify_shreds_sign_gpu() { + run_test_sigverify_shreds_sign_gpu(UNLOCK_NONCE_SLOT); + run_test_sigverify_shreds_sign_gpu(UNLOCK_NONCE_SLOT + 1); + } + + fn run_test_sigverify_shreds_sign_cpu(slot: Slot) { solana_logger::setup(); let mut batch = [Packets::default()]; - let slot = 0xdeadc0de; let keypair = Keypair::new(); let shred = Shred::new_from_data( slot, @@ -685,4 +724,10 @@ pub mod tests { let rv = verify_shreds_cpu(&batch, &pubkeys); assert_eq!(rv, vec![vec![1]]); } + + #[test] + fn test_sigverify_shreds_sign_cpu() { + run_test_sigverify_shreds_sign_cpu(UNLOCK_NONCE_SLOT); + run_test_sigverify_shreds_sign_cpu(UNLOCK_NONCE_SLOT + 1); + } } diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 4168fda76f..f9012c7ec0 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -1,16 +1,15 @@ use solana_ledger::entry::Entry; use solana_ledger::shred::{ - max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, + max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder, + MAX_DATA_SHREDS_PER_FEC_BLOCK, UNLOCK_NONCE_SLOT, }; use solana_sdk::signature::{Keypair, Signer}; -use solana_sdk::{hash::Hash, system_transaction}; +use solana_sdk::{clock::Slot, hash::Hash, system_transaction}; use std::convert::TryInto; use std::sync::Arc; -#[test] -fn test_multi_fec_block_coding() { +fn run_test_multi_fec_block_coding(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x123456789abcdef0; let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) .expect("Failed in creating shredder"); @@ -20,7 +19,8 @@ fn test_multi_fec_block_coding() { let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let entry = Entry::new(&Hash::default(), 1, vec![tx0]); - let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64); + let no_header_size = Shredder::get_expected_data_shred_payload_size_from_slot(slot); + let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size)); let entries: Vec<_> = (0..num_entries) .map(|_| { @@ -94,3 +94,9 @@ fn test_multi_fec_block_coding() { let result = Shredder::deshred(&all_shreds[..]).unwrap(); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); } + +#[test] +fn test_multi_fec_block_coding() { + run_test_multi_fec_block_coding(UNLOCK_NONCE_SLOT); + run_test_multi_fec_block_coding(UNLOCK_NONCE_SLOT + 1); +}