diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 68698ce12e..bb10023c47 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -27,10 +27,10 @@ use crate::{ result::{Error, Result}, weighted_shuffle::weighted_shuffle, }; - use rand::distributions::{Distribution, WeightedIndex}; use rand::{CryptoRng, Rng, SeedableRng}; use rand_chacha::ChaChaRng; +use solana_ledger::shred::Shred; use solana_sdk::sanitize::{Sanitize, SanitizeError}; use bincode::{serialize, serialized_size}; @@ -96,6 +96,7 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; /// is equal to PACKET_DATA_SIZE minus serialized size of an empty push /// message: Protocol::PushMessage(Pubkey::default(), Vec::default()) const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44; +const DUPLICATE_SHRED_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 115; /// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes /// such that the serialized size of the push/pull message stays below /// PACKET_DATA_SIZE. @@ -403,7 +404,7 @@ pub fn make_accounts_hashes_message( type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "HAFjUDgiGthYTiAg6CYJxA8PqfwuhrC82NtHYYmee4vb")] +#[frozen_abi(digest = "DdTxrwwnbe571Di4rLtrAQorFDE58vYnmzzbaeQ7sQMC")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] enum Protocol { @@ -1126,6 +1127,17 @@ impl ClusterInfo { (labels, txs, max_ts) } + pub(crate) fn push_duplicate_shred(&self, shred: &Shred, other_payload: &[u8]) -> Result<()> { + self.gossip.write().unwrap().push_duplicate_shred( + &self.keypair, + shred, + other_payload, + None:: Option>, // Leader schedule + DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, + )?; + Ok(()) + } + pub fn get_accounts_hash_for_node(&self, pubkey: &Pubkey, map: F) -> Option where F: FnOnce(&Vec<(Slot, Hash)>) -> Y, @@ -3143,9 +3155,13 @@ pub fn stake_weight_peers( #[cfg(test)] mod tests { use super::*; - use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}; + use crate::{ + crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}, + duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS}, + }; use itertools::izip; use rand::seq::SliceRandom; + use solana_ledger::shred::Shredder; use solana_perf::test_tx::test_tx; use solana_sdk::signature::{Keypair, Signer}; use solana_vote_program::{vote_instruction, vote_state::Vote}; @@ -3433,6 +3449,53 @@ mod tests { ); } + #[test] + fn test_duplicate_shred_max_payload_size() { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let keypair = Keypair::new(); + let (slot, parent_slot, fec_rate, reference_tick, version) = + (53084024, 53084023, 0.0, 0, 0); + let shredder = Shredder::new( + slot, + parent_slot, + fec_rate, + leader.clone(), + reference_tick, + version, + ) + .unwrap(); + let next_shred_index = rng.gen(); + let shred = new_rand_shred(&mut rng, next_shred_index, &shredder); + let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder).payload; + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let chunks: Vec<_> = duplicate_shred::from_shred( + shred, + keypair.pubkey(), + other_payload, + Some(leader_schedule), + timestamp(), + DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, + ) + .unwrap() + .collect(); + assert!(chunks.len() > 1); + for chunk in chunks { + let data = CrdsData::DuplicateShred(MAX_DUPLICATE_SHREDS - 1, chunk); + let value = CrdsValue::new_signed(data, &keypair); + let pull_response = Protocol::PullResponse(keypair.pubkey(), vec![value.clone()]); + assert!(serialized_size(&pull_response).unwrap() < PACKET_DATA_SIZE as u64); + let push_message = Protocol::PushMessage(keypair.pubkey(), vec![value.clone()]); + assert!(serialized_size(&push_message).unwrap() < PACKET_DATA_SIZE as u64); + } + } + #[test] fn test_pull_response_min_serialized_size() { let mut rng = rand::thread_rng(); diff --git a/core/src/crds.rs b/core/src/crds.rs index 92fb09a7af..ac15dae6cd 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -42,9 +42,7 @@ use std::ops::{Index, IndexMut}; const CRDS_SHARDS_BITS: u32 = 8; // Limit number of crds values associated with each unique pubkey. This // excludes crds values which by label design are limited per each pubkey. -// TODO: Find the right value for this once duplicate shreds and corresponding -// votes are broadcasted over gossip. -const MAX_CRDS_VALUES_PER_PUBKEY: usize = 512; +const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32; #[derive(Clone)] pub struct Crds { @@ -232,6 +230,15 @@ impl Crds { self.votes.iter().map(move |i| self.table.index(*i)) } + /// Returns all records associated with a pubkey. + pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator { + self.records + .get(pubkey) + .into_iter() + .flat_map(|records| records.into_iter()) + .map(move |i| self.table.index(*i)) + } + pub fn len(&self) -> usize { self.table.len() } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 48d585a440..f528b0e241 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -8,10 +8,17 @@ use crate::{ crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, - crds_value::{CrdsValue, CrdsValueLabel}, + crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, + duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS}, }; use rayon::ThreadPool; -use solana_sdk::{hash::Hash, pubkey::Pubkey}; +use solana_ledger::shred::Shred; +use solana_sdk::{ + hash::Hash, + pubkey::Pubkey, + signature::{Keypair, Signer}, + timing::timestamp, +}; use std::collections::{HashMap, HashSet}; ///The min size for bloom filters @@ -105,6 +112,68 @@ impl CrdsGossip { (self.id, push_messages) } + pub(crate) fn push_duplicate_shred( + &mut self, + keypair: &Keypair, + shred: &Shred, + other_payload: &[u8], + leader_schedule: Option, + // Maximum serialized size of each DuplicateShred chunk payload. + max_payload_size: usize, + ) -> Result<(), duplicate_shred::Error> { + let pubkey = keypair.pubkey(); + // Skip if there are already records of duplicate shreds for this slot. + let shred_slot = shred.slot(); + if self + .crds + .get_records(&pubkey) + .any(|value| match &value.value.data { + CrdsData::DuplicateShred(_, value) => value.slot == shred_slot, + _ => false, + }) + { + return Ok(()); + } + let chunks = duplicate_shred::from_shred( + shred.clone(), + pubkey, + Vec::from(other_payload), + leader_schedule, + timestamp(), + max_payload_size, + )?; + // Find the index of oldest duplicate shred. + let mut num_dup_shreds = 0; + let offset = self + .crds + .get_records(&pubkey) + .filter_map(|value| match &value.value.data { + CrdsData::DuplicateShred(ix, value) => { + num_dup_shreds += 1; + Some((value.wallclock, *ix)) + } + _ => None, + }) + .min() // Override the oldest records. + .map(|(_ /*wallclock*/, ix)| ix) + .unwrap_or(0); + let offset = if num_dup_shreds < MAX_DUPLICATE_SHREDS { + num_dup_shreds + } else { + offset + }; + let entries = chunks + .enumerate() + .map(|(k, chunk)| { + let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS; + let data = CrdsData::DuplicateShred(index, chunk); + CrdsValue::new_signed(data, keypair) + }) + .collect(); + self.process_push_message(&pubkey, entries, timestamp()); + Ok(()) + } + /// add the `from` to the peer's filter of nodes pub fn process_prune_msg( &self, diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 9104caf128..5daca9943a 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -2,7 +2,7 @@ use crate::{ cluster_info::MAX_SNAPSHOT_HASHES, contact_info::ContactInfo, deprecated, - duplicate_shred::{DuplicateShred, DuplicateShredIndex}, + duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, epoch_slots::EpochSlots, }; use bincode::{serialize, serialized_size}; @@ -83,7 +83,7 @@ pub enum CrdsData { LegacyVersion(LegacyVersion), Version(Version), NodeInstance(NodeInstance), - DuplicateShred(DuplicateShred), + DuplicateShred(DuplicateShredIndex, DuplicateShred), } impl Sanitize for CrdsData { @@ -113,7 +113,13 @@ impl Sanitize for CrdsData { CrdsData::LegacyVersion(version) => version.sanitize(), CrdsData::Version(version) => version.sanitize(), CrdsData::NodeInstance(node) => node.sanitize(), - CrdsData::DuplicateShred(shred) => shred.sanitize(), + CrdsData::DuplicateShred(ix, shred) => { + if *ix >= MAX_DUPLICATE_SHREDS { + Err(SanitizeError::ValueOutOfBounds) + } else { + shred.sanitize() + } + } } } } @@ -408,7 +414,7 @@ impl fmt::Display for CrdsValueLabel { CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()), CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()), CrdsValueLabel::NodeInstance(pk, token) => write!(f, "NodeInstance({}, {})", pk, token), - CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({:?}, {})", ix, pk), + CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({}, {})", ix, pk), } } } @@ -442,7 +448,7 @@ impl CrdsValueLabel { CrdsValueLabel::LegacyVersion(_) => Some(1), CrdsValueLabel::Version(_) => Some(1), CrdsValueLabel::NodeInstance(_, _) => None, - CrdsValueLabel::DuplicateShred(_, _) => None, + CrdsValueLabel::DuplicateShred(_, _) => Some(MAX_DUPLICATE_SHREDS as usize), } } } @@ -490,7 +496,7 @@ impl CrdsValue { CrdsData::LegacyVersion(version) => version.wallclock, CrdsData::Version(version) => version.wallclock, CrdsData::NodeInstance(node) => node.wallclock, - CrdsData::DuplicateShred(shred) => shred.wallclock, + CrdsData::DuplicateShred(_, shred) => shred.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -504,7 +510,7 @@ impl CrdsValue { CrdsData::LegacyVersion(version) => version.from, CrdsData::Version(version) => version.from, CrdsData::NodeInstance(node) => node.from, - CrdsData::DuplicateShred(shred) => shred.from, + CrdsData::DuplicateShred(_, shred) => shred.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -518,9 +524,7 @@ impl CrdsValue { CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()), CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()), CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from, node.token), - CrdsData::DuplicateShred(shred) => { - CrdsValueLabel::DuplicateShred(DuplicateShredIndex::from(shred), shred.from) - } + CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), } } pub fn contact_info(&self) -> Option<&ContactInfo> { diff --git a/core/src/duplicate_shred.rs b/core/src/duplicate_shred.rs index ff86223485..b9193b9c8b 100644 --- a/core/src/duplicate_shred.rs +++ b/core/src/duplicate_shred.rs @@ -18,6 +18,9 @@ use thiserror::Error; const DUPLICATE_SHRED_HEADER_SIZE: usize = 63; +pub(crate) type DuplicateShredIndex = u16; +pub(crate) const MAX_DUPLICATE_SHREDS: DuplicateShredIndex = 512; + /// Function returning leader at a given slot. pub trait LeaderScheduleFn: FnOnce(Slot) -> Option {} impl LeaderScheduleFn for F where F: FnOnce(Slot) -> Option {} @@ -26,7 +29,7 @@ impl LeaderScheduleFn for F where F: FnOnce(Slot) -> Option {} pub struct DuplicateShred { pub(crate) from: Pubkey, pub(crate) wallclock: u64, - slot: Slot, + pub(crate) slot: Slot, shred_index: u32, shred_type: ShredType, // Serialized DuplicateSlotProof split into chunks. @@ -36,23 +39,10 @@ pub struct DuplicateShred { chunk: Vec, } -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct DuplicateShredIndex { - slot: Slot, - shred_index: u32, - shred_type: ShredType, - num_chunks: u8, - chunk_index: u8, -} - #[derive(Debug, Error)] pub enum Error { #[error("data chunk mismatch")] DataChunkMismatch, - #[error("decoding error")] - DecodingError(std::io::Error), - #[error("encoding error")] - EncodingError(std::io::Error), #[error("invalid chunk index")] InvalidChunkIndex, #[error("invalid duplicate shreds")] @@ -87,7 +77,7 @@ pub enum Error { // the same triplet of (slot, shred-index, and shred-type_), and // that they have valid signatures from the slot leader. fn check_shreds( - leader: impl LeaderScheduleFn, + leader_schedule: Option, shred1: &Shred, shred2: &Shred, ) -> Result<(), Error> { @@ -100,12 +90,13 @@ fn check_shreds( } else if shred1.payload == shred2.payload { Err(Error::InvalidDuplicateShreds) } else { - let slot_leader = leader(shred1.slot()).ok_or(Error::UnknownSlotLeader)?; - if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { - Err(Error::InvalidSignature) - } else { - Ok(()) + if let Some(leader_schedule) = leader_schedule { + let slot_leader = leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader)?; + if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { + return Err(Error::InvalidSignature); + } } + Ok(()) } } @@ -114,24 +105,65 @@ fn check_shreds( pub fn from_duplicate_slot_proof( proof: &DuplicateSlotProof, self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value. - leader: impl LeaderScheduleFn, + leader_schedule: Option, wallclock: u64, max_size: usize, // Maximum serialized size of each DuplicateShred. - encoder: impl FnOnce(Vec) -> Result, std::io::Error>, ) -> Result, Error> { if proof.shred1 == proof.shred2 { return Err(Error::InvalidDuplicateSlotProof); } let shred1 = Shred::new_from_serialized_shred(proof.shred1.clone())?; let shred2 = Shred::new_from_serialized_shred(proof.shred2.clone())?; - check_shreds(leader, &shred1, &shred2)?; + check_shreds(leader_schedule, &shred1, &shred2)?; let (slot, shred_index, shred_type) = ( shred1.slot(), shred1.index(), shred1.common_header.shred_type, ); let data = bincode::serialize(proof)?; - let data = encoder(data).map_err(Error::EncodingError)?; + let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size { + max_size - DUPLICATE_SHRED_HEADER_SIZE + } else { + return Err(Error::InvalidSizeLimit); + }; + let chunks: Vec<_> = data.chunks(chunk_size).map(Vec::from).collect(); + let num_chunks = u8::try_from(chunks.len())?; + let chunks = chunks + .into_iter() + .enumerate() + .map(move |(i, chunk)| DuplicateShred { + from: self_pubkey, + wallclock, + slot, + shred_index, + shred_type, + num_chunks, + chunk_index: i as u8, + chunk, + }); + Ok(chunks) +} + +pub(crate) fn from_shred( + shred: Shred, + self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value. + other_payload: Vec, + leader_schedule: Option, + wallclock: u64, + max_size: usize, // Maximum serialized size of each DuplicateShred. +) -> Result, Error> { + if shred.payload == other_payload { + return Err(Error::InvalidDuplicateShreds); + } + let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?; + check_shreds(leader_schedule, &shred, &other_shred)?; + let (slot, shred_index, shred_type) = + (shred.slot(), shred.index(), shred.common_header.shred_type); + let proof = DuplicateSlotProof { + shred1: shred.payload, + shred2: other_payload, + }; + let data = bincode::serialize(&proof)?; let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size { max_size - DUPLICATE_SHRED_HEADER_SIZE } else { @@ -184,7 +216,6 @@ fn check_chunk( pub fn into_shreds( chunks: impl IntoIterator, leader: impl LeaderScheduleFn, - decoder: impl FnOnce(Vec) -> Result, std::io::Error>, ) -> Result<(Shred, Shred), Error> { let mut chunks = chunks.into_iter(); let DuplicateShred { @@ -219,8 +250,7 @@ pub fn into_shreds( if data.len() != num_chunks as usize { return Err(Error::MissingDataChunk); } - let data = (0..num_chunks).map(|k| data.remove(&k).unwrap()); - let data = decoder(data.concat()).map_err(Error::DecodingError)?; + let data = (0..num_chunks).map(|k| data.remove(&k).unwrap()).concat(); let proof: DuplicateSlotProof = bincode::deserialize(&data)?; if proof.shred1 == proof.shred2 { return Err(Error::InvalidDuplicateSlotProof); @@ -254,20 +284,8 @@ impl Sanitize for DuplicateShred { } } -impl From<&DuplicateShred> for DuplicateShredIndex { - fn from(shred: &DuplicateShred) -> Self { - Self { - slot: shred.slot, - shred_index: shred.shred_index, - shred_type: shred.shred_type, - num_chunks: shred.num_chunks, - chunk_index: shred.chunk_index, - } - } -} - #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use rand::Rng; use solana_ledger::{entry::Entry, shred::Shredder}; @@ -296,7 +314,11 @@ mod tests { ); } - fn new_rand_shred(rng: &mut R, next_shred_index: u32, shredder: &Shredder) -> Shred { + pub fn new_rand_shred( + rng: &mut R, + next_shred_index: u32, + shredder: &Shredder, + ) -> Shred { let entries: Vec<_> = std::iter::repeat_with(|| { let tx = system_transaction::transfer( &Keypair::new(), // from @@ -338,29 +360,25 @@ mod tests { let next_shred_index = rng.gen(); let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder); let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder); - let leader = |s| { + let leader_schedule = |s| { if s == slot { Some(leader.pubkey()) } else { None } }; - let proof = DuplicateSlotProof { - shred1: shred1.payload.clone(), - shred2: shred2.payload.clone(), - }; - let chunks: Vec<_> = from_duplicate_slot_proof( - &proof, + let chunks: Vec<_> = from_shred( + shred1.clone(), Pubkey::new_unique(), // self_pubkey - leader, + shred2.payload.clone(), + Some(leader_schedule), rng.gen(), // wallclock 512, // max_size - Ok, // encoder ) .unwrap() .collect(); assert!(chunks.len() > 4); - let (shred3, shred4) = into_shreds(chunks, leader, Ok).unwrap(); + let (shred3, shred4) = into_shreds(chunks, leader_schedule).unwrap(); assert_eq!(shred1, shred3); assert_eq!(shred2, shred4); } diff --git a/core/src/result.rs b/core/src/result.rs index f46ac16fbe..bca8f79544 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -1,7 +1,7 @@ //! The `result` module exposes a Result type that propagates one of many different Error types. -use crate::cluster_info; use crate::poh_recorder; +use crate::{cluster_info, duplicate_shred}; use solana_ledger::block_error; use solana_ledger::blockstore; use solana_runtime::snapshot_utils; @@ -33,6 +33,7 @@ pub enum Error { SnapshotError(snapshot_utils::SnapshotError), WeightedIndexError(rand::distributions::weighted::WeightedError), DuplicateNodeInstance, + DuplicateShredError(duplicate_shred::Error), } pub type Result = std::result::Result; @@ -150,6 +151,11 @@ impl std::convert::From for Error Error::WeightedIndexError(e) } } +impl std::convert::From for Error { + fn from(e: duplicate_shred::Error) -> Error { + Error::DuplicateShredError(e) + } +} #[cfg(test)] mod tests { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index a56eb7cbcf..614c2f26c6 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -83,7 +83,8 @@ pub fn should_retransmit_and_persist( } fn run_check_duplicate( - blockstore: &Arc, + cluster_info: &ClusterInfo, + blockstore: &Blockstore, shred_receiver: &CrossbeamReceiver, ) -> Result<()> { let check_duplicate = |shred: Shred| -> Result<()> { @@ -94,6 +95,7 @@ fn run_check_duplicate( &shred.payload, shred.is_data(), ) { + cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?; blockstore.store_duplicate_slot( shred.slot(), existing_shred_payload, @@ -339,8 +341,12 @@ impl WindowService { let (insert_sender, insert_receiver) = unbounded(); let (duplicate_sender, duplicate_receiver) = unbounded(); - let t_check_duplicate = - Self::start_check_duplicate_thread(exit, &blockstore, duplicate_receiver); + let t_check_duplicate = Self::start_check_duplicate_thread( + cluster_info.clone(), + exit.clone(), + blockstore.clone(), + duplicate_receiver, + ); let t_insert = Self::start_window_insert_thread( exit, @@ -371,12 +377,11 @@ impl WindowService { } fn start_check_duplicate_thread( - exit: &Arc, - blockstore: &Arc, + cluster_info: Arc, + exit: Arc, + blockstore: Arc, duplicate_receiver: CrossbeamReceiver, ) -> JoinHandle<()> { - let exit = exit.clone(); - let blockstore = blockstore.clone(); let handle_error = || { inc_new_counter_error!("solana-check-duplicate-error", 1, 1); }; @@ -388,7 +393,8 @@ impl WindowService { } let mut noop = || {}; - if let Err(e) = run_check_duplicate(&blockstore, &duplicate_receiver) { + if let Err(e) = run_check_duplicate(&cluster_info, &blockstore, &duplicate_receiver) + { if Self::should_exit_on_error(e, &mut noop, &handle_error) { break; } @@ -551,6 +557,7 @@ impl WindowService { #[cfg(test)] mod test { use super::*; + use crate::contact_info::ContactInfo; use solana_ledger::{ blockstore::{make_many_slot_entries, Blockstore}, entry::{create_ticks, Entry}, @@ -563,6 +570,7 @@ mod test { epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, hash::Hash, signature::{Keypair, Signer}, + timing::timestamp, }; use std::sync::Arc; @@ -681,7 +689,10 @@ mod test { let duplicate_shred_slot = duplicate_shred.slot(); sender.send(duplicate_shred).unwrap(); assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); - run_check_duplicate(&blockstore, &receiver).unwrap(); + let keypair = Keypair::new(); + let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp()); + let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair)); + run_check_duplicate(&cluster_info, &blockstore, &receiver).unwrap(); assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); } }