This reverts commit 596f611ede.
			
			
This commit is contained in:
		
							
								
								
									
										13
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										13
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@@ -1909,6 +1909,17 @@ dependencies = [
 | 
			
		||||
 "redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "reed-solomon-erasure"
 | 
			
		||||
version = "3.1.1"
 | 
			
		||||
source = "registry+https://github.com/rust-lang/crates.io-index"
 | 
			
		||||
dependencies = [
 | 
			
		||||
 "cc 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "smallvec 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
[[package]]
 | 
			
		||||
name = "regex"
 | 
			
		||||
version = "1.1.2"
 | 
			
		||||
@@ -2203,6 +2214,7 @@ dependencies = [
 | 
			
		||||
 "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "reqwest 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "rocksdb 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
 "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)",
 | 
			
		||||
@@ -3597,6 +3609,7 @@ dependencies = [
 | 
			
		||||
"checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85"
 | 
			
		||||
"checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76"
 | 
			
		||||
"checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828"
 | 
			
		||||
"checksum reed-solomon-erasure 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77cbbd4c02f53e345fe49e74255a1b10080731ffb2a03475e11df7fc8a043c37"
 | 
			
		||||
"checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f"
 | 
			
		||||
"checksum regex-syntax 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8c2f35eedad5295fdf00a63d7d4b238135723f92b434ec06774dad15c7ab0861"
 | 
			
		||||
"checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
 | 
			
		||||
 
 | 
			
		||||
@@ -41,6 +41,7 @@ nix = "0.13.0"
 | 
			
		||||
rand = "0.6.5"
 | 
			
		||||
rand_chacha = "0.1.1"
 | 
			
		||||
rayon = "1.0.0"
 | 
			
		||||
reed-solomon-erasure = "3.1.1"
 | 
			
		||||
reqwest = "0.9.11"
 | 
			
		||||
rocksdb = "0.11.0"
 | 
			
		||||
serde = "1.0.89"
 | 
			
		||||
 
 | 
			
		||||
@@ -24,9 +24,8 @@ fn main() {
 | 
			
		||||
 | 
			
		||||
    let chacha = !env::var("CARGO_FEATURE_CHACHA").is_err();
 | 
			
		||||
    let cuda = !env::var("CARGO_FEATURE_CUDA").is_err();
 | 
			
		||||
    let erasure = !env::var("CARGO_FEATURE_ERASURE").is_err();
 | 
			
		||||
 | 
			
		||||
    if chacha || cuda || erasure {
 | 
			
		||||
    if chacha || cuda {
 | 
			
		||||
        println!("cargo:rerun-if-changed={}", perf_libs_dir);
 | 
			
		||||
        println!("cargo:rustc-link-search=native={}", perf_libs_dir);
 | 
			
		||||
    }
 | 
			
		||||
@@ -46,30 +45,4 @@ fn main() {
 | 
			
		||||
        println!("cargo:rustc-link-lib=dylib=cuda");
 | 
			
		||||
        println!("cargo:rustc-link-lib=dylib=cudadevrt");
 | 
			
		||||
    }
 | 
			
		||||
    if erasure {
 | 
			
		||||
        #[cfg(any(target_os = "macos", target_os = "ios"))]
 | 
			
		||||
        {
 | 
			
		||||
            println!(
 | 
			
		||||
                "cargo:rerun-if-changed={}/libgf_complete.dylib",
 | 
			
		||||
                perf_libs_dir
 | 
			
		||||
            );
 | 
			
		||||
            println!("cargo:rerun-if-changed={}/libJerasure.dylib", perf_libs_dir);
 | 
			
		||||
        }
 | 
			
		||||
        #[cfg(all(unix, not(any(target_os = "macos", target_os = "ios"))))]
 | 
			
		||||
        {
 | 
			
		||||
            println!("cargo:rerun-if-changed={}/libgf_complete.so", perf_libs_dir);
 | 
			
		||||
            println!("cargo:rerun-if-changed={}/libJerasure.so", perf_libs_dir);
 | 
			
		||||
        }
 | 
			
		||||
        #[cfg(windows)]
 | 
			
		||||
        {
 | 
			
		||||
            println!(
 | 
			
		||||
                "cargo:rerun-if-changed={}/libgf_complete.dll",
 | 
			
		||||
                perf_libs_dir
 | 
			
		||||
            );
 | 
			
		||||
            println!("cargo:rerun-if-changed={}/libJerasure.dll", perf_libs_dir);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        println!("cargo:rustc-link-lib=dylib=Jerasure");
 | 
			
		||||
        println!("cargo:rustc-link-lib=dylib=gf_complete");
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -3,7 +3,6 @@
 | 
			
		||||
//! access read to a persistent file-based ledger.
 | 
			
		||||
 | 
			
		||||
use crate::entry::Entry;
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
use crate::erasure;
 | 
			
		||||
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 | 
			
		||||
use crate::result::{Error, Result};
 | 
			
		||||
@@ -17,7 +16,6 @@ use hashbrown::HashMap;
 | 
			
		||||
#[cfg(not(feature = "kvstore"))]
 | 
			
		||||
use rocksdb;
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
use solana_metrics::counter::Counter;
 | 
			
		||||
 | 
			
		||||
use solana_sdk::genesis_block::GenesisBlock;
 | 
			
		||||
@@ -79,9 +77,9 @@ pub struct Blocktree {
 | 
			
		||||
    meta_cf: LedgerColumn<cf::SlotMeta>,
 | 
			
		||||
    data_cf: LedgerColumn<cf::Data>,
 | 
			
		||||
    erasure_cf: LedgerColumn<cf::Coding>,
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
 | 
			
		||||
    orphans_cf: LedgerColumn<cf::Orphans>,
 | 
			
		||||
    session: Arc<erasure::Session>,
 | 
			
		||||
    pub new_blobs_signals: Vec<SyncSender<bool>>,
 | 
			
		||||
    pub root_slot: RwLock<u64>,
 | 
			
		||||
}
 | 
			
		||||
@@ -92,7 +90,6 @@ pub const META_CF: &str = "meta";
 | 
			
		||||
pub const DATA_CF: &str = "data";
 | 
			
		||||
// Column family for erasure data
 | 
			
		||||
pub const ERASURE_CF: &str = "erasure";
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
pub const ERASURE_META_CF: &str = "erasure_meta";
 | 
			
		||||
// Column family for orphans data
 | 
			
		||||
pub const ORPHANS_CF: &str = "orphans";
 | 
			
		||||
@@ -116,7 +113,7 @@ impl Blocktree {
 | 
			
		||||
 | 
			
		||||
        // Create the erasure column family
 | 
			
		||||
        let erasure_cf = LedgerColumn::new(&db);
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
 | 
			
		||||
        let erasure_meta_cf = LedgerColumn::new(&db);
 | 
			
		||||
 | 
			
		||||
        // Create the orphans column family. An "orphan" is defined as
 | 
			
		||||
@@ -124,14 +121,17 @@ impl Blocktree {
 | 
			
		||||
        // known parent
 | 
			
		||||
        let orphans_cf = LedgerColumn::new(&db);
 | 
			
		||||
 | 
			
		||||
        // setup erasure
 | 
			
		||||
        let session = Arc::new(erasure::Session::default());
 | 
			
		||||
 | 
			
		||||
        Ok(Blocktree {
 | 
			
		||||
            db,
 | 
			
		||||
            meta_cf,
 | 
			
		||||
            data_cf,
 | 
			
		||||
            erasure_cf,
 | 
			
		||||
            #[cfg(feature = "erasure")]
 | 
			
		||||
            erasure_meta_cf,
 | 
			
		||||
            orphans_cf,
 | 
			
		||||
            session,
 | 
			
		||||
            new_blobs_signals: vec![],
 | 
			
		||||
            root_slot: RwLock::new(0),
 | 
			
		||||
        })
 | 
			
		||||
@@ -259,7 +259,6 @@ impl Blocktree {
 | 
			
		||||
        // A map from slot to a 2-tuple of metadata: (working copy, backup copy),
 | 
			
		||||
        // so we can detect changes to the slot metadata later
 | 
			
		||||
        let mut slot_meta_working_set = HashMap::new();
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        let mut erasure_meta_working_set = HashMap::new();
 | 
			
		||||
        let new_blobs: Vec<_> = new_blobs.into_iter().collect();
 | 
			
		||||
        let mut prev_inserted_blob_datas = HashMap::new();
 | 
			
		||||
@@ -301,8 +300,6 @@ impl Blocktree {
 | 
			
		||||
                continue;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            #[cfg(feature = "erasure")]
 | 
			
		||||
            {
 | 
			
		||||
            let set_index = ErasureMeta::set_index_for(blob.index());
 | 
			
		||||
            let erasure_meta_entry = erasure_meta_working_set
 | 
			
		||||
                .entry((blob_slot, set_index))
 | 
			
		||||
@@ -313,8 +310,7 @@ impl Blocktree {
 | 
			
		||||
                        .unwrap_or_else(|| ErasureMeta::new(set_index))
 | 
			
		||||
                });
 | 
			
		||||
 | 
			
		||||
                erasure_meta_entry.set_data_present(blob.index());
 | 
			
		||||
            }
 | 
			
		||||
            erasure_meta_entry.set_data_present(blob.index(), true);
 | 
			
		||||
 | 
			
		||||
            let _ = self.insert_data_blob(
 | 
			
		||||
                blob,
 | 
			
		||||
@@ -339,12 +335,9 @@ impl Blocktree {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        {
 | 
			
		||||
        for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() {
 | 
			
		||||
            write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?;
 | 
			
		||||
        }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.db.write(write_batch)?;
 | 
			
		||||
 | 
			
		||||
@@ -354,36 +347,8 @@ impl Blocktree {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        for ((slot, set_index), erasure_meta) in erasure_meta_working_set.into_iter() {
 | 
			
		||||
            if erasure_meta.can_recover() {
 | 
			
		||||
                match self.recover(slot, set_index) {
 | 
			
		||||
                    Ok(recovered) => {
 | 
			
		||||
                        inc_new_counter_info!("erasures-recovered", recovered);
 | 
			
		||||
                    }
 | 
			
		||||
                    Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
 | 
			
		||||
                        let mut erasure_meta = self
 | 
			
		||||
                            .erasure_meta_cf
 | 
			
		||||
                            .get((slot, set_index))?
 | 
			
		||||
                            .expect("erasure meta should exist");
 | 
			
		||||
 | 
			
		||||
                        let mut batch = self.db.batch()?;
 | 
			
		||||
 | 
			
		||||
                        let start_index = erasure_meta.start_index();
 | 
			
		||||
                        let (_, coding_end_idx) = erasure_meta.end_indexes();
 | 
			
		||||
 | 
			
		||||
                        erasure_meta.coding = 0;
 | 
			
		||||
                        batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
 | 
			
		||||
 | 
			
		||||
                        for idx in start_index..coding_end_idx {
 | 
			
		||||
                            batch.delete::<cf::Coding>((slot, idx))?;
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
                        self.db.write(batch)?;
 | 
			
		||||
                    }
 | 
			
		||||
                    Err(e) => return Err(e),
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            self.try_erasure_recover(&erasure_meta, slot, set_index)?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
@@ -453,26 +418,42 @@ impl Blocktree {
 | 
			
		||||
    pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
 | 
			
		||||
        self.erasure_cf.get_bytes((slot, index))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
 | 
			
		||||
        self.erasure_cf.delete((slot, index))
 | 
			
		||||
        let set_index = ErasureMeta::set_index_for(index);
 | 
			
		||||
 | 
			
		||||
        let mut erasure_meta = self
 | 
			
		||||
            .erasure_meta_cf
 | 
			
		||||
            .get((slot, set_index))?
 | 
			
		||||
            .unwrap_or_else(|| ErasureMeta::new(set_index));
 | 
			
		||||
 | 
			
		||||
        erasure_meta.set_coding_present(index, false);
 | 
			
		||||
 | 
			
		||||
        let mut batch = self.db.batch()?;
 | 
			
		||||
 | 
			
		||||
        batch.delete::<cf::Coding>((slot, index))?;
 | 
			
		||||
        batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
 | 
			
		||||
 | 
			
		||||
        self.db.write(batch)?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
 | 
			
		||||
        self.data_cf.get_bytes((slot, index))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// For benchmarks, testing, and setup.
 | 
			
		||||
    /// Does no metadata tracking. Use with care.
 | 
			
		||||
    pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
 | 
			
		||||
        self.data_cf.put_bytes((slot, index), bytes)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
 | 
			
		||||
        self.erasure_cf.put_bytes((slot, index), bytes)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[cfg(not(feature = "erasure"))]
 | 
			
		||||
    #[inline]
 | 
			
		||||
    pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
 | 
			
		||||
        self.put_coding_blob_bytes_raw(slot, index, bytes)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// this function will insert coding blobs and also automatically track erasure-related
 | 
			
		||||
    /// metadata. If recovery is available it will be done
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
 | 
			
		||||
        let set_index = ErasureMeta::set_index_for(index);
 | 
			
		||||
        let mut erasure_meta = self
 | 
			
		||||
@@ -480,7 +461,7 @@ impl Blocktree {
 | 
			
		||||
            .get((slot, set_index))?
 | 
			
		||||
            .unwrap_or_else(|| ErasureMeta::new(set_index));
 | 
			
		||||
 | 
			
		||||
        erasure_meta.set_coding_present(index);
 | 
			
		||||
        erasure_meta.set_coding_present(index, true);
 | 
			
		||||
 | 
			
		||||
        let mut writebatch = self.db.batch()?;
 | 
			
		||||
 | 
			
		||||
@@ -490,43 +471,28 @@ impl Blocktree {
 | 
			
		||||
 | 
			
		||||
        self.db.write(writebatch)?;
 | 
			
		||||
 | 
			
		||||
        if erasure_meta.can_recover() {
 | 
			
		||||
            match self.recover(slot, set_index) {
 | 
			
		||||
                Ok(recovered) => {
 | 
			
		||||
                    inc_new_counter_info!("erasures-recovered", recovered);
 | 
			
		||||
                    return Ok(());
 | 
			
		||||
                }
 | 
			
		||||
                Err(Error::ErasureError(erasure::ErasureError::CorruptCoding)) => {
 | 
			
		||||
                    let start_index = erasure_meta.start_index();
 | 
			
		||||
                    let (_, coding_end_idx) = erasure_meta.end_indexes();
 | 
			
		||||
                    let mut batch = self.db.batch()?;
 | 
			
		||||
 | 
			
		||||
                    erasure_meta.coding = 0;
 | 
			
		||||
                    batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
 | 
			
		||||
 | 
			
		||||
                    for idx in start_index..coding_end_idx {
 | 
			
		||||
                        batch.delete::<cf::Coding>((slot, idx as u64))?;
 | 
			
		||||
        self.try_erasure_recover(&erasure_meta, slot, set_index)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
                    self.db.write(batch)?;
 | 
			
		||||
 | 
			
		||||
                    return Ok(());
 | 
			
		||||
    fn try_erasure_recover(
 | 
			
		||||
        &self,
 | 
			
		||||
        erasure_meta: &ErasureMeta,
 | 
			
		||||
        slot: u64,
 | 
			
		||||
        set_index: u64,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        match erasure_meta.status() {
 | 
			
		||||
            ErasureMetaStatus::CanRecover => {
 | 
			
		||||
                let recovered = self.recover(slot, set_index)?;
 | 
			
		||||
                inc_new_counter_info!("blocktree-erasure-blobs_recovered", recovered);
 | 
			
		||||
            }
 | 
			
		||||
                Err(e) => return Err(e),
 | 
			
		||||
            ErasureMetaStatus::StillNeed(needed) => {
 | 
			
		||||
                inc_new_counter_info!("blocktree-erasure-blobs_needed", needed)
 | 
			
		||||
            }
 | 
			
		||||
            ErasureMetaStatus::DataFull => inc_new_counter_info!("blocktree-erasure-complete", 1),
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn put_data_raw(&self, slot: u64, index: u64, value: &[u8]) -> Result<()> {
 | 
			
		||||
        self.data_cf.put_bytes((slot, index), value)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
 | 
			
		||||
        self.data_cf.put_bytes((slot, index), bytes)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn get_data_blob(&self, slot: u64, blob_index: u64) -> Result<Option<Blob>> {
 | 
			
		||||
        let bytes = self.get_data_blob_bytes(slot, blob_index)?;
 | 
			
		||||
        Ok(bytes.map(|bytes| {
 | 
			
		||||
@@ -626,20 +592,6 @@ impl Blocktree {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn find_missing_coding_indexes(
 | 
			
		||||
        &self,
 | 
			
		||||
        slot: u64,
 | 
			
		||||
        start_index: u64,
 | 
			
		||||
        end_index: u64,
 | 
			
		||||
        max_missing: usize,
 | 
			
		||||
    ) -> Vec<u64> {
 | 
			
		||||
        if let Ok(mut db_iterator) = self.erasure_cf.cursor() {
 | 
			
		||||
            Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
 | 
			
		||||
        } else {
 | 
			
		||||
            vec![]
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Returns the entry vector for the slot starting with `blob_start_index`
 | 
			
		||||
    pub fn get_slot_entries(
 | 
			
		||||
        &self,
 | 
			
		||||
@@ -1088,43 +1040,45 @@ impl Blocktree {
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    /// Attempts recovery using erasure coding
 | 
			
		||||
    fn recover(&self, slot: u64, set_index: u64) -> Result<usize> {
 | 
			
		||||
        use crate::erasure::{ErasureError, NUM_CODING, NUM_DATA};
 | 
			
		||||
        use crate::packet::BLOB_DATA_SIZE;
 | 
			
		||||
        use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA};
 | 
			
		||||
 | 
			
		||||
        let erasure_meta = self.erasure_meta_cf.get((slot, set_index))?.unwrap();
 | 
			
		||||
 | 
			
		||||
        let start_idx = erasure_meta.start_index();
 | 
			
		||||
        let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes();
 | 
			
		||||
 | 
			
		||||
        let mut erasures = Vec::with_capacity(NUM_CODING + 1);
 | 
			
		||||
        let (mut data, mut coding) = (vec![], vec![]);
 | 
			
		||||
        let present = &mut [true; ERASURE_SET_SIZE];
 | 
			
		||||
        let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
 | 
			
		||||
        let mut size = 0;
 | 
			
		||||
 | 
			
		||||
        for i in start_idx..coding_end_idx {
 | 
			
		||||
            if erasure_meta.is_coding_present(i) {
 | 
			
		||||
                let blob_bytes = self
 | 
			
		||||
                let mut blob_bytes = self
 | 
			
		||||
                    .erasure_cf
 | 
			
		||||
                    .get_bytes((slot, i))?
 | 
			
		||||
                    .expect("erasure_meta must have no false positives");
 | 
			
		||||
 | 
			
		||||
                blob_bytes.drain(..BLOB_HEADER_SIZE);
 | 
			
		||||
 | 
			
		||||
                if size == 0 {
 | 
			
		||||
                    size = blob_bytes.len() - BLOB_HEADER_SIZE;
 | 
			
		||||
                    size = blob_bytes.len();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                coding.push(blob_bytes);
 | 
			
		||||
                blobs.push(blob_bytes);
 | 
			
		||||
            } else {
 | 
			
		||||
                let set_relative_idx = (i - start_idx) + NUM_DATA as u64;
 | 
			
		||||
                coding.push(vec![0; crate::packet::BLOB_SIZE]);
 | 
			
		||||
                erasures.push(set_relative_idx as i32);
 | 
			
		||||
                let set_relative_idx = (i - start_idx) as usize + NUM_DATA;
 | 
			
		||||
                blobs.push(vec![0; size]);
 | 
			
		||||
                present[set_relative_idx] = false;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        assert_ne!(size, 0);
 | 
			
		||||
 | 
			
		||||
        for i in start_idx..data_end_idx {
 | 
			
		||||
            let set_relative_idx = (i - start_idx) as usize;
 | 
			
		||||
 | 
			
		||||
            if erasure_meta.is_data_present(i) {
 | 
			
		||||
                let mut blob_bytes = self
 | 
			
		||||
                    .data_cf
 | 
			
		||||
@@ -1132,90 +1086,28 @@ impl Blocktree {
 | 
			
		||||
                    .expect("erasure_meta must have no false positives");
 | 
			
		||||
 | 
			
		||||
                // If data is too short, extend it with zeroes
 | 
			
		||||
                if blob_bytes.len() < size {
 | 
			
		||||
                blob_bytes.resize(size, 0u8);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                data.push(blob_bytes);
 | 
			
		||||
                blobs.insert(set_relative_idx, blob_bytes);
 | 
			
		||||
            } else {
 | 
			
		||||
                let set_relative_index = i - start_idx;
 | 
			
		||||
                data.push(vec![0; size]);
 | 
			
		||||
                blobs.insert(set_relative_idx, vec![0u8; size]);
 | 
			
		||||
                // data erasures must come before any coding erasures if present
 | 
			
		||||
                erasures.insert(0, set_relative_index as i32);
 | 
			
		||||
                present[set_relative_idx] = false;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let mut coding_ptrs: Vec<_> = coding
 | 
			
		||||
            .iter_mut()
 | 
			
		||||
            .map(|coding_bytes| &mut coding_bytes[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size])
 | 
			
		||||
            .collect();
 | 
			
		||||
        let (recovered_data, recovered_coding) = self
 | 
			
		||||
            .session
 | 
			
		||||
            .reconstruct_blobs(&mut blobs, present, size, start_idx, slot)?;
 | 
			
		||||
 | 
			
		||||
        let mut data_ptrs: Vec<_> = data
 | 
			
		||||
            .iter_mut()
 | 
			
		||||
            .map(|data_bytes| &mut data_bytes[..size])
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        // Marks the end
 | 
			
		||||
        erasures.push(-1);
 | 
			
		||||
        trace!("erasures: {:?}, size: {}", erasures, size);
 | 
			
		||||
 | 
			
		||||
        erasure::decode_blocks(
 | 
			
		||||
            data_ptrs.as_mut_slice(),
 | 
			
		||||
            coding_ptrs.as_mut_slice(),
 | 
			
		||||
            &erasures,
 | 
			
		||||
        )?;
 | 
			
		||||
 | 
			
		||||
        // Create the missing blobs from the reconstructed data
 | 
			
		||||
        let block_start_idx = erasure_meta.start_index();
 | 
			
		||||
        let (mut recovered_data, mut recovered_coding) = (vec![], vec![]);
 | 
			
		||||
 | 
			
		||||
        for i in &erasures[..erasures.len() - 1] {
 | 
			
		||||
            let n = *i as usize;
 | 
			
		||||
 | 
			
		||||
            let (data_size, idx, first_byte);
 | 
			
		||||
 | 
			
		||||
            if n < NUM_DATA {
 | 
			
		||||
                let mut blob = Blob::new(&data_ptrs[n]);
 | 
			
		||||
 | 
			
		||||
                idx = n as u64 + block_start_idx;
 | 
			
		||||
                data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
 | 
			
		||||
                first_byte = blob.data[0];
 | 
			
		||||
 | 
			
		||||
                if data_size > BLOB_DATA_SIZE {
 | 
			
		||||
                    error!("corrupt data blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                    return Err(Error::ErasureError(ErasureError::CorruptCoding));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                blob.set_slot(slot);
 | 
			
		||||
                blob.set_index(idx);
 | 
			
		||||
                blob.set_size(data_size);
 | 
			
		||||
                recovered_data.push(blob);
 | 
			
		||||
            } else {
 | 
			
		||||
                let mut blob = Blob::new(&coding_ptrs[n - NUM_DATA]);
 | 
			
		||||
 | 
			
		||||
                idx = (n - NUM_DATA) as u64 + block_start_idx;
 | 
			
		||||
                data_size = size;
 | 
			
		||||
                first_byte = blob.data[0];
 | 
			
		||||
 | 
			
		||||
                if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
 | 
			
		||||
                    error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                    return Err(Error::ErasureError(ErasureError::CorruptCoding));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                blob.set_slot(slot);
 | 
			
		||||
                blob.set_index(idx);
 | 
			
		||||
                blob.set_data_size(data_size as u64);
 | 
			
		||||
                recovered_coding.push(blob);
 | 
			
		||||
            }
 | 
			
		||||
        let amount_recovered = recovered_data.len() + recovered_coding.len();
 | 
			
		||||
 | 
			
		||||
        trace!(
 | 
			
		||||
                "erasures[{}] ({}) size: {} data[0]: {}",
 | 
			
		||||
                *i,
 | 
			
		||||
                idx,
 | 
			
		||||
                data_size,
 | 
			
		||||
                first_byte,
 | 
			
		||||
            "[recover] reconstruction OK slot: {}, indexes: [{},{})",
 | 
			
		||||
            slot,
 | 
			
		||||
            start_idx,
 | 
			
		||||
            data_end_idx
 | 
			
		||||
        );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.write_blobs(recovered_data)?;
 | 
			
		||||
 | 
			
		||||
@@ -1223,7 +1115,7 @@ impl Blocktree {
 | 
			
		||||
            self.put_coding_blob_bytes_raw(slot, blob.index(), &blob.data[..])?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(erasures.len() - 1)
 | 
			
		||||
        Ok(amount_recovered)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Returns the next consumed index and the number of ticks in the new consumed
 | 
			
		||||
@@ -1821,26 +1713,28 @@ pub mod tests {
 | 
			
		||||
        let blocktree_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive");
 | 
			
		||||
        {
 | 
			
		||||
            let blocktree = Blocktree::open(&blocktree_path).unwrap();
 | 
			
		||||
            let slot = 0;
 | 
			
		||||
            let parent_slot = 0;
 | 
			
		||||
            for i in 0..4 {
 | 
			
		||||
                let slot = i;
 | 
			
		||||
                let parent_slot = if i == 0 { 0 } else { i - 1 };
 | 
			
		||||
                // Write entries
 | 
			
		||||
            let num_entries = 21 as u64;
 | 
			
		||||
                let num_entries = 21 as u64 * (i + 1);
 | 
			
		||||
                let (blobs, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
 | 
			
		||||
 | 
			
		||||
                blocktree
 | 
			
		||||
                    .write_blobs(blobs.iter().skip(1).step_by(2))
 | 
			
		||||
                    .unwrap();
 | 
			
		||||
 | 
			
		||||
            assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
 | 
			
		||||
                assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]);
 | 
			
		||||
 | 
			
		||||
                let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
 | 
			
		||||
                if num_entries % 2 == 0 {
 | 
			
		||||
                    assert_eq!(meta.received, num_entries);
 | 
			
		||||
                } else {
 | 
			
		||||
                    debug!("got here");
 | 
			
		||||
                    assert_eq!(meta.received, num_entries - 1);
 | 
			
		||||
                }
 | 
			
		||||
                assert_eq!(meta.consumed, 0);
 | 
			
		||||
            assert_eq!(meta.parent_slot, 0);
 | 
			
		||||
                assert_eq!(meta.parent_slot, parent_slot);
 | 
			
		||||
                if num_entries % 2 == 0 {
 | 
			
		||||
                    assert_eq!(meta.last_index, num_entries - 1);
 | 
			
		||||
                } else {
 | 
			
		||||
@@ -1850,16 +1744,17 @@ pub mod tests {
 | 
			
		||||
                blocktree.write_blobs(blobs.iter().step_by(2)).unwrap();
 | 
			
		||||
 | 
			
		||||
                assert_eq!(
 | 
			
		||||
                blocktree.get_slot_entries(0, 0, None).unwrap(),
 | 
			
		||||
                    blocktree.get_slot_entries(slot, 0, None).unwrap(),
 | 
			
		||||
                    original_entries,
 | 
			
		||||
                );
 | 
			
		||||
 | 
			
		||||
                let meta = blocktree.meta_cf.get(slot).unwrap().unwrap();
 | 
			
		||||
                assert_eq!(meta.received, num_entries);
 | 
			
		||||
                assert_eq!(meta.consumed, num_entries);
 | 
			
		||||
            assert_eq!(meta.parent_slot, 0);
 | 
			
		||||
                assert_eq!(meta.parent_slot, parent_slot);
 | 
			
		||||
                assert_eq!(meta.last_index, num_entries - 1);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
 | 
			
		||||
    }
 | 
			
		||||
@@ -2665,7 +2560,6 @@ pub mod tests {
 | 
			
		||||
        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    mod erasure {
 | 
			
		||||
        use super::*;
 | 
			
		||||
        use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec};
 | 
			
		||||
@@ -2730,7 +2624,7 @@ pub mod tests {
 | 
			
		||||
            assert_eq!(erasure_meta.data, 0x00FF);
 | 
			
		||||
            assert_eq!(erasure_meta.coding, 0x0);
 | 
			
		||||
 | 
			
		||||
            let mut coding_generator = CodingGenerator::new();
 | 
			
		||||
            let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
 | 
			
		||||
            let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]);
 | 
			
		||||
 | 
			
		||||
            for shared_coding_blob in coding_blobs {
 | 
			
		||||
@@ -2749,6 +2643,23 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
            assert_eq!(erasure_meta.data, 0xFFFF);
 | 
			
		||||
            assert_eq!(erasure_meta.coding, 0x0F);
 | 
			
		||||
 | 
			
		||||
            let (start_idx, coding_end_idx) =
 | 
			
		||||
                (erasure_meta.start_index(), erasure_meta.end_indexes().1);
 | 
			
		||||
 | 
			
		||||
            for idx in start_idx..coding_end_idx {
 | 
			
		||||
                blocktree.delete_coding_blob(slot, idx).unwrap();
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            let erasure_meta = blocktree
 | 
			
		||||
                .erasure_meta_cf
 | 
			
		||||
                .get((slot, 0))
 | 
			
		||||
                .expect("DB get must succeed")
 | 
			
		||||
                .unwrap();
 | 
			
		||||
 | 
			
		||||
            assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull);
 | 
			
		||||
            assert_eq!(erasure_meta.data, 0xFFFF);
 | 
			
		||||
            assert_eq!(erasure_meta.coding, 0x0);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        #[test]
 | 
			
		||||
@@ -2766,11 +2677,12 @@ pub mod tests {
 | 
			
		||||
                .map(Blob::into)
 | 
			
		||||
                .collect::<Vec<_>>();
 | 
			
		||||
 | 
			
		||||
            let mut coding_generator = CodingGenerator::new();
 | 
			
		||||
            let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
 | 
			
		||||
 | 
			
		||||
            for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() {
 | 
			
		||||
                let focused_index = (set_index + 1) * NUM_DATA - 1;
 | 
			
		||||
                let coding_blobs = coding_generator.next(&data_blobs);
 | 
			
		||||
 | 
			
		||||
                assert_eq!(coding_blobs.len(), NUM_CODING);
 | 
			
		||||
 | 
			
		||||
                let deleted_data = data_blobs[NUM_DATA - 1].clone();
 | 
			
		||||
@@ -2821,13 +2733,12 @@ pub mod tests {
 | 
			
		||||
            Blocktree::destroy(&ledger_path).expect("Expect successful Blocktree destruction");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        /// FIXME: JERASURE Threading: see Issue
 | 
			
		||||
        /// [#3725](https://github.com/solana-labs/solana/issues/3725)
 | 
			
		||||
        #[test]
 | 
			
		||||
        fn test_recovery_multi_slot_multi_thread() {
 | 
			
		||||
            use rand::rngs::SmallRng;
 | 
			
		||||
            use rand::SeedableRng;
 | 
			
		||||
            use std::thread;
 | 
			
		||||
 | 
			
		||||
            const USE_THREADS: bool = true;
 | 
			
		||||
            let slots = vec![0, 3, 5, 50, 100];
 | 
			
		||||
            let max_erasure_sets = 16;
 | 
			
		||||
            solana_logger::setup();
 | 
			
		||||
@@ -2837,7 +2748,7 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
            // Specification should generate a ledger where each slot has an random number of
 | 
			
		||||
            // erasure sets. Odd erasure sets will have all data blobs and no coding blobs, and even ones
 | 
			
		||||
            // will have between 1-4 data blobs missing and all coding blobs
 | 
			
		||||
            // will have between 1 data blob missing and 1 coding blob
 | 
			
		||||
            let specs = slots
 | 
			
		||||
                .iter()
 | 
			
		||||
                .map(|&slot| {
 | 
			
		||||
@@ -2848,7 +2759,7 @@ pub mod tests {
 | 
			
		||||
                            let (num_data, num_coding) = if set_index % 2 == 0 {
 | 
			
		||||
                                (NUM_DATA - rng.gen_range(1, 5), NUM_CODING)
 | 
			
		||||
                            } else {
 | 
			
		||||
                                (NUM_DATA, 0)
 | 
			
		||||
                                (NUM_DATA - 1, NUM_CODING - 1)
 | 
			
		||||
                            };
 | 
			
		||||
                            ErasureSpec {
 | 
			
		||||
                                set_index,
 | 
			
		||||
@@ -2873,8 +2784,13 @@ pub mod tests {
 | 
			
		||||
            for slot_model in model.clone() {
 | 
			
		||||
                let blocktree = Arc::clone(&blocktree);
 | 
			
		||||
                let slot = slot_model.slot;
 | 
			
		||||
                let closure = move || {
 | 
			
		||||
                let mut rng = SmallRng::from_rng(&mut rng).unwrap();
 | 
			
		||||
                let handle = thread::spawn(move || {
 | 
			
		||||
                    for erasure_set in slot_model.chunks {
 | 
			
		||||
                        // for even sets, write data blobs first, then write coding blobs, which
 | 
			
		||||
                        // should trigger recovery since all coding blobs will be inserted and
 | 
			
		||||
                        // between 1-4 data blobs are missing
 | 
			
		||||
                        if rng.gen() {
 | 
			
		||||
                            blocktree
 | 
			
		||||
                                .write_shared_blobs(erasure_set.data)
 | 
			
		||||
                                .expect("Writing data blobs must succeed");
 | 
			
		||||
@@ -2894,14 +2810,34 @@ pub mod tests {
 | 
			
		||||
                                "multislot: wrote coding: slot: {}, erasure_set: {}",
 | 
			
		||||
                                slot, erasure_set.set_index
 | 
			
		||||
                            );
 | 
			
		||||
                    }
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
                if USE_THREADS {
 | 
			
		||||
                    handles.push(thread::spawn(closure));
 | 
			
		||||
                        } else {
 | 
			
		||||
                    closure();
 | 
			
		||||
                            // for odd sets, write coding blobs first, then write the data blobs.
 | 
			
		||||
                            // writing the data blobs should trigger recovery, since 3/4 coding and
 | 
			
		||||
                            // 15/16 data blobs will be present
 | 
			
		||||
                            for shared_coding_blob in erasure_set.coding {
 | 
			
		||||
                                let blob = shared_coding_blob.read().unwrap();
 | 
			
		||||
                                let size = blob.size() + BLOB_HEADER_SIZE;
 | 
			
		||||
                                blocktree
 | 
			
		||||
                                    .put_coding_blob_bytes(slot, blob.index(), &blob.data[..size])
 | 
			
		||||
                                    .expect("Writing coding blobs must succeed");
 | 
			
		||||
                            }
 | 
			
		||||
                            debug!(
 | 
			
		||||
                                "multislot: wrote coding: slot: {}, erasure_set: {}",
 | 
			
		||||
                                slot, erasure_set.set_index
 | 
			
		||||
                            );
 | 
			
		||||
 | 
			
		||||
                            blocktree
 | 
			
		||||
                                .write_shared_blobs(erasure_set.data)
 | 
			
		||||
                                .expect("Writing data blobs must succeed");
 | 
			
		||||
                            debug!(
 | 
			
		||||
                                "multislot: wrote data: slot: {}, erasure_set: {}",
 | 
			
		||||
                                slot, erasure_set.set_index
 | 
			
		||||
                            );
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
 | 
			
		||||
                handles.push(handle);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            handles
 | 
			
		||||
@@ -2926,7 +2862,7 @@ pub mod tests {
 | 
			
		||||
                    );
 | 
			
		||||
 | 
			
		||||
                    // all possibility for recovery should be exhausted
 | 
			
		||||
                    assert!(!erasure_meta.can_recover());
 | 
			
		||||
                    assert_eq!(erasure_meta.status(), ErasureMetaStatus::DataFull);
 | 
			
		||||
                    // Should have all data
 | 
			
		||||
                    assert_eq!(erasure_meta.data, 0xFFFF);
 | 
			
		||||
                    if set_index % 2 == 0 {
 | 
			
		||||
 
 | 
			
		||||
@@ -28,7 +28,6 @@ pub mod columns {
 | 
			
		||||
    /// Data Column
 | 
			
		||||
    pub struct Data;
 | 
			
		||||
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    #[derive(Debug)]
 | 
			
		||||
    /// The erasure meta column
 | 
			
		||||
    pub struct ErasureMeta;
 | 
			
		||||
 
 | 
			
		||||
@@ -138,7 +138,6 @@ impl TypedColumn<Kvs> for cf::SlotMeta {
 | 
			
		||||
    type Type = super::SlotMeta;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
impl Column<Kvs> for cf::ErasureMeta {
 | 
			
		||||
    const NAME: &'static str = super::ERASURE_META_CF;
 | 
			
		||||
    type Index = (u64, u64);
 | 
			
		||||
@@ -157,7 +156,6 @@ impl Column<Kvs> for cf::ErasureMeta {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
impl TypedColumn<Kvs> for cf::ErasureMeta {
 | 
			
		||||
    type Type = super::ErasureMeta;
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,3 @@
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
use crate::erasure::{NUM_CODING, NUM_DATA};
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
 | 
			
		||||
@@ -59,7 +58,6 @@ impl SlotMeta {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
 | 
			
		||||
/// Erasure coding information
 | 
			
		||||
pub struct ErasureMeta {
 | 
			
		||||
@@ -71,7 +69,13 @@ pub struct ErasureMeta {
 | 
			
		||||
    pub coding: u64,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
#[derive(Debug, PartialEq)]
 | 
			
		||||
pub enum ErasureMetaStatus {
 | 
			
		||||
    CanRecover,
 | 
			
		||||
    DataFull,
 | 
			
		||||
    StillNeed(usize),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ErasureMeta {
 | 
			
		||||
    pub fn new(set_index: u64) -> ErasureMeta {
 | 
			
		||||
        ErasureMeta {
 | 
			
		||||
@@ -81,46 +85,71 @@ impl ErasureMeta {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn can_recover(&self) -> bool {
 | 
			
		||||
    pub fn status(&self) -> ErasureMetaStatus {
 | 
			
		||||
        let (data_missing, coding_missing) = (
 | 
			
		||||
            NUM_DATA - self.data.count_ones() as usize,
 | 
			
		||||
            NUM_CODING - self.coding.count_ones() as usize,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        data_missing > 0 && data_missing + coding_missing <= NUM_CODING
 | 
			
		||||
        if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
 | 
			
		||||
            ErasureMetaStatus::CanRecover
 | 
			
		||||
        } else if data_missing == 0 {
 | 
			
		||||
            ErasureMetaStatus::DataFull
 | 
			
		||||
        } else {
 | 
			
		||||
            ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn is_coding_present(&self, index: u64) -> bool {
 | 
			
		||||
        let set_index = Self::set_index_for(index);
 | 
			
		||||
        let position = index - self.start_index();
 | 
			
		||||
        let start = self.start_index();
 | 
			
		||||
        let end = start + NUM_CODING as u64;
 | 
			
		||||
 | 
			
		||||
        set_index == self.set_index && self.coding & (1 << position) != 0
 | 
			
		||||
        if start <= index && index < end {
 | 
			
		||||
            let position = index - start;
 | 
			
		||||
 | 
			
		||||
            self.coding & (1 << position) != 0
 | 
			
		||||
        } else {
 | 
			
		||||
            false
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn set_coding_present(&mut self, index: u64) {
 | 
			
		||||
    pub fn set_coding_present(&mut self, index: u64, present: bool) {
 | 
			
		||||
        let set_index = Self::set_index_for(index);
 | 
			
		||||
 | 
			
		||||
        if set_index as u64 == self.set_index {
 | 
			
		||||
            let position = index - self.start_index();
 | 
			
		||||
 | 
			
		||||
            if present {
 | 
			
		||||
                self.coding |= 1 << position;
 | 
			
		||||
            } else {
 | 
			
		||||
                self.coding &= !(1 << position);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn is_data_present(&self, index: u64) -> bool {
 | 
			
		||||
        let set_index = Self::set_index_for(index);
 | 
			
		||||
        let position = index - self.start_index();
 | 
			
		||||
        let start = self.start_index();
 | 
			
		||||
        let end = start + NUM_DATA as u64;
 | 
			
		||||
 | 
			
		||||
        set_index == self.set_index && self.data & (1 << position) != 0
 | 
			
		||||
        if start <= index && index < end {
 | 
			
		||||
            let position = index - start;
 | 
			
		||||
 | 
			
		||||
            self.data & (1 << position) != 0
 | 
			
		||||
        } else {
 | 
			
		||||
            false
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn set_data_present(&mut self, index: u64) {
 | 
			
		||||
    pub fn set_data_present(&mut self, index: u64, present: bool) {
 | 
			
		||||
        let set_index = Self::set_index_for(index);
 | 
			
		||||
 | 
			
		||||
        if set_index as u64 == self.set_index {
 | 
			
		||||
            let position = index - self.start_index();
 | 
			
		||||
 | 
			
		||||
            if present {
 | 
			
		||||
                self.data |= 1 << position;
 | 
			
		||||
            } else {
 | 
			
		||||
                self.data &= !(1 << position);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -139,7 +168,29 @@ impl ErasureMeta {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_meta_indexes() {
 | 
			
		||||
    use rand::{thread_rng, Rng};
 | 
			
		||||
 | 
			
		||||
    let mut rng = thread_rng();
 | 
			
		||||
 | 
			
		||||
    for _ in 0..100 {
 | 
			
		||||
        let set_index = rng.gen_range(0, 1_000);
 | 
			
		||||
        let blob_index = (set_index * NUM_DATA as u64) + rng.gen_range(0, 16);
 | 
			
		||||
 | 
			
		||||
        assert_eq!(set_index, ErasureMeta::set_index_for(blob_index));
 | 
			
		||||
        let e_meta = ErasureMeta::new(set_index);
 | 
			
		||||
 | 
			
		||||
        assert_eq!(e_meta.start_index(), set_index * NUM_DATA as u64);
 | 
			
		||||
        let (data_end_idx, coding_end_idx) = e_meta.end_indexes();
 | 
			
		||||
        assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA as u64);
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            coding_end_idx,
 | 
			
		||||
            set_index * NUM_DATA as u64 + NUM_CODING as u64
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_meta_coding_present() {
 | 
			
		||||
    let set_index = 0;
 | 
			
		||||
@@ -150,7 +201,7 @@ fn test_meta_coding_present() {
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    for i in 0..NUM_CODING as u64 {
 | 
			
		||||
        e_meta.set_coding_present(i);
 | 
			
		||||
        e_meta.set_coding_present(i, true);
 | 
			
		||||
        assert_eq!(e_meta.is_coding_present(i), true);
 | 
			
		||||
    }
 | 
			
		||||
    for i in NUM_CODING as u64..NUM_DATA as u64 {
 | 
			
		||||
@@ -160,7 +211,7 @@ fn test_meta_coding_present() {
 | 
			
		||||
    e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64);
 | 
			
		||||
 | 
			
		||||
    for i in (NUM_DATA * 17) as u64..((NUM_DATA * 17) + NUM_CODING) as u64 {
 | 
			
		||||
        e_meta.set_coding_present(i);
 | 
			
		||||
        e_meta.set_coding_present(i, true);
 | 
			
		||||
        assert_eq!(e_meta.is_coding_present(i), true);
 | 
			
		||||
    }
 | 
			
		||||
    for i in (NUM_DATA * 17 + NUM_CODING) as u64..((NUM_DATA * 17) + NUM_DATA) as u64 {
 | 
			
		||||
@@ -168,9 +219,8 @@ fn test_meta_coding_present() {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_can_recover() {
 | 
			
		||||
fn test_erasure_meta_status() {
 | 
			
		||||
    let set_index = 0;
 | 
			
		||||
    let mut e_meta = ErasureMeta {
 | 
			
		||||
        set_index,
 | 
			
		||||
@@ -178,36 +228,63 @@ fn test_can_recover() {
 | 
			
		||||
        coding: 0,
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    assert!(!e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA));
 | 
			
		||||
 | 
			
		||||
    e_meta.data = 0b1111_1111_1111_1111;
 | 
			
		||||
    e_meta.coding = 0x00;
 | 
			
		||||
 | 
			
		||||
    assert!(!e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
 | 
			
		||||
 | 
			
		||||
    e_meta.coding = 0x0e;
 | 
			
		||||
    assert_eq!(0x0fu8, 0b0000_1111u8);
 | 
			
		||||
    assert!(!e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
 | 
			
		||||
 | 
			
		||||
    e_meta.data = 0b0111_1111_1111_1111;
 | 
			
		||||
    assert!(e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 | 
			
		||||
 | 
			
		||||
    e_meta.data = 0b0111_1111_1111_1110;
 | 
			
		||||
    assert!(e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 | 
			
		||||
 | 
			
		||||
    e_meta.data = 0b0111_1111_1011_1110;
 | 
			
		||||
    assert!(e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 | 
			
		||||
 | 
			
		||||
    e_meta.data = 0b0111_1011_1011_1110;
 | 
			
		||||
    assert!(!e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1));
 | 
			
		||||
 | 
			
		||||
    e_meta.data = 0b0111_1011_1011_1110;
 | 
			
		||||
    assert!(!e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(1));
 | 
			
		||||
 | 
			
		||||
    e_meta.coding = 0b0000_1110;
 | 
			
		||||
    e_meta.data = 0b1111_1111_1111_1100;
 | 
			
		||||
    assert!(e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 | 
			
		||||
 | 
			
		||||
    e_meta.data = 0b1111_1111_1111_1000;
 | 
			
		||||
    assert!(e_meta.can_recover());
 | 
			
		||||
    assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_meta_data_present() {
 | 
			
		||||
    let set_index = 0;
 | 
			
		||||
    let mut e_meta = ErasureMeta {
 | 
			
		||||
        set_index,
 | 
			
		||||
        data: 0,
 | 
			
		||||
        coding: 0,
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    for i in 0..NUM_DATA as u64 {
 | 
			
		||||
        e_meta.set_data_present(i, true);
 | 
			
		||||
        assert_eq!(e_meta.is_data_present(i), true);
 | 
			
		||||
    }
 | 
			
		||||
    for i in NUM_DATA as u64..2 * NUM_DATA as u64 {
 | 
			
		||||
        assert_eq!(e_meta.is_data_present(i), false);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64);
 | 
			
		||||
 | 
			
		||||
    for i in (NUM_DATA * 23) as u64..(NUM_DATA * 24) as u64 {
 | 
			
		||||
        e_meta.set_data_present(i, true);
 | 
			
		||||
        assert_eq!(e_meta.is_data_present(i), true);
 | 
			
		||||
    }
 | 
			
		||||
    for i in (NUM_DATA * 22) as u64..(NUM_DATA * 23) as u64 {
 | 
			
		||||
        assert_eq!(e_meta.is_data_present(i), false);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -30,9 +30,7 @@ impl Backend for Rocks {
 | 
			
		||||
    type Error = rocksdb::Error;
 | 
			
		||||
 | 
			
		||||
    fn open(path: &Path) -> Result<Rocks> {
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        use crate::blocktree::db::columns::ErasureMeta;
 | 
			
		||||
        use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
 | 
			
		||||
        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta};
 | 
			
		||||
 | 
			
		||||
        fs::create_dir_all(&path)?;
 | 
			
		||||
 | 
			
		||||
@@ -43,7 +41,6 @@ impl Backend for Rocks {
 | 
			
		||||
        let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
 | 
			
		||||
        let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
 | 
			
		||||
        let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        let erasure_meta_cf_descriptor =
 | 
			
		||||
            ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
 | 
			
		||||
        let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
 | 
			
		||||
@@ -52,7 +49,6 @@ impl Backend for Rocks {
 | 
			
		||||
            meta_cf_descriptor,
 | 
			
		||||
            data_cf_descriptor,
 | 
			
		||||
            erasure_cf_descriptor,
 | 
			
		||||
            #[cfg(feature = "erasure")]
 | 
			
		||||
            erasure_meta_cf_descriptor,
 | 
			
		||||
            orphans_cf_descriptor,
 | 
			
		||||
        ];
 | 
			
		||||
@@ -64,13 +60,10 @@ impl Backend for Rocks {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn columns(&self) -> Vec<&'static str> {
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        use crate::blocktree::db::columns::ErasureMeta;
 | 
			
		||||
        use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
 | 
			
		||||
        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta};
 | 
			
		||||
 | 
			
		||||
        vec![
 | 
			
		||||
            Coding::NAME,
 | 
			
		||||
            #[cfg(feature = "erasure")]
 | 
			
		||||
            ErasureMeta::NAME,
 | 
			
		||||
            Data::NAME,
 | 
			
		||||
            Orphans::NAME,
 | 
			
		||||
@@ -196,7 +189,6 @@ impl TypedColumn<Rocks> for cf::SlotMeta {
 | 
			
		||||
    type Type = super::SlotMeta;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
impl Column<Rocks> for cf::ErasureMeta {
 | 
			
		||||
    const NAME: &'static str = super::ERASURE_META_CF;
 | 
			
		||||
    type Index = (u64, u64);
 | 
			
		||||
@@ -216,7 +208,6 @@ impl Column<Rocks> for cf::ErasureMeta {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
impl TypedColumn<Rocks> for cf::ErasureMeta {
 | 
			
		||||
    type Type = super::ErasureMeta;
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -3,7 +3,6 @@
 | 
			
		||||
use crate::blocktree::Blocktree;
 | 
			
		||||
use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
 | 
			
		||||
use crate::entry::{EntrySender, EntrySlice};
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
use crate::erasure::CodingGenerator;
 | 
			
		||||
use crate::packet::index_blobs;
 | 
			
		||||
use crate::poh_recorder::WorkingBankEntries;
 | 
			
		||||
@@ -29,8 +28,6 @@ pub enum BroadcastStageReturnType {
 | 
			
		||||
 | 
			
		||||
struct Broadcast {
 | 
			
		||||
    id: Pubkey,
 | 
			
		||||
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    coding_generator: CodingGenerator,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -119,7 +116,6 @@ impl Broadcast {
 | 
			
		||||
 | 
			
		||||
        blocktree.write_shared_blobs(&blobs)?;
 | 
			
		||||
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        let coding = self.coding_generator.next(&blobs);
 | 
			
		||||
 | 
			
		||||
        let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
 | 
			
		||||
@@ -129,14 +125,10 @@ impl Broadcast {
 | 
			
		||||
        // Send out data
 | 
			
		||||
        ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
 | 
			
		||||
 | 
			
		||||
        #[cfg(feature = "erasure")]
 | 
			
		||||
        ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
 | 
			
		||||
 | 
			
		||||
        inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
 | 
			
		||||
 | 
			
		||||
        // generate and transmit any erasure coding blobs.  if erasure isn't supported, just send everything again
 | 
			
		||||
        #[cfg(not(feature = "erasure"))]
 | 
			
		||||
        ClusterInfo::broadcast(&self.id, contains_last_tick, &broadcast_table, sock, &blobs)?;
 | 
			
		||||
        // send out erasures
 | 
			
		||||
        ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
 | 
			
		||||
 | 
			
		||||
        let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
 | 
			
		||||
 | 
			
		||||
@@ -194,11 +186,11 @@ impl BroadcastStage {
 | 
			
		||||
        storage_entry_sender: EntrySender,
 | 
			
		||||
    ) -> BroadcastStageReturnType {
 | 
			
		||||
        let me = cluster_info.read().unwrap().my_data().clone();
 | 
			
		||||
        let coding_generator = CodingGenerator::default();
 | 
			
		||||
 | 
			
		||||
        let mut broadcast = Broadcast {
 | 
			
		||||
            id: me.id,
 | 
			
		||||
            #[cfg(feature = "erasure")]
 | 
			
		||||
            coding_generator: CodingGenerator::new(),
 | 
			
		||||
            coding_generator,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
@@ -284,9 +276,9 @@ mod test {
 | 
			
		||||
    use crate::entry::create_ticks;
 | 
			
		||||
    use crate::service::Service;
 | 
			
		||||
    use solana_runtime::bank::Bank;
 | 
			
		||||
    use solana_sdk::genesis_block::GenesisBlock;
 | 
			
		||||
    use solana_sdk::hash::Hash;
 | 
			
		||||
    use solana_sdk::signature::{Keypair, KeypairUtil};
 | 
			
		||||
    use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
 | 
			
		||||
    use std::sync::atomic::AtomicBool;
 | 
			
		||||
    use std::sync::mpsc::channel;
 | 
			
		||||
    use std::sync::{Arc, RwLock};
 | 
			
		||||
@@ -321,7 +313,9 @@ mod test {
 | 
			
		||||
 | 
			
		||||
        let exit_sender = Arc::new(AtomicBool::new(false));
 | 
			
		||||
        let (storage_sender, _receiver) = channel();
 | 
			
		||||
        let bank = Arc::new(Bank::default());
 | 
			
		||||
 | 
			
		||||
        let (genesis_block, _) = GenesisBlock::new(10_000);
 | 
			
		||||
        let bank = Arc::new(Bank::new(&genesis_block));
 | 
			
		||||
 | 
			
		||||
        // Start up the broadcast stage
 | 
			
		||||
        let broadcast_service = BroadcastStage::new(
 | 
			
		||||
@@ -341,15 +335,13 @@ mod test {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    #[ignore]
 | 
			
		||||
    //TODO this test won't work since broadcast stage no longer edits the ledger
 | 
			
		||||
    fn test_broadcast_ledger() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let ledger_path = get_tmp_ledger_path("test_broadcast_ledger");
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            // Create the leader scheduler
 | 
			
		||||
            let leader_keypair = Keypair::new();
 | 
			
		||||
            let start_tick_height = 0;
 | 
			
		||||
            let max_tick_height = start_tick_height + DEFAULT_TICKS_PER_SLOT;
 | 
			
		||||
 | 
			
		||||
            let (entry_sender, entry_receiver) = channel();
 | 
			
		||||
            let broadcast_service = setup_dummy_broadcast_service(
 | 
			
		||||
@@ -358,6 +350,9 @@ mod test {
 | 
			
		||||
                entry_receiver,
 | 
			
		||||
            );
 | 
			
		||||
            let bank = broadcast_service.bank.clone();
 | 
			
		||||
            let start_tick_height = bank.tick_height();
 | 
			
		||||
            let max_tick_height = bank.max_tick_height();
 | 
			
		||||
            let ticks_per_slot = bank.ticks_per_slot();
 | 
			
		||||
 | 
			
		||||
            let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
 | 
			
		||||
            for (i, tick) in ticks.into_iter().enumerate() {
 | 
			
		||||
@@ -367,15 +362,23 @@ mod test {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            sleep(Duration::from_millis(2000));
 | 
			
		||||
 | 
			
		||||
            trace!(
 | 
			
		||||
                "[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}",
 | 
			
		||||
                max_tick_height,
 | 
			
		||||
                start_tick_height,
 | 
			
		||||
                ticks_per_slot,
 | 
			
		||||
            );
 | 
			
		||||
 | 
			
		||||
            let blocktree = broadcast_service.blocktree;
 | 
			
		||||
            let mut blob_index = 0;
 | 
			
		||||
            for i in 0..max_tick_height - start_tick_height {
 | 
			
		||||
                let slot = (start_tick_height + i + 1) / DEFAULT_TICKS_PER_SLOT;
 | 
			
		||||
                let slot = (start_tick_height + i + 1) / ticks_per_slot;
 | 
			
		||||
 | 
			
		||||
                let result = blocktree.get_data_blob(slot, blob_index).unwrap();
 | 
			
		||||
 | 
			
		||||
                blob_index += 1;
 | 
			
		||||
                assert!(result.is_some());
 | 
			
		||||
                result.expect("expect blob presence");
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            drop(entry_sender);
 | 
			
		||||
 
 | 
			
		||||
@@ -1,278 +1,217 @@
 | 
			
		||||
// Support erasure coding
 | 
			
		||||
use crate::packet::{Blob, SharedBlob};
 | 
			
		||||
use crate::result::{Error, Result};
 | 
			
		||||
//! # Erasure Coding and Recovery
 | 
			
		||||
//!
 | 
			
		||||
//! Blobs are logically grouped into erasure sets or blocks. Each set contains 16 sequential data
 | 
			
		||||
//! blobs and 4 sequential coding blobs.
 | 
			
		||||
//!
 | 
			
		||||
//! Coding blobs in each set starting from `start_idx`:
 | 
			
		||||
//!   For each erasure set:
 | 
			
		||||
//!     generate `NUM_CODING` coding_blobs.
 | 
			
		||||
//!     index the coding blobs from `start_idx` to `start_idx + NUM_CODING - 1`.
 | 
			
		||||
//!
 | 
			
		||||
//!  model of an erasure set, with top row being data blobs and second being coding
 | 
			
		||||
//!  |<======================= NUM_DATA ==============================>|
 | 
			
		||||
//!  |<==== NUM_CODING ===>|
 | 
			
		||||
//!  +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
 | 
			
		||||
//!  | D | | D | | D | | D | | D |         | D | | D | | D | | D | | D |
 | 
			
		||||
//!  +---+ +---+ +---+ +---+ +---+  . . .  +---+ +---+ +---+ +---+ +---+
 | 
			
		||||
//!  | C | | C | | C | | C | |   |         |   | |   | |   | |   | |   |
 | 
			
		||||
//!  +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
 | 
			
		||||
//!
 | 
			
		||||
//!  blob structure for coding blobs
 | 
			
		||||
//!
 | 
			
		||||
//!   + ------- meta is set and used by transport, meta.size is actual length
 | 
			
		||||
//!   |           of data in the byte array blob.data
 | 
			
		||||
//!   |
 | 
			
		||||
//!   |          + -- data is stuff shipped over the wire, and has an included
 | 
			
		||||
//!   |          |        header
 | 
			
		||||
//!   V          V
 | 
			
		||||
//!  +----------+------------------------------------------------------------+
 | 
			
		||||
//!  | meta     |  data                                                      |
 | 
			
		||||
//!  |+---+--   |+---+---+---+---+------------------------------------------+|
 | 
			
		||||
//!  || s | .   || i |   | f | s |                                          ||
 | 
			
		||||
//!  || i | .   || n | i | l | i |                                          ||
 | 
			
		||||
//!  || z | .   || d | d | a | z |     blob.data(), or blob.data_mut()      ||
 | 
			
		||||
//!  || e |     || e |   | g | e |                                          ||
 | 
			
		||||
//!  |+---+--   || x |   | s |   |                                          ||
 | 
			
		||||
//!  |          |+---+---+---+---+------------------------------------------+|
 | 
			
		||||
//!  +----------+------------------------------------------------------------+
 | 
			
		||||
//!             |                |<=== coding blob part for "coding" =======>|
 | 
			
		||||
//!             |                                                            |
 | 
			
		||||
//!             |<============== data blob part for "coding"  ==============>|
 | 
			
		||||
//!
 | 
			
		||||
//!
 | 
			
		||||
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 | 
			
		||||
use crate::result::Result;
 | 
			
		||||
use std::cmp;
 | 
			
		||||
use std::convert::AsMut;
 | 
			
		||||
use std::sync::{Arc, RwLock};
 | 
			
		||||
 | 
			
		||||
use reed_solomon_erasure::ReedSolomon;
 | 
			
		||||
 | 
			
		||||
//TODO(sakridge) pick these values
 | 
			
		||||
pub const NUM_DATA: usize = 16; // number of data blobs
 | 
			
		||||
pub const NUM_CODING: usize = 4; // number of coding blobs, also the maximum number that can go missing
 | 
			
		||||
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; // total number of blobs in an erasure set, includes data and coding blobs
 | 
			
		||||
/// Number of data blobs
 | 
			
		||||
pub const NUM_DATA: usize = 16;
 | 
			
		||||
/// Number of coding blobs; also the maximum number that can go missing.
 | 
			
		||||
pub const NUM_CODING: usize = 4;
 | 
			
		||||
/// Total number of blobs in an erasure set; includes data and coding blobs
 | 
			
		||||
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
 | 
			
		||||
 | 
			
		||||
macro_rules! align {
 | 
			
		||||
    ($x:expr, $align:expr) => {
 | 
			
		||||
        $x + ($align - 1) & !($align - 1)
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
/// Represents an erasure "session" with a particular configuration and number of data and coding
 | 
			
		||||
/// blobs
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
pub struct Session(ReedSolomon);
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, PartialEq, Eq)]
 | 
			
		||||
pub enum ErasureError {
 | 
			
		||||
    NotEnoughBlocksToDecode,
 | 
			
		||||
    DecodeError,
 | 
			
		||||
    EncodeError,
 | 
			
		||||
    InvalidBlockSize,
 | 
			
		||||
    InvalidBlobData,
 | 
			
		||||
    CorruptCoding,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// k = number of data devices
 | 
			
		||||
// m = number of coding devices
 | 
			
		||||
// w = word size
 | 
			
		||||
 | 
			
		||||
extern "C" {
 | 
			
		||||
    fn jerasure_matrix_encode(
 | 
			
		||||
        k: i32,
 | 
			
		||||
        m: i32,
 | 
			
		||||
        w: i32,
 | 
			
		||||
        matrix: *const i32,
 | 
			
		||||
        data_ptrs: *const *const u8,
 | 
			
		||||
        coding_ptrs: *const *mut u8,
 | 
			
		||||
        size: i32,
 | 
			
		||||
    );
 | 
			
		||||
    fn jerasure_matrix_decode(
 | 
			
		||||
        k: i32,
 | 
			
		||||
        m: i32,
 | 
			
		||||
        w: i32,
 | 
			
		||||
        matrix: *const i32,
 | 
			
		||||
        row_k_ones: i32,
 | 
			
		||||
        erasures: *const i32,
 | 
			
		||||
        data_ptrs: *const *mut u8,
 | 
			
		||||
        coding_ptrs: *const *mut u8,
 | 
			
		||||
        size: i32,
 | 
			
		||||
    ) -> i32;
 | 
			
		||||
    fn galois_single_divide(a: i32, b: i32, w: i32) -> i32;
 | 
			
		||||
    fn galois_init_default_field(w: i32) -> i32;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
use std::sync::Once;
 | 
			
		||||
static ERASURE_W_ONCE: Once = Once::new();
 | 
			
		||||
 | 
			
		||||
// jerasure word size of 32
 | 
			
		||||
fn w() -> i32 {
 | 
			
		||||
    let w = 32;
 | 
			
		||||
    unsafe {
 | 
			
		||||
        ERASURE_W_ONCE.call_once(|| {
 | 
			
		||||
            galois_init_default_field(w);
 | 
			
		||||
            ()
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
    w
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// jerasure checks that arrays are a multiple of w()/8 in length
 | 
			
		||||
fn wb() -> usize {
 | 
			
		||||
    (w() / 8) as usize
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn get_matrix(m: i32, k: i32, w: i32) -> Vec<i32> {
 | 
			
		||||
    let mut matrix = vec![0; (m * k) as usize];
 | 
			
		||||
    for i in 0..m {
 | 
			
		||||
        for j in 0..k {
 | 
			
		||||
            unsafe {
 | 
			
		||||
                matrix[(i * k + j) as usize] = galois_single_divide(1, i ^ (m + j), w);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    matrix
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Generate coding blocks into coding
 | 
			
		||||
//   There are some alignment restrictions, blocks should be aligned by 16 bytes
 | 
			
		||||
//   which means their size should be >= 16 bytes
 | 
			
		||||
fn generate_coding_blocks(coding: &mut [&mut [u8]], data: &[&[u8]]) -> Result<()> {
 | 
			
		||||
    if data.is_empty() {
 | 
			
		||||
        return Ok(());
 | 
			
		||||
    }
 | 
			
		||||
    let k = data.len() as i32;
 | 
			
		||||
    let m = coding.len() as i32;
 | 
			
		||||
    let block_len = data[0].len() as i32;
 | 
			
		||||
    let matrix: Vec<i32> = get_matrix(m, k, w());
 | 
			
		||||
    let mut data_arg = Vec::with_capacity(data.len());
 | 
			
		||||
    for block in data {
 | 
			
		||||
        if block_len != block.len() as i32 {
 | 
			
		||||
            error!(
 | 
			
		||||
                "data block size incorrect {} expected {}",
 | 
			
		||||
                block.len(),
 | 
			
		||||
                block_len
 | 
			
		||||
            );
 | 
			
		||||
            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
 | 
			
		||||
        }
 | 
			
		||||
        data_arg.push(block.as_ptr());
 | 
			
		||||
    }
 | 
			
		||||
    let mut coding_arg = Vec::with_capacity(coding.len());
 | 
			
		||||
    for block in coding {
 | 
			
		||||
        if block_len != block.len() as i32 {
 | 
			
		||||
            error!(
 | 
			
		||||
                "coding block size incorrect {} expected {}",
 | 
			
		||||
                block.len(),
 | 
			
		||||
                block_len
 | 
			
		||||
            );
 | 
			
		||||
            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
 | 
			
		||||
        }
 | 
			
		||||
        coding_arg.push(block.as_mut_ptr());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    unsafe {
 | 
			
		||||
        jerasure_matrix_encode(
 | 
			
		||||
            k,
 | 
			
		||||
            m,
 | 
			
		||||
            w(),
 | 
			
		||||
            matrix.as_ptr(),
 | 
			
		||||
            data_arg.as_ptr(),
 | 
			
		||||
            coding_arg.as_ptr(),
 | 
			
		||||
            block_len,
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Recover data + coding blocks into data blocks
 | 
			
		||||
//   data: array of blocks to recover into
 | 
			
		||||
//   coding: arry of coding blocks
 | 
			
		||||
//   erasures: list of indices in data where blocks should be recovered
 | 
			
		||||
pub fn decode_blocks(
 | 
			
		||||
    data: &mut [&mut [u8]],
 | 
			
		||||
    coding: &mut [&mut [u8]],
 | 
			
		||||
    erasures: &[i32],
 | 
			
		||||
) -> Result<()> {
 | 
			
		||||
    if data.is_empty() {
 | 
			
		||||
        return Ok(());
 | 
			
		||||
    }
 | 
			
		||||
    let block_len = data[0].len();
 | 
			
		||||
    let matrix: Vec<i32> = get_matrix(coding.len() as i32, data.len() as i32, w());
 | 
			
		||||
 | 
			
		||||
    // generate coding pointers, blocks should be the same size
 | 
			
		||||
    let mut coding_arg: Vec<*mut u8> = Vec::new();
 | 
			
		||||
    for x in coding.iter_mut() {
 | 
			
		||||
        if x.len() != block_len {
 | 
			
		||||
            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
 | 
			
		||||
        }
 | 
			
		||||
        coding_arg.push(x.as_mut_ptr());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // generate data pointers, blocks should be the same size
 | 
			
		||||
    let mut data_arg: Vec<*mut u8> = Vec::new();
 | 
			
		||||
    for x in data.iter_mut() {
 | 
			
		||||
        if x.len() != block_len {
 | 
			
		||||
            return Err(Error::ErasureError(ErasureError::InvalidBlockSize));
 | 
			
		||||
        }
 | 
			
		||||
        data_arg.push(x.as_mut_ptr());
 | 
			
		||||
    }
 | 
			
		||||
    let ret = unsafe {
 | 
			
		||||
        jerasure_matrix_decode(
 | 
			
		||||
            data.len() as i32,
 | 
			
		||||
            coding.len() as i32,
 | 
			
		||||
            w(),
 | 
			
		||||
            matrix.as_ptr(),
 | 
			
		||||
            0,
 | 
			
		||||
            erasures.as_ptr(),
 | 
			
		||||
            data_arg.as_ptr(),
 | 
			
		||||
            coding_arg.as_ptr(),
 | 
			
		||||
            data[0].len() as i32,
 | 
			
		||||
        )
 | 
			
		||||
    };
 | 
			
		||||
    trace!("jerasure_matrix_decode ret: {}", ret);
 | 
			
		||||
    for x in data[erasures[0] as usize][0..8].iter() {
 | 
			
		||||
        trace!("{} ", x)
 | 
			
		||||
    }
 | 
			
		||||
    trace!("");
 | 
			
		||||
    if ret < 0 {
 | 
			
		||||
        return Err(Error::ErasureError(ErasureError::DecodeError));
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Generate coding blocks in window starting from start_idx,
 | 
			
		||||
//   for num_blobs..  For each block place the coding blobs
 | 
			
		||||
//   at the start of the block like so:
 | 
			
		||||
//
 | 
			
		||||
//  model of an erasure set, with top row being data blobs and second being coding
 | 
			
		||||
//  |<======================= NUM_DATA ==============================>|
 | 
			
		||||
//  |<==== NUM_CODING ===>|
 | 
			
		||||
//  +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
 | 
			
		||||
//  | D | | D | | D | | D | | D |         | D | | D | | D | | D | | D |
 | 
			
		||||
//  +---+ +---+ +---+ +---+ +---+  . . .  +---+ +---+ +---+ +---+ +---+
 | 
			
		||||
//  | C | | C | | C | | C | |   |         |   | |   | |   | |   | |   |
 | 
			
		||||
//  +---+ +---+ +---+ +---+ +---+         +---+ +---+ +---+ +---+ +---+
 | 
			
		||||
//
 | 
			
		||||
//  blob structure for coding, recover
 | 
			
		||||
//
 | 
			
		||||
//   + ------- meta is set and used by transport, meta.size is actual length
 | 
			
		||||
//   |           of data in the byte array blob.data
 | 
			
		||||
//   |
 | 
			
		||||
//   |          + -- data is stuff shipped over the wire, and has an included
 | 
			
		||||
//   |          |        header
 | 
			
		||||
//   V          V
 | 
			
		||||
//  +----------+------------------------------------------------------------+
 | 
			
		||||
//  | meta     |  data                                                      |
 | 
			
		||||
//  |+---+--   |+---+---+---+---+------------------------------------------+|
 | 
			
		||||
//  || s | .   || i |   | f | s |                                          ||
 | 
			
		||||
//  || i | .   || n | i | l | i |                                          ||
 | 
			
		||||
//  || z | .   || d | d | a | z |     blob.data(), or blob.data_mut()      ||
 | 
			
		||||
//  || e |     || e |   | g | e |                                          ||
 | 
			
		||||
//  |+---+--   || x |   | s |   |                                          ||
 | 
			
		||||
//  |          |+---+---+---+---+------------------------------------------+|
 | 
			
		||||
//  +----------+------------------------------------------------------------+
 | 
			
		||||
//             |                |<=== coding blob part for "coding" =======>|
 | 
			
		||||
//             |                                                            |
 | 
			
		||||
//             |<============== data blob part for "coding"  ==============>|
 | 
			
		||||
//
 | 
			
		||||
//
 | 
			
		||||
//
 | 
			
		||||
/// Generates coding blobs on demand given data blobs
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
pub struct CodingGenerator {
 | 
			
		||||
    leftover: Vec<SharedBlob>, // SharedBlobs that couldn't be used in last call to next()
 | 
			
		||||
    /// SharedBlobs that couldn't be used in last call to next()
 | 
			
		||||
    leftover: Vec<SharedBlob>,
 | 
			
		||||
    session: Arc<Session>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for CodingGenerator {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        CodingGenerator {
 | 
			
		||||
            leftover: Vec::with_capacity(NUM_DATA),
 | 
			
		||||
impl Session {
 | 
			
		||||
    pub fn new(data_count: usize, coding_count: usize) -> Result<Session> {
 | 
			
		||||
        let rs = ReedSolomon::new(data_count, coding_count)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Session(rs))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Create coding blocks by overwriting `parity`
 | 
			
		||||
    pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> {
 | 
			
		||||
        self.0.encode_sep(data, parity)?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Recover data + coding blocks into data blocks
 | 
			
		||||
    /// # Arguments
 | 
			
		||||
    /// * `data` - array of data blocks to recover into
 | 
			
		||||
    /// * `coding` - array of coding blocks
 | 
			
		||||
    /// * `erasures` - list of indices in data where blocks should be recovered
 | 
			
		||||
    pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> {
 | 
			
		||||
        self.0.reconstruct(blocks, present)?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Returns `(number_of_data_blobs, number_of_coding_blobs)`
 | 
			
		||||
    pub fn dimensions(&self) -> (usize, usize) {
 | 
			
		||||
        (self.0.data_shard_count(), self.0.parity_shard_count())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Reconstruct any missing blobs in this erasure set if possible
 | 
			
		||||
    /// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata
 | 
			
		||||
    /// Assumes that the user has sliced into the blobs appropriately already. else recovery will
 | 
			
		||||
    /// return an error or garbage data
 | 
			
		||||
    pub fn reconstruct_blobs<B>(
 | 
			
		||||
        &self,
 | 
			
		||||
        blobs: &mut [B],
 | 
			
		||||
        present: &[bool],
 | 
			
		||||
        size: usize,
 | 
			
		||||
        block_start_idx: u64,
 | 
			
		||||
        slot: u64,
 | 
			
		||||
    ) -> Result<(Vec<Blob>, Vec<Blob>)>
 | 
			
		||||
    where
 | 
			
		||||
        B: AsMut<[u8]>,
 | 
			
		||||
    {
 | 
			
		||||
        let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect();
 | 
			
		||||
 | 
			
		||||
        trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,);
 | 
			
		||||
 | 
			
		||||
        // Decode the blocks
 | 
			
		||||
        self.decode_blocks(blocks.as_mut_slice(), &present)?;
 | 
			
		||||
 | 
			
		||||
        let mut recovered_data = vec![];
 | 
			
		||||
        let mut recovered_coding = vec![];
 | 
			
		||||
 | 
			
		||||
        let erasures = present
 | 
			
		||||
            .iter()
 | 
			
		||||
            .enumerate()
 | 
			
		||||
            .filter_map(|(i, present)| if *present { None } else { Some(i) });
 | 
			
		||||
 | 
			
		||||
        // Create the missing blobs from the reconstructed data
 | 
			
		||||
        for n in erasures {
 | 
			
		||||
            let data_size;
 | 
			
		||||
            let idx;
 | 
			
		||||
            let first_byte;
 | 
			
		||||
 | 
			
		||||
            if n < NUM_DATA {
 | 
			
		||||
                let mut blob = Blob::new(&blocks[n]);
 | 
			
		||||
 | 
			
		||||
                data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
 | 
			
		||||
                idx = n as u64 + block_start_idx;
 | 
			
		||||
                first_byte = blob.data[0];
 | 
			
		||||
 | 
			
		||||
                blob.set_size(data_size);
 | 
			
		||||
                recovered_data.push(blob);
 | 
			
		||||
            } else {
 | 
			
		||||
                let mut blob = Blob::default();
 | 
			
		||||
                blob.data_mut()[..size].copy_from_slice(&blocks[n]);
 | 
			
		||||
                data_size = size;
 | 
			
		||||
                idx = (n as u64 + block_start_idx) - NUM_DATA as u64;
 | 
			
		||||
                first_byte = blob.data[0];
 | 
			
		||||
 | 
			
		||||
                blob.set_slot(slot);
 | 
			
		||||
                blob.set_index(idx);
 | 
			
		||||
                blob.set_size(data_size);
 | 
			
		||||
                recovered_coding.push(blob);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            trace!(
 | 
			
		||||
                "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
 | 
			
		||||
                n,
 | 
			
		||||
                idx,
 | 
			
		||||
                data_size,
 | 
			
		||||
                first_byte
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok((recovered_data, recovered_coding))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl CodingGenerator {
 | 
			
		||||
    pub fn new() -> Self {
 | 
			
		||||
        Self::default()
 | 
			
		||||
    pub fn new(session: Arc<Session>) -> Self {
 | 
			
		||||
        CodingGenerator {
 | 
			
		||||
            leftover: Vec::with_capacity(session.0.data_shard_count()),
 | 
			
		||||
            session,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // must be called with consecutive data blobs from previous invocation
 | 
			
		||||
    // blobs from a new slot not start halfway through next_data
 | 
			
		||||
    /// Yields next set of coding blobs, if any.
 | 
			
		||||
    /// Must be called with consecutive data blobs within a slot.
 | 
			
		||||
    ///
 | 
			
		||||
    /// Passing in a slice with the first blob having a new slot will cause internal state to
 | 
			
		||||
    /// reset, so the above concern does not apply to slot boundaries, only indexes within a slot
 | 
			
		||||
    /// must be consecutive.
 | 
			
		||||
    ///
 | 
			
		||||
    /// If used improperly, it my return garbage coding blobs, but will not give an
 | 
			
		||||
    /// error.
 | 
			
		||||
    pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec<SharedBlob> {
 | 
			
		||||
        let (num_data, num_coding) = self.session.dimensions();
 | 
			
		||||
        let mut next_coding =
 | 
			
		||||
            Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING);
 | 
			
		||||
            Vec::with_capacity((self.leftover.len() + next_data.len()) / num_data * num_coding);
 | 
			
		||||
 | 
			
		||||
        if self.leftover.len() > 0 && next_data.len() > 0 {
 | 
			
		||||
            if self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot() {
 | 
			
		||||
                self.leftover.clear(); // reset on slot boundaries
 | 
			
		||||
            }
 | 
			
		||||
        if !self.leftover.is_empty()
 | 
			
		||||
            && !next_data.is_empty()
 | 
			
		||||
            && self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot()
 | 
			
		||||
        {
 | 
			
		||||
            self.leftover.clear();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();
 | 
			
		||||
 | 
			
		||||
        for data_blobs in next_data.chunks(NUM_DATA) {
 | 
			
		||||
            if data_blobs.len() < NUM_DATA {
 | 
			
		||||
        for data_blobs in next_data.chunks(num_data) {
 | 
			
		||||
            if data_blobs.len() < num_data {
 | 
			
		||||
                self.leftover = data_blobs.to_vec();
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
            self.leftover.clear();
 | 
			
		||||
 | 
			
		||||
            // find max_data_size for the chunk, round length up to a multiple of wb()
 | 
			
		||||
            let max_data_size = align!(
 | 
			
		||||
                data_blobs
 | 
			
		||||
            // find max_data_size for the erasure set
 | 
			
		||||
            let max_data_size = data_blobs
 | 
			
		||||
                .iter()
 | 
			
		||||
                    .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)),
 | 
			
		||||
                wb()
 | 
			
		||||
            );
 | 
			
		||||
                .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max));
 | 
			
		||||
 | 
			
		||||
            let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
 | 
			
		||||
            let data_ptrs: Vec<_> = data_locks
 | 
			
		||||
@@ -280,9 +219,9 @@ impl CodingGenerator {
 | 
			
		||||
                .map(|l| &l.data[..max_data_size])
 | 
			
		||||
                .collect();
 | 
			
		||||
 | 
			
		||||
            let mut coding_blobs = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
            let mut coding_blobs = Vec::with_capacity(num_coding);
 | 
			
		||||
 | 
			
		||||
            for data_blob in &data_locks[..NUM_CODING] {
 | 
			
		||||
            for data_blob in &data_locks[..num_coding] {
 | 
			
		||||
                let index = data_blob.index();
 | 
			
		||||
                let slot = data_blob.slot();
 | 
			
		||||
                let id = data_blob.id();
 | 
			
		||||
@@ -305,7 +244,7 @@ impl CodingGenerator {
 | 
			
		||||
                    .map(|blob| &mut blob.data_mut()[..max_data_size])
 | 
			
		||||
                    .collect();
 | 
			
		||||
 | 
			
		||||
                generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)
 | 
			
		||||
                self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice())
 | 
			
		||||
            }
 | 
			
		||||
            .is_ok()
 | 
			
		||||
            {
 | 
			
		||||
@@ -320,12 +259,27 @@ impl CodingGenerator {
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for Session {
 | 
			
		||||
    fn default() -> Session {
 | 
			
		||||
        Session::new(NUM_DATA, NUM_CODING).unwrap()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Default for CodingGenerator {
 | 
			
		||||
    fn default() -> Self {
 | 
			
		||||
        let session = Session::default();
 | 
			
		||||
        CodingGenerator {
 | 
			
		||||
            leftover: Vec::with_capacity(session.0.data_shard_count()),
 | 
			
		||||
            session: Arc::new(session),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
pub mod test {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use crate::blocktree::get_tmp_ledger_path;
 | 
			
		||||
    use crate::blocktree::Blocktree;
 | 
			
		||||
    use crate::entry::{make_tiny_test_entries, EntrySlice};
 | 
			
		||||
    use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
 | 
			
		||||
    use solana_sdk::pubkey::Pubkey;
 | 
			
		||||
    use solana_sdk::signature::{Keypair, KeypairUtil};
 | 
			
		||||
@@ -368,63 +322,63 @@ pub mod test {
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_coding() {
 | 
			
		||||
        let zero_vec = vec![0; 16];
 | 
			
		||||
        let mut vs: Vec<Vec<u8>> = (0..4).map(|i| (i..(16 + i)).collect()).collect();
 | 
			
		||||
        const N_DATA: usize = 4;
 | 
			
		||||
        const N_CODING: usize = 2;
 | 
			
		||||
 | 
			
		||||
        let session = Session::new(N_DATA, N_CODING).unwrap();
 | 
			
		||||
 | 
			
		||||
        let mut vs: Vec<Vec<u8>> = (0..N_DATA as u8).map(|i| (i..(16 + i)).collect()).collect();
 | 
			
		||||
        let v_orig: Vec<u8> = vs[0].clone();
 | 
			
		||||
 | 
			
		||||
        let m = 2;
 | 
			
		||||
        let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect();
 | 
			
		||||
        let mut coding_blocks: Vec<_> = (0..N_CODING).map(|_| vec![0u8; 16]).collect();
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
        let mut coding_blocks_slices: Vec<_> =
 | 
			
		||||
                coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
 | 
			
		||||
            let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect();
 | 
			
		||||
            coding_blocks.iter_mut().map(Vec::as_mut_slice).collect();
 | 
			
		||||
        let v_slices: Vec<_> = vs.iter().map(Vec::as_slice).collect();
 | 
			
		||||
 | 
			
		||||
        session
 | 
			
		||||
            .encode(v_slices.as_slice(), coding_blocks_slices.as_mut_slice())
 | 
			
		||||
            .expect("encoding must succeed");
 | 
			
		||||
 | 
			
		||||
            assert!(generate_coding_blocks(
 | 
			
		||||
                coding_blocks_slices.as_mut_slice(),
 | 
			
		||||
                v_slices.as_slice(),
 | 
			
		||||
            )
 | 
			
		||||
            .is_ok());
 | 
			
		||||
        }
 | 
			
		||||
        trace!("test_coding: coding blocks:");
 | 
			
		||||
        for b in &coding_blocks {
 | 
			
		||||
            trace!("test_coding: {:?}", b);
 | 
			
		||||
        }
 | 
			
		||||
        let erasure: i32 = 1;
 | 
			
		||||
        let erasures = vec![erasure, -1];
 | 
			
		||||
 | 
			
		||||
        let erasure: usize = 1;
 | 
			
		||||
        let present = &mut [true; N_DATA + N_CODING];
 | 
			
		||||
        present[erasure] = false;
 | 
			
		||||
        let erased = vs[erasure].clone();
 | 
			
		||||
 | 
			
		||||
        // clear an entry
 | 
			
		||||
        vs[erasure as usize].copy_from_slice(zero_vec.as_slice());
 | 
			
		||||
        vs[erasure as usize].copy_from_slice(&[0; 16]);
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            let mut coding_blocks_slices: Vec<_> =
 | 
			
		||||
                coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
 | 
			
		||||
            let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect();
 | 
			
		||||
        let mut blocks: Vec<_> = vs
 | 
			
		||||
            .iter_mut()
 | 
			
		||||
            .chain(coding_blocks.iter_mut())
 | 
			
		||||
            .map(Vec::as_mut_slice)
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
            assert!(decode_blocks(
 | 
			
		||||
                v_slices.as_mut_slice(),
 | 
			
		||||
                coding_blocks_slices.as_mut_slice(),
 | 
			
		||||
                erasures.as_slice(),
 | 
			
		||||
            )
 | 
			
		||||
            .is_ok());
 | 
			
		||||
        }
 | 
			
		||||
        session
 | 
			
		||||
            .decode_blocks(blocks.as_mut_slice(), present)
 | 
			
		||||
            .expect("decoding must succeed");
 | 
			
		||||
 | 
			
		||||
        trace!("test_coding: vs:");
 | 
			
		||||
        for v in &vs {
 | 
			
		||||
            trace!("test_coding: {:?}", v);
 | 
			
		||||
        }
 | 
			
		||||
        assert_eq!(v_orig, vs[0]);
 | 
			
		||||
        assert_eq!(erased, vs[erasure]);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn test_toss_and_recover(
 | 
			
		||||
        session: &Session,
 | 
			
		||||
        data_blobs: &[SharedBlob],
 | 
			
		||||
        coding_blobs: &[SharedBlob],
 | 
			
		||||
        block_start_idx: usize,
 | 
			
		||||
    ) {
 | 
			
		||||
        let size = coding_blobs[0].read().unwrap().size();
 | 
			
		||||
 | 
			
		||||
        // toss one data and one coding
 | 
			
		||||
        let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];
 | 
			
		||||
 | 
			
		||||
        let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);
 | 
			
		||||
 | 
			
		||||
        blobs.push(SharedBlob::default()); // empty data, erasure at zero
 | 
			
		||||
@@ -432,14 +386,23 @@ pub mod test {
 | 
			
		||||
            // skip first blob
 | 
			
		||||
            blobs.push(blob.clone());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        blobs.push(SharedBlob::default()); // empty coding, erasure at zero
 | 
			
		||||
        for blob in &coding_blobs[1..NUM_CODING] {
 | 
			
		||||
            blobs.push(blob.clone());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
 | 
			
		||||
        // toss one data and one coding
 | 
			
		||||
        let mut present = vec![true; blobs.len()];
 | 
			
		||||
        present[0] = false;
 | 
			
		||||
        present[NUM_DATA] = false;
 | 
			
		||||
 | 
			
		||||
        assert!(!corrupt);
 | 
			
		||||
        let (recovered_data, recovered_coding) = session
 | 
			
		||||
            .reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0)
 | 
			
		||||
            .expect("reconstruction must succeed");
 | 
			
		||||
 | 
			
		||||
        assert_eq!(recovered_data.len(), 1);
 | 
			
		||||
        assert_eq!(recovered_coding.len(), 1);
 | 
			
		||||
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            blobs[1].read().unwrap().meta,
 | 
			
		||||
@@ -450,15 +413,15 @@ pub mod test {
 | 
			
		||||
            data_blobs[block_start_idx + 1].read().unwrap().data()
 | 
			
		||||
        );
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            blobs[0].read().unwrap().meta,
 | 
			
		||||
            recovered_data[0].meta,
 | 
			
		||||
            data_blobs[block_start_idx].read().unwrap().meta
 | 
			
		||||
        );
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            blobs[0].read().unwrap().data(),
 | 
			
		||||
            recovered_data[0].data(),
 | 
			
		||||
            data_blobs[block_start_idx].read().unwrap().data()
 | 
			
		||||
        );
 | 
			
		||||
        assert_eq!(
 | 
			
		||||
            blobs[NUM_DATA].read().unwrap().data(),
 | 
			
		||||
            recovered_coding[0].data(),
 | 
			
		||||
            coding_blobs[0].read().unwrap().data()
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
@@ -468,11 +431,11 @@ pub mod test {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
 | 
			
		||||
        // trivial case
 | 
			
		||||
        let mut coding_generator = CodingGenerator::new();
 | 
			
		||||
        let mut coding_generator = CodingGenerator::default();
 | 
			
		||||
        let blobs = Vec::new();
 | 
			
		||||
        for _ in 0..NUM_DATA * 2 {
 | 
			
		||||
            let coding = coding_generator.next(&blobs);
 | 
			
		||||
            assert_eq!(coding.len(), 0);
 | 
			
		||||
            assert!(coding.is_empty());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // test coding by iterating one blob at a time
 | 
			
		||||
@@ -480,6 +443,7 @@ pub mod test {
 | 
			
		||||
 | 
			
		||||
        for (i, blob) in data_blobs.iter().cloned().enumerate() {
 | 
			
		||||
            let coding_blobs = coding_generator.next(&[blob]);
 | 
			
		||||
 | 
			
		||||
            if !coding_blobs.is_empty() {
 | 
			
		||||
                assert_eq!(i % NUM_DATA, NUM_DATA - 1);
 | 
			
		||||
                assert_eq!(coding_blobs.len(), NUM_CODING);
 | 
			
		||||
@@ -490,7 +454,12 @@ pub mod test {
 | 
			
		||||
                        ((i / NUM_DATA) * NUM_DATA + j) as u64
 | 
			
		||||
                    );
 | 
			
		||||
                }
 | 
			
		||||
                test_toss_and_recover(&data_blobs, &coding_blobs, i - (i % NUM_DATA));
 | 
			
		||||
                test_toss_and_recover(
 | 
			
		||||
                    &coding_generator.session,
 | 
			
		||||
                    &data_blobs,
 | 
			
		||||
                    &coding_blobs,
 | 
			
		||||
                    i - (i % NUM_DATA),
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@@ -499,7 +468,7 @@ pub mod test {
 | 
			
		||||
    fn test_erasure_generate_coding_reset_on_new_slot() {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
 | 
			
		||||
        let mut coding_generator = CodingGenerator::new();
 | 
			
		||||
        let mut coding_generator = CodingGenerator::default();
 | 
			
		||||
 | 
			
		||||
        // test coding by iterating one blob at a time
 | 
			
		||||
        let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
 | 
			
		||||
@@ -509,13 +478,18 @@ pub mod test {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]);
 | 
			
		||||
        assert_eq!(coding_blobs.len(), 0);
 | 
			
		||||
        assert!(coding_blobs.is_empty());
 | 
			
		||||
 | 
			
		||||
        let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]);
 | 
			
		||||
 | 
			
		||||
        assert_eq!(coding_blobs.len(), NUM_CODING);
 | 
			
		||||
 | 
			
		||||
        test_toss_and_recover(&data_blobs, &coding_blobs, NUM_DATA);
 | 
			
		||||
        test_toss_and_recover(
 | 
			
		||||
            &coding_generator.session,
 | 
			
		||||
            &data_blobs,
 | 
			
		||||
            &coding_blobs,
 | 
			
		||||
            NUM_DATA,
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[test]
 | 
			
		||||
@@ -571,24 +545,17 @@ pub mod test {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// This test is ignored because if successful, it never stops running. It is useful for
 | 
			
		||||
    /// dicovering an initialization race-condition in the erasure FFI bindings. If this bug
 | 
			
		||||
    /// re-emerges, running with `Z_THREADS = N` where `N > 1` should crash fairly rapidly.
 | 
			
		||||
    #[ignore]
 | 
			
		||||
    #[test]
 | 
			
		||||
    fn test_recovery_with_model() {
 | 
			
		||||
        use std::env;
 | 
			
		||||
        use std::sync::{Arc, Mutex};
 | 
			
		||||
        use std::thread;
 | 
			
		||||
 | 
			
		||||
        const MAX_ERASURE_SETS: u64 = 16;
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
        let n_threads: usize = env::var("Z_THREADS")
 | 
			
		||||
            .unwrap_or("1".to_string())
 | 
			
		||||
            .parse()
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        const N_THREADS: usize = 2;
 | 
			
		||||
        const N_SLOTS: u64 = 10;
 | 
			
		||||
 | 
			
		||||
        let specs = (0..).map(|slot| {
 | 
			
		||||
        solana_logger::setup();
 | 
			
		||||
 | 
			
		||||
        let specs = (0..N_SLOTS).map(|slot| {
 | 
			
		||||
            let num_erasure_sets = slot % MAX_ERASURE_SETS;
 | 
			
		||||
 | 
			
		||||
            let set_specs = (0..num_erasure_sets)
 | 
			
		||||
@@ -602,12 +569,12 @@ pub mod test {
 | 
			
		||||
            SlotSpec { slot, set_specs }
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        let decode_mutex = Arc::new(Mutex::new(()));
 | 
			
		||||
        let mut handles = vec![];
 | 
			
		||||
        let session = Arc::new(Session::default());
 | 
			
		||||
 | 
			
		||||
        for i in 0..n_threads {
 | 
			
		||||
        for i in 0..N_THREADS {
 | 
			
		||||
            let specs = specs.clone();
 | 
			
		||||
            let decode_mutex = Arc::clone(&decode_mutex);
 | 
			
		||||
            let session = Arc::clone(&session);
 | 
			
		||||
 | 
			
		||||
            let handle = thread::Builder::new()
 | 
			
		||||
                .name(i.to_string())
 | 
			
		||||
@@ -617,55 +584,39 @@ pub mod test {
 | 
			
		||||
                            let erased_coding = erasure_set.coding[0].clone();
 | 
			
		||||
                            let erased_data = erasure_set.data[..3].to_vec();
 | 
			
		||||
 | 
			
		||||
                            let mut data = Vec::with_capacity(NUM_DATA);
 | 
			
		||||
                            let mut coding = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
                            let erasures = vec![0, 1, 2, NUM_DATA as i32, -1];
 | 
			
		||||
                            let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
 | 
			
		||||
 | 
			
		||||
                            data.push(SharedBlob::default());
 | 
			
		||||
                            data.push(SharedBlob::default());
 | 
			
		||||
                            data.push(SharedBlob::default());
 | 
			
		||||
                            blobs.push(SharedBlob::default());
 | 
			
		||||
                            blobs.push(SharedBlob::default());
 | 
			
		||||
                            blobs.push(SharedBlob::default());
 | 
			
		||||
                            for blob in erasure_set.data.into_iter().skip(3) {
 | 
			
		||||
                                data.push(blob);
 | 
			
		||||
                                blobs.push(blob);
 | 
			
		||||
                            }
 | 
			
		||||
 | 
			
		||||
                            coding.push(SharedBlob::default());
 | 
			
		||||
                            blobs.push(SharedBlob::default());
 | 
			
		||||
                            for blob in erasure_set.coding.into_iter().skip(1) {
 | 
			
		||||
                                coding.push(blob);
 | 
			
		||||
                                blobs.push(blob);
 | 
			
		||||
                            }
 | 
			
		||||
 | 
			
		||||
                            let size = erased_coding.read().unwrap().data_size() as usize;
 | 
			
		||||
                            let size = erased_coding.read().unwrap().size() as usize;
 | 
			
		||||
 | 
			
		||||
                            let mut data_locks: Vec<_> =
 | 
			
		||||
                                data.iter().map(|shared| shared.write().unwrap()).collect();
 | 
			
		||||
                            let mut coding_locks: Vec<_> = coding
 | 
			
		||||
                                .iter()
 | 
			
		||||
                                .map(|shared| shared.write().unwrap())
 | 
			
		||||
                                .collect();
 | 
			
		||||
                            let mut present = vec![true; ERASURE_SET_SIZE];
 | 
			
		||||
                            present[0] = false;
 | 
			
		||||
                            present[1] = false;
 | 
			
		||||
                            present[2] = false;
 | 
			
		||||
                            present[NUM_DATA] = false;
 | 
			
		||||
 | 
			
		||||
                            let mut data_ptrs: Vec<_> = data_locks
 | 
			
		||||
                                .iter_mut()
 | 
			
		||||
                                .map(|blob| &mut blob.data[..size])
 | 
			
		||||
                                .collect();
 | 
			
		||||
                            let mut coding_ptrs: Vec<_> = coding_locks
 | 
			
		||||
                                .iter_mut()
 | 
			
		||||
                                .map(|blob| &mut blob.data_mut()[..size])
 | 
			
		||||
                                .collect();
 | 
			
		||||
 | 
			
		||||
                            {
 | 
			
		||||
                                let _lock = decode_mutex.lock();
 | 
			
		||||
 | 
			
		||||
                                decode_blocks(
 | 
			
		||||
                                    data_ptrs.as_mut_slice(),
 | 
			
		||||
                                    coding_ptrs.as_mut_slice(),
 | 
			
		||||
                                    &erasures,
 | 
			
		||||
                            session
 | 
			
		||||
                                .reconstruct_shared_blobs(
 | 
			
		||||
                                    &mut blobs,
 | 
			
		||||
                                    &present,
 | 
			
		||||
                                    size,
 | 
			
		||||
                                    erasure_set.set_index * NUM_DATA as u64,
 | 
			
		||||
                                    slot_model.slot,
 | 
			
		||||
                                )
 | 
			
		||||
                                .expect("decoding must succeed");
 | 
			
		||||
                            }
 | 
			
		||||
                                .expect("reconstruction must succeed");
 | 
			
		||||
 | 
			
		||||
                            drop(coding_locks);
 | 
			
		||||
                            drop(data_locks);
 | 
			
		||||
 | 
			
		||||
                            for (expected, recovered) in erased_data.iter().zip(data.iter()) {
 | 
			
		||||
                            for (expected, recovered) in erased_data.iter().zip(blobs.iter()) {
 | 
			
		||||
                                let expected = expected.read().unwrap();
 | 
			
		||||
                                let mut recovered = recovered.write().unwrap();
 | 
			
		||||
                                let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;
 | 
			
		||||
@@ -677,7 +628,7 @@ pub mod test {
 | 
			
		||||
 | 
			
		||||
                            assert_eq!(
 | 
			
		||||
                                erased_coding.read().unwrap().data(),
 | 
			
		||||
                                coding[0].read().unwrap().data()
 | 
			
		||||
                                blobs[NUM_DATA].read().unwrap().data()
 | 
			
		||||
                            );
 | 
			
		||||
 | 
			
		||||
                            debug!("passed set: {}", erasure_set.set_index);
 | 
			
		||||
@@ -702,7 +653,9 @@ pub mod test {
 | 
			
		||||
        IntoIt: Iterator<Item = S> + Clone + 'a,
 | 
			
		||||
        S: Borrow<SlotSpec>,
 | 
			
		||||
    {
 | 
			
		||||
        specs.into_iter().map(|spec| {
 | 
			
		||||
        let mut coding_generator = CodingGenerator::default();
 | 
			
		||||
 | 
			
		||||
        specs.into_iter().map(move |spec| {
 | 
			
		||||
            let spec = spec.borrow();
 | 
			
		||||
            let slot = spec.slot;
 | 
			
		||||
 | 
			
		||||
@@ -713,7 +666,7 @@ pub mod test {
 | 
			
		||||
                    let set_index = erasure_spec.set_index as usize;
 | 
			
		||||
                    let start_index = set_index * NUM_DATA;
 | 
			
		||||
 | 
			
		||||
                    let mut blobs = make_tiny_test_entries(NUM_DATA).to_single_entry_shared_blobs();
 | 
			
		||||
                    let mut blobs = generate_test_blobs(0, NUM_DATA);
 | 
			
		||||
                    index_blobs(
 | 
			
		||||
                        &blobs,
 | 
			
		||||
                        &Keypair::new().pubkey(),
 | 
			
		||||
@@ -722,7 +675,6 @@ pub mod test {
 | 
			
		||||
                        0,
 | 
			
		||||
                    );
 | 
			
		||||
 | 
			
		||||
                    let mut coding_generator = CodingGenerator::new();
 | 
			
		||||
                    let mut coding_blobs = coding_generator.next(&blobs);
 | 
			
		||||
 | 
			
		||||
                    blobs.drain(erasure_spec.num_data..);
 | 
			
		||||
@@ -770,84 +722,60 @@ pub mod test {
 | 
			
		||||
        blocktree
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    //    fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool {
 | 
			
		||||
    //        let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
 | 
			
		||||
    //
 | 
			
		||||
    //        blobs.iter().enumerate().all(|(i, blob)| {
 | 
			
		||||
    //            let blob = blob.read().unwrap();
 | 
			
		||||
    //            blob.index() as usize == i + offset && blob.data() == &data[..]
 | 
			
		||||
    //        })
 | 
			
		||||
    //    }
 | 
			
		||||
    //
 | 
			
		||||
    fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
 | 
			
		||||
        let blobs = make_tiny_test_entries(num_blobs).to_single_entry_shared_blobs();
 | 
			
		||||
        let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
 | 
			
		||||
 | 
			
		||||
        let blobs: Vec<_> = (0..num_blobs)
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .map(|_| {
 | 
			
		||||
                let mut blob = Blob::default();
 | 
			
		||||
                blob.data_mut()[..data.len()].copy_from_slice(&data);
 | 
			
		||||
                blob.set_size(data.len());
 | 
			
		||||
                Arc::new(RwLock::new(blob))
 | 
			
		||||
            })
 | 
			
		||||
            .collect();
 | 
			
		||||
 | 
			
		||||
        index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0);
 | 
			
		||||
 | 
			
		||||
        blobs
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn decode_blobs(
 | 
			
		||||
        blobs: &[SharedBlob],
 | 
			
		||||
        erasures: &[i32],
 | 
			
		||||
    impl Session {
 | 
			
		||||
        fn reconstruct_shared_blobs(
 | 
			
		||||
            &self,
 | 
			
		||||
            blobs: &mut [SharedBlob],
 | 
			
		||||
            present: &[bool],
 | 
			
		||||
            size: usize,
 | 
			
		||||
            block_start_idx: u64,
 | 
			
		||||
            slot: u64,
 | 
			
		||||
    ) -> Result<bool> {
 | 
			
		||||
        let mut locks = Vec::with_capacity(ERASURE_SET_SIZE);
 | 
			
		||||
        let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
 | 
			
		||||
        let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
 | 
			
		||||
        ) -> Result<(Vec<Blob>, Vec<Blob>)> {
 | 
			
		||||
            let mut locks: Vec<std::sync::RwLockWriteGuard<_>> = blobs
 | 
			
		||||
                .iter()
 | 
			
		||||
                .map(|shared_blob| shared_blob.write().unwrap())
 | 
			
		||||
                .collect();
 | 
			
		||||
 | 
			
		||||
        assert_eq!(blobs.len(), ERASURE_SET_SIZE);
 | 
			
		||||
        for b in blobs {
 | 
			
		||||
            locks.push(b.write().unwrap());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        for (i, l) in locks.iter_mut().enumerate() {
 | 
			
		||||
            let mut slices: Vec<_> = locks
 | 
			
		||||
                .iter_mut()
 | 
			
		||||
                .enumerate()
 | 
			
		||||
                .map(|(i, blob)| {
 | 
			
		||||
                    if i < NUM_DATA {
 | 
			
		||||
                data_ptrs.push(&mut l.data[..size]);
 | 
			
		||||
                        &mut blob.data[..size]
 | 
			
		||||
                    } else {
 | 
			
		||||
                coding_ptrs.push(&mut l.data_mut()[..size]);
 | 
			
		||||
                        &mut blob.data_mut()[..size]
 | 
			
		||||
                    }
 | 
			
		||||
                })
 | 
			
		||||
                .collect();
 | 
			
		||||
 | 
			
		||||
            self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        // Decode the blocks
 | 
			
		||||
        decode_blocks(
 | 
			
		||||
            data_ptrs.as_mut_slice(),
 | 
			
		||||
            coding_ptrs.as_mut_slice(),
 | 
			
		||||
            &erasures,
 | 
			
		||||
        )?;
 | 
			
		||||
 | 
			
		||||
        // Create the missing blobs from the reconstructed data
 | 
			
		||||
        let mut corrupt = false;
 | 
			
		||||
 | 
			
		||||
        for i in &erasures[..erasures.len() - 1] {
 | 
			
		||||
            let n = *i as usize;
 | 
			
		||||
            let mut idx = n as u64 + block_start_idx;
 | 
			
		||||
 | 
			
		||||
            let mut data_size;
 | 
			
		||||
            if n < NUM_DATA {
 | 
			
		||||
                data_size = locks[n].data_size() as usize;
 | 
			
		||||
                data_size -= BLOB_HEADER_SIZE;
 | 
			
		||||
                if data_size > BLOB_DATA_SIZE {
 | 
			
		||||
                    error!("corrupt data blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                    corrupt = true;
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                data_size = size;
 | 
			
		||||
                idx -= NUM_DATA as u64;
 | 
			
		||||
                locks[n].set_slot(slot);
 | 
			
		||||
                locks[n].set_index(idx);
 | 
			
		||||
 | 
			
		||||
                if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
 | 
			
		||||
                    error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
 | 
			
		||||
                    corrupt = true;
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            locks[n].set_size(data_size);
 | 
			
		||||
            trace!(
 | 
			
		||||
                "erasures[{}] ({}) size: {} data[0]: {}",
 | 
			
		||||
                *i,
 | 
			
		||||
                idx,
 | 
			
		||||
                data_size,
 | 
			
		||||
                locks[n].data()[0]
 | 
			
		||||
            );
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(corrupt)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -31,7 +31,6 @@ pub mod cluster;
 | 
			
		||||
pub mod cluster_info;
 | 
			
		||||
pub mod cluster_tests;
 | 
			
		||||
pub mod entry;
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
pub mod erasure;
 | 
			
		||||
pub mod fetch_stage;
 | 
			
		||||
pub mod fullnode;
 | 
			
		||||
 
 | 
			
		||||
@@ -372,7 +372,11 @@ pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
 | 
			
		||||
impl Blob {
 | 
			
		||||
    pub fn new(data: &[u8]) -> Self {
 | 
			
		||||
        let mut blob = Self::default();
 | 
			
		||||
 | 
			
		||||
        assert!(data.len() <= blob.data.len());
 | 
			
		||||
 | 
			
		||||
        let data_len = cmp::min(data.len(), blob.data.len());
 | 
			
		||||
 | 
			
		||||
        let bytes = &data[..data_len];
 | 
			
		||||
        blob.data[..data_len].copy_from_slice(bytes);
 | 
			
		||||
        blob.meta.size = blob.data_size() as usize;
 | 
			
		||||
@@ -459,8 +463,8 @@ impl Blob {
 | 
			
		||||
        LittleEndian::read_u64(&self.data[SIZE_RANGE])
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn set_data_size(&mut self, ix: u64) {
 | 
			
		||||
        LittleEndian::write_u64(&mut self.data[SIZE_RANGE], ix);
 | 
			
		||||
    pub fn set_data_size(&mut self, size: u64) {
 | 
			
		||||
        LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn data(&self) -> &[u8] {
 | 
			
		||||
 
 | 
			
		||||
@@ -2,8 +2,6 @@
 | 
			
		||||
 | 
			
		||||
use crate::blocktree;
 | 
			
		||||
use crate::cluster_info;
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
use crate::erasure;
 | 
			
		||||
use crate::packet;
 | 
			
		||||
use crate::poh_recorder;
 | 
			
		||||
use bincode;
 | 
			
		||||
@@ -25,8 +23,7 @@ pub enum Error {
 | 
			
		||||
    TransactionError(transaction::TransactionError),
 | 
			
		||||
    ClusterInfoError(cluster_info::ClusterInfoError),
 | 
			
		||||
    BlobError(packet::BlobError),
 | 
			
		||||
    #[cfg(feature = "erasure")]
 | 
			
		||||
    ErasureError(erasure::ErasureError),
 | 
			
		||||
    ErasureError(reed_solomon_erasure::Error),
 | 
			
		||||
    SendError,
 | 
			
		||||
    PohRecorderError(poh_recorder::PohRecorderError),
 | 
			
		||||
    BlocktreeError(blocktree::BlocktreeError),
 | 
			
		||||
@@ -67,9 +64,8 @@ impl std::convert::From<cluster_info::ClusterInfoError> for Error {
 | 
			
		||||
        Error::ClusterInfoError(e)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
#[cfg(feature = "erasure")]
 | 
			
		||||
impl std::convert::From<erasure::ErasureError> for Error {
 | 
			
		||||
    fn from(e: erasure::ErasureError) -> Error {
 | 
			
		||||
impl std::convert::From<reed_solomon_erasure::Error> for Error {
 | 
			
		||||
    fn from(e: reed_solomon_erasure::Error) -> Error {
 | 
			
		||||
        Error::ErasureError(e)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user