Clean up exit flag handing across TVU

This commit is contained in:
Michael Vines
2019-03-04 20:50:02 -08:00
committed by Grimes
parent 2a849ae268
commit 6ab6e6cb9b
21 changed files with 97 additions and 138 deletions

View File

@ -81,12 +81,7 @@ fn main() -> Result<()> {
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
read_channels.push(r_reader); read_channels.push(r_reader);
read_threads.push(receiver( read_threads.push(receiver(Arc::new(read), &exit, s_reader, "bench-streamer"));
Arc::new(read),
exit.clone(),
s_reader,
"bench-streamer",
));
} }
let t_producer1 = producer(&addr, exit.clone()); let t_producer1 = producer(&addr, exit.clone());

View File

@ -9,6 +9,7 @@ use solana::cluster_info::ClusterInfo;
use solana::cluster_info::Node; use solana::cluster_info::Node;
use solana::packet::to_packets_chunked; use solana::packet::to_packets_chunked;
use solana::poh_recorder::WorkingBankEntries; use solana::poh_recorder::WorkingBankEntries;
use solana::service::Service;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash; use solana_sdk::hash::hash;
@ -17,6 +18,7 @@ use solana_sdk::signature::{KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use std::iter; use std::iter;
use std::sync::atomic::Ordering;
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
@ -102,7 +104,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) })
.collect(); .collect();
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
@ -127,7 +129,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
start += half_len; start += half_len;
start %= verified.len(); start %= verified.len();
}); });
poh_service.close().unwrap(); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
} }
#[bench] #[bench]
@ -208,7 +211,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) })
.collect(); .collect();
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
@ -233,5 +236,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
start += half_len; start += half_len;
start %= verified.len(); start %= verified.len();
}); });
poh_service.close().unwrap(); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
} }

View File

@ -347,6 +347,7 @@ impl Service for BankingStage {
pub fn create_test_recorder( pub fn create_test_recorder(
bank: &Arc<Bank>, bank: &Arc<Bank>,
) -> ( ) -> (
Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>, Arc<Mutex<PohRecorder>>,
PohService, PohService,
Receiver<WorkingBankEntries>, Receiver<WorkingBankEntries>,
@ -356,7 +357,7 @@ pub fn create_test_recorder(
PohRecorder::new(bank.tick_height(), bank.last_blockhash()); PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit);
(poh_recorder, poh_service, entry_receiver) (exit, poh_recorder, poh_service, entry_receiver)
} }
#[cfg(test)] #[cfg(test)]
@ -378,13 +379,14 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
drop(verified_sender); drop(verified_sender);
exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
poh_service.close().unwrap(); poh_service.join().unwrap();
} }
#[test] #[test]
@ -395,7 +397,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);
@ -403,7 +405,8 @@ mod tests {
trace!("sending bank"); trace!("sending bank");
sleep(Duration::from_millis(600)); sleep(Duration::from_millis(600));
drop(verified_sender); drop(verified_sender);
poh_service.close().unwrap(); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder); drop(poh_recorder);
trace!("getting entries"); trace!("getting entries");
@ -424,7 +427,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);
@ -452,7 +455,8 @@ mod tests {
.unwrap(); .unwrap();
drop(verified_sender); drop(verified_sender);
poh_service.close().expect("close"); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder); drop(poh_recorder);
//receive entries + ticks //receive entries + ticks
@ -481,7 +485,7 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(2); let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);
@ -516,7 +520,8 @@ mod tests {
.unwrap(); .unwrap();
drop(verified_sender); drop(verified_sender);
poh_service.close().expect("close");; exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder); drop(poh_recorder);
// Collect the ledger and feed it to a new bank. // Collect the ledger and feed it to a new bank.

View File

@ -3,34 +3,29 @@
use crate::service::Service; use crate::service::Service;
use crate::streamer::{self, BlobSender}; use crate::streamer::{self, BlobSender};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, JoinHandle}; use std::thread::{self, JoinHandle};
pub struct BlobFetchStage { pub struct BlobFetchStage {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl BlobFetchStage { impl BlobFetchStage {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: Arc<AtomicBool>) -> Self { pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: &Arc<AtomicBool>) -> Self {
Self::new_multi_socket(vec![socket], sender, exit) Self::new_multi_socket(vec![socket], sender, exit)
} }
pub fn new_multi_socket( pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>, sockets: Vec<Arc<UdpSocket>>,
sender: &BlobSender, sender: &BlobSender,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let thread_hdls: Vec<_> = sockets let thread_hdls: Vec<_> = sockets
.into_iter() .into_iter()
.map(|socket| streamer::blob_receiver(socket, exit.clone(), sender.clone())) .map(|socket| streamer::blob_receiver(socket, &exit, sender.clone()))
.collect(); .collect();
Self { exit, thread_hdls } Self { thread_hdls }
}
pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
} }
} }

View File

@ -27,9 +27,10 @@ impl BlockstreamService {
slot_full_receiver: Receiver<(u64, Pubkey)>, slot_full_receiver: Receiver<(u64, Pubkey)>,
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
blockstream_socket: String, blockstream_socket: String,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let mut blockstream = Blockstream::new(blockstream_socket); let mut blockstream = Blockstream::new(blockstream_socket);
let exit = exit.clone();
let t_blockstream = Builder::new() let t_blockstream = Builder::new()
.name("solana-blockstream".to_string()) .name("solana-blockstream".to_string())
.spawn(move || loop { .spawn(move || loop {

View File

@ -868,8 +868,9 @@ impl ClusterInfo {
obj: Arc<RwLock<Self>>, obj: Arc<RwLock<Self>>,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
blob_sender: BlobSender, blob_sender: BlobSender,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone();
Builder::new() Builder::new()
.name("solana-gossip".to_string()) .name("solana-gossip".to_string())
.spawn(move || { .spawn(move || {
@ -1243,8 +1244,9 @@ impl ClusterInfo {
blocktree: Option<Arc<Blocktree>>, blocktree: Option<Arc<Blocktree>>,
requests_receiver: BlobReceiver, requests_receiver: BlobReceiver,
response_sender: BlobSender, response_sender: BlobSender,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone();
Builder::new() Builder::new()
.name("solana-listen".to_string()) .name("solana-listen".to_string())
.spawn(move || loop { .spawn(move || loop {

View File

@ -134,12 +134,7 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver( let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test");
Arc::new(read),
exit.clone(),
s_reader,
"window-streamer-test",
);
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);

View File

@ -33,7 +33,7 @@ impl FetchStage {
) -> Self { ) -> Self {
let thread_hdls: Vec<_> = sockets let thread_hdls: Vec<_> = sockets
.into_iter() .into_iter()
.map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage")) .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage"))
.collect(); .collect();
Self { thread_hdls } Self { thread_hdls }

View File

@ -264,6 +264,7 @@ impl Fullnode {
// Used for notifying many nodes in parallel to exit // Used for notifying many nodes in parallel to exit
pub fn exit(&self) { pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
// Need to force the poh_recorder to drop the WorkingBank, // Need to force the poh_recorder to drop the WorkingBank,
// which contains the channel to BroadcastStage. This should be // which contains the channel to BroadcastStage. This should be
// sufficient as long as no other rotations are happening that // sufficient as long as no other rotations are happening that
@ -272,7 +273,6 @@ impl Fullnode {
// in motion because exit()/close() are only called by the run() loop // in motion because exit()/close() are only called by the run() loop
// which is the sole initiator of rotations. // which is the sole initiator of rotations.
self.poh_recorder.lock().unwrap().clear_bank(); self.poh_recorder.lock().unwrap().clear_bank();
self.poh_service.exit();
} }
pub fn close(self) -> Result<()> { pub fn close(self) -> Result<()> {

View File

@ -34,8 +34,7 @@ impl GossipService {
&cluster_info.read().unwrap().my_data().id, &cluster_info.read().unwrap().my_data().id,
gossip_socket.local_addr().unwrap() gossip_socket.local_addr().unwrap()
); );
let t_receiver = let t_receiver = streamer::blob_receiver(gossip_socket.clone(), &exit, request_sender);
streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender);
let (response_sender, response_receiver) = channel(); let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
let t_listen = ClusterInfo::listen( let t_listen = ClusterInfo::listen(
@ -43,14 +42,9 @@ impl GossipService {
blocktree, blocktree,
request_receiver, request_receiver,
response_sender.clone(), response_sender.clone(),
exit.clone(), exit,
);
let t_gossip = ClusterInfo::gossip(
cluster_info.clone(),
bank_forks,
response_sender,
exit.clone(),
); );
let t_gossip = ClusterInfo::gossip(cluster_info.clone(), bank_forks, response_sender, exit);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Self { thread_hdls } Self { thread_hdls }
} }

View File

@ -31,19 +31,9 @@ impl Default for PohServiceConfig {
pub struct PohService { pub struct PohService {
tick_producer: JoinHandle<()>, tick_producer: JoinHandle<()>,
poh_exit: Arc<AtomicBool>,
} }
impl PohService { impl PohService {
pub fn exit(&self) {
self.poh_exit.store(true, Ordering::Relaxed);
}
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
pub fn new( pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
config: &PohServiceConfig, config: &PohServiceConfig,
@ -64,10 +54,7 @@ impl PohService {
}) })
.unwrap(); .unwrap();
Self { Self { tick_producer }
tick_producer,
poh_exit: poh_exit.clone(),
}
} }
fn tick_producer( fn tick_producer(
@ -157,7 +144,7 @@ mod tests {
let poh_service = PohService::new( let poh_service = PohService::new(
poh_recorder.clone(), poh_recorder.clone(),
&PohServiceConfig::Tick(HASHES_PER_TICK as usize), &PohServiceConfig::Tick(HASHES_PER_TICK as usize),
&Arc::new(AtomicBool::new(false)), &exit,
); );
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
@ -192,7 +179,6 @@ mod tests {
} }
} }
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.exit();
let _ = poh_service.join().unwrap(); let _ = poh_service.join().unwrap();
let _ = entry_producer.join().unwrap(); let _ = entry_producer.join().unwrap();
} }

View File

@ -45,7 +45,7 @@ pub struct RepairService {
impl RepairService { impl RepairService {
fn run( fn run(
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
exit: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
repair_socket: &Arc<UdpSocket>, repair_socket: &Arc<UdpSocket>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
) { ) {
@ -112,13 +112,14 @@ impl RepairService {
pub fn new( pub fn new(
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
) -> Self { ) -> Self {
let exit = exit.clone();
let t_repair = Builder::new() let t_repair = Builder::new()
.name("solana-repair-service".to_string()) .name("solana-repair-service".to_string())
.spawn(move || Self::run(&blocktree, &exit, &repair_socket, &cluster_info)) .spawn(move || Self::run(&blocktree, exit, &repair_socket, &cluster_info))
.unwrap(); .unwrap();
RepairService { t_repair } RepairService { t_repair }

View File

@ -48,7 +48,6 @@ impl Drop for Finalizer {
pub struct ReplayStage { pub struct ReplayStage {
t_replay: JoinHandle<result::Result<()>>, t_replay: JoinHandle<result::Result<()>>,
exit: Arc<AtomicBool>,
} }
impl ReplayStage { impl ReplayStage {
@ -59,7 +58,7 @@ impl ReplayStage {
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
@ -199,7 +198,7 @@ impl ReplayStage {
}) })
.unwrap(); .unwrap();
( (
Self { t_replay, exit }, Self { t_replay },
slot_full_receiver, slot_full_receiver,
forward_entry_receiver, forward_entry_receiver,
) )
@ -259,15 +258,6 @@ impl ReplayStage {
result result
} }
pub fn close(self) -> thread::Result<()> {
self.exit();
self.join()
}
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
}
pub fn verify_and_process_entries( pub fn verify_and_process_entries(
bank: &Bank, bank: &Bank,
entries: &[Entry], entries: &[Entry],
@ -345,7 +335,6 @@ mod test {
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -368,22 +357,21 @@ mod test {
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
// Set up the replay stage // Set up the replay stage
let exit = Arc::new(AtomicBool::new(false));
let voting_keypair = Arc::new(Keypair::new());
{ {
let voting_keypair = Arc::new(Keypair::new());
let (bank_forks, _bank_forks_info, blocktree, l_receiver) = let (bank_forks, _bank_forks_info, blocktree, l_receiver) =
new_banks_from_blocktree(&my_ledger_path, None); new_banks_from_blocktree(&my_ledger_path, None);
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let blocktree = Arc::new(blocktree); let blocktree = Arc::new(blocktree);
let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(), my_keypair.pubkey(),
Some(voting_keypair.clone()), Some(voting_keypair.clone()),
blocktree.clone(), blocktree.clone(),
&Arc::new(RwLock::new(bank_forks)), &Arc::new(RwLock::new(bank_forks)),
cluster_info_me.clone(), cluster_info_me.clone(),
exit.clone(), &exit,
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
@ -403,10 +391,9 @@ mod test {
assert_eq!(next_tick[0], received_tick[0]); assert_eq!(next_tick[0], received_tick[0]);
replay_stage exit.store(true, Ordering::Relaxed);
.close() replay_stage.join().unwrap();
.expect("Expect successful ReplayStage exit"); poh_service.join().unwrap();
poh_service.close().unwrap();
} }
let _ignored = remove_dir_all(&my_ledger_path); let _ignored = remove_dir_all(&my_ledger_path);
} }

View File

@ -171,8 +171,7 @@ impl Replicator {
node.sockets.tvu.into_iter().map(Arc::new).collect(); node.sockets.tvu.into_iter().map(Arc::new).collect();
blob_sockets.push(repair_socket.clone()); blob_sockets.push(repair_socket.clone());
let (blob_fetch_sender, blob_fetch_receiver) = channel(); let (blob_fetch_sender, blob_fetch_receiver) = channel();
let fetch_stage = let fetch_stage = BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, &exit);
BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone());
// todo: pull blobs off the retransmit_receiver and recycle them? // todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
@ -183,7 +182,7 @@ impl Replicator {
blob_fetch_receiver, blob_fetch_receiver,
retransmit_sender, retransmit_sender,
repair_socket, repair_socket,
exit.clone(), &exit,
); );
info!("window created, waiting for ledger download done"); info!("window created, waiting for ledger download done");

View File

@ -113,7 +113,7 @@ impl RetransmitStage {
retransmit_socket: Arc<UdpSocket>, retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver, fetch_stage_receiver: BlobReceiver,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();

View File

@ -19,7 +19,7 @@ pub type BlobReceiver = Receiver<SharedBlobs>;
fn recv_loop( fn recv_loop(
sock: &UdpSocket, sock: &UdpSocket,
exit: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
channel: &PacketSender, channel: &PacketSender,
channel_tag: &'static str, channel_tag: &'static str,
) -> Result<()> { ) -> Result<()> {
@ -47,7 +47,7 @@ fn recv_loop(
pub fn receiver( pub fn receiver(
sock: Arc<UdpSocket>, sock: Arc<UdpSocket>,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
packet_sender: PacketSender, packet_sender: PacketSender,
sender_tag: &'static str, sender_tag: &'static str,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
@ -55,10 +55,11 @@ pub fn receiver(
if res.is_err() { if res.is_err() {
panic!("streamer::receiver set_read_timeout error"); panic!("streamer::receiver set_read_timeout error");
} }
let exit = exit.clone();
Builder::new() Builder::new()
.name("solana-receiver".to_string()) .name("solana-receiver".to_string())
.spawn(move || { .spawn(move || {
let _ = recv_loop(&sock, &exit, &packet_sender, sender_tag); let _ = recv_loop(&sock, exit, &packet_sender, sender_tag);
}) })
.unwrap() .unwrap()
} }
@ -116,12 +117,17 @@ fn recv_blobs(sock: &UdpSocket, s: &BlobSender) -> Result<()> {
Ok(()) Ok(())
} }
pub fn blob_receiver(sock: Arc<UdpSocket>, exit: Arc<AtomicBool>, s: BlobSender) -> JoinHandle<()> { pub fn blob_receiver(
sock: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
s: BlobSender,
) -> JoinHandle<()> {
//DOCUMENTED SIDE-EFFECT //DOCUMENTED SIDE-EFFECT
//1 second timeout on socket read //1 second timeout on socket read
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer)) sock.set_read_timeout(Some(timer))
.expect("set socket timeout"); .expect("set socket timeout");
let exit = exit.clone();
Builder::new() Builder::new()
.name("solana-blob_receiver".to_string()) .name("solana-blob_receiver".to_string())
.spawn(move || loop { .spawn(move || loop {
@ -173,7 +179,7 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader, "streamer-test"); let t_receiver = receiver(Arc::new(read), &exit, s_reader, "streamer-test");
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);

View File

@ -89,8 +89,7 @@ impl Tvu {
let mut blob_sockets: Vec<Arc<UdpSocket>> = let mut blob_sockets: Vec<Arc<UdpSocket>> =
fetch_sockets.into_iter().map(Arc::new).collect(); fetch_sockets.into_iter().map(Arc::new).collect();
blob_sockets.push(repair_socket.clone()); blob_sockets.push(repair_socket.clone());
let fetch_stage = let fetch_stage = BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, &exit);
BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone());
//TODO //TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified //the packets coming out of blob_receiver need to be sent to the GPU and verified
@ -102,7 +101,7 @@ impl Tvu {
Arc::new(retransmit_socket), Arc::new(retransmit_socket),
repair_socket, repair_socket,
blob_fetch_receiver, blob_fetch_receiver,
exit.clone(), &exit,
); );
let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new( let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new(
@ -111,7 +110,7 @@ impl Tvu {
blocktree.clone(), blocktree.clone(),
&bank_forks, &bank_forks,
cluster_info.clone(), cluster_info.clone(),
exit.clone(), &exit,
ledger_signal_receiver, ledger_signal_receiver,
subscriptions, subscriptions,
poh_recorder, poh_recorder,
@ -122,7 +121,7 @@ impl Tvu {
slot_full_receiver, slot_full_receiver,
blocktree.clone(), blocktree.clone(),
blockstream.unwrap().to_string(), blockstream.unwrap().to_string(),
exit.clone(), &exit,
); );
Some(blockstream_service) Some(blockstream_service)
} else { } else {
@ -134,7 +133,7 @@ impl Tvu {
forward_entry_receiver, forward_entry_receiver,
Some(blocktree), Some(blocktree),
&keypair, &keypair,
&exit.clone(), &exit,
bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still
storage_rotate_count, storage_rotate_count,
&cluster_info, &cluster_info,
@ -202,8 +201,7 @@ pub mod tests {
let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path) let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path)
.expect("Expected to successfully open ledger"); .expect("Expected to successfully open ledger");
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let exit = Arc::new(AtomicBool::new(false));
let tvu = Tvu::new( let tvu = Tvu::new(
Some(Arc::new(Keypair::new())), Some(Arc::new(Keypair::new())),
&Arc::new(RwLock::new(bank_forks)), &Arc::new(RwLock::new(bank_forks)),
@ -227,6 +225,6 @@ pub mod tests {
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
tvu.join().unwrap(); tvu.join().unwrap();
poh_service.close().unwrap(); poh_service.join().unwrap();
} }
} }

View File

@ -214,12 +214,7 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver( let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test");
Arc::new(read),
exit.clone(),
s_reader,
"window-streamer-test",
);
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);

View File

@ -102,19 +102,15 @@ impl WindowService {
r: BlobReceiver, r: BlobReceiver,
retransmit: BlobSender, retransmit: BlobSender,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> WindowService { ) -> WindowService {
let exit_ = exit.clone(); let repair_service =
let repair_service = RepairService::new( RepairService::new(blocktree.clone(), exit, repair_socket, cluster_info.clone());
blocktree.clone(), let exit = exit.clone();
exit.clone(),
repair_socket,
cluster_info.clone(),
);
let t_window = Builder::new() let t_window = Builder::new()
.name("solana-window".to_string()) .name("solana-window".to_string())
.spawn(move || { .spawn(move || {
let _exit = Finalizer::new(exit_); let _exit = Finalizer::new(exit.clone());
let id = cluster_info.read().unwrap().id(); let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id); trace!("{}: RECV_WINDOW started", id);
loop { loop {
@ -182,8 +178,7 @@ mod test {
let subs = Arc::new(RwLock::new(cluster_info_me)); let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), &exit, s_reader);
blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader);
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let blocktree_path = get_tmp_ledger_path!(); let blocktree_path = get_tmp_ledger_path!();
let blocktree = Arc::new( let blocktree = Arc::new(
@ -195,7 +190,7 @@ mod test {
r_reader, r_reader,
s_retransmit, s_retransmit,
Arc::new(leader_node.sockets.repair), Arc::new(leader_node.sockets.repair),
exit.clone(), &exit,
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
@ -254,8 +249,7 @@ mod test {
let subs = Arc::new(RwLock::new(cluster_info_me)); let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), &exit, s_reader);
blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader);
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let blocktree_path = get_tmp_ledger_path!(); let blocktree_path = get_tmp_ledger_path!();
let blocktree = Arc::new( let blocktree = Arc::new(
@ -267,7 +261,7 @@ mod test {
r_reader, r_reader,
s_retransmit, s_retransmit,
Arc::new(leader_node.sockets.repair), Arc::new(leader_node.sockets.repair),
exit.clone(), &exit,
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();

View File

@ -156,7 +156,7 @@ fn test_replicator_startup_basic() {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let repair_socket = Arc::new(tn.sockets.repair); let repair_socket = Arc::new(tn.sockets.repair);
let t_receiver = blob_receiver(repair_socket.clone(), exit.clone(), s_reader); let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader);
info!( info!(
"Sending repair requests from: {} to: {}", "Sending repair requests from: {} to: {}",

View File

@ -67,7 +67,7 @@ fn test_replay() {
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> = target2.sockets.tvu.into_iter().map(Arc::new).collect(); let blob_sockets: Vec<Arc<UdpSocket>> = target2.sockets.tvu.into_iter().map(Arc::new).collect();
let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), exit.clone(), s_reader); let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), &exit, s_reader);
// simulate leader sending messages // simulate leader sending messages
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
@ -111,7 +111,8 @@ fn test_replay() {
.expect("Expected to successfully open ledger"); .expect("Expected to successfully open ledger");
let vote_account_keypair = Arc::new(Keypair::new()); let vote_account_keypair = Arc::new(Keypair::new());
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank);
let tvu = Tvu::new( let tvu = Tvu::new(
Some(Arc::new(voting_keypair)), Some(Arc::new(voting_keypair)),
&Arc::new(RwLock::new(bank_forks)), &Arc::new(RwLock::new(bank_forks)),
@ -185,13 +186,14 @@ fn test_replay() {
assert_eq!(bob_balance, starting_balance - alice_ref_balance); assert_eq!(bob_balance, starting_balance - alice_ref_balance);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.close().expect("close"); poh_service_exit.store(true, Ordering::Relaxed);
tvu.join().expect("join"); poh_service.join().unwrap();
dr_l.join().expect("join"); tvu.join().unwrap();
dr_2.join().expect("join"); dr_l.join().unwrap();
dr_1.join().expect("join"); dr_2.join().unwrap();
t_receiver.join().expect("join"); dr_1.join().unwrap();
t_responder.join().expect("join"); t_receiver.join().unwrap();
t_responder.join().unwrap();
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&blocktree_path); let _ignored = remove_dir_all(&blocktree_path);
} }