From 0002b5dd020c43a9db73c617ce7b279a2334b40f Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 11 Feb 2019 17:56:52 -0800 Subject: [PATCH] Write to ledger in BroadcastService - Also disconnect the channel between TPU and TVU --- src/broadcast_service.rs | 20 +++++++++----------- src/fullnode.rs | 11 +++++------ src/tpu.rs | 10 +++++----- src/tvu.rs | 26 +++++++++++--------------- 4 files changed, 30 insertions(+), 37 deletions(-) diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 15b52f257e..64e4787756 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -1,6 +1,7 @@ //! The `broadcast_service` broadcasts data from a leader node to validators //! use crate::bank::Bank; +use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT}; use crate::counter::Counter; use crate::entry::Entry; @@ -11,7 +12,6 @@ use crate::leader_scheduler::LeaderScheduler; use crate::packet::index_blobs; use crate::result::{Error, Result}; use crate::service::Service; -use crate::streamer::BlobSender; use log::Level; use rayon::prelude::*; use solana_metrics::{influxdb, submit}; @@ -47,7 +47,7 @@ impl Broadcast { receiver: &Receiver>, sock: &UdpSocket, leader_scheduler: &Arc>, - blob_sender: &BlobSender, + blocktree: &Arc, ) -> Result<()> { let timer = Duration::new(1, 0); let entries = receiver.recv_timeout(timer)?; @@ -90,7 +90,7 @@ impl Broadcast { inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - blob_sender.send(blobs.clone())?; + blocktree.write_shared_blobs(blobs.clone())?; // Send out data ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?; @@ -187,7 +187,7 @@ impl BroadcastService { receiver: &Receiver>, max_tick_height: u64, exit_signal: &Arc, - blob_sender: &BlobSender, + blocktree: &Arc, ) -> BroadcastServiceReturnType { let me = cluster_info.read().unwrap().my_data().clone(); @@ -212,7 +212,7 @@ impl BroadcastService { receiver, sock, leader_scheduler, - blob_sender, + blocktree, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { @@ -254,10 +254,10 @@ impl BroadcastService { receiver: Receiver>, max_tick_height: u64, exit_sender: Arc, - blob_sender: &BlobSender, + blocktree: &Arc, ) -> Self { let exit_signal = Arc::new(AtomicBool::new(false)); - let blob_sender = blob_sender.clone(); + let blocktree = blocktree.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { @@ -271,7 +271,7 @@ impl BroadcastService { &receiver, max_tick_height, &exit_signal, - &blob_sender, + &blocktree, ) }) .unwrap(); @@ -335,8 +335,6 @@ mod test { let exit_sender = Arc::new(AtomicBool::new(false)); let bank = Arc::new(Bank::default()); - let (blob_fetch_sender, _) = channel(); - // Start up the broadcast stage let broadcast_service = BroadcastService::new( bank.clone(), @@ -347,7 +345,7 @@ mod test { entry_receiver, max_tick_height, exit_sender, - &blob_fetch_sender, + &blocktree, ); MockBroadcastService { diff --git a/src/fullnode.rs b/src/fullnode.rs index 7d79c0bc42..cc93b9cfa4 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -12,7 +12,6 @@ use crate::rpc::JsonRpcService; use crate::rpc_pubsub::PubSubService; use crate::service::Service; use crate::storage_stage::StorageState; -use crate::streamer::BlobSender; use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender}; use crate::tvu::{Sockets, Tvu}; use crate::voting_keypair::VotingKeypair; @@ -106,7 +105,7 @@ pub struct Fullnode { node_services: NodeServices, rotation_sender: TpuRotationSender, rotation_receiver: TpuRotationReceiver, - blob_sender: BlobSender, + blocktree: Arc, } impl Fullnode { @@ -258,7 +257,7 @@ impl Fullnode { // Setup channel for rotation indications let (rotation_sender, rotation_receiver) = channel(); - let (tvu, blob_sender) = Tvu::new( + let tvu = Tvu::new( voting_keypair_option, &bank, blob_index, @@ -293,7 +292,7 @@ impl Fullnode { &last_entry_id, id, &rotation_sender, - &blob_sender, + &blocktree, scheduled_leader == id, ); @@ -313,7 +312,7 @@ impl Fullnode { broadcast_socket: node.sockets.broadcast, rotation_sender, rotation_receiver, - blob_sender, + blocktree, } } @@ -395,7 +394,7 @@ impl Fullnode { &last_entry_id, self.id, &self.rotation_sender, - &self.blob_sender, + &self.blocktree, ); transition diff --git a/src/tpu.rs b/src/tpu.rs index dc35d1a841..b98b428859 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -3,6 +3,7 @@ use crate::bank::Bank; use crate::banking_stage::BankingStage; +use crate::blocktree::Blocktree; use crate::broadcast_service::BroadcastService; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; @@ -10,7 +11,6 @@ use crate::fetch_stage::FetchStage; use crate::poh_service::PohServiceConfig; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; -use crate::streamer::BlobSender; use crate::tpu_forwarder::TpuForwarder; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; @@ -84,7 +84,7 @@ impl Tpu { last_entry_id: &Hash, leader_id: Pubkey, to_validator_sender: &TpuRotationSender, - blob_sender: &BlobSender, + blocktree: &Arc, is_leader: bool, ) -> Self { let mut tpu = Self { @@ -105,7 +105,7 @@ impl Tpu { last_entry_id, leader_id, to_validator_sender, - blob_sender, + blocktree, ); } else { tpu.switch_to_forwarder(transactions_sockets, cluster_info); @@ -150,7 +150,7 @@ impl Tpu { last_entry_id: &Hash, leader_id: Pubkey, to_validator_sender: &TpuRotationSender, - blob_sender: &BlobSender, + blocktree: &Arc, ) { self.tpu_mode_close(); @@ -186,7 +186,7 @@ impl Tpu { entry_receiver, max_tick_height, self.exit.clone(), - blob_sender, + blocktree, ); let svcs = LeaderServices::new( diff --git a/src/tvu.rs b/src/tvu.rs index 045ee4689d..f3b2ebbd7f 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -21,7 +21,6 @@ use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; -use crate::streamer::BlobSender; use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender}; use crate::voting_keypair::VotingKeypair; use solana_sdk::hash::Hash; @@ -79,7 +78,7 @@ impl Tvu { entry_stream: Option<&String>, ledger_signal_sender: SyncSender, ledger_signal_receiver: Receiver, - ) -> (Self, BlobSender) { + ) -> Self { let exit = Arc::new(AtomicBool::new(false)); let keypair: Arc = cluster_info .read() @@ -156,18 +155,15 @@ impl Tvu { &cluster_info, ); - ( - Tvu { - fetch_stage, - retransmit_stage, - replay_stage, - entry_stream_stage, - storage_stage, - exit, - last_entry_id: l_last_entry_id, - }, - blob_fetch_sender, - ) + Tvu { + fetch_stage, + retransmit_stage, + replay_stage, + entry_stream_stage, + storage_stage, + exit, + last_entry_id: l_last_entry_id, + } } #[cfg(test)] @@ -260,7 +256,7 @@ pub mod tests { let vote_account_keypair = Arc::new(Keypair::new()); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); let (sender, _receiver) = channel(); - let (tvu, _blob_sender) = Tvu::new( + let tvu = Tvu::new( Some(Arc::new(voting_keypair)), &bank, 0,