From 32162ef0f1f9204f15273b7388ec7ab82631f920 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Thu, 31 Jan 2019 13:43:22 -0800
Subject: [PATCH] Connect TPU's broadcast service with TVU's blob fetch stage
 (#2587)

* Connect TPU's broadcast service with TVU's blob fetch stage

- This is needed since ledger is being written only in TVU now

* fix clippy warnings

* fix failing test

* fix broken tests

* fixed failing tests
---
 src/blob_fetch_stage.rs  | 14 ++++++--------
 src/broadcast_service.rs | 19 ++++++++++++++++++-
 src/fullnode.rs          |  7 ++++++-
 src/replicator.rs        |  6 ++++--
 src/retransmit_stage.rs  |  4 +++-
 src/tpu.rs               |  5 +++++
 src/tvu.rs               | 36 ++++++++++++++++++++++--------------
 src/window_service.rs    |  8 +++++++-
 tests/multinode.rs       | 12 +++++++++---
 9 files changed, 80 insertions(+), 31 deletions(-)

diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs
index 5cef29271d..a0cebc32ad 100644
--- a/src/blob_fetch_stage.rs
+++ b/src/blob_fetch_stage.rs
@@ -1,10 +1,9 @@
 //! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel.
 
 use crate::service::Service;
-use crate::streamer::{self, BlobReceiver};
+use crate::streamer::{self, BlobSender};
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::channel;
 use std::sync::Arc;
 use std::thread::{self, JoinHandle};
 
@@ -14,21 +13,20 @@ pub struct BlobFetchStage {
 }
 
 impl BlobFetchStage {
-    #[allow(clippy::new_ret_no_self)]
-    pub fn new(socket: Arc<UdpSocket>, exit: Arc<AtomicBool>) -> (Self, BlobReceiver) {
-        Self::new_multi_socket(vec![socket], exit)
+    pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: Arc<AtomicBool>) -> Self {
+        Self::new_multi_socket(vec![socket], sender, exit)
     }
     pub fn new_multi_socket(
         sockets: Vec<Arc<UdpSocket>>,
+        sender: &BlobSender,
         exit: Arc<AtomicBool>,
-    ) -> (Self, BlobReceiver) {
-        let (sender, receiver) = channel();
+    ) -> Self {
         let thread_hdls: Vec<_> = sockets
             .into_iter()
             .map(|socket| streamer::blob_receiver(socket, exit.clone(), sender.clone()))
             .collect();
 
-        (Self { exit, thread_hdls }, receiver)
+        Self { exit, thread_hdls }
     }
 
     pub fn close(&self) {
diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs
index ee4af66bac..1ff1af8538 100644
--- a/src/broadcast_service.rs
+++ b/src/broadcast_service.rs
@@ -11,6 +11,7 @@ use crate::leader_scheduler::LeaderScheduler;
 use crate::packet::index_blobs;
 use crate::result::{Error, Result};
 use crate::service::Service;
+use crate::streamer::BlobSender;
 use log::Level;
 use rayon::prelude::*;
 use solana_metrics::{influxdb, submit};
@@ -46,6 +47,7 @@ impl Broadcast {
         receiver: &Receiver<Vec<Entry>>,
         sock: &UdpSocket,
         leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
+        blob_sender: &BlobSender,
     ) -> Result<()> {
         let timer = Duration::new(1, 0);
         let entries = receiver.recv_timeout(timer)?;
@@ -91,6 +93,8 @@
 
         inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
 
+        blob_sender.send(blobs.clone())?;
+
         // don't count coding blobs in the blob indexes
         self.blob_index += blobs.len() as u64;
 
@@ -191,6 +195,7 @@ impl BroadcastService {
         receiver: &Receiver<Vec<Entry>>,
         max_tick_height: Option<u64>,
         exit_signal: &Arc<AtomicBool>,
+        blob_sender: &BlobSender,
     ) -> BroadcastServiceReturnType {
         let me = cluster_info.read().unwrap().my_data().clone();
 
@@ -210,7 +215,13 @@
             // Layer 1, leader nodes are limited to the fanout size.
             broadcast_table.truncate(DATA_PLANE_FANOUT);
             inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
-            if let Err(e) = broadcast.run(&broadcast_table, receiver, sock, leader_scheduler) {
+            if let Err(e) = broadcast.run(
+                &broadcast_table,
+                receiver,
+                sock,
+                leader_scheduler,
+                blob_sender,
+            ) {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
                         return BroadcastServiceReturnType::ChannelDisconnected;
@@ -250,8 +261,10 @@ impl BroadcastService {
         receiver: Receiver<Vec<Entry>>,
         max_tick_height: Option<u64>,
         exit_sender: Arc<AtomicBool>,
+        blob_sender: &BlobSender,
     ) -> Self {
         let exit_signal = Arc::new(AtomicBool::new(false));
+        let blob_sender = blob_sender.clone();
         let thread_hdl = Builder::new()
             .name("solana-broadcaster".to_string())
             .spawn(move || {
@@ -265,6 +278,7 @@
                     &receiver,
                     max_tick_height,
                     &exit_signal,
+                    &blob_sender,
                 )
             })
             .unwrap();
@@ -328,6 +342,8 @@ mod test {
         let exit_sender = Arc::new(AtomicBool::new(false));
         let bank = Arc::new(Bank::default());
 
+        let (blob_fetch_sender, _) = channel();
+
         // Start up the broadcast stage
         let broadcast_service = BroadcastService::new(
             bank.clone(),
@@ -338,6 +354,7 @@
             entry_receiver,
             Some(max_tick_height),
             exit_sender,
+            &blob_fetch_sender,
         );
 
         MockBroadcastService {
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 4145c1bc89..a09137b25b 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -11,6 +11,7 @@ use crate::rpc::JsonRpcService;
 use crate::rpc_pubsub::PubSubService;
 use crate::service::Service;
 use crate::storage_stage::StorageState;
+use crate::streamer::BlobSender;
 use crate::tpu::{Tpu, TpuReturnType};
 use crate::tvu::{Sockets, Tvu, TvuReturnType};
 use crate::vote_signer_proxy::VoteSignerProxy;
@@ -100,6 +101,7 @@ pub struct Fullnode {
     broadcast_socket: UdpSocket,
     pub node_services: NodeServices,
     pub role_notifiers: (TvuRotationReceiver, TpuRotationReceiver),
+    blob_sender: BlobSender,
 }
 
 impl Fullnode {
@@ -219,7 +221,7 @@ impl Fullnode {
         let (to_leader_sender, to_leader_receiver) = channel();
         let (to_validator_sender, to_validator_receiver) = channel();
 
-        let tvu = Tvu::new(
+        let (tvu, blob_sender) = Tvu::new(
             vote_signer_option,
             &bank,
             entry_height,
@@ -257,6 +259,7 @@ impl Fullnode {
             id,
             scheduled_leader == id,
             &to_validator_sender,
+            &blob_sender,
         );
 
         inc_new_counter_info!("fullnode-new", 1);
@@ -274,6 +277,7 @@ impl Fullnode {
             tpu_sockets: node.sockets.tpu,
             broadcast_socket: node.sockets.broadcast,
             role_notifiers: (to_leader_receiver, to_validator_receiver),
+            blob_sender,
         }
     }
 
@@ -333,6 +337,7 @@ impl Fullnode {
             &last_id,
             self.id,
             &to_validator_sender,
+            &self.blob_sender,
         )
     }
 
diff --git a/src/replicator.rs b/src/replicator.rs
index f7b7e6d00e..0dab69d633 100644
--- a/src/replicator.rs
+++ b/src/replicator.rs
@@ -167,8 +167,9 @@ impl Replicator {
         let mut blob_sockets: Vec<Arc<UdpSocket>> =
             node.sockets.tvu.into_iter().map(Arc::new).collect();
         blob_sockets.push(repair_socket.clone());
-        let (fetch_stage, blob_fetch_receiver) =
-            BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
+        let (blob_fetch_sender, blob_fetch_receiver) = channel();
+        let fetch_stage =
+            BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone());
 
         // todo: pull blobs off the retransmit_receiver and recycle them?
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -189,6 +190,7 @@ impl Replicator {
                 leader_pubkey,
             ))),
             done.clone(),
+            exit.clone(),
         );
 
         info!("window created, waiting for ledger download done");
diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs
index 04b10534ce..77dce2e583 100644
--- a/src/retransmit_stage.rs
+++ b/src/retransmit_stage.rs
@@ -123,7 +123,7 @@ pub struct RetransmitStage {
 }
 
 impl RetransmitStage {
-    #[allow(clippy::new_ret_no_self)]
+    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
     pub fn new(
         bank: &Arc<Bank>,
         db_ledger: Arc<DbLedger>,
@@ -134,6 +134,7 @@ impl RetransmitStage {
         repair_socket: Arc<UdpSocket>,
         fetch_stage_receiver: BlobReceiver,
         leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+        exit: Arc<AtomicBool>,
     ) -> (Self, Receiver<Vec<Entry>>) {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -157,6 +158,7 @@ impl RetransmitStage {
             repair_socket,
             leader_scheduler,
             done,
+            exit,
         );
 
         let thread_hdls = vec![t_retransmit, t_window];
diff --git a/src/tpu.rs b/src/tpu.rs
index 59fb012dca..597a7e6a4d 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -10,6 +10,7 @@ use crate::fullnode::TpuRotationSender;
 use crate::poh_service::Config;
 use crate::service::Service;
 use crate::sigverify_stage::SigVerifyStage;
+use crate::streamer::BlobSender;
 use crate::tpu_forwarder::TpuForwarder;
 use solana_sdk::hash::Hash;
 use solana_sdk::pubkey::Pubkey;
@@ -81,6 +82,7 @@ impl Tpu {
         leader_id: Pubkey,
         is_leader: bool,
         to_validator_sender: &TpuRotationSender,
+        blob_sender: &BlobSender,
     ) -> Self {
         let exit = Arc::new(AtomicBool::new(false));
 
@@ -110,6 +112,7 @@
                 entry_receiver,
                 max_tick_height,
                 exit.clone(),
+                blob_sender,
             );
 
             let svcs = LeaderServices::new(
@@ -162,6 +165,7 @@ impl Tpu {
         last_entry_id: &Hash,
         leader_id: Pubkey,
         to_validator_sender: &TpuRotationSender,
+        blob_sender: &BlobSender,
     ) {
         match &self.tpu_mode {
             TpuMode::Leader(svcs) => {
@@ -197,6 +201,7 @@ impl Tpu {
                 entry_receiver,
                 max_tick_height,
                 self.exit.clone(),
+                blob_sender,
             );
 
             let svcs = LeaderServices::new(
diff --git a/src/tvu.rs b/src/tvu.rs
index 757a2f04a2..407b217e8b 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -21,11 +21,13 @@ use crate::replay_stage::ReplayStage;
 use crate::retransmit_stage::RetransmitStage;
 use crate::service::Service;
 use crate::storage_stage::{StorageStage, StorageState};
+use crate::streamer::BlobSender;
 use crate::vote_signer_proxy::VoteSignerProxy;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
 use std::thread;
 
@@ -60,7 +62,7 @@ impl Tvu {
     /// * `cluster_info` - The cluster_info state.
     /// * `sockets` - My fetch, repair, and restransmit sockets
     /// * `db_ledger` - the ledger itself
-    #[allow(clippy::too_many_arguments)]
+    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
     pub fn new(
         vote_signer: Option<Arc<VoteSignerProxy>>,
         bank: &Arc<Bank>,
@@ -73,7 +75,7 @@ impl Tvu {
         to_leader_sender: TvuRotationSender,
         storage_state: &StorageState,
         entry_stream: Option<String>,
-    ) -> Self {
+    ) -> (Self, BlobSender) {
         let exit = Arc::new(AtomicBool::new(false));
         let keypair: Arc<Keypair> = cluster_info
             .read()
@@ -87,12 +89,14 @@ impl Tvu {
             retransmit: retransmit_socket,
         } = sockets;
 
+        let (blob_fetch_sender, blob_fetch_receiver) = channel();
+
         let repair_socket = Arc::new(repair_socket);
         let mut blob_sockets: Vec<Arc<UdpSocket>> =
             fetch_sockets.into_iter().map(Arc::new).collect();
         blob_sockets.push(repair_socket.clone());
-        let (fetch_stage, blob_fetch_receiver) =
-            BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
+        let fetch_stage =
+            BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone());
         //TODO
         //the packets coming out of blob_receiver need to be sent to the GPU and verified
 
@@ -107,6 +111,7 @@
             repair_socket,
             blob_fetch_receiver,
             bank.leader_scheduler.clone(),
+            exit.clone(),
         );
 
         let l_entry_height = Arc::new(RwLock::new(entry_height));
@@ -136,15 +141,18 @@ impl Tvu {
             &cluster_info,
         );
 
-        Tvu {
-            fetch_stage,
-            retransmit_stage,
-            replay_stage,
-            storage_stage,
-            exit,
-            last_entry_id: l_last_entry_id,
-            entry_height: l_entry_height,
-        }
+        (
+            Tvu {
+                fetch_stage,
+                retransmit_stage,
+                replay_stage,
+                storage_stage,
+                exit,
+                last_entry_id: l_last_entry_id,
+                entry_height: l_entry_height,
+            },
+            blob_fetch_sender,
+        )
     }
 
     pub fn get_state(&self) -> (Hash, u64) {
@@ -285,7 +293,7 @@ pub mod tests {
         let vote_account_keypair = Arc::new(Keypair::new());
         let vote_signer = VoteSignerProxy::new_local(&vote_account_keypair);
         let (sender, _) = channel();
-        let tvu = Tvu::new(
+        let (tvu, _) = Tvu::new(
             Some(Arc::new(vote_signer)),
             &bank,
             0,
diff --git a/src/window_service.rs b/src/window_service.rs
index 1f6dc5e405..ab3bf2ec00 100644
--- a/src/window_service.rs
+++ b/src/window_service.rs
@@ -15,7 +15,7 @@ use solana_metrics::{influxdb, submit};
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::duration_as_ms;
 use std::net::UdpSocket;
-use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::mpsc::RecvTimeoutError;
 use std::sync::{Arc, RwLock};
 use std::thread::{Builder, JoinHandle};
@@ -129,6 +129,7 @@ pub fn window_service(
     repair_socket: Arc<UdpSocket>,
     leader_scheduler: Arc<RwLock<LeaderScheduler>>,
     done: Arc<AtomicBool>,
+    exit: Arc<AtomicBool>,
 ) -> JoinHandle<()> {
     Builder::new()
         .name("solana-window".to_string())
@@ -139,6 +140,9 @@
             let id = cluster_info.read().unwrap().id();
             trace!("{}: RECV_WINDOW started", id);
             loop {
+                if exit.load(Ordering::Relaxed) {
+                    break;
+                }
                 if let Err(e) = recv_window(
                     &db_ledger,
                     &id,
@@ -273,6 +277,7 @@ mod test {
             Arc::new(tn.sockets.repair),
             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
             done,
+            exit.clone(),
         );
         let t_responder = {
             let (s_responder, r_responder) = channel();
@@ -342,6 +347,7 @@ mod test {
             Arc::new(tn.sockets.repair),
             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
             done,
+            exit.clone(),
         );
         let t_responder = {
             let (s_responder, r_responder) = channel();
diff --git a/tests/multinode.rs b/tests/multinode.rs
index 92f8e8ca77..f2e3580487 100644
--- a/tests/multinode.rs
+++ b/tests/multinode.rs
@@ -29,6 +29,7 @@ use std::env;
 use std::fs::remove_dir_all;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
 use std::thread::{sleep, Builder, JoinHandle};
 use std::time::{Duration, Instant};
@@ -1708,9 +1709,14 @@ fn test_broadcast_last_tick() {
     let blob_fetch_stages: Vec<_> = listening_nodes
         .iter_mut()
         .map(|(_, _, node, _)| {
-            BlobFetchStage::new(
-                Arc::new(node.sockets.tvu.pop().unwrap()),
-                blob_receiver_exit.clone(),
+            let (blob_fetch_sender, blob_fetch_receiver) = channel();
+            (
+                BlobFetchStage::new(
+                    Arc::new(node.sockets.tvu.pop().unwrap()),
+                    &blob_fetch_sender,
+                    blob_receiver_exit.clone(),
+                ),
+                blob_fetch_receiver,
             )
         })
         .collect();
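
Illustrative note (not part of the patch): the sketch below shows, under stated assumptions, how a caller is expected to wire the new BlobSender plumbing, along the lines of what replicator.rs and tests/multinode.rs now do. The caller creates the channel, hands the sending half to BlobFetchStage (and, via Fullnode and Tpu::new, to BroadcastService, so the leader's broadcast blobs also reach the TVU fetch path), and keeps the receiving half for the window/retransmit machinery. The module paths and the helper name wire_blob_fetch are assumptions for illustration, not code from this patch.

    // Sketch only: assumes the crate exposes `blob_fetch_stage` and `streamer`
    // as public modules, as src/lib.rs does in this tree.
    use solana::blob_fetch_stage::BlobFetchStage;
    use solana::streamer::{BlobReceiver, BlobSender};
    use std::net::UdpSocket;
    use std::sync::atomic::AtomicBool;
    use std::sync::mpsc::channel;
    use std::sync::Arc;

    // Hypothetical helper: builds a fetch stage over `sockets` and returns the
    // pieces the caller now owns after this change.
    fn wire_blob_fetch(
        sockets: Vec<Arc<UdpSocket>>,
        exit: Arc<AtomicBool>,
    ) -> (BlobFetchStage, BlobSender, BlobReceiver) {
        // The channel is created by the caller instead of inside the stage.
        let (blob_fetch_sender, blob_fetch_receiver) = channel();

        // The stage only keeps clones of the sender; both channel ends stay
        // with the caller.
        let fetch_stage =
            BlobFetchStage::new_multi_socket(sockets, &blob_fetch_sender, exit);

        // `blob_fetch_sender` can additionally be passed to Tpu::new /
        // BroadcastService::new, which is what connects the TPU's broadcast
        // output to this TVU-side fetch path; `blob_fetch_receiver` feeds
        // RetransmitStage::new / window_service as before.
        (fetch_stage, blob_fetch_sender, blob_fetch_receiver)
    }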