diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs
index d7bb9f7ada..3f939f6464 100644
--- a/src/broadcast_service.rs
+++ b/src/broadcast_service.rs
@@ -7,11 +7,12 @@ use crate::db_ledger::DbLedger;
 use crate::entry::Entry;
 use crate::entry::EntrySlice;
 #[cfg(feature = "erasure")]
-use crate::erasure::CodingGenerator;
+use crate::erasure;
 use crate::leader_scheduler::LeaderScheduler;
-use crate::packet::index_blobs;
+use crate::packet::{index_blobs, SharedBlob};
 use crate::result::{Error, Result};
 use crate::service::Service;
+use crate::window::{SharedWindow, WindowIndex, WindowUtil};
 use log::Level;
 use rayon::prelude::*;
 use solana_metrics::{influxdb, submit};
@@ -31,110 +32,160 @@ pub enum BroadcastServiceReturnType {
     ExitSignal,
 }
 
-struct Broadcast {
-    id: Pubkey,
+#[allow(clippy::too_many_arguments)]
+fn broadcast(
+    db_ledger: &Arc<DbLedger>,
     max_tick_height: Option<u64>,
-    blob_index: u64,
+    leader_id: Pubkey,
+    node_info: &NodeInfo,
+    broadcast_table: &[NodeInfo],
+    window: &SharedWindow,
+    receiver: &Receiver<Vec<Entry>>,
+    sock: &UdpSocket,
+    transmit_index: &mut WindowIndex,
+    receive_index: &mut u64,
+    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+) -> Result<()> {
+    let id = node_info.id;
+    let timer = Duration::new(1, 0);
+    let entries = receiver.recv_timeout(timer)?;
+    let now = Instant::now();
+    let mut num_entries = entries.len();
+    let mut ventries = Vec::new();
+    ventries.push(entries);
 
-    #[cfg(feature = "erasure")]
-    coding_generator: CodingGenerator,
-}
-
-impl Broadcast {
-    fn run(
-        &mut self,
-        broadcast_table: &[NodeInfo],
-        receiver: &Receiver<Vec<Entry>>,
-        sock: &UdpSocket,
-        leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
-        db_ledger: &Arc<DbLedger>,
-    ) -> Result<()> {
-        let timer = Duration::new(1, 0);
-        let entries = receiver.recv_timeout(timer)?;
-        let now = Instant::now();
-        let mut num_entries = entries.len();
-        let mut ventries = Vec::new();
+    let mut contains_last_tick = false;
+    while let Ok(entries) = receiver.try_recv() {
+        num_entries += entries.len();
         ventries.push(entries);
+    }
 
-        while let Ok(entries) = receiver.try_recv() {
-            num_entries += entries.len();
-            ventries.push(entries);
-        }
+    if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
+        contains_last_tick |= Some(last.tick_height) == max_tick_height;
+    }
 
-        let last_tick = match self.max_tick_height {
-            Some(max_tick_height) => {
-                if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
-                    last.tick_height == max_tick_height
-                } else {
-                    false
+    inc_new_counter_info!("broadcast_service-entries_received", num_entries);
+
+    let to_blobs_start = Instant::now();
+
+    // Generate the slot heights for all the entries inside ventries
+    let slot_heights = generate_slots(&ventries, leader_scheduler);
+
+    let blobs: Vec<_> = ventries
+        .into_par_iter()
+        .flat_map(|p| p.to_shared_blobs())
+        .collect();
+
+    let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect();
+
+    let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
+
+    let blobs_chunking = Instant::now();
+    // We could receive more blobs than window slots so
+    // break them up into window-sized chunks to process
+    let window_size = window.read().unwrap().window_size();
+    let blobs_chunked = blobs_slot_heights
+        .chunks(window_size as usize)
+        .map(|x| x.to_vec());
+    let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
+
+    let broadcast_start = Instant::now();
+    for blobs in blobs_chunked {
+        let blobs_len = blobs.len();
+        trace!("{}: broadcast blobs.len: {}", id, blobs_len);
+
+        index_blobs(blobs.iter(), &node_info.id,
*receive_index); + + // keep the cache of blobs that are broadcast + inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); + { + let mut win = window.write().unwrap(); + assert!(blobs.len() <= win.len()); + let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect(); + for b in &blobs { + let ix = b.read().unwrap().index().expect("blob index"); + let pos = (ix % window_size) as usize; + if let Some(x) = win[pos].data.take() { + trace!( + "{} popped {} at {}", + id, + x.read().unwrap().index().unwrap(), + pos + ); + } + if let Some(x) = win[pos].coding.take() { + trace!( + "{} popped {} at {}", + id, + x.read().unwrap().index().unwrap(), + pos + ); + } + + trace!("{} null {}", id, pos); + } + for b in &blobs { + { + let ix = b.read().unwrap().index().expect("blob index"); + let pos = (ix % window_size) as usize; + trace!("{} caching {} at {}", id, ix, pos); + assert!(win[pos].data.is_none()); + win[pos].data = Some(b.clone()); } } - None => false, - }; - inc_new_counter_info!("broadcast_service-entries_received", num_entries); - - let to_blobs_start = Instant::now(); - - // Generate the slot heights for all the entries inside ventries - // this may span slots if this leader broadcasts for consecutive slots... - let slots = generate_slots(&ventries, leader_scheduler); - - let blobs: Vec<_> = ventries - .into_par_iter() - .flat_map(|p| p.to_shared_blobs()) - .collect(); - - // TODO: blob_index should be slot-relative... - index_blobs(&blobs, &self.id, self.blob_index, &slots); - - let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); - - let broadcast_start = Instant::now(); - - inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - - db_ledger - .write_consecutive_blobs(&blobs) - .expect("Unrecoverable failure to write to database"); - - // don't count coding blobs in the blob indexes - self.blob_index += blobs.len() as u64; - - // Send out data - ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?; + db_ledger + .write_consecutive_blobs(&blobs) + .expect("Unrecoverable failure to write to database"); + } // Fill in the coding blob data from the window data blobs #[cfg(feature = "erasure")] { - let coding = self.coding_generator.next(&blobs)?; - - // send out erasures - ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?; + erasure::generate_coding( + &id, + &mut window.write().unwrap(), + *receive_index, + blobs_len, + &mut transmit_index.coding, + )?; } - let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); + *receive_index += blobs_len as u64; - inc_new_counter_info!( - "broadcast_service-time_ms", - duration_as_ms(&now.elapsed()) as usize - ); - info!( - "broadcast: {} entries, blob time {} broadcast time {}", - num_entries, to_blobs_elapsed, broadcast_elapsed - ); - - submit( - influxdb::Point::new("broadcast-service") - .add_field( - "transmit-index", - influxdb::Value::Integer(self.blob_index as i64), - ) - .to_owned(), - ); - - Ok(()) + // Send blobs out from the window + ClusterInfo::broadcast( + contains_last_tick, + leader_id, + &node_info, + &broadcast_table, + &window, + &sock, + transmit_index, + *receive_index, + )?; } + let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); + + inc_new_counter_info!( + "broadcast_service-time_ms", + duration_as_ms(&now.elapsed()) as usize + ); + info!( + "broadcast: {} entries, blob time {} chunking time {} broadcast time {}", + num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed + ); + + submit( + 
influxdb::Point::new("broadcast-service")
+            .add_field(
+                "transmit-index",
+                influxdb::Value::Integer(transmit_index.data as i64),
+            )
+            .to_owned(),
+    );
+
+    Ok(())
 }
 
 fn generate_slots(
@@ -189,41 +240,46 @@ pub struct BroadcastService {
 }
 
 impl BroadcastService {
+    #[allow(clippy::too_many_arguments)]
     fn run(
         db_ledger: &Arc<DbLedger>,
         bank: &Arc<Bank>,
         sock: &UdpSocket,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
+        window: &SharedWindow,
         entry_height: u64,
         leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
         receiver: &Receiver<Vec<Entry>>,
         max_tick_height: Option<u64>,
         exit_signal: &Arc<AtomicBool>,
     ) -> BroadcastServiceReturnType {
-        let me = cluster_info.read().unwrap().my_data().clone();
-
-        let mut broadcast = Broadcast {
-            id: me.id,
-            max_tick_height,
-            blob_index: entry_height,
-            #[cfg(feature = "erasure")]
-            coding_generator: CodingGenerator::new(),
+        let mut transmit_index = WindowIndex {
+            data: entry_height,
+            coding: entry_height,
         };
-
+        let mut receive_index = entry_height;
+        let me = cluster_info.read().unwrap().my_data().clone();
         loop {
             if exit_signal.load(Ordering::Relaxed) {
                 return BroadcastServiceReturnType::ExitSignal;
             }
             let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank);
-            // Layer 1, leader nodes are limited to the fanout size.
+            // Layer 1 nodes are limited to the fanout size.
             broadcast_table.truncate(DATA_PLANE_FANOUT);
             inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
-            if let Err(e) = broadcast.run(
-                &broadcast_table,
-                receiver,
-                sock,
-                leader_scheduler,
+            let leader_id = cluster_info.read().unwrap().leader_id();
+            if let Err(e) = broadcast(
                 db_ledger,
+                max_tick_height,
+                leader_id,
+                &me,
+                &broadcast_table,
+                &window,
+                &receiver,
+                &sock,
+                &mut transmit_index,
+                &mut receive_index,
+                leader_scheduler,
             ) {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
@@ -255,11 +311,13 @@ impl BroadcastService {
     /// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
     /// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
     /// completing the cycle.
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         db_ledger: Arc<DbLedger>,
         bank: Arc<Bank>,
         sock: UdpSocket,
         cluster_info: Arc<RwLock<ClusterInfo>>,
+        window: SharedWindow,
         entry_height: u64,
         leader_scheduler: Arc<RwLock<LeaderScheduler>>,
         receiver: Receiver<Vec<Entry>>,
@@ -276,6 +334,7 @@ impl BroadcastService {
                 &bank,
                 &sock,
                 &cluster_info,
+                &window,
                 entry_height,
                 &leader_scheduler,
                 &receiver,
@@ -305,6 +364,7 @@ mod test {
     use crate::db_ledger::DbLedger;
     use crate::entry::create_ticks;
     use crate::service::Service;
+    use crate::window::new_window;
    use solana_sdk::hash::Hash;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::sync::atomic::AtomicBool;
@@ -341,6 +401,8 @@ mod test {
         cluster_info.insert_info(broadcast_buddy.info);
         let cluster_info = Arc::new(RwLock::new(cluster_info));
 
+        let window = new_window(32 * 1024);
+        let shared_window = Arc::new(RwLock::new(window));
         let exit_sender = Arc::new(AtomicBool::new(false));
         let bank = Arc::new(Bank::default());
 
@@ -350,6 +412,7 @@ mod test {
             bank.clone(),
             leader_info.sockets.broadcast,
             cluster_info,
+            shared_window,
             entry_height,
             leader_scheduler,
             entry_receiver,
diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index 0893602334..f15c8e58da 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -25,6 +25,7 @@ use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
 use crate::result::Result;
 use crate::rpc::RPC_PORT;
 use crate::streamer::{BlobReceiver, BlobSender};
+use crate::window::{SharedWindow, WindowIndex};
 use bincode::{deserialize, serialize};
 use hashbrown::HashMap;
 use log::Level;
@@ -492,33 +493,58 @@ impl ClusterInfo {
     /// broadcast messages from the leader to layer 1 nodes
     /// # Remarks
+    /// We need to avoid having obj locked while doing any io, such as the `send_to`
     pub fn broadcast(
-        id: &Pubkey,
         contains_last_tick: bool,
+        leader_id: Pubkey,
+        me: &NodeInfo,
         broadcast_table: &[NodeInfo],
+        window: &SharedWindow,
         s: &UdpSocket,
-        blobs: &[SharedBlob],
+        transmit_index: &mut WindowIndex,
+        received_index: u64,
     ) -> Result<()> {
         if broadcast_table.is_empty() {
-            debug!("{}:not enough peers in cluster_info table", id);
+            debug!("{}:not enough peers in cluster_info table", me.id);
             inc_new_counter_info!("cluster_info-broadcast-not_enough_peers_error", 1);
             Err(ClusterInfoError::NoPeers)?;
         }
+        trace!(
+            "{} transmit_index: {:?} received_index: {} broadcast_len: {}",
+            me.id,
+            *transmit_index,
+            received_index,
+            broadcast_table.len()
+        );
 
-        let orders = Self::create_broadcast_orders(contains_last_tick, blobs, broadcast_table);
+        let old_transmit_index = transmit_index.data;
+        let orders = Self::create_broadcast_orders(
+            contains_last_tick,
+            window,
+            broadcast_table,
+            transmit_index,
+            received_index,
+            me,
+        );
         trace!("broadcast orders table {}", orders.len());
 
-        let errs = Self::send_orders(id, s, orders);
+        let errs = Self::send_orders(s, orders, me, leader_id);
 
         for e in errs {
             if let Err(e) = &e {
-                trace!("{}: broadcast result {:?}", id, e);
+                trace!("broadcast result {:?}", e);
             }
             e?;
+            if transmit_index.data < received_index {
+                transmit_index.data += 1;
+            }
         }
-
-        inc_new_counter_info!("cluster_info-broadcast-max_idx", blobs.len());
+        inc_new_counter_info!(
+            "cluster_info-broadcast-max_idx",
+            (transmit_index.data - old_transmit_index) as usize
+        );
+        transmit_index.coding = transmit_index.data;
 
         Ok(())
     }
@@ -577,15 +603,19 @@ impl ClusterInfo {
     }
 
     fn send_orders(
-        id: &Pubkey,
         s: &UdpSocket,
-        orders: Vec<(SharedBlob, Vec<&NodeInfo>)>,
+        orders: Vec<(Option<SharedBlob>, Vec<&NodeInfo>)>,
+        me: &NodeInfo,
+        leader_id: Pubkey,
     ) -> Vec<io::Result<usize>> {
         orders
             .into_iter()
.flat_map(|(b, vs)| { - let blob = b.read().unwrap(); - + // only leader should be broadcasting + assert!(vs.iter().find(|info| info.id == leader_id).is_none()); + let bl = b.unwrap(); + let blob = bl.read().unwrap(); + //TODO profile this, may need multiple sockets for par_iter let ids_and_tvus = if log_enabled!(Level::Trace) { let v_ids = vs.iter().map(|v| v.id); let tvus = vs.iter().map(|v| v.tvu); @@ -593,7 +623,7 @@ impl ClusterInfo { trace!( "{}: BROADCAST idx: {} sz: {} to {:?} coding: {}", - id, + me.id, blob.index().unwrap(), blob.meta.size, ids_and_tvus, @@ -612,7 +642,7 @@ impl ClusterInfo { let e = s.send_to(&blob.data[..blob.meta.size], &v.tvu); trace!( "{}: done broadcast {} to {:?}", - id, + me.id, blob.meta.size, ids_and_tvus ); @@ -626,36 +656,70 @@ impl ClusterInfo { fn create_broadcast_orders<'a>( contains_last_tick: bool, - blobs: &[SharedBlob], + window: &SharedWindow, broadcast_table: &'a [NodeInfo], - ) -> Vec<(SharedBlob, Vec<&'a NodeInfo>)> { + transmit_index: &mut WindowIndex, + received_index: u64, + me: &NodeInfo, + ) -> Vec<(Option, Vec<&'a NodeInfo>)> { // enumerate all the blobs in the window, those are the indices // transmit them to nodes, starting from a different node. - if blobs.is_empty() { - return vec![]; - } + let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize); + let window_l = window.read().unwrap(); + let mut br_idx = transmit_index.data as usize % broadcast_table.len(); - let mut orders = Vec::with_capacity(blobs.len()); + for idx in transmit_index.data..received_index { + let w_idx = idx as usize % window_l.len(); - for (i, blob) in blobs.iter().enumerate() { - let br_idx = i % broadcast_table.len(); + trace!( + "{} broadcast order data w_idx {} br_idx {}", + me.id, + w_idx, + br_idx + ); - trace!("broadcast order data br_idx {}", br_idx); - - orders.push((blob.clone(), vec![&broadcast_table[br_idx]])); - } - - if contains_last_tick { // Broadcast the last tick to everyone on the network so it doesn't get dropped // (Need to maximize probability the next leader in line sees this handoff tick // despite packet drops) - // If we had a tick at max_tick_height, then we know it must be the last - // Blob in the broadcast, There cannot be an entry that got sent after the - // last tick, guaranteed by the PohService). + let target = if idx == received_index - 1 && contains_last_tick { + // If we see a tick at max_tick_height, then we know it must be the last + // Blob in the window, at index == received_index. There cannot be an entry + // that got sent after the last tick, guaranteed by the PohService). 
+ assert!(window_l[w_idx].data.is_some()); + ( + window_l[w_idx].data.clone(), + broadcast_table.iter().collect(), + ) + } else { + (window_l[w_idx].data.clone(), vec![&broadcast_table[br_idx]]) + }; + + orders.push(target); + br_idx += 1; + br_idx %= broadcast_table.len(); + } + + for idx in transmit_index.coding..received_index { + let w_idx = idx as usize % window_l.len(); + + // skip over empty slots + if window_l[w_idx].coding.is_none() { + continue; + } + + trace!( + "{} broadcast order coding w_idx: {} br_idx :{}", + me.id, + w_idx, + br_idx, + ); + orders.push(( - blobs.last().unwrap().clone(), - broadcast_table.iter().collect(), + window_l[w_idx].coding.clone(), + vec![&broadcast_table[br_idx]], )); + br_idx += 1; + br_idx %= broadcast_table.len(); } orders diff --git a/src/db_ledger.rs b/src/db_ledger.rs index b6a3a61cc7..16749f9911 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -976,7 +976,11 @@ mod tests { fn test_read_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_shared_blobs(); let slot = DEFAULT_SLOT_HEIGHT; - index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, &[slot; 10]); + index_blobs( + shared_blobs.iter().zip(vec![slot; 10].into_iter()), + &Keypair::new().pubkey(), + 0, + ); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); diff --git a/src/db_window.rs b/src/db_window.rs index e2caed6bfd..35ab2377e9 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -562,10 +562,9 @@ mod test { let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs(); index_blobs( - &shared_blobs, + shared_blobs.iter().zip(vec![slot; num_entries].into_iter()), &Keypair::new().pubkey(), 0, - &vec![slot; num_entries], ); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); @@ -658,10 +657,11 @@ mod test { let shared_blobs = original_entries.clone().to_shared_blobs(); index_blobs( - &shared_blobs, + shared_blobs + .iter() + .zip(vec![DEFAULT_SLOT_HEIGHT; num_entries].into_iter()), &Keypair::new().pubkey(), 0, - &vec![DEFAULT_SLOT_HEIGHT; num_entries], ); let mut consume_queue = vec![]; diff --git a/src/erasure.rs b/src/erasure.rs index a7bbc60187..6ee4eb6849 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -2,6 +2,8 @@ use crate::db_ledger::DbLedger; use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; use crate::result::{Error, Result}; +use crate::window::WindowSlot; +use solana_sdk::pubkey::Pubkey; use std::cmp; use std::sync::{Arc, RwLock}; @@ -175,79 +177,6 @@ pub fn decode_blocks( Ok(()) } -fn decode_blobs( - blobs: &[SharedBlob], - erasures: &[i32], - size: usize, - block_start_idx: u64, - slot: u64, -) -> Result { - let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); - let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); - let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); - - assert!(blobs.len() == NUM_DATA + NUM_CODING); - for b in blobs { - locks.push(b.write().unwrap()); - } - - for (i, l) in locks.iter_mut().enumerate() { - if i < NUM_DATA { - data_ptrs.push(&mut l.data[..size]); - } else { - coding_ptrs.push(&mut l.data_mut()[..size]); - } - } - - // Decode the blocks - decode_blocks( - data_ptrs.as_mut_slice(), - coding_ptrs.as_mut_slice(), - &erasures, - )?; - - // Create the missing blobs from the reconstructed data - let mut corrupt = false; - - for i in &erasures[..erasures.len() - 1] { - let n = *i as usize; - let mut idx = n as u64 
+ block_start_idx; - - let mut data_size; - if n < NUM_DATA { - data_size = locks[n].data_size().unwrap() as usize; - data_size -= BLOB_HEADER_SIZE; - if data_size > BLOB_DATA_SIZE { - error!("corrupt data blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } else { - data_size = size; - idx -= NUM_CODING as u64; - locks[n].set_slot(slot).unwrap(); - locks[n].set_index(idx).unwrap(); - - if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { - error!("corrupt coding blob[{}] data_size: {}", idx, data_size); - corrupt = true; - break; - } - } - - locks[n].set_size(data_size); - trace!( - "erasures[{}] ({}) size: {} data[0]: {}", - *i, - idx, - data_size, - locks[n].data()[0] - ); - } - - Ok(corrupt) -} - // Generate coding blocks in window starting from start_idx, // for num_blobs.. For each block place the coding blobs // at the end of the block like so: @@ -285,80 +214,137 @@ fn decode_blobs( // // // -pub struct CodingGenerator { - leftover: Vec, // SharedBlobs that couldn't be used in last call to next() -} +pub fn generate_coding( + id: &Pubkey, + window: &mut [WindowSlot], + receive_index: u64, + num_blobs: usize, + transmit_index_coding: &mut u64, +) -> Result<()> { + // beginning of the coding blobs of the block that receive_index points into + let coding_index_start = + receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64; -impl CodingGenerator { - pub fn new() -> Self { - Self { - leftover: Vec::with_capacity(NUM_DATA), + let start_idx = receive_index as usize % window.len(); + let mut block_start = start_idx - (start_idx % NUM_DATA); + + loop { + let block_end = block_start + NUM_DATA; + if block_end > (start_idx + num_blobs) { + break; } - } + info!( + "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}", + id, block_start, block_end, start_idx, num_blobs + ); - // must be called with consecutive data blobs from previous invocation - pub fn next(&mut self, next_data: &[SharedBlob]) -> Result> { - let mut next_coding = - Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING); + let mut max_data_size = 0; - let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect(); + // find max_data_size, maybe bail if not all the data is here + for i in block_start..block_end { + let n = i % window.len(); + trace!("{} window[{}] = {:?}", id, n, window[n].data); - for data_blobs in next_data.chunks(NUM_DATA) { - if data_blobs.len() < NUM_DATA { - self.leftover = data_blobs.to_vec(); - break; + if let Some(b) = &window[n].data { + max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size); + } else { + trace!("{} data block is null @ {}", id, n); + return Ok(()); } - self.leftover.clear(); + } - // find max_data_size for the chunk - let max_data_size = align!( - data_blobs - .iter() - .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)), - JERASURE_ALIGN - ); + // round up to the nearest jerasure alignment + max_data_size = align!(max_data_size, JERASURE_ALIGN); - let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); - let data_ptrs: Vec<_> = data_locks - .iter() - .map(|l| &l.data[..max_data_size]) - .collect(); + let mut data_blobs = Vec::with_capacity(NUM_DATA); + for i in block_start..block_end { + let n = i % window.len(); - let mut coding_blobs = Vec::with_capacity(NUM_CODING); - - for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] { - let index = data_blob.index().unwrap(); - let slot = data_blob.slot().unwrap(); - 
let id = data_blob.id().unwrap(); - - let coding_blob = SharedBlob::default(); - { - let mut coding_blob = coding_blob.write().unwrap(); - coding_blob.set_index(index).unwrap(); - coding_blob.set_slot(slot).unwrap(); - coding_blob.set_id(&id).unwrap(); - coding_blob.set_size(max_data_size); - coding_blob.set_coding().unwrap(); + if let Some(b) = &window[n].data { + // make sure extra bytes in each blob are zero-d out for generation of + // coding blobs + let mut b_wl = b.write().unwrap(); + for i in b_wl.meta.size..max_data_size { + b_wl.data[i] = 0; } - coding_blobs.push(coding_blob); + data_blobs.push(b); } - - { - let mut coding_locks: Vec<_> = - coding_blobs.iter().map(|b| b.write().unwrap()).collect(); - - let mut coding_ptrs: Vec<_> = coding_locks - .iter_mut() - .map(|l| &mut l.data_mut()[..max_data_size]) - .collect(); - - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; - } - next_coding.append(&mut coding_blobs); } - Ok(next_coding) + // getting ready to do erasure coding, means that we're potentially + // going back in time, tell our caller we've inserted coding blocks + // starting at coding_index_start + *transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start); + + let mut coding_blobs = Vec::with_capacity(NUM_CODING); + let coding_start = block_end - NUM_CODING; + for i in coding_start..block_end { + let n = i % window.len(); + assert!(window[n].coding.is_none()); + + window[n].coding = Some(SharedBlob::default()); + + let coding = window[n].coding.clone().unwrap(); + let mut coding_wl = coding.write().unwrap(); + for i in 0..max_data_size { + coding_wl.data[i] = 0; + } + // copy index and id from the data blob + if let Some(data) = &window[n].data { + let data_rl = data.read().unwrap(); + + let index = data_rl.index().unwrap(); + let slot = data_rl.slot().unwrap(); + let id = data_rl.id().unwrap(); + + trace!( + "{} copying index {} id {:?} from data to coding", + id, + index, + id + ); + coding_wl.set_index(index).unwrap(); + coding_wl.set_slot(slot).unwrap(); + coding_wl.set_id(&id).unwrap(); + } + coding_wl.set_size(max_data_size); + if coding_wl.set_coding().is_err() { + return Err(Error::ErasureError(ErasureError::EncodeError)); + } + + coding_blobs.push(coding.clone()); + } + + let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); + + let data_ptrs: Vec<_> = data_locks + .iter() + .enumerate() + .map(|(i, l)| { + trace!("{} i: {} data: {}", id, i, l.data[0]); + &l.data[..max_data_size] + }) + .collect(); + + let mut coding_locks: Vec<_> = coding_blobs.iter().map(|b| b.write().unwrap()).collect(); + + let mut coding_ptrs: Vec<_> = coding_locks + .iter_mut() + .enumerate() + .map(|(i, l)| { + trace!("{} i: {} coding: {}", id, i, l.data[0],); + &mut l.data_mut()[..max_data_size] + }) + .collect(); + + generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; + debug!( + "{} start_idx: {} data: {}:{} coding: {}:{}", + id, start_idx, block_start, block_end, coding_start, block_end + ); + block_start = block_end; } + Ok(()) } // Recover the missing data and coding blobs from the input ledger. 
Returns a vector @@ -415,6 +401,7 @@ pub fn recover( let mut missing_data: Vec = vec![]; let mut missing_coding: Vec = vec![]; + let mut size = None; // Add the data blobs we have into the recovery vector, mark the missing ones for i in block_start_idx..block_end_idx { @@ -429,7 +416,6 @@ pub fn recover( )?; } - let mut size = None; // Add the coding blobs we have into the recovery vector, mark the missing ones for i in coding_start_idx..block_end_idx { let result = db_ledger.get_coding_blob_bytes(slot, i)?; @@ -448,17 +434,78 @@ pub fn recover( } } } - // Due to checks above verifying that (data_missing + coding_missing) <= NUM_CODING and - // data_missing > 0, we know at least one coding block must exist, so "size" can - // not remain None after the above processing. - let size = size.unwrap(); + // Due to check (data_missing + coding_missing) > NUM_CODING from earlier in this function, + // we know at least one coding block must exist, so "size" will not remain None after the + // below processing. + let size = size.unwrap(); // marks end of erasures erasures.push(-1); - trace!("erasures[]:{:?} data_size: {}", erasures, size,); - let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx, slot)?; + let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING); + { + let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING); + let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA); + + for b in &blobs { + locks.push(b.write().unwrap()); + } + + for (i, l) in locks.iter_mut().enumerate() { + if i < NUM_DATA { + data_ptrs.push(&mut l.data[..size]); + } else { + coding_ptrs.push(&mut l.data_mut()[..size]); + } + } + + // Decode the blocks + decode_blocks( + data_ptrs.as_mut_slice(), + coding_ptrs.as_mut_slice(), + &erasures, + )?; + } + + // Create the missing blobs from the reconstructed data + let mut corrupt = false; + + for i in &erasures[..erasures.len() - 1] { + let n = *i as usize; + let mut idx = n as u64 + block_start_idx; + + let mut data_size; + if n < NUM_DATA { + data_size = locks[n].data_size().unwrap() as usize; + data_size -= BLOB_HEADER_SIZE; + if data_size > BLOB_DATA_SIZE { + error!("corrupt data blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } + } else { + data_size = size; + idx -= NUM_CODING as u64; + locks[n].set_slot(slot).unwrap(); + locks[n].set_index(idx).unwrap(); + + if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE { + error!("corrupt coding blob[{}] data_size: {}", idx, data_size); + corrupt = true; + break; + } + } + + locks[n].set_size(data_size); + trace!( + "erasures[{}] ({}) size: {} data[0]: {}", + *i, + idx, + data_size, + locks[n].data()[0] + ); + } if corrupt { // Remove the corrupted coding blobs so there's no effort wasted in trying to @@ -513,7 +560,7 @@ pub mod test { use std::sync::Arc; #[test] - fn test_coding() { + pub fn test_coding() { let zero_vec = vec![0; 16]; let mut vs: Vec> = (0..4).map(|i| (i..(16 + i)).collect()).collect(); let v_orig: Vec = vs[0].clone(); @@ -561,75 +608,6 @@ pub mod test { assert_eq!(v_orig, vs[0]); } - #[test] - fn test_erasure_generate_coding() { - solana_logger::setup(); - - // trivial case - let mut coding_generator = CodingGenerator::new(); - let blobs = Vec::new(); - for _ in 0..NUM_DATA * 2 { - let coding = coding_generator.next(&blobs).unwrap(); - assert_eq!(coding.len(), 0); - } - - // test coding by iterating one blob at a time - let data_blobs = generate_test_blobs(0, NUM_DATA * 2); - - for (i, blob) in data_blobs.iter().cloned().enumerate() { - let coding 
= coding_generator.next(&[blob]).unwrap(); - - if !coding.is_empty() { - assert_eq!(i % NUM_DATA, NUM_DATA - 1); - assert_eq!(coding.len(), NUM_CODING); - - let size = coding[0].read().unwrap().size().unwrap(); - - // toss one data and one coding - let erasures: Vec = vec![0, NUM_DATA as i32, -1]; - - let block_start_idx = i - (i % NUM_DATA); - let mut blobs: Vec = Vec::with_capacity(NUM_DATA + NUM_CODING); - - blobs.push(SharedBlob::default()); // empty data, erasure at zero - for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] { - // skip first blob - blobs.push(blob.clone()); - } - blobs.push(SharedBlob::default()); // empty coding, erasure at NUM_DATA - for blob in &coding[1..NUM_CODING] { - blobs.push(blob.clone()); - } - - let corrupt = - decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap(); - - assert!(!corrupt); - - assert_eq!( - blobs[1].read().unwrap().meta, - data_blobs[block_start_idx + 1].read().unwrap().meta - ); - assert_eq!( - blobs[1].read().unwrap().data(), - data_blobs[block_start_idx + 1].read().unwrap().data() - ); - assert_eq!( - blobs[0].read().unwrap().meta, - data_blobs[block_start_idx].read().unwrap().meta - ); - assert_eq!( - blobs[0].read().unwrap().data(), - data_blobs[block_start_idx].read().unwrap().data() - ); - assert_eq!( - blobs[NUM_DATA].read().unwrap().data(), - coding[0].read().unwrap().data() - ); - } - } - } - // TODO: Temprorary function used in tests to generate a database ledger // from the window (which is used to generate the erasure coding) // until we also transition generate_coding() and BroadcastStage to use DbLedger @@ -685,140 +663,6 @@ pub mod test { db_ledger } - fn generate_coding( - id: &Pubkey, - window: &mut [WindowSlot], - receive_index: u64, - num_blobs: usize, - transmit_index_coding: &mut u64, - ) -> Result<()> { - // beginning of the coding blobs of the block that receive_index points into - let coding_index_start = - receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64; - - let start_idx = receive_index as usize % window.len(); - let mut block_start = start_idx - (start_idx % NUM_DATA); - - loop { - let block_end = block_start + NUM_DATA; - if block_end > (start_idx + num_blobs) { - break; - } - info!( - "generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}", - id, block_start, block_end, start_idx, num_blobs - ); - - let mut max_data_size = 0; - - // find max_data_size, maybe bail if not all the data is here - for i in block_start..block_end { - let n = i % window.len(); - trace!("{} window[{}] = {:?}", id, n, window[n].data); - - if let Some(b) = &window[n].data { - max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size); - } else { - trace!("{} data block is null @ {}", id, n); - return Ok(()); - } - } - - // round up to the nearest jerasure alignment - max_data_size = align!(max_data_size, JERASURE_ALIGN); - - let mut data_blobs = Vec::with_capacity(NUM_DATA); - for i in block_start..block_end { - let n = i % window.len(); - - if let Some(b) = &window[n].data { - // make sure extra bytes in each blob are zero-d out for generation of - // coding blobs - let mut b_wl = b.write().unwrap(); - for i in b_wl.meta.size..max_data_size { - b_wl.data[i] = 0; - } - data_blobs.push(b); - } - } - - // getting ready to do erasure coding, means that we're potentially - // going back in time, tell our caller we've inserted coding blocks - // starting at coding_index_start - *transmit_index_coding = cmp::min(*transmit_index_coding, 
coding_index_start); - - let mut coding_blobs = Vec::with_capacity(NUM_CODING); - let coding_start = block_end - NUM_CODING; - for i in coding_start..block_end { - let n = i % window.len(); - assert!(window[n].coding.is_none()); - - window[n].coding = Some(SharedBlob::default()); - - let coding = window[n].coding.clone().unwrap(); - let mut coding_wl = coding.write().unwrap(); - for i in 0..max_data_size { - coding_wl.data[i] = 0; - } - // copy index and id from the data blob - if let Some(data) = &window[n].data { - let data_rl = data.read().unwrap(); - - let index = data_rl.index().unwrap(); - let slot = data_rl.slot().unwrap(); - let id = data_rl.id().unwrap(); - - trace!( - "{} copying index {} id {:?} from data to coding", - id, - index, - id - ); - coding_wl.set_index(index).unwrap(); - coding_wl.set_slot(slot).unwrap(); - coding_wl.set_id(&id).unwrap(); - } - coding_wl.set_size(max_data_size); - if coding_wl.set_coding().is_err() { - return Err(Error::ErasureError(ErasureError::EncodeError)); - } - - coding_blobs.push(coding.clone()); - } - - let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect(); - - let data_ptrs: Vec<_> = data_locks - .iter() - .enumerate() - .map(|(i, l)| { - trace!("{} i: {} data: {}", id, i, l.data[0]); - &l.data[..max_data_size] - }) - .collect(); - - let mut coding_locks: Vec<_> = - coding_blobs.iter().map(|b| b.write().unwrap()).collect(); - - let mut coding_ptrs: Vec<_> = coding_locks - .iter_mut() - .enumerate() - .map(|(i, l)| { - trace!("{} i: {} coding: {}", id, i, l.data[0],); - &mut l.data_mut()[..max_data_size] - }) - .collect(); - - generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?; - debug!( - "{} start_idx: {} data: {}:{} coding: {}:{}", - id, start_idx, block_start, block_end, coding_start, block_end - ); - block_start = block_end; - } - Ok(()) - } - pub fn setup_window_ledger( offset: usize, num_blobs: usize, @@ -896,14 +740,12 @@ pub mod test { blobs.push(b_); } - // Make some dummy slots - index_blobs( - &blobs, - &Keypair::new().pubkey(), - offset as u64, - &vec![slot; blobs.len()], - ); - + { + // Make some dummy slots + let slot_tick_heights: Vec<(&SharedBlob, u64)> = + blobs.iter().zip(vec![slot; blobs.len()]).collect(); + index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); + } for b in blobs { let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; @@ -912,18 +754,6 @@ pub mod test { window } - fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec { - let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs(); - - index_blobs( - &blobs, - &Keypair::new().pubkey(), - offset as u64, - &vec![DEFAULT_SLOT_HEIGHT; blobs.len()], - ); - blobs - } - fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec { let mut window = vec![ WindowSlot { @@ -933,8 +763,17 @@ pub mod test { }; WINDOW_SIZE ]; + let entries = make_tiny_test_entries(num_blobs); + let blobs = entries.to_shared_blobs(); - let blobs = generate_test_blobs(offset, num_blobs); + { + // Make some dummy slots + let slot_tick_heights: Vec<(&SharedBlob, u64)> = blobs + .iter() + .zip(vec![DEFAULT_SLOT_HEIGHT; blobs.len()]) + .collect(); + index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); + } for b in blobs.into_iter() { let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; diff --git a/src/fullnode.rs b/src/fullnode.rs index 439057b22e..2e868f577a 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -14,6 +14,7 @@ use crate::tpu::{Tpu, TpuReturnType}; use 
crate::tpu_forwarder::TpuForwarder;
 use crate::tvu::{Sockets, Tvu, TvuReturnType};
 use crate::vote_signer_proxy::VoteSignerProxy;
+use crate::window::{new_window, SharedWindow};
 use log::Level;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -97,6 +98,7 @@ pub struct Fullnode {
     bank: Arc<Bank>,
     cluster_info: Arc<RwLock<ClusterInfo>>,
     sigverify_disabled: bool,
+    shared_window: SharedWindow,
     tvu_sockets: Vec<UdpSocket>,
     repair_socket: UdpSocket,
     retransmit_socket: UdpSocket,
@@ -202,6 +204,8 @@ impl Fullnode {
         let db_ledger = db_ledger.unwrap_or_else(|| Self::make_db_ledger(ledger_path));
 
+        let window = new_window(32 * 1024);
+        let shared_window = Arc::new(RwLock::new(window));
         node.info.wallclock = timestamp();
         let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair(
             node.info,
@@ -311,6 +315,7 @@ impl Fullnode {
                     .try_clone()
                     .expect("Failed to clone broadcast socket"),
                 cluster_info.clone(),
+                shared_window.clone(),
                 entry_height,
                 bank.leader_scheduler.clone(),
                 entry_receiver,
@@ -326,6 +331,7 @@ impl Fullnode {
         Fullnode {
             keypair,
             cluster_info,
+            shared_window,
             bank,
             sigverify_disabled,
             gossip_service,
@@ -481,6 +487,7 @@ impl Fullnode {
                     .try_clone()
                     .expect("Failed to clone broadcast socket"),
                 self.cluster_info.clone(),
+                self.shared_window.clone(),
                 entry_height,
                 self.bank.leader_scheduler.clone(),
                 blob_receiver,
diff --git a/src/lib.rs b/src/lib.rs
index 697d11952f..e656a134b8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -70,7 +70,6 @@ pub mod tpu;
 pub mod tpu_forwarder;
 pub mod tvu;
 pub mod vote_signer_proxy;
-#[cfg(test)]
 pub mod window;
 pub mod window_service;
diff --git a/src/packet.rs b/src/packet.rs
index 76d0f81384..53b79fc40c 100644
--- a/src/packet.rs
+++ b/src/packet.rs
@@ -8,6 +8,7 @@ use log::Level;
 use serde::Serialize;
 pub use solana_sdk::packet::PACKET_DATA_SIZE;
 use solana_sdk::pubkey::Pubkey;
+use std::borrow::Borrow;
 use std::cmp;
 use std::fmt;
 use std::io;
@@ -450,14 +451,21 @@ impl Blob {
     }
 }
 
-pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u64]) {
+pub fn index_blobs<I, J, K>(blobs: I, id: &Pubkey, mut index: u64)
+where
+    I: IntoIterator<Item = J>,
+    J: Borrow<(K, u64)>,
+    K: Borrow<SharedBlob>,
+{
     // enumerate all the blobs, those are the indices
-    for (blob, slot) in blobs.iter().zip(slots) {
-        let mut blob = blob.write().unwrap();
+    for b in blobs {
+        let (b, slot) = b.borrow();
+        let mut blob = b.borrow().write().unwrap();
         blob.set_index(index).expect("set_index");
         blob.set_slot(*slot).expect("set_slot");
         blob.set_id(id).expect("set_id");
+        blob.set_flags(0).unwrap();
         index += 1;
    }
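
For reference, a minimal usage sketch of the reworked `index_blobs`, mirroring the test call sites in the db_ledger.rs and db_window.rs hunks above; `shared_blobs` and `slot` are assumed to come from the surrounding test setup (e.g. `make_tiny_test_entries(n).to_shared_blobs()` and a dummy slot height):

    // Pair each data blob with the slot it belongs to, then hand the iterator
    // to index_blobs, which stamps the running index, slot, id, and a cleared
    // flags field onto every blob.
    let slots = vec![slot; shared_blobs.len()];
    index_blobs(
        shared_blobs.iter().zip(slots.into_iter()),
        &Keypair::new().pubkey(),
        0, // starting blob index
    );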