delete db_window.rs, move contents to window_service, clean up process_blobs (#3746)

Rob Walker
2019-04-14 18:52:05 -07:00
committed by GitHub
parent dd005fb50e
commit bd1db51e07
3 changed files with 73 additions and 106 deletions

db_window.rs (deleted)

@@ -1,83 +0,0 @@
//! Set of functions for emulating windowing functions from a database ledger implementation
use crate::blocktree::*;
use crate::packet::{SharedBlob, BLOB_HEADER_SIZE};
use crate::result::Result;
use crate::streamer::BlobSender;
use solana_metrics::counter::Counter;
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::sync::Arc;

pub const MAX_REPAIR_LENGTH: usize = 128;

pub fn retransmit_blobs(dq: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> {
    let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
    for b in dq {
        // Don't add blobs generated by this node to the retransmit queue
        if b.read().unwrap().id() != *id {
            retransmit_queue.push(b.clone());
        }
    }

    if !retransmit_queue.is_empty() {
        inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
        retransmit.send(retransmit_queue)?;
    }
    Ok(())
}
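This function moves essentially unchanged into window_service.rs below: it skips blobs this node generated itself, then forwards the rest down the retransmit channel as one batch. A minimal sketch of the same shape, with a toy Blob struct and a plain std::sync::mpsc channel standing in for SharedBlob and the crate's BlobSender (both are stand-ins, not the real types):

use std::sync::mpsc::channel;

// Stand-in for SharedBlob; only the originating node's id matters here.
#[derive(Clone, Debug, PartialEq)]
struct Blob {
    id: u64,
}

fn main() {
    // The channel plays the role of the BlobSender handed to retransmit_blobs().
    let (retransmit, rx) = channel();
    let my_id = 7;
    let dq = vec![Blob { id: 7 }, Blob { id: 9 }];

    // Same shape as above: drop blobs this node generated itself,
    // then send the survivors down the channel as a single batch.
    let retransmit_queue: Vec<Blob> = dq.into_iter().filter(|b| b.id != my_id).collect();
    if !retransmit_queue.is_empty() {
        retransmit.send(retransmit_queue).unwrap();
    }

    assert_eq!(rx.recv().unwrap(), vec![Blob { id: 9 }]);
}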
/// Process a blob: Add blob to the ledger window.
pub fn process_blob(blocktree: &Arc<Blocktree>, blob: &SharedBlob) -> Result<()> {
    let is_coding = blob.read().unwrap().is_coding();

    // Check if the blob is in the range of our known leaders. If not, we return.
    let (slot, pix) = {
        let r_blob = blob.read().unwrap();
        (r_blob.slot(), r_blob.index())
    };

    // TODO: Once the original leader signature is added to the blob, make sure that
    // the blob was originally generated by the expected leader for this slot

    // Insert the new blob into block tree
    if is_coding {
        let blob = &blob.read().unwrap();
        blocktree.put_coding_blob_bytes(slot, pix, &blob.data[..BLOB_HEADER_SIZE + blob.size()])?;
    } else {
        blocktree.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?;
    }
    Ok(())
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::blocktree::get_tmp_ledger_path;
    use crate::entry::{make_tiny_test_entries, EntrySlice};
    use crate::packet::index_blobs;
    use std::sync::Arc;

    #[test]
    fn test_process_blob() {
        let blocktree_path = get_tmp_ledger_path!();
        let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
        let num_entries = 10;
        let original_entries = make_tiny_test_entries(num_entries);
        let shared_blobs = original_entries.clone().to_shared_blobs();

        index_blobs(&shared_blobs, &Pubkey::new_rand(), 0, 0, 0);

        for blob in shared_blobs.iter().rev() {
            process_blob(&blocktree, blob).expect("Expect successful processing of blob");
        }

        assert_eq!(
            blocktree.get_slot_entries(0, 0, None).unwrap(),
            original_entries
        );

        drop(blocktree);
        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
    }
}
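Note how process_blob above re-acquires the blob's read lock for each query: once for is_coding, again for slot/index, and again for the data. The replacement process_blobs in window_service.rs below instead locks every blob once up front and keeps the guards alive. A minimal sketch of that lock-once pattern, using a stand-in Blob type (the real one carries far more state):

use std::sync::{Arc, RwLock};

// Stand-in for the crate's Blob; the real type carries data, metadata, and more.
struct Blob {
    slot: u64,
    index: u64,
    is_coding: bool,
}
type SharedBlob = Arc<RwLock<Blob>>;

// Acquire each blob's read lock exactly once, keep the guards alive, then
// partition data blobs from coding blobs by reference -- the same shape as
// the new process_blobs() in this commit.
fn partition(blobs: &[SharedBlob]) -> (Vec<(u64, u64)>, Vec<(u64, u64)>) {
    let guards: Vec<_> = blobs.iter().map(|b| b.read().unwrap()).collect();
    let data: Vec<(u64, u64)> = guards
        .iter()
        .filter(|g| !g.is_coding)
        .map(|g| (g.slot, g.index))
        .collect();
    let coding: Vec<(u64, u64)> = guards
        .iter()
        .filter(|g| g.is_coding)
        .map(|g| (g.slot, g.index))
        .collect();
    (data, coding)
}

fn main() {
    let blobs: Vec<SharedBlob> = vec![
        Arc::new(RwLock::new(Blob { slot: 0, index: 0, is_coding: false })),
        Arc::new(RwLock::new(Blob { slot: 0, index: 1, is_coding: true })),
    ];
    let (data, coding) = partition(&blobs);
    assert_eq!(data, vec![(0, 0)]);
    assert_eq!(coding, vec![(0, 1)]);
}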

lib.rs

@@ -30,7 +30,6 @@ pub mod blocktree_processor;
pub mod cluster;
pub mod cluster_info;
pub mod cluster_tests;
-pub mod db_window;
pub mod entry;
#[cfg(feature = "erasure")]
pub mod erasure;

window_service.rs

@@ -1,8 +1,9 @@
-//! The `window_service` provides a thread for maintaining a window (tail of the ledger).
//! `window_service` handles the data plane's incoming blobs, storing them in
//! blocktree and retransmitting where required
//!
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
-use crate::db_window::*;
use crate::packet::{SharedBlob, BLOB_HEADER_SIZE};
use crate::repair_service::{RepairService, RepairSlotRange};
use crate::result::{Error, Result};
use crate::service::Service;
@@ -18,14 +19,51 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
pub const MAX_REPAIR_BACKOFF: usize = 128;
-#[derive(Debug, PartialEq, Eq, Clone)]
-pub enum WindowServiceReturnType {
-    LeaderRotation(u64),
-}

fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> {
    let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
    for blob in blobs {
        // Don't add blobs generated by this node to the retransmit queue
        if blob.read().unwrap().id() != *id {
            retransmit_queue.push(blob.clone());
        }
    }

    if !retransmit_queue.is_empty() {
        inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
        retransmit.send(retransmit_queue)?;
    }
    Ok(())
}

/// Process a blob: Add blob to the ledger window.
fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()> {
    // make an iterator for insert_data_blobs()
    let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect();

    blocktree.insert_data_blobs(blobs.iter().filter_map(|blob| {
        if !blob.is_coding() {
            Some(&(**blob))
        } else {
            None
        }
    }))?;

    for blob in blobs {
        // TODO: Once the original leader signature is added to the blob, make sure that
        // the blob was originally generated by the expected leader for this slot

        // Insert the new blob into block tree
        if blob.is_coding() {
            blocktree.put_coding_blob_bytes(
                blob.slot(),
                blob.index(),
                &blob.data[..BLOB_HEADER_SIZE + blob.size()],
            )?;
        }
    }
    Ok(())
}
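The Some(&(**blob)) in the filter_map above peels two layers of indirection: inside the closure, blob is a &RwLockReadGuard<Blob>, so one * removes the reference and the second goes through the guard's Deref impl, yielding a plain &Blob for insert_data_blobs(). A self-contained sketch of the same deref dance, again with a toy Blob:

use std::sync::RwLock;

// Toy Blob with just the flag process_blobs() filters on.
struct Blob {
    is_coding: bool,
}

fn main() {
    let lock = RwLock::new(Blob { is_coding: false });
    // Holding the guards in a Vec mirrors the `let blobs: Vec<_> = ...` above.
    let guards = vec![lock.read().unwrap()];

    // Inside filter_map, `guard` is a &RwLockReadGuard<Blob>: one `*` drops the
    // reference, the second goes through the guard's Deref to reach the Blob.
    let data_refs: Vec<&Blob> = guards
        .iter()
        .filter_map(|guard| {
            if !guard.is_coding {
                Some(&(**guard))
            } else {
                None
            }
        })
        .collect();

    assert_eq!(data_refs.len(), 1);
}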
#[allow(clippy::too_many_arguments)]
fn recv_window(
    blocktree: &Arc<Blocktree>,
    id: &Pubkey,
@@ -52,16 +90,7 @@
    //send a contiguous set of blocks
    trace!("{} num blobs received: {}", id, dq.len());

-    for b in dq {
-        let (pix, meta_size) = {
-            let p = b.read().unwrap();
-            (p.index(), p.meta.size)
-        };
-        trace!("{} window pix: {} size: {}", id, pix, meta_size);
-        let _ = process_blob(blocktree, &b);
-    }
    process_blobs(&dq, blocktree)?;

    trace!(
        "Elapsed processing time in recv_window(): {}",
@@ -95,7 +124,6 @@ pub struct WindowService {
}

impl WindowService {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        blocktree: Arc<Blocktree>,
        cluster_info: Arc<RwLock<ClusterInfo>>,
@@ -155,13 +183,13 @@
#[cfg(test)]
mod test {
-    use crate::blocktree::get_tmp_ledger_path;
-    use crate::blocktree::Blocktree;
    use super::*;
    use crate::blocktree::{get_tmp_ledger_path, Blocktree};
    use crate::cluster_info::{ClusterInfo, Node};
-    use crate::entry::make_consecutive_blobs;
    use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, EntrySlice};
    use crate::packet::index_blobs;
    use crate::service::Service;
    use crate::streamer::{blob_receiver, responder};
-    use crate::window_service::WindowService;
    use solana_sdk::hash::Hash;
    use std::fs::remove_dir_all;
    use std::net::UdpSocket;
@@ -170,6 +198,29 @@
    use std::sync::{Arc, RwLock};
    use std::time::Duration;
    #[test]
    fn test_process_blob() {
        let blocktree_path = get_tmp_ledger_path!();
        let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
        let num_entries = 10;
        let original_entries = make_tiny_test_entries(num_entries);
        let shared_blobs = original_entries.clone().to_shared_blobs();

        index_blobs(&shared_blobs, &Pubkey::new_rand(), 0, 0, 0);

        for blob in shared_blobs.into_iter().rev() {
            process_blobs(&[blob], &blocktree).expect("Expect successful processing of blob");
        }

        assert_eq!(
            blocktree.get_slot_entries(0, 0, None).unwrap(),
            original_entries
        );

        drop(blocktree);
        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
    }
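Because process_blobs takes a slice, the moved test could equally hand it the whole batch in one call instead of one single-element slice per iteration. A hypothetical variant of the loop above, using the same test fixtures and keeping the reverse order:

        // Hypothetical batch form of the loop above: pass process_blobs() every
        // blob at once rather than one single-element slice per blob.
        let reversed: Vec<_> = shared_blobs.iter().rev().cloned().collect();
        process_blobs(&reversed, &blocktree).expect("Expect successful processing of blobs");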
    #[test]
    pub fn window_send_test() {
        solana_logger::setup();