From f683817b48167d403dfe5ce779793ba413d9ce88 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 5 Nov 2018 10:50:58 -0700 Subject: [PATCH] Remove RPU; replace with RPC --- src/bin/bench-tps.rs | 4 +- src/bin/fullnode.rs | 20 ++- src/client.rs | 13 +- src/cluster_info.rs | 89 +++++++------ src/drone.rs | 15 +-- src/fullnode.rs | 113 ++++++---------- src/lib.rs | 4 - src/packet.rs | 9 +- src/replicator.rs | 1 + src/request.rs | 43 ------ src/request_processor.rs | 66 ---------- src/request_stage.rs | 119 ----------------- src/rpc.rs | 6 +- src/rpu.rs | 88 ------------- src/thin_client.rs | 275 +++++++++++++-------------------------- src/tvu.rs | 2 +- src/wallet.rs | 14 +- tests/multinode.rs | 30 +++-- 18 files changed, 245 insertions(+), 666 deletions(-) delete mode 100644 src/request.rs delete mode 100644 src/request_processor.rs delete mode 100644 src/request_stage.rs delete mode 100644 src/rpu.rs diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 70a6ccb3bf..a5b32fe343 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -638,7 +638,7 @@ fn main() { let leader = leader.unwrap(); - println!("leader is at {} {}", leader.contact_info.rpu, leader.id); + println!("leader RPC is at {} {}", leader.contact_info.rpc, leader.id); let mut client = mk_client(&leader); let mut barrier_client = mk_client(&leader); @@ -821,7 +821,7 @@ fn converge( v = spy_ref .table .values() - .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu)) + .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpc)) .cloned() .collect(); diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 19689d6aee..f5f1135041 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -52,7 +52,13 @@ fn main() { .value_name("DIR") .takes_value(true) .required(true) - .help("use DIR as persistent ledger location"), + .help("Use DIR as persistent ledger location"), + ).arg( + Arg::with_name("rpc") + .long("rpc") + .value_name("PORT") + .takes_value(true) + .help("Custom RPC port for this node"), ).get_matches(); let (keypair, ncp) = if let Some(i) = matches.value_of("identity") { @@ -102,6 +108,17 @@ fn main() { // Remove this line to enable leader rotation leader_scheduler.use_only_bootstrap_leader = true; + let rpc_port = if let Some(port) = matches.value_of("rpc") { + let port_number = port.to_string().parse().expect("integer"); + if port_number == 0 { + eprintln!("Invalid RPC port requested: {:?}", port); + exit(1); + } + Some(port_number) + } else { + None + }; + let mut fullnode = Fullnode::new( node, ledger_path, @@ -110,6 +127,7 @@ fn main() { network, false, leader_scheduler, + rpc_port, ); let mut client = mk_client(&leader); diff --git a/src/client.rs b/src/client.rs index d4df59d804..2bec75471d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,20 +1,9 @@ use cluster_info::{NodeInfo, FULLNODE_PORT_RANGE}; use netutil::bind_in_range; -use std::time::Duration; use thin_client::ThinClient; pub fn mk_client(r: &NodeInfo) -> ThinClient { - let (_, requests_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); let (_, transactions_socket) = bind_in_range(FULLNODE_PORT_RANGE).unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - - ThinClient::new( - r.contact_info.rpu, - requests_socket, - r.contact_info.tpu, - transactions_socket, - ) + ThinClient::new(r.contact_info.rpc, r.contact_info.tpu, transactions_socket) } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index a679b1494a..60a2e2f07e 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -19,11 +19,12 @@ use hash::Hash; use leader_scheduler::LeaderScheduler; use ledger::LedgerWindow; use log::Level; -use netutil::{bind_in_range, bind_to, multi_bind_in_range}; +use netutil::{bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range}; use packet::{to_blob, Blob, SharedBlob, BLOB_SIZE}; use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::{Error, Result}; +use rpc::RPC_PORT; use signature::{Keypair, KeypairUtil}; use solana_sdk::pubkey::Pubkey; use std; @@ -79,12 +80,14 @@ pub struct ContactInfo { pub ncp: SocketAddr, /// address to connect to for replication pub tvu: SocketAddr, - /// address to connect to when this node is leader - pub rpu: SocketAddr, /// transactions address pub tpu: SocketAddr, /// storage data address pub storage_addr: SocketAddr, + /// address to which to send JSON-RPC requests + pub rpc: SocketAddr, + /// websocket for JSON-RPC push notifications + pub rpc_pubsub: SocketAddr, /// if this struture changes update this value as well /// Always update `NodeInfo` version too /// This separate version for addresses allows us to use the `Vote` @@ -115,9 +118,10 @@ impl NodeInfo { id: Pubkey, ncp: SocketAddr, tvu: SocketAddr, - rpu: SocketAddr, tpu: SocketAddr, storage_addr: SocketAddr, + rpc: SocketAddr, + rpc_pubsub: SocketAddr, ) -> Self { NodeInfo { id, @@ -125,9 +129,10 @@ impl NodeInfo { contact_info: ContactInfo { ncp, tvu, - rpu, tpu, storage_addr, + rpc, + rpc_pubsub, version: 0, }, leader_id: Pubkey::default(), @@ -142,6 +147,7 @@ impl NodeInfo { socketaddr!("127.0.0.1:1236"), socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1238"), + socketaddr!("127.0.0.1:1239"), ) } @@ -150,14 +156,14 @@ impl NodeInfo { pub fn new_unspecified() -> Self { let addr = socketaddr!(0, 0); assert!(addr.ip().is_unspecified()); - Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr) + Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr, addr) } #[cfg(test)] /// NodeInfo with multicast addresses for adversarial testing. pub fn new_multicast() -> Self { let addr = socketaddr!("224.0.1.255:1000"); assert!(addr.ip().is_multicast()); - Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr) + Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr, addr) } fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { let mut nxt_addr = *addr; @@ -168,14 +174,16 @@ impl NodeInfo { let transactions_addr = *bind_addr; let gossip_addr = Self::next_port(&bind_addr, 1); let replicate_addr = Self::next_port(&bind_addr, 2); - let requests_addr = Self::next_port(&bind_addr, 3); + let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT); + let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1); NodeInfo::new( pubkey, gossip_addr, replicate_addr, - requests_addr, transactions_addr, "0.0.0.0:0".parse().unwrap(), + rpc_addr, + rpc_pubsub_addr, ) } pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self { @@ -185,7 +193,15 @@ impl NodeInfo { // pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self { let daddr: SocketAddr = socketaddr!("0.0.0.0:0"); - NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr, daddr) + NodeInfo::new( + Pubkey::default(), + *gossip_addr, + daddr, + daddr, + daddr, + daddr, + daddr, + ) } } @@ -281,13 +297,13 @@ impl ClusterInfo { let nodes: Vec<_> = self .table .values() - .filter(|n| Self::is_valid_address(&n.contact_info.rpu)) + .filter(|n| Self::is_valid_address(&n.contact_info.rpc)) .cloned() .map(|node| { format!( " ncp: {:20} | {}{}\n \ - rpu: {:20} |\n \ - tpu: {:20} |\n", + tpu: {:20} |\n \ + rpc: {:20} |\n", node.contact_info.ncp.to_string(), node.id, if node.id == leader_id { @@ -295,8 +311,8 @@ impl ClusterInfo { } else { "" }, - node.contact_info.rpu.to_string(), - node.contact_info.tpu.to_string() + node.contact_info.tpu.to_string(), + node.contact_info.rpc.to_string() ) }).collect(); @@ -323,7 +339,7 @@ impl ClusterInfo { self.table .values() .filter(|x| x.id != me) - .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu)) + .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpc)) .cloned() .collect() } @@ -1187,7 +1203,7 @@ impl ClusterInfo { let pubkey = Keypair::new().pubkey(); let daddr = socketaddr_any!(); - let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr); + let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr, daddr); (node, gossip_socket) } } @@ -1195,10 +1211,8 @@ impl ClusterInfo { #[derive(Debug)] pub struct Sockets { pub gossip: UdpSocket, - pub requests: UdpSocket, pub replicate: Vec, pub transaction: Vec, - pub respond: UdpSocket, pub broadcast: UdpSocket, pub repair: UdpSocket, pub retransmit: UdpSocket, @@ -1219,10 +1233,13 @@ impl Node { let transaction = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); - let requests = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); + let rpc_port = find_available_port_in_range((1024, 65535)).unwrap(); + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); + let rpc_pubsub_port = find_available_port_in_range((1024, 65535)).unwrap(); + let rpc_pubsub_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port); - let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); let storage = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -1230,18 +1247,17 @@ impl Node { pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), - requests.local_addr().unwrap(), transaction.local_addr().unwrap(), storage.local_addr().unwrap(), + rpc_addr, + rpc_pubsub_addr, ); Node { info, sockets: Sockets { gossip, - requests, replicate: vec![replicate], transaction: vec![transaction], - respond, broadcast, repair, retransmit, @@ -1262,8 +1278,6 @@ impl Node { let (replicate_port, replicate_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind"); - let (requests_port, requests) = bind(); - let (transaction_port, transaction_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind"); @@ -1272,18 +1286,14 @@ impl Node { let (_, retransmit) = bind(); let (storage_port, _) = bind(); - // Responses are sent from the same Udp port as requests are received - // from, in hopes that a NAT sitting in the middle will route the - // response Udp packet correctly back to the requester. - let respond = requests.try_clone().unwrap(); - let info = NodeInfo::new( pubkey, SocketAddr::new(ncp.ip(), gossip_port), SocketAddr::new(ncp.ip(), replicate_port), - SocketAddr::new(ncp.ip(), requests_port), SocketAddr::new(ncp.ip(), transaction_port), SocketAddr::new(ncp.ip(), storage_port), + SocketAddr::new(ncp.ip(), RPC_PORT), + SocketAddr::new(ncp.ip(), RPC_PORT + 1), ); trace!("new NodeInfo: {:?}", info); @@ -1291,10 +1301,8 @@ impl Node { info, sockets: Sockets { gossip, - requests, replicate: replicate_sockets, transaction: transaction_sockets, - respond, broadcast, repair, retransmit, @@ -1374,8 +1382,9 @@ mod tests { assert_eq!(d1.id, keypair.pubkey()); assert_eq!(d1.contact_info.ncp, socketaddr!("127.0.0.1:1235")); assert_eq!(d1.contact_info.tvu, socketaddr!("127.0.0.1:1236")); - assert_eq!(d1.contact_info.rpu, socketaddr!("127.0.0.1:1237")); assert_eq!(d1.contact_info.tpu, socketaddr!("127.0.0.1:1234")); + assert_eq!(d1.contact_info.rpc, socketaddr!("127.0.0.1:8899")); + assert_eq!(d1.contact_info.rpc_pubsub, socketaddr!("127.0.0.1:8900")); } #[test] fn max_updates() { @@ -1480,6 +1489,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1238), + socketaddr!([127, 0, 0, 1], 1239), ); cluster_info.insert(&nxt); let rv = cluster_info.window_index_request(0).unwrap(); @@ -1494,6 +1504,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1237), socketaddr!([127, 0, 0, 1], 1238), + socketaddr!([127, 0, 0, 1], 1239), ); cluster_info.insert(&nxt); let mut one = false; @@ -1520,6 +1531,7 @@ mod tests { socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"), + socketaddr!("127.0.0.1:127"), ); let mut cluster_info = ClusterInfo::new(me).expect("ClusterInfo::new"); @@ -1665,6 +1677,7 @@ mod tests { socketaddr!("127.0.0.1:1236"), socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1238"), + socketaddr!("127.0.0.1:1239"), ); let rv = ClusterInfo::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); @@ -1843,7 +1856,6 @@ mod tests { for tx_socket in node.sockets.replicate.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); } - assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert!(node.sockets.transaction.len() > 1); for tx_socket in node.sockets.transaction.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); @@ -1858,8 +1870,6 @@ mod tests { for tx_socket in node.sockets.replicate.iter() { assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); } - assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); - assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port < FULLNODE_PORT_RANGE.1); @@ -1879,7 +1889,6 @@ mod tests { for tx_socket in node.sockets.replicate.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); } - assert_eq!(node.sockets.requests.local_addr().unwrap().ip(), ip); assert!(node.sockets.transaction.len() > 1); for tx_socket in node.sockets.transaction.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); @@ -1893,8 +1902,6 @@ mod tests { for tx_socket in node.sockets.replicate.iter() { assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); } - assert!(node.sockets.requests.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); - assert!(node.sockets.requests.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port < FULLNODE_PORT_RANGE.1); diff --git a/src/drone.rs b/src/drone.rs index 4f74dd8375..25d64661c2 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -104,15 +104,13 @@ impl Drone { pub fn send_airdrop(&mut self, req: DroneRequest) -> Result { let request_amount: u64; - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let leader = poll_gossip_for_leader(self.network_addr, Some(10)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; let mut client = ThinClient::new( - leader.contact_info.rpu, - requests_socket, + leader.contact_info.rpc, leader.contact_info.tpu, transactions_socket, ); @@ -340,7 +338,7 @@ mod tests { None, &ledger_path, false, - Some(0), + None, ); let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket"); @@ -353,13 +351,11 @@ mod tests { Some(150_000), ); - let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket"); let transactions_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, + leader_data.contact_info.rpc, leader_data.contact_info.tpu, transactions_socket, ); @@ -385,15 +381,14 @@ mod tests { None, false, LeaderScheduler::from_bootstrap_leader(leader_data.id), + None, ); - let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket"); let transactions_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, + leader_data.contact_info.rpc, leader_data.contact_info.tpu, transactions_socket, ); diff --git a/src/fullnode.rs b/src/fullnode.rs index a189ff7916..1c730911b6 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -7,13 +7,12 @@ use hash::Hash; use leader_scheduler::LeaderScheduler; use ledger::read_ledger; use ncp::Ncp; -use rpc::{JsonRpcService, RPC_PORT}; +use rpc::JsonRpcService; use rpc_pubsub::PubSubService; -use rpu::Rpu; use service::Service; use signature::{Keypair, KeypairUtil}; +use std::net::SocketAddr; use std::net::UdpSocket; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::Result; @@ -87,7 +86,6 @@ pub struct Fullnode { keypair: Arc, vote_account_keypair: Arc, exit: Arc, - rpu: Option, rpc_service: Option, rpc_pubsub_service: Option, ncp: Ncp, @@ -101,9 +99,8 @@ pub struct Fullnode { retransmit_socket: UdpSocket, transaction_sockets: Vec, broadcast_socket: UdpSocket, - requests_socket: UdpSocket, - respond_socket: UdpSocket, - rpc_port: Option, + rpc_addr: SocketAddr, + rpc_pubsub_addr: SocketAddr, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -137,6 +134,7 @@ impl Fullnode { leader_addr: Option, sigverify_disabled: bool, leader_scheduler: LeaderScheduler, + rpc_port: Option, ) -> Self { let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); @@ -152,9 +150,11 @@ impl Fullnode { "starting... local gossip address: {} (advertising {})", local_gossip_addr, node.info.contact_info.ncp ); + let mut rpc_addr = node.info.contact_info.rpc; + if let Some(port) = rpc_port { + rpc_addr.set_port(port); + } - let local_requests_addr = node.sockets.requests.local_addr().unwrap(); - let requests_addr = node.info.contact_info.rpu; let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i)); let server = Self::new_with_bank( keypair, @@ -166,21 +166,18 @@ impl Fullnode { leader_info.as_ref(), ledger_path, sigverify_disabled, - None, + rpc_port, ); match leader_addr { Some(leader_addr) => { info!( - "validator ready... local request address: {} (advertising {}) connected to: {}", - local_requests_addr, requests_addr, leader_addr + "validator ready... rpc address: {}, connected to: {}", + rpc_addr, leader_addr ); } None => { - info!( - "leader ready... local request address: {} (advertising {})", - local_requests_addr, requests_addr - ); + info!("leader ready... rpc address: {}", rpc_addr); } } @@ -244,27 +241,27 @@ impl Fullnode { bank: Bank, entry_height: u64, last_id: &Hash, - node: Node, + mut node: Node, bootstrap_leader_info_option: Option<&NodeInfo>, ledger_path: &str, sigverify_disabled: bool, rpc_port: Option, ) -> Self { + let mut rpc_addr = node.info.contact_info.rpc; + let mut rpc_pubsub_addr = node.info.contact_info.rpc_pubsub; + // Use custom RPC port, if provided (`Some(port)`) + // RPC port may be any valid open port on the node + // If rpc_port == `None`, node will listen on the ports set in NodeInfo + if let Some(port) = rpc_port { + rpc_addr.set_port(port); + node.info.contact_info.rpc = rpc_addr; + rpc_pubsub_addr.set_port(port + 1); + node.info.contact_info.rpc_pubsub = rpc_pubsub_addr; + } + let exit = Arc::new(AtomicBool::new(false)); let bank = Arc::new(bank); - let rpu = Some(Rpu::new( - &bank, - node.sockets - .requests - .try_clone() - .expect("Failed to clone requests socket"), - node.sockets - .respond - .try_clone() - .expect("Failed to clone respond socket"), - )); - let window = new_window(32 * 1024); let shared_window = Arc::new(RwLock::new(window)); let cluster_info = Arc::new(RwLock::new( @@ -272,7 +269,7 @@ impl Fullnode { )); let (rpc_service, rpc_pubsub_service) = - Self::startup_rpc_services(rpc_port, &bank, &cluster_info); + Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, &bank, &cluster_info); let ncp = Ncp::new( &cluster_info, @@ -364,7 +361,6 @@ impl Fullnode { shared_window, bank, sigverify_disabled, - rpu, ncp, rpc_service: Some(rpc_service), rpc_pubsub_service: Some(rpc_pubsub_service), @@ -376,19 +372,13 @@ impl Fullnode { retransmit_socket: node.sockets.retransmit, transaction_sockets: node.sockets.transaction, broadcast_socket: node.sockets.broadcast, - requests_socket: node.sockets.requests, - respond_socket: node.sockets.respond, - rpc_port, + rpc_addr, + rpc_pubsub_addr, } } fn leader_to_validator(&mut self) -> Result<()> { // Close down any services that could have a reference to the bank - if self.rpu.is_some() { - let old_rpu = self.rpu.take().unwrap(); - old_rpu.close()?; - } - if self.rpc_service.is_some() { let old_rpc_service = self.rpc_service.take().unwrap(); old_rpc_service.close()?; @@ -429,18 +419,12 @@ impl Fullnode { // Spin up new versions of all the services that relied on the bank, passing in the // new bank - self.rpu = Some(Rpu::new( + let (rpc_service, rpc_pubsub_service) = Self::startup_rpc_services( + self.rpc_addr, + self.rpc_pubsub_addr, &new_bank, - self.requests_socket - .try_clone() - .expect("Failed to clone requests socket"), - self.respond_socket - .try_clone() - .expect("Failed to clone respond socket"), - )); - - let (rpc_service, rpc_pubsub_service) = - Self::startup_rpc_services(self.rpc_port, &new_bank, &self.cluster_info); + &self.cluster_info, + ); self.rpc_service = Some(rpc_service); self.rpc_pubsub_service = Some(rpc_pubsub_service); self.bank = new_bank; @@ -555,9 +539,6 @@ impl Fullnode { //used for notifying many nodes in parallel to exit pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); - if let Some(ref rpu) = self.rpu { - rpu.exit(); - } if let Some(ref rpc_service) = self.rpc_service { rpc_service.exit(); } @@ -599,22 +580,11 @@ impl Fullnode { } fn startup_rpc_services( - rpc_port: Option, + rpc_addr: SocketAddr, + rpc_pubsub_addr: SocketAddr, bank: &Arc, cluster_info: &Arc>, ) -> (JsonRpcService, PubSubService) { - // Use custom RPC port, if provided (`Some(port)`) - // RPC port may be any open port on the node - // If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module - // If rpc_port == `Some(0)`, node will dynamically choose any open port for both - // Rpc and RpcPubsub serivces. Useful for tests. - - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT)); - let rpc_pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::from(0)), - rpc_port.map_or(RPC_PORT + 1, |port| if port == 0 { port } else { port + 1 }), - ); - // TODO: The RPC service assumes that there is a drone running on the leader // Drone location/id will need to be handled a different way as soon as leader rotation begins ( @@ -628,9 +598,6 @@ impl Service for Fullnode { type JoinReturnType = Option; fn join(self) -> Result> { - if let Some(rpu) = self.rpu { - rpu.join()?; - } if let Some(rpc_service) = self.rpc_service { rpc_service.join()?; } @@ -702,7 +669,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, - Some(0), + None, ); v.close().unwrap(); remove_dir_all(validator_ledger_path).unwrap(); @@ -742,7 +709,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, - Some(0), + None, ) }).collect(); @@ -810,6 +777,7 @@ mod tests { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); // Wait for the leader to transition, ticks should cause the leader to @@ -900,6 +868,7 @@ mod tests { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); match bootstrap_leader.node_role { @@ -918,6 +887,7 @@ mod tests { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); match validator.node_role { @@ -997,6 +967,7 @@ mod tests { Some(leader_ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); // Send blobs to the validator from our mock leader diff --git a/src/lib.rs b/src/lib.rs index e2c9ff0f2f..5bde2c9291 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,15 +53,11 @@ pub mod poh_service; pub mod recvmmsg; pub mod replicate_stage; pub mod replicator; -pub mod request; -pub mod request_processor; -pub mod request_stage; pub mod result; pub mod retransmit_stage; pub mod rpc; pub mod rpc_pubsub; pub mod rpc_request; -pub mod rpu; pub mod service; pub mod signature; pub mod sigverify; diff --git a/src/packet.rs b/src/packet.rs index ea820c4d54..795ae0eac0 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -429,14 +429,17 @@ pub fn make_consecutive_blobs( #[cfg(test)] mod tests { + use hash::Hash; use packet::{ to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS, PACKET_DATA_SIZE, }; - use request::Request; + use signature::{Keypair, KeypairUtil}; use std::io; use std::io::Write; use std::net::UdpSocket; + use system_transaction::SystemTransaction; + use transaction::Transaction; #[test] pub fn packet_send_recv() { @@ -460,7 +463,9 @@ mod tests { #[test] fn test_to_packets() { - let tx = Request::GetTransactionCount; + let keypair = Keypair::new(); + let hash = Hash::new(&[1; 32]); + let tx = Transaction::system_new(&keypair, keypair.pubkey(), 1, hash); let rv = to_packets(&vec![tx.clone(); 1]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].read().unwrap().packets.len(), 1); diff --git a/src/replicator.rs b/src/replicator.rs index 037816d9c7..93bd28c9a0 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -214,6 +214,7 @@ mod tests { None, false, LeaderScheduler::from_bootstrap_leader(leader_info.id), + None, ); let mut leader_client = mk_client(&leader_info); diff --git a/src/request.rs b/src/request.rs deleted file mode 100644 index 27dd579da7..0000000000 --- a/src/request.rs +++ /dev/null @@ -1,43 +0,0 @@ -//! The `request` module defines the messages for the thin client. - -use hash::Hash; -use signature::Signature; -use solana_sdk::account::Account; -use solana_sdk::pubkey::Pubkey; - -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] -pub enum Request { - GetAccount { key: Pubkey }, - GetLastId, - GetTransactionCount, - GetSignature { signature: Signature }, - GetFinality, -} - -impl Request { - /// Verify the request is valid. - pub fn verify(&self) -> bool { - true - } -} - -#[derive(Serialize, Deserialize, Debug)] -pub enum Response { - Account { - key: Pubkey, - account: Option, - }, - LastId { - id: Hash, - }, - TransactionCount { - transaction_count: u64, - }, - SignatureStatus { - signature_status: bool, - }, - Finality { - time: usize, - }, -} diff --git a/src/request_processor.rs b/src/request_processor.rs deleted file mode 100644 index d8b74a5a9f..0000000000 --- a/src/request_processor.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! The `request_processor` processes thin client Request messages. - -use bank::Bank; -use request::{Request, Response}; -use std::net::SocketAddr; -use std::sync::Arc; - -pub struct RequestProcessor { - bank: Arc, -} - -impl RequestProcessor { - /// Create a new Tpu that wraps the given Bank. - pub fn new(bank: Arc) -> Self { - RequestProcessor { bank } - } - - /// Process Request items sent by clients. - fn process_request( - &self, - msg: Request, - rsp_addr: SocketAddr, - ) -> Option<(Response, SocketAddr)> { - match msg { - Request::GetAccount { key } => { - let account = self.bank.get_account(&key); - let rsp = (Response::Account { key, account }, rsp_addr); - debug!("Response::Account {:?}", rsp); - Some(rsp) - } - Request::GetLastId => { - let id = self.bank.last_id(); - let rsp = (Response::LastId { id }, rsp_addr); - debug!("Response::LastId {:?}", rsp); - Some(rsp) - } - Request::GetTransactionCount => { - let transaction_count = self.bank.transaction_count() as u64; - let rsp = (Response::TransactionCount { transaction_count }, rsp_addr); - debug!("Response::TransactionCount {:?}", rsp); - Some(rsp) - } - Request::GetSignature { signature } => { - let signature_status = self.bank.has_signature(&signature); - let rsp = (Response::SignatureStatus { signature_status }, rsp_addr); - debug!("Response::Signature {:?}", rsp); - Some(rsp) - } - Request::GetFinality => { - let time = self.bank.finality(); - let rsp = (Response::Finality { time }, rsp_addr); - debug!("Response::Finality {:?}", rsp); - Some(rsp) - } - } - } - - pub fn process_requests( - &self, - reqs: Vec<(Request, SocketAddr)>, - ) -> Vec<(Response, SocketAddr)> { - reqs.into_iter() - .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) - .collect() - } -} diff --git a/src/request_stage.rs b/src/request_stage.rs deleted file mode 100644 index 82c0a63084..0000000000 --- a/src/request_stage.rs +++ /dev/null @@ -1,119 +0,0 @@ -//! The `request_stage` processes thin client Request messages. - -use bincode::deserialize; -use counter::Counter; -use log::Level; -use packet::{to_blobs, Packets, SharedPackets}; -use rayon::prelude::*; -use request::Request; -use request_processor::RequestProcessor; -use result::{Error, Result}; -use service::Service; -use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; -use std::sync::Arc; -use std::thread::{self, Builder, JoinHandle}; -use std::time::Instant; -use streamer::{self, BlobReceiver, BlobSender}; -use timing; - -pub struct RequestStage { - thread_hdl: JoinHandle<()>, - pub request_processor: Arc, -} - -impl RequestStage { - pub fn deserialize_requests(p: &Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }).collect() - } - - pub fn process_request_packets( - request_processor: &RequestProcessor, - packet_receiver: &Receiver, - blob_sender: &BlobSender, - ) -> Result<()> { - let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?; - - debug!( - "@{:?} request_stage: processing: {}", - timing::timestamp(), - batch_len - ); - - let mut reqs_len = 0; - let proc_start = Instant::now(); - for msgs in batch { - let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) - .into_iter() - .filter_map(|x| x) - .collect(); - reqs_len += reqs.len(); - - let rsps = request_processor.process_requests(reqs); - - let blobs = to_blobs(rsps)?; - if !blobs.is_empty() { - debug!("process: sending blobs: {}", blobs.len()); - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; - } - } - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); - inc_new_counter_info!("request_stage-time_ms", total_time_ms as usize); - debug!( - "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", - timing::timestamp(), - batch_len, - total_time_ms, - reqs_len, - (reqs_len as f32) / (total_time_s) - ); - Ok(()) - } - pub fn new( - request_processor: RequestProcessor, - packet_receiver: Receiver, - ) -> (Self, BlobReceiver) { - let request_processor = Arc::new(request_processor); - let request_processor_ = request_processor.clone(); - let (blob_sender, blob_receiver) = channel(); - let thread_hdl = Builder::new() - .name("solana-request-stage".to_string()) - .spawn(move || loop { - if let Err(e) = Self::process_request_packets( - &request_processor_, - &packet_receiver, - &blob_sender, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => error!("{:?}", e), - } - } - }).unwrap(); - ( - RequestStage { - thread_hdl, - request_processor, - }, - blob_receiver, - ) - } -} - -impl Service for RequestStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } -} diff --git a/src/rpc.rs b/src/rpc.rs index 54063cc6bb..203da33587 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -615,7 +615,6 @@ mod tests { let last_id = bank.last_id(); let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0); let serial_tx = serialize(&tx).unwrap(); - let rpc_port = 22222; // Needs to be distinct known number to not conflict with other tests let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, @@ -634,7 +633,7 @@ mod tests { None, &ledger_path, false, - Some(rpc_port), + None, ); sleep(Duration::from_millis(900)); @@ -645,8 +644,7 @@ mod tests { "method": "sendTransaction", "params": json!(vec![serial_tx]) }); - let mut rpc_addr = leader_data.contact_info.ncp; - rpc_addr.set_port(22222); + let rpc_addr = leader_data.contact_info.rpc; let rpc_string = format!("http://{}", rpc_addr.to_string()); let mut response = client .post(&rpc_string) diff --git a/src/rpu.rs b/src/rpu.rs deleted file mode 100644 index 4ed6893ffc..0000000000 --- a/src/rpu.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! The `rpu` module implements the Request Processing Unit, a -//! 3-stage transaction processing pipeline in software. It listens -//! for `Request` messages from clients and replies with `Response` -//! messages. -//! -//! ```text -//! .------. -//! | Bank | -//! `---+--` -//! | -//! .------------------|-------------------. -//! | RPU | | -//! | v | -//! .---------. | .-------. .---------. .---------. | .---------. -//! | Alice |--->| | | | | +---->| Alice | -//! `---------` | | Fetch | | Request | | Respond | | `---------` -//! | | Stage |->| Stage |->| Stage | | -//! .---------. | | | | | | | | .---------. -//! | Bob |--->| | | | | +---->| Bob | -//! `---------` | `-------` `---------` `---------` | `---------` -//! | | -//! | | -//! `--------------------------------------` -//! ``` - -use bank::Bank; -use request_processor::RequestProcessor; -use request_stage::RequestStage; -use service::Service; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::Arc; -use std::thread::{self, JoinHandle}; -use streamer; - -pub struct Rpu { - request_stage: RequestStage, - thread_hdls: Vec>, - exit: Arc, -} - -impl Rpu { - pub fn new(bank: &Arc, requests_socket: UdpSocket, respond_socket: UdpSocket) -> Self { - let exit = Arc::new(AtomicBool::new(false)); - let (packet_sender, packet_receiver) = channel(); - let t_receiver = streamer::receiver( - Arc::new(requests_socket), - exit.clone(), - packet_sender, - "rpu", - ); - - let request_processor = RequestProcessor::new(bank.clone()); - let (request_stage, blob_receiver) = RequestStage::new(request_processor, packet_receiver); - - let t_responder = streamer::responder("rpu", Arc::new(respond_socket), blob_receiver); - - let thread_hdls = vec![t_receiver, t_responder]; - - Rpu { - thread_hdls, - request_stage, - exit, - } - } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } -} - -impl Service for Rpu { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - self.request_stage.join()?; - Ok(()) - } -} diff --git a/src/thin_client.rs b/src/thin_client.rs index ef76f43285..5d0673c6d8 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -4,13 +4,15 @@ //! unstable and may change in future releases. use bank::Bank; -use bincode::{deserialize, serialize}; +use bincode::serialize; +use bs58; use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; use hash::Hash; use log::Level; use ncp::Ncp; -use request::{Request, Response}; use result::{Error, Result}; +use rpc_request::RpcRequest; +use serde_json; use signature::{Keypair, Signature}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; @@ -33,8 +35,7 @@ use metrics; /// An object for querying and sending transactions to the network. pub struct ThinClient { - requests_addr: SocketAddr, - requests_socket: UdpSocket, + rpc_addr: SocketAddr, transactions_addr: SocketAddr, transactions_socket: UdpSocket, last_id: Option, @@ -45,18 +46,15 @@ pub struct ThinClient { } impl ThinClient { - /// Create a new ThinClient that will interface with Rpu - /// over `requests_socket` and `transactions_socket`. To receive responses, the caller must bind `socket` - /// to a public address before invoking ThinClient methods. + /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP + /// and the Tpu at `transactions_addr` over `transactions_socket` using UDP. pub fn new( - requests_addr: SocketAddr, - requests_socket: UdpSocket, + rpc_addr: SocketAddr, transactions_addr: SocketAddr, transactions_socket: UdpSocket, ) -> Self { ThinClient { - requests_addr, - requests_socket, + rpc_addr, transactions_addr, transactions_socket, last_id: None, @@ -67,58 +65,6 @@ impl ThinClient { } } - pub fn recv_response(&self) -> io::Result { - let mut buf = vec![0u8; 1024]; - trace!("start recv_from"); - match self.requests_socket.recv_from(&mut buf) { - Ok((len, from)) => { - trace!("end recv_from got {} {}", len, from); - deserialize(&buf) - .or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize"))) - } - Err(e) => { - trace!("end recv_from got {:?}", e); - Err(e) - } - } - } - - pub fn process_response(&mut self, resp: &Response) { - match *resp { - Response::Account { - key, - account: Some(ref account), - } => { - trace!("Response account {:?} {:?}", key, account); - self.balances.insert(key, account.clone()); - } - Response::Account { key, account: None } => { - debug!("Response account {}: None ", key); - self.balances.remove(&key); - } - Response::LastId { id } => { - trace!("Response last_id {:?}", id); - self.last_id = Some(id); - } - Response::TransactionCount { transaction_count } => { - trace!("Response transaction count {:?}", transaction_count); - self.transaction_count = transaction_count; - } - Response::SignatureStatus { signature_status } => { - self.signature_status = signature_status; - if signature_status { - trace!("Response found signature"); - } else { - trace!("Response signature not found"); - } - } - Response::Finality { time } => { - trace!("Response finality {:?}", time); - self.finality = Some(time); - } - } - } - /// Send a signed Transaction to the server for processing. This method /// does not wait for a response. pub fn transfer_signed(&self, tx: &Transaction) -> io::Result { @@ -195,41 +141,36 @@ impl ThinClient { } pub fn get_account_userdata(&mut self, pubkey: &Pubkey) -> io::Result>> { - let req = Request::GetAccount { key: *pubkey }; - let data = serialize(&req).expect("serialize GetAccount in pub fn get_account_userdata"); - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn get_account_userdata"); - - loop { - let resp = self.recv_response()?; - trace!("recv_response {:?}", resp); - if let Response::Account { key, account } = resp { - if key == *pubkey { - return Ok(account.map(|account| account.userdata)); - } - } + let params = json!(format!("{}", pubkey)); + let rpc_string = format!("http://{}", self.rpc_addr.to_string()); + let resp = RpcRequest::GetAccountInfo.make_rpc_request(&rpc_string, 1, Some(params)); + if let Ok(account_json) = resp { + let account: Account = + serde_json::from_value(account_json).expect("deserialize account"); + return Ok(Some(account.userdata)); } + Err(io::Error::new( + io::ErrorKind::Other, + "get_account_userdata failed", + )) } /// Request the balance of the user holding `pubkey`. This method blocks /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. pub fn get_balance(&mut self, pubkey: &Pubkey) -> io::Result { - trace!("get_balance sending request to {}", self.requests_addr); - let req = Request::GetAccount { key: *pubkey }; - let data = serialize(&req).expect("serialize GetAccount in pub fn get_balance"); - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn get_balance"); - let mut done = false; - while !done { - let resp = self.recv_response()?; - trace!("recv_response {:?}", resp); - if let Response::Account { key, .. } = &resp { - done = key == pubkey; - } - self.process_response(&resp); + trace!("get_balance sending request to {}", self.rpc_addr); + let params = json!(format!("{}", pubkey)); + let rpc_string = format!("http://{}", self.rpc_addr.to_string()); + let resp = RpcRequest::GetAccountInfo.make_rpc_request(&rpc_string, 1, Some(params)); + if let Ok(account_json) = resp { + let account: Account = + serde_json::from_value(account_json).expect("deserialize account"); + trace!("Response account {:?} {:?}", pubkey, account); + self.balances.insert(*pubkey, account.clone()); + } else { + debug!("Response account {}: None ", pubkey); + self.balances.remove(&pubkey); } trace!("get_balance {:?}", self.balances.get(pubkey)); // TODO: This is a hard coded call to introspect the balance of a budget_dsl contract @@ -243,25 +184,18 @@ impl ThinClient { /// Request the finality from the leader node pub fn get_finality(&mut self) -> usize { trace!("get_finality"); - let req = Request::GetFinality; - let data = serialize(&req).expect("serialize GetFinality in pub fn get_finality"); let mut done = false; + let rpc_string = format!("http://{}", self.rpc_addr.to_string()); while !done { - debug!("get_finality send_to {}", &self.requests_addr); - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn get_finality"); + debug!("get_finality send_to {}", &self.rpc_addr); + let resp = RpcRequest::GetFinality.make_rpc_request(&rpc_string, 1, None); - match self.recv_response() { - Ok(resp) => { - if let Response::Finality { .. } = resp { - done = true; - } - self.process_response(&resp); - } - Err(e) => { - debug!("thin_client get_finality error: {}", e); - } + if let Ok(value) = resp { + done = true; + let finality = value.as_u64().unwrap() as usize; + self.finality = Some(finality); + } else { + debug!("thin_client get_finality error: {:?}", resp); } } self.finality.expect("some finality") @@ -271,21 +205,16 @@ impl ThinClient { /// this method will try again 5 times. pub fn transaction_count(&mut self) -> u64 { debug!("transaction_count"); - let req = Request::GetTransactionCount; - let data = - serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); let mut tries_left = 5; + let rpc_string = format!("http://{}", self.rpc_addr.to_string()); while tries_left > 0 { - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn transaction_count"); + let resp = RpcRequest::GetTransactionCount.make_rpc_request(&rpc_string, 1, None); - if let Ok(resp) = self.recv_response() { - debug!("transaction_count recv_response: {:?}", resp); - if let Response::TransactionCount { .. } = resp { - tries_left = 0; - } - self.process_response(&resp); + if let Ok(value) = resp { + debug!("transaction_count recv_response: {:?}", value); + tries_left = 0; + let transaction_count = value.as_u64().unwrap(); + self.transaction_count = transaction_count; } else { tries_left -= 1; } @@ -297,25 +226,20 @@ impl ThinClient { /// until the server sends a response. pub fn get_last_id(&mut self) -> Hash { trace!("get_last_id"); - let req = Request::GetLastId; - let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); let mut done = false; + let rpc_string = format!("http://{}", self.rpc_addr.to_string()); while !done { - debug!("get_last_id send_to {}", &self.requests_addr); - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn get_last_id"); + debug!("get_last_id send_to {}", &self.rpc_addr); + let resp = RpcRequest::GetLastId.make_rpc_request(&rpc_string, 1, None); - match self.recv_response() { - Ok(resp) => { - if let Response::LastId { .. } = resp { - done = true; - } - self.process_response(&resp); - } - Err(e) => { - debug!("thin_client get_last_id error: {}", e); - } + if let Ok(value) = resp { + done = true; + let last_id_str = value.as_str().unwrap(); + let last_id_vec = bs58::decode(last_id_str).into_vec().unwrap(); + let last_id = Hash::new(&last_id_vec); + self.last_id = Some(last_id); + } else { + debug!("thin_client get_last_id error: {:?}", resp); } } self.last_id.expect("some last_id") @@ -377,22 +301,25 @@ impl ThinClient { /// until the server sends a response. pub fn check_signature(&mut self, signature: &Signature) -> bool { trace!("check_signature"); - let req = Request::GetSignature { - signature: *signature, - }; - let data = serialize(&req).expect("serialize GetSignature in pub fn check_signature"); + let params = json!(format!("{}", signature)); let now = Instant::now(); + let rpc_string = format!("http://{}", self.rpc_addr.to_string()); let mut done = false; while !done { - self.requests_socket - .send_to(&data, &self.requests_addr) - .expect("buffer error in pub fn get_last_id"); + let resp = RpcRequest::ConfirmTransaction.make_rpc_request( + &rpc_string, + 1, + Some(params.clone()), + ); - if let Ok(resp) = self.recv_response() { - if let Response::SignatureStatus { .. } = resp { - done = true; + if let Ok(confirmation) = resp { + done = true; + self.signature_status = confirmation.as_bool().unwrap(); + if self.signature_status { + trace!("Response found signature"); + } else { + trace!("Response signature not found"); } - self.process_response(&resp); } } metrics::submit( @@ -500,6 +427,7 @@ pub fn retry_get_balance( mod tests { use super::*; use bank::Bank; + use bincode::deserialize; use cluster_info::Node; use fullnode::Fullnode; use leader_scheduler::LeaderScheduler; @@ -512,7 +440,6 @@ mod tests { use vote_program::VoteProgram; #[test] - #[ignore] fn test_thin_client() { logger::setup(); let leader_keypair = Arc::new(Keypair::new()); @@ -540,19 +467,21 @@ mod tests { None, &ledger_path, false, - Some(0), + None, ); sleep(Duration::from_millis(900)); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, + leader_data.contact_info.rpc, leader_data.contact_info.tpu, transactions_socket, ); + let transaction_count = client.transaction_count(); + assert_eq!(transaction_count, 0); + let finality = client.get_finality(); + assert_eq!(finality, 18446744073709551615); let last_id = client.get_last_id(); let signature = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) @@ -560,6 +489,8 @@ mod tests { client.poll_for_signature(&signature).unwrap(); let balance = client.get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 500); + let transaction_count = client.transaction_count(); + assert_eq!(transaction_count, 1); server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } @@ -593,19 +524,14 @@ mod tests { None, &ledger_path, false, - Some(0), + None, ); //TODO: remove this sleep, or add a retry so CI is stable sleep(Duration::from_millis(300)); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, + leader_data.contact_info.rpc, leader_data.contact_info.tpu, transactions_socket, ); @@ -660,18 +586,13 @@ mod tests { None, &ledger_path, false, - Some(0), + None, ); sleep(Duration::from_millis(300)); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, + leader_data.contact_info.rpc, leader_data.contact_info.tpu, transactions_socket, ); @@ -714,18 +635,13 @@ mod tests { None, &ledger_path, false, - Some(0), + None, ); sleep(Duration::from_millis(300)); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, + leader_data.contact_info.rpc, leader_data.contact_info.tpu, transactions_socket, ); @@ -788,12 +704,8 @@ mod tests { // set a bogus address, see that we don't hang logger::setup(); let addr = "0.0.0.0:1234".parse().unwrap(); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::from_millis(250))) - .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut client = ThinClient::new(addr, requests_socket, addr, transactions_socket); + let mut client = ThinClient::new(addr, addr, transactions_socket); assert_eq!(client.transaction_count(), 0); } @@ -825,18 +737,13 @@ mod tests { None, &ledger_path, false, - Some(0), + None, ); sleep(Duration::from_millis(900)); - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, + leader_data.contact_info.rpc, leader_data.contact_info.tpu, transactions_socket, ); diff --git a/src/tvu.rs b/src/tvu.rs index b756303dee..799db33582 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -267,7 +267,7 @@ pub mod tests { let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( "test_replicate", - Arc::new(leader.sockets.requests), + Arc::new(leader.sockets.retransmit), r_responder, ); diff --git a/src/wallet.rs b/src/wallet.rs index 93012898d4..af37ee67ed 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1167,8 +1167,6 @@ mod tests { create_tmp_genesis("wallet_request_airdrop", 10_000_000, leader_data.id, 1000); let mut bank = Bank::new(&alice); - let rpc_port = 11111; // Needs to be distinct known number to not conflict with other tests - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, ))); @@ -1186,7 +1184,7 @@ mod tests { None, &ledger_path, false, - Some(rpc_port), + None, ); sleep(Duration::from_millis(900)); @@ -1194,9 +1192,7 @@ mod tests { run_local_drone(alice.keypair(), leader_data.contact_info.ncp, sender); let drone_addr = receiver.recv().unwrap(); - let mut addr = leader_data.contact_info.ncp; - addr.set_port(rpc_port); - let rpc_addr = format!("http://{}", addr.to_string()); + let rpc_addr = format!("http://{}", leader_data.contact_info.rpc.to_string()); let signature = request_airdrop(&drone_addr, &bob_pubkey, 50); assert!(signature.is_ok()); @@ -1377,7 +1373,6 @@ mod tests { let mut config_payer = WalletConfig::default(); let mut config_witness = WalletConfig::default(); - let rpc_port = 11223; // Needs to be distinct known number to not conflict with other tests let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, ))); @@ -1394,7 +1389,7 @@ mod tests { None, &ledger_path, false, - Some(rpc_port), + None, ); sleep(Duration::from_millis(900)); @@ -1405,8 +1400,7 @@ mod tests { config_payer.leader = leader_data1; config_witness.leader = leader_data2; - let mut rpc_addr = leader_data.contact_info.ncp; - rpc_addr.set_port(rpc_port); + let rpc_addr = leader_data.contact_info.rpc; config_payer.rpc_addr = format!("http://{}", rpc_addr.to_string()); config_witness.rpc_addr = config_payer.rpc_addr.clone(); diff --git a/tests/multinode.rs b/tests/multinode.rs index f8a4b5d9fa..218dae5b47 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -42,7 +42,6 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { let me = spy.info.id.clone(); let daddr = "0.0.0.0:0".parse().unwrap(); spy.info.contact_info.tvu = daddr; - spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap(); let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new"); spy_cluster_info.insert(&leader); spy_cluster_info.set_leader(leader.id); @@ -141,6 +140,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); // start up another validator from zero, converge and then check @@ -157,6 +157,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); // Send validator some tokens to vote @@ -233,6 +234,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); let mut nodes = vec![server]; @@ -260,6 +262,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); nodes.push(val); } @@ -297,6 +300,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); nodes.push(val); //contains the leader and new node @@ -365,6 +369,7 @@ fn test_multi_node_basic() { None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); let mut nodes = vec![server]; @@ -389,6 +394,7 @@ fn test_multi_node_basic() { Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); nodes.push(val); } @@ -442,6 +448,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap(); @@ -463,6 +470,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); let mut client = mk_client(&validator_data); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); @@ -488,6 +496,7 @@ fn create_leader(ledger_path: &str, leader_keypair: Arc) -> (NodeInfo, None, false, LeaderScheduler::from_bootstrap_leader(leader_data.id), + None, ); (leader_data, leader_fullnode) } @@ -549,6 +558,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_data.id), + None, ); // trigger broadcast, validator should catch up from leader, whose window contains @@ -615,6 +625,7 @@ fn test_multi_node_dynamic_network() { None, true, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); info!("{} LEADER", leader_data.id); @@ -678,6 +689,7 @@ fn test_multi_node_dynamic_network() { Some(leader_data.contact_info.ncp), true, LeaderScheduler::from_bootstrap_leader(leader_pubkey), + None, ); (rd, val) }).unwrap() @@ -823,6 +835,7 @@ fn test_leader_to_validator_transition() { Some(leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); // Make an extra node for our leader to broadcast to, @@ -962,6 +975,7 @@ fn test_leader_validator_basic() { Some(leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); // Start the leader fullnode @@ -973,6 +987,7 @@ fn test_leader_validator_basic() { Some(leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); // Wait for convergence @@ -1148,6 +1163,7 @@ fn test_dropped_handoff_recovery() { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); let mut nodes = vec![bootstrap_leader]; @@ -1170,6 +1186,7 @@ fn test_dropped_handoff_recovery() { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); nodes.push(validator); @@ -1195,6 +1212,7 @@ fn test_dropped_handoff_recovery() { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ); // Wait for catchup @@ -1311,6 +1329,7 @@ fn test_full_leader_validator_network() { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ))); let mut nodes: Vec>> = vec![bootstrap_leader.clone()]; @@ -1337,6 +1356,7 @@ fn test_full_leader_validator_network() { Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), + None, ))); nodes.push(validator.clone()); @@ -1451,16 +1471,10 @@ fn test_full_leader_validator_network() { } fn mk_client(leader: &NodeInfo) -> ThinClient { - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - assert!(ClusterInfo::is_valid_address(&leader.contact_info.rpu)); assert!(ClusterInfo::is_valid_address(&leader.contact_info.tpu)); ThinClient::new( - leader.contact_info.rpu, - requests_socket, + leader.contact_info.rpc, leader.contact_info.tpu, transactions_socket, )