broadcasts duplicate shreds through gossip (bp #14699) (#14812)

* broadcasts duplicate shreds through gossip (#14699)

(cherry picked from commit 491b059755)

# Conflicts:
#	core/src/cluster_info.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
Author:    mergify[bot]
Date:      2021-01-24 17:45:35 +00:00
Committed: GitHub
Parent:    c9da25836a
Commit:    65a7621b17

7 changed files with 257 additions and 79 deletions
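In rough terms, the change lets a validator gossip evidence of duplicate shreds: when the window service finds two different shreds for the same (slot, index, type), it serializes a DuplicateSlotProof, splits it into chunks small enough for a gossip packet, pushes each chunk as an indexed CrdsData::DuplicateShred value, and receivers collect the chunks to rebuild the proof. The sketch below is a dependency-free illustration of that split/reassemble scheme, not the crate's API; HEADER_SIZE and MAX_PAYLOAD_SIZE are stand-ins for DUPLICATE_SHRED_HEADER_SIZE and DUPLICATE_SHRED_MAX_PAYLOAD_SIZE.

// Illustrative sketch only; constants and names are stand-ins, not the crate's API.
const HEADER_SIZE: usize = 63;        // stand-in for DUPLICATE_SHRED_HEADER_SIZE
const MAX_PAYLOAD_SIZE: usize = 1117; // stand-in for DUPLICATE_SHRED_MAX_PAYLOAD_SIZE

// Split serialized proof bytes into chunks so that each chunk plus its header
// stays within the per-value payload budget.
fn split_proof(data: &[u8]) -> Vec<(u8 /*chunk_index*/, u8 /*num_chunks*/, Vec<u8>)> {
    let chunk_size = MAX_PAYLOAD_SIZE - HEADER_SIZE;
    let chunks: Vec<Vec<u8>> = data.chunks(chunk_size).map(Vec::from).collect();
    let num_chunks = chunks.len() as u8;
    chunks
        .into_iter()
        .enumerate()
        .map(|(i, chunk)| (i as u8, num_chunks, chunk))
        .collect()
}

// Reassemble the proof, tolerating duplicate chunks and arbitrary arrival order.
fn reassemble(chunks: impl IntoIterator<Item = (u8, u8, Vec<u8>)>) -> Option<Vec<u8>> {
    use std::collections::BTreeMap;
    let mut data = BTreeMap::new();
    let mut num_chunks = None;
    for (index, total, chunk) in chunks {
        num_chunks = Some(total);
        data.insert(index, chunk);
    }
    let num_chunks = num_chunks? as usize;
    if data.len() != num_chunks {
        return None; // some chunks are still missing
    }
    Some(data.into_values().flatten().collect())
}

fn main() {
    let proof = vec![42u8; 2500]; // pretend this is a serialized DuplicateSlotProof
    let chunks = split_proof(&proof);
    assert!(chunks.len() > 1);
    assert_eq!(reassemble(chunks), Some(proof));
}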

core/src/cluster_info.rs

@@ -27,10 +27,10 @@ use crate::{
result::{Error, Result},
weighted_shuffle::weighted_shuffle,
};
use rand::distributions::{Distribution, WeightedIndex};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_ledger::shred::Shred;
use solana_sdk::sanitize::{Sanitize, SanitizeError};
use bincode::{serialize, serialized_size};
@@ -96,6 +96,7 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
/// is equal to PACKET_DATA_SIZE minus serialized size of an empty push
/// message: Protocol::PushMessage(Pubkey::default(), Vec::default())
const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44;
const DUPLICATE_SHRED_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 115;
/// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes
/// such that the serialized size of the push/pull message stays below
/// PACKET_DATA_SIZE.
@@ -403,7 +404,7 @@ pub fn make_accounts_hashes_message(
type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "HAFjUDgiGthYTiAg6CYJxA8PqfwuhrC82NtHYYmee4vb")]
#[frozen_abi(digest = "DdTxrwwnbe571Di4rLtrAQorFDE58vYnmzzbaeQ7sQMC")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
enum Protocol {
@@ -1126,6 +1127,17 @@ impl ClusterInfo {
(labels, txs, max_ts)
}
pub(crate) fn push_duplicate_shred(&self, shred: &Shred, other_payload: &[u8]) -> Result<()> {
self.gossip.write().unwrap().push_duplicate_shred(
&self.keypair,
shred,
other_payload,
None::<fn(Slot) -> Option<Pubkey>>, // Leader schedule
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)?;
Ok(())
}
pub fn get_accounts_hash_for_node<F, Y>(&self, pubkey: &Pubkey, map: F) -> Option<Y>
where
F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
@@ -3143,9 +3155,13 @@ pub fn stake_weight_peers<S: std::hash::BuildHasher>(
#[cfg(test)]
mod tests {
use super::*;
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
use crate::{
crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote},
duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
};
use itertools::izip;
use rand::seq::SliceRandom;
use solana_ledger::shred::Shredder;
use solana_perf::test_tx::test_tx;
use solana_sdk::signature::{Keypair, Signer};
use solana_vote_program::{vote_instruction, vote_state::Vote};
@@ -3433,6 +3449,53 @@ mod tests {
);
}
#[test]
fn test_duplicate_shred_max_payload_size() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let keypair = Keypair::new();
let (slot, parent_slot, fec_rate, reference_tick, version) =
(53084024, 53084023, 0.0, 0, 0);
let shredder = Shredder::new(
slot,
parent_slot,
fec_rate,
leader.clone(),
reference_tick,
version,
)
.unwrap();
let next_shred_index = rng.gen();
let shred = new_rand_shred(&mut rng, next_shred_index, &shredder);
let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder).payload;
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let chunks: Vec<_> = duplicate_shred::from_shred(
shred,
keypair.pubkey(),
other_payload,
Some(leader_schedule),
timestamp(),
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap()
.collect();
assert!(chunks.len() > 1);
for chunk in chunks {
let data = CrdsData::DuplicateShred(MAX_DUPLICATE_SHREDS - 1, chunk);
let value = CrdsValue::new_signed(data, &keypair);
let pull_response = Protocol::PullResponse(keypair.pubkey(), vec![value.clone()]);
assert!(serialized_size(&pull_response).unwrap() < PACKET_DATA_SIZE as u64);
let push_message = Protocol::PushMessage(keypair.pubkey(), vec![value.clone()]);
assert!(serialized_size(&push_message).unwrap() < PACKET_DATA_SIZE as u64);
}
}
#[test]
fn test_pull_response_min_serialized_size() {
let mut rng = rand::thread_rng();

core/src/crds.rs

@@ -42,9 +42,7 @@ use std::ops::{Index, IndexMut};
const CRDS_SHARDS_BITS: u32 = 8;
// Limit number of crds values associated with each unique pubkey. This
// excludes crds values which by label design are limited per each pubkey.
// TODO: Find the right value for this once duplicate shreds and corresponding
// votes are broadcasted over gossip.
const MAX_CRDS_VALUES_PER_PUBKEY: usize = 512;
const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32;
#[derive(Clone)]
pub struct Crds {
@@ -232,6 +230,15 @@ impl Crds {
self.votes.iter().map(move |i| self.table.index(*i))
}
/// Returns all records associated with a pubkey.
pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator<Item = &VersionedCrdsValue> {
self.records
.get(pubkey)
.into_iter()
.flat_map(|records| records.into_iter())
.map(move |i| self.table.index(*i))
}
pub fn len(&self) -> usize {
self.table.len()
}

core/src/crds_gossip.rs

@@ -8,10 +8,17 @@ use crate::{
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
crds_value::{CrdsValue, CrdsValueLabel},
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
};
use rayon::ThreadPool;
use solana_sdk::{hash::Hash, pubkey::Pubkey};
use solana_ledger::shred::Shred;
use solana_sdk::{
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::timestamp,
};
use std::collections::{HashMap, HashSet};
///The min size for bloom filters
@@ -105,6 +112,68 @@ impl CrdsGossip {
(self.id, push_messages)
}
pub(crate) fn push_duplicate_shred(
&mut self,
keypair: &Keypair,
shred: &Shred,
other_payload: &[u8],
leader_schedule: Option<impl LeaderScheduleFn>,
// Maximum serialized size of each DuplicateShred chunk payload.
max_payload_size: usize,
) -> Result<(), duplicate_shred::Error> {
let pubkey = keypair.pubkey();
// Skip if there are already records of duplicate shreds for this slot.
let shred_slot = shred.slot();
if self
.crds
.get_records(&pubkey)
.any(|value| match &value.value.data {
CrdsData::DuplicateShred(_, value) => value.slot == shred_slot,
_ => false,
})
{
return Ok(());
}
let chunks = duplicate_shred::from_shred(
shred.clone(),
pubkey,
Vec::from(other_payload),
leader_schedule,
timestamp(),
max_payload_size,
)?;
// Find the index of oldest duplicate shred.
let mut num_dup_shreds = 0;
let offset = self
.crds
.get_records(&pubkey)
.filter_map(|value| match &value.value.data {
CrdsData::DuplicateShred(ix, value) => {
num_dup_shreds += 1;
Some((value.wallclock, *ix))
}
_ => None,
})
.min() // Override the oldest records.
.map(|(_ /*wallclock*/, ix)| ix)
.unwrap_or(0);
let offset = if num_dup_shreds < MAX_DUPLICATE_SHREDS {
num_dup_shreds
} else {
offset
};
let entries = chunks
.enumerate()
.map(|(k, chunk)| {
let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS;
let data = CrdsData::DuplicateShred(index, chunk);
CrdsValue::new_signed(data, keypair)
})
.collect();
self.process_push_message(&pubkey, entries, timestamp());
Ok(())
}
/// add the `from` to the peer's filter of nodes
pub fn process_prune_msg(
&self,

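A note on the offset logic in push_duplicate_shred above: each origin may gossip at most MAX_DUPLICATE_SHREDS indexed DuplicateShred values, so while the node is below that cap new chunks take fresh indices, and once the cap is hit they wrap around starting at the index of the record with the oldest wallclock, overwriting it. A standalone sketch of that index assignment (illustrative names, not the crate's API):

const MAX_DUPLICATE_SHREDS: u16 = 512;

// Pick crds-value indices for `num_new` chunks, given how many DuplicateShred
// records this node already gossips and the index of the oldest one.
fn assign_indices(num_existing: u16, oldest_index: u16, num_new: u16) -> Vec<u16> {
    // Below the cap: append after the existing records.
    // At the cap: start overwriting at the oldest record, wrapping modulo the cap.
    let offset = if num_existing < MAX_DUPLICATE_SHREDS {
        num_existing
    } else {
        oldest_index
    };
    (0..num_new)
        .map(|k| (offset + k) % MAX_DUPLICATE_SHREDS)
        .collect()
}

fn main() {
    // Plenty of room: new chunks get indices 3, 4, 5.
    assert_eq!(assign_indices(3, 0, 3), vec![3, 4, 5]);
    // Cap reached: overwrite starting at the oldest index, wrapping at 512.
    assert_eq!(assign_indices(512, 510, 3), vec![510, 511, 0]);
}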
core/src/crds_value.rs

@@ -2,7 +2,7 @@ use crate::{
cluster_info::MAX_SNAPSHOT_HASHES,
contact_info::ContactInfo,
deprecated,
duplicate_shred::{DuplicateShred, DuplicateShredIndex},
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
};
use bincode::{serialize, serialized_size};
@@ -83,7 +83,7 @@ pub enum CrdsData {
LegacyVersion(LegacyVersion),
Version(Version),
NodeInstance(NodeInstance),
DuplicateShred(DuplicateShred),
DuplicateShred(DuplicateShredIndex, DuplicateShred),
}
impl Sanitize for CrdsData {
@@ -113,7 +113,13 @@ impl Sanitize for CrdsData {
CrdsData::LegacyVersion(version) => version.sanitize(),
CrdsData::Version(version) => version.sanitize(),
CrdsData::NodeInstance(node) => node.sanitize(),
CrdsData::DuplicateShred(shred) => shred.sanitize(),
CrdsData::DuplicateShred(ix, shred) => {
if *ix >= MAX_DUPLICATE_SHREDS {
Err(SanitizeError::ValueOutOfBounds)
} else {
shred.sanitize()
}
}
}
}
}
@@ -408,7 +414,7 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
CrdsValueLabel::NodeInstance(pk, token) => write!(f, "NodeInstance({}, {})", pk, token),
CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({:?}, {})", ix, pk),
CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({}, {})", ix, pk),
}
}
}
@@ -442,7 +448,7 @@ impl CrdsValueLabel {
CrdsValueLabel::LegacyVersion(_) => Some(1),
CrdsValueLabel::Version(_) => Some(1),
CrdsValueLabel::NodeInstance(_, _) => None,
CrdsValueLabel::DuplicateShred(_, _) => None,
CrdsValueLabel::DuplicateShred(_, _) => Some(MAX_DUPLICATE_SHREDS as usize),
}
}
}
@@ -490,7 +496,7 @@ impl CrdsValue {
CrdsData::LegacyVersion(version) => version.wallclock,
CrdsData::Version(version) => version.wallclock,
CrdsData::NodeInstance(node) => node.wallclock,
CrdsData::DuplicateShred(shred) => shred.wallclock,
CrdsData::DuplicateShred(_, shred) => shred.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
@@ -504,7 +510,7 @@ impl CrdsValue {
CrdsData::LegacyVersion(version) => version.from,
CrdsData::Version(version) => version.from,
CrdsData::NodeInstance(node) => node.from,
CrdsData::DuplicateShred(shred) => shred.from,
CrdsData::DuplicateShred(_, shred) => shred.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
@ -518,9 +524,7 @@ impl CrdsValue {
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from, node.token),
CrdsData::DuplicateShred(shred) => {
CrdsValueLabel::DuplicateShred(DuplicateShredIndex::from(shred), shred.from)
}
CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
}
}
pub fn contact_info(&self) -> Option<&ContactInfo> {

core/src/duplicate_shred.rs

@@ -18,6 +18,9 @@ use thiserror::Error;
const DUPLICATE_SHRED_HEADER_SIZE: usize = 63;
pub(crate) type DuplicateShredIndex = u16;
pub(crate) const MAX_DUPLICATE_SHREDS: DuplicateShredIndex = 512;
/// Function returning leader at a given slot.
pub trait LeaderScheduleFn: FnOnce(Slot) -> Option<Pubkey> {}
impl<F> LeaderScheduleFn for F where F: FnOnce(Slot) -> Option<Pubkey> {}
@@ -26,7 +29,7 @@ impl<F> LeaderScheduleFn for F where F: FnOnce(Slot) -> Option<Pubkey> {}
pub struct DuplicateShred {
pub(crate) from: Pubkey,
pub(crate) wallclock: u64,
slot: Slot,
pub(crate) slot: Slot,
shred_index: u32,
shred_type: ShredType,
// Serialized DuplicateSlotProof split into chunks.
@@ -36,23 +39,10 @@ pub struct DuplicateShred {
chunk: Vec<u8>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DuplicateShredIndex {
slot: Slot,
shred_index: u32,
shred_type: ShredType,
num_chunks: u8,
chunk_index: u8,
}
#[derive(Debug, Error)]
pub enum Error {
#[error("data chunk mismatch")]
DataChunkMismatch,
#[error("decoding error")]
DecodingError(std::io::Error),
#[error("encoding error")]
EncodingError(std::io::Error),
#[error("invalid chunk index")]
InvalidChunkIndex,
#[error("invalid duplicate shreds")]
@@ -87,7 +77,7 @@ pub enum Error {
// the same triplet of (slot, shred-index, and shred-type_), and
// that they have valid signatures from the slot leader.
fn check_shreds(
leader: impl LeaderScheduleFn,
leader_schedule: Option<impl LeaderScheduleFn>,
shred1: &Shred,
shred2: &Shred,
) -> Result<(), Error> {
@@ -100,12 +90,13 @@ fn check_shreds(
} else if shred1.payload == shred2.payload {
Err(Error::InvalidDuplicateShreds)
} else {
let slot_leader = leader(shred1.slot()).ok_or(Error::UnknownSlotLeader)?;
if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
Err(Error::InvalidSignature)
} else {
Ok(())
if let Some(leader_schedule) = leader_schedule {
let slot_leader = leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader)?;
if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
return Err(Error::InvalidSignature);
}
}
Ok(())
}
}
@@ -114,24 +105,65 @@ fn check_shreds(
pub fn from_duplicate_slot_proof(
proof: &DuplicateSlotProof,
self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
leader: impl LeaderScheduleFn,
leader_schedule: Option<impl LeaderScheduleFn>,
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
encoder: impl FnOnce(Vec<u8>) -> Result<Vec<u8>, std::io::Error>,
) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
if proof.shred1 == proof.shred2 {
return Err(Error::InvalidDuplicateSlotProof);
}
let shred1 = Shred::new_from_serialized_shred(proof.shred1.clone())?;
let shred2 = Shred::new_from_serialized_shred(proof.shred2.clone())?;
check_shreds(leader, &shred1, &shred2)?;
check_shreds(leader_schedule, &shred1, &shred2)?;
let (slot, shred_index, shred_type) = (
shred1.slot(),
shred1.index(),
shred1.common_header.shred_type,
);
let data = bincode::serialize(proof)?;
let data = encoder(data).map_err(Error::EncodingError)?;
let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size {
max_size - DUPLICATE_SHRED_HEADER_SIZE
} else {
return Err(Error::InvalidSizeLimit);
};
let chunks: Vec<_> = data.chunks(chunk_size).map(Vec::from).collect();
let num_chunks = u8::try_from(chunks.len())?;
let chunks = chunks
.into_iter()
.enumerate()
.map(move |(i, chunk)| DuplicateShred {
from: self_pubkey,
wallclock,
slot,
shred_index,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
});
Ok(chunks)
}
pub(crate) fn from_shred(
shred: Shred,
self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
other_payload: Vec<u8>,
leader_schedule: Option<impl LeaderScheduleFn>,
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
if shred.payload == other_payload {
return Err(Error::InvalidDuplicateShreds);
}
let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?;
check_shreds(leader_schedule, &shred, &other_shred)?;
let (slot, shred_index, shred_type) =
(shred.slot(), shred.index(), shred.common_header.shred_type);
let proof = DuplicateSlotProof {
shred1: shred.payload,
shred2: other_payload,
};
let data = bincode::serialize(&proof)?;
let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size {
max_size - DUPLICATE_SHRED_HEADER_SIZE
} else {
@@ -184,7 +216,6 @@ fn check_chunk(
pub fn into_shreds(
chunks: impl IntoIterator<Item = DuplicateShred>,
leader: impl LeaderScheduleFn,
decoder: impl FnOnce(Vec<u8>) -> Result<Vec<u8>, std::io::Error>,
) -> Result<(Shred, Shred), Error> {
let mut chunks = chunks.into_iter();
let DuplicateShred {
@@ -219,8 +250,7 @@ pub fn into_shreds(
if data.len() != num_chunks as usize {
return Err(Error::MissingDataChunk);
}
let data = (0..num_chunks).map(|k| data.remove(&k).unwrap());
let data = decoder(data.concat()).map_err(Error::DecodingError)?;
let data = (0..num_chunks).map(|k| data.remove(&k).unwrap()).concat();
let proof: DuplicateSlotProof = bincode::deserialize(&data)?;
if proof.shred1 == proof.shred2 {
return Err(Error::InvalidDuplicateSlotProof);
@@ -254,20 +284,8 @@ impl Sanitize for DuplicateShred {
}
}
impl From<&DuplicateShred> for DuplicateShredIndex {
fn from(shred: &DuplicateShred) -> Self {
Self {
slot: shred.slot,
shred_index: shred.shred_index,
shred_type: shred.shred_type,
num_chunks: shred.num_chunks,
chunk_index: shred.chunk_index,
}
}
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use rand::Rng;
use solana_ledger::{entry::Entry, shred::Shredder};
@@ -296,7 +314,11 @@ mod tests {
);
}
fn new_rand_shred<R: Rng>(rng: &mut R, next_shred_index: u32, shredder: &Shredder) -> Shred {
pub fn new_rand_shred<R: Rng>(
rng: &mut R,
next_shred_index: u32,
shredder: &Shredder,
) -> Shred {
let entries: Vec<_> = std::iter::repeat_with(|| {
let tx = system_transaction::transfer(
&Keypair::new(), // from
@@ -338,29 +360,25 @@ mod tests {
let next_shred_index = rng.gen();
let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder);
let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder);
let leader = |s| {
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let proof = DuplicateSlotProof {
shred1: shred1.payload.clone(),
shred2: shred2.payload.clone(),
};
let chunks: Vec<_> = from_duplicate_slot_proof(
&proof,
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
leader,
shred2.payload.clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
Ok, // encoder
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(chunks, leader, Ok).unwrap();
let (shred3, shred4) = into_shreds(chunks, leader_schedule).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
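For a sense of scale, a hedged back-of-the-envelope on the chunk count: assuming PACKET_DATA_SIZE is 1232 bytes (the usual solana-sdk value) and the constants above, each chunk carries max_size minus the 63-byte DuplicateShred header worth of proof data, so a proof holding two full shred payloads spans a few chunks, which is why the tests assert that chunks.len() > 1. The byte counts below are assumptions for illustration only:

// Rough sizing sketch; the concrete byte counts are assumptions, not taken from the crate.
fn main() {
    let packet_data_size = 1232usize;           // assumed solana-sdk PACKET_DATA_SIZE
    let max_payload = packet_data_size - 115;   // DUPLICATE_SHRED_MAX_PAYLOAD_SIZE
    let chunk_size = max_payload - 63;          // minus DUPLICATE_SHRED_HEADER_SIZE
    let proof_len = 2 * 1228 + 16;              // two shred payloads plus bincode framing (assumed)
    let num_chunks = (proof_len + chunk_size - 1) / chunk_size; // ceiling division
    println!("{} proof bytes per chunk, {} chunks", chunk_size, num_chunks);
}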

core/src/result.rs

@@ -1,7 +1,7 @@
//! The `result` module exposes a Result type that propagates one of many different Error types.
use crate::cluster_info;
use crate::poh_recorder;
use crate::{cluster_info, duplicate_shred};
use solana_ledger::block_error;
use solana_ledger::blockstore;
use solana_runtime::snapshot_utils;
@@ -33,6 +33,7 @@ pub enum Error {
SnapshotError(snapshot_utils::SnapshotError),
WeightedIndexError(rand::distributions::weighted::WeightedError),
DuplicateNodeInstance,
DuplicateShredError(duplicate_shred::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -150,6 +151,11 @@ impl std::convert::From<rand::distributions::weighted::WeightedError> for Error
Error::WeightedIndexError(e)
}
}
impl std::convert::From<duplicate_shred::Error> for Error {
fn from(e: duplicate_shred::Error) -> Error {
Error::DuplicateShredError(e)
}
}
#[cfg(test)]
mod tests {

core/src/window_service.rs

@@ -83,7 +83,8 @@ pub fn should_retransmit_and_persist(
}
fn run_check_duplicate(
blockstore: &Arc<Blockstore>,
cluster_info: &ClusterInfo,
blockstore: &Blockstore,
shred_receiver: &CrossbeamReceiver<Shred>,
) -> Result<()> {
let check_duplicate = |shred: Shred| -> Result<()> {
@@ -94,6 +95,7 @@ fn run_check_duplicate(
&shred.payload,
shred.is_data(),
) {
cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
blockstore.store_duplicate_slot(
shred.slot(),
existing_shred_payload,
@@ -339,8 +341,12 @@ impl WindowService {
let (insert_sender, insert_receiver) = unbounded();
let (duplicate_sender, duplicate_receiver) = unbounded();
let t_check_duplicate =
Self::start_check_duplicate_thread(exit, &blockstore, duplicate_receiver);
let t_check_duplicate = Self::start_check_duplicate_thread(
cluster_info.clone(),
exit.clone(),
blockstore.clone(),
duplicate_receiver,
);
let t_insert = Self::start_window_insert_thread(
exit,
@@ -371,12 +377,11 @@ impl WindowService {
}
fn start_check_duplicate_thread(
exit: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
duplicate_receiver: CrossbeamReceiver<Shred>,
) -> JoinHandle<()> {
let exit = exit.clone();
let blockstore = blockstore.clone();
let handle_error = || {
inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
};
@@ -388,7 +393,8 @@ impl WindowService {
}
let mut noop = || {};
if let Err(e) = run_check_duplicate(&blockstore, &duplicate_receiver) {
if let Err(e) = run_check_duplicate(&cluster_info, &blockstore, &duplicate_receiver)
{
if Self::should_exit_on_error(e, &mut noop, &handle_error) {
break;
}
@@ -551,6 +557,7 @@ impl WindowService {
#[cfg(test)]
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use solana_ledger::{
blockstore::{make_many_slot_entries, Blockstore},
entry::{create_ticks, Entry},
@@ -563,6 +570,7 @@ mod test {
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
hash::Hash,
signature::{Keypair, Signer},
timing::timestamp,
};
use std::sync::Arc;
@@ -681,7 +689,10 @@ mod test {
let duplicate_shred_slot = duplicate_shred.slot();
sender.send(duplicate_shred).unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
run_check_duplicate(&blockstore, &receiver).unwrap();
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair));
run_check_duplicate(&cluster_info, &blockstore, &receiver).unwrap();
assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
}
}
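End to end, the wiring after this diff is roughly: the window service's check-duplicate thread receives shreds on a channel, asks the blockstore for a conflicting payload with the same (slot, index, type), and, when one exists, pushes the pair to gossip via ClusterInfo::push_duplicate_shred before persisting it. The loop below is a minimal sketch of that control flow with stand-in types and callbacks, not the real ClusterInfo or Blockstore APIs:

use std::sync::mpsc::{channel, Receiver};

// Stand-in for the real Shred type; only the control flow mirrors the diff.
struct Shred {
    slot: u64,
    payload: Vec<u8>,
}

fn run_check_duplicate(
    receiver: &Receiver<Shred>,
    find_conflicting_payload: impl Fn(&Shred) -> Option<Vec<u8>>,
    push_to_gossip: impl Fn(&Shred, &[u8]),
    store_duplicate_proof: impl Fn(u64, Vec<u8>, Vec<u8>),
) {
    for shred in receiver.try_iter() {
        // Only act if the store already holds a different shred for the same
        // (slot, index, type): that pair is the duplicate proof.
        if let Some(existing_payload) = find_conflicting_payload(&shred) {
            // New in this change: gossip the proof before persisting it.
            push_to_gossip(&shred, &existing_payload);
            store_duplicate_proof(shred.slot, existing_payload, shred.payload);
        }
    }
}

fn main() {
    let (sender, receiver) = channel();
    sender
        .send(Shred { slot: 53_084_024, payload: vec![1, 2, 3] })
        .unwrap();
    run_check_duplicate(
        &receiver,
        |_shred: &Shred| Some(vec![4u8, 5, 6]), // pretend a conflicting payload was found
        |shred: &Shred, other: &[u8]| {
            println!(
                "gossip duplicate proof for slot {} ({} vs {} payload bytes)",
                shred.slot,
                shred.payload.len(),
                other.len()
            )
        },
        |slot: u64, _original: Vec<u8>, _duplicate: Vec<u8>| {
            println!("stored duplicate proof for slot {}", slot)
        },
    );
}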