diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index d538454733..520cdb99da 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -19,7 +19,7 @@ pub struct BankForks { working_bank: Arc, root: u64, slots: HashSet, - snapshot_path: Option, + snapshot_path: Option, confidence: HashMap, } @@ -198,14 +198,14 @@ impl BankForks { .retain(|slot, _| descendants[&root].contains(slot)); self.confidence .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); - if self.snapshot_path.is_some() { + if let Some(snapshot_path) = &self.snapshot_path { let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect(); trace!("prune non root {} - {:?}", root, diff); for slot in diff.iter() { if **slot > root { let _ = self.add_snapshot(**slot, root); } else { - BankForks::remove_snapshot(**slot, &self.snapshot_path); + BankForks::remove_snapshot(**slot, &snapshot_path); } } } @@ -252,12 +252,8 @@ impl BankForks { Error::new(ErrorKind::Other, error) } - fn get_snapshot_path(path: &Option) -> PathBuf { - Path::new(&path.clone().unwrap()).to_path_buf() - } - pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> { - let path = BankForks::get_snapshot_path(&self.snapshot_path); + let path = self.snapshot_path.as_ref().expect("no snapshot_path"); fs::create_dir_all(path.clone())?; let bank_file = format!("{}", slot); let bank_file_path = path.join(bank_file); @@ -286,15 +282,14 @@ impl BankForks { Ok(()) } - pub fn remove_snapshot(slot: u64, path: &Option) { - let path = BankForks::get_snapshot_path(path); + pub fn remove_snapshot(slot: u64, path: &Path) { let bank_file = format!("{}", slot); let bank_file_path = path.join(bank_file); let _ = fs::remove_file(bank_file_path); } - pub fn set_snapshot_config(&mut self, path: Option) { - self.snapshot_path = path; + pub fn set_snapshot_path(&mut self, snapshot_path: &Path) { + self.snapshot_path = Some(snapshot_path.to_path_buf()); } fn load_snapshots( @@ -302,14 +297,13 @@ impl BankForks { bank0: &mut Bank, bank_maps: &mut Vec<(u64, u64, Bank)>, status_cache_rc: &StatusCacheRc, - snapshot_path: &Option, + snapshot_path: &Path, ) -> Option { - let path = BankForks::get_snapshot_path(snapshot_path); let mut bank_root: Option = None; for bank_slot in names.iter().rev() { let bank_path = format!("{}", bank_slot); - let bank_file_path = path.join(bank_path.clone()); + let bank_file_path = snapshot_path.join(bank_path.clone()); info!("Load from {:?}", bank_file_path); let file = File::open(bank_file_path); if file.is_err() { @@ -385,10 +379,9 @@ impl BankForks { pub fn load_from_snapshot( genesis_block: &GenesisBlock, account_paths: Option, - snapshot_path: &Option, + snapshot_path: &Path, ) -> Result { - let path = BankForks::get_snapshot_path(snapshot_path); - let paths = fs::read_dir(path)?; + let paths = fs::read_dir(snapshot_path)?; let mut names = paths .filter_map(|entry| { entry.ok().and_then(|e| { @@ -427,7 +420,7 @@ impl BankForks { working_bank, root, slots, - snapshot_path: snapshot_path.clone(), + snapshot_path: Some(snapshot_path.to_path_buf()), confidence: HashMap::new(), }) } @@ -615,9 +608,10 @@ mod tests { account_paths: Option, last_slot: u64, ) { + let snapshot_path = bank_forks.snapshot_path.as_ref().unwrap(); + let new = - BankForks::load_from_snapshot(&genesis_block, account_paths, &bank_forks.snapshot_path) - .unwrap(); + BankForks::load_from_snapshot(&genesis_block, account_paths, snapshot_path).unwrap(); for (slot, _) in new.banks.iter() { if *slot > 0 { let bank = bank_forks.banks.get(slot).unwrap().clone(); @@ -627,7 +621,7 @@ mod tests { } assert_eq!(new.working_bank().slot(), last_slot); for (slot, _) in new.banks.iter() { - BankForks::remove_snapshot(*slot, &bank_forks.snapshot_path); + BankForks::remove_snapshot(*slot, snapshot_path); } } @@ -648,7 +642,7 @@ mod tests { bank0.freeze(); let slot = bank0.slot(); let mut bank_forks = BankForks::new(0, bank0); - bank_forks.set_snapshot_config(Some(spath.paths.clone())); + bank_forks.set_snapshot_path(&PathBuf::from(&spath.paths)); bank_forks.add_snapshot(slot, 0).unwrap(); for forks in 0..index { let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1); diff --git a/core/src/blockstream.rs b/core/src/blockstream.rs index 4480d5de0e..56cf35f546 100644 --- a/core/src/blockstream.rs +++ b/core/src/blockstream.rs @@ -10,6 +10,7 @@ use serde_json::json; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cell::RefCell; +use std::path::{Path, PathBuf}; pub trait EntryWriter: std::fmt::Debug { fn write(&self, payload: String) -> Result<()>; @@ -41,7 +42,7 @@ impl EntryVec { #[derive(Debug)] pub struct EntrySocket { - socket: String, + unix_socket: PathBuf, } impl EntryWriter for EntrySocket { @@ -50,11 +51,10 @@ impl EntryWriter for EntrySocket { use std::io::prelude::*; use std::net::Shutdown; use std::os::unix::net::UnixStream; - use std::path::Path; const MESSAGE_TERMINATOR: &str = "\n"; - let mut socket = UnixStream::connect(Path::new(&self.socket))?; + let mut socket = UnixStream::connect(&self.unix_socket)?; socket.write_all(payload.as_bytes())?; socket.write_all(MESSAGE_TERMINATOR.as_bytes())?; socket.shutdown(Shutdown::Write)?; @@ -144,9 +144,11 @@ where pub type SocketBlockstream = Blockstream; impl SocketBlockstream { - pub fn new(socket: String) -> Self { + pub fn new(unix_socket: &Path) -> Self { Blockstream { - output: EntrySocket { socket }, + output: EntrySocket { + unix_socket: unix_socket.to_path_buf(), + }, } } } @@ -154,7 +156,7 @@ impl SocketBlockstream { pub type MockBlockstream = Blockstream; impl MockBlockstream { - pub fn new(_: String) -> Self { + pub fn new(_: &Path) -> Self { Blockstream { output: EntryVec::new(), } @@ -183,6 +185,7 @@ mod test { use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; use std::collections::HashSet; + use std::path::PathBuf; #[test] fn test_serialize_transactions() { @@ -205,7 +208,7 @@ mod test { #[test] fn test_blockstream() -> () { - let blockstream = MockBlockstream::new("test_stream".to_string()); + let blockstream = MockBlockstream::new(&PathBuf::from("test_stream")); let ticks_per_slot = 5; let mut blockhash = Hash::default(); diff --git a/core/src/blockstream_service.rs b/core/src/blockstream_service.rs index 337fa442c3..953c69b0f2 100644 --- a/core/src/blockstream_service.rs +++ b/core/src/blockstream_service.rs @@ -11,6 +11,7 @@ use crate::blocktree::Blocktree; use crate::result::{Error, Result}; use crate::service::Service; use solana_sdk::pubkey::Pubkey; +use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::Arc; @@ -26,10 +27,10 @@ impl BlockstreamService { pub fn new( slot_full_receiver: Receiver<(u64, Pubkey)>, blocktree: Arc, - blockstream_socket: String, + unix_socket: &Path, exit: &Arc, ) -> Self { - let mut blockstream = Blockstream::new(blockstream_socket); + let mut blockstream = Blockstream::new(unix_socket); let exit = exit.clone(); let t_blockstream = Builder::new() .name("solana-blockstream".to_string()) @@ -116,6 +117,7 @@ mod test { use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; + use std::path::PathBuf; use std::sync::mpsc::channel; #[test] @@ -133,7 +135,7 @@ mod test { let blocktree = Blocktree::open(&ledger_path).unwrap(); // Set up blockstream - let mut blockstream = Blockstream::new("test_stream".to_string()); + let mut blockstream = Blockstream::new(&PathBuf::from("test_stream")); // Set up dummy channel to receive a full-slot notification let (slot_full_sender, slot_full_receiver) = channel(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 0813f77d86..ad2ca18b68 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -28,6 +28,7 @@ use crate::storage_stage::{StorageStage, StorageState}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; +use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; @@ -65,7 +66,7 @@ impl Tvu { sockets: Sockets, blocktree: Arc, storage_state: &StorageState, - blockstream: Option<&String>, + blockstream_unix_socket: Option<&PathBuf>, max_ledger_slots: Option, ledger_signal_receiver: Receiver, subscriptions: &Arc, @@ -131,11 +132,11 @@ impl Tvu { vec![blockstream_slot_sender, ledger_cleanup_slot_sender], ); - let blockstream_service = if blockstream.is_some() { + let blockstream_service = if blockstream_unix_socket.is_some() { let blockstream_service = BlockstreamService::new( blockstream_slot_receiver, blocktree.clone(), - blockstream.unwrap().to_string(), + blockstream_unix_socket.unwrap(), &exit, ); Some(blockstream_service) diff --git a/core/src/validator.rs b/core/src/validator.rs index c6e8ee2581..42104ebd13 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -36,11 +36,11 @@ use std::thread::Result; pub struct ValidatorConfig { pub sigverify_disabled: bool, pub voting_disabled: bool, - pub blockstream: Option, + pub blockstream_unix_socket: Option, pub storage_slots_per_turn: u64, pub account_paths: Option, pub rpc_config: JsonRpcConfig, - pub snapshot_path: Option, + pub snapshot_path: Option, pub max_ledger_slots: Option, pub broadcast_stage_type: BroadcastStageType, pub erasure_config: ErasureConfig, @@ -51,7 +51,7 @@ impl Default for ValidatorConfig { Self { sigverify_disabled: false, voting_disabled: false, - blockstream: None, + blockstream_unix_socket: None, storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN, max_ledger_slots: None, account_paths: None, @@ -105,7 +105,7 @@ impl Validator { ) = new_banks_from_blocktree( ledger_path, config.account_paths.clone(), - config.snapshot_path.clone(), + config.snapshot_path.as_ref(), verify_ledger, ); @@ -249,7 +249,7 @@ impl Validator { sockets, blocktree.clone(), &storage_state, - config.blockstream.as_ref(), + config.blockstream_unix_socket.as_ref(), config.max_ledger_slots, ledger_signal_receiver, &subscriptions, @@ -307,12 +307,12 @@ fn get_bank_forks( genesis_block: &GenesisBlock, blocktree: &Blocktree, account_paths: Option, - snapshot_path: Option, + snapshot_path: Option<&PathBuf>, verify_ledger: bool, ) -> (BankForks, Vec, LeaderScheduleCache) { - if snapshot_path.is_some() { + if let Some(snapshot_path) = snapshot_path { let bank_forks = - BankForks::load_from_snapshot(&genesis_block, account_paths.clone(), &snapshot_path); + BankForks::load_from_snapshot(&genesis_block, account_paths.clone(), snapshot_path); match bank_forks { Ok(v) => { let bank = &v.working_bank(); @@ -333,8 +333,8 @@ fn get_bank_forks( verify_ledger, ) .expect("process_blocktree failed"); - if snapshot_path.is_some() { - bank_forks.set_snapshot_config(snapshot_path); + if let Some(snapshot_path) = snapshot_path { + bank_forks.set_snapshot_path(snapshot_path); let _ = bank_forks.add_snapshot(0, 0); } (bank_forks, bank_forks_info, leader_schedule_cache) @@ -343,7 +343,7 @@ fn get_bank_forks( pub fn new_banks_from_blocktree( blocktree_path: &Path, account_paths: Option, - snapshot_path: Option, + snapshot_path: Option<&PathBuf>, verify_ledger: bool, ) -> ( BankForks, diff --git a/validator/src/main.rs b/validator/src/main.rs index 082e4f3c2c..603f9bb78e 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -33,11 +33,11 @@ fn main() { let matches = App::new(crate_name!()).about(crate_description!()) .version(crate_version!()) .arg( - Arg::with_name("blockstream") + Arg::with_name("blockstream_unix_socket") .long("blockstream") .takes_value(true) .value_name("UNIX DOMAIN SOCKET") - .help("Open blockstream at this unix domain socket location") + .help("Open blockstream at this unix domain socket path") ) .arg( Arg::with_name("identity") @@ -215,9 +215,8 @@ fn main() { validator_config.voting_disabled = matches.is_present("no_voting"); - if matches.is_present("enable_rpc_exit") { - validator_config.rpc_config.enable_fullnode_exit = true; - } + validator_config.rpc_config.enable_fullnode_exit = matches.is_present("enable_rpc_exit"); + validator_config.rpc_config.drone_addr = matches.value_of("rpc_drone_addr").map(|address| { solana_netutil::parse_host_port(address).expect("failed to parse drone address") }); @@ -234,12 +233,8 @@ fn main() { ), ); - if let Some(paths) = matches.value_of("accounts") { - validator_config.account_paths = Some(paths.to_string()); - } - if let Some(paths) = matches.value_of("snapshot_path") { - validator_config.snapshot_path = Some(paths.to_string()); - } + validator_config.account_paths = matches.value_of("accounts").map(ToString::to_string); + validator_config.snapshot_path = matches.value_of("snapshot_path").map(PathBuf::from); if matches.is_present("limit_ledger_size") { validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS); } @@ -265,7 +260,9 @@ fn main() { (Some(signer_service), signer_addr) }; let init_complete_file = matches.value_of("init_complete_file"); - validator_config.blockstream = matches.value_of("blockstream").map(ToString::to_string); + validator_config.blockstream_unix_socket = matches + .value_of("blockstream_unix_socket") + .map(PathBuf::from); let keypair = Arc::new(keypair); let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range);