* skips retransmit for shreds with unknown slot leader (#19472)
Shreds' signatures should be verified before they reach retransmit
stage, and if the leader is unknown they should fail signature check.
Therefore retransmit-stage can as well expect to know who the slot
leader is and otherwise just skip the shred.
Blockstore checking signature of recovered shreds before sending them to
retransmit stage:
https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/blockstore.rs#L884-L930
Shred signature verifier:
https://github.com/solana-labs/solana/blob/4305d4b7b/core/src/sigverify_shreds.rs#L41-L57
https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/sigverify_shreds.rs#L105
(cherry picked from commit 6d9818b8e4
)
# Conflicts:
# core/src/broadcast_stage/broadcast_duplicates_run.rs
# ledger/src/shred.rs
* removes backport merge conflicts
Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
@ -406,7 +406,7 @@ pub fn broadcast_shreds(
|
|||||||
let mut result = Ok(());
|
let mut result = Ok(());
|
||||||
let mut shred_select = Measure::start("shred_select");
|
let mut shred_select = Measure::start("shred_select");
|
||||||
// Only the leader broadcasts shreds.
|
// Only the leader broadcasts shreds.
|
||||||
let leader = Some(cluster_info.id());
|
let leader = cluster_info.id();
|
||||||
let (root_bank, working_bank) = {
|
let (root_bank, working_bank) = {
|
||||||
let bank_forks = bank_forks.read().unwrap();
|
let bank_forks = bank_forks.read().unwrap();
|
||||||
(bank_forks.root_bank(), bank_forks.working_bank())
|
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||||
|
@ -122,25 +122,21 @@ impl ClusterNodes<RetransmitStage> {
|
|||||||
&self,
|
&self,
|
||||||
shred_seed: [u8; 32],
|
shred_seed: [u8; 32],
|
||||||
fanout: usize,
|
fanout: usize,
|
||||||
slot_leader: Option<Pubkey>,
|
slot_leader: Pubkey,
|
||||||
) -> (
|
) -> (
|
||||||
Vec<&ContactInfo>, // neighbors
|
Vec<&ContactInfo>, // neighbors
|
||||||
Vec<&ContactInfo>, // children
|
Vec<&ContactInfo>, // children
|
||||||
) {
|
) {
|
||||||
// Exclude leader from list of nodes.
|
// Exclude leader from list of nodes.
|
||||||
let index = self.index.iter().copied();
|
let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey {
|
||||||
let (weights, index): (Vec<u64>, Vec<usize>) = match slot_leader {
|
|
||||||
None => {
|
|
||||||
error!("unknown leader for shred slot");
|
|
||||||
index.unzip()
|
|
||||||
}
|
|
||||||
Some(slot_leader) if slot_leader == self.pubkey => {
|
|
||||||
error!("retransmit from slot leader: {}", slot_leader);
|
error!("retransmit from slot leader: {}", slot_leader);
|
||||||
index.unzip()
|
self.index.iter().copied().unzip()
|
||||||
}
|
} else {
|
||||||
Some(slot_leader) => index
|
self.index
|
||||||
|
.iter()
|
||||||
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
|
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
|
||||||
.unzip(),
|
.copied()
|
||||||
|
.unzip()
|
||||||
};
|
};
|
||||||
let index: Vec<_> = {
|
let index: Vec<_> = {
|
||||||
let shuffle = weighted_shuffle(&weights, shred_seed);
|
let shuffle = weighted_shuffle(&weights, shred_seed);
|
||||||
@ -462,7 +458,7 @@ mod tests {
|
|||||||
let (neighbors_indices, children_indices) =
|
let (neighbors_indices, children_indices) =
|
||||||
compute_retransmit_peers(fanout, self_index, &shuffled_index);
|
compute_retransmit_peers(fanout, self_index, &shuffled_index);
|
||||||
let (neighbors, children) =
|
let (neighbors, children) =
|
||||||
cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader));
|
cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader);
|
||||||
assert_eq!(children.len(), children_indices.len());
|
assert_eq!(children.len(), children_indices.len());
|
||||||
for (node, index) in children.into_iter().zip(children_indices) {
|
for (node, index) in children.into_iter().zip(children_indices) {
|
||||||
assert_eq!(*node, peers[index]);
|
assert_eq!(*node, peers[index]);
|
||||||
|
@ -292,7 +292,14 @@ fn retransmit(
|
|||||||
|
|
||||||
let mut compute_turbine_peers = Measure::start("turbine_start");
|
let mut compute_turbine_peers = Measure::start("turbine_start");
|
||||||
// TODO: consider using root-bank here for leader lookup!
|
// TODO: consider using root-bank here for leader lookup!
|
||||||
let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
|
// Shreds' signatures should be verified before they reach here, and if
|
||||||
|
// the leader is unknown they should fail signature check. So here we
|
||||||
|
// should expect to know the slot leader and otherwise skip the shred.
|
||||||
|
let slot_leader =
|
||||||
|
match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) {
|
||||||
|
Some(pubkey) => pubkey,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
let cluster_nodes =
|
let cluster_nodes =
|
||||||
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
|
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
|
||||||
let shred_seed = shred.seed(slot_leader, &root_bank);
|
let shred_seed = shred.seed(slot_leader, &root_bank);
|
||||||
|
@ -49,24 +49,25 @@
|
|||||||
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
|
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
|
||||||
//! payload can fit into one coding shred / packet.
|
//! payload can fit into one coding shred / packet.
|
||||||
|
|
||||||
use crate::{
|
use {
|
||||||
|
crate::{
|
||||||
blockstore::MAX_DATA_SHREDS_PER_SLOT,
|
blockstore::MAX_DATA_SHREDS_PER_SLOT,
|
||||||
entry::{create_ticks, Entry},
|
entry::{create_ticks, Entry},
|
||||||
erasure::Session,
|
erasure::Session,
|
||||||
};
|
},
|
||||||
use bincode::config::Options;
|
bincode::config::Options,
|
||||||
use core::cell::RefCell;
|
core::cell::RefCell,
|
||||||
use rayon::{
|
rayon::{
|
||||||
iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator},
|
iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator},
|
||||||
slice::ParallelSlice,
|
slice::ParallelSlice,
|
||||||
ThreadPool,
|
ThreadPool,
|
||||||
};
|
},
|
||||||
use serde::{Deserialize, Serialize};
|
serde::{Deserialize, Serialize},
|
||||||
use solana_measure::measure::Measure;
|
solana_measure::measure::Measure,
|
||||||
use solana_perf::packet::{limited_deserialize, Packet};
|
solana_perf::packet::{limited_deserialize, Packet},
|
||||||
use solana_rayon_threadlimit::get_thread_count;
|
solana_rayon_threadlimit::get_thread_count,
|
||||||
use solana_runtime::bank::Bank;
|
solana_runtime::bank::Bank,
|
||||||
use solana_sdk::{
|
solana_sdk::{
|
||||||
clock::Slot,
|
clock::Slot,
|
||||||
feature_set,
|
feature_set,
|
||||||
hash::hashv,
|
hash::hashv,
|
||||||
@ -74,9 +75,10 @@ use solana_sdk::{
|
|||||||
packet::PACKET_DATA_SIZE,
|
packet::PACKET_DATA_SIZE,
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
signature::{Keypair, Signature, Signer},
|
signature::{Keypair, Signature, Signer},
|
||||||
|
},
|
||||||
|
std::{convert::TryInto, mem::size_of, ops::Deref, sync::Arc},
|
||||||
|
thiserror::Error,
|
||||||
};
|
};
|
||||||
use std::{mem::size_of, ops::Deref, sync::Arc};
|
|
||||||
use thiserror::Error;
|
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
pub struct ProcessShredsStats {
|
pub struct ProcessShredsStats {
|
||||||
@ -469,25 +471,21 @@ impl Shred {
|
|||||||
self.common_header.signature
|
self.common_header.signature
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn seed(&self, leader_pubkey: Option<Pubkey>, root_bank: &Bank) -> [u8; 32] {
|
pub fn seed(&self, leader_pubkey: Pubkey, root_bank: &Bank) -> [u8; 32] {
|
||||||
if let Some(leader_pubkey) = leader_pubkey {
|
|
||||||
if enable_deterministic_seed(self.slot(), root_bank) {
|
if enable_deterministic_seed(self.slot(), root_bank) {
|
||||||
let h = hashv(&[
|
hashv(&[
|
||||||
&self.slot().to_le_bytes(),
|
&self.slot().to_le_bytes(),
|
||||||
&self.index().to_le_bytes(),
|
&self.index().to_le_bytes(),
|
||||||
&leader_pubkey.to_bytes(),
|
&leader_pubkey.to_bytes(),
|
||||||
]);
|
])
|
||||||
return h.to_bytes();
|
.to_bytes()
|
||||||
|
} else {
|
||||||
|
let signature = self.common_header.signature.as_ref();
|
||||||
|
let offset = signature.len().checked_sub(32).unwrap();
|
||||||
|
signature[offset..].try_into().unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut seed = [0; 32];
|
|
||||||
let seed_len = seed.len();
|
|
||||||
let sig = self.common_header.signature.as_ref();
|
|
||||||
seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]);
|
|
||||||
seed
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_data(&self) -> bool {
|
pub fn is_data(&self) -> bool {
|
||||||
self.common_header.shred_type == ShredType(DATA_SHRED)
|
self.common_header.shred_type == ShredType(DATA_SHRED)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user