From c85c4699aafe02c4edfc8acfdff6b167c1cbe61a Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 30 Jan 2020 20:44:53 -0800 Subject: [PATCH] validator: add --private-rpc flag (bp #8037) (#8054) automerge --- core/src/contact_info.rs | 14 ++- core/src/rpc.rs | 15 +-- core/src/validator.rs | 186 ++++++++++++++++------------- local-cluster/src/local_cluster.rs | 10 ++ validator/src/main.rs | 25 ++-- 5 files changed, 145 insertions(+), 105 deletions(-) diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 69efba1176..feaaf334ec 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -299,8 +299,8 @@ mod tests { assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.tpu_forwards.port(), 13); - assert_eq!(ci.rpc.port(), 8899); - assert_eq!(ci.rpc_pubsub.port(), 8900); + assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); + assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); assert!(ci.storage_addr.ip().is_unspecified()); } #[test] @@ -315,8 +315,14 @@ mod tests { assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236")); assert_eq!(d1.tpu_forwards, socketaddr!("127.0.0.1:1237")); assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234")); - assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899")); - assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900")); + assert_eq!( + d1.rpc, + socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PORT)) + ); + assert_eq!( + d1.rpc_pubsub, + socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT)) + ); } #[test] diff --git a/core/src/rpc.rs b/core/src/rpc.rs index f24a7138fd..9a738834f3 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -45,21 +45,12 @@ fn new_response(bank: &Bank, value: T) -> RpcResponse { Ok(Response { context, value }) } -#[derive(Debug, Clone)] +#[derive(Debug, Default, Clone)] pub struct JsonRpcConfig { pub enable_validator_exit: bool, // Enable the 'validatorExit' command pub faucet_addr: Option, } -impl Default for JsonRpcConfig { - fn default() -> Self { - Self { - enable_validator_exit: false, - faucet_addr: None, - } - } -} - #[derive(Clone)] pub struct JsonRpcRequestProcessor { bank_forks: Arc>, @@ -1124,6 +1115,7 @@ pub mod tests { fee_calculator::DEFAULT_BURN_PERCENT, hash::{hash, Hash}, instruction::InstructionError, + rpc_port, signature::{Keypair, KeypairUtil}, system_transaction, transaction::TransactionError, @@ -1362,8 +1354,9 @@ pub mod tests { .expect("actual response deserialization"); let expected = format!( - r#"{{"jsonrpc":"2.0","result":[{{"pubkey": "{}", "gossip": "127.0.0.1:1235", "tpu": "127.0.0.1:1234", "rpc": "127.0.0.1:8899"}}],"id":1}}"#, + r#"{{"jsonrpc":"2.0","result":[{{"pubkey": "{}", "gossip": "127.0.0.1:1235", "tpu": "127.0.0.1:1234", "rpc": "127.0.0.1:{}"}}],"id":1}}"#, leader_pubkey, + rpc_port::DEFAULT_RPC_PORT ); let expected: Response = diff --git a/core/src/validator.rs b/core/src/validator.rs index ec2df2dc35..c2f84bc871 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -63,6 +63,7 @@ pub struct ValidatorConfig { pub storage_slots_per_turn: u64, pub account_paths: Vec, pub rpc_config: JsonRpcConfig, + pub rpc_ports: Option<(u16, u16)>, // (API, PubSub) pub snapshot_config: Option, pub max_ledger_slots: Option, pub broadcast_stage_type: BroadcastStageType, @@ -86,6 +87,7 @@ impl Default for ValidatorConfig { max_ledger_slots: None, account_paths: Vec::new(), rpc_config: JsonRpcConfig::default(), + rpc_ports: None, snapshot_config: None, broadcast_stage_type: BroadcastStageType::Standard, enable_partition: None, @@ -116,8 +118,7 @@ impl ValidatorExit { pub struct Validator { pub id: Pubkey, validator_exit: Arc>>, - rpc_service: Option, - rpc_pubsub_service: Option, + rpc_service: Option<(JsonRpcService, PubSubService)>, transaction_status_service: Option, gossip_service: GossipService, poh_recorder: Arc>, @@ -128,7 +129,6 @@ pub struct Validator { } impl Validator { - #[allow(clippy::cognitive_complexity)] pub fn new( mut node: Node, keypair: &Arc, @@ -219,36 +219,36 @@ impl Validator { let blockstore = Arc::new(blockstore); - let rpc_service = if node.info.rpc.port() == 0 { - None - } else { - Some(JsonRpcService::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()), - config.rpc_config.clone(), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - genesis_hash, - ledger_path, - storage_state.clone(), - validator_exit.clone(), - )) - }; - let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); - let rpc_pubsub_service = if node.info.rpc_pubsub.port() == 0 { - None - } else { - Some(PubSubService::new( - &subscriptions, - SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - node.info.rpc_pubsub.port(), + + let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| { + if ContactInfo::is_valid_address(&node.info.rpc) { + assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + assert_eq!(rpc_port, node.info.rpc.port()); + assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port()); + } else { + assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub)); + } + ( + JsonRpcService::new( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port), + config.rpc_config.clone(), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_hash, + ledger_path, + storage_state.clone(), + validator_exit.clone(), ), - &exit, - )) - }; + PubSubService::new( + &subscriptions, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port), + &exit, + ), + ) + }); let (transaction_status_sender, transaction_status_service) = if rpc_service.is_some() && !config.transaction_status_service_disabled { @@ -317,47 +317,7 @@ impl Validator { .set_entrypoint(entrypoint_info.clone()); } - if config.wait_for_supermajority { - info!( - "Waiting for more than 75% of activated stake at slot {} to be in gossip...", - bank.slot() - ); - loop { - let gossip_stake_percent = get_stake_percent_in_gossip(&bank, &cluster_info); - - info!("{}% of activated stake in gossip", gossip_stake_percent,); - if gossip_stake_percent > 75 { - break; - } - sleep(Duration::new(1, 0)); - } - } - - let sockets = Sockets { - repair: node - .sockets - .repair - .try_clone() - .expect("Failed to clone repair socket"), - retransmit: node - .sockets - .retransmit_sockets - .iter() - .map(|s| s.try_clone().expect("Failed to clone retransmit socket")) - .collect(), - fetch: node - .sockets - .tvu - .iter() - .map(|s| s.try_clone().expect("Failed to clone TVU Sockets")) - .collect(), - forwards: node - .sockets - .tvu_forwards - .iter() - .map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets")) - .collect(), - }; + wait_for_supermajority(config, &bank, &cluster_info); let voting_keypair = if config.voting_disabled { None @@ -378,7 +338,31 @@ impl Validator { storage_keypair, &bank_forks, &cluster_info, - sockets, + Sockets { + repair: node + .sockets + .repair + .try_clone() + .expect("Failed to clone repair socket"), + retransmit: node + .sockets + .retransmit_sockets + .iter() + .map(|s| s.try_clone().expect("Failed to clone retransmit socket")) + .collect(), + fetch: node + .sockets + .tvu + .iter() + .map(|s| s.try_clone().expect("Failed to clone TVU Sockets")) + .collect(), + forwards: node + .sockets + .tvu_forwards + .iter() + .map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets")) + .collect(), + }, blockstore.clone(), &storage_state, config.blockstream_unix_socket.as_ref(), @@ -420,7 +404,6 @@ impl Validator { id, gossip_service, rpc_service, - rpc_pubsub_service, transaction_status_service, tpu, tvu, @@ -471,10 +454,8 @@ impl Validator { pub fn join(self) -> Result<()> { self.poh_service.join()?; drop(self.poh_recorder); - if let Some(rpc_service) = self.rpc_service { + if let Some((rpc_service, rpc_pubsub_service)) = self.rpc_service { rpc_service.join()?; - } - if let Some(rpc_pubsub_service) = self.rpc_pubsub_service { rpc_pubsub_service.join()?; } if let Some(transaction_status_service) = self.transaction_status_service { @@ -575,6 +556,30 @@ fn new_banks_from_blockstore( ) } +fn wait_for_supermajority( + config: &ValidatorConfig, + bank: &Arc, + cluster_info: &Arc>, +) { + if !config.wait_for_supermajority { + return; + } + + info!( + "Waiting for more than 75% of activated stake at slot {} to be in gossip...", + bank.slot() + ); + loop { + let gossip_stake_percent = get_stake_percent_in_gossip(&bank, &cluster_info); + + info!("{}% of activated stake in gossip", gossip_stake_percent,); + if gossip_stake_percent > 75 { + break; + } + sleep(Duration::new(1, 0)); + } +} + pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) { use crate::genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}; @@ -598,8 +603,11 @@ pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) { let leader_voting_keypair = Arc::new(voting_keypair); let storage_keypair = Arc::new(Keypair::new()); - let mut config = ValidatorConfig::default(); - config.transaction_status_service_disabled = true; + let config = ValidatorConfig { + transaction_status_service_disabled: true, + rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())), + ..ValidatorConfig::default() + }; let node = Validator::new( node, &node_keypair, @@ -700,8 +708,14 @@ mod tests { let voting_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new()); - let mut config = ValidatorConfig::default(); - config.transaction_status_service_disabled = true; + let config = ValidatorConfig { + transaction_status_service_disabled: true, + rpc_ports: Some(( + validator_node.info.rpc.port(), + validator_node.info.rpc_pubsub.port(), + )), + ..ValidatorConfig::default() + }; let validator = Validator::new( validator_node, &Arc::new(validator_keypair), @@ -734,8 +748,14 @@ mod tests { ledger_paths.push(validator_ledger_path.clone()); let voting_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new()); - let mut config = ValidatorConfig::default(); - config.transaction_status_service_disabled = true; + let config = ValidatorConfig { + transaction_status_service_disabled: true, + rpc_ports: Some(( + validator_node.info.rpc.port(), + validator_node.info.rpc_pubsub.port(), + )), + ..ValidatorConfig::default() + }; Validator::new( validator_node, &Arc::new(validator_keypair), diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index a7bedf2b38..1d8b2c786f 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -207,6 +207,10 @@ impl LocalCluster { let leader_storage_keypair = Arc::new(storage_keypair); let leader_voting_keypair = Arc::new(voting_keypair); let mut leader_config = config.validator_configs[0].clone(); + leader_config.rpc_ports = Some(( + leader_node.info.rpc.port(), + leader_node.info.rpc_pubsub.port(), + )); leader_config.transaction_status_service_disabled = true; let leader_server = Validator::new( leader_node, @@ -351,6 +355,10 @@ impl LocalCluster { } let mut config = validator_config.clone(); + config.rpc_ports = Some(( + validator_node.info.rpc.port(), + validator_node.info.rpc_pubsub.port(), + )); config.transaction_status_service_disabled = true; let voting_keypair = Arc::new(voting_keypair); let validator_server = Validator::new( @@ -658,6 +666,8 @@ impl Cluster for LocalCluster { // Update the stored ContactInfo for this node let node = Node::new_localhost_with_pubkey(&pubkey); cluster_validator_info.info.contact_info = node.info.clone(); + cluster_validator_info.config.rpc_ports = + Some((node.info.rpc.port(), node.info.rpc_pubsub.port())); cluster_validator_info .config .transaction_status_service_disabled = true; diff --git a/validator/src/main.rs b/validator/src/main.rs index 430f223a33..0f0acb3c92 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -496,6 +496,12 @@ pub fn main() { .validator(port_validator) .help("RPC port to use for this node"), ) + .arg( + Arg::with_name("private_rpc") + .long("--private-rpc") + .takes_value(false) + .help("Do not publish the RPC port for use by other nodes") + ) .arg( Arg::with_name("enable_rpc_exit") .long("enable-rpc-exit") @@ -657,7 +663,7 @@ pub fn main() { let cuda = matches.is_present("cuda"); let no_genesis_fetch = matches.is_present("no_genesis_fetch"); let no_snapshot_fetch = matches.is_present("no_snapshot_fetch"); - let rpc_port = value_t!(matches, "rpc_port", u16); + let private_rpc = matches.is_present("private_rpc"); // Canonicalize ledger path to avoid issues with symlink creation let _ = fs::create_dir_all(&ledger_path); @@ -683,6 +689,9 @@ pub fn main() { solana_net_utils::parse_host_port(address).expect("failed to parse faucet address") }), }, + rpc_ports: value_t!(matches, "rpc_port", u16) + .ok() + .map(|rpc_port| (rpc_port, rpc_port + 1)), voting_disabled: matches.is_present("no_voting"), wait_for_supermajority: !matches.is_present("no_wait_for_supermajority"), ..ValidatorConfig::default() @@ -852,12 +861,14 @@ pub fn main() { let mut tcp_ports = vec![]; let mut node = Node::new_with_external_ip(&identity_keypair.pubkey(), &gossip_addr, dynamic_port_range); - if let Ok(rpc_port) = rpc_port { - let rpc_pubsub_port = rpc_port + 1; - node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port); - node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_port); - tcp_ports = vec![rpc_port, rpc_pubsub_port]; - }; + + if !private_rpc { + if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports { + node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port); + node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_port); + tcp_ports = vec![rpc_port, rpc_pubsub_port]; + } + } if let Some(ref cluster_entrypoint) = cluster_entrypoint { let udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];