@ -1,5 +1,5 @@
|
||||
//! `window_service` handles the data plane incoming shreds, storing them in
|
||||
//! blocktree and retransmitting where required
|
||||
//! blockstore and retransmitting where required
|
||||
//!
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use crate::packet::Packets;
|
||||
@ -13,7 +13,7 @@ use rayon::iter::IntoParallelRefMutIterator;
|
||||
use rayon::iter::ParallelIterator;
|
||||
use rayon::ThreadPool;
|
||||
use solana_ledger::bank_forks::BankForks;
|
||||
use solana_ledger::blocktree::{self, Blocktree, MAX_DATA_SHREDS_PER_SLOT};
|
||||
use solana_ledger::blockstore::{self, Blockstore, MAX_DATA_SHREDS_PER_SLOT};
|
||||
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
|
||||
use solana_ledger::shred::Shred;
|
||||
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
|
||||
@ -30,7 +30,7 @@ use std::time::{Duration, Instant};
|
||||
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
|
||||
if shred.is_data() {
|
||||
// Only data shreds have parent information
|
||||
blocktree::verify_shred_slots(shred.slot(), shred.parent(), root)
|
||||
blockstore::verify_shred_slots(shred.slot(), shred.parent(), root)
|
||||
} else {
|
||||
// Filter out outdated coding shreds
|
||||
shred.slot() >= root
|
||||
@ -75,7 +75,7 @@ pub fn should_retransmit_and_persist(
|
||||
|
||||
fn run_insert(
|
||||
shred_receiver: &CrossbeamReceiver<Vec<Shred>>,
|
||||
blocktree: &Arc<Blocktree>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::from_millis(200);
|
||||
@ -85,15 +85,15 @@ fn run_insert(
|
||||
shreds.append(&mut more_shreds)
|
||||
}
|
||||
|
||||
let blocktree_insert_metrics =
|
||||
blocktree.insert_shreds(shreds, Some(leader_schedule_cache), false)?;
|
||||
blocktree_insert_metrics.report_metrics("recv-window-insert-shreds");
|
||||
let blockstore_insert_metrics =
|
||||
blockstore.insert_shreds(shreds, Some(leader_schedule_cache), false)?;
|
||||
blockstore_insert_metrics.report_metrics("recv-window-insert-shreds");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn recv_window<F>(
|
||||
blocktree: &Arc<Blocktree>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
insert_shred_sender: &CrossbeamSender<Vec<Shred>>,
|
||||
my_pubkey: &Pubkey,
|
||||
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
|
||||
@ -117,7 +117,7 @@ where
|
||||
let now = Instant::now();
|
||||
inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
|
||||
|
||||
let last_root = blocktree.last_root();
|
||||
let last_root = blockstore.last_root();
|
||||
let shreds: Vec<_> = thread_pool.install(|| {
|
||||
packets
|
||||
.par_iter_mut()
|
||||
@ -138,7 +138,7 @@ where
|
||||
// get retransmitted. It'll allow peer nodes to see this shred
|
||||
// and trigger them to mark the slot as dead.
|
||||
if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
|
||||
let _ = blocktree.set_dead_slot(shred.slot());
|
||||
let _ = blockstore.set_dead_slot(shred.slot());
|
||||
}
|
||||
packet.meta.slot = shred.slot();
|
||||
packet.meta.seed = shred.seed();
|
||||
@ -205,7 +205,7 @@ pub struct WindowService {
|
||||
impl WindowService {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new<F>(
|
||||
blocktree: Arc<Blocktree>,
|
||||
blockstore: Arc<Blockstore>,
|
||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
retransmit: PacketSender,
|
||||
@ -227,7 +227,7 @@ impl WindowService {
|
||||
};
|
||||
|
||||
let repair_service = RepairService::new(
|
||||
blocktree.clone(),
|
||||
blockstore.clone(),
|
||||
exit.clone(),
|
||||
repair_socket,
|
||||
cluster_info.clone(),
|
||||
@ -238,7 +238,7 @@ impl WindowService {
|
||||
|
||||
let t_insert = Self::start_window_insert_thread(
|
||||
exit,
|
||||
&blocktree,
|
||||
&blockstore,
|
||||
leader_schedule_cache,
|
||||
insert_receiver,
|
||||
);
|
||||
@ -246,7 +246,7 @@ impl WindowService {
|
||||
let t_window = Self::start_recv_window_thread(
|
||||
cluster_info.read().unwrap().id(),
|
||||
exit,
|
||||
&blocktree,
|
||||
&blockstore,
|
||||
insert_sender,
|
||||
verified_receiver,
|
||||
shred_filter,
|
||||
@ -263,12 +263,12 @@ impl WindowService {
|
||||
|
||||
fn start_window_insert_thread(
|
||||
exit: &Arc<AtomicBool>,
|
||||
blocktree: &Arc<Blocktree>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
insert_receiver: CrossbeamReceiver<Vec<Shred>>,
|
||||
) -> JoinHandle<()> {
|
||||
let exit = exit.clone();
|
||||
let blocktree = blocktree.clone();
|
||||
let blockstore = blockstore.clone();
|
||||
let leader_schedule_cache = leader_schedule_cache.clone();
|
||||
let mut handle_timeout = || {};
|
||||
let handle_error = || {
|
||||
@ -281,7 +281,7 @@ impl WindowService {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Err(e) = run_insert(&insert_receiver, &blocktree, &leader_schedule_cache) {
|
||||
if let Err(e) = run_insert(&insert_receiver, &blockstore, &leader_schedule_cache) {
|
||||
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
|
||||
break;
|
||||
}
|
||||
@ -293,7 +293,7 @@ impl WindowService {
|
||||
fn start_recv_window_thread<F>(
|
||||
id: Pubkey,
|
||||
exit: &Arc<AtomicBool>,
|
||||
blocktree: &Arc<Blocktree>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
insert_sender: CrossbeamSender<Vec<Shred>>,
|
||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
shred_filter: F,
|
||||
@ -307,7 +307,7 @@ impl WindowService {
|
||||
+ std::marker::Sync,
|
||||
{
|
||||
let exit = exit.clone();
|
||||
let blocktree = blocktree.clone();
|
||||
let blockstore = blockstore.clone();
|
||||
Builder::new()
|
||||
.name("solana-window".to_string())
|
||||
.spawn(move || {
|
||||
@ -334,7 +334,7 @@ impl WindowService {
|
||||
}
|
||||
};
|
||||
if let Err(e) = recv_window(
|
||||
&blocktree,
|
||||
&blockstore,
|
||||
&insert_sender,
|
||||
&id,
|
||||
&verified_receiver,
|
||||
@ -401,7 +401,7 @@ mod test {
|
||||
use rand::thread_rng;
|
||||
use solana_ledger::shred::DataShredHeader;
|
||||
use solana_ledger::{
|
||||
blocktree::{make_many_slot_entries, Blocktree},
|
||||
blockstore::{make_many_slot_entries, Blockstore},
|
||||
entry::{create_ticks, Entry},
|
||||
get_tmp_ledger_path,
|
||||
shred::Shredder,
|
||||
@ -434,23 +434,23 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn test_process_shred() {
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
|
||||
let blockstore_path = get_tmp_ledger_path!();
|
||||
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
|
||||
let num_entries = 10;
|
||||
let original_entries = create_ticks(num_entries, 0, Hash::default());
|
||||
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new()));
|
||||
shreds.reverse();
|
||||
blocktree
|
||||
blockstore
|
||||
.insert_shreds(shreds, None, false)
|
||||
.expect("Expect successful processing of shred");
|
||||
|
||||
assert_eq!(
|
||||
blocktree.get_slot_entries(0, 0, None).unwrap(),
|
||||
blockstore.get_slot_entries(0, 0, None).unwrap(),
|
||||
original_entries
|
||||
);
|
||||
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
drop(blockstore);
|
||||
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -529,18 +529,18 @@ mod test {
|
||||
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> WindowService {
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
let (blocktree, _, _) = Blocktree::open_with_signal(&blocktree_path)
|
||||
let blockstore_path = get_tmp_ledger_path!();
|
||||
let (blockstore, _, _) = Blockstore::open_with_signal(&blockstore_path)
|
||||
.expect("Expected to be able to open database ledger");
|
||||
|
||||
let blocktree = Arc::new(blocktree);
|
||||
let blockstore = Arc::new(blockstore);
|
||||
let (retransmit_sender, _retransmit_receiver) = channel();
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
||||
ContactInfo::new_localhost(&Pubkey::default(), 0),
|
||||
)));
|
||||
let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap());
|
||||
let window = WindowService::new(
|
||||
blocktree,
|
||||
blockstore,
|
||||
cluster_info,
|
||||
verified_receiver,
|
||||
retransmit_sender,
|
||||
|
Reference in New Issue
Block a user