diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 3f939f6464..d7bb9f7ada 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -7,12 +7,11 @@ use crate::db_ledger::DbLedger; use crate::entry::Entry; use crate::entry::EntrySlice; #[cfg(feature = "erasure")] -use crate::erasure; +use crate::erasure::CodingGenerator; use crate::leader_scheduler::LeaderScheduler; -use crate::packet::{index_blobs, SharedBlob}; +use crate::packet::index_blobs; use crate::result::{Error, Result}; use crate::service::Service; -use crate::window::{SharedWindow, WindowIndex, WindowUtil}; use log::Level; use rayon::prelude::*; use solana_metrics::{influxdb, submit}; @@ -32,160 +31,110 @@ pub enum BroadcastServiceReturnType { ExitSignal, } -#[allow(clippy::too_many_arguments)] -fn broadcast( - db_ledger: &Arc, +struct Broadcast { + id: Pubkey, max_tick_height: Option, - leader_id: Pubkey, - node_info: &NodeInfo, - broadcast_table: &[NodeInfo], - window: &SharedWindow, - receiver: &Receiver>, - sock: &UdpSocket, - transmit_index: &mut WindowIndex, - receive_index: &mut u64, - leader_scheduler: &Arc>, -) -> Result<()> { - let id = node_info.id; - let timer = Duration::new(1, 0); - let entries = receiver.recv_timeout(timer)?; - let now = Instant::now(); - let mut num_entries = entries.len(); - let mut ventries = Vec::new(); - ventries.push(entries); + blob_index: u64, - let mut contains_last_tick = false; - while let Ok(entries) = receiver.try_recv() { - num_entries += entries.len(); + #[cfg(feature = "erasure")] + coding_generator: CodingGenerator, +} + +impl Broadcast { + fn run( + &mut self, + broadcast_table: &[NodeInfo], + receiver: &Receiver>, + sock: &UdpSocket, + leader_scheduler: &Arc>, + db_ledger: &Arc, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let entries = receiver.recv_timeout(timer)?; + let now = Instant::now(); + let mut num_entries = entries.len(); + let mut ventries = Vec::new(); ventries.push(entries); - } - if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) { - contains_last_tick |= Some(last.tick_height) == max_tick_height; - } - - inc_new_counter_info!("broadcast_service-entries_received", num_entries); - - let to_blobs_start = Instant::now(); - - // Generate the slot heights for all the entries inside ventries - let slot_heights = generate_slots(&ventries, leader_scheduler); - - let blobs: Vec<_> = ventries - .into_par_iter() - .flat_map(|p| p.to_shared_blobs()) - .collect(); - - let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect(); - - let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); - - let blobs_chunking = Instant::now(); - // We could receive more blobs than window slots so - // break them up into window-sized chunks to process - let window_size = window.read().unwrap().window_size(); - let blobs_chunked = blobs_slot_heights - .chunks(window_size as usize) - .map(|x| x.to_vec()); - let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed()); - - let broadcast_start = Instant::now(); - for blobs in blobs_chunked { - let blobs_len = blobs.len(); - trace!("{}: broadcast blobs.len: {}", id, blobs_len); - - index_blobs(blobs.iter(), &node_info.id, *receive_index); - - // keep the cache of blobs that are broadcast - inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - { - let mut win = window.write().unwrap(); - assert!(blobs.len() <= win.len()); - let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect(); - for b in &blobs { - let ix = 
b.read().unwrap().index().expect("blob index"); - let pos = (ix % window_size) as usize; - if let Some(x) = win[pos].data.take() { - trace!( - "{} popped {} at {}", - id, - x.read().unwrap().index().unwrap(), - pos - ); - } - if let Some(x) = win[pos].coding.take() { - trace!( - "{} popped {} at {}", - id, - x.read().unwrap().index().unwrap(), - pos - ); - } - - trace!("{} null {}", id, pos); - } - for b in &blobs { - { - let ix = b.read().unwrap().index().expect("blob index"); - let pos = (ix % window_size) as usize; - trace!("{} caching {} at {}", id, ix, pos); - assert!(win[pos].data.is_none()); - win[pos].data = Some(b.clone()); - } - } - - db_ledger - .write_consecutive_blobs(&blobs) - .expect("Unrecoverable failure to write to database"); + while let Ok(entries) = receiver.try_recv() { + num_entries += entries.len(); + ventries.push(entries); } + let last_tick = match self.max_tick_height { + Some(max_tick_height) => { + if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) { + last.tick_height == max_tick_height + } else { + false + } + } + None => false, + }; + + inc_new_counter_info!("broadcast_service-entries_received", num_entries); + + let to_blobs_start = Instant::now(); + + // Generate the slot heights for all the entries inside ventries + // this may span slots if this leader broadcasts for consecutive slots... + let slots = generate_slots(&ventries, leader_scheduler); + + let blobs: Vec<_> = ventries + .into_par_iter() + .flat_map(|p| p.to_shared_blobs()) + .collect(); + + // TODO: blob_index should be slot-relative... + index_blobs(&blobs, &self.id, self.blob_index, &slots); + + let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); + + let broadcast_start = Instant::now(); + + inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); + + db_ledger + .write_consecutive_blobs(&blobs) + .expect("Unrecoverable failure to write to database"); + + // don't count coding blobs in the blob indexes + self.blob_index += blobs.len() as u64; + + // Send out data + ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?; + // Fill in the coding blob data from the window data blobs #[cfg(feature = "erasure")] { - erasure::generate_coding( - &id, - &mut window.write().unwrap(), - *receive_index, - blobs_len, - &mut transmit_index.coding, - )?; + let coding = self.coding_generator.next(&blobs)?; + + // send out erasures + ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; } - *receive_index += blobs_len as u64; + let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); - // Send blobs out from the window - ClusterInfo::broadcast( - contains_last_tick, - leader_id, - &node_info, - &broadcast_table, - &window, - &sock, - transmit_index, - *receive_index, - )?; + inc_new_counter_info!( + "broadcast_service-time_ms", + duration_as_ms(&now.elapsed()) as usize + ); + info!( + "broadcast: {} entries, blob time {} broadcast time {}", + num_entries, to_blobs_elapsed, broadcast_elapsed + ); + + submit( + influxdb::Point::new("broadcast-service") + .add_field( + "transmit-index", + influxdb::Value::Integer(self.blob_index as i64), + ) + .to_owned(), + ); + + Ok(()) } - let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); - - inc_new_counter_info!( - "broadcast_service-time_ms", - duration_as_ms(&now.elapsed()) as usize - ); - info!( - "broadcast: {} entries, blob time {} chunking time {} broadcast time {}", - num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed - ); - - submit( - 
influxdb::Point::new("broadcast-service") - .add_field( - "transmit-index", - influxdb::Value::Integer(transmit_index.data as i64), - ) - .to_owned(), - ); - - Ok(()) } fn generate_slots( @@ -240,46 +189,41 @@ pub struct BroadcastService { } impl BroadcastService { - #[allow(clippy::too_many_arguments)] fn run( db_ledger: &Arc, bank: &Arc, sock: &UdpSocket, cluster_info: &Arc>, - window: &SharedWindow, entry_height: u64, leader_scheduler: &Arc>, receiver: &Receiver>, max_tick_height: Option, exit_signal: &Arc, ) -> BroadcastServiceReturnType { - let mut transmit_index = WindowIndex { - data: entry_height, - coding: entry_height, - }; - let mut receive_index = entry_height; let me = cluster_info.read().unwrap().my_data().clone(); + + let mut broadcast = Broadcast { + id: me.id, + max_tick_height, + blob_index: entry_height, + #[cfg(feature = "erasure")] + coding_generator: CodingGenerator::new(), + }; + loop { if exit_signal.load(Ordering::Relaxed) { return BroadcastServiceReturnType::ExitSignal; } let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank); - // Layer 1 nodes are limited to the fanout size. + // Layer 1, leader nodes are limited to the fanout size. broadcast_table.truncate(DATA_PLANE_FANOUT); inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); - let leader_id = cluster_info.read().unwrap().leader_id(); - if let Err(e) = broadcast( - db_ledger, - max_tick_height, - leader_id, - &me, + if let Err(e) = broadcast.run( &broadcast_table, - &window, - &receiver, - &sock, - &mut transmit_index, - &mut receive_index, + receiver, + sock, leader_scheduler, + db_ledger, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { @@ -311,13 +255,11 @@ impl BroadcastService { /// WriteStage is the last stage in the pipeline), which will then close Broadcast service, /// which will then close FetchStage in the Tpu, and then the rest of the Tpu, /// completing the cycle. 
- #[allow(clippy::too_many_arguments)] pub fn new( db_ledger: Arc, bank: Arc, sock: UdpSocket, cluster_info: Arc>, - window: SharedWindow, entry_height: u64, leader_scheduler: Arc>, receiver: Receiver>, @@ -334,7 +276,6 @@ impl BroadcastService { &bank, &sock, &cluster_info, - &window, entry_height, &leader_scheduler, &receiver, @@ -364,7 +305,6 @@ mod test { use crate::db_ledger::DbLedger; use crate::entry::create_ticks; use crate::service::Service; - use crate::window::new_window; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::sync::atomic::AtomicBool; @@ -401,8 +341,6 @@ mod test { cluster_info.insert_info(broadcast_buddy.info); let cluster_info = Arc::new(RwLock::new(cluster_info)); - let window = new_window(32 * 1024); - let shared_window = Arc::new(RwLock::new(window)); let exit_sender = Arc::new(AtomicBool::new(false)); let bank = Arc::new(Bank::default()); @@ -412,7 +350,6 @@ mod test { bank.clone(), leader_info.sockets.broadcast, cluster_info, - shared_window, entry_height, leader_scheduler, entry_receiver, diff --git a/src/cluster_info.rs b/src/cluster_info.rs index f15c8e58da..0893602334 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -25,7 +25,6 @@ use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::result::Result; use crate::rpc::RPC_PORT; use crate::streamer::{BlobReceiver, BlobSender}; -use crate::window::{SharedWindow, WindowIndex}; use bincode::{deserialize, serialize}; use hashbrown::HashMap; use log::Level; @@ -493,58 +492,33 @@ impl ClusterInfo { /// broadcast messages from the leader to layer 1 nodes /// # Remarks - /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( + id: &Pubkey, contains_last_tick: bool, - leader_id: Pubkey, - me: &NodeInfo, broadcast_table: &[NodeInfo], - window: &SharedWindow, s: &UdpSocket, - transmit_index: &mut WindowIndex, - received_index: u64, + blobs: &[SharedBlob], ) -> Result<()> { if broadcast_table.is_empty() { - debug!("{}:not enough peers in cluster_info table", me.id); + debug!("{}:not enough peers in cluster_info table", id); inc_new_counter_info!("cluster_info-broadcast-not_enough_peers_error", 1); Err(ClusterInfoError::NoPeers)?; } - trace!( - "{} transmit_index: {:?} received_index: {} broadcast_len: {}", - me.id, - *transmit_index, - received_index, - broadcast_table.len() - ); - let old_transmit_index = transmit_index.data; + let orders = Self::create_broadcast_orders(contains_last_tick, blobs, broadcast_table); - let orders = Self::create_broadcast_orders( - contains_last_tick, - window, - broadcast_table, - transmit_index, - received_index, - me, - ); trace!("broadcast orders table {}", orders.len()); - let errs = Self::send_orders(s, orders, me, leader_id); + let errs = Self::send_orders(id, s, orders); for e in errs { if let Err(e) = &e { - trace!("broadcast result {:?}", e); + trace!("{}: broadcast result {:?}", id, e); } e?; - if transmit_index.data < received_index { - transmit_index.data += 1; - } } - inc_new_counter_info!( - "cluster_info-broadcast-max_idx", - (transmit_index.data - old_transmit_index) as usize - ); - transmit_index.coding = transmit_index.data; + + inc_new_counter_info!("cluster_info-broadcast-max_idx", blobs.len()); Ok(()) } @@ -603,19 +577,15 @@ impl ClusterInfo { } fn send_orders( + id: &Pubkey, s: &UdpSocket, - orders: Vec<(Option, Vec<&NodeInfo>)>, - me: &NodeInfo, - leader_id: Pubkey, + orders: Vec<(SharedBlob, Vec<&NodeInfo>)>, ) -> Vec> { orders .into_iter() 
.flat_map(|(b, vs)| { - // only leader should be broadcasting - assert!(vs.iter().find(|info| info.id == leader_id).is_none()); - let bl = b.unwrap(); - let blob = bl.read().unwrap(); - //TODO profile this, may need multiple sockets for par_iter + let blob = b.read().unwrap(); + let ids_and_tvus = if log_enabled!(Level::Trace) { let v_ids = vs.iter().map(|v| v.id); let tvus = vs.iter().map(|v| v.tvu); @@ -623,7 +593,7 @@ impl ClusterInfo { trace!( "{}: BROADCAST idx: {} sz: {} to {:?} coding: {}", - me.id, + id, blob.index().unwrap(), blob.meta.size, ids_and_tvus, @@ -642,7 +612,7 @@ impl ClusterInfo { let e = s.send_to(&blob.data[..blob.meta.size], &v.tvu); trace!( "{}: done broadcast {} to {:?}", - me.id, + id, blob.meta.size, ids_and_tvus ); @@ -656,70 +626,36 @@ impl ClusterInfo { fn create_broadcast_orders<'a>( contains_last_tick: bool, - window: &SharedWindow, + blobs: &[SharedBlob], broadcast_table: &'a [NodeInfo], - transmit_index: &mut WindowIndex, - received_index: u64, - me: &NodeInfo, - ) -> Vec<(Option, Vec<&'a NodeInfo>)> { + ) -> Vec<(SharedBlob, Vec<&'a NodeInfo>)> { // enumerate all the blobs in the window, those are the indices // transmit them to nodes, starting from a different node. - let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize); - let window_l = window.read().unwrap(); - let mut br_idx = transmit_index.data as usize % broadcast_table.len(); + if blobs.is_empty() { + return vec![]; + } - for idx in transmit_index.data..received_index { - let w_idx = idx as usize % window_l.len(); + let mut orders = Vec::with_capacity(blobs.len()); - trace!( - "{} broadcast order data w_idx {} br_idx {}", - me.id, - w_idx, - br_idx - ); + for (i, blob) in blobs.iter().enumerate() { + let br_idx = i % broadcast_table.len(); + trace!("broadcast order data br_idx {}", br_idx); + + orders.push((blob.clone(), vec![&broadcast_table[br_idx]])); + } + + if contains_last_tick { // Broadcast the last tick to everyone on the network so it doesn't get dropped // (Need to maximize probability the next leader in line sees this handoff tick // despite packet drops) - let target = if idx == received_index - 1 && contains_last_tick { - // If we see a tick at max_tick_height, then we know it must be the last - // Blob in the window, at index == received_index. There cannot be an entry - // that got sent after the last tick, guaranteed by the PohService). - assert!(window_l[w_idx].data.is_some()); - ( - window_l[w_idx].data.clone(), - broadcast_table.iter().collect(), - ) - } else { - (window_l[w_idx].data.clone(), vec![&broadcast_table[br_idx]]) - }; - - orders.push(target); - br_idx += 1; - br_idx %= broadcast_table.len(); - } - - for idx in transmit_index.coding..received_index { - let w_idx = idx as usize % window_l.len(); - - // skip over empty slots - if window_l[w_idx].coding.is_none() { - continue; - } - - trace!( - "{} broadcast order coding w_idx: {} br_idx :{}", - me.id, - w_idx, - br_idx, - ); - + // If we had a tick at max_tick_height, then we know it must be the last + // Blob in the broadcast, There cannot be an entry that got sent after the + // last tick, guaranteed by the PohService). 
orders.push(( - window_l[w_idx].coding.clone(), - vec![&broadcast_table[br_idx]], + blobs.last().unwrap().clone(), + broadcast_table.iter().collect(), )); - br_idx += 1; - br_idx %= broadcast_table.len(); } orders diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 16749f9911..b6a3a61cc7 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -976,11 +976,7 @@ mod tests { fn test_read_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_shared_blobs(); let slot = DEFAULT_SLOT_HEIGHT; - index_blobs( - shared_blobs.iter().zip(vec![slot; 10].into_iter()), - &Keypair::new().pubkey(), - 0, - ); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, &[slot; 10]); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); diff --git a/src/db_window.rs b/src/db_window.rs index 35ab2377e9..e2caed6bfd 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -562,9 +562,10 @@ mod test { let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs(); index_blobs( - shared_blobs.iter().zip(vec![slot; num_entries].into_iter()), + &shared_blobs, &Keypair::new().pubkey(), 0, + &vec![slot; num_entries], ); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); @@ -657,11 +658,10 @@ mod test { let shared_blobs = original_entries.clone().to_shared_blobs(); index_blobs( - shared_blobs - .iter() - .zip(vec![DEFAULT_SLOT_HEIGHT; num_entries].into_iter()), + &shared_blobs, &Keypair::new().pubkey(), 0, + &vec![DEFAULT_SLOT_HEIGHT; num_entries], ); let mut consume_queue = vec![]; diff --git a/src/erasure.rs b/src/erasure.rs index 6ee4eb6849..a7bbc60187 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -2,8 +2,6 @@ use crate::db_ledger::DbLedger; use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; use crate::result::{Error, Result}; -use crate::window::WindowSlot; -use solana_sdk::pubkey::Pubkey; use std::cmp; use std::sync::{Arc, RwLock}; @@ -177,6 +175,79 @@ pub fn decode_blocks( Ok(()) } +fn decode_blobs( + blobs: &[SharedBlob], + erasures: &[i32], + size: usize, + block_start_idx: u64, + slot: u64, +) -> Result { + let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); + let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); + let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); + + assert!(blobs.len() == NUM_DATA + NUM_CODING); + for b in blobs { + locks.push(b.write().unwrap()); + } + + for (i, l) in locks.iter_mut().enumerate() { + if i < NUM_DATA { + data_ptrs.push(&mut l.data[..size]); + } else { + coding_ptrs.push(&mut l.data_mut()[..size]); + } + } + + // Decode the blocks + decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + )?; + + // Create the missing blobs from the reconstructed data + let mut corrupt = false; + + for i in &erasures[..erasures.len() - 1] { + let n = *i as usize; + let mut idx = n as u64 + block_start_idx; + + let mut data_size; + if n < NUM_DATA { + data_size = locks[n].data_size().unwrap() as usize; + data_size -= BLOB_HEADER_SIZE; + if data_size > BLOB_DATA_SIZE { + error!("corrupt data blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } + } else { + data_size = size; + idx -= NUM_CODING as u64; + locks[n].set_slot(slot).unwrap(); + locks[n].set_index(idx).unwrap(); + + if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { + error!("corrupt coding blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } 
+ } + + locks[n].set_size(data_size); + trace!( + "erasures[{}] ({}) size: {} data[0]: {}", + *i, + idx, + data_size, + locks[n].data()[0] + ); + } + + Ok(corrupt) +} + // Generate coding blocks in window starting from start_idx, // for num_blobs.. For each block place the coding blobs // at the end of the block like so: @@ -214,137 +285,80 @@ pub fn decode_blocks( // // // -pub fn generate_coding( - id: &Pubkey, - window: &mut [WindowSlot], - receive_index: u64, - num_blobs: usize, - transmit_index_coding: &mut u64, -) -> Result<()> { - // beginning of the coding blobs of the block that receive_index points into - let coding_index_start = - receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64; +pub struct CodingGenerator { + leftover: Vec, // SharedBlobs that couldn't be used in last call to next() +} - let start_idx = receive_index as usize % window.len(); - let mut block_start = start_idx - (start_idx % NUM_DATA); - - loop { - let block_end = block_start + NUM_DATA; - if block_end > (start_idx + num_blobs) { - break; +impl CodingGenerator { + pub fn new() -> Self { + Self { + leftover: Vec::with_capacity(NUM_DATA), } - info!( - "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}", - id, block_start, block_end, start_idx, num_blobs - ); - - let mut max_data_size = 0; - - // find max_data_size, maybe bail if not all the data is here - for i in block_start..block_end { - let n = i % window.len(); - trace!("{} window[{}] = {:?}", id, n, window[n].data); - - if let Some(b) = &window[n].data { - max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size); - } else { - trace!("{} data block is null @ {}", id, n); - return Ok(()); - } - } - - // round up to the nearest jerasure alignment - max_data_size = align!(max_data_size, JERASURE_ALIGN); - - let mut data_blobs = Vec::with_capacity(NUM_DATA); - for i in block_start..block_end { - let n = i % window.len(); - - if let Some(b) = &window[n].data { - // make sure extra bytes in each blob are zero-d out for generation of - // coding blobs - let mut b_wl = b.write().unwrap(); - for i in b_wl.meta.size..max_data_size { - b_wl.data[i] = 0; - } - data_blobs.push(b); - } - } - - // getting ready to do erasure coding, means that we're potentially - // going back in time, tell our caller we've inserted coding blocks - // starting at coding_index_start - *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start); - - let mut coding_blobs = Vec::with_capacity(NUM_CODING); - let coding_start = block_end - NUM_CODING; - for i in coding_start..block_end { - let n = i % window.len(); - assert!(window[n].coding.is_none()); - - window[n].coding = Some(SharedBlob::default()); - - let coding = window[n].coding.clone().unwrap(); - let mut coding_wl = coding.write().unwrap(); - for i in 0..max_data_size { - coding_wl.data[i] = 0; - } - // copy index and id from the data blob - if let Some(data) = &window[n].data { - let data_rl = data.read().unwrap(); - - let index = data_rl.index().unwrap(); - let slot = data_rl.slot().unwrap(); - let id = data_rl.id().unwrap(); - - trace!( - "{} copying index {} id {:?} from data to coding", - id, - index, - id - ); - coding_wl.set_index(index).unwrap(); - coding_wl.set_slot(slot).unwrap(); - coding_wl.set_id(&id).unwrap(); - } - coding_wl.set_size(max_data_size); - if coding_wl.set_coding().is_err() { - return Err(Error::ErasureError(ErasureError::EncodeError)); - } - - coding_blobs.push(coding.clone()); - } - - let data_locks: Vec<_> = 
data_blobs.iter().map(|b| b.read().unwrap()).collect(); - - let data_ptrs: Vec<_> = data_locks - .iter() - .enumerate() - .map(|(i, l)| { - trace!("{} i: {} data: {}", id, i, l.data[0]); - &l.data[..max_data_size] - }) - .collect(); - - let mut coding_locks: Vec<_> = coding_blobs.iter().map(|b| b.write().unwrap()).collect(); - - let mut coding_ptrs: Vec<_> = coding_locks - .iter_mut() - .enumerate() - .map(|(i, l)| { - trace!("{} i: {} coding: {}", id, i, l.data[0],); - &mut l.data_mut()[..max_data_size] - }) - .collect(); - - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; - debug!( - "{} start_idx: {} data: {}:{} coding: {}:{}", - id, start_idx, block_start, block_end, coding_start, block_end - ); - block_start = block_end; } - Ok(()) + + // must be called with consecutive data blobs from previous invocation + pub fn next(&mut self, next_data: &[SharedBlob]) -> Result> { + let mut next_coding = + Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING); + + let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect(); + + for data_blobs in next_data.chunks(NUM_DATA) { + if data_blobs.len() < NUM_DATA { + self.leftover = data_blobs.to_vec(); + break; + } + self.leftover.clear(); + + // find max_data_size for the chunk + let max_data_size = align!( + data_blobs + .iter() + .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)), + JERASURE_ALIGN + ); + + let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); + let data_ptrs: Vec<_> = data_locks + .iter() + .map(|l| &l.data[..max_data_size]) + .collect(); + + let mut coding_blobs = Vec::with_capacity(NUM_CODING); + + for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] { + let index = data_blob.index().unwrap(); + let slot = data_blob.slot().unwrap(); + let id = data_blob.id().unwrap(); + + let coding_blob = SharedBlob::default(); + { + let mut coding_blob = coding_blob.write().unwrap(); + coding_blob.set_index(index).unwrap(); + coding_blob.set_slot(slot).unwrap(); + coding_blob.set_id(&id).unwrap(); + coding_blob.set_size(max_data_size); + coding_blob.set_coding().unwrap(); + } + coding_blobs.push(coding_blob); + } + + { + let mut coding_locks: Vec<_> = + coding_blobs.iter().map(|b| b.write().unwrap()).collect(); + + let mut coding_ptrs: Vec<_> = coding_locks + .iter_mut() + .map(|l| &mut l.data_mut()[..max_data_size]) + .collect(); + + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + } + next_coding.append(&mut coding_blobs); + } + + Ok(next_coding) + } } // Recover the missing data and coding blobs from the input ledger. Returns a vector @@ -401,7 +415,6 @@ pub fn recover( let mut missing_data: Vec = vec![]; let mut missing_coding: Vec = vec![]; - let mut size = None; // Add the data blobs we have into the recovery vector, mark the missing ones for i in block_start_idx..block_end_idx { @@ -416,6 +429,7 @@ pub fn recover( )?; } + let mut size = None; // Add the coding blobs we have into the recovery vector, mark the missing ones for i in coding_start_idx..block_end_idx { let result = db_ledger.get_coding_blob_bytes(slot, i)?; @@ -434,78 +448,17 @@ pub fn recover( } } } - - // Due to check (data_missing + coding_missing) > NUM_CODING from earlier in this function, - // we know at least one coding block must exist, so "size" will not remain None after the - // below processing. 
+ // Due to checks above verifying that (data_missing + coding_missing) <= NUM_CODING and + // data_missing > 0, we know at least one coding block must exist, so "size" can + // not remain None after the above processing. let size = size.unwrap(); + // marks end of erasures erasures.push(-1); + trace!("erasures[]:{:?} data_size: {}", erasures, size,); - let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); - { - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); - let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); - - for b in &blobs { - locks.push(b.write().unwrap()); - } - - for (i, l) in locks.iter_mut().enumerate() { - if i < NUM_DATA { - data_ptrs.push(&mut l.data[..size]); - } else { - coding_ptrs.push(&mut l.data_mut()[..size]); - } - } - - // Decode the blocks - decode_blocks( - data_ptrs.as_mut_slice(), - coding_ptrs.as_mut_slice(), - &erasures, - )?; - } - - // Create the missing blobs from the reconstructed data - let mut corrupt = false; - - for i in &erasures[..erasures.len() - 1] { - let n = *i as usize; - let mut idx = n as u64 + block_start_idx; - - let mut data_size; - if n < NUM_DATA { - data_size = locks[n].data_size().unwrap() as usize; - data_size -= BLOB_HEADER_SIZE; - if data_size > BLOB_DATA_SIZE { - error!("corrupt data blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } else { - data_size = size; - idx -= NUM_CODING as u64; - locks[n].set_slot(slot).unwrap(); - locks[n].set_index(idx).unwrap(); - - if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { - error!("corrupt coding blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } - - locks[n].set_size(data_size); - trace!( - "erasures[{}] ({}) size: {} data[0]: {}", - *i, - idx, - data_size, - locks[n].data()[0] - ); - } + let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx, slot)?; if corrupt { // Remove the corrupted coding blobs so there's no effort wasted in trying to @@ -560,7 +513,7 @@ pub mod test { use std::sync::Arc; #[test] - pub fn test_coding() { + fn test_coding() { let zero_vec = vec![0; 16]; let mut vs: Vec> = (0..4).map(|i| (i..(16 + i)).collect()).collect(); let v_orig: Vec = vs[0].clone(); @@ -608,6 +561,75 @@ pub mod test { assert_eq!(v_orig, vs[0]); } + #[test] + fn test_erasure_generate_coding() { + solana_logger::setup(); + + // trivial case + let mut coding_generator = CodingGenerator::new(); + let blobs = Vec::new(); + for _ in 0..NUM_DATA * 2 { + let coding = coding_generator.next(&blobs).unwrap(); + assert_eq!(coding.len(), 0); + } + + // test coding by iterating one blob at a time + let data_blobs = generate_test_blobs(0, NUM_DATA * 2); + + for (i, blob) in data_blobs.iter().cloned().enumerate() { + let coding = coding_generator.next(&[blob]).unwrap(); + + if !coding.is_empty() { + assert_eq!(i % NUM_DATA, NUM_DATA - 1); + assert_eq!(coding.len(), NUM_CODING); + + let size = coding[0].read().unwrap().size().unwrap(); + + // toss one data and one coding + let erasures: Vec = vec![0, NUM_DATA as i32, -1]; + + let block_start_idx = i - (i % NUM_DATA); + let mut blobs: Vec = Vec::with_capacity(NUM_DATA + NUM_CODING); + + blobs.push(SharedBlob::default()); // empty data, erasure at zero + for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] { + // skip first blob + blobs.push(blob.clone()); + } + blobs.push(SharedBlob::default()); // empty coding, erasure at NUM_DATA + for blob in &coding[1..NUM_CODING] { + blobs.push(blob.clone()); + } + + let corrupt = + 
decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap(); + + assert!(!corrupt); + + assert_eq!( + blobs[1].read().unwrap().meta, + data_blobs[block_start_idx + 1].read().unwrap().meta + ); + assert_eq!( + blobs[1].read().unwrap().data(), + data_blobs[block_start_idx + 1].read().unwrap().data() + ); + assert_eq!( + blobs[0].read().unwrap().meta, + data_blobs[block_start_idx].read().unwrap().meta + ); + assert_eq!( + blobs[0].read().unwrap().data(), + data_blobs[block_start_idx].read().unwrap().data() + ); + assert_eq!( + blobs[NUM_DATA].read().unwrap().data(), + coding[0].read().unwrap().data() + ); + } + } + } + // TODO: Temprorary function used in tests to generate a database ledger // from the window (which is used to generate the erasure coding) // until we also transition generate_coding() and BroadcastStage to use DbLedger @@ -663,6 +685,140 @@ pub mod test { db_ledger } + fn generate_coding( + id: &Pubkey, + window: &mut [WindowSlot], + receive_index: u64, + num_blobs: usize, + transmit_index_coding: &mut u64, + ) -> Result<()> { + // beginning of the coding blobs of the block that receive_index points into + let coding_index_start = + receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64; + + let start_idx = receive_index as usize % window.len(); + let mut block_start = start_idx - (start_idx % NUM_DATA); + + loop { + let block_end = block_start + NUM_DATA; + if block_end > (start_idx + num_blobs) { + break; + } + info!( + "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}", + id, block_start, block_end, start_idx, num_blobs + ); + + let mut max_data_size = 0; + + // find max_data_size, maybe bail if not all the data is here + for i in block_start..block_end { + let n = i % window.len(); + trace!("{} window[{}] = {:?}", id, n, window[n].data); + + if let Some(b) = &window[n].data { + max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size); + } else { + trace!("{} data block is null @ {}", id, n); + return Ok(()); + } + } + + // round up to the nearest jerasure alignment + max_data_size = align!(max_data_size, JERASURE_ALIGN); + + let mut data_blobs = Vec::with_capacity(NUM_DATA); + for i in block_start..block_end { + let n = i % window.len(); + + if let Some(b) = &window[n].data { + // make sure extra bytes in each blob are zero-d out for generation of + // coding blobs + let mut b_wl = b.write().unwrap(); + for i in b_wl.meta.size..max_data_size { + b_wl.data[i] = 0; + } + data_blobs.push(b); + } + } + + // getting ready to do erasure coding, means that we're potentially + // going back in time, tell our caller we've inserted coding blocks + // starting at coding_index_start + *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start); + + let mut coding_blobs = Vec::with_capacity(NUM_CODING); + let coding_start = block_end - NUM_CODING; + for i in coding_start..block_end { + let n = i % window.len(); + assert!(window[n].coding.is_none()); + + window[n].coding = Some(SharedBlob::default()); + + let coding = window[n].coding.clone().unwrap(); + let mut coding_wl = coding.write().unwrap(); + for i in 0..max_data_size { + coding_wl.data[i] = 0; + } + // copy index and id from the data blob + if let Some(data) = &window[n].data { + let data_rl = data.read().unwrap(); + + let index = data_rl.index().unwrap(); + let slot = data_rl.slot().unwrap(); + let id = data_rl.id().unwrap(); + + trace!( + "{} copying index {} id {:?} from data to coding", + id, + index, + id + ); + 
coding_wl.set_index(index).unwrap(); + coding_wl.set_slot(slot).unwrap(); + coding_wl.set_id(&id).unwrap(); + } + coding_wl.set_size(max_data_size); + if coding_wl.set_coding().is_err() { + return Err(Error::ErasureError(ErasureError::EncodeError)); + } + + coding_blobs.push(coding.clone()); + } + + let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); + + let data_ptrs: Vec<_> = data_locks + .iter() + .enumerate() + .map(|(i, l)| { + trace!("{} i: {} data: {}", id, i, l.data[0]); + &l.data[..max_data_size] + }) + .collect(); + + let mut coding_locks: Vec<_> = + coding_blobs.iter().map(|b| b.write().unwrap()).collect(); + + let mut coding_ptrs: Vec<_> = coding_locks + .iter_mut() + .enumerate() + .map(|(i, l)| { + trace!("{} i: {} coding: {}", id, i, l.data[0],); + &mut l.data_mut()[..max_data_size] + }) + .collect(); + + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + debug!( + "{} start_idx: {} data: {}:{} coding: {}:{}", + id, start_idx, block_start, block_end, coding_start, block_end + ); + block_start = block_end; + } + Ok(()) + } + pub fn setup_window_ledger( offset: usize, num_blobs: usize, @@ -740,12 +896,14 @@ pub mod test { blobs.push(b_); } - { - // Make some dummy slots - let slot_tick_heights: Vec<(&SharedBlob, u64)> = - blobs.iter().zip(vec![slot; blobs.len()]).collect(); - index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); - } + // Make some dummy slots + index_blobs( + &blobs, + &Keypair::new().pubkey(), + offset as u64, + &vec![slot; blobs.len()], + ); + for b in blobs { let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; @@ -754,6 +912,18 @@ pub mod test { window } + fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { + let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs(); + + index_blobs( + &blobs, + &Keypair::new().pubkey(), + offset as u64, + &vec![DEFAULT_SLOT_HEIGHT; blobs.len()], + ); + blobs + } + fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec { let mut window = vec![ WindowSlot { @@ -763,17 +933,8 @@ pub mod test { }; WINDOW_SIZE ]; - let entries = make_tiny_test_entries(num_blobs); - let blobs = entries.to_shared_blobs(); - { - // Make some dummy slots - let slot_tick_heights: Vec<(&SharedBlob, u64)> = blobs - .iter() - .zip(vec![DEFAULT_SLOT_HEIGHT; blobs.len()]) - .collect(); - index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); - } + let blobs = generate_test_blobs(offset, num_blobs); for b in blobs.into_iter() { let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; diff --git a/src/fullnode.rs b/src/fullnode.rs index 2e868f577a..439057b22e 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -14,7 +14,6 @@ use crate::tpu::{Tpu, TpuReturnType}; use crate::tpu_forwarder::TpuForwarder; use crate::tvu::{Sockets, Tvu, TvuReturnType}; use crate::vote_signer_proxy::VoteSignerProxy; -use crate::window::{new_window, SharedWindow}; use log::Level; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -98,7 +97,6 @@ pub struct Fullnode { bank: Arc, cluster_info: Arc>, sigverify_disabled: bool, - shared_window: SharedWindow, tvu_sockets: Vec, repair_socket: UdpSocket, retransmit_socket: UdpSocket, @@ -204,8 +202,6 @@ impl Fullnode { let db_ledger = db_ledger.unwrap_or_else(|| Self::make_db_ledger(ledger_path)); - let window = new_window(32 * 1024); - let shared_window = Arc::new(RwLock::new(window)); node.info.wallclock = timestamp(); let cluster_info = 
Arc::new(RwLock::new(ClusterInfo::new_with_keypair( node.info, @@ -315,7 +311,6 @@ impl Fullnode { .try_clone() .expect("Failed to clone broadcast socket"), cluster_info.clone(), - shared_window.clone(), entry_height, bank.leader_scheduler.clone(), entry_receiver, @@ -331,7 +326,6 @@ impl Fullnode { Fullnode { keypair, cluster_info, - shared_window, bank, sigverify_disabled, gossip_service, @@ -487,7 +481,6 @@ impl Fullnode { .try_clone() .expect("Failed to clone broadcast socket"), self.cluster_info.clone(), - self.shared_window.clone(), entry_height, self.bank.leader_scheduler.clone(), blob_receiver, diff --git a/src/lib.rs b/src/lib.rs index e656a134b8..697d11952f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,6 +70,7 @@ pub mod tpu; pub mod tpu_forwarder; pub mod tvu; pub mod vote_signer_proxy; +#[cfg(test)] pub mod window; pub mod window_service; diff --git a/src/packet.rs b/src/packet.rs index 53b79fc40c..76d0f81384 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -8,7 +8,6 @@ use log::Level; use serde::Serialize; pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; -use std::borrow::Borrow; use std::cmp; use std::fmt; use std::io; @@ -451,21 +450,14 @@ impl Blob { } } -pub fn index_blobs(blobs: I, id: &Pubkey, mut index: u64) -where - I: IntoIterator, - J: Borrow<(K, u64)>, - K: Borrow, -{ +pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u64]) { // enumerate all the blobs, those are the indices - for b in blobs { - let (b, slot) = b.borrow(); - let mut blob = b.borrow().write().unwrap(); + for (blob, slot) in blobs.iter().zip(slots) { + let mut blob = blob.write().unwrap(); blob.set_index(index).expect("set_index"); blob.set_slot(*slot).expect("set_slot"); blob.set_id(id).expect("set_id"); - blob.set_flags(0).unwrap(); index += 1; }
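Notes (illustrative sketches, not part of the patch). First, the reworked index_blobs call shape: it now takes the blob slice, the leader id, a starting index, and one slot height per blob, mirroring the updated call sites in db_ledger.rs and db_window.rs. The helper name below is hypothetical; module paths follow the patch's own use lines.

use crate::packet::{index_blobs, SharedBlob};
use solana_sdk::pubkey::Pubkey;

// Hypothetical helper: tag a batch of data blobs before broadcast.
fn tag_blobs_for_broadcast(blobs: &[SharedBlob], leader_id: &Pubkey, first_index: u64, slot: u64) {
    // One slot entry per blob; indices are assigned sequentially starting at `first_index`.
    let slots = vec![slot; blobs.len()];
    index_blobs(blobs, leader_id, first_index, &slots);
}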
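Second, a hedged sketch of how the new CodingGenerator is meant to be driven by a caller such as Broadcast::run. The function name and the outbox parameter are assumptions for illustration; only CodingGenerator::new/next come from the patch.

use crate::erasure::CodingGenerator;
use crate::packet::SharedBlob;
use crate::result::Result;

// Hypothetical caller: feed consecutive data blobs in, collect whatever coding
// blobs fall out. Partial blocks (fewer than NUM_DATA data blobs) are held as
// leftovers inside the generator until the block completes on a later call,
// so `next()` may legitimately return an empty vector.
fn generate_and_queue_coding(
    coding_generator: &mut CodingGenerator,
    data_blobs: &[SharedBlob],
    outbox: &mut Vec<SharedBlob>,
) -> Result<()> {
    let coding_blobs = coding_generator.next(data_blobs)?;
    outbox.extend(coding_blobs);
    Ok(())
}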
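Third, the fan-out rule in the simplified create_broadcast_orders, restated over plain indices (purely illustrative, not code from the patch): blob i goes to peer i mod peers, and when the batch carries the last tick the final blob is additionally queued for every peer.

// Illustrative model of create_broadcast_orders using indices instead of
// SharedBlob/NodeInfo; returns (blob_index, peer_indices) pairs.
fn broadcast_order_indices(
    num_blobs: usize,
    num_peers: usize,
    contains_last_tick: bool,
) -> Vec<(usize, Vec<usize>)> {
    if num_blobs == 0 || num_peers == 0 {
        return vec![];
    }
    // Round-robin: each blob is sent to exactly one peer.
    let mut orders: Vec<(usize, Vec<usize>)> =
        (0..num_blobs).map(|i| (i, vec![i % num_peers])).collect();
    if contains_last_tick {
        // The last blob (the handoff tick) is also queued for every peer so the
        // next leader sees it despite packet drops at individual nodes.
        orders.push((num_blobs - 1, (0..num_peers).collect()));
    }
    orders
}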