BlobFetchStage cleanup post shred work (#6254)

This commit is contained in:
Pankaj Garg
2019-10-07 11:08:01 -07:00
committed by GitHub
parent 6662986169
commit 17f169f446
4 changed files with 12 additions and 28 deletions

View File

@ -7,7 +7,6 @@
pub mod bank_forks; pub mod bank_forks;
pub mod banking_stage; pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod broadcast_stage; pub mod broadcast_stage;
pub mod chacha; pub mod chacha;
pub mod chacha_cuda; pub mod chacha_cuda;
@ -15,6 +14,7 @@ pub mod cluster_info_vote_listener;
pub mod confidence; pub mod confidence;
pub mod perf_libs; pub mod perf_libs;
pub mod recycler; pub mod recycler;
pub mod shred_fetch_stage;
#[macro_use] #[macro_use]
pub mod contact_info; pub mod contact_info;
pub mod crds; pub mod crds;

View File

@ -1,4 +1,3 @@
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE}; use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
@ -12,6 +11,7 @@ use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::shred::Shred; use crate::shred::Shred;
use crate::shred_fetch_stage::ShredFetchStage;
use crate::storage_stage::NUM_STORAGE_SAMPLES; use crate::storage_stage::NUM_STORAGE_SAMPLES;
use crate::streamer::{receiver, responder, PacketReceiver}; use crate::streamer::{receiver, responder, PacketReceiver};
use crate::window_service::WindowService; use crate::window_service::WindowService;
@ -263,7 +263,7 @@ impl Replicator {
.map(Arc::new) .map(Arc::new)
.collect(); .collect();
let (blob_fetch_sender, blob_fetch_receiver) = channel(); let (blob_fetch_sender, blob_fetch_receiver) = channel();
let fetch_stage = BlobFetchStage::new_multi_socket_packet( let fetch_stage = ShredFetchStage::new_multi_socket(
blob_sockets, blob_sockets,
blob_forward_sockets, blob_forward_sockets,
&blob_fetch_sender, &blob_fetch_sender,

View File

@ -1,37 +1,21 @@
//! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
use crate::recycler::Recycler; use crate::recycler::Recycler;
use crate::result; use crate::result;
use crate::result::Error; use crate::result::Error;
use crate::service::Service; use crate::service::Service;
use crate::streamer::{self, BlobSender, PacketReceiver, PacketSender}; use crate::streamer::{self, PacketReceiver, PacketSender};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, RecvTimeoutError}; use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
pub struct BlobFetchStage { pub struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl BlobFetchStage { impl ShredFetchStage {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: &Arc<AtomicBool>) -> Self {
Self::new_multi_socket(vec![socket], sender, exit)
}
pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
sender: &BlobSender,
exit: &Arc<AtomicBool>,
) -> Self {
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::blob_receiver(socket, &exit, sender.clone()))
.collect();
Self { thread_hdls }
}
fn handle_forwarded_packets( fn handle_forwarded_packets(
recvr: &PacketReceiver, recvr: &PacketReceiver,
sendr: &PacketSender, sendr: &PacketSender,
@ -55,7 +39,7 @@ impl BlobFetchStage {
Ok(()) Ok(())
} }
pub fn new_multi_socket_packet( pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>, sockets: Vec<Arc<UdpSocket>>,
forward_sockets: Vec<Arc<UdpSocket>>, forward_sockets: Vec<Arc<UdpSocket>>,
sender: &PacketSender, sender: &PacketSender,
@ -106,7 +90,7 @@ impl BlobFetchStage {
} }
} }
impl Service for BlobFetchStage { impl Service for ShredFetchStage {
type JoinReturnType = (); type JoinReturnType = ();
fn join(self) -> thread::Result<()> { fn join(self) -> thread::Result<()> {

View File

@ -13,7 +13,6 @@
//! - Generating the keys used to encrypt the ledger and sample it for storage mining. //! - Generating the keys used to encrypt the ledger and sample it for storage mining.
use crate::bank_forks::BankForks; use crate::bank_forks::BankForks;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blockstream_service::BlockstreamService; use crate::blockstream_service::BlockstreamService;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver}; use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
@ -25,6 +24,7 @@ use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage; use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service; use crate::service::Service;
use crate::shred_fetch_stage::ShredFetchStage;
use crate::snapshot_package::SnapshotPackagerService; use crate::snapshot_package::SnapshotPackagerService;
use crate::storage_stage::{StorageStage, StorageState}; use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex, RwLock};
use std::thread; use std::thread;
pub struct Tvu { pub struct Tvu {
fetch_stage: BlobFetchStage, fetch_stage: ShredFetchStage,
retransmit_stage: RetransmitStage, retransmit_stage: RetransmitStage,
replay_stage: ReplayStage, replay_stage: ReplayStage,
blockstream_service: Option<BlockstreamService>, blockstream_service: Option<BlockstreamService>,
@ -104,7 +104,7 @@ impl Tvu {
blob_sockets.push(repair_socket.clone()); blob_sockets.push(repair_socket.clone());
let blob_forward_sockets: Vec<Arc<UdpSocket>> = let blob_forward_sockets: Vec<Arc<UdpSocket>> =
tvu_forward_sockets.into_iter().map(Arc::new).collect(); tvu_forward_sockets.into_iter().map(Arc::new).collect();
let fetch_stage = BlobFetchStage::new_multi_socket_packet( let fetch_stage = ShredFetchStage::new_multi_socket(
blob_sockets, blob_sockets,
blob_forward_sockets, blob_forward_sockets,
&fetch_sender, &fetch_sender,