Revert "move rpc_server to drop() semantics instead of having its own thread"

This reverts commit 40aa0654fa.
This commit is contained in:
Rob Walker
2018-09-10 22:41:44 -07:00
parent 016ee36808
commit b313b7f6f9
2 changed files with 52 additions and 40 deletions

View File

@ -24,7 +24,6 @@ use window;
pub struct Fullnode { pub struct Fullnode {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
_rpc_service: JsonRpcService,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -191,8 +190,14 @@ impl Fullnode {
let mut drone_addr = node.info.contact_info.tpu; let mut drone_addr = node.info.contact_info.tpu;
drone_addr.set_port(DRONE_PORT); drone_addr.set_port(DRONE_PORT);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), RPC_PORT);
let _rpc_service = let rpc_service = JsonRpcService::new(
JsonRpcService::new(&bank, node.info.contact_info.tpu, drone_addr, rpc_addr); &bank,
node.info.contact_info.tpu,
drone_addr,
rpc_addr,
exit.clone(),
);
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let window = let window =
@ -261,11 +266,7 @@ impl Fullnode {
} }
} }
Fullnode { Fullnode { exit, thread_hdls }
exit,
thread_hdls,
_rpc_service,
}
} }
//used for notifying many nodes in parallel to exit //used for notifying many nodes in parallel to exit

View File

@ -5,11 +5,13 @@ use bincode::deserialize;
use bs58; use bs58;
use jsonrpc_core::*; use jsonrpc_core::*;
use jsonrpc_http_server::*; use jsonrpc_http_server::*;
use service::Service;
use signature::{Pubkey, Signature}; use signature::{Pubkey, Signature};
use std::mem; use std::mem;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::thread::sleep; use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use transaction::Transaction; use transaction::Transaction;
@ -18,7 +20,7 @@ use wallet::request_airdrop;
pub const RPC_PORT: u16 = 8899; pub const RPC_PORT: u16 = 8899;
pub struct JsonRpcService { pub struct JsonRpcService {
server: Option<Server>, thread_hdl: JoinHandle<()>,
} }
impl JsonRpcService { impl JsonRpcService {
@ -27,42 +29,51 @@ impl JsonRpcService {
transactions_addr: SocketAddr, transactions_addr: SocketAddr,
drone_addr: SocketAddr, drone_addr: SocketAddr,
rpc_addr: SocketAddr, rpc_addr: SocketAddr,
exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let request_processor = JsonRpcRequestProcessor::new(bank.clone()); let request_processor = JsonRpcRequestProcessor::new(bank.clone());
let mut io = MetaIoHandler::default(); let thread_hdl = Builder::new()
let rpc = RpcSolImpl; .name("solana-jsonrpc".to_string())
io.extend_with(rpc.to_delegate()); .spawn(move || {
let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl;
io.extend_with(rpc.to_delegate());
let server = ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta { let server =
request_processor: request_processor.clone(), ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request| Meta {
transactions_addr, request_processor: request_processor.clone(),
drone_addr, transactions_addr,
}).threads(4) drone_addr,
.cors(DomainsValidation::AllowOnly(vec![ }).threads(4)
AccessControlAllowOrigin::Any, .cors(DomainsValidation::AllowOnly(vec![
])) AccessControlAllowOrigin::Any,
.start_http(&rpc_addr); ]))
.start_http(&rpc_addr);
if server.is_err() {
warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port());
return;
}
loop {
if exit.load(Ordering::Relaxed) {
server.unwrap().close();
break;
}
sleep(Duration::from_millis(100));
}
()
})
.unwrap();
JsonRpcService { thread_hdl }
}
}
let server = match server { impl Service for JsonRpcService {
Ok(server) => Some(server), fn thread_hdls(self) -> Vec<JoinHandle<()>> {
Err(e) => { vec![self.thread_hdl]
warn!(
"JSON RPC service unavailable: error {}. \n\
Make sure the RPC port {} is not already in use by another application",
e,
rpc_addr.port()
);
None
}
};
JsonRpcService { server }
} }
pub fn close(self) { fn join(self) -> thread::Result<()> {
if let Some(server) = self.server { self.thread_hdl.join()
server.close();
}
} }
} }