From 55ccff760406deab322f0962cae3016aff38d0d8 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 28 Sep 2021 15:26:30 +0000 Subject: [PATCH] skips retransmit for shreds with unknown slot leader (backport #19472) (#20291) * skips retransmit for shreds with unknown slot leader (#19472) Shreds' signatures should be verified before they reach retransmit stage, and if the leader is unknown they should fail signature check. Therefore retransmit-stage can as well expect to know who the slot leader is and otherwise just skip the shred. Blockstore checking signature of recovered shreds before sending them to retransmit stage: https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/blockstore.rs#L884-L930 Shred signature verifier: https://github.com/solana-labs/solana/blob/4305d4b7b/core/src/sigverify_shreds.rs#L41-L57 https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/sigverify_shreds.rs#L105 (cherry picked from commit 6d9818b8e4133faef3bc41cfdadab0fd1cf2e1f7) # Conflicts: # core/src/broadcast_stage/broadcast_duplicates_run.rs # ledger/src/shred.rs * removes backport merge conflicts Co-authored-by: behzad nouri --- core/src/broadcast_stage.rs | 2 +- core/src/cluster_nodes.rs | 24 +++++------ core/src/retransmit_stage.rs | 9 +++- ledger/src/shred.rs | 84 ++++++++++++++++++------------------ 4 files changed, 60 insertions(+), 59 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 99ebc67dfe..a128878fc2 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -406,7 +406,7 @@ pub fn broadcast_shreds( let mut result = Ok(()); let mut shred_select = Measure::start("shred_select"); // Only the leader broadcasts shreds. 
- let leader = Some(cluster_info.id()); + let leader = cluster_info.id(); let (root_bank, working_bank) = { let bank_forks = bank_forks.read().unwrap(); (bank_forks.root_bank(), bank_forks.working_bank()) diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 86c6906d4a..9927689561 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -122,25 +122,21 @@ impl ClusterNodes<RetransmitStage> { &self, shred_seed: [u8; 32], fanout: usize, - slot_leader: Option<Pubkey>, + slot_leader: Pubkey, ) -> ( Vec<&ContactInfo>, // neighbors Vec<&ContactInfo>, // children ) { // Exclude leader from list of nodes. - let index = self.index.iter().copied(); - let (weights, index): (Vec<u64>, Vec<usize>) = match slot_leader { - None => { - error!("unknown leader for shred slot"); - index.unzip() - } - Some(slot_leader) if slot_leader == self.pubkey => { - error!("retransmit from slot leader: {}", slot_leader); - index.unzip() - } - Some(slot_leader) => index + let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey { + error!("retransmit from slot leader: {}", slot_leader); + self.index.iter().copied().unzip() + } else { + self.index + .iter() .filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader) - .unzip(), + .copied() + .unzip() }; let index: Vec<_> = { let shuffle = weighted_shuffle(&weights, shred_seed); @@ -462,7 +458,7 @@ mod tests { let (neighbors_indices, children_indices) = compute_retransmit_peers(fanout, self_index, &shuffled_index); let (neighbors, children) = - cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader)); + cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader); assert_eq!(children.len(), children_indices.len()); for (node, index) in children.into_iter().zip(children_indices) { assert_eq!(*node, peers[index]); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index e092f6b8e3..0bbb179282 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -292,7 +292,14 @@ fn 
retransmit( let mut compute_turbine_peers = Measure::start("turbine_start"); // TODO: consider using root-bank here for leader lookup! - let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)); + // Shreds' signatures should be verified before they reach here, and if + // the leader is unknown they should fail signature check. So here we + // should expect to know the slot leader and otherwise skip the shred. + let slot_leader = + match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) { + Some(pubkey) => pubkey, + None => continue, + }; let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); let shred_seed = shred.seed(slot_leader, &root_bank); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 97bccff161..b0520e2440 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -49,34 +49,36 @@ //! So, given a) - c), we must restrict data shred's payload length such that the entire coding //! payload can fit into one coding shred / packet. 
-use crate::{ - blockstore::MAX_DATA_SHREDS_PER_SLOT, - entry::{create_ticks, Entry}, - erasure::Session, +use { + crate::{ + blockstore::MAX_DATA_SHREDS_PER_SLOT, + entry::{create_ticks, Entry}, + erasure::Session, + }, + bincode::config::Options, + core::cell::RefCell, + rayon::{ + iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}, + slice::ParallelSlice, + ThreadPool, + }, + serde::{Deserialize, Serialize}, + solana_measure::measure::Measure, + solana_perf::packet::{limited_deserialize, Packet}, + solana_rayon_threadlimit::get_thread_count, + solana_runtime::bank::Bank, + solana_sdk::{ + clock::Slot, + feature_set, + hash::hashv, + hash::Hash, + packet::PACKET_DATA_SIZE, + pubkey::Pubkey, + signature::{Keypair, Signature, Signer}, + }, + std::{convert::TryInto, mem::size_of, ops::Deref, sync::Arc}, + thiserror::Error, }; -use bincode::config::Options; -use core::cell::RefCell; -use rayon::{ - iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}, - slice::ParallelSlice, - ThreadPool, -}; -use serde::{Deserialize, Serialize}; -use solana_measure::measure::Measure; -use solana_perf::packet::{limited_deserialize, Packet}; -use solana_rayon_threadlimit::get_thread_count; -use solana_runtime::bank::Bank; -use solana_sdk::{ - clock::Slot, - feature_set, - hash::hashv, - hash::Hash, - packet::PACKET_DATA_SIZE, - pubkey::Pubkey, - signature::{Keypair, Signature, Signer}, -}; -use std::{mem::size_of, ops::Deref, sync::Arc}; -use thiserror::Error; #[derive(Default, Clone)] pub struct ProcessShredsStats { @@ -469,23 +471,19 @@ impl Shred { self.common_header.signature } - pub fn seed(&self, leader_pubkey: Option<Pubkey>, root_bank: &Bank) -> [u8; 32] { - if let Some(leader_pubkey) = leader_pubkey { - if enable_deterministic_seed(self.slot(), root_bank) { - let h = hashv(&[ - &self.slot().to_le_bytes(), - &self.index().to_le_bytes(), - &leader_pubkey.to_bytes(), - ]); - return h.to_bytes(); - } + pub fn seed(&self, leader_pubkey: 
Pubkey, root_bank: &Bank) -> [u8; 32] { + if enable_deterministic_seed(self.slot(), root_bank) { + hashv(&[ + &self.slot().to_le_bytes(), + &self.index().to_le_bytes(), + &leader_pubkey.to_bytes(), + ]) + .to_bytes() + } else { + let signature = self.common_header.signature.as_ref(); + let offset = signature.len().checked_sub(32).unwrap(); + signature[offset..].try_into().unwrap() } - - let mut seed = [0; 32]; - let seed_len = seed.len(); - let sig = self.common_header.signature.as_ref(); - seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]); - seed } pub fn is_data(&self) -> bool {