RPC port is no longer reset on leader-to-validator transition
This commit is contained in:
@ -102,9 +102,7 @@ pub struct Fullnode {
|
|||||||
retransmit_socket: UdpSocket,
|
retransmit_socket: UdpSocket,
|
||||||
tpu_sockets: Vec<UdpSocket>,
|
tpu_sockets: Vec<UdpSocket>,
|
||||||
broadcast_socket: UdpSocket,
|
broadcast_socket: UdpSocket,
|
||||||
rpc_addr: SocketAddr,
|
|
||||||
rpc_pubsub_addr: SocketAddr,
|
rpc_pubsub_addr: SocketAddr,
|
||||||
drone_addr: SocketAddr,
|
|
||||||
db_ledger: Arc<DbLedger>,
|
db_ledger: Arc<DbLedger>,
|
||||||
vote_signer: Arc<VoteSignerProxy>,
|
vote_signer: Arc<VoteSignerProxy>,
|
||||||
}
|
}
|
||||||
@ -215,8 +213,22 @@ impl Fullnode {
|
|||||||
};
|
};
|
||||||
drone_addr.set_port(solana_drone::drone::DRONE_PORT);
|
drone_addr.set_port(solana_drone::drone::DRONE_PORT);
|
||||||
|
|
||||||
let (rpc_service, rpc_pubsub_service) =
|
// TODO: The RPC service assumes that there is a drone running on the leader
|
||||||
Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, drone_addr, &bank, &cluster_info);
|
// See https://github.com/solana-labs/solana/issues/1830
|
||||||
|
let rpc_service = JsonRpcService::new(
|
||||||
|
&bank,
|
||||||
|
&cluster_info,
|
||||||
|
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_addr.port()),
|
||||||
|
drone_addr,
|
||||||
|
);
|
||||||
|
|
||||||
|
let rpc_pubsub_service = PubSubService::new(
|
||||||
|
&bank,
|
||||||
|
SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||||
|
rpc_pubsub_addr.port(),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
let gossip_service = GossipService::new(
|
let gossip_service = GossipService::new(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
@ -338,21 +350,16 @@ impl Fullnode {
|
|||||||
retransmit_socket: node.sockets.retransmit,
|
retransmit_socket: node.sockets.retransmit,
|
||||||
tpu_sockets: node.sockets.tpu,
|
tpu_sockets: node.sockets.tpu,
|
||||||
broadcast_socket: node.sockets.broadcast,
|
broadcast_socket: node.sockets.broadcast,
|
||||||
rpc_addr,
|
|
||||||
rpc_pubsub_addr,
|
rpc_pubsub_addr,
|
||||||
drone_addr,
|
|
||||||
db_ledger,
|
db_ledger,
|
||||||
vote_signer,
|
vote_signer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn leader_to_validator(&mut self) -> Result<()> {
|
fn leader_to_validator(&mut self) -> Result<()> {
|
||||||
// Close down any services that could have a reference to the bank
|
trace!("leader_to_validator");
|
||||||
if self.rpc_service.is_some() {
|
|
||||||
let old_rpc_service = self.rpc_service.take().unwrap();
|
|
||||||
old_rpc_service.close()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Close down any services that could have a reference to the bank
|
||||||
if self.rpc_pubsub_service.is_some() {
|
if self.rpc_pubsub_service.is_some() {
|
||||||
let old_rpc_pubsub_service = self.rpc_pubsub_service.take().unwrap();
|
let old_rpc_pubsub_service = self.rpc_pubsub_service.take().unwrap();
|
||||||
old_rpc_pubsub_service.close()?;
|
old_rpc_pubsub_service.close()?;
|
||||||
@ -386,17 +393,21 @@ impl Fullnode {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.set_leader(scheduled_leader);
|
.set_leader(scheduled_leader);
|
||||||
|
|
||||||
// Spin up new versions of all the services that relied on the bank, passing in the
|
//
|
||||||
// new bank
|
if let Some(ref mut rpc_service) = self.rpc_service {
|
||||||
let (rpc_service, rpc_pubsub_service) = Self::startup_rpc_services(
|
rpc_service.set_bank(&new_bank);
|
||||||
self.rpc_addr,
|
}
|
||||||
self.rpc_pubsub_addr,
|
|
||||||
self.drone_addr,
|
// TODO: Don't restart PubSubService on leader rotation
|
||||||
|
// See https://github.com/solana-labs/solana/issues/2419
|
||||||
|
self.rpc_pubsub_service = Some(PubSubService::new(
|
||||||
&new_bank,
|
&new_bank,
|
||||||
&self.cluster_info,
|
SocketAddr::new(
|
||||||
);
|
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||||
self.rpc_service = Some(rpc_service);
|
self.rpc_pubsub_addr.port(),
|
||||||
self.rpc_pubsub_service = Some(rpc_pubsub_service);
|
),
|
||||||
|
));
|
||||||
|
|
||||||
self.bank = new_bank;
|
self.bank = new_bank;
|
||||||
|
|
||||||
// In the rare case that the leader exited on a multiple of seed_rotation_interval
|
// In the rare case that the leader exited on a multiple of seed_rotation_interval
|
||||||
@ -448,6 +459,7 @@ impl Fullnode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_id: Hash) {
|
fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_id: Hash) {
|
||||||
|
trace!("validator_to_leader");
|
||||||
self.cluster_info
|
self.cluster_info
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@ -571,31 +583,6 @@ impl Fullnode {
|
|||||||
&self.bank.leader_scheduler
|
&self.bank.leader_scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
fn startup_rpc_services(
|
|
||||||
rpc_addr: SocketAddr,
|
|
||||||
rpc_pubsub_addr: SocketAddr,
|
|
||||||
drone_addr: SocketAddr,
|
|
||||||
bank: &Arc<Bank>,
|
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
|
||||||
) -> (JsonRpcService, PubSubService) {
|
|
||||||
let rpc_port = rpc_addr.port();
|
|
||||||
let rpc_pubsub_port = rpc_pubsub_addr.port();
|
|
||||||
// TODO: The RPC service assumes that there is a drone running on the leader
|
|
||||||
// Drone location/id will need to be handled a different way as soon as leader rotation begins
|
|
||||||
(
|
|
||||||
JsonRpcService::new(
|
|
||||||
bank,
|
|
||||||
cluster_info,
|
|
||||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_port),
|
|
||||||
drone_addr,
|
|
||||||
),
|
|
||||||
PubSubService::new(
|
|
||||||
bank,
|
|
||||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), rpc_pubsub_port),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_db_ledger(ledger_path: &str) -> Arc<DbLedger> {
|
fn make_db_ledger(ledger_path: &str) -> Arc<DbLedger> {
|
||||||
Arc::new(
|
Arc::new(
|
||||||
DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"),
|
DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"),
|
||||||
|
73
src/rpc.rs
73
src/rpc.rs
@ -28,6 +28,7 @@ pub const RPC_PORT: u16 = 8899;
|
|||||||
pub struct JsonRpcService {
|
pub struct JsonRpcService {
|
||||||
thread_hdl: JoinHandle<()>,
|
thread_hdl: JoinHandle<()>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
|
request_processor: Arc<RwLock<JsonRpcRequestProcessor>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JsonRpcService {
|
impl JsonRpcService {
|
||||||
@ -37,10 +38,15 @@ impl JsonRpcService {
|
|||||||
rpc_addr: SocketAddr,
|
rpc_addr: SocketAddr,
|
||||||
drone_addr: SocketAddr,
|
drone_addr: SocketAddr,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
info!("rpc bound to {:?}", rpc_addr);
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let request_processor = JsonRpcRequestProcessor::new(bank.clone());
|
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(bank.clone())));
|
||||||
|
request_processor.write().unwrap().bank = bank.clone();
|
||||||
|
let request_processor_ = request_processor.clone();
|
||||||
|
|
||||||
let info = cluster_info.clone();
|
let info = cluster_info.clone();
|
||||||
let exit_ = exit.clone();
|
let exit_ = exit.clone();
|
||||||
|
|
||||||
let thread_hdl = Builder::new()
|
let thread_hdl = Builder::new()
|
||||||
.name("solana-jsonrpc".to_string())
|
.name("solana-jsonrpc".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
@ -50,7 +56,7 @@ impl JsonRpcService {
|
|||||||
|
|
||||||
let server =
|
let server =
|
||||||
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| Meta {
|
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| Meta {
|
||||||
request_processor: request_processor.clone(),
|
request_processor: request_processor_.clone(),
|
||||||
cluster_info: info.clone(),
|
cluster_info: info.clone(),
|
||||||
drone_addr,
|
drone_addr,
|
||||||
rpc_addr,
|
rpc_addr,
|
||||||
@ -69,7 +75,15 @@ impl JsonRpcService {
|
|||||||
server.unwrap().close();
|
server.unwrap().close();
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Self { thread_hdl, exit }
|
Self {
|
||||||
|
thread_hdl,
|
||||||
|
exit,
|
||||||
|
request_processor,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_bank(&mut self, bank: &Arc<Bank>) {
|
||||||
|
self.request_processor.write().unwrap().bank = bank.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn exit(&self) {
|
pub fn exit(&self) {
|
||||||
@ -92,7 +106,7 @@ impl Service for JsonRpcService {
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Meta {
|
pub struct Meta {
|
||||||
pub request_processor: JsonRpcRequestProcessor,
|
pub request_processor: Arc<RwLock<JsonRpcRequestProcessor>>,
|
||||||
pub cluster_info: Arc<RwLock<ClusterInfo>>,
|
pub cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||||
pub rpc_addr: SocketAddr,
|
pub rpc_addr: SocketAddr,
|
||||||
pub drone_addr: SocketAddr,
|
pub drone_addr: SocketAddr,
|
||||||
@ -177,25 +191,35 @@ impl RpcSol for RpcSolImpl {
|
|||||||
fn get_account_info(&self, meta: Self::Metadata, id: String) -> Result<Account> {
|
fn get_account_info(&self, meta: Self::Metadata, id: String) -> Result<Account> {
|
||||||
info!("get_account_info rpc request received: {:?}", id);
|
info!("get_account_info rpc request received: {:?}", id);
|
||||||
let pubkey = verify_pubkey(id)?;
|
let pubkey = verify_pubkey(id)?;
|
||||||
meta.request_processor.get_account_info(pubkey)
|
meta.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_account_info(pubkey)
|
||||||
}
|
}
|
||||||
fn get_balance(&self, meta: Self::Metadata, id: String) -> Result<u64> {
|
fn get_balance(&self, meta: Self::Metadata, id: String) -> Result<u64> {
|
||||||
info!("get_balance rpc request received: {:?}", id);
|
info!("get_balance rpc request received: {:?}", id);
|
||||||
let pubkey = verify_pubkey(id)?;
|
let pubkey = verify_pubkey(id)?;
|
||||||
meta.request_processor.get_balance(pubkey)
|
meta.request_processor.read().unwrap().get_balance(pubkey)
|
||||||
}
|
}
|
||||||
fn get_confirmation_time(&self, meta: Self::Metadata) -> Result<usize> {
|
fn get_confirmation_time(&self, meta: Self::Metadata) -> Result<usize> {
|
||||||
info!("get_confirmation_time rpc request received");
|
info!("get_confirmation_time rpc request received");
|
||||||
meta.request_processor.get_confirmation_time()
|
meta.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_confirmation_time()
|
||||||
}
|
}
|
||||||
fn get_last_id(&self, meta: Self::Metadata) -> Result<String> {
|
fn get_last_id(&self, meta: Self::Metadata) -> Result<String> {
|
||||||
info!("get_last_id rpc request received");
|
info!("get_last_id rpc request received");
|
||||||
meta.request_processor.get_last_id()
|
meta.request_processor.read().unwrap().get_last_id()
|
||||||
}
|
}
|
||||||
fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result<RpcSignatureStatus> {
|
fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result<RpcSignatureStatus> {
|
||||||
info!("get_signature_status rpc request received: {:?}", id);
|
info!("get_signature_status rpc request received: {:?}", id);
|
||||||
let signature = verify_signature(&id)?;
|
let signature = verify_signature(&id)?;
|
||||||
let res = meta.request_processor.get_signature_status(signature);
|
let res = meta
|
||||||
|
.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_signature_status(signature);
|
||||||
if res.is_none() {
|
if res.is_none() {
|
||||||
return Ok(RpcSignatureStatus::SignatureNotFound);
|
return Ok(RpcSignatureStatus::SignatureNotFound);
|
||||||
}
|
}
|
||||||
@ -216,17 +240,21 @@ impl RpcSol for RpcSolImpl {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
info!("get_signature_status rpc request status: {:?}", status);
|
||||||
Ok(status)
|
Ok(status)
|
||||||
}
|
}
|
||||||
fn get_transaction_count(&self, meta: Self::Metadata) -> Result<u64> {
|
fn get_transaction_count(&self, meta: Self::Metadata) -> Result<u64> {
|
||||||
info!("get_transaction_count rpc request received");
|
info!("get_transaction_count rpc request received");
|
||||||
meta.request_processor.get_transaction_count()
|
meta.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_transaction_count()
|
||||||
}
|
}
|
||||||
fn request_airdrop(&self, meta: Self::Metadata, id: String, tokens: u64) -> Result<String> {
|
fn request_airdrop(&self, meta: Self::Metadata, id: String, tokens: u64) -> Result<String> {
|
||||||
trace!("request_airdrop id={} tokens={}", id, tokens);
|
trace!("request_airdrop id={} tokens={}", id, tokens);
|
||||||
let pubkey = verify_pubkey(id)?;
|
let pubkey = verify_pubkey(id)?;
|
||||||
|
|
||||||
let last_id = meta.request_processor.bank.last_id();
|
let last_id = meta.request_processor.read().unwrap().bank.last_id();
|
||||||
let transaction = request_airdrop_transaction(&meta.drone_addr, &pubkey, tokens, last_id)
|
let transaction = request_airdrop_transaction(&meta.drone_addr, &pubkey, tokens, last_id)
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
info!("request_airdrop_transaction failed: {:?}", err);
|
info!("request_airdrop_transaction failed: {:?}", err);
|
||||||
@ -251,7 +279,11 @@ impl RpcSol for RpcSolImpl {
|
|||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let mut signature_status;
|
let mut signature_status;
|
||||||
loop {
|
loop {
|
||||||
signature_status = meta.request_processor.get_signature_status(signature);
|
signature_status = meta
|
||||||
|
.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_signature_status(signature);
|
||||||
|
|
||||||
if signature_status == Some(Status::Complete(Ok(()))) {
|
if signature_status == Some(Status::Complete(Ok(()))) {
|
||||||
info!("airdrop signature ok");
|
info!("airdrop signature ok");
|
||||||
@ -278,6 +310,7 @@ impl RpcSol for RpcSolImpl {
|
|||||||
}
|
}
|
||||||
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let transactions_addr = get_leader_addr(&meta.cluster_info)?;
|
let transactions_addr = get_leader_addr(&meta.cluster_info)?;
|
||||||
|
trace!("send_transaction: leader is {:?}", &transactions_addr);
|
||||||
transactions_socket
|
transactions_socket
|
||||||
.send_to(&data, transactions_addr)
|
.send_to(&data, transactions_addr)
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
@ -293,10 +326,16 @@ impl RpcSol for RpcSolImpl {
|
|||||||
Ok(signature)
|
Ok(signature)
|
||||||
}
|
}
|
||||||
fn get_storage_mining_last_id(&self, meta: Self::Metadata) -> Result<String> {
|
fn get_storage_mining_last_id(&self, meta: Self::Metadata) -> Result<String> {
|
||||||
meta.request_processor.get_storage_mining_last_id()
|
meta.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_storage_mining_last_id()
|
||||||
}
|
}
|
||||||
fn get_storage_mining_entry_height(&self, meta: Self::Metadata) -> Result<u64> {
|
fn get_storage_mining_entry_height(&self, meta: Self::Metadata) -> Result<u64> {
|
||||||
meta.request_processor.get_storage_mining_entry_height()
|
meta.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get_storage_mining_entry_height()
|
||||||
}
|
}
|
||||||
fn get_storage_pubkeys_for_entry_height(
|
fn get_storage_pubkeys_for_entry_height(
|
||||||
&self,
|
&self,
|
||||||
@ -304,6 +343,8 @@ impl RpcSol for RpcSolImpl {
|
|||||||
entry_height: u64,
|
entry_height: u64,
|
||||||
) -> Result<Vec<Pubkey>> {
|
) -> Result<Vec<Pubkey>> {
|
||||||
meta.request_processor
|
meta.request_processor
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
.get_storage_pubkeys_for_entry_height(entry_height)
|
.get_storage_pubkeys_for_entry_height(entry_height)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -428,7 +469,7 @@ mod tests {
|
|||||||
let tx = Transaction::system_move(&alice.keypair(), pubkey, 20, last_id, 0);
|
let tx = Transaction::system_move(&alice.keypair(), pubkey, 20, last_id, 0);
|
||||||
bank.process_transaction(&tx).expect("process transaction");
|
bank.process_transaction(&tx).expect("process transaction");
|
||||||
|
|
||||||
let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank));
|
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank))));
|
||||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
|
||||||
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
|
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
|
||||||
|
|
||||||
@ -752,7 +793,7 @@ mod tests {
|
|||||||
let rpc = RpcSolImpl;
|
let rpc = RpcSolImpl;
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
let meta = Meta {
|
let meta = Meta {
|
||||||
request_processor: JsonRpcRequestProcessor::new(Arc::new(bank)),
|
request_processor: Arc::new(RwLock::new(JsonRpcRequestProcessor::new(Arc::new(bank)))),
|
||||||
cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))),
|
cluster_info: Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))),
|
||||||
drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
drone_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
||||||
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
rpc_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
|
||||||
|
@ -41,6 +41,7 @@ impl Service for PubSubService {
|
|||||||
|
|
||||||
impl PubSubService {
|
impl PubSubService {
|
||||||
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self {
|
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr) -> Self {
|
||||||
|
info!("rpc_pubsub bound to {:?}", pubsub_addr);
|
||||||
let rpc = RpcSolPubSubImpl::new(bank.clone());
|
let rpc = RpcSolPubSubImpl::new(bank.clone());
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let exit_ = exit.clone();
|
let exit_ = exit.clone();
|
||||||
|
Reference in New Issue
Block a user