broadcasts duplicate shreds through gossip (#14699)
@@ -27,10 +27,10 @@ use crate::{
     result::{Error, Result},
     weighted_shuffle::weighted_shuffle,
 };
 
 use rand::distributions::{Distribution, WeightedIndex};
 use rand::{CryptoRng, Rng, SeedableRng};
 use rand_chacha::ChaChaRng;
+use solana_ledger::shred::Shred;
 use solana_sdk::sanitize::{Sanitize, SanitizeError};
 
 use bincode::{serialize, serialized_size};
@@ -97,6 +97,7 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
 /// is equal to PACKET_DATA_SIZE minus serialized size of an empty push
 /// message: Protocol::PushMessage(Pubkey::default(), Vec::default())
 const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44;
+const DUPLICATE_SHRED_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 115;
 /// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes
 /// such that the serialized size of the push/pull message stays below
 /// PACKET_DATA_SIZE.
@@ -404,7 +405,7 @@ pub fn make_accounts_hashes_message(
 type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
 
 // TODO These messages should go through the gpu pipeline for spam filtering
-#[frozen_abi(digest = "HAFjUDgiGthYTiAg6CYJxA8PqfwuhrC82NtHYYmee4vb")]
+#[frozen_abi(digest = "DdTxrwwnbe571Di4rLtrAQorFDE58vYnmzzbaeQ7sQMC")]
 #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
 #[allow(clippy::large_enum_variant)]
 enum Protocol {
@@ -1165,6 +1166,17 @@ impl ClusterInfo {
         (labels, txs, max_ts)
     }
 
+    pub(crate) fn push_duplicate_shred(&self, shred: &Shred, other_payload: &[u8]) -> Result<()> {
+        self.gossip.write().unwrap().push_duplicate_shred(
+            &self.keypair,
+            shred,
+            other_payload,
+            None::<fn(Slot) -> Option<Pubkey>>, // Leader schedule
+            DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+        )?;
+        Ok(())
+    }
+
     pub fn get_accounts_hash_for_node<F, Y>(&self, pubkey: &Pubkey, map: F) -> Option<Y>
     where
         F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
@@ -3182,9 +3194,13 @@ pub fn stake_weight_peers<S: std::hash::BuildHasher>(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
+    use crate::{
+        crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote},
+        duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS},
+    };
     use itertools::izip;
     use rand::seq::SliceRandom;
+    use solana_ledger::shred::Shredder;
     use solana_sdk::signature::{Keypair, Signer};
     use solana_vote_program::{vote_instruction, vote_state::Vote};
     use std::iter::repeat_with;
@@ -3471,6 +3487,53 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_duplicate_shred_max_payload_size() {
+        let mut rng = rand::thread_rng();
+        let leader = Arc::new(Keypair::new());
+        let keypair = Keypair::new();
+        let (slot, parent_slot, fec_rate, reference_tick, version) =
+            (53084024, 53084023, 0.0, 0, 0);
+        let shredder = Shredder::new(
+            slot,
+            parent_slot,
+            fec_rate,
+            leader.clone(),
+            reference_tick,
+            version,
+        )
+        .unwrap();
+        let next_shred_index = rng.gen();
+        let shred = new_rand_shred(&mut rng, next_shred_index, &shredder);
+        let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder).payload;
+        let leader_schedule = |s| {
+            if s == slot {
+                Some(leader.pubkey())
+            } else {
+                None
+            }
+        };
+        let chunks: Vec<_> = duplicate_shred::from_shred(
+            shred,
+            keypair.pubkey(),
+            other_payload,
+            Some(leader_schedule),
+            timestamp(),
+            DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+        )
+        .unwrap()
+        .collect();
+        assert!(chunks.len() > 1);
+        for chunk in chunks {
+            let data = CrdsData::DuplicateShred(MAX_DUPLICATE_SHREDS - 1, chunk);
+            let value = CrdsValue::new_signed(data, &keypair);
+            let pull_response = Protocol::PullResponse(keypair.pubkey(), vec![value.clone()]);
+            assert!(serialized_size(&pull_response).unwrap() < PACKET_DATA_SIZE as u64);
+            let push_message = Protocol::PushMessage(keypair.pubkey(), vec![value.clone()]);
+            assert!(serialized_size(&push_message).unwrap() < PACKET_DATA_SIZE as u64);
+        }
+    }
+
     #[test]
     fn test_pull_response_min_serialized_size() {
         let mut rng = rand::thread_rng();
@@ -42,9 +42,7 @@ use std::ops::{Index, IndexMut};
 const CRDS_SHARDS_BITS: u32 = 8;
 // Limit number of crds values associated with each unique pubkey. This
 // excludes crds values which by label design are limited per each pubkey.
-// TODO: Find the right value for this once duplicate shreds and corresponding
-// votes are broadcasted over gossip.
-const MAX_CRDS_VALUES_PER_PUBKEY: usize = 512;
+const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32;
 
 #[derive(Clone)]
 pub struct Crds {
@@ -232,6 +230,15 @@ impl Crds {
         self.votes.iter().map(move |i| self.table.index(*i))
     }
 
+    /// Returns all records associated with a pubkey.
+    pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator<Item = &VersionedCrdsValue> {
+        self.records
+            .get(pubkey)
+            .into_iter()
+            .flat_map(|records| records.into_iter())
+            .map(move |i| self.table.index(*i))
+    }
+
     pub fn len(&self) -> usize {
         self.table.len()
     }
@@ -8,10 +8,17 @@ use crate::{
     crds_gossip_error::CrdsGossipError,
     crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
     crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
-    crds_value::{CrdsValue, CrdsValueLabel},
+    crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
+    duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
 };
 use rayon::ThreadPool;
-use solana_sdk::{hash::Hash, pubkey::Pubkey};
+use solana_ledger::shred::Shred;
+use solana_sdk::{
+    hash::Hash,
+    pubkey::Pubkey,
+    signature::{Keypair, Signer},
+    timing::timestamp,
+};
 use std::collections::{HashMap, HashSet};
 
 ///The min size for bloom filters
@@ -105,6 +112,68 @@ impl CrdsGossip {
         (self.id, push_messages)
     }
 
+    pub(crate) fn push_duplicate_shred(
+        &mut self,
+        keypair: &Keypair,
+        shred: &Shred,
+        other_payload: &[u8],
+        leader_schedule: Option<impl LeaderScheduleFn>,
+        // Maximum serialized size of each DuplicateShred chunk payload.
+        max_payload_size: usize,
+    ) -> Result<(), duplicate_shred::Error> {
+        let pubkey = keypair.pubkey();
+        // Skip if there are already records of duplicate shreds for this slot.
+        let shred_slot = shred.slot();
+        if self
+            .crds
+            .get_records(&pubkey)
+            .any(|value| match &value.value.data {
+                CrdsData::DuplicateShred(_, value) => value.slot == shred_slot,
+                _ => false,
+            })
+        {
+            return Ok(());
+        }
+        let chunks = duplicate_shred::from_shred(
+            shred.clone(),
+            pubkey,
+            Vec::from(other_payload),
+            leader_schedule,
+            timestamp(),
+            max_payload_size,
+        )?;
+        // Find the index of oldest duplicate shred.
+        let mut num_dup_shreds = 0;
+        let offset = self
+            .crds
+            .get_records(&pubkey)
+            .filter_map(|value| match &value.value.data {
+                CrdsData::DuplicateShred(ix, value) => {
+                    num_dup_shreds += 1;
+                    Some((value.wallclock, *ix))
+                }
+                _ => None,
+            })
+            .min() // Override the oldest records.
+            .map(|(_ /*wallclock*/, ix)| ix)
+            .unwrap_or(0);
+        let offset = if num_dup_shreds < MAX_DUPLICATE_SHREDS {
+            num_dup_shreds
+        } else {
+            offset
+        };
+        let entries = chunks
+            .enumerate()
+            .map(|(k, chunk)| {
+                let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS;
+                let data = CrdsData::DuplicateShred(index, chunk);
+                CrdsValue::new_signed(data, keypair)
+            })
+            .collect();
+        self.process_push_message(&pubkey, entries, timestamp());
+        Ok(())
+    }
+
     /// add the `from` to the peer's filter of nodes
     pub fn process_prune_msg(
         &self,
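The push_duplicate_shred method above caps DuplicateShred crds values per node at MAX_DUPLICATE_SHREDS: while fewer records exist, new chunks start at the current record count; once the cap is reached they overwrite starting from the record with the smallest wallclock, wrapping modulo MAX_DUPLICATE_SHREDS. A minimal standalone sketch of that index-rotation rule (illustrative only, not part of the patch; assign_indices and the sample wallclocks are hypothetical, the index type and constant mirror the diff):

type DuplicateShredIndex = u16;
const MAX_DUPLICATE_SHREDS: DuplicateShredIndex = 512;

/// Given the (wallclock, index) pairs already stored for a node and the number
/// of new chunks, return the crds indices the new chunks would occupy.
fn assign_indices(
    existing: &[(u64, DuplicateShredIndex)],
    num_chunks: usize,
) -> Vec<DuplicateShredIndex> {
    let num_existing = existing.len() as DuplicateShredIndex;
    // Index of the oldest record (smallest wallclock), or 0 if none exist.
    let oldest = existing.iter().copied().min().map(|(_, ix)| ix).unwrap_or(0);
    // Append while there is room; otherwise start overwriting the oldest records.
    let offset = if num_existing < MAX_DUPLICATE_SHREDS {
        num_existing
    } else {
        oldest
    };
    (0..num_chunks as DuplicateShredIndex)
        .map(|k| (offset + k) % MAX_DUPLICATE_SHREDS)
        .collect()
}

fn main() {
    // Three records already stored at indices 0..3; two new chunks land at 3 and 4.
    let existing = vec![(10, 0), (11, 1), (12, 2)];
    assert_eq!(assign_indices(&existing, 2), vec![3, 4]);
}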
@@ -2,7 +2,7 @@ use crate::{
     cluster_info::MAX_SNAPSHOT_HASHES,
     contact_info::ContactInfo,
     deprecated,
-    duplicate_shred::{DuplicateShred, DuplicateShredIndex},
+    duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
     epoch_slots::EpochSlots,
 };
 use bincode::{serialize, serialized_size};
@@ -87,7 +87,7 @@ pub enum CrdsData {
     LegacyVersion(LegacyVersion),
     Version(Version),
     NodeInstance(NodeInstance),
-    DuplicateShred(DuplicateShred),
+    DuplicateShred(DuplicateShredIndex, DuplicateShred),
 }
 
 impl Sanitize for CrdsData {
@@ -117,7 +117,13 @@ impl Sanitize for CrdsData {
             CrdsData::LegacyVersion(version) => version.sanitize(),
             CrdsData::Version(version) => version.sanitize(),
             CrdsData::NodeInstance(node) => node.sanitize(),
-            CrdsData::DuplicateShred(shred) => shred.sanitize(),
+            CrdsData::DuplicateShred(ix, shred) => {
+                if *ix >= MAX_DUPLICATE_SHREDS {
+                    Err(SanitizeError::ValueOutOfBounds)
+                } else {
+                    shred.sanitize()
+                }
+            }
         }
     }
 }
@@ -451,7 +457,7 @@ impl fmt::Display for CrdsValueLabel {
             CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
             CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
            CrdsValueLabel::NodeInstance(pk, token) => write!(f, "NodeInstance({}, {})", pk, token),
-            CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({:?}, {})", ix, pk),
+            CrdsValueLabel::DuplicateShred(ix, pk) => write!(f, "DuplicateShred({}, {})", ix, pk),
         }
     }
 }
@@ -485,7 +491,7 @@ impl CrdsValueLabel {
             CrdsValueLabel::LegacyVersion(_) => Some(1),
             CrdsValueLabel::Version(_) => Some(1),
             CrdsValueLabel::NodeInstance(_, _) => None,
-            CrdsValueLabel::DuplicateShred(_, _) => None,
+            CrdsValueLabel::DuplicateShred(_, _) => Some(MAX_DUPLICATE_SHREDS as usize),
         }
     }
 }
@@ -533,7 +539,7 @@ impl CrdsValue {
             CrdsData::LegacyVersion(version) => version.wallclock,
             CrdsData::Version(version) => version.wallclock,
             CrdsData::NodeInstance(node) => node.wallclock,
-            CrdsData::DuplicateShred(shred) => shred.wallclock,
+            CrdsData::DuplicateShred(_, shred) => shred.wallclock,
         }
     }
     pub fn pubkey(&self) -> Pubkey {
@@ -547,7 +553,7 @@ impl CrdsValue {
             CrdsData::LegacyVersion(version) => version.from,
             CrdsData::Version(version) => version.from,
             CrdsData::NodeInstance(node) => node.from,
-            CrdsData::DuplicateShred(shred) => shred.from,
+            CrdsData::DuplicateShred(_, shred) => shred.from,
         }
     }
     pub fn label(&self) -> CrdsValueLabel {
@@ -561,9 +567,7 @@ impl CrdsValue {
             CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
             CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
             CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(node.from, node.token),
-            CrdsData::DuplicateShred(shred) => {
-                CrdsValueLabel::DuplicateShred(DuplicateShredIndex::from(shred), shred.from)
-            }
+            CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
         }
     }
     pub fn contact_info(&self) -> Option<&ContactInfo> {
@@ -18,6 +18,9 @@ use thiserror::Error;
 
 const DUPLICATE_SHRED_HEADER_SIZE: usize = 63;
 
+pub(crate) type DuplicateShredIndex = u16;
+pub(crate) const MAX_DUPLICATE_SHREDS: DuplicateShredIndex = 512;
+
 /// Function returning leader at a given slot.
 pub trait LeaderScheduleFn: FnOnce(Slot) -> Option<Pubkey> {}
 impl<F> LeaderScheduleFn for F where F: FnOnce(Slot) -> Option<Pubkey> {}
@@ -26,7 +29,7 @@ impl<F> LeaderScheduleFn for F where F: FnOnce(Slot) -> Option<Pubkey> {}
 pub struct DuplicateShred {
     pub(crate) from: Pubkey,
     pub(crate) wallclock: u64,
-    slot: Slot,
+    pub(crate) slot: Slot,
     shred_index: u32,
     shred_type: ShredType,
     // Serialized DuplicateSlotProof split into chunks.
@@ -36,23 +39,10 @@ pub struct DuplicateShred {
     chunk: Vec<u8>,
 }
 
-#[derive(Clone, Debug, Eq, Hash, PartialEq)]
-pub struct DuplicateShredIndex {
-    slot: Slot,
-    shred_index: u32,
-    shred_type: ShredType,
-    num_chunks: u8,
-    chunk_index: u8,
-}
-
 #[derive(Debug, Error)]
 pub enum Error {
     #[error("data chunk mismatch")]
     DataChunkMismatch,
-    #[error("decoding error")]
-    DecodingError(std::io::Error),
-    #[error("encoding error")]
-    EncodingError(std::io::Error),
     #[error("invalid chunk index")]
     InvalidChunkIndex,
     #[error("invalid duplicate shreds")]
@@ -87,7 +77,7 @@ pub enum Error {
 // the same triplet of (slot, shred-index, and shred-type_), and
 // that they have valid signatures from the slot leader.
 fn check_shreds(
-    leader: impl LeaderScheduleFn,
+    leader_schedule: Option<impl LeaderScheduleFn>,
     shred1: &Shred,
     shred2: &Shred,
 ) -> Result<(), Error> {
@@ -100,12 +90,13 @@ fn check_shreds(
     } else if shred1.payload == shred2.payload {
         Err(Error::InvalidDuplicateShreds)
     } else {
-        let slot_leader = leader(shred1.slot()).ok_or(Error::UnknownSlotLeader)?;
-        if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
-            Err(Error::InvalidSignature)
-        } else {
-            Ok(())
+        if let Some(leader_schedule) = leader_schedule {
+            let slot_leader = leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader)?;
+            if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
+                return Err(Error::InvalidSignature);
+            }
         }
+        Ok(())
     }
 }
 
@@ -114,24 +105,65 @@ fn check_shreds(
 pub fn from_duplicate_slot_proof(
     proof: &DuplicateSlotProof,
     self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
-    leader: impl LeaderScheduleFn,
+    leader_schedule: Option<impl LeaderScheduleFn>,
     wallclock: u64,
     max_size: usize, // Maximum serialized size of each DuplicateShred.
-    encoder: impl FnOnce(Vec<u8>) -> Result<Vec<u8>, std::io::Error>,
 ) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
     if proof.shred1 == proof.shred2 {
         return Err(Error::InvalidDuplicateSlotProof);
     }
     let shred1 = Shred::new_from_serialized_shred(proof.shred1.clone())?;
     let shred2 = Shred::new_from_serialized_shred(proof.shred2.clone())?;
-    check_shreds(leader, &shred1, &shred2)?;
+    check_shreds(leader_schedule, &shred1, &shred2)?;
     let (slot, shred_index, shred_type) = (
         shred1.slot(),
         shred1.index(),
         shred1.common_header.shred_type,
     );
     let data = bincode::serialize(proof)?;
-    let data = encoder(data).map_err(Error::EncodingError)?;
+    let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size {
+        max_size - DUPLICATE_SHRED_HEADER_SIZE
+    } else {
+        return Err(Error::InvalidSizeLimit);
+    };
+    let chunks: Vec<_> = data.chunks(chunk_size).map(Vec::from).collect();
+    let num_chunks = u8::try_from(chunks.len())?;
+    let chunks = chunks
+        .into_iter()
+        .enumerate()
+        .map(move |(i, chunk)| DuplicateShred {
+            from: self_pubkey,
+            wallclock,
+            slot,
+            shred_index,
+            shred_type,
+            num_chunks,
+            chunk_index: i as u8,
+            chunk,
+        });
+    Ok(chunks)
+}
+
+pub(crate) fn from_shred(
+    shred: Shred,
+    self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
+    other_payload: Vec<u8>,
+    leader_schedule: Option<impl LeaderScheduleFn>,
+    wallclock: u64,
+    max_size: usize, // Maximum serialized size of each DuplicateShred.
+) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
+    if shred.payload == other_payload {
+        return Err(Error::InvalidDuplicateShreds);
+    }
+    let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?;
+    check_shreds(leader_schedule, &shred, &other_shred)?;
+    let (slot, shred_index, shred_type) =
+        (shred.slot(), shred.index(), shred.common_header.shred_type);
+    let proof = DuplicateSlotProof {
+        shred1: shred.payload,
+        shred2: other_payload,
+    };
+    let data = bincode::serialize(&proof)?;
     let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size {
         max_size - DUPLICATE_SHRED_HEADER_SIZE
     } else {
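For a sense of scale of the chunking in from_duplicate_slot_proof and from_shred: assuming PACKET_DATA_SIZE is 1232 bytes (the solana-sdk constant, not stated in this diff), DUPLICATE_SHRED_MAX_PAYLOAD_SIZE works out to 1117 bytes and each chunk carries at most 1117 - 63 = 1054 bytes of the serialized DuplicateSlotProof. A small self-contained check of that arithmetic; the PACKET_DATA_SIZE value and the 2416-byte proof length are assumptions for illustration, the other constants are taken from the diff:

// Rough arithmetic check of the chunking limits used in this patch.
const PACKET_DATA_SIZE: usize = 1232; // assumed solana-sdk value
const DUPLICATE_SHRED_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 115;
const DUPLICATE_SHRED_HEADER_SIZE: usize = 63;

/// Number of DuplicateShred chunks needed for a serialized proof of `proof_len` bytes.
fn num_chunks(proof_len: usize) -> usize {
    let chunk_size = DUPLICATE_SHRED_MAX_PAYLOAD_SIZE - DUPLICATE_SHRED_HEADER_SIZE;
    (proof_len + chunk_size - 1) / chunk_size // ceiling division, like data.chunks(chunk_size)
}

fn main() {
    // Two shred payloads of roughly 1.2 KB each plus bincode framing come to
    // roughly 2.4 KB, which splits into 3 chunks of at most 1054 bytes each.
    assert_eq!(DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, 1117);
    assert_eq!(num_chunks(2416), 3);
}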
@@ -184,7 +216,6 @@ fn check_chunk(
 pub fn into_shreds(
     chunks: impl IntoIterator<Item = DuplicateShred>,
     leader: impl LeaderScheduleFn,
-    decoder: impl FnOnce(Vec<u8>) -> Result<Vec<u8>, std::io::Error>,
 ) -> Result<(Shred, Shred), Error> {
     let mut chunks = chunks.into_iter();
     let DuplicateShred {
@@ -219,8 +250,7 @@ pub fn into_shreds(
     if data.len() != num_chunks as usize {
         return Err(Error::MissingDataChunk);
     }
-    let data = (0..num_chunks).map(|k| data.remove(&k).unwrap());
-    let data = decoder(data.concat()).map_err(Error::DecodingError)?;
+    let data = (0..num_chunks).map(|k| data.remove(&k).unwrap()).concat();
     let proof: DuplicateSlotProof = bincode::deserialize(&data)?;
     if proof.shred1 == proof.shred2 {
         return Err(Error::InvalidDuplicateSlotProof);
@@ -254,20 +284,8 @@ impl Sanitize for DuplicateShred {
     }
 }
 
-impl From<&DuplicateShred> for DuplicateShredIndex {
-    fn from(shred: &DuplicateShred) -> Self {
-        Self {
-            slot: shred.slot,
-            shred_index: shred.shred_index,
-            shred_type: shred.shred_type,
-            num_chunks: shred.num_chunks,
-            chunk_index: shred.chunk_index,
-        }
-    }
-}
-
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use super::*;
     use rand::Rng;
     use solana_ledger::{entry::Entry, shred::Shredder};
@@ -296,7 +314,11 @@ mod tests {
         );
     }
 
-    fn new_rand_shred<R: Rng>(rng: &mut R, next_shred_index: u32, shredder: &Shredder) -> Shred {
+    pub fn new_rand_shred<R: Rng>(
+        rng: &mut R,
+        next_shred_index: u32,
+        shredder: &Shredder,
+    ) -> Shred {
         let entries: Vec<_> = std::iter::repeat_with(|| {
             let tx = system_transaction::transfer(
                 &Keypair::new(), // from
@@ -338,29 +360,25 @@ mod tests {
         let next_shred_index = rng.gen();
         let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder);
         let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder);
-        let leader = |s| {
+        let leader_schedule = |s| {
             if s == slot {
                 Some(leader.pubkey())
             } else {
                 None
            }
         };
-        let proof = DuplicateSlotProof {
-            shred1: shred1.payload.clone(),
-            shred2: shred2.payload.clone(),
-        };
-        let chunks: Vec<_> = from_duplicate_slot_proof(
-            &proof,
+        let chunks: Vec<_> = from_shred(
+            shred1.clone(),
             Pubkey::new_unique(), // self_pubkey
-            leader,
+            shred2.payload.clone(),
+            Some(leader_schedule),
             rng.gen(), // wallclock
             512, // max_size
-            Ok, // encoder
         )
         .unwrap()
         .collect();
         assert!(chunks.len() > 4);
-        let (shred3, shred4) = into_shreds(chunks, leader, Ok).unwrap();
+        let (shred3, shred4) = into_shreds(chunks, leader_schedule).unwrap();
         assert_eq!(shred1, shred3);
         assert_eq!(shred2, shred4);
     }
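On the receiving side, into_shreds (unchanged here apart from dropping the decoder argument) rebuilds the proof by keying chunks on chunk_index and concatenating them in index order before deserializing the DuplicateSlotProof. A standalone sketch of just that reassembly step, with illustrative byte strings in place of real proof data and a hypothetical reassemble helper:

use std::collections::HashMap;

// Chunks may arrive out of order, so they are keyed by chunk_index and
// concatenated 0..num_chunks before deserialization.
fn reassemble(chunks: Vec<(u8, Vec<u8>)>, num_chunks: u8) -> Option<Vec<u8>> {
    let mut data: HashMap<u8, Vec<u8>> = chunks.into_iter().collect();
    if data.len() != num_chunks as usize {
        return None; // corresponds to Error::MissingDataChunk in the real code
    }
    Some((0..num_chunks).flat_map(|k| data.remove(&k).unwrap()).collect())
}

fn main() {
    let chunks = vec![(1, b"world".to_vec()), (0, b"hello ".to_vec())];
    assert_eq!(reassemble(chunks, 2).unwrap(), b"hello world".to_vec());
}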
@@ -1,7 +1,7 @@
 //! The `result` module exposes a Result type that propagates one of many different Error types.
 
-use crate::cluster_info;
 use crate::poh_recorder;
+use crate::{cluster_info, duplicate_shred};
 use solana_ledger::block_error;
 use solana_ledger::blockstore;
 use solana_runtime::snapshot_utils;
@@ -33,6 +33,7 @@ pub enum Error {
     SnapshotError(snapshot_utils::SnapshotError),
     WeightedIndexError(rand::distributions::weighted::WeightedError),
     DuplicateNodeInstance,
+    DuplicateShredError(duplicate_shred::Error),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -150,6 +151,11 @@ impl std::convert::From<rand::distributions::weighted::WeightedError> for Error
         Error::WeightedIndexError(e)
     }
 }
+impl std::convert::From<duplicate_shred::Error> for Error {
+    fn from(e: duplicate_shred::Error) -> Error {
+        Error::DuplicateShredError(e)
+    }
+}
 
 #[cfg(test)]
 mod tests {
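The new From<duplicate_shred::Error> impl is what lets push_duplicate_shred's errors propagate through the ? operator inside functions that return the crate-wide Result, as run_check_duplicate does below. A toy sketch of that conversion with stand-in enums; the names here are hypothetical, only the From/? pattern mirrors the patch:

#[derive(Debug)]
enum DuplicateShredError {
    InvalidDuplicateShreds, // stand-in for duplicate_shred::Error variants
}

#[derive(Debug)]
enum Error {
    DuplicateShredError(DuplicateShredError),
}

impl From<DuplicateShredError> for Error {
    fn from(e: DuplicateShredError) -> Error {
        Error::DuplicateShredError(e)
    }
}

fn push() -> Result<(), DuplicateShredError> {
    Err(DuplicateShredError::InvalidDuplicateShreds)
}

fn caller() -> Result<(), Error> {
    push()?; // the From impl converts the error type automatically
    Ok(())
}

fn main() {
    assert!(matches!(caller(), Err(Error::DuplicateShredError(_))));
}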
@@ -83,7 +83,8 @@ pub fn should_retransmit_and_persist(
 }
 
 fn run_check_duplicate(
-    blockstore: &Arc<Blockstore>,
+    cluster_info: &ClusterInfo,
+    blockstore: &Blockstore,
     shred_receiver: &CrossbeamReceiver<Shred>,
 ) -> Result<()> {
     let check_duplicate = |shred: Shred| -> Result<()> {
@@ -94,6 +95,7 @@ fn run_check_duplicate(
             &shred.payload,
             shred.is_data(),
         ) {
+            cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
             blockstore.store_duplicate_slot(
                 shred.slot(),
                 existing_shred_payload,
@@ -339,8 +341,12 @@ impl WindowService {
         let (insert_sender, insert_receiver) = unbounded();
         let (duplicate_sender, duplicate_receiver) = unbounded();
 
-        let t_check_duplicate =
-            Self::start_check_duplicate_thread(exit, &blockstore, duplicate_receiver);
+        let t_check_duplicate = Self::start_check_duplicate_thread(
+            cluster_info.clone(),
+            exit.clone(),
+            blockstore.clone(),
+            duplicate_receiver,
+        );
 
         let t_insert = Self::start_window_insert_thread(
             exit,
@@ -371,12 +377,11 @@ impl WindowService {
     }
 
     fn start_check_duplicate_thread(
-        exit: &Arc<AtomicBool>,
-        blockstore: &Arc<Blockstore>,
+        cluster_info: Arc<ClusterInfo>,
+        exit: Arc<AtomicBool>,
+        blockstore: Arc<Blockstore>,
         duplicate_receiver: CrossbeamReceiver<Shred>,
     ) -> JoinHandle<()> {
-        let exit = exit.clone();
-        let blockstore = blockstore.clone();
         let handle_error = || {
             inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
         };
@@ -388,7 +393,8 @@ impl WindowService {
             }
 
             let mut noop = || {};
-            if let Err(e) = run_check_duplicate(&blockstore, &duplicate_receiver) {
+            if let Err(e) = run_check_duplicate(&cluster_info, &blockstore, &duplicate_receiver)
+            {
                 if Self::should_exit_on_error(e, &mut noop, &handle_error) {
                     break;
                 }
@@ -551,6 +557,7 @@ impl WindowService {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::contact_info::ContactInfo;
     use solana_ledger::{
         blockstore::{make_many_slot_entries, Blockstore},
         entry::{create_ticks, Entry},
@@ -563,6 +570,7 @@ mod test {
         epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
         hash::Hash,
         signature::{Keypair, Signer},
+        timing::timestamp,
     };
     use std::sync::Arc;
 
@@ -681,7 +689,10 @@ mod test {
         let duplicate_shred_slot = duplicate_shred.slot();
         sender.send(duplicate_shred).unwrap();
         assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
-        run_check_duplicate(&blockstore, &receiver).unwrap();
+        let keypair = Keypair::new();
+        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());
+        let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair));
+        run_check_duplicate(&cluster_info, &blockstore, &receiver).unwrap();
         assert!(blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
     }
 }