validator: add --private-rpc flag (bp #8037) (#8054)

automerge
This commit is contained in:
mergify[bot]
2020-01-30 20:44:53 -08:00
committed by GitHub
parent 81add4d6bf
commit c85c4699aa
5 changed files with 145 additions and 105 deletions

View File

@ -299,8 +299,8 @@ mod tests {
assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.gossip.port(), 11);
assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.tvu.port(), 12);
assert_eq!(ci.tpu_forwards.port(), 13); assert_eq!(ci.tpu_forwards.port(), 13);
assert_eq!(ci.rpc.port(), 8899); assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT);
assert_eq!(ci.rpc_pubsub.port(), 8900); assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
assert!(ci.storage_addr.ip().is_unspecified()); assert!(ci.storage_addr.ip().is_unspecified());
} }
#[test] #[test]
@ -315,8 +315,14 @@ mod tests {
assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236")); assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236"));
assert_eq!(d1.tpu_forwards, socketaddr!("127.0.0.1:1237")); assert_eq!(d1.tpu_forwards, socketaddr!("127.0.0.1:1237"));
assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234")); assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234"));
assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899")); assert_eq!(
assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900")); d1.rpc,
socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PORT))
);
assert_eq!(
d1.rpc_pubsub,
socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT))
);
} }
#[test] #[test]

View File

@ -45,21 +45,12 @@ fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
Ok(Response { context, value }) Ok(Response { context, value })
} }
#[derive(Debug, Clone)] #[derive(Debug, Default, Clone)]
pub struct JsonRpcConfig { pub struct JsonRpcConfig {
pub enable_validator_exit: bool, // Enable the 'validatorExit' command pub enable_validator_exit: bool, // Enable the 'validatorExit' command
pub faucet_addr: Option<SocketAddr>, pub faucet_addr: Option<SocketAddr>,
} }
impl Default for JsonRpcConfig {
fn default() -> Self {
Self {
enable_validator_exit: false,
faucet_addr: None,
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct JsonRpcRequestProcessor { pub struct JsonRpcRequestProcessor {
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
@ -1124,6 +1115,7 @@ pub mod tests {
fee_calculator::DEFAULT_BURN_PERCENT, fee_calculator::DEFAULT_BURN_PERCENT,
hash::{hash, Hash}, hash::{hash, Hash},
instruction::InstructionError, instruction::InstructionError,
rpc_port,
signature::{Keypair, KeypairUtil}, signature::{Keypair, KeypairUtil},
system_transaction, system_transaction,
transaction::TransactionError, transaction::TransactionError,
@ -1362,8 +1354,9 @@ pub mod tests {
.expect("actual response deserialization"); .expect("actual response deserialization");
let expected = format!( let expected = format!(
r#"{{"jsonrpc":"2.0","result":[{{"pubkey": "{}", "gossip": "127.0.0.1:1235", "tpu": "127.0.0.1:1234", "rpc": "127.0.0.1:8899"}}],"id":1}}"#, r#"{{"jsonrpc":"2.0","result":[{{"pubkey": "{}", "gossip": "127.0.0.1:1235", "tpu": "127.0.0.1:1234", "rpc": "127.0.0.1:{}"}}],"id":1}}"#,
leader_pubkey, leader_pubkey,
rpc_port::DEFAULT_RPC_PORT
); );
let expected: Response = let expected: Response =

View File

@ -63,6 +63,7 @@ pub struct ValidatorConfig {
pub storage_slots_per_turn: u64, pub storage_slots_per_turn: u64,
pub account_paths: Vec<PathBuf>, pub account_paths: Vec<PathBuf>,
pub rpc_config: JsonRpcConfig, pub rpc_config: JsonRpcConfig,
pub rpc_ports: Option<(u16, u16)>, // (API, PubSub)
pub snapshot_config: Option<SnapshotConfig>, pub snapshot_config: Option<SnapshotConfig>,
pub max_ledger_slots: Option<u64>, pub max_ledger_slots: Option<u64>,
pub broadcast_stage_type: BroadcastStageType, pub broadcast_stage_type: BroadcastStageType,
@ -86,6 +87,7 @@ impl Default for ValidatorConfig {
max_ledger_slots: None, max_ledger_slots: None,
account_paths: Vec::new(), account_paths: Vec::new(),
rpc_config: JsonRpcConfig::default(), rpc_config: JsonRpcConfig::default(),
rpc_ports: None,
snapshot_config: None, snapshot_config: None,
broadcast_stage_type: BroadcastStageType::Standard, broadcast_stage_type: BroadcastStageType::Standard,
enable_partition: None, enable_partition: None,
@ -116,8 +118,7 @@ impl ValidatorExit {
pub struct Validator { pub struct Validator {
pub id: Pubkey, pub id: Pubkey,
validator_exit: Arc<RwLock<Option<ValidatorExit>>>, validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
rpc_service: Option<JsonRpcService>, rpc_service: Option<(JsonRpcService, PubSubService)>,
rpc_pubsub_service: Option<PubSubService>,
transaction_status_service: Option<TransactionStatusService>, transaction_status_service: Option<TransactionStatusService>,
gossip_service: GossipService, gossip_service: GossipService,
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
@ -128,7 +129,6 @@ pub struct Validator {
} }
impl Validator { impl Validator {
#[allow(clippy::cognitive_complexity)]
pub fn new( pub fn new(
mut node: Node, mut node: Node,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
@ -219,36 +219,36 @@ impl Validator {
let blockstore = Arc::new(blockstore); let blockstore = Arc::new(blockstore);
let rpc_service = if node.info.rpc.port() == 0 {
None
} else {
Some(JsonRpcService::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
config.rpc_config.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
genesis_hash,
ledger_path,
storage_state.clone(),
validator_exit.clone(),
))
};
let subscriptions = Arc::new(RpcSubscriptions::new(&exit)); let subscriptions = Arc::new(RpcSubscriptions::new(&exit));
let rpc_pubsub_service = if node.info.rpc_pubsub.port() == 0 {
None let rpc_service = config.rpc_ports.map(|(rpc_port, rpc_pubsub_port)| {
} else { if ContactInfo::is_valid_address(&node.info.rpc) {
Some(PubSubService::new( assert!(ContactInfo::is_valid_address(&node.info.rpc_pubsub));
&subscriptions, assert_eq!(rpc_port, node.info.rpc.port());
SocketAddr::new( assert_eq!(rpc_pubsub_port, node.info.rpc_pubsub.port());
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), } else {
node.info.rpc_pubsub.port(), assert!(!ContactInfo::is_valid_address(&node.info.rpc_pubsub));
}
(
JsonRpcService::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port),
config.rpc_config.clone(),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
genesis_hash,
ledger_path,
storage_state.clone(),
validator_exit.clone(),
), ),
&exit, PubSubService::new(
)) &subscriptions,
}; SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port),
&exit,
),
)
});
let (transaction_status_sender, transaction_status_service) = let (transaction_status_sender, transaction_status_service) =
if rpc_service.is_some() && !config.transaction_status_service_disabled { if rpc_service.is_some() && !config.transaction_status_service_disabled {
@ -317,47 +317,7 @@ impl Validator {
.set_entrypoint(entrypoint_info.clone()); .set_entrypoint(entrypoint_info.clone());
} }
if config.wait_for_supermajority { wait_for_supermajority(config, &bank, &cluster_info);
info!(
"Waiting for more than 75% of activated stake at slot {} to be in gossip...",
bank.slot()
);
loop {
let gossip_stake_percent = get_stake_percent_in_gossip(&bank, &cluster_info);
info!("{}% of activated stake in gossip", gossip_stake_percent,);
if gossip_stake_percent > 75 {
break;
}
sleep(Duration::new(1, 0));
}
}
let sockets = Sockets {
repair: node
.sockets
.repair
.try_clone()
.expect("Failed to clone repair socket"),
retransmit: node
.sockets
.retransmit_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone retransmit socket"))
.collect(),
fetch: node
.sockets
.tvu
.iter()
.map(|s| s.try_clone().expect("Failed to clone TVU Sockets"))
.collect(),
forwards: node
.sockets
.tvu_forwards
.iter()
.map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets"))
.collect(),
};
let voting_keypair = if config.voting_disabled { let voting_keypair = if config.voting_disabled {
None None
@ -378,7 +338,31 @@ impl Validator {
storage_keypair, storage_keypair,
&bank_forks, &bank_forks,
&cluster_info, &cluster_info,
sockets, Sockets {
repair: node
.sockets
.repair
.try_clone()
.expect("Failed to clone repair socket"),
retransmit: node
.sockets
.retransmit_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone retransmit socket"))
.collect(),
fetch: node
.sockets
.tvu
.iter()
.map(|s| s.try_clone().expect("Failed to clone TVU Sockets"))
.collect(),
forwards: node
.sockets
.tvu_forwards
.iter()
.map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets"))
.collect(),
},
blockstore.clone(), blockstore.clone(),
&storage_state, &storage_state,
config.blockstream_unix_socket.as_ref(), config.blockstream_unix_socket.as_ref(),
@ -420,7 +404,6 @@ impl Validator {
id, id,
gossip_service, gossip_service,
rpc_service, rpc_service,
rpc_pubsub_service,
transaction_status_service, transaction_status_service,
tpu, tpu,
tvu, tvu,
@ -471,10 +454,8 @@ impl Validator {
pub fn join(self) -> Result<()> { pub fn join(self) -> Result<()> {
self.poh_service.join()?; self.poh_service.join()?;
drop(self.poh_recorder); drop(self.poh_recorder);
if let Some(rpc_service) = self.rpc_service { if let Some((rpc_service, rpc_pubsub_service)) = self.rpc_service {
rpc_service.join()?; rpc_service.join()?;
}
if let Some(rpc_pubsub_service) = self.rpc_pubsub_service {
rpc_pubsub_service.join()?; rpc_pubsub_service.join()?;
} }
if let Some(transaction_status_service) = self.transaction_status_service { if let Some(transaction_status_service) = self.transaction_status_service {
@ -575,6 +556,30 @@ fn new_banks_from_blockstore(
) )
} }
fn wait_for_supermajority(
config: &ValidatorConfig,
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) {
if !config.wait_for_supermajority {
return;
}
info!(
"Waiting for more than 75% of activated stake at slot {} to be in gossip...",
bank.slot()
);
loop {
let gossip_stake_percent = get_stake_percent_in_gossip(&bank, &cluster_info);
info!("{}% of activated stake in gossip", gossip_stake_percent,);
if gossip_stake_percent > 75 {
break;
}
sleep(Duration::new(1, 0));
}
}
pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) { pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) {
use crate::genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}; use crate::genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo};
@ -598,8 +603,11 @@ pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) {
let leader_voting_keypair = Arc::new(voting_keypair); let leader_voting_keypair = Arc::new(voting_keypair);
let storage_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new());
let mut config = ValidatorConfig::default(); let config = ValidatorConfig {
config.transaction_status_service_disabled = true; transaction_status_service_disabled: true,
rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())),
..ValidatorConfig::default()
};
let node = Validator::new( let node = Validator::new(
node, node,
&node_keypair, &node_keypair,
@ -700,8 +708,14 @@ mod tests {
let voting_keypair = Arc::new(Keypair::new()); let voting_keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new());
let mut config = ValidatorConfig::default(); let config = ValidatorConfig {
config.transaction_status_service_disabled = true; transaction_status_service_disabled: true,
rpc_ports: Some((
validator_node.info.rpc.port(),
validator_node.info.rpc_pubsub.port(),
)),
..ValidatorConfig::default()
};
let validator = Validator::new( let validator = Validator::new(
validator_node, validator_node,
&Arc::new(validator_keypair), &Arc::new(validator_keypair),
@ -734,8 +748,14 @@ mod tests {
ledger_paths.push(validator_ledger_path.clone()); ledger_paths.push(validator_ledger_path.clone());
let voting_keypair = Arc::new(Keypair::new()); let voting_keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new());
let mut config = ValidatorConfig::default(); let config = ValidatorConfig {
config.transaction_status_service_disabled = true; transaction_status_service_disabled: true,
rpc_ports: Some((
validator_node.info.rpc.port(),
validator_node.info.rpc_pubsub.port(),
)),
..ValidatorConfig::default()
};
Validator::new( Validator::new(
validator_node, validator_node,
&Arc::new(validator_keypair), &Arc::new(validator_keypair),

View File

@ -207,6 +207,10 @@ impl LocalCluster {
let leader_storage_keypair = Arc::new(storage_keypair); let leader_storage_keypair = Arc::new(storage_keypair);
let leader_voting_keypair = Arc::new(voting_keypair); let leader_voting_keypair = Arc::new(voting_keypair);
let mut leader_config = config.validator_configs[0].clone(); let mut leader_config = config.validator_configs[0].clone();
leader_config.rpc_ports = Some((
leader_node.info.rpc.port(),
leader_node.info.rpc_pubsub.port(),
));
leader_config.transaction_status_service_disabled = true; leader_config.transaction_status_service_disabled = true;
let leader_server = Validator::new( let leader_server = Validator::new(
leader_node, leader_node,
@ -351,6 +355,10 @@ impl LocalCluster {
} }
let mut config = validator_config.clone(); let mut config = validator_config.clone();
config.rpc_ports = Some((
validator_node.info.rpc.port(),
validator_node.info.rpc_pubsub.port(),
));
config.transaction_status_service_disabled = true; config.transaction_status_service_disabled = true;
let voting_keypair = Arc::new(voting_keypair); let voting_keypair = Arc::new(voting_keypair);
let validator_server = Validator::new( let validator_server = Validator::new(
@ -658,6 +666,8 @@ impl Cluster for LocalCluster {
// Update the stored ContactInfo for this node // Update the stored ContactInfo for this node
let node = Node::new_localhost_with_pubkey(&pubkey); let node = Node::new_localhost_with_pubkey(&pubkey);
cluster_validator_info.info.contact_info = node.info.clone(); cluster_validator_info.info.contact_info = node.info.clone();
cluster_validator_info.config.rpc_ports =
Some((node.info.rpc.port(), node.info.rpc_pubsub.port()));
cluster_validator_info cluster_validator_info
.config .config
.transaction_status_service_disabled = true; .transaction_status_service_disabled = true;

View File

@ -496,6 +496,12 @@ pub fn main() {
.validator(port_validator) .validator(port_validator)
.help("RPC port to use for this node"), .help("RPC port to use for this node"),
) )
.arg(
Arg::with_name("private_rpc")
.long("--private-rpc")
.takes_value(false)
.help("Do not publish the RPC port for use by other nodes")
)
.arg( .arg(
Arg::with_name("enable_rpc_exit") Arg::with_name("enable_rpc_exit")
.long("enable-rpc-exit") .long("enable-rpc-exit")
@ -657,7 +663,7 @@ pub fn main() {
let cuda = matches.is_present("cuda"); let cuda = matches.is_present("cuda");
let no_genesis_fetch = matches.is_present("no_genesis_fetch"); let no_genesis_fetch = matches.is_present("no_genesis_fetch");
let no_snapshot_fetch = matches.is_present("no_snapshot_fetch"); let no_snapshot_fetch = matches.is_present("no_snapshot_fetch");
let rpc_port = value_t!(matches, "rpc_port", u16); let private_rpc = matches.is_present("private_rpc");
// Canonicalize ledger path to avoid issues with symlink creation // Canonicalize ledger path to avoid issues with symlink creation
let _ = fs::create_dir_all(&ledger_path); let _ = fs::create_dir_all(&ledger_path);
@ -683,6 +689,9 @@ pub fn main() {
solana_net_utils::parse_host_port(address).expect("failed to parse faucet address") solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
}), }),
}, },
rpc_ports: value_t!(matches, "rpc_port", u16)
.ok()
.map(|rpc_port| (rpc_port, rpc_port + 1)),
voting_disabled: matches.is_present("no_voting"), voting_disabled: matches.is_present("no_voting"),
wait_for_supermajority: !matches.is_present("no_wait_for_supermajority"), wait_for_supermajority: !matches.is_present("no_wait_for_supermajority"),
..ValidatorConfig::default() ..ValidatorConfig::default()
@ -852,12 +861,14 @@ pub fn main() {
let mut tcp_ports = vec![]; let mut tcp_ports = vec![];
let mut node = let mut node =
Node::new_with_external_ip(&identity_keypair.pubkey(), &gossip_addr, dynamic_port_range); Node::new_with_external_ip(&identity_keypair.pubkey(), &gossip_addr, dynamic_port_range);
if let Ok(rpc_port) = rpc_port {
let rpc_pubsub_port = rpc_port + 1; if !private_rpc {
node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port); if let Some((rpc_port, rpc_pubsub_port)) = validator_config.rpc_ports {
node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_port); node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc_port);
tcp_ports = vec![rpc_port, rpc_pubsub_port]; node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub_port);
}; tcp_ports = vec![rpc_port, rpc_pubsub_port];
}
}
if let Some(ref cluster_entrypoint) = cluster_entrypoint { if let Some(ref cluster_entrypoint) = cluster_entrypoint {
let udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair]; let udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];