diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs
index 8ccb0cf621..9ded7d2f9e 100644
--- a/src/bin/fullnode.rs
+++ b/src/bin/fullnode.rs
@@ -27,7 +27,7 @@ fn print_usage(program: &str, opts: Options) {
     print!("{}", opts.usage(&brief));
 }
 
-fn main() {
+fn main() -> () {
     env_logger::init();
     let mut opts = Options::new();
     opts.optflag("h", "help", "print help");
diff --git a/src/crdt.rs b/src/crdt.rs
index 23654c7061..f9ec4b22d2 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -37,6 +37,7 @@ use timing::timestamp;
 
 /// milliseconds we sleep for between gossip requests
 const GOSSIP_SLEEP_MILLIS: u64 = 100;
+//const GOSSIP_MIN_PURGE_MILLIS: u64 = 15000;
 
 /// minimum membership table size before we start purging dead nodes
 const MIN_TABLE_SIZE: usize = 2;
@@ -226,14 +227,15 @@ impl Crdt {
     pub fn my_data(&self) -> &ReplicatedData {
         &self.table[&self.me]
     }
-    pub fn leader_data(&self) -> &ReplicatedData {
-        &self.table[&self.table[&self.me].current_leader_id]
+    pub fn leader_data(&self) -> Option<&ReplicatedData> {
+        self.table.get(&(self.table[&self.me].current_leader_id))
     }
 
     pub fn set_leader(&mut self, key: PublicKey) -> () {
         let mut me = self.my_data().clone();
         me.current_leader_id = key;
         me.version += 1;
+        info!("setting leader to {:?}", &key[..4]);
         self.insert(&me);
     }
 
@@ -281,6 +283,7 @@ impl Crdt {
         //wait for 4x as long as it would randomly take to reach our node
         //assuming everyone is waiting the same amount of time as this node
         let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
+        //let limit = std::cmp::max(limit, GOSSIP_MIN_PURGE_MILLIS);
 
         let dead_ids: Vec<PublicKey> = self.alive
             .iter()
@@ -596,6 +599,9 @@ impl Crdt {
             trace!("leader {:?} {}", &v.current_leader_id[..4], *cnt);
         }
         let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect();
+        if sorted.len() > 0 {
+            info!("sorted leaders {:?}", sorted);
+        }
         sorted.sort_by_key(|a| a.1);
         sorted.last().map(|a| *a.0)
     }
@@ -919,6 +925,46 @@ impl TestNode {
             },
         }
     }
+    pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode {
+        let mut local_gossip_addr = bind_addr.clone();
+        local_gossip_addr.set_port(data.gossip_addr.port());
+
+        let mut local_replicate_addr = bind_addr.clone();
+        local_replicate_addr.set_port(data.replicate_addr.port());
+
+        let mut local_requests_addr = bind_addr.clone();
+        local_requests_addr.set_port(data.requests_addr.port());
+
+        let mut local_transactions_addr = bind_addr.clone();
+        local_transactions_addr.set_port(data.transactions_addr.port());
+
+        let mut local_repair_addr = bind_addr.clone();
+        local_repair_addr.set_port(data.repair_addr.port());
+
+        let transaction = UdpSocket::bind(local_transactions_addr).unwrap();
+        let gossip = UdpSocket::bind(local_gossip_addr).unwrap();
+        let replicate = UdpSocket::bind(local_replicate_addr).unwrap();
+        let requests = UdpSocket::bind(local_requests_addr).unwrap();
+        let repair = UdpSocket::bind(local_repair_addr).unwrap();
+        let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
+        TestNode {
+            data: data,
+            sockets: Sockets {
+                gossip,
+                gossip_send,
+                requests,
+                replicate,
+                transaction,
+                respond,
+                broadcast,
+                repair,
+                retransmit,
+            },
+        }
+    }
 }
 
 #[cfg(test)]
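
Note on the leader_data() change in src/crdt.rs: the old version indexed the
table directly and panicked whenever the current leader was not yet present;
the new Option return makes a missing leader a recoverable condition, so call
sites must now handle None. A minimal sketch of an adapted call site, assuming
a hypothetical helper name (broadcast_target) and only the replicate_addr
field that appears in this patch:

    use std::net::SocketAddr;

    // Sketch only, not part of this patch: returns None until the gossip
    // table actually contains the current leader, letting the caller retry
    // on the next gossip pass instead of panicking on a missing key.
    fn broadcast_target(crdt: &Crdt) -> Option<SocketAddr> {
        crdt.leader_data().map(|leader| leader.replicate_addr)
    }

A caller that genuinely requires a leader can make the failure explicit with
crdt.leader_data().expect("no leader in table"), which surfaces the same
panic as before but at a visible, documented call site.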