Remove obsolete references to Blob (#6957)

* Remove the name "blob" from archivers

* Remove the name "blob" from broadcast

* Remove the name "blob" from Cluster Info

* Remove the name "blob" from Repair

* Remove the name "blob" from a bunch more places

* Remove the name "blob" from tests and book

Author: Sagar Dhawan (committed by GitHub)
Date: 2019-11-14 11:49:31 -08:00
commit 79d7090867, parent e7f63cd336
26 changed files with 258 additions and 259 deletions


@ -261,20 +261,20 @@ impl Archiver {
};
let repair_socket = Arc::new(node.sockets.repair);
let blob_sockets: Vec<Arc<UdpSocket>> =
let shred_sockets: Vec<Arc<UdpSocket>> =
node.sockets.tvu.into_iter().map(Arc::new).collect();
let blob_forward_sockets: Vec<Arc<UdpSocket>> = node
let shred_forward_sockets: Vec<Arc<UdpSocket>> = node
.sockets
.tvu_forwards
.into_iter()
.map(Arc::new)
.collect();
let (blob_fetch_sender, blob_fetch_receiver) = channel();
let (shred_fetch_sender, shred_fetch_receiver) = channel();
let fetch_stage = ShredFetchStage::new(
blob_sockets,
blob_forward_sockets,
shred_sockets,
shred_forward_sockets,
repair_socket.clone(),
&blob_fetch_sender,
&shred_fetch_sender,
&exit,
);
let (slot_sender, slot_receiver) = channel();
@ -299,7 +299,7 @@ impl Archiver {
&node_info,
&storage_keypair,
repair_socket,
blob_fetch_receiver,
shred_fetch_receiver,
slot_sender,
) {
Ok(window_service) => window_service,
@ -448,7 +448,7 @@ impl Archiver {
node_info: &ContactInfo,
storage_keypair: &Arc<Keypair>,
repair_socket: Arc<UdpSocket>,
blob_fetch_receiver: PacketReceiver,
shred_fetch_receiver: PacketReceiver,
slot_sender: Sender<u64>,
) -> Result<(WindowService)> {
let slots_per_segment =
@ -492,7 +492,7 @@ impl Archiver {
let (verified_sender, verified_receiver) = unbounded();
let _sigverify_stage = SigVerifyStage::new(
blob_fetch_receiver,
shred_fetch_receiver,
verified_sender.clone(),
DisabledSigVerifier::default(),
);
@ -845,7 +845,7 @@ impl Archiver {
/// Return the slot at the start of the archiver's segment
///
/// It is recommended to use a temporary blocktree for this since the download will not verify
/// blobs received and might impact the chaining of blobs across slots
/// shreds received and might impact the chaining of shreds across slots
pub fn download_from_archiver(
cluster_info: &Arc<RwLock<ClusterInfo>>,
archiver_info: &ContactInfo,
@ -853,7 +853,7 @@ impl Archiver {
slots_per_segment: u64,
) -> Result<(u64)> {
// Create a client which downloads from the archiver and see that it
// can respond with blobs.
// can respond with shreds.
let start_slot = Self::get_archiver_segment_slot(archiver_info.storage_addr);
info!("Archiver download: start at {}", start_slot);


@ -1,5 +1,5 @@
//! A stage to broadcast data from a leader node to validators
use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun;
use self::broadcast_fake_shreds_run::BroadcastFakeShredsRun;
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
use self::standard_broadcast_run::StandardBroadcastRun;
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
@ -15,7 +15,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
mod broadcast_fake_blobs_run;
mod broadcast_fake_shreds_run;
pub(crate) mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
mod standard_broadcast_run;
@ -31,7 +31,7 @@ pub enum BroadcastStageReturnType {
pub enum BroadcastStageType {
Standard,
FailEntryVerification,
BroadcastFakeBlobs,
BroadcastFakeShreds,
}
impl BroadcastStageType {
@ -65,13 +65,13 @@ impl BroadcastStageType {
FailEntryVerificationBroadcastRun::new(),
),
BroadcastStageType::BroadcastFakeBlobs => BroadcastStage::new(
BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new(
sock,
cluster_info,
receiver,
exit_sender,
blocktree,
BroadcastFakeBlobsRun::new(0),
BroadcastFakeShredsRun::new(0),
),
}
}
@ -141,8 +141,8 @@ impl BroadcastStage {
/// * `sock` - Socket to send from.
/// * `exit` - Boolean to signal system exit.
/// * `cluster_info` - ClusterInfo structure
/// * `window` - Cache of blobs that we have broadcast
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
/// * `window` - Cache of Shreds that we have broadcast
/// * `receiver` - Receive channel for Shreds to be retransmitted to all the layer 1 nodes.
/// * `exit_sender` - Set to true when this service exits, allows rest of Tpu to exit cleanly.
/// Otherwise, when a Tpu closes, it only closes the stages that come after it. The stages
/// that come before could be blocked on a receive, and never notice that they need to


@ -3,12 +3,12 @@ use solana_ledger::entry::Entry;
use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::hash::Hash;
pub(super) struct BroadcastFakeBlobsRun {
pub(super) struct BroadcastFakeShredsRun {
last_blockhash: Hash,
partition: usize,
}
impl BroadcastFakeBlobsRun {
impl BroadcastFakeShredsRun {
pub(super) fn new(partition: usize) -> Self {
Self {
last_blockhash: Hash::default(),
@ -17,7 +17,7 @@ impl BroadcastFakeBlobsRun {
}
}
impl BroadcastRun for BroadcastFakeBlobsRun {
impl BroadcastRun for BroadcastFakeShredsRun {
fn run(
&mut self,
cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -82,7 +82,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
let peers = cluster_info.read().unwrap().tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {
if i <= self.partition {
// Send fake blobs to the first N peers
// Send fake shreds to the first N peers
fake_data_shreds
.iter()
.chain(fake_coding_shreds.iter())


@ -23,7 +23,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let bank = receive_results.bank.clone();
let last_tick_height = receive_results.last_tick_height;
// 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry
// 2) Convert entries to shreds + generate coding shreds. Set a garbage PoH on the last entry
// in the slot to make verification fail on validators
if last_tick_height == bank.max_tick_height() {
let mut last_entry = receive_results.entries.last_mut().unwrap();


@ -164,7 +164,7 @@ mod tests {
let mut hasher = Hasher::default();
hasher.hash(&buf[..size]);
// golden needs to be updated if blob stuff changes....
// golden needs to be updated if shred structure changes....
let golden: Hash = "HLzH7Nrh4q2K5WTh3e9vPNFZ1QVYhVDRMN9u5v51GqpJ"
.parse()
.unwrap();


@ -804,14 +804,14 @@ impl ClusterInfo {
Ok(())
}
pub fn window_index_request_bytes(&self, slot: Slot, blob_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestWindowIndex(self.my_data().clone(), slot, blob_index);
pub fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestWindowIndex(self.my_data().clone(), slot, shred_index);
let out = serialize(&req)?;
Ok(out)
}
fn window_highest_index_request_bytes(&self, slot: Slot, blob_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestHighestWindowIndex(self.my_data().clone(), slot, blob_index);
fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestHighestWindowIndex(self.my_data().clone(), slot, shred_index);
let out = serialize(&req)?;
Ok(out)
}
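
These two helpers follow the same pattern: wrap the slot and shred index in a gossip Protocol request variant and serialize it to bytes. Below is a standalone sketch of that pattern, not part of the patch, using a hypothetical RepairRequest enum in place of the real Protocol type and assuming the serde and bincode crates are available:

use serde::{Deserialize, Serialize};

// Hypothetical stand-in for the gossip request variants used above.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
enum RepairRequest {
    WindowIndex { slot: u64, shred_index: u64 },
    HighestWindowIndex { slot: u64, shred_index: u64 },
}

// Mirrors the shape of window_index_request_bytes: build the request, then serialize it.
fn request_bytes(req: &RepairRequest) -> bincode::Result<Vec<u8>> {
    bincode::serialize(req)
}

fn main() -> bincode::Result<()> {
    let req = RepairRequest::WindowIndex { slot: 42, shred_index: 7 };
    let bytes = request_bytes(&req)?;
    let round_trip: RepairRequest = bincode::deserialize(&bytes)?;
    assert_eq!(round_trip, req);
    Ok(())
}
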
@ -837,21 +837,21 @@ impl ClusterInfo {
}
pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
match repair_request {
RepairType::Shred(slot, blob_index) => {
RepairType::Shred(slot, shred_index) => {
datapoint_debug!(
"cluster_info-repair",
("repair-slot", *slot, i64),
("repair-ix", *blob_index, i64)
("repair-ix", *shred_index, i64)
);
Ok(self.window_index_request_bytes(*slot, *blob_index)?)
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
}
RepairType::HighestBlob(slot, blob_index) => {
RepairType::HighestShred(slot, shred_index) => {
datapoint_debug!(
"cluster_info-repair_highest",
("repair-highest-slot", *slot, i64),
("repair-highest-ix", *blob_index, i64)
("repair-highest-ix", *shred_index, i64)
);
Ok(self.window_highest_index_request_bytes(*slot, *blob_index)?)
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
}
RepairType::Orphan(slot) => {
datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64));
@ -1165,7 +1165,7 @@ impl ClusterInfo {
packets: Packets,
response_sender: &PacketSender,
) {
// iter over the blobs, collect pulls separately and process everything else
// iter over the packets, collect pulls separately and process everything else
let mut gossip_pull_data: Vec<PullData> = vec![];
packets.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
@ -1404,7 +1404,7 @@ impl ClusterInfo {
let (res, label) = {
match &request {
Protocol::RequestWindowIndex(from, slot, blob_index) => {
Protocol::RequestWindowIndex(from, slot, shred_index) => {
inc_new_counter_debug!("cluster_info-request-window-index", 1);
(
Self::run_window_request(
@ -1413,7 +1413,7 @@ impl ClusterInfo {
blocktree,
&my_info,
*slot,
*blob_index,
*shred_index,
),
"RequestWindowIndex",
)
@ -1944,7 +1944,7 @@ mod tests {
assert!(one && two);
}
/// test window requests respond with the right blob, and do not overrun
/// test window requests respond with the right shred, and do not overrun
#[test]
fn run_window_request() {
solana_logger::setup();
@ -2009,7 +2009,7 @@ mod tests {
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
/// test run_window_requestwindow requests respond with the right blob, and do not overrun
/// test that highest-window requests respond with the right shred, and do not overrun
#[test]
fn run_highest_window_request() {
solana_logger::setup();
@ -2061,7 +2061,7 @@ mod tests {
let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 2, 0);
assert!(rv.is_empty());
// Create slots 1, 2, 3 with 5 blobs apiece
// Create slots 1, 2, 3 with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(1, 3, 5);
blocktree
@ -2072,7 +2072,7 @@ mod tests {
let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 4, 5);
assert!(rv.is_empty());
// For slot 3, we should return the highest blobs from slots 3, 2, 1 respectively
// For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
// for this request
let rv: Vec<_> = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 3, 5)
.packets


@ -29,47 +29,47 @@ pub const NUM_SLOTS_PER_UPDATE: usize = 2;
pub const REPAIR_SAME_SLOT_THRESHOLD: u64 = 5000;
use solana_sdk::timing::timestamp;
// Represents the blobs that a repairman is responsible for repairing in specific slot. More
// specifically, a repairman is responsible for every blob in this slot with index
// `(start_index + step_size * i) % num_blobs_in_slot`, for all `0 <= i <= num_blobs_to_send - 1`
// Represents the shreds that a repairman is responsible for repairing in a specific slot. More
// specifically, a repairman is responsible for every shred in this slot with index
// `(start_index + step_size * i) % num_shreds_in_slot`, for all `0 <= i <= num_shreds_to_send - 1`
// in this slot.
struct BlobIndexesToRepairIterator {
struct ShredIndexesToRepairIterator {
start_index: usize,
num_blobs_to_send: usize,
num_shreds_to_send: usize,
step_size: usize,
num_blobs_in_slot: usize,
blobs_sent: usize,
num_shreds_in_slot: usize,
shreds_sent: usize,
}
impl BlobIndexesToRepairIterator {
impl ShredIndexesToRepairIterator {
fn new(
start_index: usize,
num_blobs_to_send: usize,
num_shreds_to_send: usize,
step_size: usize,
num_blobs_in_slot: usize,
num_shreds_in_slot: usize,
) -> Self {
Self {
start_index,
num_blobs_to_send,
num_shreds_to_send,
step_size,
num_blobs_in_slot,
blobs_sent: 0,
num_shreds_in_slot,
shreds_sent: 0,
}
}
}
impl Iterator for BlobIndexesToRepairIterator {
impl Iterator for ShredIndexesToRepairIterator {
type Item = usize;
fn next(&mut self) -> Option<Self::Item> {
if self.blobs_sent == self.num_blobs_to_send {
if self.shreds_sent == self.num_shreds_to_send {
None
} else {
let blob_index = Some(
(self.start_index + self.step_size * self.blobs_sent) % self.num_blobs_in_slot,
let shred_index = Some(
(self.start_index + self.step_size * self.shreds_sent) % self.num_shreds_in_slot,
);
self.blobs_sent += 1;
blob_index
self.shreds_sent += 1;
shred_index
}
}
}
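
The iterator above walks the index pattern given in the comment, `(start_index + step_size * i) % num_shreds_in_slot`. A standalone sketch, not part of the patch and using made-up values, shows the indexes one repairman would cover:

fn main() {
    // Example values only; none of these are taken from the patch.
    let start_index = 3usize;     // this repairman's position among the eligible repairmen
    let step_size = 10;           // total repairmen assigned to the slot
    let num_shreds_in_slot = 42;
    let num_shreds_to_send = 5;

    let indexes: Vec<usize> = (0..num_shreds_to_send)
        .map(|i| (start_index + step_size * i) % num_shreds_in_slot)
        .collect();

    // Prints [3, 13, 23, 33, 1]; the last index wraps around the end of the slot.
    println!("{:?}", indexes);
}
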
@ -295,8 +295,8 @@ impl ClusterInfoRepairListener {
let mut slot_iter = slot_iter?;
let mut total_data_blobs_sent = 0;
let mut total_coding_blobs_sent = 0;
let mut total_data_shreds_sent = 0;
let mut total_coding_shreds_sent = 0;
let mut num_slots_repaired = 0;
let max_confirmed_repairee_epoch =
epoch_schedule.get_leader_schedule_epoch(repairee_epoch_slots.root);
@ -318,23 +318,23 @@ impl ClusterInfoRepairListener {
break;
}
if !repairee_epoch_slots.slots.contains(&slot) {
// Calculate the blob indexes this node is responsible for repairing. Note that
// Calculate the shred indexes this node is responsible for repairing. Note that
// because we are only repairing slots that are before our root, the slot.received
// should be equal to the actual total number of blobs in the slot. Optimistically
// this means that most repairmen should observe the same "total" number of blobs
// should be equal to the actual total number of shreds in the slot. Optimistically
// this means that most repairmen should observe the same "total" number of shreds
// for a particular slot, and thus the calculation in
// calculate_my_repairman_index_for_slot() will divide responsibility evenly across
// the cluster
let num_blobs_in_slot = slot_meta.received as usize;
let num_shreds_in_slot = slot_meta.received as usize;
// Check if I'm responsible for repairing this slot
if let Some(my_repair_indexes) = Self::calculate_my_repairman_index_for_slot(
my_pubkey,
&eligible_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
REPAIR_REDUNDANCY,
) {
// If I've already sent blobs >= this slot before, then don't send them again
// If I've already sent shreds >= this slot before, then don't send them again
// until the timeout has expired
if slot > last_repaired_slot
|| timestamp() - last_repaired_ts > REPAIR_SAME_SLOT_THRESHOLD
@ -343,27 +343,27 @@ impl ClusterInfoRepairListener {
"Serving repair for slot {} to {}. Repairee slots: {:?}",
slot, repairee_pubkey, repairee_epoch_slots.slots
);
// Repairee is missing this slot, send them the blobs for this slot
for blob_index in my_repair_indexes {
// Loop over the blob indexes and query the database for these blob that
// Repairee is missing this slot, send them the shreds for this slot
for shred_index in my_repair_indexes {
// Loop over the shred indexes and query the database for these shreds that
// this node is responsible for repairing. This should be faster than using
// a database iterator over the slots because by the time this node is
// sending the blobs in this slot for repair, we expect these slots
// sending the shreds in this slot for repair, we expect these slots
// to be full.
if let Some(blob_data) = blocktree
.get_data_shred(slot, blob_index as u64)
.expect("Failed to read data blob from blocktree")
if let Some(shred_data) = blocktree
.get_data_shred(slot, shred_index as u64)
.expect("Failed to read data shred from blocktree")
{
socket.send_to(&blob_data[..], repairee_addr)?;
total_data_blobs_sent += 1;
socket.send_to(&shred_data[..], repairee_addr)?;
total_data_shreds_sent += 1;
}
if let Some(coding_bytes) = blocktree
.get_coding_shred(slot, blob_index as u64)
.expect("Failed to read coding blob from blocktree")
.get_coding_shred(slot, shred_index as u64)
.expect("Failed to read coding shred from blocktree")
{
socket.send_to(&coding_bytes[..], repairee_addr)?;
total_coding_blobs_sent += 1;
total_coding_shreds_sent += 1;
}
}
@ -371,11 +371,11 @@ impl ClusterInfoRepairListener {
Self::report_repair_metrics(
slot,
repairee_pubkey,
total_data_blobs_sent,
total_coding_blobs_sent,
total_data_shreds_sent,
total_coding_shreds_sent,
);
total_data_blobs_sent = 0;
total_coding_blobs_sent = 0;
total_data_shreds_sent = 0;
total_coding_shreds_sent = 0;
}
num_slots_repaired += 1;
}
@ -388,16 +388,16 @@ impl ClusterInfoRepairListener {
fn report_repair_metrics(
slot: u64,
repairee_id: &Pubkey,
total_data_blobs_sent: u64,
total_coding_blobs_sent: u64,
total_data_shreds_sent: u64,
total_coding_shreds_sent: u64,
) {
if total_data_blobs_sent > 0 || total_coding_blobs_sent > 0 {
if total_data_shreds_sent > 0 || total_coding_shreds_sent > 0 {
datapoint!(
"repairman_activity",
("slot", slot, i64),
("repairee_id", repairee_id.to_string(), String),
("data_sent", total_data_blobs_sent, i64),
("coding_sent", total_coding_blobs_sent, i64)
("data_sent", total_data_shreds_sent, i64),
("coding_sent", total_coding_shreds_sent, i64)
);
}
}
@ -418,21 +418,21 @@ impl ClusterInfoRepairListener {
eligible_repairmen.shuffle(&mut rng);
}
// The calculation should partition the blobs in the slot across the repairmen in the cluster
// such that each blob in the slot is the responsibility of `repair_redundancy` or
// The calculation should partition the shreds in the slot across the repairmen in the cluster
// such that each shred in the slot is the responsibility of `repair_redundancy` or
// `repair_redundancy + 1` number of repairmen in the cluster.
fn calculate_my_repairman_index_for_slot(
my_pubkey: &Pubkey,
eligible_repairmen: &[&Pubkey],
num_blobs_in_slot: usize,
num_shreds_in_slot: usize,
repair_redundancy: usize,
) -> Option<BlobIndexesToRepairIterator> {
let total_blobs = num_blobs_in_slot * repair_redundancy;
let total_repairmen_for_slot = cmp::min(total_blobs, eligible_repairmen.len());
) -> Option<ShredIndexesToRepairIterator> {
let total_shreds = num_shreds_in_slot * repair_redundancy;
let total_repairmen_for_slot = cmp::min(total_shreds, eligible_repairmen.len());
let blobs_per_repairman = cmp::min(
(total_blobs + total_repairmen_for_slot - 1) / total_repairmen_for_slot,
num_blobs_in_slot,
let shreds_per_repairman = cmp::min(
(total_shreds + total_repairmen_for_slot - 1) / total_repairmen_for_slot,
num_shreds_in_slot,
);
// Calculate the indexes this node is responsible for
@ -440,15 +440,15 @@ impl ClusterInfoRepairListener {
.iter()
.position(|id| *id == my_pubkey)
{
let start_index = my_position % num_blobs_in_slot;
Some(BlobIndexesToRepairIterator::new(
let start_index = my_position % num_shreds_in_slot;
Some(ShredIndexesToRepairIterator::new(
start_index,
blobs_per_repairman,
shreds_per_repairman,
total_repairmen_for_slot,
num_blobs_in_slot,
num_shreds_in_slot,
))
} else {
// If there are more repairmen than `total_blobs`, then some repairmen
// If there are more repairmen than `total_shreds`, then some repairmen
// will not have any responsibility to repair this slot
None
}
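
The shreds_per_repairman value above is a ceiling division capped at the slot size. A small standalone sketch of the same arithmetic, with made-up numbers rather than anything from the patch:

use std::cmp;

fn main() {
    // Hypothetical example values.
    let num_shreds_in_slot = 47usize;
    let repair_redundancy = 3usize;
    let num_eligible_repairmen = 12usize;

    let total_shreds = num_shreds_in_slot * repair_redundancy; // 141 shred transmissions needed
    let total_repairmen_for_slot = cmp::min(total_shreds, num_eligible_repairmen); // 12
    // Ceiling division, capped so nobody is asked to send more than a whole slot.
    let shreds_per_repairman = cmp::min(
        (total_shreds + total_repairmen_for_slot - 1) / total_repairmen_for_slot, // ceil(141 / 12) = 12
        num_shreds_in_slot,
    );

    assert_eq!(shreds_per_repairman, 12);
    println!(
        "{} repairmen each cover {} shred indexes",
        total_repairmen_for_slot, shreds_per_repairman
    );
}
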
@ -797,7 +797,7 @@ mod tests {
let mut received_shreds: Vec<Packets> = vec![];
// This repairee was missing exactly `num_slots / 2` slots, so we expect to get
// `(num_slots / 2) * num_shreds_per_slot * REPAIR_REDUNDANCY` blobs.
// `(num_slots / 2) * num_shreds_per_slot * REPAIR_REDUNDANCY` shreds.
let num_expected_shreds = (num_slots / 2) * num_shreds_per_slot * REPAIR_REDUNDANCY as u64;
while (received_shreds
.iter()
@ -833,7 +833,7 @@ mod tests {
let slots_per_epoch = stakers_slot_offset * 2;
let epoch_schedule = EpochSchedule::custom(slots_per_epoch, stakers_slot_offset, false);
// Create blobs for first two epochs and write them to blocktree
// Create shreds for first two epochs and write them to blocktree
let total_slots = slots_per_epoch * 2;
let (shreds, _) = make_many_slot_entries(0, total_slots, 1);
blocktree.insert_shreds(shreds, None, false).unwrap();
@ -853,7 +853,7 @@ mod tests {
// 1) They are missing all of the second epoch, but have all of the first epoch.
// 2) The root only confirms epoch 1, so the leader for epoch 2 is unconfirmed.
//
// Thus, no repairmen should send any blobs to this repairee b/c this repairee
// Thus, no repairmen should send any shreds to this repairee b/c this repairee
// already has all the slots for which they have a confirmed leader schedule
let repairee_root = 0;
let repairee_slots: BTreeSet<_> = (0..=slots_per_epoch).collect();
@ -898,7 +898,7 @@ mod tests {
)
.unwrap();
// Make sure some blobs get sent this time
// Make sure some shreds get sent this time
sleep(Duration::from_millis(1000));
assert!(mock_repairee.receiver.try_recv().is_ok());
@ -932,79 +932,79 @@ mod tests {
#[test]
fn test_calculate_my_repairman_index_for_slot() {
// Test when the number of blobs in the slot > number of repairmen
// Test when the number of shreds in the slot > number of repairmen
let num_repairmen = 10;
let num_blobs_in_slot = 42;
let num_shreds_in_slot = 42;
let repair_redundancy = 3;
run_calculate_my_repairman_index_for_slot(
num_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
);
// Test when num_blobs_in_slot is a multiple of num_repairmen
// Test when num_shreds_in_slot is a multiple of num_repairmen
let num_repairmen = 12;
let num_blobs_in_slot = 48;
let num_shreds_in_slot = 48;
let repair_redundancy = 3;
run_calculate_my_repairman_index_for_slot(
num_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
);
// Test when num_repairmen and num_blobs_in_slot are relatively prime
// Test when num_repairmen and num_shreds_in_slot are relatively prime
let num_repairmen = 12;
let num_blobs_in_slot = 47;
let num_shreds_in_slot = 47;
let repair_redundancy = 12;
run_calculate_my_repairman_index_for_slot(
num_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
);
// Test 1 repairman
let num_repairmen = 1;
let num_blobs_in_slot = 30;
let num_shreds_in_slot = 30;
let repair_redundancy = 3;
run_calculate_my_repairman_index_for_slot(
num_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
);
// Test when repair_redundancy is 1, and num_blobs_in_slot does not evenly
// Test when repair_redundancy is 1, and num_shreds_in_slot does not evenly
// divide num_repairmen
let num_repairmen = 12;
let num_blobs_in_slot = 47;
let num_shreds_in_slot = 47;
let repair_redundancy = 1;
run_calculate_my_repairman_index_for_slot(
num_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
);
// Test when the number of blobs in the slot <= number of repairmen
// Test when the number of shreds in the slot <= number of repairmen
let num_repairmen = 10;
let num_blobs_in_slot = 10;
let num_shreds_in_slot = 10;
let repair_redundancy = 3;
run_calculate_my_repairman_index_for_slot(
num_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
);
// Test when there are more repairmen than repair_redundancy * num_blobs_in_slot
// Test when there are more repairmen than repair_redundancy * num_shreds_in_slot
let num_repairmen = 42;
let num_blobs_in_slot = 10;
let num_shreds_in_slot = 10;
let repair_redundancy = 3;
run_calculate_my_repairman_index_for_slot(
num_repairmen,
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
);
}
@ -1059,7 +1059,7 @@ mod tests {
fn run_calculate_my_repairman_index_for_slot(
num_repairmen: usize,
num_blobs_in_slot: usize,
num_shreds_in_slot: usize,
repair_redundancy: usize,
) {
let eligible_repairmen: Vec<_> = (0..num_repairmen).map(|_| Pubkey::new_rand()).collect();
@ -1071,13 +1071,13 @@ mod tests {
ClusterInfoRepairListener::calculate_my_repairman_index_for_slot(
pk,
&eligible_repairmen_ref[..],
num_blobs_in_slot,
num_shreds_in_slot,
repair_redundancy,
)
{
for blob_index in my_repair_indexes {
for shred_index in my_repair_indexes {
results
.entry(blob_index)
.entry(shred_index)
.and_modify(|e| *e += 1)
.or_insert(1);
}
@ -1089,7 +1089,7 @@ mod tests {
// Analyze the results:
// 1) If there are a sufficient number of repairmen, then each blob should be sent
// 1) If there are a sufficient number of repairmen, then each shred should be sent
// `repair_redundancy` OR `repair_redundancy + 1` times.
let num_expected_redundancy = cmp::min(num_repairmen, repair_redundancy);
for b in results.keys() {
@ -1098,18 +1098,18 @@ mod tests {
);
}
// 2) The number of times each blob is sent should be evenly distributed
let max_times_blob_sent = results.values().min_by(|x, y| x.cmp(y)).unwrap();
let min_times_blob_sent = results.values().max_by(|x, y| x.cmp(y)).unwrap();
assert!(*max_times_blob_sent <= *min_times_blob_sent + 1);
// 2) The number of times each shred is sent should be evenly distributed
let max_times_shred_sent = results.values().min_by(|x, y| x.cmp(y)).unwrap();
let min_times_shred_sent = results.values().max_by(|x, y| x.cmp(y)).unwrap();
assert!(*max_times_shred_sent <= *min_times_shred_sent + 1);
// 3) There should only be repairmen who are not responsible for repairing this slot
// if we have more repairman than `num_blobs_in_slot * repair_redundancy`. In this case the
// first `num_blobs_in_slot * repair_redundancy` repairmen would send one blob, and the rest
// if we have more repairmen than `num_shreds_in_slot * repair_redundancy`. In this case the
// first `num_shreds_in_slot * repair_redundancy` repairmen would send one shred, and the rest
// would not be responsible for sending any repairs
assert_eq!(
none_results,
num_repairmen.saturating_sub(num_blobs_in_slot * repair_redundancy)
num_repairmen.saturating_sub(num_shreds_in_slot * repair_redundancy)
);
}
}


@ -1,7 +1,7 @@
//! Crds Gossip
//! This module ties together Crds and the push and pull gossip overlays. The interface is
//! designed to run with a simulator or over a UDP network connection with messages up to a
//! packet::BLOB_DATA_SIZE size.
//! packet::PACKET_DATA_SIZE size.
use crate::crds::{Crds, VersionedCrdsValue};
use crate::crds_gossip_error::CrdsGossipError;


@ -1,5 +1,5 @@
//! The `repair_service` module implements the tools necessary to generate a thread which
//! regularly finds missing blobs in the ledger and sends repair requests for those blobs
//! regularly finds missing shreds in the ledger and sends repair requests for those shreds
use crate::{
cluster_info::ClusterInfo, cluster_info_repair_listener::ClusterInfoRepairListener,
result::Result,
@ -36,7 +36,7 @@ pub enum RepairStrategy {
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairType {
Orphan(u64),
HighestBlob(u64, u64),
HighestShred(u64, u64),
Shred(u64, u64),
}
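
Later in this file, generate_repairs_for_slot maps slot state onto these variants: a full slot needs nothing, a slot with no interior gaps asks for HighestShred at its received count, and a slot with gaps asks for each missing Shred index. A simplified standalone sketch of that decision follows; MiniSlotMeta is a hypothetical stand-in, not the real SlotMeta or Blocktree API:

#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum RepairType {
    Orphan(u64),
    HighestShred(u64, u64),
    Shred(u64, u64),
}

// Hypothetical, trimmed-down stand-in for the real SlotMeta.
struct MiniSlotMeta {
    consumed: u64, // count of contiguous shreds received starting at index 0
    received: u64, // highest shred index received, plus one
    is_full: bool,
}

fn repairs_for_slot(slot: u64, meta: &MiniSlotMeta, missing: &[u64]) -> Vec<RepairType> {
    if meta.is_full {
        vec![]
    } else if meta.consumed == meta.received {
        // No interior gaps: ask for whatever comes after the highest shred seen so far.
        vec![RepairType::HighestShred(slot, meta.received)]
    } else {
        // Interior gaps: ask for each missing index (located via the blocktree in the real code).
        missing.iter().map(|i| RepairType::Shred(slot, *i)).collect()
    }
}

fn main() {
    let caught_up = MiniSlotMeta { consumed: 3, received: 3, is_full: false };
    assert_eq!(repairs_for_slot(7, &caught_up, &[]), vec![RepairType::HighestShred(7, 3)]);

    let gappy = MiniSlotMeta { consumed: 1, received: 5, is_full: false };
    assert_eq!(
        repairs_for_slot(7, &gappy, &[1, 3]),
        vec![RepairType::Shred(7, 1), RepairType::Shred(7, 3)]
    );
}
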
@ -187,7 +187,7 @@ impl RepairService {
max_repairs: usize,
repair_range: &RepairSlotRange,
) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
// Slot height and shred indexes for shreds we want to repair
let mut repairs: Vec<RepairType> = vec![];
for slot in repair_range.start..=repair_range.end {
if repairs.len() >= max_repairs {
@ -219,7 +219,7 @@ impl RepairService {
root: u64,
max_repairs: usize,
) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
// Slot height and shred indexes for shreds we want to repair
let mut repairs: Vec<RepairType> = vec![];
Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, root);
@ -242,7 +242,7 @@ impl RepairService {
if slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
vec![RepairType::HighestBlob(slot, slot_meta.received)]
vec![RepairType::HighestShred(slot, slot_meta.received)]
} else {
let reqs = blocktree.find_missing_data_indexes(
slot,
@ -322,7 +322,7 @@ impl RepairService {
// Safe to set into gossip because by this time, the leader schedule cache should
// also be updated with the latest root (done in blocktree_processor) and thus
// will provide a schedule to window_service for any incoming blobs up to the
// will provide a schedule to window_service for any incoming shreds up to the
// last_confirmed_epoch.
cluster_info
.write()
@ -414,7 +414,7 @@ mod test {
blocktree.insert_shreds(shreds, None, false).unwrap();
assert_eq!(
RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(2)]
vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)]
);
}
@ -429,14 +429,14 @@ mod test {
let (shreds, _) = make_slot_entries(2, 0, 1);
// Write this blob to slot 2, should chain to slot 0, which we haven't received
// any blobs for
// Write this shred to slot 2, should chain to slot 0, which we haven't received
// any shreds for
blocktree.insert_shreds(shreds, None, false).unwrap();
// Check that repair tries to patch the empty slot
assert_eq!(
RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
vec![RepairType::HighestBlob(0, 0)]
vec![RepairType::HighestShred(0, 0)]
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@ -451,12 +451,12 @@ mod test {
let nth = 3;
let num_slots = 2;
// Create some blobs
// Create some shreds
let (mut shreds, _) = make_many_slot_entries(0, num_slots as u64, 150 as u64);
let num_shreds = shreds.len() as u64;
let num_shreds_per_slot = num_shreds / num_slots;
// write every nth blob
// write every nth shred
let mut shreds_to_write = vec![];
let mut missing_indexes_per_slot = vec![];
for i in (0..num_shreds).rev() {
@ -476,7 +476,7 @@ mod test {
.flat_map(|slot| {
missing_indexes_per_slot
.iter()
.map(move |blob_index| RepairType::Shred(slot as u64, *blob_index))
.map(move |shred_index| RepairType::Shred(slot as u64, *shred_index))
})
.collect();
@ -501,7 +501,7 @@ mod test {
let num_entries_per_slot = 100;
// Create some blobs
// Create some shreds
let (mut shreds, _) = make_slot_entries(0, 0, num_entries_per_slot as u64);
let num_shreds_per_slot = shreds.len() as u64;
@ -510,9 +510,9 @@ mod test {
blocktree.insert_shreds(shreds, None, false).unwrap();
// We didn't get the last blob for this slot, so ask for the highest blob for that slot
// We didn't get the last shred for this slot, so ask for the highest shred for that slot
let expected: Vec<RepairType> =
vec![RepairType::HighestBlob(0, num_shreds_per_slot - 1)];
vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];
assert_eq!(
RepairService::generate_repairs(&blocktree, 0, std::usize::MAX).unwrap(),
@ -551,7 +551,7 @@ mod test {
if slots.contains(&(slot_index as u64)) {
RepairType::Shred(slot_index as u64, 0)
} else {
RepairType::HighestBlob(slot_index as u64, 0)
RepairType::HighestShred(slot_index as u64, 0)
}
})
.collect();
@ -582,7 +582,7 @@ mod test {
let num_slots = 1;
let start = 5;
// Create some blobs in slots 0..num_slots
// Create some shreds in slots 0..num_slots
for i in start..start + num_slots {
let parent = if i > 0 { i - 1 } else { 0 };
let (shreds, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);
@ -592,9 +592,9 @@ mod test {
let end = 4;
let expected: Vec<RepairType> = vec![
RepairType::HighestBlob(end - 2, 0),
RepairType::HighestBlob(end - 1, 0),
RepairType::HighestBlob(end, 0),
RepairType::HighestShred(end - 2, 0),
RepairType::HighestShred(end - 1, 0),
RepairType::HighestShred(end, 0),
];
let mut repair_slot_range = RepairSlotRange::default();
@ -630,7 +630,7 @@ mod test {
let fork2 = vec![8, 12];
let fork2_shreds = make_chaining_slot_entries(&fork2, num_entries_per_slot);
// Remove the last blob from each slot to make an incomplete slot
// Remove the last shred from each slot to make an incomplete slot
let fork2_incomplete_shreds: Vec<_> = fork2_shreds
.into_iter()
.flat_map(|(mut shreds, _)| {


@ -511,7 +511,7 @@ impl ReplayStage {
let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
// Call leader schedule_cache.set_root() before blocktree.set_root() because
// bank_forks.root is consumed by repair_service to update gossip, so we don't want to
// get blobs for repair on gossip before we update leader schedule, otherwise they may
// get shreds for repair on gossip before we update leader schedule, otherwise they may
// get dropped.
leader_schedule_cache.set_root(rooted_banks.last().unwrap());
blocktree
@ -971,7 +971,7 @@ mod test {
let mut bank_forks = BankForks::new(0, bank0);
bank_forks.working_bank().freeze();
// Insert blob for slot 1, generate new forks, check result
// Insert shred for slot 1, generate new forks, check result
let (shreds, _) = make_slot_entries(1, 0, 8);
blocktree.insert_shreds(shreds, None, false).unwrap();
assert!(bank_forks.get(1).is_none());
@ -982,7 +982,7 @@ mod test {
);
assert!(bank_forks.get(1).is_some());
// Insert blob for slot 3, generate new forks, check result
// Insert shred for slot 2, generate new forks, check result
let (shreds, _) = make_slot_entries(2, 0, 8);
blocktree.insert_shreds(shreds, None, false).unwrap();
assert!(bank_forks.get(2).is_none());
@ -1208,7 +1208,7 @@ mod test {
);
}
// Given a blob and a fatal expected error, check that replaying that blob causes causes the fork to be
// Given a shred and a fatal expected error, check that replaying that shred causes the fork to be
// marked as dead. Returns the error for caller to verify.
fn check_dead_fork<F>(shred_to_insert: F) -> Result<()>
where


@ -29,7 +29,6 @@ pub enum Error {
BlockError(block_error::BlockError),
BlocktreeError(blocktree::BlocktreeError),
FsExtra(fs_extra::error::Error),
ToBlobError,
SnapshotError(snapshot_utils::SnapshotError),
}


@ -1,4 +1,4 @@
//! The `retransmit_stage` retransmits blobs between validators
//! The `retransmit_stage` retransmits shreds between validators
use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
@ -143,11 +143,11 @@ fn retransmit(
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
/// See `cluster_info` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `sockets` - Sockets to read from.
/// * `bank_forks` - The BankForks structure
/// * `leader_schedule_cache` - The leader schedule to verify shreds
/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
/// * `r` - Receive channel for shreds to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sockets: Arc<Vec<UdpSocket>>,
bank_forks: Arc<RwLock<BankForks>>,


@ -269,7 +269,7 @@ impl Validator {
assert_eq!(
blocktree.new_shreds_signals.len(),
1,
"New blob signal for the TVU should be the same as the clear bank signal."
"New shred signal for the TVU should be the same as the clear bank signal."
);
let ip_echo_server = solana_net_utils::ip_echo_server(node.sockets.ip_echo.unwrap());


@ -1,4 +1,4 @@
//! `window_service` handles the data plane incoming blobs, storing them in
//! `window_service` handles the data plane incoming shreds, storing them in
//! blocktree and retransmitting where required
//!
use crate::cluster_info::ClusterInfo;
@ -34,8 +34,8 @@ fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
}
}
/// drop blobs that are from myself or not from the correct leader for the
/// blob's slot
/// drop shreds that are from myself or not from the correct leader for the
/// shred's slot
pub fn should_retransmit_and_persist(
shred: &Shred,
bank: Option<Arc<Bank>>,
@ -347,7 +347,7 @@ mod test {
let mut shreds = local_entries_to_shred(&[Entry::default()], 0, 0, &leader_keypair);
// with a Bank for slot 0, blob continues
// with a Bank for slot 0, shred continues
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0,),
true
@ -371,14 +371,14 @@ mod test {
false
);
// with a Bank and no idea who leader is, blob gets thrown out
// with a Bank and no idea who leader is, shred gets thrown out
shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
assert_eq!(
should_retransmit_and_persist(&shreds[0], Some(bank.clone()), &cache, &me_id, 0),
false
);
// with a shred where shred.slot() == root, blob gets thrown out
// with a shred where shred.slot() == root, shred gets thrown out
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds = local_entries_to_shred(&[Entry::default()], slot, slot - 1, &leader_keypair);
assert_eq!(
@ -386,7 +386,7 @@ mod test {
false
);
// with a shred where shred.parent() < root, blob gets thrown out
// with a shred where shred.parent() < root, shred gets thrown out
let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds =
local_entries_to_shred(&[Entry::default()], slot + 1, slot - 1, &leader_keypair);
@ -395,7 +395,7 @@ mod test {
false
);
// if the blob came back from me, it doesn't continue, whether or not I have a bank
// if the shred came back from me, it doesn't continue, whether or not I have a bank
assert_eq!(
should_retransmit_and_persist(&shreds[0], None, &cache, &me_id, 0),
false