Write to ledger in BroadcastService

- Also disconnect the channel between TPU and TVU
This commit is contained in:
Pankaj Garg
2019-02-11 17:56:52 -08:00
committed by Grimes
parent 709598541f
commit 0002b5dd02
4 changed files with 30 additions and 37 deletions

View File

@ -1,6 +1,7 @@
//! The `broadcast_service` broadcasts data from a leader node to validators //! The `broadcast_service` broadcasts data from a leader node to validators
//! //!
use crate::bank::Bank; use crate::bank::Bank;
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT}; use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT};
use crate::counter::Counter; use crate::counter::Counter;
use crate::entry::Entry; use crate::entry::Entry;
@ -11,7 +12,6 @@ use crate::leader_scheduler::LeaderScheduler;
use crate::packet::index_blobs; use crate::packet::index_blobs;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::streamer::BlobSender;
use log::Level; use log::Level;
use rayon::prelude::*; use rayon::prelude::*;
use solana_metrics::{influxdb, submit}; use solana_metrics::{influxdb, submit};
@ -47,7 +47,7 @@ impl Broadcast {
receiver: &Receiver<Vec<Entry>>, receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket, sock: &UdpSocket,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
blob_sender: &BlobSender, blocktree: &Arc<Blocktree>,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let entries = receiver.recv_timeout(timer)?; let entries = receiver.recv_timeout(timer)?;
@ -90,7 +90,7 @@ impl Broadcast {
inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
blob_sender.send(blobs.clone())?; blocktree.write_shared_blobs(blobs.clone())?;
// Send out data // Send out data
ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?; ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?;
@ -187,7 +187,7 @@ impl BroadcastService {
receiver: &Receiver<Vec<Entry>>, receiver: &Receiver<Vec<Entry>>,
max_tick_height: u64, max_tick_height: u64,
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
blob_sender: &BlobSender, blocktree: &Arc<Blocktree>,
) -> BroadcastServiceReturnType { ) -> BroadcastServiceReturnType {
let me = cluster_info.read().unwrap().my_data().clone(); let me = cluster_info.read().unwrap().my_data().clone();
@ -212,7 +212,7 @@ impl BroadcastService {
receiver, receiver,
sock, sock,
leader_scheduler, leader_scheduler,
blob_sender, blocktree,
) { ) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
@ -254,10 +254,10 @@ impl BroadcastService {
receiver: Receiver<Vec<Entry>>, receiver: Receiver<Vec<Entry>>,
max_tick_height: u64, max_tick_height: u64,
exit_sender: Arc<AtomicBool>, exit_sender: Arc<AtomicBool>,
blob_sender: &BlobSender, blocktree: &Arc<Blocktree>,
) -> Self { ) -> Self {
let exit_signal = Arc::new(AtomicBool::new(false)); let exit_signal = Arc::new(AtomicBool::new(false));
let blob_sender = blob_sender.clone(); let blocktree = blocktree.clone();
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
@ -271,7 +271,7 @@ impl BroadcastService {
&receiver, &receiver,
max_tick_height, max_tick_height,
&exit_signal, &exit_signal,
&blob_sender, &blocktree,
) )
}) })
.unwrap(); .unwrap();
@ -335,8 +335,6 @@ mod test {
let exit_sender = Arc::new(AtomicBool::new(false)); let exit_sender = Arc::new(AtomicBool::new(false));
let bank = Arc::new(Bank::default()); let bank = Arc::new(Bank::default());
let (blob_fetch_sender, _) = channel();
// Start up the broadcast stage // Start up the broadcast stage
let broadcast_service = BroadcastService::new( let broadcast_service = BroadcastService::new(
bank.clone(), bank.clone(),
@ -347,7 +345,7 @@ mod test {
entry_receiver, entry_receiver,
max_tick_height, max_tick_height,
exit_sender, exit_sender,
&blob_fetch_sender, &blocktree,
); );
MockBroadcastService { MockBroadcastService {

View File

@ -12,7 +12,6 @@ use crate::rpc::JsonRpcService;
use crate::rpc_pubsub::PubSubService; use crate::rpc_pubsub::PubSubService;
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::StorageState; use crate::storage_stage::StorageState;
use crate::streamer::BlobSender;
use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender}; use crate::tpu::{Tpu, TpuRotationReceiver, TpuRotationSender};
use crate::tvu::{Sockets, Tvu}; use crate::tvu::{Sockets, Tvu};
use crate::voting_keypair::VotingKeypair; use crate::voting_keypair::VotingKeypair;
@ -106,7 +105,7 @@ pub struct Fullnode {
node_services: NodeServices, node_services: NodeServices,
rotation_sender: TpuRotationSender, rotation_sender: TpuRotationSender,
rotation_receiver: TpuRotationReceiver, rotation_receiver: TpuRotationReceiver,
blob_sender: BlobSender, blocktree: Arc<Blocktree>,
} }
impl Fullnode { impl Fullnode {
@ -258,7 +257,7 @@ impl Fullnode {
// Setup channel for rotation indications // Setup channel for rotation indications
let (rotation_sender, rotation_receiver) = channel(); let (rotation_sender, rotation_receiver) = channel();
let (tvu, blob_sender) = Tvu::new( let tvu = Tvu::new(
voting_keypair_option, voting_keypair_option,
&bank, &bank,
blob_index, blob_index,
@ -293,7 +292,7 @@ impl Fullnode {
&last_entry_id, &last_entry_id,
id, id,
&rotation_sender, &rotation_sender,
&blob_sender, &blocktree,
scheduled_leader == id, scheduled_leader == id,
); );
@ -313,7 +312,7 @@ impl Fullnode {
broadcast_socket: node.sockets.broadcast, broadcast_socket: node.sockets.broadcast,
rotation_sender, rotation_sender,
rotation_receiver, rotation_receiver,
blob_sender, blocktree,
} }
} }
@ -395,7 +394,7 @@ impl Fullnode {
&last_entry_id, &last_entry_id,
self.id, self.id,
&self.rotation_sender, &self.rotation_sender,
&self.blob_sender, &self.blocktree,
); );
transition transition

View File

@ -3,6 +3,7 @@
use crate::bank::Bank; use crate::bank::Bank;
use crate::banking_stage::BankingStage; use crate::banking_stage::BankingStage;
use crate::blocktree::Blocktree;
use crate::broadcast_service::BroadcastService; use crate::broadcast_service::BroadcastService;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
@ -10,7 +11,6 @@ use crate::fetch_stage::FetchStage;
use crate::poh_service::PohServiceConfig; use crate::poh_service::PohServiceConfig;
use crate::service::Service; use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage; use crate::sigverify_stage::SigVerifyStage;
use crate::streamer::BlobSender;
use crate::tpu_forwarder::TpuForwarder; use crate::tpu_forwarder::TpuForwarder;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -84,7 +84,7 @@ impl Tpu {
last_entry_id: &Hash, last_entry_id: &Hash,
leader_id: Pubkey, leader_id: Pubkey,
to_validator_sender: &TpuRotationSender, to_validator_sender: &TpuRotationSender,
blob_sender: &BlobSender, blocktree: &Arc<Blocktree>,
is_leader: bool, is_leader: bool,
) -> Self { ) -> Self {
let mut tpu = Self { let mut tpu = Self {
@ -105,7 +105,7 @@ impl Tpu {
last_entry_id, last_entry_id,
leader_id, leader_id,
to_validator_sender, to_validator_sender,
blob_sender, blocktree,
); );
} else { } else {
tpu.switch_to_forwarder(transactions_sockets, cluster_info); tpu.switch_to_forwarder(transactions_sockets, cluster_info);
@ -150,7 +150,7 @@ impl Tpu {
last_entry_id: &Hash, last_entry_id: &Hash,
leader_id: Pubkey, leader_id: Pubkey,
to_validator_sender: &TpuRotationSender, to_validator_sender: &TpuRotationSender,
blob_sender: &BlobSender, blocktree: &Arc<Blocktree>,
) { ) {
self.tpu_mode_close(); self.tpu_mode_close();
@ -186,7 +186,7 @@ impl Tpu {
entry_receiver, entry_receiver,
max_tick_height, max_tick_height,
self.exit.clone(), self.exit.clone(),
blob_sender, blocktree,
); );
let svcs = LeaderServices::new( let svcs = LeaderServices::new(

View File

@ -21,7 +21,6 @@ use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage; use crate::retransmit_stage::RetransmitStage;
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::{StorageStage, StorageState}; use crate::storage_stage::{StorageStage, StorageState};
use crate::streamer::BlobSender;
use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender}; use crate::tpu::{TpuReturnType, TpuRotationReceiver, TpuRotationSender};
use crate::voting_keypair::VotingKeypair; use crate::voting_keypair::VotingKeypair;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
@ -79,7 +78,7 @@ impl Tvu {
entry_stream: Option<&String>, entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>, ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
) -> (Self, BlobSender) { ) -> Self {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let keypair: Arc<Keypair> = cluster_info let keypair: Arc<Keypair> = cluster_info
.read() .read()
@ -156,18 +155,15 @@ impl Tvu {
&cluster_info, &cluster_info,
); );
( Tvu {
Tvu { fetch_stage,
fetch_stage, retransmit_stage,
retransmit_stage, replay_stage,
replay_stage, entry_stream_stage,
entry_stream_stage, storage_stage,
storage_stage, exit,
exit, last_entry_id: l_last_entry_id,
last_entry_id: l_last_entry_id, }
},
blob_fetch_sender,
)
} }
#[cfg(test)] #[cfg(test)]
@ -260,7 +256,7 @@ pub mod tests {
let vote_account_keypair = Arc::new(Keypair::new()); let vote_account_keypair = Arc::new(Keypair::new());
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
let (sender, _receiver) = channel(); let (sender, _receiver) = channel();
let (tvu, _blob_sender) = Tvu::new( let tvu = Tvu::new(
Some(Arc::new(voting_keypair)), Some(Arc::new(voting_keypair)),
&bank, &bank,
0, 0,