Revert "remove window code from most places" (#2417)
* Revert "Fix link to book in Local Testnet section (#2416)" This reverts commit710c0c9980. * Revert "Add current leader information to dashboard (#2413)" This reverts commitf0300c1711. * Revert "remove window code from most places (#2389)" This reverts commite3c0bd5a3f.
This commit is contained in:
		@@ -7,11 +7,12 @@ use crate::db_ledger::DbLedger;
 | 
			
		||||
use crate::entry::Entry;
 | 
			
		||||
use crate::entry::EntrySlice;
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
use crate::erasure::CodingGenerator;
 | 
			
		||||
use crate::erasure;
 | 
			
		||||
use crate::leader_scheduler::LeaderScheduler;
 | 
			
		||||
use crate::packet::index_blobs;
 | 
			
		||||
use crate::packet::{index_blobs, SharedBlob};
 | 
			
		||||
use crate::result::{Error, Result};
 | 
			
		||||
use crate::service::Service;
 | 
			
		||||
use crate::window::{SharedWindow, WindowIndex, WindowUtil};
 | 
			
		||||
use log::Level;
 | 
			
		||||
use rayon::prelude::*;
 | 
			
		||||
use solana_metrics::{influxdb, submit};
 | 
			
		||||
@@ -31,110 +32,160 @@ pub enum BroadcastServiceReturnType {
 | 
			
		||||
    ExitSignal,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct Broadcast {
 | 
			
		||||
    id: Pubkey,
 | 
			
		||||
#[allow(clippy::too_many_arguments)]
 | 
			
		||||
fn broadcast(
 | 
			
		||||
    db_ledger: &Arc<DbLedger>,
 | 
			
		||||
    max_tick_height: Option<u64>,
 | 
			
		||||
    blob_index: u64,
 | 
			
		||||
    leader_id: Pubkey,
 | 
			
		||||
    node_info: &NodeInfo,
 | 
			
		||||
    broadcast_table: &[NodeInfo],
 | 
			
		||||
    window: &SharedWindow,
 | 
			
		||||
    receiver: &Receiver<Vec<Entry>>,
 | 
			
		||||
    sock: &UdpSocket,
 | 
			
		||||
    transmit_index: &mut WindowIndex,
 | 
			
		||||
    receive_index: &mut u64,
 | 
			
		||||
    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
 | 
			
		||||
) -> Result<()> {
 | 
			
		||||
    let id = node_info.id;
 | 
			
		||||
    let timer = Duration::new(1, 0);
 | 
			
		||||
    let entries = receiver.recv_timeout(timer)?;
 | 
			
		||||
    let now = Instant::now();
 | 
			
		||||
    let mut num_entries = entries.len();
 | 
			
		||||
    let mut ventries = Vec::new();
 | 
			
		||||
    ventries.push(entries);
 | 
			
		||||
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    coding_generator: CodingGenerator,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Broadcast {
 | 
			
		||||
    fn run(
 | 
			
		||||
        &mut self,
 | 
			
		||||
        broadcast_table: &[NodeInfo],
 | 
			
		||||
        receiver: &Receiver<Vec<Entry>>,
 | 
			
		||||
        sock: &UdpSocket,
 | 
			
		||||
        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
 | 
			
		||||
        db_ledger: &Arc<DbLedger>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let timer = Duration::new(1, 0);
 | 
			
		||||
        let entries = receiver.recv_timeout(timer)?;
 | 
			
		||||
        let now = Instant::now();
 | 
			
		||||
        let mut num_entries = entries.len();
 | 
			
		||||
        let mut ventries = Vec::new();
 | 
			
		||||
    let mut contains_last_tick = false;
 | 
			
		||||
    while let Ok(entries) = receiver.try_recv() {
 | 
			
		||||
        num_entries += entries.len();
 | 
			
		||||
        ventries.push(entries);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        while let Ok(entries) = receiver.try_recv() {
 | 
			
		||||
            num_entries += entries.len();
 | 
			
		||||
            ventries.push(entries);
 | 
			
		||||
        }
 | 
			
		||||
    if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
 | 
			
		||||
        contains_last_tick |= Some(last.tick_height) == max_tick_height;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        let last_tick = match self.max_tick_height {
 | 
			
		||||
            Some(max_tick_height) => {
 | 
			
		||||
                if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
 | 
			
		||||
                    last.tick_height == max_tick_height
 | 
			
		||||
                } else {
 | 
			
		||||
                    false
 | 
			
		||||
    inc_new_counter_info!("broadcast_service-entries_received", num_entries);
 | 
			
		||||
 | 
			
		||||
    let to_blobs_start = Instant::now();
 | 
			
		||||
 | 
			
		||||
    // Generate the slot heights for all the entries inside ventries
 | 
			
		||||
    let slot_heights = generate_slots(&ventries, leader_scheduler);
 | 
			
		||||
 | 
			
		||||
    let blobs: Vec<_> = ventries
 | 
			
		||||
        .into_par_iter()
 | 
			
		||||
        .flat_map(|p| p.to_shared_blobs())
 | 
			
		||||
        .collect();
 | 
			
		||||
 | 
			
		||||
    let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect();
 | 
			
		||||
 | 
			
		||||
    let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
 | 
			
		||||
 | 
			
		||||
    let blobs_chunking = Instant::now();
 | 
			
		||||
    // We could receive more blobs than window slots so
 | 
			
		||||
    // break them up into window-sized chunks to process
 | 
			
		||||
    let window_size = window.read().unwrap().window_size();
 | 
			
		||||
    let blobs_chunked = blobs_slot_heights
 | 
			
		||||
        .chunks(window_size as usize)
 | 
			
		||||
        .map(|x| x.to_vec());
 | 
			
		||||
    let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
 | 
			
		||||
 | 
			
		||||
    let broadcast_start = Instant::now();
 | 
			
		||||
    for blobs in blobs_chunked {
 | 
			
		||||
        let blobs_len = blobs.len();
 | 
			
		||||
        trace!("{}: broadcast blobs.len: {}", id, blobs_len);
 | 
			
		||||
 | 
			
		||||
        index_blobs(blobs.iter(), &node_info.id, *receive_index);
 | 
			
		||||
 | 
			
		||||
        // keep the cache of blobs that are broadcast
 | 
			
		||||
        inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
 | 
			
		||||
        {
 | 
			
		||||
            let mut win = window.write().unwrap();
 | 
			
		||||
            assert!(blobs.len() <= win.len());
 | 
			
		||||
            let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect();
 | 
			
		||||
            for b in &blobs {
 | 
			
		||||
                let ix = b.read().unwrap().index().expect("blob index");
 | 
			
		||||
                let pos = (ix % window_size) as usize;
 | 
			
		||||
                if let Some(x) = win[pos].data.take() {
 | 
			
		||||
                    trace!(
 | 
			
		||||
                        "{} popped {} at {}",
 | 
			
		||||
                        id,
 | 
			
		||||
                        x.read().unwrap().index().unwrap(),
 | 
			
		||||
                        pos
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                if let Some(x) = win[pos].coding.take() {
 | 
			
		||||
                    trace!(
 | 
			
		||||
                        "{} popped {} at {}",
 | 
			
		||||
                        id,
 | 
			
		||||
                        x.read().unwrap().index().unwrap(),
 | 
			
		||||
                        pos
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                trace!("{} null {}", id, pos);
 | 
			
		||||
            }
 | 
			
		||||
            for b in &blobs {
 | 
			
		||||
                {
 | 
			
		||||
                    let ix = b.read().unwrap().index().expect("blob index");
 | 
			
		||||
                    let pos = (ix % window_size) as usize;
 | 
			
		||||
                    trace!("{} caching {} at {}", id, ix, pos);
 | 
			
		||||
                    assert!(win[pos].data.is_none());
 | 
			
		||||
                    win[pos].data = Some(b.clone());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            None => false,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!("broadcast_service-entries_received", num_entries);
 | 
			
		||||
 | 
			
		||||
        let to_blobs_start = Instant::now();
 | 
			
		||||
 | 
			
		||||
        // Generate the slot heights for all the entries inside ventries
 | 
			
		||||
        //  this may span slots if this leader broadcasts for consecutive slots...
 | 
			
		||||
        let slots = generate_slots(&ventries, leader_scheduler);
 | 
			
		||||
 | 
			
		||||
        let blobs: Vec<_> = ventries
 | 
			
		||||
            .into_par_iter()
 | 
			
		||||
            .flat_map(|p| p.to_shared_blobs())
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        // TODO: blob_index should be slot-relative...
 | 
			
		||||
        index_blobs(&blobs, &self.id, self.blob_index, &slots);
 | 
			
		||||
 | 
			
		||||
        let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
 | 
			
		||||
 | 
			
		||||
        let broadcast_start = Instant::now();
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
 | 
			
		||||
 | 
			
		||||
        db_ledger
 | 
			
		||||
            .write_consecutive_blobs(&blobs)
 | 
			
		||||
            .expect("Unrecoverable failure to write to database");
 | 
			
		||||
 | 
			
		||||
        // don't count coding blobs in the blob indexes
 | 
			
		||||
        self.blob_index += blobs.len() as u64;
 | 
			
		||||
 | 
			
		||||
        // Send out data
 | 
			
		||||
        ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?;
 | 
			
		||||
            db_ledger
 | 
			
		||||
                .write_consecutive_blobs(&blobs)
 | 
			
		||||
                .expect("Unrecoverable failure to write to database");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Fill in the coding blob data from the window data blobs
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        {
 | 
			
		||||
            let coding = self.coding_generator.next(&blobs)?;
 | 
			
		||||
 | 
			
		||||
            // send out erasures
 | 
			
		||||
            ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
 | 
			
		||||
            erasure::generate_coding(
 | 
			
		||||
                &id,
 | 
			
		||||
                &mut window.write().unwrap(),
 | 
			
		||||
                *receive_index,
 | 
			
		||||
                blobs_len,
 | 
			
		||||
                &mut transmit_index.coding,
 | 
			
		||||
            )?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
 | 
			
		||||
        *receive_index += blobs_len as u64;
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!(
 | 
			
		||||
            "broadcast_service-time_ms",
 | 
			
		||||
            duration_as_ms(&now.elapsed()) as usize
 | 
			
		||||
        );
 | 
			
		||||
        info!(
 | 
			
		||||
            "broadcast: {} entries, blob time {} broadcast time {}",
 | 
			
		||||
            num_entries, to_blobs_elapsed, broadcast_elapsed
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        submit(
 | 
			
		||||
            influxdb::Point::new("broadcast-service")
 | 
			
		||||
                .add_field(
 | 
			
		||||
                    "transmit-index",
 | 
			
		||||
                    influxdb::Value::Integer(self.blob_index as i64),
 | 
			
		||||
                )
 | 
			
		||||
                .to_owned(),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
        // Send blobs out from the window
 | 
			
		||||
        ClusterInfo::broadcast(
 | 
			
		||||
            contains_last_tick,
 | 
			
		||||
            leader_id,
 | 
			
		||||
            &node_info,
 | 
			
		||||
            &broadcast_table,
 | 
			
		||||
            &window,
 | 
			
		||||
            &sock,
 | 
			
		||||
            transmit_index,
 | 
			
		||||
            *receive_index,
 | 
			
		||||
        )?;
 | 
			
		||||
    }
 | 
			
		||||
    let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
 | 
			
		||||
 | 
			
		||||
    inc_new_counter_info!(
 | 
			
		||||
        "broadcast_service-time_ms",
 | 
			
		||||
        duration_as_ms(&now.elapsed()) as usize
 | 
			
		||||
    );
 | 
			
		||||
    info!(
 | 
			
		||||
        "broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
 | 
			
		||||
        num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    submit(
 | 
			
		||||
        influxdb::Point::new("broadcast-service")
 | 
			
		||||
            .add_field(
 | 
			
		||||
                "transmit-index",
 | 
			
		||||
                influxdb::Value::Integer(transmit_index.data as i64),
 | 
			
		||||
            )
 | 
			
		||||
            .to_owned(),
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn generate_slots(
 | 
			
		||||
@@ -189,41 +240,46 @@ pub struct BroadcastService {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl BroadcastService {
 | 
			
		||||
    #[allow(clippy::too_many_arguments)]
 | 
			
		||||
    fn run(
 | 
			
		||||
        db_ledger: &Arc<DbLedger>,
 | 
			
		||||
        bank: &Arc<Bank>,
 | 
			
		||||
        sock: &UdpSocket,
 | 
			
		||||
        cluster_info: &Arc<RwLock<ClusterInfo>>,
 | 
			
		||||
        window: &SharedWindow,
 | 
			
		||||
        entry_height: u64,
 | 
			
		||||
        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
 | 
			
		||||
        receiver: &Receiver<Vec<Entry>>,
 | 
			
		||||
        max_tick_height: Option<u64>,
 | 
			
		||||
        exit_signal: &Arc<AtomicBool>,
 | 
			
		||||
    ) -> BroadcastServiceReturnType {
 | 
			
		||||
        let me = cluster_info.read().unwrap().my_data().clone();
 | 
			
		||||
 | 
			
		||||
        let mut broadcast = Broadcast {
 | 
			
		||||
            id: me.id,
 | 
			
		||||
            max_tick_height,
 | 
			
		||||
            blob_index: entry_height,
 | 
			
		||||
            #[cfg(feature = "erasure")]
 | 
			
		||||
            coding_generator: CodingGenerator::new(),
 | 
			
		||||
        let mut transmit_index = WindowIndex {
 | 
			
		||||
            data: entry_height,
 | 
			
		||||
            coding: entry_height,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let mut receive_index = entry_height;
 | 
			
		||||
        let me = cluster_info.read().unwrap().my_data().clone();
 | 
			
		||||
        loop {
 | 
			
		||||
            if exit_signal.load(Ordering::Relaxed) {
 | 
			
		||||
                return BroadcastServiceReturnType::ExitSignal;
 | 
			
		||||
            }
 | 
			
		||||
            let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank);
 | 
			
		||||
            // Layer 1, leader nodes are limited to the fanout size.
 | 
			
		||||
            // Layer 1 nodes are limited to the fanout size.
 | 
			
		||||
            broadcast_table.truncate(DATA_PLANE_FANOUT);
 | 
			
		||||
            inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
 | 
			
		||||
            if let Err(e) = broadcast.run(
 | 
			
		||||
                &broadcast_table,
 | 
			
		||||
                receiver,
 | 
			
		||||
                sock,
 | 
			
		||||
                leader_scheduler,
 | 
			
		||||
            let leader_id = cluster_info.read().unwrap().leader_id();
 | 
			
		||||
            if let Err(e) = broadcast(
 | 
			
		||||
                db_ledger,
 | 
			
		||||
                max_tick_height,
 | 
			
		||||
                leader_id,
 | 
			
		||||
                &me,
 | 
			
		||||
                &broadcast_table,
 | 
			
		||||
                &window,
 | 
			
		||||
                &receiver,
 | 
			
		||||
                &sock,
 | 
			
		||||
                &mut transmit_index,
 | 
			
		||||
                &mut receive_index,
 | 
			
		||||
                leader_scheduler,
 | 
			
		||||
            ) {
 | 
			
		||||
                match e {
 | 
			
		||||
                    Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
 | 
			
		||||
@@ -255,11 +311,13 @@ impl BroadcastService {
 | 
			
		||||
    /// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
 | 
			
		||||
    /// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
 | 
			
		||||
    /// completing the cycle.
 | 
			
		||||
    #[allow(clippy::too_many_arguments)]
 | 
			
		||||
    pub fn new(
 | 
			
		||||
        db_ledger: Arc<DbLedger>,
 | 
			
		||||
        bank: Arc<Bank>,
 | 
			
		||||
        sock: UdpSocket,
 | 
			
		||||
        cluster_info: Arc<RwLock<ClusterInfo>>,
 | 
			
		||||
        window: SharedWindow,
 | 
			
		||||
        entry_height: u64,
 | 
			
		||||
        leader_scheduler: Arc<RwLock<LeaderScheduler>>,
 | 
			
		||||
        receiver: Receiver<Vec<Entry>>,
 | 
			
		||||
@@ -276,6 +334,7 @@ impl BroadcastService {
 | 
			
		||||
                    &bank,
 | 
			
		||||
                    &sock,
 | 
			
		||||
                    &cluster_info,
 | 
			
		||||
                    &window,
 | 
			
		||||
                    entry_height,
 | 
			
		||||
                    &leader_scheduler,
 | 
			
		||||
                    &receiver,
 | 
			
		||||
@@ -305,6 +364,7 @@ mod test {
 | 
			
		||||
    use crate::db_ledger::DbLedger;
 | 
			
		||||
    use crate::entry::create_ticks;
 | 
			
		||||
    use crate::service::Service;
 | 
			
		||||
    use crate::window::new_window;
 | 
			
		||||
    use solana_sdk::hash::Hash;
 | 
			
		||||
    use solana_sdk::signature::{Keypair, KeypairUtil};
 | 
			
		||||
    use std::sync::atomic::AtomicBool;
 | 
			
		||||
@@ -341,6 +401,8 @@ mod test {
 | 
			
		||||
        cluster_info.insert_info(broadcast_buddy.info);
 | 
			
		||||
        let cluster_info = Arc::new(RwLock::new(cluster_info));
 | 
			
		||||
 | 
			
		||||
        let window = new_window(32 * 1024);
 | 
			
		||||
        let shared_window = Arc::new(RwLock::new(window));
 | 
			
		||||
        let exit_sender = Arc::new(AtomicBool::new(false));
 | 
			
		||||
        let bank = Arc::new(Bank::default());
 | 
			
		||||
 | 
			
		||||
@@ -350,6 +412,7 @@ mod test {
 | 
			
		||||
            bank.clone(),
 | 
			
		||||
            leader_info.sockets.broadcast,
 | 
			
		||||
            cluster_info,
 | 
			
		||||
            shared_window,
 | 
			
		||||
            entry_height,
 | 
			
		||||
            leader_scheduler,
 | 
			
		||||
            entry_receiver,
 | 
			
		||||
 
 | 
			
		||||
@@ -25,6 +25,7 @@ use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
 | 
			
		||||
use crate::result::Result;
 | 
			
		||||
use crate::rpc::RPC_PORT;
 | 
			
		||||
use crate::streamer::{BlobReceiver, BlobSender};
 | 
			
		||||
use crate::window::{SharedWindow, WindowIndex};
 | 
			
		||||
use bincode::{deserialize, serialize};
 | 
			
		||||
use hashbrown::HashMap;
 | 
			
		||||
use log::Level;
 | 
			
		||||
@@ -492,33 +493,58 @@ impl ClusterInfo {
 | 
			
		||||
 | 
			
		||||
    /// broadcast messages from the leader to layer 1 nodes
 | 
			
		||||
    /// # Remarks
 | 
			
		||||
    /// We need to avoid having obj locked while doing any io, such as the `send_to`
 | 
			
		||||
    pub fn broadcast(
 | 
			
		||||
        id: &Pubkey,
 | 
			
		||||
        contains_last_tick: bool,
 | 
			
		||||
        leader_id: Pubkey,
 | 
			
		||||
        me: &NodeInfo,
 | 
			
		||||
        broadcast_table: &[NodeInfo],
 | 
			
		||||
        window: &SharedWindow,
 | 
			
		||||
        s: &UdpSocket,
 | 
			
		||||
        blobs: &[SharedBlob],
 | 
			
		||||
        transmit_index: &mut WindowIndex,
 | 
			
		||||
        received_index: u64,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        if broadcast_table.is_empty() {
 | 
			
		||||
            debug!("{}:not enough peers in cluster_info table", id);
 | 
			
		||||
            debug!("{}:not enough peers in cluster_info table", me.id);
 | 
			
		||||
            inc_new_counter_info!("cluster_info-broadcast-not_enough_peers_error", 1);
 | 
			
		||||
            Err(ClusterInfoError::NoPeers)?;
 | 
			
		||||
        }
 | 
			
		||||
        trace!(
 | 
			
		||||
            "{} transmit_index: {:?} received_index: {} broadcast_len: {}",
 | 
			
		||||
            me.id,
 | 
			
		||||
            *transmit_index,
 | 
			
		||||
            received_index,
 | 
			
		||||
            broadcast_table.len()
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let orders = Self::create_broadcast_orders(contains_last_tick, blobs, broadcast_table);
 | 
			
		||||
        let old_transmit_index = transmit_index.data;
 | 
			
		||||
 | 
			
		||||
        let orders = Self::create_broadcast_orders(
 | 
			
		||||
            contains_last_tick,
 | 
			
		||||
            window,
 | 
			
		||||
            broadcast_table,
 | 
			
		||||
            transmit_index,
 | 
			
		||||
            received_index,
 | 
			
		||||
            me,
 | 
			
		||||
        );
 | 
			
		||||
        trace!("broadcast orders table {}", orders.len());
 | 
			
		||||
 | 
			
		||||
        let errs = Self::send_orders(id, s, orders);
 | 
			
		||||
        let errs = Self::send_orders(s, orders, me, leader_id);
 | 
			
		||||
 | 
			
		||||
        for e in errs {
 | 
			
		||||
            if let Err(e) = &e {
 | 
			
		||||
                trace!("{}: broadcast result {:?}", id, e);
 | 
			
		||||
                trace!("broadcast result {:?}", e);
 | 
			
		||||
            }
 | 
			
		||||
            e?;
 | 
			
		||||
            if transmit_index.data < received_index {
 | 
			
		||||
                transmit_index.data += 1;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!("cluster_info-broadcast-max_idx", blobs.len());
 | 
			
		||||
        inc_new_counter_info!(
 | 
			
		||||
            "cluster_info-broadcast-max_idx",
 | 
			
		||||
            (transmit_index.data - old_transmit_index) as usize
 | 
			
		||||
        );
 | 
			
		||||
        transmit_index.coding = transmit_index.data;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
@@ -577,15 +603,19 @@ impl ClusterInfo {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn send_orders(
 | 
			
		||||
        id: &Pubkey,
 | 
			
		||||
        s: &UdpSocket,
 | 
			
		||||
        orders: Vec<(SharedBlob, Vec<&NodeInfo>)>,
 | 
			
		||||
        orders: Vec<(Option<SharedBlob>, Vec<&NodeInfo>)>,
 | 
			
		||||
        me: &NodeInfo,
 | 
			
		||||
        leader_id: Pubkey,
 | 
			
		||||
    ) -> Vec<io::Result<usize>> {
 | 
			
		||||
        orders
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .flat_map(|(b, vs)| {
 | 
			
		||||
                let blob = b.read().unwrap();
 | 
			
		||||
 | 
			
		||||
                // only leader should be broadcasting
 | 
			
		||||
                assert!(vs.iter().find(|info| info.id == leader_id).is_none());
 | 
			
		||||
                let bl = b.unwrap();
 | 
			
		||||
                let blob = bl.read().unwrap();
 | 
			
		||||
                //TODO profile this, may need multiple sockets for par_iter
 | 
			
		||||
                let ids_and_tvus = if log_enabled!(Level::Trace) {
 | 
			
		||||
                    let v_ids = vs.iter().map(|v| v.id);
 | 
			
		||||
                    let tvus = vs.iter().map(|v| v.tvu);
 | 
			
		||||
@@ -593,7 +623,7 @@ impl ClusterInfo {
 | 
			
		||||
 | 
			
		||||
                    trace!(
 | 
			
		||||
                        "{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
 | 
			
		||||
                        id,
 | 
			
		||||
                        me.id,
 | 
			
		||||
                        blob.index().unwrap(),
 | 
			
		||||
                        blob.meta.size,
 | 
			
		||||
                        ids_and_tvus,
 | 
			
		||||
@@ -612,7 +642,7 @@ impl ClusterInfo {
 | 
			
		||||
                        let e = s.send_to(&blob.data[..blob.meta.size], &v.tvu);
 | 
			
		||||
                        trace!(
 | 
			
		||||
                            "{}: done broadcast {} to {:?}",
 | 
			
		||||
                            id,
 | 
			
		||||
                            me.id,
 | 
			
		||||
                            blob.meta.size,
 | 
			
		||||
                            ids_and_tvus
 | 
			
		||||
                        );
 | 
			
		||||
@@ -626,36 +656,70 @@ impl ClusterInfo {
 | 
			
		||||
 | 
			
		||||
    fn create_broadcast_orders<'a>(
 | 
			
		||||
        contains_last_tick: bool,
 | 
			
		||||
        blobs: &[SharedBlob],
 | 
			
		||||
        window: &SharedWindow,
 | 
			
		||||
        broadcast_table: &'a [NodeInfo],
 | 
			
		||||
    ) -> Vec<(SharedBlob, Vec<&'a NodeInfo>)> {
 | 
			
		||||
        transmit_index: &mut WindowIndex,
 | 
			
		||||
        received_index: u64,
 | 
			
		||||
        me: &NodeInfo,
 | 
			
		||||
    ) -> Vec<(Option<SharedBlob>, Vec<&'a NodeInfo>)> {
 | 
			
		||||
        // enumerate all the blobs in the window, those are the indices
 | 
			
		||||
        // transmit them to nodes, starting from a different node.
 | 
			
		||||
        if blobs.is_empty() {
 | 
			
		||||
            return vec![];
 | 
			
		||||
        }
 | 
			
		||||
        let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize);
 | 
			
		||||
        let window_l = window.read().unwrap();
 | 
			
		||||
        let mut br_idx = transmit_index.data as usize % broadcast_table.len();
 | 
			
		||||
 | 
			
		||||
        let mut orders = Vec::with_capacity(blobs.len());
 | 
			
		||||
        for idx in transmit_index.data..received_index {
 | 
			
		||||
            let w_idx = idx as usize % window_l.len();
 | 
			
		||||
 | 
			
		||||
        for (i, blob) in blobs.iter().enumerate() {
 | 
			
		||||
            let br_idx = i % broadcast_table.len();
 | 
			
		||||
            trace!(
 | 
			
		||||
                "{} broadcast order data w_idx {} br_idx {}",
 | 
			
		||||
                me.id,
 | 
			
		||||
                w_idx,
 | 
			
		||||
                br_idx
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            trace!("broadcast order data br_idx {}", br_idx);
 | 
			
		||||
 | 
			
		||||
            orders.push((blob.clone(), vec![&broadcast_table[br_idx]]));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if contains_last_tick {
 | 
			
		||||
            // Broadcast the last tick to everyone on the network so it doesn't get dropped
 | 
			
		||||
            // (Need to maximize probability the next leader in line sees this handoff tick
 | 
			
		||||
            // despite packet drops)
 | 
			
		||||
            // If we had a tick at max_tick_height, then we know it must be the last
 | 
			
		||||
            // Blob in the broadcast, There cannot be an entry that got sent after the
 | 
			
		||||
            // last tick, guaranteed by the PohService).
 | 
			
		||||
            let target = if idx == received_index - 1 && contains_last_tick {
 | 
			
		||||
                // If we see a tick at max_tick_height, then we know it must be the last
 | 
			
		||||
                // Blob in the window, at index == received_index. There cannot be an entry
 | 
			
		||||
                // that got sent after the last tick, guaranteed by the PohService).
 | 
			
		||||
                assert!(window_l[w_idx].data.is_some());
 | 
			
		||||
                (
 | 
			
		||||
                    window_l[w_idx].data.clone(),
 | 
			
		||||
                    broadcast_table.iter().collect(),
 | 
			
		||||
                )
 | 
			
		||||
            } else {
 | 
			
		||||
                (window_l[w_idx].data.clone(), vec![&broadcast_table[br_idx]])
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            orders.push(target);
 | 
			
		||||
            br_idx += 1;
 | 
			
		||||
            br_idx %= broadcast_table.len();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for idx in transmit_index.coding..received_index {
 | 
			
		||||
            let w_idx = idx as usize % window_l.len();
 | 
			
		||||
 | 
			
		||||
            // skip over empty slots
 | 
			
		||||
            if window_l[w_idx].coding.is_none() {
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            trace!(
 | 
			
		||||
                "{} broadcast order coding w_idx: {} br_idx  :{}",
 | 
			
		||||
                me.id,
 | 
			
		||||
                w_idx,
 | 
			
		||||
                br_idx,
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            orders.push((
 | 
			
		||||
                blobs.last().unwrap().clone(),
 | 
			
		||||
                broadcast_table.iter().collect(),
 | 
			
		||||
                window_l[w_idx].coding.clone(),
 | 
			
		||||
                vec![&broadcast_table[br_idx]],
 | 
			
		||||
            ));
 | 
			
		||||
            br_idx += 1;
 | 
			
		||||
            br_idx %= broadcast_table.len();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        orders
 | 
			
		||||
 
 | 
			
		||||
@@ -976,7 +976,11 @@ mod tests {
 | 
			
		||||
    fn test_read_blobs_bytes() {
 | 
			
		||||
        let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
 | 
			
		||||
        let slot = DEFAULT_SLOT_HEIGHT;
 | 
			
		||||
        index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, &[slot; 10]);
 | 
			
		||||
        index_blobs(
 | 
			
		||||
            shared_blobs.iter().zip(vec![slot; 10].into_iter()),
 | 
			
		||||
            &Keypair::new().pubkey(),
 | 
			
		||||
            0,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
 | 
			
		||||
        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
 | 
			
		||||
 
 | 
			
		||||
@@ -562,10 +562,9 @@ mod test {
 | 
			
		||||
        let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
 | 
			
		||||
 | 
			
		||||
        index_blobs(
 | 
			
		||||
            &shared_blobs,
 | 
			
		||||
            shared_blobs.iter().zip(vec![slot; num_entries].into_iter()),
 | 
			
		||||
            &Keypair::new().pubkey(),
 | 
			
		||||
            0,
 | 
			
		||||
            &vec![slot; num_entries],
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
 | 
			
		||||
@@ -658,10 +657,11 @@ mod test {
 | 
			
		||||
        let shared_blobs = original_entries.clone().to_shared_blobs();
 | 
			
		||||
 | 
			
		||||
        index_blobs(
 | 
			
		||||
            &shared_blobs,
 | 
			
		||||
            shared_blobs
 | 
			
		||||
                .iter()
 | 
			
		||||
                .zip(vec![DEFAULT_SLOT_HEIGHT; num_entries].into_iter()),
 | 
			
		||||
            &Keypair::new().pubkey(),
 | 
			
		||||
            0,
 | 
			
		||||
            &vec![DEFAULT_SLOT_HEIGHT; num_entries],
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let mut consume_queue = vec![];
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										569
									
								
								src/erasure.rs
									
									
									
									
									
								
							
							
						
						
									
										569
									
								
								src/erasure.rs
									
									
									
									
									
								
							@@ -2,6 +2,8 @@
 | 
			
		||||
use crate::db_ledger::DbLedger;
 | 
			
		||||
use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
 | 
			
		||||
use crate::result::{Error, Result};
 | 
			
		||||
use crate::window::WindowSlot;
 | 
			
		||||
use solana_sdk::pubkey::Pubkey;
 | 
			
		||||
use std::cmp;
 | 
			
		||||
use std::sync::{Arc, RwLock};
 | 
			
		||||
 | 
			
		||||
@@ -175,79 +177,6 @@ pub fn decode_blocks(
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn decode_blobs(
 | 
			
		||||
    blobs: &[SharedBlob],
 | 
			
		||||
    erasures: &[i32],
 | 
			
		||||
    size: usize,
 | 
			
		||||
    block_start_idx: u64,
 | 
			
		||||
    slot: u64,
 | 
			
		||||
) -> Result<bool> {
 | 
			
		||||
    let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
 | 
			
		||||
    let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
    let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
 | 
			
		||||
 | 
			
		||||
    assert!(blobs.len() == NUM_DATA + NUM_CODING);
 | 
			
		||||
    for b in blobs {
 | 
			
		||||
        locks.push(b.write().unwrap());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for (i, l) in locks.iter_mut().enumerate() {
 | 
			
		||||
        if i < NUM_DATA {
 | 
			
		||||
            data_ptrs.push(&mut l.data[..size]);
 | 
			
		||||
        } else {
 | 
			
		||||
            coding_ptrs.push(&mut l.data_mut()[..size]);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Decode the blocks
 | 
			
		||||
    decode_blocks(
 | 
			
		||||
        data_ptrs.as_mut_slice(),
 | 
			
		||||
        coding_ptrs.as_mut_slice(),
 | 
			
		||||
        &erasures,
 | 
			
		||||
    )?;
 | 
			
		||||
 | 
			
		||||
    // Create the missing blobs from the reconstructed data
 | 
			
		||||
    let mut corrupt = false;
 | 
			
		||||
 | 
			
		||||
    for i in &erasures[..erasures.len() - 1] {
 | 
			
		||||
        let n = *i as usize;
 | 
			
		||||
        let mut idx = n as u64 + block_start_idx;
 | 
			
		||||
 | 
			
		||||
        let mut data_size;
 | 
			
		||||
        if n < NUM_DATA {
 | 
			
		||||
            data_size = locks[n].data_size().unwrap() as usize;
 | 
			
		||||
            data_size -= BLOB_HEADER_SIZE;
 | 
			
		||||
            if data_size > BLOB_DATA_SIZE {
 | 
			
		||||
                error!("corrupt data blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                corrupt = true;
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            data_size = size;
 | 
			
		||||
            idx -= NUM_CODING as u64;
 | 
			
		||||
            locks[n].set_slot(slot).unwrap();
 | 
			
		||||
            locks[n].set_index(idx).unwrap();
 | 
			
		||||
 | 
			
		||||
            if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
 | 
			
		||||
                error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                corrupt = true;
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        locks[n].set_size(data_size);
 | 
			
		||||
        trace!(
 | 
			
		||||
            "erasures[{}] ({}) size: {} data[0]: {}",
 | 
			
		||||
            *i,
 | 
			
		||||
            idx,
 | 
			
		||||
            data_size,
 | 
			
		||||
            locks[n].data()[0]
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(corrupt)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Generate coding blocks in window starting from start_idx,
 | 
			
		||||
//   for num_blobs..  For each block place the coding blobs
 | 
			
		||||
//   at the end of the block like so:
 | 
			
		||||
@@ -285,80 +214,137 @@ fn decode_blobs(
 | 
			
		||||
//
 | 
			
		||||
//
 | 
			
		||||
//
 | 
			
		||||
pub struct CodingGenerator {
 | 
			
		||||
    leftover: Vec<SharedBlob>, // SharedBlobs that couldn't be used in last call to next()
 | 
			
		||||
}
 | 
			
		||||
pub fn generate_coding(
 | 
			
		||||
    id: &Pubkey,
 | 
			
		||||
    window: &mut [WindowSlot],
 | 
			
		||||
    receive_index: u64,
 | 
			
		||||
    num_blobs: usize,
 | 
			
		||||
    transmit_index_coding: &mut u64,
 | 
			
		||||
) -> Result<()> {
 | 
			
		||||
    // beginning of the coding blobs of the block that receive_index points into
 | 
			
		||||
    let coding_index_start =
 | 
			
		||||
        receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
 | 
			
		||||
 | 
			
		||||
impl CodingGenerator {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            leftover: Vec::with_capacity(NUM_DATA),
 | 
			
		||||
    let start_idx = receive_index as usize % window.len();
 | 
			
		||||
    let mut block_start = start_idx - (start_idx % NUM_DATA);
 | 
			
		||||
 | 
			
		||||
    loop {
 | 
			
		||||
        let block_end = block_start + NUM_DATA;
 | 
			
		||||
        if block_end > (start_idx + num_blobs) {
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
        info!(
 | 
			
		||||
            "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}",
 | 
			
		||||
            id, block_start, block_end, start_idx, num_blobs
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
    // must be called with consecutive data blobs from previous invocation
 | 
			
		||||
    pub fn next(&mut self, next_data: &[SharedBlob]) -> Result<Vec<SharedBlob>> {
 | 
			
		||||
        let mut next_coding =
 | 
			
		||||
            Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING);
 | 
			
		||||
        let mut max_data_size = 0;
 | 
			
		||||
 | 
			
		||||
        let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();
 | 
			
		||||
        // find max_data_size, maybe bail if not all the data is here
 | 
			
		||||
        for i in block_start..block_end {
 | 
			
		||||
            let n = i % window.len();
 | 
			
		||||
            trace!("{} window[{}] = {:?}", id, n, window[n].data);
 | 
			
		||||
 | 
			
		||||
        for data_blobs in next_data.chunks(NUM_DATA) {
 | 
			
		||||
            if data_blobs.len() < NUM_DATA {
 | 
			
		||||
                self.leftover = data_blobs.to_vec();
 | 
			
		||||
                break;
 | 
			
		||||
            if let Some(b) = &window[n].data {
 | 
			
		||||
                max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size);
 | 
			
		||||
            } else {
 | 
			
		||||
                trace!("{} data block is null @ {}", id, n);
 | 
			
		||||
                return Ok(());
 | 
			
		||||
            }
 | 
			
		||||
            self.leftover.clear();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
            // find max_data_size for the chunk
 | 
			
		||||
            let max_data_size = align!(
 | 
			
		||||
                data_blobs
 | 
			
		||||
                    .iter()
 | 
			
		||||
                    .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)),
 | 
			
		||||
                JERASURE_ALIGN
 | 
			
		||||
            );
 | 
			
		||||
        // round up to the nearest jerasure alignment
 | 
			
		||||
        max_data_size = align!(max_data_size, JERASURE_ALIGN);
 | 
			
		||||
 | 
			
		||||
            let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
 | 
			
		||||
            let data_ptrs: Vec<_> = data_locks
 | 
			
		||||
                .iter()
 | 
			
		||||
                .map(|l| &l.data[..max_data_size])
 | 
			
		||||
                .collect();
 | 
			
		||||
        let mut data_blobs = Vec::with_capacity(NUM_DATA);
 | 
			
		||||
        for i in block_start..block_end {
 | 
			
		||||
            let n = i % window.len();
 | 
			
		||||
 | 
			
		||||
            let mut coding_blobs = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
 | 
			
		||||
            for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
 | 
			
		||||
                let index = data_blob.index().unwrap();
 | 
			
		||||
                let slot = data_blob.slot().unwrap();
 | 
			
		||||
                let id = data_blob.id().unwrap();
 | 
			
		||||
 | 
			
		||||
                let coding_blob = SharedBlob::default();
 | 
			
		||||
                {
 | 
			
		||||
                    let mut coding_blob = coding_blob.write().unwrap();
 | 
			
		||||
                    coding_blob.set_index(index).unwrap();
 | 
			
		||||
                    coding_blob.set_slot(slot).unwrap();
 | 
			
		||||
                    coding_blob.set_id(&id).unwrap();
 | 
			
		||||
                    coding_blob.set_size(max_data_size);
 | 
			
		||||
                    coding_blob.set_coding().unwrap();
 | 
			
		||||
            if let Some(b) = &window[n].data {
 | 
			
		||||
                // make sure extra bytes in each blob are zero-d out for generation of
 | 
			
		||||
                //  coding blobs
 | 
			
		||||
                let mut b_wl = b.write().unwrap();
 | 
			
		||||
                for i in b_wl.meta.size..max_data_size {
 | 
			
		||||
                    b_wl.data[i] = 0;
 | 
			
		||||
                }
 | 
			
		||||
                coding_blobs.push(coding_blob);
 | 
			
		||||
                data_blobs.push(b);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            {
 | 
			
		||||
                let mut coding_locks: Vec<_> =
 | 
			
		||||
                    coding_blobs.iter().map(|b| b.write().unwrap()).collect();
 | 
			
		||||
 | 
			
		||||
                let mut coding_ptrs: Vec<_> = coding_locks
 | 
			
		||||
                    .iter_mut()
 | 
			
		||||
                    .map(|l| &mut l.data_mut()[..max_data_size])
 | 
			
		||||
                    .collect();
 | 
			
		||||
 | 
			
		||||
                generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
 | 
			
		||||
            }
 | 
			
		||||
            next_coding.append(&mut coding_blobs);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(next_coding)
 | 
			
		||||
        // getting ready to do erasure coding, means that we're potentially
 | 
			
		||||
        // going back in time, tell our caller we've inserted coding blocks
 | 
			
		||||
        // starting at coding_index_start
 | 
			
		||||
        *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
 | 
			
		||||
 | 
			
		||||
        let mut coding_blobs = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
        let coding_start = block_end - NUM_CODING;
 | 
			
		||||
        for i in coding_start..block_end {
 | 
			
		||||
            let n = i % window.len();
 | 
			
		||||
            assert!(window[n].coding.is_none());
 | 
			
		||||
 | 
			
		||||
            window[n].coding = Some(SharedBlob::default());
 | 
			
		||||
 | 
			
		||||
            let coding = window[n].coding.clone().unwrap();
 | 
			
		||||
            let mut coding_wl = coding.write().unwrap();
 | 
			
		||||
            for i in 0..max_data_size {
 | 
			
		||||
                coding_wl.data[i] = 0;
 | 
			
		||||
            }
 | 
			
		||||
            // copy index and id from the data blob
 | 
			
		||||
            if let Some(data) = &window[n].data {
 | 
			
		||||
                let data_rl = data.read().unwrap();
 | 
			
		||||
 | 
			
		||||
                let index = data_rl.index().unwrap();
 | 
			
		||||
                let slot = data_rl.slot().unwrap();
 | 
			
		||||
                let id = data_rl.id().unwrap();
 | 
			
		||||
 | 
			
		||||
                trace!(
 | 
			
		||||
                    "{} copying index {} id {:?} from data to coding",
 | 
			
		||||
                    id,
 | 
			
		||||
                    index,
 | 
			
		||||
                    id
 | 
			
		||||
                );
 | 
			
		||||
                coding_wl.set_index(index).unwrap();
 | 
			
		||||
                coding_wl.set_slot(slot).unwrap();
 | 
			
		||||
                coding_wl.set_id(&id).unwrap();
 | 
			
		||||
            }
 | 
			
		||||
            coding_wl.set_size(max_data_size);
 | 
			
		||||
            if coding_wl.set_coding().is_err() {
 | 
			
		||||
                return Err(Error::ErasureError(ErasureError::EncodeError));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            coding_blobs.push(coding.clone());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
 | 
			
		||||
 | 
			
		||||
        let data_ptrs: Vec<_> = data_locks
 | 
			
		||||
            .iter()
 | 
			
		||||
            .enumerate()
 | 
			
		||||
            .map(|(i, l)| {
 | 
			
		||||
                trace!("{} i: {} data: {}", id, i, l.data[0]);
 | 
			
		||||
                &l.data[..max_data_size]
 | 
			
		||||
            })
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        let mut coding_locks: Vec<_> = coding_blobs.iter().map(|b| b.write().unwrap()).collect();
 | 
			
		||||
 | 
			
		||||
        let mut coding_ptrs: Vec<_> = coding_locks
 | 
			
		||||
            .iter_mut()
 | 
			
		||||
            .enumerate()
 | 
			
		||||
            .map(|(i, l)| {
 | 
			
		||||
                trace!("{} i: {} coding: {}", id, i, l.data[0],);
 | 
			
		||||
                &mut l.data_mut()[..max_data_size]
 | 
			
		||||
            })
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
 | 
			
		||||
        debug!(
 | 
			
		||||
            "{} start_idx: {} data: {}:{} coding: {}:{}",
 | 
			
		||||
            id, start_idx, block_start, block_end, coding_start, block_end
 | 
			
		||||
        );
 | 
			
		||||
        block_start = block_end;
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Recover the missing data and coding blobs from the input ledger. Returns a vector
 | 
			
		||||
@@ -415,6 +401,7 @@ pub fn recover(
 | 
			
		||||
 | 
			
		||||
    let mut missing_data: Vec<SharedBlob> = vec![];
 | 
			
		||||
    let mut missing_coding: Vec<SharedBlob> = vec![];
 | 
			
		||||
    let mut size = None;
 | 
			
		||||
 | 
			
		||||
    // Add the data blobs we have into the recovery vector, mark the missing ones
 | 
			
		||||
    for i in block_start_idx..block_end_idx {
 | 
			
		||||
@@ -429,7 +416,6 @@ pub fn recover(
 | 
			
		||||
        )?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let mut size = None;
 | 
			
		||||
    // Add the coding blobs we have into the recovery vector, mark the missing ones
 | 
			
		||||
    for i in coding_start_idx..block_end_idx {
 | 
			
		||||
        let result = db_ledger.get_coding_blob_bytes(slot, i)?;
 | 
			
		||||
@@ -448,17 +434,78 @@ pub fn recover(
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    // Due to checks above verifying that (data_missing + coding_missing) <= NUM_CODING and
 | 
			
		||||
    //  data_missing > 0, we know at least one coding block must exist, so "size" can
 | 
			
		||||
    //  not remain None after the above processing.
 | 
			
		||||
    let size = size.unwrap();
 | 
			
		||||
 | 
			
		||||
    // Due to check (data_missing + coding_missing) > NUM_CODING from earlier in this function,
 | 
			
		||||
    // we know at least one coding block must exist, so "size" will not remain None after the
 | 
			
		||||
    // below processing.
 | 
			
		||||
    let size = size.unwrap();
 | 
			
		||||
    // marks end of erasures
 | 
			
		||||
    erasures.push(-1);
 | 
			
		||||
 | 
			
		||||
    trace!("erasures[]:{:?} data_size: {}", erasures, size,);
 | 
			
		||||
 | 
			
		||||
    let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx, slot)?;
 | 
			
		||||
    let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
 | 
			
		||||
    {
 | 
			
		||||
        let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
        let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
 | 
			
		||||
 | 
			
		||||
        for b in &blobs {
 | 
			
		||||
            locks.push(b.write().unwrap());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (i, l) in locks.iter_mut().enumerate() {
 | 
			
		||||
            if i < NUM_DATA {
 | 
			
		||||
                data_ptrs.push(&mut l.data[..size]);
 | 
			
		||||
            } else {
 | 
			
		||||
                coding_ptrs.push(&mut l.data_mut()[..size]);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Decode the blocks
 | 
			
		||||
        decode_blocks(
 | 
			
		||||
            data_ptrs.as_mut_slice(),
 | 
			
		||||
            coding_ptrs.as_mut_slice(),
 | 
			
		||||
            &erasures,
 | 
			
		||||
        )?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Create the missing blobs from the reconstructed data
 | 
			
		||||
    let mut corrupt = false;
 | 
			
		||||
 | 
			
		||||
    for i in &erasures[..erasures.len() - 1] {
 | 
			
		||||
        let n = *i as usize;
 | 
			
		||||
        let mut idx = n as u64 + block_start_idx;
 | 
			
		||||
 | 
			
		||||
        let mut data_size;
 | 
			
		||||
        if n < NUM_DATA {
 | 
			
		||||
            data_size = locks[n].data_size().unwrap() as usize;
 | 
			
		||||
            data_size -= BLOB_HEADER_SIZE;
 | 
			
		||||
            if data_size > BLOB_DATA_SIZE {
 | 
			
		||||
                error!("corrupt data blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                corrupt = true;
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            data_size = size;
 | 
			
		||||
            idx -= NUM_CODING as u64;
 | 
			
		||||
            locks[n].set_slot(slot).unwrap();
 | 
			
		||||
            locks[n].set_index(idx).unwrap();
 | 
			
		||||
 | 
			
		||||
            if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
 | 
			
		||||
                error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                corrupt = true;
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        locks[n].set_size(data_size);
 | 
			
		||||
        trace!(
 | 
			
		||||
            "erasures[{}] ({}) size: {} data[0]: {}",
 | 
			
		||||
            *i,
 | 
			
		||||
            idx,
 | 
			
		||||
            data_size,
 | 
			
		||||
            locks[n].data()[0]
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if corrupt {
 | 
			
		||||
        // Remove the corrupted coding blobs so there's no effort wasted in trying to
 | 
			
		||||
@@ -513,7 +560,7 @@ pub mod test {
 | 
			
		||||
    use std::sync::Arc;
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_coding() {
 | 
			
		||||
    pub fn test_coding() {
 | 
			
		||||
        let zero_vec = vec![0; 16];
 | 
			
		||||
        let mut vs: Vec<Vec<u8>> = (0..4).map(|i| (i..(16 + i)).collect()).collect();
 | 
			
		||||
        let v_orig: Vec<u8> = vs[0].clone();
 | 
			
		||||
@@ -561,75 +608,6 @@ pub mod test {
 | 
			
		||||
        assert_eq!(v_orig, vs[0]);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_erasure_generate_coding() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
 | 
			
		||||
        // trivial case
 | 
			
		||||
        let mut coding_generator = CodingGenerator::new();
 | 
			
		||||
        let blobs = Vec::new();
 | 
			
		||||
        for _ in 0..NUM_DATA * 2 {
 | 
			
		||||
            let coding = coding_generator.next(&blobs).unwrap();
 | 
			
		||||
            assert_eq!(coding.len(), 0);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // test coding by iterating one blob at a time
 | 
			
		||||
        let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
 | 
			
		||||
 | 
			
		||||
        for (i, blob) in data_blobs.iter().cloned().enumerate() {
 | 
			
		||||
            let coding = coding_generator.next(&[blob]).unwrap();
 | 
			
		||||
 | 
			
		||||
            if !coding.is_empty() {
 | 
			
		||||
                assert_eq!(i % NUM_DATA, NUM_DATA - 1);
 | 
			
		||||
                assert_eq!(coding.len(), NUM_CODING);
 | 
			
		||||
 | 
			
		||||
                let size = coding[0].read().unwrap().size().unwrap();
 | 
			
		||||
 | 
			
		||||
                // toss one data and one coding
 | 
			
		||||
                let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];
 | 
			
		||||
 | 
			
		||||
                let block_start_idx = i - (i % NUM_DATA);
 | 
			
		||||
                let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
 | 
			
		||||
 | 
			
		||||
                blobs.push(SharedBlob::default()); // empty data, erasure at zero
 | 
			
		||||
                for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] {
 | 
			
		||||
                    // skip first blob
 | 
			
		||||
                    blobs.push(blob.clone());
 | 
			
		||||
                }
 | 
			
		||||
                blobs.push(SharedBlob::default()); // empty coding, erasure at NUM_DATA
 | 
			
		||||
                for blob in &coding[1..NUM_CODING] {
 | 
			
		||||
                    blobs.push(blob.clone());
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                let corrupt =
 | 
			
		||||
                    decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
 | 
			
		||||
 | 
			
		||||
                assert!(!corrupt);
 | 
			
		||||
 | 
			
		||||
                assert_eq!(
 | 
			
		||||
                    blobs[1].read().unwrap().meta,
 | 
			
		||||
                    data_blobs[block_start_idx + 1].read().unwrap().meta
 | 
			
		||||
                );
 | 
			
		||||
                assert_eq!(
 | 
			
		||||
                    blobs[1].read().unwrap().data(),
 | 
			
		||||
                    data_blobs[block_start_idx + 1].read().unwrap().data()
 | 
			
		||||
                );
 | 
			
		||||
                assert_eq!(
 | 
			
		||||
                    blobs[0].read().unwrap().meta,
 | 
			
		||||
                    data_blobs[block_start_idx].read().unwrap().meta
 | 
			
		||||
                );
 | 
			
		||||
                assert_eq!(
 | 
			
		||||
                    blobs[0].read().unwrap().data(),
 | 
			
		||||
                    data_blobs[block_start_idx].read().unwrap().data()
 | 
			
		||||
                );
 | 
			
		||||
                assert_eq!(
 | 
			
		||||
                    blobs[NUM_DATA].read().unwrap().data(),
 | 
			
		||||
                    coding[0].read().unwrap().data()
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // TODO: Temprorary function used in tests to generate a database ledger
 | 
			
		||||
    // from the window (which is used to generate the erasure coding)
 | 
			
		||||
    // until we also transition generate_coding() and BroadcastStage to use DbLedger
 | 
			
		||||
@@ -685,140 +663,6 @@ pub mod test {
 | 
			
		||||
        db_ledger
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn generate_coding(
 | 
			
		||||
        id: &Pubkey,
 | 
			
		||||
        window: &mut [WindowSlot],
 | 
			
		||||
        receive_index: u64,
 | 
			
		||||
        num_blobs: usize,
 | 
			
		||||
        transmit_index_coding: &mut u64,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        // beginning of the coding blobs of the block that receive_index points into
 | 
			
		||||
        let coding_index_start =
 | 
			
		||||
            receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
 | 
			
		||||
 | 
			
		||||
        let start_idx = receive_index as usize % window.len();
 | 
			
		||||
        let mut block_start = start_idx - (start_idx % NUM_DATA);
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            let block_end = block_start + NUM_DATA;
 | 
			
		||||
            if block_end > (start_idx + num_blobs) {
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
            info!(
 | 
			
		||||
                "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}",
 | 
			
		||||
                id, block_start, block_end, start_idx, num_blobs
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            let mut max_data_size = 0;
 | 
			
		||||
 | 
			
		||||
            // find max_data_size, maybe bail if not all the data is here
 | 
			
		||||
            for i in block_start..block_end {
 | 
			
		||||
                let n = i % window.len();
 | 
			
		||||
                trace!("{} window[{}] = {:?}", id, n, window[n].data);
 | 
			
		||||
 | 
			
		||||
                if let Some(b) = &window[n].data {
 | 
			
		||||
                    max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size);
 | 
			
		||||
                } else {
 | 
			
		||||
                    trace!("{} data block is null @ {}", id, n);
 | 
			
		||||
                    return Ok(());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // round up to the nearest jerasure alignment
 | 
			
		||||
            max_data_size = align!(max_data_size, JERASURE_ALIGN);
 | 
			
		||||
 | 
			
		||||
            let mut data_blobs = Vec::with_capacity(NUM_DATA);
 | 
			
		||||
            for i in block_start..block_end {
 | 
			
		||||
                let n = i % window.len();
 | 
			
		||||
 | 
			
		||||
                if let Some(b) = &window[n].data {
 | 
			
		||||
                    // make sure extra bytes in each blob are zero-d out for generation of
 | 
			
		||||
                    //  coding blobs
 | 
			
		||||
                    let mut b_wl = b.write().unwrap();
 | 
			
		||||
                    for i in b_wl.meta.size..max_data_size {
 | 
			
		||||
                        b_wl.data[i] = 0;
 | 
			
		||||
                    }
 | 
			
		||||
                    data_blobs.push(b);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // getting ready to do erasure coding, means that we're potentially
 | 
			
		||||
            // going back in time, tell our caller we've inserted coding blocks
 | 
			
		||||
            // starting at coding_index_start
 | 
			
		||||
            *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
 | 
			
		||||
 | 
			
		||||
            let mut coding_blobs = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
            let coding_start = block_end - NUM_CODING;
 | 
			
		||||
            for i in coding_start..block_end {
 | 
			
		||||
                let n = i % window.len();
 | 
			
		||||
                assert!(window[n].coding.is_none());
 | 
			
		||||
 | 
			
		||||
                window[n].coding = Some(SharedBlob::default());
 | 
			
		||||
 | 
			
		||||
                let coding = window[n].coding.clone().unwrap();
 | 
			
		||||
                let mut coding_wl = coding.write().unwrap();
 | 
			
		||||
                for i in 0..max_data_size {
 | 
			
		||||
                    coding_wl.data[i] = 0;
 | 
			
		||||
                }
 | 
			
		||||
                // copy index and id from the data blob
 | 
			
		||||
                if let Some(data) = &window[n].data {
 | 
			
		||||
                    let data_rl = data.read().unwrap();
 | 
			
		||||
 | 
			
		||||
                    let index = data_rl.index().unwrap();
 | 
			
		||||
                    let slot = data_rl.slot().unwrap();
 | 
			
		||||
                    let id = data_rl.id().unwrap();
 | 
			
		||||
 | 
			
		||||
                    trace!(
 | 
			
		||||
                        "{} copying index {} id {:?} from data to coding",
 | 
			
		||||
                        id,
 | 
			
		||||
                        index,
 | 
			
		||||
                        id
 | 
			
		||||
                    );
 | 
			
		||||
                    coding_wl.set_index(index).unwrap();
 | 
			
		||||
                    coding_wl.set_slot(slot).unwrap();
 | 
			
		||||
                    coding_wl.set_id(&id).unwrap();
 | 
			
		||||
                }
 | 
			
		||||
                coding_wl.set_size(max_data_size);
 | 
			
		||||
                if coding_wl.set_coding().is_err() {
 | 
			
		||||
                    return Err(Error::ErasureError(ErasureError::EncodeError));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                coding_blobs.push(coding.clone());
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
 | 
			
		||||
 | 
			
		||||
            let data_ptrs: Vec<_> = data_locks
 | 
			
		||||
                .iter()
 | 
			
		||||
                .enumerate()
 | 
			
		||||
                .map(|(i, l)| {
 | 
			
		||||
                    trace!("{} i: {} data: {}", id, i, l.data[0]);
 | 
			
		||||
                    &l.data[..max_data_size]
 | 
			
		||||
                })
 | 
			
		||||
                .collect();
 | 
			
		||||
 | 
			
		||||
            let mut coding_locks: Vec<_> =
 | 
			
		||||
                coding_blobs.iter().map(|b| b.write().unwrap()).collect();
 | 
			
		||||
 | 
			
		||||
            let mut coding_ptrs: Vec<_> = coding_locks
 | 
			
		||||
                .iter_mut()
 | 
			
		||||
                .enumerate()
 | 
			
		||||
                .map(|(i, l)| {
 | 
			
		||||
                    trace!("{} i: {} coding: {}", id, i, l.data[0],);
 | 
			
		||||
                    &mut l.data_mut()[..max_data_size]
 | 
			
		||||
                })
 | 
			
		||||
                .collect();
 | 
			
		||||
 | 
			
		||||
            generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
 | 
			
		||||
            debug!(
 | 
			
		||||
                "{} start_idx: {} data: {}:{} coding: {}:{}",
 | 
			
		||||
                id, start_idx, block_start, block_end, coding_start, block_end
 | 
			
		||||
            );
 | 
			
		||||
            block_start = block_end;
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn setup_window_ledger(
 | 
			
		||||
        offset: usize,
 | 
			
		||||
        num_blobs: usize,
 | 
			
		||||
@@ -896,14 +740,12 @@ pub mod test {
 | 
			
		||||
            blobs.push(b_);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Make some dummy slots
 | 
			
		||||
        index_blobs(
 | 
			
		||||
            &blobs,
 | 
			
		||||
            &Keypair::new().pubkey(),
 | 
			
		||||
            offset as u64,
 | 
			
		||||
            &vec![slot; blobs.len()],
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            // Make some dummy slots
 | 
			
		||||
            let slot_tick_heights: Vec<(&SharedBlob, u64)> =
 | 
			
		||||
                blobs.iter().zip(vec![slot; blobs.len()]).collect();
 | 
			
		||||
            index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64);
 | 
			
		||||
        }
 | 
			
		||||
        for b in blobs {
 | 
			
		||||
            let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
 | 
			
		||||
 | 
			
		||||
@@ -912,18 +754,6 @@ pub mod test {
 | 
			
		||||
        window
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
 | 
			
		||||
        let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();
 | 
			
		||||
 | 
			
		||||
        index_blobs(
 | 
			
		||||
            &blobs,
 | 
			
		||||
            &Keypair::new().pubkey(),
 | 
			
		||||
            offset as u64,
 | 
			
		||||
            &vec![DEFAULT_SLOT_HEIGHT; blobs.len()],
 | 
			
		||||
        );
 | 
			
		||||
        blobs
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
 | 
			
		||||
        let mut window = vec![
 | 
			
		||||
            WindowSlot {
 | 
			
		||||
@@ -933,8 +763,17 @@ pub mod test {
 | 
			
		||||
            };
 | 
			
		||||
            WINDOW_SIZE
 | 
			
		||||
        ];
 | 
			
		||||
        let entries = make_tiny_test_entries(num_blobs);
 | 
			
		||||
        let blobs = entries.to_shared_blobs();
 | 
			
		||||
 | 
			
		||||
        let blobs = generate_test_blobs(offset, num_blobs);
 | 
			
		||||
        {
 | 
			
		||||
            // Make some dummy slots
 | 
			
		||||
            let slot_tick_heights: Vec<(&SharedBlob, u64)> = blobs
 | 
			
		||||
                .iter()
 | 
			
		||||
                .zip(vec![DEFAULT_SLOT_HEIGHT; blobs.len()])
 | 
			
		||||
                .collect();
 | 
			
		||||
            index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for b in blobs.into_iter() {
 | 
			
		||||
            let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
 | 
			
		||||
 
 | 
			
		||||
@@ -14,6 +14,7 @@ use crate::tpu::{Tpu, TpuReturnType};
 | 
			
		||||
use crate::tpu_forwarder::TpuForwarder;
 | 
			
		||||
use crate::tvu::{Sockets, Tvu, TvuReturnType};
 | 
			
		||||
use crate::vote_signer_proxy::VoteSignerProxy;
 | 
			
		||||
use crate::window::{new_window, SharedWindow};
 | 
			
		||||
use log::Level;
 | 
			
		||||
use solana_sdk::hash::Hash;
 | 
			
		||||
use solana_sdk::signature::{Keypair, KeypairUtil};
 | 
			
		||||
@@ -97,6 +98,7 @@ pub struct Fullnode {
 | 
			
		||||
    bank: Arc<Bank>,
 | 
			
		||||
    cluster_info: Arc<RwLock<ClusterInfo>>,
 | 
			
		||||
    sigverify_disabled: bool,
 | 
			
		||||
    shared_window: SharedWindow,
 | 
			
		||||
    tvu_sockets: Vec<UdpSocket>,
 | 
			
		||||
    repair_socket: UdpSocket,
 | 
			
		||||
    retransmit_socket: UdpSocket,
 | 
			
		||||
@@ -202,6 +204,8 @@ impl Fullnode {
 | 
			
		||||
 | 
			
		||||
        let db_ledger = db_ledger.unwrap_or_else(|| Self::make_db_ledger(ledger_path));
 | 
			
		||||
 | 
			
		||||
        let window = new_window(32 * 1024);
 | 
			
		||||
        let shared_window = Arc::new(RwLock::new(window));
 | 
			
		||||
        node.info.wallclock = timestamp();
 | 
			
		||||
        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair(
 | 
			
		||||
            node.info,
 | 
			
		||||
@@ -311,6 +315,7 @@ impl Fullnode {
 | 
			
		||||
                    .try_clone()
 | 
			
		||||
                    .expect("Failed to clone broadcast socket"),
 | 
			
		||||
                cluster_info.clone(),
 | 
			
		||||
                shared_window.clone(),
 | 
			
		||||
                entry_height,
 | 
			
		||||
                bank.leader_scheduler.clone(),
 | 
			
		||||
                entry_receiver,
 | 
			
		||||
@@ -326,6 +331,7 @@ impl Fullnode {
 | 
			
		||||
        Fullnode {
 | 
			
		||||
            keypair,
 | 
			
		||||
            cluster_info,
 | 
			
		||||
            shared_window,
 | 
			
		||||
            bank,
 | 
			
		||||
            sigverify_disabled,
 | 
			
		||||
            gossip_service,
 | 
			
		||||
@@ -481,6 +487,7 @@ impl Fullnode {
 | 
			
		||||
                .try_clone()
 | 
			
		||||
                .expect("Failed to clone broadcast socket"),
 | 
			
		||||
            self.cluster_info.clone(),
 | 
			
		||||
            self.shared_window.clone(),
 | 
			
		||||
            entry_height,
 | 
			
		||||
            self.bank.leader_scheduler.clone(),
 | 
			
		||||
            blob_receiver,
 | 
			
		||||
 
 | 
			
		||||
@@ -70,7 +70,6 @@ pub mod tpu;
 | 
			
		||||
pub mod tpu_forwarder;
 | 
			
		||||
pub mod tvu;
 | 
			
		||||
pub mod vote_signer_proxy;
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
pub mod window;
 | 
			
		||||
pub mod window_service;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -8,6 +8,7 @@ use log::Level;
 | 
			
		||||
use serde::Serialize;
 | 
			
		||||
pub use solana_sdk::packet::PACKET_DATA_SIZE;
 | 
			
		||||
use solana_sdk::pubkey::Pubkey;
 | 
			
		||||
use std::borrow::Borrow;
 | 
			
		||||
use std::cmp;
 | 
			
		||||
use std::fmt;
 | 
			
		||||
use std::io;
 | 
			
		||||
@@ -450,14 +451,21 @@ impl Blob {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u64]) {
 | 
			
		||||
pub fn index_blobs<I, J, K>(blobs: I, id: &Pubkey, mut index: u64)
 | 
			
		||||
where
 | 
			
		||||
    I: IntoIterator<Item = J>,
 | 
			
		||||
    J: Borrow<(K, u64)>,
 | 
			
		||||
    K: Borrow<SharedBlob>,
 | 
			
		||||
{
 | 
			
		||||
    // enumerate all the blobs, those are the indices
 | 
			
		||||
    for (blob, slot) in blobs.iter().zip(slots) {
 | 
			
		||||
        let mut blob = blob.write().unwrap();
 | 
			
		||||
    for b in blobs {
 | 
			
		||||
        let (b, slot) = b.borrow();
 | 
			
		||||
        let mut blob = b.borrow().write().unwrap();
 | 
			
		||||
 | 
			
		||||
        blob.set_index(index).expect("set_index");
 | 
			
		||||
        blob.set_slot(*slot).expect("set_slot");
 | 
			
		||||
        blob.set_id(id).expect("set_id");
 | 
			
		||||
        blob.set_flags(0).unwrap();
 | 
			
		||||
 | 
			
		||||
        index += 1;
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user