generic array fail case

This commit is contained in:
Anatoly Yakovenko
2018-06-23 16:08:53 -07:00
committed by Greg Fitzgerald
parent d5be23dffe
commit 88503c2a09
3 changed files with 86 additions and 14 deletions

View File

@ -36,7 +36,12 @@ fn main() {
let mut opts = Options::new();
opts.optflag("h", "help", "print help");
opts.optopt("l", "", "run with the identity found in FILE", "FILE");
opts.optopt("v", "", "validate; find leader's identity in FILE", "FILE");
opts.optopt(
"t",
"",
"testnet; connec to the network at this gossip entry point",
"host:port",
);
opts.optopt(
"o",
"",
@ -119,14 +124,14 @@ fn main() {
}
let exit = Arc::new(AtomicBool::new(false));
let threads = if matches.opt_present("v") {
let path = matches.opt_str("v").unwrap();
let threads = if matches.opt_present("t") {
let testnet = matches.opt_str("t").unwrap();
eprintln!(
"starting validator... {} using {}",
repl_data.requests_addr, path
"starting validator... {} connecting to {}",
repl_data.requests_addr, testnet
);
let file = File::open(path.clone()).expect(&format!("file not found: {}", path));
let leader = serde_json::from_reader(file).expect("parse");
let taddr = testnet.parse().unwrap();
let entry = ReplicatedData::new_entry_point(taddr);
let s = Server::new_validator(
bank,
repl_data.clone(),
@ -135,7 +140,7 @@ fn main() {
UdpSocket::bind(repl_data.replicate_addr).unwrap(),
UdpSocket::bind(repl_data.gossip_addr).unwrap(),
UdpSocket::bind(repl_data.repair_addr).unwrap(),
leader,
entry,
exit.clone(),
);
s.thread_hdls

View File

@ -551,6 +551,33 @@ impl Crdt {
blob_sender.send(q)?;
Ok(())
}
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection
fn top_leader(&self) -> Option<PublicKey> {
let mut table = HashMap::new();
let def = PublicKey::default();
let cur = self.table.values().filter(|x| x.current_leader_id != def);
for v in cur {
let cnt = table.entry(&v.current_leader_id).or_insert(0);
//let cnt = table.get_mut(&v.current_leader_id).unwrap();
*cnt += 1;
println!("leader {:?} {}", &v.current_leader_id[..4], *cnt);
}
let mut sorted: Vec<_> = table.iter().collect();
sorted.sort_by_key(|a| a.1);
sorted.last().map(|a| *(*(*a).0))
}
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection
/// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet
fn update_leader(&mut self) {
if let Some(lid) = self.top_leader() {
if self.my_data().current_leader_id != lid {
if self.table.get(&lid).is_some() {
self.set_leader(lid);
}
}
}
}
/// Apply updates that we received from the identity `from`
/// # Arguments
@ -577,6 +604,7 @@ impl Crdt {
Builder::new()
.name("solana-gossip".to_string())
.spawn(move || loop {
let start = timestamp();
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
obj.write().unwrap().purge(timestamp());
if exit.load(Ordering::Relaxed) {
@ -584,7 +612,12 @@ impl Crdt {
}
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
let _ = obj.write().unwrap().update_leader();
let elapsed = timestamp() - start;
if GOSSIP_SLEEP_MILLIS > elapsed {
let left = GOSSIP_SLEEP_MILLIS - elapsed;
sleep(Duration::from_millis(left));
}
})
.unwrap()
}
@ -825,6 +858,7 @@ mod tests {
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
@ -1166,4 +1200,40 @@ mod tests {
assert_eq!(blob.get_id().unwrap(), id);
}
}
/// FIXME: This is obviously the wrong way to do this. Need to implement leader selection,
/// delete this test after leader selection is correctly implemented
#[test]
fn test_update_leader() {
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
let lead = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
let lead2 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone());
assert_matches!(crdt.top_leader(), None);
crdt.set_leader(lead.id);
assert_eq!(crdt.top_leader().unwrap(), lead.id);
//add a bunch of nodes with a new leader
for _ in 0..10 {
let mut dum = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap());
dum.current_leader_id = lead2.id;
crdt.insert(&dum);
}
assert_eq!(crdt.top_leader().unwrap(), lead2.id);
crdt.update_leader();
assert_eq!(crdt.my_data().current_leader_id, lead.id);
crdt.insert(&lead2);
crdt.update_leader();
assert_eq!(crdt.my_data().current_leader_id, lead2.id);
}
#[test]
fn test_update_leader_pubkeys() {
let key1 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap());
let key2 = ReplicatedData::new_entry_point("127.0.0.1:1234".parse().unwrap());
let mut table = HashMap::new();
table.entry(&key1.current_leader_id).or_insert(0);
for _ in 0..1000 {
let a = table.entry(&key2.current_leader_id).or_insert(0);
*a += 1;
}
assert_eq!(table.len(), 2);
}
}

View File

@ -134,7 +134,7 @@ impl Server {
replicate_socket: UdpSocket,
gossip_listen_socket: UdpSocket,
repair_socket: UdpSocket,
leader_repl_data: ReplicatedData,
entry_point: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Self {
let bank = Arc::new(bank);
@ -143,12 +143,9 @@ impl Server {
thread_hdls.extend(rpu.thread_hdls);
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write()
.expect("'crdt' write lock in pub fn replicate")
.set_leader(leader_repl_data.id);
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&leader_repl_data);
.insert(&entry_point);
let window = streamer::default_window();
let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0");