diff --git a/core/src/db_window.rs b/core/src/db_window.rs deleted file mode 100644 index b7708c2c44..0000000000 --- a/core/src/db_window.rs +++ /dev/null @@ -1,83 +0,0 @@ -//! Set of functions for emulating windowing functions from a database ledger implementation -use crate::blocktree::*; -use crate::packet::{SharedBlob, BLOB_HEADER_SIZE}; -use crate::result::Result; -use crate::streamer::BlobSender; -use solana_metrics::counter::Counter; -use solana_sdk::pubkey::Pubkey; -use std::borrow::Borrow; -use std::sync::Arc; - -pub const MAX_REPAIR_LENGTH: usize = 128; - -pub fn retransmit_blobs(dq: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> { - let mut retransmit_queue: Vec = Vec::new(); - for b in dq { - // Don't add blobs generated by this node to the retransmit queue - if b.read().unwrap().id() != *id { - retransmit_queue.push(b.clone()); - } - } - - if !retransmit_queue.is_empty() { - inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len()); - retransmit.send(retransmit_queue)?; - } - Ok(()) -} - -/// Process a blob: Add blob to the ledger window. -pub fn process_blob(blocktree: &Arc, blob: &SharedBlob) -> Result<()> { - let is_coding = blob.read().unwrap().is_coding(); - - // Check if the blob is in the range of our known leaders. If not, we return. - let (slot, pix) = { - let r_blob = blob.read().unwrap(); - (r_blob.slot(), r_blob.index()) - }; - - // TODO: Once the original leader signature is added to the blob, make sure that - // the blob was originally generated by the expected leader for this slot - - // Insert the new blob into block tree - if is_coding { - let blob = &blob.read().unwrap(); - blocktree.put_coding_blob_bytes(slot, pix, &blob.data[..BLOB_HEADER_SIZE + blob.size()])?; - } else { - blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?; - } - - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::blocktree::get_tmp_ledger_path; - use crate::entry::{make_tiny_test_entries, EntrySlice}; - use crate::packet::index_blobs; - use std::sync::Arc; - - #[test] - fn test_process_blob() { - let blocktree_path = get_tmp_ledger_path!(); - let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap()); - let num_entries = 10; - let original_entries = make_tiny_test_entries(num_entries); - let shared_blobs = original_entries.clone().to_shared_blobs(); - - index_blobs(&shared_blobs, &Pubkey::new_rand(), 0, 0, 0); - - for blob in shared_blobs.iter().rev() { - process_blob(&blocktree, blob).expect("Expect successful processing of blob"); - } - - assert_eq!( - blocktree.get_slot_entries(0, 0, None).unwrap(), - original_entries - ); - - drop(blocktree); - Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index ca38503618..82b28dc75e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -30,7 +30,6 @@ pub mod blocktree_processor; pub mod cluster; pub mod cluster_info; pub mod cluster_tests; -pub mod db_window; pub mod entry; #[cfg(feature = "erasure")] pub mod erasure; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index c705743eca..11e2aae8e3 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -1,8 +1,9 @@ -//! The `window_service` provides a thread for maintaining a window (tail of the ledger). +//! `window_service` handles the data plane incoming blobs, storing them in +//! blocktree and retransmitting where required //! use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; -use crate::db_window::*; +use crate::packet::{SharedBlob, BLOB_HEADER_SIZE}; use crate::repair_service::{RepairService, RepairSlotRange}; use crate::result::{Error, Result}; use crate::service::Service; @@ -18,14 +19,51 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; -pub const MAX_REPAIR_BACKOFF: usize = 128; +fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> { + let mut retransmit_queue: Vec = Vec::new(); + for blob in blobs { + // Don't add blobs generated by this node to the retransmit queue + if blob.read().unwrap().id() != *id { + retransmit_queue.push(blob.clone()); + } + } -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum WindowServiceReturnType { - LeaderRotation(u64), + if !retransmit_queue.is_empty() { + inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len()); + retransmit.send(retransmit_queue)?; + } + Ok(()) +} + +/// Process a blob: Add blob to the ledger window. +fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc) -> Result<()> { + // make an iterator for insert_data_blobs() + let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect(); + + blocktree.insert_data_blobs(blobs.iter().filter_map(|blob| { + if !blob.is_coding() { + Some(&(**blob)) + } else { + None + } + }))?; + + for blob in blobs { + // TODO: Once the original leader signature is added to the blob, make sure that + // the blob was originally generated by the expected leader for this slot + + // Insert the new blob into block tree + if blob.is_coding() { + blocktree.put_coding_blob_bytes( + blob.slot(), + blob.index(), + &blob.data[..BLOB_HEADER_SIZE + blob.size()], + )?; + } + } + Ok(()) } -#[allow(clippy::too_many_arguments)] fn recv_window( blocktree: &Arc, id: &Pubkey, @@ -52,16 +90,7 @@ fn recv_window( //send a contiguous set of blocks trace!("{} num blobs received: {}", id, dq.len()); - for b in dq { - let (pix, meta_size) = { - let p = b.read().unwrap(); - (p.index(), p.meta.size) - }; - - trace!("{} window pix: {} size: {}", id, pix, meta_size); - - let _ = process_blob(blocktree, &b); - } + process_blobs(&dq, blocktree)?; trace!( "Elapsed processing time in recv_window(): {}", @@ -95,7 +124,6 @@ pub struct WindowService { } impl WindowService { - #[allow(clippy::too_many_arguments)] pub fn new( blocktree: Arc, cluster_info: Arc>, @@ -155,13 +183,13 @@ impl Service for WindowService { #[cfg(test)] mod test { - use crate::blocktree::get_tmp_ledger_path; - use crate::blocktree::Blocktree; + use super::*; + use crate::blocktree::{get_tmp_ledger_path, Blocktree}; use crate::cluster_info::{ClusterInfo, Node}; - use crate::entry::make_consecutive_blobs; + use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, EntrySlice}; + use crate::packet::index_blobs; use crate::service::Service; use crate::streamer::{blob_receiver, responder}; - use crate::window_service::WindowService; use solana_sdk::hash::Hash; use std::fs::remove_dir_all; use std::net::UdpSocket; @@ -170,6 +198,29 @@ mod test { use std::sync::{Arc, RwLock}; use std::time::Duration; + #[test] + fn test_process_blob() { + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap()); + let num_entries = 10; + let original_entries = make_tiny_test_entries(num_entries); + let shared_blobs = original_entries.clone().to_shared_blobs(); + + index_blobs(&shared_blobs, &Pubkey::new_rand(), 0, 0, 0); + + for blob in shared_blobs.into_iter().rev() { + process_blobs(&[blob], &blocktree).expect("Expect successful processing of blob"); + } + + assert_eq!( + blocktree.get_slot_entries(0, 0, None).unwrap(), + original_entries + ); + + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + #[test] pub fn window_send_test() { solana_logger::setup();