Review comments: node creation functions for replicators

And rework download loop.
This commit is contained in:
Stephen Akridge
2019-03-18 11:12:19 -07:00
committed by sakridge
parent ee58c1f960
commit f1802e592a
3 changed files with 84 additions and 46 deletions

View File

@ -1362,11 +1362,39 @@ impl Node {
Self::new_localhost_with_pubkey(&pubkey) Self::new_localhost_with_pubkey(&pubkey)
} }
pub fn new_localhost_replicator(pubkey: &Pubkey) -> Self { pub fn new_localhost_replicator(pubkey: &Pubkey) -> Self {
let mut new = Self::new_localhost_with_pubkey(pubkey); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
let storage = UdpSocket::bind("127.0.0.1:0").unwrap(); let storage = UdpSocket::bind("127.0.0.1:0").unwrap();
new.info.storage_addr = storage.local_addr().unwrap(); let empty = "0.0.0.0:0".parse().unwrap();
new.sockets.storage = Some(storage); let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
new
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
let info = ContactInfo::new(
pubkey,
gossip.local_addr().unwrap(),
tvu.local_addr().unwrap(),
empty,
empty,
storage.local_addr().unwrap(),
empty,
empty,
timestamp(),
);
Node {
info,
sockets: Sockets {
gossip,
tvu: vec![tvu],
tpu: vec![],
tpu_via_blobs: vec![],
broadcast,
repair,
retransmit,
storage: Some(storage),
},
}
} }
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self { pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -1408,12 +1436,8 @@ impl Node {
}, },
} }
} }
pub fn new_with_external_ip(pubkey: &Pubkey, gossip_addr: &SocketAddr) -> Node { fn get_gossip_port(gossip_addr: &SocketAddr) -> (u16, UdpSocket) {
fn bind() -> (u16, UdpSocket) { if gossip_addr.port() != 0 {
bind_in_range(FULLNODE_PORT_RANGE).expect("Failed to bind")
};
let (gossip_port, gossip) = if gossip_addr.port() != 0 {
( (
gossip_addr.port(), gossip_addr.port(),
bind_to(gossip_addr.port(), false).unwrap_or_else(|e| { bind_to(gossip_addr.port(), false).unwrap_or_else(|e| {
@ -1421,8 +1445,14 @@ impl Node {
}), }),
) )
} else { } else {
bind() Self::bind()
}; }
}
fn bind() -> (u16, UdpSocket) {
bind_in_range(FULLNODE_PORT_RANGE).expect("Failed to bind")
}
pub fn new_with_external_ip(pubkey: &Pubkey, gossip_addr: &SocketAddr) -> Node {
let (gossip_port, gossip) = Self::get_gossip_port(gossip_addr);
let (tvu_port, tvu_sockets) = let (tvu_port, tvu_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind"); multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind");
@ -1433,9 +1463,9 @@ impl Node {
let (tpu_via_blobs_port, tpu_via_blobs_sockets) = let (tpu_via_blobs_port, tpu_via_blobs_sockets) =
multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tpu multi_bind"); multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tpu multi_bind");
let (_, repair) = bind(); let (_, repair) = Self::bind();
let (_, broadcast) = bind(); let (_, broadcast) = Self::bind();
let (_, retransmit) = bind(); let (_, retransmit) = Self::bind();
let info = ContactInfo::new( let info = ContactInfo::new(
pubkey, pubkey,
@ -1464,6 +1494,21 @@ impl Node {
}, },
} }
} }
pub fn new_replicator_with_external_ip(pubkey: &Pubkey, gossip_addr: &SocketAddr) -> Node {
let mut new = Self::new_with_external_ip(pubkey, gossip_addr);
let (storage_port, storage_socket) = Self::bind();
new.info.storage_addr = SocketAddr::new(gossip_addr.ip(), storage_port);
new.sockets.storage = Some(storage_socket);
let empty = "0.0.0.0:0".parse().unwrap();
new.info.tpu = empty;
new.info.tpu_via_blobs = empty;
new.sockets.tpu = vec![];
new.sockets.tpu_via_blobs = vec![];
new
}
} }
fn report_time_spent(label: &str, time: &Duration, extra: &str) { fn report_time_spent(label: &str, time: &Duration, extra: &str) {
@ -1758,6 +1803,19 @@ mod tests {
assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050);
} }
#[test]
fn new_replicator_external_ip_test() {
let ip = IpAddr::V4(Ipv4Addr::from(0));
let node =
Node::new_replicator_with_external_ip(&Keypair::new().pubkey(), &socketaddr!(0, 8050));
check_socket(&node.sockets.storage.unwrap(), ip, FULLNODE_PORT_RANGE);
check_socket(&node.sockets.gossip, ip, FULLNODE_PORT_RANGE);
check_socket(&node.sockets.repair, ip, FULLNODE_PORT_RANGE);
check_sockets(&node.sockets.tvu, ip, FULLNODE_PORT_RANGE);
}
//test that all cluster_info objects only generate signed messages //test that all cluster_info objects only generate signed messages
//when constructed with keypairs //when constructed with keypairs
#[test] #[test]

View File

@ -41,7 +41,7 @@ use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::thread::spawn; use std::thread::spawn;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::{Duration, Instant}; use std::time::Duration;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub enum ReplicatorRequest { pub enum ReplicatorRequest {
@ -179,20 +179,13 @@ impl Replicator {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new( pub fn new(
ledger_path: &str, ledger_path: &str,
mut node: Node, node: Node,
cluster_entrypoint: ContactInfo, cluster_entrypoint: ContactInfo,
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
_timeout: Option<Duration>, _timeout: Option<Duration>,
) -> Result<Self> { ) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
// replicator cannot give information on rpc and
// cannot be leader so tpu/rpc ports are cleared
node.info.rpc = "0.0.0.0:0".parse().unwrap();
node.info.rpc_pubsub = "0.0.0.0:0".parse().unwrap();
node.info.tpu = "0.0.0.0:0".parse().unwrap();
node.info.tpu_via_blobs = "0.0.0.0:0".parse().unwrap();
info!("Replicator: id: {}", keypair.pubkey()); info!("Replicator: id: {}", keypair.pubkey());
info!("Creating cluster info...."); info!("Creating cluster info....");
let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone()); let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());
@ -316,38 +309,25 @@ impl Replicator {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
) { ) {
info!("window created, waiting for ledger download done"); info!("window created, waiting for ledger download done");
let _start = Instant::now();
let mut _received_so_far = 0; let mut _received_so_far = 0;
let mut current_slot = start_slot; let mut current_slot = start_slot;
let mut done = false; 'outer: loop {
loop { while let Ok(meta) = blocktree.meta(current_slot) {
loop {
if let Ok(meta) = blocktree.meta(current_slot) {
if let Some(meta) = meta { if let Some(meta) = meta {
if meta.is_rooted { if meta.is_rooted {
current_slot += 1; current_slot += 1;
info!("current slot: {}", current_slot); warn!("current slot: {}", current_slot);
} else {
break;
}
} else {
break;
}
} else {
break;
}
if current_slot >= start_slot + ENTRIES_PER_SEGMENT { if current_slot >= start_slot + ENTRIES_PER_SEGMENT {
info!("current slot: {} start: {}", current_slot, start_slot); break 'outer;
done = true; }
} else {
break;
}
} else {
break; break;
} }
} }
if done {
break;
}
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }

View File

@ -67,7 +67,7 @@ fn main() {
} }
addr addr
}; };
let node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr); let node = Node::new_replicator_with_external_ip(&keypair.pubkey(), &gossip_addr);
println!( println!(
"replicating the data with keypair={:?} gossip_addr={:?}", "replicating the data with keypair={:?} gossip_addr={:?}",