ValidatorConfig path reform: use Path/PathBuf for paths (#5353)

This commit is contained in:
Michael Vines
2019-07-30 19:47:24 -07:00
committed by GitHub
parent b7e08052ae
commit 4a336eb5ff
6 changed files with 56 additions and 59 deletions

View File

@ -19,7 +19,7 @@ pub struct BankForks {
working_bank: Arc<Bank>, working_bank: Arc<Bank>,
root: u64, root: u64,
slots: HashSet<u64>, slots: HashSet<u64>,
snapshot_path: Option<String>, snapshot_path: Option<PathBuf>,
confidence: HashMap<u64, Confidence>, confidence: HashMap<u64, Confidence>,
} }
@ -198,14 +198,14 @@ impl BankForks {
.retain(|slot, _| descendants[&root].contains(slot)); .retain(|slot, _| descendants[&root].contains(slot));
self.confidence self.confidence
.retain(|slot, _| slot == &root || descendants[&root].contains(slot)); .retain(|slot, _| slot == &root || descendants[&root].contains(slot));
if self.snapshot_path.is_some() { if let Some(snapshot_path) = &self.snapshot_path {
let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect(); let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect();
trace!("prune non root {} - {:?}", root, diff); trace!("prune non root {} - {:?}", root, diff);
for slot in diff.iter() { for slot in diff.iter() {
if **slot > root { if **slot > root {
let _ = self.add_snapshot(**slot, root); let _ = self.add_snapshot(**slot, root);
} else { } else {
BankForks::remove_snapshot(**slot, &self.snapshot_path); BankForks::remove_snapshot(**slot, &snapshot_path);
} }
} }
} }
@ -252,12 +252,8 @@ impl BankForks {
Error::new(ErrorKind::Other, error) Error::new(ErrorKind::Other, error)
} }
fn get_snapshot_path(path: &Option<String>) -> PathBuf {
Path::new(&path.clone().unwrap()).to_path_buf()
}
pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> { pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> {
let path = BankForks::get_snapshot_path(&self.snapshot_path); let path = self.snapshot_path.as_ref().expect("no snapshot_path");
fs::create_dir_all(path.clone())?; fs::create_dir_all(path.clone())?;
let bank_file = format!("{}", slot); let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file); let bank_file_path = path.join(bank_file);
@ -286,15 +282,14 @@ impl BankForks {
Ok(()) Ok(())
} }
pub fn remove_snapshot(slot: u64, path: &Option<String>) { pub fn remove_snapshot(slot: u64, path: &Path) {
let path = BankForks::get_snapshot_path(path);
let bank_file = format!("{}", slot); let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file); let bank_file_path = path.join(bank_file);
let _ = fs::remove_file(bank_file_path); let _ = fs::remove_file(bank_file_path);
} }
pub fn set_snapshot_config(&mut self, path: Option<String>) { pub fn set_snapshot_path(&mut self, snapshot_path: &Path) {
self.snapshot_path = path; self.snapshot_path = Some(snapshot_path.to_path_buf());
} }
fn load_snapshots( fn load_snapshots(
@ -302,14 +297,13 @@ impl BankForks {
bank0: &mut Bank, bank0: &mut Bank,
bank_maps: &mut Vec<(u64, u64, Bank)>, bank_maps: &mut Vec<(u64, u64, Bank)>,
status_cache_rc: &StatusCacheRc, status_cache_rc: &StatusCacheRc,
snapshot_path: &Option<String>, snapshot_path: &Path,
) -> Option<u64> { ) -> Option<u64> {
let path = BankForks::get_snapshot_path(snapshot_path);
let mut bank_root: Option<u64> = None; let mut bank_root: Option<u64> = None;
for bank_slot in names.iter().rev() { for bank_slot in names.iter().rev() {
let bank_path = format!("{}", bank_slot); let bank_path = format!("{}", bank_slot);
let bank_file_path = path.join(bank_path.clone()); let bank_file_path = snapshot_path.join(bank_path.clone());
info!("Load from {:?}", bank_file_path); info!("Load from {:?}", bank_file_path);
let file = File::open(bank_file_path); let file = File::open(bank_file_path);
if file.is_err() { if file.is_err() {
@ -385,10 +379,9 @@ impl BankForks {
pub fn load_from_snapshot( pub fn load_from_snapshot(
genesis_block: &GenesisBlock, genesis_block: &GenesisBlock,
account_paths: Option<String>, account_paths: Option<String>,
snapshot_path: &Option<String>, snapshot_path: &Path,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let path = BankForks::get_snapshot_path(snapshot_path); let paths = fs::read_dir(snapshot_path)?;
let paths = fs::read_dir(path)?;
let mut names = paths let mut names = paths
.filter_map(|entry| { .filter_map(|entry| {
entry.ok().and_then(|e| { entry.ok().and_then(|e| {
@ -427,7 +420,7 @@ impl BankForks {
working_bank, working_bank,
root, root,
slots, slots,
snapshot_path: snapshot_path.clone(), snapshot_path: Some(snapshot_path.to_path_buf()),
confidence: HashMap::new(), confidence: HashMap::new(),
}) })
} }
@ -615,9 +608,10 @@ mod tests {
account_paths: Option<String>, account_paths: Option<String>,
last_slot: u64, last_slot: u64,
) { ) {
let snapshot_path = bank_forks.snapshot_path.as_ref().unwrap();
let new = let new =
BankForks::load_from_snapshot(&genesis_block, account_paths, &bank_forks.snapshot_path) BankForks::load_from_snapshot(&genesis_block, account_paths, snapshot_path).unwrap();
.unwrap();
for (slot, _) in new.banks.iter() { for (slot, _) in new.banks.iter() {
if *slot > 0 { if *slot > 0 {
let bank = bank_forks.banks.get(slot).unwrap().clone(); let bank = bank_forks.banks.get(slot).unwrap().clone();
@ -627,7 +621,7 @@ mod tests {
} }
assert_eq!(new.working_bank().slot(), last_slot); assert_eq!(new.working_bank().slot(), last_slot);
for (slot, _) in new.banks.iter() { for (slot, _) in new.banks.iter() {
BankForks::remove_snapshot(*slot, &bank_forks.snapshot_path); BankForks::remove_snapshot(*slot, snapshot_path);
} }
} }
@ -648,7 +642,7 @@ mod tests {
bank0.freeze(); bank0.freeze();
let slot = bank0.slot(); let slot = bank0.slot();
let mut bank_forks = BankForks::new(0, bank0); let mut bank_forks = BankForks::new(0, bank0);
bank_forks.set_snapshot_config(Some(spath.paths.clone())); bank_forks.set_snapshot_path(&PathBuf::from(&spath.paths));
bank_forks.add_snapshot(slot, 0).unwrap(); bank_forks.add_snapshot(slot, 0).unwrap();
for forks in 0..index { for forks in 0..index {
let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1); let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1);

View File

@ -10,6 +10,7 @@ use serde_json::json;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::cell::RefCell; use std::cell::RefCell;
use std::path::{Path, PathBuf};
pub trait EntryWriter: std::fmt::Debug { pub trait EntryWriter: std::fmt::Debug {
fn write(&self, payload: String) -> Result<()>; fn write(&self, payload: String) -> Result<()>;
@ -41,7 +42,7 @@ impl EntryVec {
#[derive(Debug)] #[derive(Debug)]
pub struct EntrySocket { pub struct EntrySocket {
socket: String, unix_socket: PathBuf,
} }
impl EntryWriter for EntrySocket { impl EntryWriter for EntrySocket {
@ -50,11 +51,10 @@ impl EntryWriter for EntrySocket {
use std::io::prelude::*; use std::io::prelude::*;
use std::net::Shutdown; use std::net::Shutdown;
use std::os::unix::net::UnixStream; use std::os::unix::net::UnixStream;
use std::path::Path;
const MESSAGE_TERMINATOR: &str = "\n"; const MESSAGE_TERMINATOR: &str = "\n";
let mut socket = UnixStream::connect(Path::new(&self.socket))?; let mut socket = UnixStream::connect(&self.unix_socket)?;
socket.write_all(payload.as_bytes())?; socket.write_all(payload.as_bytes())?;
socket.write_all(MESSAGE_TERMINATOR.as_bytes())?; socket.write_all(MESSAGE_TERMINATOR.as_bytes())?;
socket.shutdown(Shutdown::Write)?; socket.shutdown(Shutdown::Write)?;
@ -144,9 +144,11 @@ where
pub type SocketBlockstream = Blockstream<EntrySocket>; pub type SocketBlockstream = Blockstream<EntrySocket>;
impl SocketBlockstream { impl SocketBlockstream {
pub fn new(socket: String) -> Self { pub fn new(unix_socket: &Path) -> Self {
Blockstream { Blockstream {
output: EntrySocket { socket }, output: EntrySocket {
unix_socket: unix_socket.to_path_buf(),
},
} }
} }
} }
@ -154,7 +156,7 @@ impl SocketBlockstream {
pub type MockBlockstream = Blockstream<EntryVec>; pub type MockBlockstream = Blockstream<EntryVec>;
impl MockBlockstream { impl MockBlockstream {
pub fn new(_: String) -> Self { pub fn new(_: &Path) -> Self {
Blockstream { Blockstream {
output: EntryVec::new(), output: EntryVec::new(),
} }
@ -183,6 +185,7 @@ mod test {
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction; use solana_sdk::system_transaction;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf;
#[test] #[test]
fn test_serialize_transactions() { fn test_serialize_transactions() {
@ -205,7 +208,7 @@ mod test {
#[test] #[test]
fn test_blockstream() -> () { fn test_blockstream() -> () {
let blockstream = MockBlockstream::new("test_stream".to_string()); let blockstream = MockBlockstream::new(&PathBuf::from("test_stream"));
let ticks_per_slot = 5; let ticks_per_slot = 5;
let mut blockhash = Hash::default(); let mut blockhash = Hash::default();

View File

@ -11,6 +11,7 @@ use crate::blocktree::Blocktree;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc; use std::sync::Arc;
@ -26,10 +27,10 @@ impl BlockstreamService {
pub fn new( pub fn new(
slot_full_receiver: Receiver<(u64, Pubkey)>, slot_full_receiver: Receiver<(u64, Pubkey)>,
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
blockstream_socket: String, unix_socket: &Path,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let mut blockstream = Blockstream::new(blockstream_socket); let mut blockstream = Blockstream::new(unix_socket);
let exit = exit.clone(); let exit = exit.clone();
let t_blockstream = Builder::new() let t_blockstream = Builder::new()
.name("solana-blockstream".to_string()) .name("solana-blockstream".to_string())
@ -116,6 +117,7 @@ mod test {
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction; use solana_sdk::system_transaction;
use std::path::PathBuf;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
#[test] #[test]
@ -133,7 +135,7 @@ mod test {
let blocktree = Blocktree::open(&ledger_path).unwrap(); let blocktree = Blocktree::open(&ledger_path).unwrap();
// Set up blockstream // Set up blockstream
let mut blockstream = Blockstream::new("test_stream".to_string()); let mut blockstream = Blockstream::new(&PathBuf::from("test_stream"));
// Set up dummy channel to receive a full-slot notification // Set up dummy channel to receive a full-slot notification
let (slot_full_sender, slot_full_receiver) = channel(); let (slot_full_sender, slot_full_receiver) = channel();

View File

@ -28,6 +28,7 @@ use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@ -65,7 +66,7 @@ impl Tvu {
sockets: Sockets, sockets: Sockets,
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
storage_state: &StorageState, storage_state: &StorageState,
blockstream: Option<&String>, blockstream_unix_socket: Option<&PathBuf>,
max_ledger_slots: Option<u64>, max_ledger_slots: Option<u64>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
@ -131,11 +132,11 @@ impl Tvu {
vec![blockstream_slot_sender, ledger_cleanup_slot_sender], vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
); );
let blockstream_service = if blockstream.is_some() { let blockstream_service = if blockstream_unix_socket.is_some() {
let blockstream_service = BlockstreamService::new( let blockstream_service = BlockstreamService::new(
blockstream_slot_receiver, blockstream_slot_receiver,
blocktree.clone(), blocktree.clone(),
blockstream.unwrap().to_string(), blockstream_unix_socket.unwrap(),
&exit, &exit,
); );
Some(blockstream_service) Some(blockstream_service)

View File

@ -36,11 +36,11 @@ use std::thread::Result;
pub struct ValidatorConfig { pub struct ValidatorConfig {
pub sigverify_disabled: bool, pub sigverify_disabled: bool,
pub voting_disabled: bool, pub voting_disabled: bool,
pub blockstream: Option<String>, pub blockstream_unix_socket: Option<PathBuf>,
pub storage_slots_per_turn: u64, pub storage_slots_per_turn: u64,
pub account_paths: Option<String>, pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig, pub rpc_config: JsonRpcConfig,
pub snapshot_path: Option<String>, pub snapshot_path: Option<PathBuf>,
pub max_ledger_slots: Option<u64>, pub max_ledger_slots: Option<u64>,
pub broadcast_stage_type: BroadcastStageType, pub broadcast_stage_type: BroadcastStageType,
pub erasure_config: ErasureConfig, pub erasure_config: ErasureConfig,
@ -51,7 +51,7 @@ impl Default for ValidatorConfig {
Self { Self {
sigverify_disabled: false, sigverify_disabled: false,
voting_disabled: false, voting_disabled: false,
blockstream: None, blockstream_unix_socket: None,
storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN, storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN,
max_ledger_slots: None, max_ledger_slots: None,
account_paths: None, account_paths: None,
@ -105,7 +105,7 @@ impl Validator {
) = new_banks_from_blocktree( ) = new_banks_from_blocktree(
ledger_path, ledger_path,
config.account_paths.clone(), config.account_paths.clone(),
config.snapshot_path.clone(), config.snapshot_path.as_ref(),
verify_ledger, verify_ledger,
); );
@ -249,7 +249,7 @@ impl Validator {
sockets, sockets,
blocktree.clone(), blocktree.clone(),
&storage_state, &storage_state,
config.blockstream.as_ref(), config.blockstream_unix_socket.as_ref(),
config.max_ledger_slots, config.max_ledger_slots,
ledger_signal_receiver, ledger_signal_receiver,
&subscriptions, &subscriptions,
@ -307,12 +307,12 @@ fn get_bank_forks(
genesis_block: &GenesisBlock, genesis_block: &GenesisBlock,
blocktree: &Blocktree, blocktree: &Blocktree,
account_paths: Option<String>, account_paths: Option<String>,
snapshot_path: Option<String>, snapshot_path: Option<&PathBuf>,
verify_ledger: bool, verify_ledger: bool,
) -> (BankForks, Vec<BankForksInfo>, LeaderScheduleCache) { ) -> (BankForks, Vec<BankForksInfo>, LeaderScheduleCache) {
if snapshot_path.is_some() { if let Some(snapshot_path) = snapshot_path {
let bank_forks = let bank_forks =
BankForks::load_from_snapshot(&genesis_block, account_paths.clone(), &snapshot_path); BankForks::load_from_snapshot(&genesis_block, account_paths.clone(), snapshot_path);
match bank_forks { match bank_forks {
Ok(v) => { Ok(v) => {
let bank = &v.working_bank(); let bank = &v.working_bank();
@ -333,8 +333,8 @@ fn get_bank_forks(
verify_ledger, verify_ledger,
) )
.expect("process_blocktree failed"); .expect("process_blocktree failed");
if snapshot_path.is_some() { if let Some(snapshot_path) = snapshot_path {
bank_forks.set_snapshot_config(snapshot_path); bank_forks.set_snapshot_path(snapshot_path);
let _ = bank_forks.add_snapshot(0, 0); let _ = bank_forks.add_snapshot(0, 0);
} }
(bank_forks, bank_forks_info, leader_schedule_cache) (bank_forks, bank_forks_info, leader_schedule_cache)
@ -343,7 +343,7 @@ fn get_bank_forks(
pub fn new_banks_from_blocktree( pub fn new_banks_from_blocktree(
blocktree_path: &Path, blocktree_path: &Path,
account_paths: Option<String>, account_paths: Option<String>,
snapshot_path: Option<String>, snapshot_path: Option<&PathBuf>,
verify_ledger: bool, verify_ledger: bool,
) -> ( ) -> (
BankForks, BankForks,

View File

@ -33,11 +33,11 @@ fn main() {
let matches = App::new(crate_name!()).about(crate_description!()) let matches = App::new(crate_name!()).about(crate_description!())
.version(crate_version!()) .version(crate_version!())
.arg( .arg(
Arg::with_name("blockstream") Arg::with_name("blockstream_unix_socket")
.long("blockstream") .long("blockstream")
.takes_value(true) .takes_value(true)
.value_name("UNIX DOMAIN SOCKET") .value_name("UNIX DOMAIN SOCKET")
.help("Open blockstream at this unix domain socket location") .help("Open blockstream at this unix domain socket path")
) )
.arg( .arg(
Arg::with_name("identity") Arg::with_name("identity")
@ -215,9 +215,8 @@ fn main() {
validator_config.voting_disabled = matches.is_present("no_voting"); validator_config.voting_disabled = matches.is_present("no_voting");
if matches.is_present("enable_rpc_exit") { validator_config.rpc_config.enable_fullnode_exit = matches.is_present("enable_rpc_exit");
validator_config.rpc_config.enable_fullnode_exit = true;
}
validator_config.rpc_config.drone_addr = matches.value_of("rpc_drone_addr").map(|address| { validator_config.rpc_config.drone_addr = matches.value_of("rpc_drone_addr").map(|address| {
solana_netutil::parse_host_port(address).expect("failed to parse drone address") solana_netutil::parse_host_port(address).expect("failed to parse drone address")
}); });
@ -234,12 +233,8 @@ fn main() {
), ),
); );
if let Some(paths) = matches.value_of("accounts") { validator_config.account_paths = matches.value_of("accounts").map(ToString::to_string);
validator_config.account_paths = Some(paths.to_string()); validator_config.snapshot_path = matches.value_of("snapshot_path").map(PathBuf::from);
}
if let Some(paths) = matches.value_of("snapshot_path") {
validator_config.snapshot_path = Some(paths.to_string());
}
if matches.is_present("limit_ledger_size") { if matches.is_present("limit_ledger_size") {
validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS); validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS);
} }
@ -265,7 +260,9 @@ fn main() {
(Some(signer_service), signer_addr) (Some(signer_service), signer_addr)
}; };
let init_complete_file = matches.value_of("init_complete_file"); let init_complete_file = matches.value_of("init_complete_file");
validator_config.blockstream = matches.value_of("blockstream").map(ToString::to_string); validator_config.blockstream_unix_socket = matches
.value_of("blockstream_unix_socket")
.map(PathBuf::from);
let keypair = Arc::new(keypair); let keypair = Arc::new(keypair);
let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range); let mut node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr, dynamic_port_range);