diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index dcbc8f3744..e925f678e8 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -1,7 +1,6 @@ extern crate env_logger; extern crate getopts; extern crate isatty; -extern crate pnet; extern crate serde_json; extern crate solana; #[macro_use] @@ -9,18 +8,16 @@ extern crate log; use getopts::Options; use isatty::stdin_isatty; -use pnet::datalink; use solana::bank::Bank; -use solana::crdt::ReplicatedData; +use solana::crdt::{get_ip_addr, parse_port_or_addr, ReplicatedData}; use solana::entry::Entry; use solana::payment_plan::PaymentPlan; use solana::server::Server; -use solana::signature::{KeyPair, KeyPairUtil}; use solana::transaction::Instruction; use std::env; use std::fs::File; use std::io::{stdin, Read}; -use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::net::{SocketAddr, UdpSocket}; use std::process::exit; use std::sync::atomic::AtomicBool; use std::sync::Arc; @@ -121,7 +118,7 @@ fn main() { let exit = Arc::new(AtomicBool::new(false)); // we need all the receiving sockets to be bound within the expected // port range that we open on aws - let mut repl_data = make_repl_data(&bind_addr); + let mut repl_data = ReplicatedData::new_leader(&bind_addr); if matches.opt_present("l") { let path = matches.opt_str("l").unwrap(); if let Ok(file) = File::open(path.clone()) { @@ -178,62 +175,3 @@ fn main() { t.join().expect("join"); } } - -fn next_port(server_addr: &SocketAddr, nxt: u16) -> SocketAddr { - let mut gossip_addr = server_addr.clone(); - gossip_addr.set_port(server_addr.port() + nxt); - gossip_addr -} - -fn make_repl_data(bind_addr: &SocketAddr) -> ReplicatedData { - let transactions_addr = bind_addr.clone(); - let gossip_addr = next_port(&bind_addr, 1); - let replicate_addr = next_port(&bind_addr, 2); - let requests_addr = next_port(&bind_addr, 3); - let pubkey = KeyPair::new().pubkey(); - ReplicatedData::new( - pubkey, - gossip_addr, - replicate_addr, - requests_addr, - transactions_addr, - ) -} - -fn parse_port_or_addr(optstr: Option) -> SocketAddr { - let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); - if let Some(addrstr) = optstr { - if let Ok(port) = addrstr.parse() { - let mut addr = daddr.clone(); - addr.set_port(port); - addr - } else if let Ok(addr) = addrstr.parse() { - addr - } else { - daddr - } - } else { - daddr - } -} - -fn get_ip_addr() -> Option { - for iface in datalink::interfaces() { - for p in iface.ips { - if !p.ip().is_loopback() && !p.ip().is_multicast() { - return Some(p.ip()); - } - } - } - None -} - -#[test] -fn test_parse_port_or_addr() { - let p1 = parse_port_or_addr(Some("9000".to_string())); - assert_eq!(p1.port(), 9000); - let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string())); - assert_eq!(p2.port(), 7000); - let p3 = parse_port_or_addr(None); - assert_eq!(p3.port(), 8000); -} diff --git a/src/crdt.rs b/src/crdt.rs index 73c06b49a1..7c2d7e494b 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -17,6 +17,7 @@ use bincode::{deserialize, serialize}; use byteorder::{LittleEndian, ReadBytesExt}; use hash::Hash; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; +use pnet::datalink; use rayon::prelude::*; use result::{Error, Result}; use ring::rand::{SecureRandom, SystemRandom}; @@ -26,13 +27,41 @@ use std; use std::collections::HashMap; use std::collections::VecDeque; use std::io::Cursor; -use std::net::{SocketAddr, UdpSocket}; +use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use streamer::{BlobReceiver, BlobSender}; +pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { + let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); + if let Some(addrstr) = optstr { + if let Ok(port) = addrstr.parse() { + let mut addr = daddr.clone(); + addr.set_port(port); + addr + } else if let Ok(addr) = addrstr.parse() { + addr + } else { + daddr + } + } else { + daddr + } +} + +pub fn get_ip_addr() -> Option { + for iface in datalink::interfaces() { + for p in iface.ips { + if !p.ip().is_loopback() && !p.ip().is_multicast() { + return Some(p.ip()); + } + } + } + None +} + /// Structure to be replicated by the network #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct ReplicatedData { @@ -77,6 +106,27 @@ impl ReplicatedData { last_verified_count: 0, } } + + fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { + let mut nxt_addr = addr.clone(); + nxt_addr.set_port(addr.port() + nxt); + nxt_addr + } + + pub fn new_leader(bind_addr: &SocketAddr) -> Self { + let transactions_addr = bind_addr.clone(); + let gossip_addr = Self::next_port(&bind_addr, 1); + let replicate_addr = Self::next_port(&bind_addr, 2); + let requests_addr = Self::next_port(&bind_addr, 3); + let pubkey = KeyPair::new().pubkey(); + ReplicatedData::new( + pubkey, + gossip_addr, + replicate_addr, + requests_addr, + transactions_addr, + ) + } } /// `Crdt` structure keeps a table of `ReplicatedData` structs @@ -647,9 +697,19 @@ impl TestNode { #[cfg(test)] mod tests { - use crdt::{Crdt, ReplicatedData}; + use crdt::{parse_port_or_addr, Crdt, ReplicatedData}; use signature::{KeyPair, KeyPairUtil}; + #[test] + fn test_parse_port_or_addr() { + let p1 = parse_port_or_addr(Some("9000".to_string())); + assert_eq!(p1.port(), 9000); + let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string())); + assert_eq!(p2.port(), 7000); + let p3 = parse_port_or_addr(None); + assert_eq!(p3.port(), 8000); + } + /// Test that insert drops messages that are older #[test] fn insert_test() { diff --git a/src/lib.rs b/src/lib.rs index 110dd1558c..8583e75466 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,7 @@ extern crate ring; extern crate serde; #[macro_use] extern crate serde_derive; +extern crate pnet; extern crate serde_json; extern crate sha2; extern crate untrusted;