Add updated duplicate broadcast test (#18506)
@@ -1,8 +1,9 @@
 //! A stage to broadcast data from a leader node to validators
 #![allow(clippy::rc_buffer)]
 use self::{
-    broadcast_duplicates_run::BroadcastDuplicatesRun,
-    broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*,
+    broadcast_duplicates_run::{BroadcastDuplicatesConfig, BroadcastDuplicatesRun},
+    broadcast_fake_shreds_run::BroadcastFakeShredsRun,
+    broadcast_metrics::*,
     fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
     standard_broadcast_run::StandardBroadcastRun,
 };
@@ -34,7 +35,7 @@ use std::{
     time::{Duration, Instant},
 };
 
-mod broadcast_duplicates_run;
+pub mod broadcast_duplicates_run;
 mod broadcast_fake_shreds_run;
 pub mod broadcast_metrics;
 pub(crate) mod broadcast_utils;
@@ -52,14 +53,6 @@ pub enum BroadcastStageReturnType {
     ChannelDisconnected,
 }
 
-#[derive(PartialEq, Clone, Debug)]
-pub struct BroadcastDuplicatesConfig {
-    /// Percentage of stake to send different version of slots to
-    pub stake_partition: u8,
-    /// Number of slots to wait before sending duplicate shreds
-    pub duplicate_send_delay: usize,
-}
-
 #[derive(PartialEq, Clone, Debug)]
 pub enum BroadcastStageType {
     Standard,
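The hunk above removes `BroadcastDuplicatesConfig` from broadcast_stage.rs; the struct is re-declared in broadcast_duplicates_run.rs below, with `stake_partition` changed from a percentage of total stake (u8) to an absolute stake amount (u64) and the `duplicate_send_delay` field dropped. A minimal, self-contained sketch of how a percentage-based setting maps onto the new absolute field; the helper name is hypothetical and not part of this change:

// Hypothetical bridge from the old percentage semantics to the new absolute
// stake threshold; integer division mirrors the removed
// `100 * cumulative_stake / stake_total` comparison.
fn percent_of_total_stake(stakes: &[u64], percent: u8) -> u64 {
    let total: u64 = stakes.iter().sum();
    total * u64::from(percent) / 100
}

fn main() {
    let stakes = [100, 200, 300, 400]; // lamports held by non-leader validators
    assert_eq!(percent_of_total_stake(&stakes, 25), 250);
}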
@@ -1,162 +1,206 @@
use super::broadcast_utils::ReceiveResults;
use super::*;
use log::*;
use solana_ledger::entry::{create_ticks, Entry, EntrySlice};
use solana_ledger::shred::Shredder;
use solana_ledger::{entry::Entry, shred::Shredder};
use solana_runtime::blockhash_queue::BlockhashQueue;
use solana_sdk::clock::Slot;
use solana_sdk::fee_calculator::FeeCalculator;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, Signer};
use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::Mutex;
use solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
system_transaction,
};

// Queue which facilitates delivering shreds with a delay
type DelayedQueue = VecDeque<(Option<Pubkey>, Option<Vec<Shred>>)>;
pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
pub const DUPLICATE_RATE: usize = 10;

#[derive(PartialEq, Clone, Debug)]
pub struct BroadcastDuplicatesConfig {
/// Amount of stake (excluding the leader) to send different version of slots to.
/// Note this is sampled from a list of stakes sorted least to greatest.
pub stake_partition: u64,
}

#[derive(Clone)]
pub(super) struct BroadcastDuplicatesRun {
config: BroadcastDuplicatesConfig,
// Local queue for broadcast to track which duplicate blockhashes we've sent
duplicate_queue: BlockhashQueue,
// Shared queue between broadcast and transmit threads
delayed_queue: Arc<Mutex<DelayedQueue>>,
// Buffer for duplicate entries
duplicate_entries_buffer: Vec<Entry>,
last_duplicate_entry_hash: Hash,
last_broadcast_slot: Slot,
current_slot: Slot,
next_shred_index: u32,
shred_version: u16,
recent_blockhash: Option<Hash>,
prev_entry_hash: Option<Hash>,
num_slots_broadcasted: usize,
}

impl BroadcastDuplicatesRun {
pub(super) fn new(shred_version: u16, config: BroadcastDuplicatesConfig) -> Self {
let mut delayed_queue = DelayedQueue::new();
delayed_queue.resize(config.duplicate_send_delay, (None, None));
Self {
config,
delayed_queue: Arc::new(Mutex::new(delayed_queue)),
duplicate_queue: BlockhashQueue::default(),
duplicate_entries_buffer: vec![],
next_shred_index: u32::MAX,
last_broadcast_slot: 0,
last_duplicate_entry_hash: Hash::default(),
shred_version,
current_slot: 0,
recent_blockhash: None,
prev_entry_hash: None,
num_slots_broadcasted: 0,
}
}

fn queue_or_create_duplicate_entries(
&mut self,
bank: &Arc<Bank>,
receive_results: &ReceiveResults,
) -> (Vec<Entry>, u32) {
// If the last entry hash is default, grab the last blockhash from the parent bank
if self.last_duplicate_entry_hash == Hash::default() {
self.last_duplicate_entry_hash = bank.last_blockhash();
}
fn get_non_partitioned_batches(
&self,
my_pubkey: &Pubkey,
bank: &Bank,
data_shreds: Arc<Vec<Shred>>,
) -> TransmitShreds {
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let mut stakes: HashMap<Pubkey, u64> = bank.epoch_staked_nodes(bank_epoch).unwrap();
stakes.retain(|pubkey, _stake| pubkey != my_pubkey);
(Some(Arc::new(stakes)), data_shreds)
}

// Create duplicate entries by..
// 1) rearranging real entries so that all transaction entries are moved to
// the front and tick entries are moved to the back.
// 2) setting all transaction entries to zero hashes and all tick entries to `hashes_per_tick`.
// 3) removing any transactions which reference blockhashes which aren't in the
// duplicate blockhash queue.
let (duplicate_entries, next_shred_index) = if bank.slot() > MINIMUM_DUPLICATE_SLOT {
let mut tx_entries: Vec<Entry> = receive_results
.entries
.iter()
.filter_map(|entry| {
if entry.is_tick() {
return None;
}
fn get_partitioned_batches(
&self,
my_pubkey: &Pubkey,
bank: &Bank,
original_shreds: Arc<Vec<Shred>>,
partition_shreds: Arc<Vec<Shred>>,
) -> (TransmitShreds, TransmitShreds) {
// On the last shred, partition network with duplicate and real shreds
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let mut original_recipients = HashMap::new();
let mut partition_recipients = HashMap::new();

let transactions: Vec<Transaction> = entry
.transactions
.iter()
.filter(|tx| {
self.duplicate_queue
.get_hash_age(&tx.message.recent_blockhash)
.is_some()
})
.cloned()
.collect();
if !transactions.is_empty() {
Some(Entry::new_mut(
&mut self.last_duplicate_entry_hash,
&mut 0,
transactions,
))
} else {
None
}
})
.collect();
let mut tick_entries = create_ticks(
receive_results.entries.tick_count(),
bank.hashes_per_tick().unwrap_or_default(),
self.last_duplicate_entry_hash,
);
self.duplicate_entries_buffer.append(&mut tx_entries);
self.duplicate_entries_buffer.append(&mut tick_entries);

// Only send out duplicate entries when the block is finished otherwise the
// recipient will start repairing for shreds they haven't received yet and
// hit duplicate slot issues before we want them to.
let entries = if receive_results.last_tick_height == bank.max_tick_height() {
self.duplicate_entries_buffer.drain(..).collect()
let mut stakes: Vec<(Pubkey, u64)> = bank
.epoch_staked_nodes(bank_epoch)
.unwrap()
.into_iter()
.filter(|(pubkey, _)| pubkey != my_pubkey)
.collect();
stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
if r_stake == l_stake {
l_key.cmp(r_key)
} else {
vec![]
};
l_stake.cmp(r_stake)
}
});

// Set next shred index to 0 since we are sending the full slot
(entries, 0)
} else {
// Send real entries until we hit min duplicate slot
(receive_results.entries.clone(), self.next_shred_index)
};

// Save last duplicate entry hash to avoid invalid entry hash errors
if let Some(last_duplicate_entry) = duplicate_entries.last() {
self.last_duplicate_entry_hash = last_duplicate_entry.hash;
let mut cumulative_stake: u64 = 0;
for (pubkey, stake) in stakes.into_iter() {
cumulative_stake += stake;
if cumulative_stake <= self.config.stake_partition {
partition_recipients.insert(pubkey, stake);
} else {
original_recipients.insert(pubkey, stake);
}
}

(duplicate_entries, next_shred_index)
warn!(
"{} sent duplicate slot {} to nodes: {:?}",
my_pubkey,
bank.slot(),
&partition_recipients,
);

let original_recipients = Arc::new(original_recipients);
let original_transmit_shreds = (Some(original_recipients), original_shreds);

let partition_recipients = Arc::new(partition_recipients);
let partition_transmit_shreds = (Some(partition_recipients), partition_shreds);

(original_transmit_shreds, partition_transmit_shreds)
}
}

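In the new get_partitioned_batches above, the leader's own node is filtered out, the remaining staked nodes are sorted from least to greatest stake (ties broken by pubkey), and the list is walked while accumulating stake: nodes whose running total stays at or below config.stake_partition receive the duplicate (partition) shreds, everyone else receives the original shreds. A self-contained sketch of that selection with simplified types; integers stand in for pubkeys and the function name is illustrative only:

fn split_recipients(
    mut stakes: Vec<(u64, u64)>, // (pubkey stand-in, stake in lamports)
    stake_partition: u64,
) -> (Vec<u64>, Vec<u64>) {
    // Sort least to greatest stake, breaking ties by key, as in the diff above.
    stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
        if r_stake == l_stake {
            l_key.cmp(r_key)
        } else {
            l_stake.cmp(r_stake)
        }
    });
    let (mut partition, mut original) = (Vec::new(), Vec::new());
    let mut cumulative_stake: u64 = 0;
    for (pubkey, stake) in stakes {
        cumulative_stake += stake;
        if cumulative_stake <= stake_partition {
            partition.push(pubkey); // gets the duplicate version of the slot
        } else {
            original.push(pubkey); // gets the original shreds
        }
    }
    (partition, original)
}

fn main() {
    // With a 300-lamport threshold, the two smallest stakes (100 + 200) fall in
    // the duplicate partition; the rest receive the original last shred.
    let (partition, original) =
        split_recipients(vec![(1, 400), (2, 100), (3, 200), (4, 300)], 300);
    assert_eq!(partition, vec![2, 3]);
    assert_eq!(original, vec![4, 1]);
}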
/// Duplicate slots should only be sent once all validators have started.
/// This constant is intended to be used as a buffer so that all validators
/// are live before sending duplicate slots.
pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;

impl BroadcastRun for BroadcastDuplicatesRun {
fn run(
&mut self,
keypair: &Keypair,
blockstore: &Arc<Blockstore>,
_blockstore: &Arc<Blockstore>,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?;
let bank = receive_results.bank.clone();
let last_tick_height = receive_results.last_tick_height;

if self.next_shred_index == u32::MAX {
self.next_shred_index = blockstore
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0) as u32
if bank.slot() != self.current_slot {
self.next_shred_index = 0;
self.current_slot = bank.slot();
self.prev_entry_hash = None;
self.num_slots_broadcasted += 1;
}

// We were not the leader, but just became leader again
if bank.slot() > self.last_broadcast_slot + 1 {
self.last_duplicate_entry_hash = Hash::default();
if receive_results.entries.is_empty() {
return Ok(());
}
self.last_broadcast_slot = bank.slot();

// Update the recent blockhash based on transactions in the entries
for entry in &receive_results.entries {
if !entry.transactions.is_empty() {
self.recent_blockhash = Some(entry.transactions[0].message.recent_blockhash);
break;
}
}

// 2) Convert entries to shreds + generate coding shreds. Set a garbage PoH on the last entry
// in the slot to make verification fail on validators
let last_entries = {
if last_tick_height == bank.max_tick_height()
&& bank.slot() > MINIMUM_DUPLICATE_SLOT
&& self.num_slots_broadcasted % DUPLICATE_RATE == 0
&& self.recent_blockhash.is_some()
{
let entry_batch_len = receive_results.entries.len();
let prev_entry_hash =
// Try to get second-to-last entry before last tick
if entry_batch_len > 1 {
Some(receive_results.entries[entry_batch_len - 2].hash)
} else {
self.prev_entry_hash
};

if let Some(prev_entry_hash) = prev_entry_hash {
let original_last_entry = receive_results.entries.pop().unwrap();

// Last entry has to be a tick
assert!(original_last_entry.is_tick());

// Inject an extra entry before the last tick
let extra_tx = system_transaction::transfer(
keypair,
&Pubkey::new_unique(),
1,
self.recent_blockhash.unwrap(),
);
let new_extra_entry = Entry::new(&prev_entry_hash, 1, vec![extra_tx]);

// This will only work with sleepy tick producer where the hashing
// checks in replay are turned off, because we're introducing an extra
// hash for the last tick in the `new_extra_entry`.
let new_last_entry = Entry::new(
&new_extra_entry.hash,
original_last_entry.num_hashes,
vec![],
);

Some((original_last_entry, vec![new_extra_entry, new_last_entry]))
} else {
None
}
} else {
None
}
};

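When a slot qualifies for duplication above (the last tick of a slot past MINIMUM_DUPLICATE_SLOT, on every DUPLICATE_RATE-th broadcast slot, with a recent blockhash available), the original last tick is popped and replaced by an extra transfer entry chained off the second-to-last entry hash plus a new last tick chained off that extra entry. Because each entry hash commits to the previous hash, the replacement tail yields a different hash chain than the original tick. A toy, std-only illustration of that effect; this is not the real PoH hashing, just a stand-in chain:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Each link commits to the previous hash plus this entry's payload.
fn chain(prev: u64, payload: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    prev.hash(&mut hasher);
    payload.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let prev_entry_hash = chain(0, "entries so far");

    // Original slot: the last tick chains directly off the previous entry.
    let original_last = chain(prev_entry_hash, "tick");

    // Duplicate version: an extra "transfer" entry is injected first, and the
    // replacement tick chains off it instead.
    let extra_entry = chain(prev_entry_hash, "transfer");
    let duplicate_last = chain(extra_entry, "tick");

    // Same slot position, different trailing hashes.
    assert_ne!(original_last, duplicate_last);
}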
self.prev_entry_hash = last_entries
.as_ref()
.map(|(original_last_entry, _)| original_last_entry.hash)
.or_else(|| Some(receive_results.entries.last().unwrap().hash));

let shredder = Shredder::new(
bank.slot(),
@@ -166,165 +210,104 @@ impl BroadcastRun for BroadcastDuplicatesRun {
)
.expect("Expected to create a new shredder");

let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds(
let (data_shreds, _, _) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height(),
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
self.next_shred_index,
);

let (duplicate_entries, next_duplicate_shred_index) =
self.queue_or_create_duplicate_entries(&bank, &receive_results);
let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() {
shredder.entries_to_shreds(
keypair,
&duplicate_entries,
last_tick_height == bank.max_tick_height(),
next_duplicate_shred_index,
)
} else {
(vec![], vec![], 0)
};
self.next_shred_index += data_shreds.len() as u32;
let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| {
let (original_last_data_shred, _, _) =
shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index);

// Manually track the shred index because relying on slot meta consumed is racy
if last_tick_height == bank.max_tick_height() {
self.next_shred_index = 0;
self.duplicate_queue
.register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default());
} else {
self.next_shred_index = last_shred_index;
}
let (partition_last_data_shred, _, _) =
// Don't mark the last shred as last so that validators won't know that
// they've gotten all the shreds, and will continue trying to repair
shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index);

// Partition network with duplicate and real shreds based on stake
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let mut duplicate_recipients = HashMap::new();
let mut real_recipients = HashMap::new();
let sigs: Vec<_> = partition_last_data_shred.iter().map(|s| (s.signature(), s.index())).collect();
info!(
"duplicate signatures for slot {}, sigs: {:?}",
bank.slot(),
sigs,
);

let mut stakes: Vec<(Pubkey, u64)> = bank
.epoch_staked_nodes(bank_epoch)
.unwrap()
.into_iter()
.filter(|(pubkey, _)| *pubkey != keypair.pubkey())
.collect();
stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
if r_stake == l_stake {
l_key.cmp(r_key)
} else {
r_stake.cmp(l_stake)
}
self.next_shred_index += 1;
(original_last_data_shred, partition_last_data_shred)
});

let highest_staked_node = stakes.first().cloned().map(|x| x.0);
let stake_total: u64 = stakes.iter().map(|(_, stake)| *stake).sum();
let mut cumulative_stake: u64 = 0;
for (pubkey, stake) in stakes.into_iter().rev() {
cumulative_stake += stake;
if (100 * cumulative_stake / stake_total) as u8 <= self.config.stake_partition {
duplicate_recipients.insert(pubkey, stake);
} else {
real_recipients.insert(pubkey, stake);
}
}

if let Some(highest_staked_node) = highest_staked_node {
if bank.slot() > MINIMUM_DUPLICATE_SLOT && last_tick_height == bank.max_tick_height() {
warn!(
"{} sent duplicate slot {} to nodes: {:?}",
keypair.pubkey(),
bank.slot(),
&duplicate_recipients,
);
warn!(
"Duplicate shreds for slot {} will be broadcast in {} slot(s)",
bank.slot(),
self.config.duplicate_send_delay
);

let delayed_shreds: Option<Vec<Shred>> = vec![
duplicate_data_shreds.last().cloned(),
data_shreds.last().cloned(),
]
.into_iter()
.collect();
self.delayed_queue
.lock()
.unwrap()
.push_back((Some(highest_staked_node), delayed_shreds));
}
}

let duplicate_recipients = Arc::new(duplicate_recipients);
let real_recipients = Arc::new(real_recipients);

let data_shreds = Arc::new(data_shreds);
blockstore_sender.send((data_shreds.clone(), None))?;

// 3) Start broadcast step
socket_sender.send((
(
Some(duplicate_recipients.clone()),
Arc::new(duplicate_data_shreds),
),
None,
))?;
socket_sender.send((
(
Some(duplicate_recipients),
Arc::new(duplicate_coding_shreds),
),
None,
))?;
socket_sender.send(((Some(real_recipients.clone()), data_shreds), None))?;
socket_sender.send(((Some(real_recipients), Arc::new(coding_shreds)), None))?;
let transmit_shreds =
self.get_non_partitioned_batches(&keypair.pubkey(), &bank, data_shreds.clone());
info!(
"{} Sending good shreds for slot {} to network",
keypair.pubkey(),
data_shreds.first().unwrap().slot()
);
socket_sender.send((transmit_shreds, None))?;

// Special handling of last shred to cause partition
if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds {
let original_last_data_shred = Arc::new(original_last_data_shred);
let partition_last_data_shred = Arc::new(partition_last_data_shred);

// Store the original shreds that this node replayed
blockstore_sender.send((original_last_data_shred.clone(), None))?;

let (original_transmit_shreds, partition_transmit_shreds) = self
.get_partitioned_batches(
&keypair.pubkey(),
&bank,
original_last_data_shred,
partition_last_data_shred,
);

socket_sender.send((original_transmit_shreds, None))?;
socket_sender.send((partition_transmit_shreds, None))?;
}
Ok(())
}

fn transmit(
&mut self,
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
_bank_forks: &Arc<RwLock<BankForks>>,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
// Check the delay queue for shreds that are ready to be sent
let (delayed_recipient, delayed_shreds) = {
let mut delayed_deque = self.delayed_queue.lock().unwrap();
if delayed_deque.len() > self.config.duplicate_send_delay {
delayed_deque.pop_front().unwrap()
} else {
(None, None)
}
};

let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
let stakes = stakes.unwrap();
for peer in cluster_info.tvu_peers() {
// Forward shreds to circumvent gossip
if stakes.get(&peer.id).is_some() {
shreds.iter().for_each(|shred| {
sock.send_to(&shred.payload, &peer.tvu_forwards).unwrap();
});
}

// After a delay, broadcast duplicate shreds to a single node
if let Some(shreds) = delayed_shreds.as_ref() {
if Some(peer.id) == delayed_recipient {
shreds.iter().for_each(|shred| {
sock.send_to(&shred.payload, &peer.tvu).unwrap();
});
}
}
}
// Broadcast data
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.as_deref().unwrap_or(&HashMap::default()),
);
broadcast_shreds(
sock,
&shreds,
&cluster_nodes,
&Arc::new(AtomicU64::new(0)),
&mut TransmitShredsStats::default(),
cluster_info.id(),
bank_forks,
)?;

Ok(())
}

fn record(
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
let (data_shreds, _) = receiver.lock().unwrap().recv()?;
blockstore.insert_shreds(data_shreds.to_vec(), None, true)?;
let (all_shreds, _) = receiver.lock().unwrap().recv()?;
blockstore
.insert_shreds(all_shreds.to_vec(), None, true)
.expect("Failed to insert shreds in blockstore");
Ok(())
}
}
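The end result of run above is that the partitioned validators receive a final data shred for the same slot and shred index as the original one but with different contents and a different signature (the run logs those signatures), which is the condition duplicate-block detection keys on. A toy sketch of that condition with a simplified struct; real detection happens elsewhere in the validator, outside this diff:

#[derive(Debug, Clone, PartialEq)]
struct ToyShred {
    slot: u64,
    index: u32,
    payload: Vec<u8>,
}

// Two shreds claiming the same (slot, index) with different payloads are
// evidence of a duplicate block from the leader.
fn is_duplicate_pair(a: &ToyShred, b: &ToyShred) -> bool {
    a.slot == b.slot && a.index == b.index && a.payload != b.payload
}

fn main() {
    let original = ToyShred { slot: 25, index: 7, payload: b"original last tick".to_vec() };
    let partition = ToyShred { slot: 25, index: 7, payload: b"extra entry + new tick".to_vec() };
    assert!(is_duplicate_pair(&original, &partition));
}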