Compare commits
13 Commits
v0.7.0-rc.
...
v0.7.0-rc.
Author | SHA1 | Date | |
---|---|---|---|
|
0696f9f497 | ||
|
b2ea2455e2 | ||
|
3f659a69fd | ||
|
2c62be951f | ||
|
2348733d6c | ||
|
cc229b535d | ||
|
7f810a29ff | ||
|
fc1dfd86d2 | ||
|
5deb34e5bd | ||
|
39df087902 | ||
|
6ff46540b6 | ||
|
dbab8792e4 | ||
|
4eb676afaa |
@@ -1,3 +1,6 @@
|
||||
FROM rustlang/rust:nightly
|
||||
|
||||
RUN cargo install --force clippy cargo-cov
|
||||
RUN cargo install --force clippy cargo-cov && \
|
||||
echo deb http://ftp.debian.org/debian stretch-backports main >> /etc/apt/sources.list && \
|
||||
apt update && \
|
||||
apt install -y llvm-6.0
|
||||
|
@@ -27,6 +27,6 @@ ls -l target/cov/report/index.html
|
||||
if [[ -z "$CODECOV_TOKEN" ]]; then
|
||||
echo CODECOV_TOKEN undefined
|
||||
else
|
||||
bash <(curl -s https://codecov.io/bash) -x 'llvm-cov gcov'
|
||||
bash <(curl -s https://codecov.io/bash) -x 'llvm-cov-6.0 gcov'
|
||||
fi
|
||||
|
||||
|
@@ -5,9 +5,12 @@
|
||||
# This script must be run by a user/machine that has successfully authenticated
|
||||
# with GCP and has sufficient permission.
|
||||
#
|
||||
|
||||
cd "$(dirname "$0")/.."
|
||||
|
||||
# TODO: Switch over to rolling updates
|
||||
ROLLING_UPDATE=false
|
||||
#ROLLING_UPDATE=true
|
||||
|
||||
if [[ -z $SOLANA_METRICS_CONFIG ]]; then
|
||||
echo Error: SOLANA_METRICS_CONFIG environment variable is unset
|
||||
exit 1
|
||||
@@ -26,7 +29,7 @@ edge)
|
||||
;;
|
||||
beta)
|
||||
publicUrl=testnet.solana.com
|
||||
publicIp=# # Use default value
|
||||
publicIp="" # Use default value
|
||||
;;
|
||||
*)
|
||||
echo Error: Unknown SOLANA_SNAP_CHANNEL=$SOLANA_SNAP_CHANNEL
|
||||
@@ -61,6 +64,28 @@ wait_for_node() {
|
||||
fi
|
||||
}
|
||||
|
||||
if ! $ROLLING_UPDATE; then
|
||||
count=1
|
||||
for info in "${vmlist[@]}"; do
|
||||
nodePosition="($count/${#vmlist[*]})"
|
||||
vmName=${info%:*}
|
||||
vmZone=${info#*:}
|
||||
echo "--- Shutting down $vmName in zone $vmZone $nodePosition"
|
||||
gcloud compute ssh "$vmName" --zone "$vmZone" \
|
||||
--ssh-flag="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" \
|
||||
--command="echo sudo snap remove solana" &
|
||||
|
||||
if [[ $((count % 10)) = 0 ]]; then
|
||||
# Slow down deployment to avoid triggering GCP login
|
||||
# quota limits (each |ssh| counts as a login)
|
||||
sleep 3
|
||||
fi
|
||||
|
||||
count=$((count + 1))
|
||||
done
|
||||
|
||||
wait
|
||||
fi
|
||||
|
||||
echo "--- Refreshing leader for $publicUrl"
|
||||
leader=true
|
||||
@@ -76,13 +101,18 @@ for info in "${vmlist[@]}"; do
|
||||
(
|
||||
SECONDS=0
|
||||
echo "--- $vmName in zone $vmZone $nodePosition"
|
||||
commonNodeConfig="\
|
||||
rust-log=$RUST_LOG \
|
||||
default-metrics-rate=$SOLANA_DEFAULT_METRICS_RATE \
|
||||
metrics-config=$SOLANA_METRICS_CONFIG \
|
||||
"
|
||||
if $leader; then
|
||||
nodeConfig="mode=leader+drone metrics-config=$SOLANA_METRICS_CONFIG"
|
||||
nodeConfig="mode=leader+drone $commonNodeConfig"
|
||||
if [[ -n $SOLANA_CUDA ]]; then
|
||||
nodeConfig="$nodeConfig enable-cuda=1"
|
||||
fi
|
||||
else
|
||||
nodeConfig="mode=validator metrics-config=$SOLANA_METRICS_CONFIG leader-address=$publicIp"
|
||||
nodeConfig="mode=validator leader-address=$publicIp $commonNodeConfig"
|
||||
fi
|
||||
|
||||
set -x
|
||||
|
@@ -35,6 +35,7 @@ if [[ -d "$SNAP" ]]; then # Running inside a Linux Snap?
|
||||
mkdir -p "$SNAP_DATA"/{drone,leader,validator}
|
||||
|
||||
SOLANA_METRICS_CONFIG="$(snapctl get metrics-config)"
|
||||
SOLANA_DEFAULT_METRICS_RATE="$(snapctl get default-metrics-rate)"
|
||||
SOLANA_CUDA="$(snapctl get enable-cuda)"
|
||||
RUST_LOG="$(snapctl get rust-log)"
|
||||
|
||||
|
@@ -79,7 +79,7 @@ common_start_setup() {
|
||||
|
||||
# Killing sshguard for now. TODO: Find a better solution
|
||||
# sshguard is blacklisting IP address after ssh-keyscan and ssh login attempts
|
||||
ssh -n -f "$remote_user@$ip_addr" " \
|
||||
ssh "$remote_user@$ip_addr" " \
|
||||
set -ex; \
|
||||
sudo service sshguard stop; \
|
||||
sudo apt-get --assume-yes install rsync libssl-dev; \
|
||||
@@ -92,6 +92,7 @@ common_start_setup() {
|
||||
rsync -vPrz "$ssh_keys"/id_rsa "$remote_user@$ip_addr":~/.ssh/
|
||||
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/
|
||||
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/authorized_keys
|
||||
rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
|
||||
} >>log/"$ip_addr".log
|
||||
fi
|
||||
}
|
||||
@@ -101,7 +102,6 @@ start_leader() {
|
||||
|
||||
{
|
||||
rsync -vPrz ~/.cargo/bin/solana* "$remote_user@$ip_addr":~/.cargo/bin/
|
||||
rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
|
||||
rsync -vPrz ./fetch-perf-libs.sh "$remote_user@$ip_addr":~/solana/
|
||||
ssh -n -f "$remote_user@$ip_addr" 'cd solana; FORCE=1 ./multinode-demo/remote_leader.sh'
|
||||
} >>log/"$1".log
|
||||
@@ -114,7 +114,6 @@ start_leader() {
|
||||
start_validator() {
|
||||
common_start_setup "$1"
|
||||
|
||||
ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/solana/multinode-demo ~/solana/" >>log/"$1".log
|
||||
ssh -n -f "$remote_user@$ip_addr" "cd solana; FORCE=1 ./multinode-demo/remote_validator.sh $leader_ip" >>log/"$1".log
|
||||
}
|
||||
|
||||
@@ -128,9 +127,6 @@ start_all_nodes() {
|
||||
mkdir -p log
|
||||
|
||||
for ip_addr in "${ip_addr_array[@]}"; do
|
||||
ssh-keygen -R "$ip_addr" >log/local.log
|
||||
ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
|
||||
|
||||
if ((!count)); then
|
||||
# Start the leader on the first node
|
||||
echo "Leader node $ip_addr, killing previous instance and restarting"
|
||||
@@ -159,6 +155,9 @@ stop_all_nodes() {
|
||||
SECONDS=0
|
||||
local count=0
|
||||
for ip_addr in "${ip_addr_array[@]}"; do
|
||||
ssh-keygen -R "$ip_addr" >log/local.log
|
||||
ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
|
||||
|
||||
echo "Stopping node[$count] $ip_addr. Remote user $remote_user"
|
||||
|
||||
ssh -n -f "$remote_user@$ip_addr" " \
|
||||
@@ -176,7 +175,7 @@ stop_all_nodes() {
|
||||
}
|
||||
|
||||
if [[ $command == "start" ]]; then
|
||||
#build_project
|
||||
build_project
|
||||
stop_all_nodes
|
||||
start_all_nodes
|
||||
elif [[ $command == "stop" ]]; then
|
||||
|
@@ -6,7 +6,8 @@ chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa
|
||||
|
||||
PATH="$HOME"/.cargo/bin:"$PATH"
|
||||
|
||||
ssh-keygen -R "$1"
|
||||
touch ~/.ssh/known_hosts
|
||||
ssh-keygen -R "$1" 2>/dev/null
|
||||
ssh-keyscan "$1" >>~/.ssh/known_hosts 2>/dev/null
|
||||
|
||||
rsync -vPrz "$1":~/.cargo/bin/solana* ~/.cargo/bin/
|
||||
|
@@ -9,7 +9,7 @@ use bincode::serialize;
|
||||
use clap::{App, Arg};
|
||||
use rayon::prelude::*;
|
||||
use solana::crdt::{Crdt, NodeInfo};
|
||||
use solana::drone::DroneRequest;
|
||||
use solana::drone::{DroneRequest, DRONE_PORT};
|
||||
use solana::fullnode::Config;
|
||||
use solana::hash::Hash;
|
||||
use solana::nat::{udp_public_bind, udp_random_bind};
|
||||
@@ -220,12 +220,13 @@ fn main() {
|
||||
}
|
||||
|
||||
let mut drone_addr = leader.contact_info.tpu;
|
||||
drone_addr.set_port(9900);
|
||||
drone_addr.set_port(DRONE_PORT);
|
||||
|
||||
let signal = Arc::new(AtomicBool::new(false));
|
||||
let mut c_threads = vec![];
|
||||
let validators = converge(&leader, &signal, num_nodes, &mut c_threads);
|
||||
assert_eq!(validators.len(), num_nodes);
|
||||
println!("Network has {} node(s)", validators.len());
|
||||
assert!(validators.len() >= num_nodes);
|
||||
|
||||
let mut client = mk_client(&leader);
|
||||
|
||||
@@ -417,6 +418,12 @@ fn converge(
|
||||
println!("CONVERGED!");
|
||||
rv.extend(v.into_iter());
|
||||
break;
|
||||
} else {
|
||||
println!(
|
||||
"{} node(s) discovered (looking for {} or more)",
|
||||
v.len(),
|
||||
num_nodes
|
||||
);
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
|
@@ -10,7 +10,7 @@ extern crate tokio_io;
|
||||
use bincode::deserialize;
|
||||
use clap::{App, Arg};
|
||||
use solana::crdt::NodeInfo;
|
||||
use solana::drone::{Drone, DroneRequest};
|
||||
use solana::drone::{Drone, DroneRequest, DRONE_PORT};
|
||||
use solana::fullnode::Config;
|
||||
use solana::metrics::set_panic_hook;
|
||||
use solana::signature::read_keypair;
|
||||
@@ -85,7 +85,7 @@ fn main() {
|
||||
request_cap = None;
|
||||
}
|
||||
|
||||
let drone_addr: SocketAddr = "0.0.0.0:9900".parse().unwrap();
|
||||
let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();
|
||||
|
||||
let drone = Arc::new(Mutex::new(Drone::new(
|
||||
mint_keypair,
|
||||
|
@@ -10,7 +10,7 @@ extern crate solana;
|
||||
use bincode::serialize;
|
||||
use clap::{App, Arg, SubCommand};
|
||||
use solana::crdt::NodeInfo;
|
||||
use solana::drone::DroneRequest;
|
||||
use solana::drone::{DroneRequest, DRONE_PORT};
|
||||
use solana::fullnode::Config;
|
||||
use solana::signature::{read_keypair, KeyPair, KeyPairUtil, PublicKey, Signature};
|
||||
use solana::thin_client::ThinClient;
|
||||
@@ -164,7 +164,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
|
||||
})?;
|
||||
|
||||
let mut drone_addr = leader.contact_info.tpu;
|
||||
drone_addr.set_port(9900);
|
||||
drone_addr.set_port(DRONE_PORT);
|
||||
|
||||
let command = match matches.subcommand() {
|
||||
("airdrop", Some(airdrop_matches)) => {
|
||||
|
171
src/crdt.rs
171
src/crdt.rs
@@ -120,8 +120,6 @@ pub struct ContactInfo {
|
||||
pub struct LedgerState {
|
||||
/// last verified hash that was submitted to the leader
|
||||
pub last_id: Hash,
|
||||
/// last verified entry count, always increasing
|
||||
pub entry_height: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
@@ -166,7 +164,6 @@ impl NodeInfo {
|
||||
leader_id: PublicKey::default(),
|
||||
ledger_state: LedgerState {
|
||||
last_id: Hash::default(),
|
||||
entry_height: 0,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -429,6 +426,7 @@ impl Crdt {
|
||||
let _ = self.table.insert(v.id, v.clone());
|
||||
let _ = self.local.insert(v.id, self.update_index);
|
||||
inc_new_counter!("crdt-update-count", 1);
|
||||
self.update_liveness(v.id);
|
||||
} else {
|
||||
trace!(
|
||||
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
|
||||
@@ -438,7 +436,6 @@ impl Crdt {
|
||||
self.table[&v.id].version
|
||||
);
|
||||
}
|
||||
self.update_liveness(v.id);
|
||||
}
|
||||
|
||||
fn update_liveness(&mut self, id: PublicKey) {
|
||||
@@ -458,35 +455,20 @@ impl Crdt {
|
||||
/// challenging part is that we are on a permissionless network
|
||||
pub fn purge(&mut self, now: u64) {
|
||||
if self.table.len() <= MIN_TABLE_SIZE {
|
||||
trace!("purge: skipped: table too small: {}", self.table.len());
|
||||
return;
|
||||
}
|
||||
if self.leader_data().is_none() {
|
||||
trace!("purge: skipped: no leader_data");
|
||||
return;
|
||||
}
|
||||
let leader_id = self.leader_data().unwrap().id;
|
||||
|
||||
let limit = GOSSIP_PURGE_MILLIS;
|
||||
let dead_ids: Vec<PublicKey> = self.alive
|
||||
.iter()
|
||||
.filter_map(|(&k, v)| {
|
||||
if k != self.me && (now - v) > limit {
|
||||
if leader_id == k {
|
||||
info!(
|
||||
"{:x}: PURGE LEADER {:x} {}",
|
||||
self.debug_id(),
|
||||
make_debug_id(&k),
|
||||
now - v
|
||||
);
|
||||
Some(k)
|
||||
} else {
|
||||
info!(
|
||||
"{:x}: PURGE {:x} {}",
|
||||
self.debug_id(),
|
||||
make_debug_id(&k),
|
||||
now - v
|
||||
);
|
||||
Some(k)
|
||||
}
|
||||
Some(k)
|
||||
} else {
|
||||
trace!(
|
||||
"{:x} purge skipped {:x} {} {}",
|
||||
@@ -508,9 +490,19 @@ impl Crdt {
|
||||
self.remote.remove(id);
|
||||
self.local.remove(id);
|
||||
self.external_liveness.remove(id);
|
||||
info!("{:x}: PURGE {:x}", self.debug_id(), make_debug_id(id));
|
||||
for map in self.external_liveness.values_mut() {
|
||||
map.remove(id);
|
||||
}
|
||||
if *id == leader_id {
|
||||
info!(
|
||||
"{:x}: PURGE LEADER {:x}",
|
||||
self.debug_id(),
|
||||
make_debug_id(id),
|
||||
);
|
||||
inc_new_counter!("crdt-purge-purged_leader", 1, 1);
|
||||
self.set_leader(PublicKey::default());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -777,12 +769,11 @@ impl Crdt {
|
||||
Ok((v.contact_info.ncp, req))
|
||||
}
|
||||
|
||||
pub fn new_vote(&mut self, height: u64, last_id: Hash) -> Result<(Vote, SocketAddr)> {
|
||||
pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> {
|
||||
let mut me = self.my_data().clone();
|
||||
let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone();
|
||||
me.version += 1;
|
||||
me.ledger_state.last_id = last_id;
|
||||
me.ledger_state.entry_height = height;
|
||||
let vote = Vote {
|
||||
version: me.version,
|
||||
contact_info_version: me.contact_info.version,
|
||||
@@ -993,11 +984,35 @@ impl Crdt {
|
||||
blob: &Blob,
|
||||
) -> Option<SharedBlob> {
|
||||
match deserialize(&blob.data[..blob.meta.size]) {
|
||||
Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler),
|
||||
Err(_) => {
|
||||
warn!("deserialize crdt packet failed");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_protocol(
|
||||
request: Protocol,
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
window: &Window,
|
||||
blob_recycler: &BlobRecycler,
|
||||
) -> Option<SharedBlob> {
|
||||
match request {
|
||||
// TODO sigverify these
|
||||
Ok(Protocol::RequestUpdates(v, from_rd)) => {
|
||||
trace!("RequestUpdates {}", v);
|
||||
Protocol::RequestUpdates(v, from_rd) => {
|
||||
let addr = from_rd.contact_info.ncp;
|
||||
trace!("RequestUpdates {} from {}", v, addr);
|
||||
let me = obj.read().unwrap();
|
||||
if addr == me.table[&me.me].contact_info.ncp {
|
||||
warn!(
|
||||
"RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}",
|
||||
me.debug_id(),
|
||||
make_debug_id(&from_rd.id)
|
||||
);
|
||||
inc_new_counter!("crdt-window-request-loopback", 1);
|
||||
return None;
|
||||
}
|
||||
// only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
|
||||
let (from, ups, data) = me.get_updates_since(v);
|
||||
let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect();
|
||||
@@ -1005,7 +1020,11 @@ impl Crdt {
|
||||
trace!("get updates since response {} {}", v, data.len());
|
||||
let len = data.len();
|
||||
let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
|
||||
obj.write().unwrap().insert(&from_rd);
|
||||
{
|
||||
let mut me = obj.write().unwrap();
|
||||
me.insert(&from_rd);
|
||||
me.update_liveness(from_rd.id);
|
||||
}
|
||||
if len < 1 {
|
||||
let me = obj.read().unwrap();
|
||||
trace!(
|
||||
@@ -1029,19 +1048,19 @@ impl Crdt {
|
||||
None
|
||||
}
|
||||
}
|
||||
Ok(Protocol::ReceiveUpdates(from, ups, data, external_liveness)) => {
|
||||
Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
|
||||
trace!(
|
||||
"ReceivedUpdates {:x} {} {}",
|
||||
"ReceivedUpdates from={:x} update_index={} len={}",
|
||||
make_debug_id(&from),
|
||||
ups,
|
||||
update_index,
|
||||
data.len()
|
||||
);
|
||||
obj.write()
|
||||
.expect("'obj' write lock in ReceiveUpdates")
|
||||
.apply_updates(from, ups, &data, &external_liveness);
|
||||
.apply_updates(from, update_index, &data, &external_liveness);
|
||||
None
|
||||
}
|
||||
Ok(Protocol::RequestWindowIndex(from, ix)) => {
|
||||
Protocol::RequestWindowIndex(from, ix) => {
|
||||
//TODO this doesn't depend on CRDT module, can be moved
|
||||
//but we are using the listen thread to service these request
|
||||
//TODO verify from is signed
|
||||
@@ -1066,10 +1085,6 @@ impl Crdt {
|
||||
}
|
||||
Self::run_window_request(&window, &me, &from, ix, blob_recycler)
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("deserialize crdt packet failed");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1236,14 +1251,14 @@ impl TestNode {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crdt::{
|
||||
parse_port_or_addr, Crdt, CrdtError, NodeInfo, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS,
|
||||
MIN_TABLE_SIZE,
|
||||
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS,
|
||||
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
|
||||
};
|
||||
use hash::Hash;
|
||||
use logger;
|
||||
use packet::BlobRecycler;
|
||||
use result::Error;
|
||||
use signature::{KeyPair, KeyPairUtil};
|
||||
use signature::{KeyPair, KeyPairUtil, PublicKey};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
@@ -1375,12 +1390,25 @@ mod tests {
|
||||
assert_eq!(d.version, 0);
|
||||
let mut crdt = Crdt::new(d.clone()).unwrap();
|
||||
assert_eq!(crdt.table[&d.id].version, 0);
|
||||
assert!(!crdt.alive.contains_key(&d.id));
|
||||
|
||||
d.version = 2;
|
||||
crdt.insert(&d);
|
||||
let liveness = crdt.alive[&d.id];
|
||||
assert_eq!(crdt.table[&d.id].version, 2);
|
||||
|
||||
d.version = 1;
|
||||
crdt.insert(&d);
|
||||
assert_eq!(crdt.table[&d.id].version, 2);
|
||||
assert_eq!(liveness, crdt.alive[&d.id]);
|
||||
|
||||
// Ensure liveness will be updated for version 3
|
||||
sleep(Duration::from_millis(1));
|
||||
|
||||
d.version = 3;
|
||||
crdt.insert(&d);
|
||||
assert_eq!(crdt.table[&d.id].version, 3);
|
||||
assert!(liveness < crdt.alive[&d.id]);
|
||||
}
|
||||
#[test]
|
||||
fn test_new_vote() {
|
||||
@@ -1391,12 +1419,12 @@ mod tests {
|
||||
let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
|
||||
assert_ne!(d.id, leader.id);
|
||||
assert_matches!(
|
||||
crdt.new_vote(0, Hash::default()).err(),
|
||||
crdt.new_vote(Hash::default()).err(),
|
||||
Some(Error::CrdtError(CrdtError::NoLeader))
|
||||
);
|
||||
crdt.insert(&leader);
|
||||
assert_matches!(
|
||||
crdt.new_vote(0, Hash::default()).err(),
|
||||
crdt.new_vote(Hash::default()).err(),
|
||||
Some(Error::CrdtError(CrdtError::NoLeader))
|
||||
);
|
||||
crdt.set_leader(leader.id);
|
||||
@@ -1406,7 +1434,7 @@ mod tests {
|
||||
contact_info_version: 0,
|
||||
};
|
||||
let expected = (v, crdt.table[&leader.id].contact_info.tpu);
|
||||
assert_eq!(crdt.new_vote(0, Hash::default()).unwrap(), expected);
|
||||
assert_eq!(crdt.new_vote(Hash::default()).unwrap(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1740,6 +1768,28 @@ mod tests {
|
||||
let rv = crdt.gossip_request().unwrap();
|
||||
assert_eq!(rv.0, nxt.contact_info.ncp);
|
||||
}
|
||||
#[test]
|
||||
fn purge_leader_test() {
|
||||
logger::setup();
|
||||
let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
|
||||
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
|
||||
let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
|
||||
assert_ne!(me.id, nxt.id);
|
||||
crdt.insert(&nxt);
|
||||
crdt.set_leader(nxt.id);
|
||||
let now = crdt.alive[&nxt.id];
|
||||
let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
|
||||
crdt.insert(&nxt2);
|
||||
while now == crdt.alive[&nxt2.id] {
|
||||
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
|
||||
nxt2.version = nxt2.version + 1;
|
||||
crdt.insert(&nxt2);
|
||||
}
|
||||
let len = crdt.table.len() as u64;
|
||||
crdt.purge(now + GOSSIP_PURGE_MILLIS + 1);
|
||||
assert_eq!(len as usize - 1, crdt.table.len());
|
||||
assert_eq!(crdt.my_data().leader_id, PublicKey::default());
|
||||
}
|
||||
|
||||
/// test window requests respond with the right blob, and do not overrun
|
||||
#[test]
|
||||
@@ -1831,4 +1881,41 @@ mod tests {
|
||||
crdt.update_leader();
|
||||
assert_eq!(crdt.my_data().leader_id, leader1.id);
|
||||
}
|
||||
|
||||
/// Validates the node that sent Protocol::ReceiveUpdates gets its
|
||||
/// liveness updated, but not if the node sends Protocol::ReceiveUpdates
|
||||
/// to itself.
|
||||
#[test]
|
||||
fn protocol_requestupdate_alive() {
|
||||
logger::setup();
|
||||
let window = default_window();
|
||||
let recycler = BlobRecycler::default();
|
||||
|
||||
let node = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
|
||||
let node_with_same_addr = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
|
||||
assert_ne!(node.id, node_with_same_addr.id);
|
||||
let node_with_diff_addr = NodeInfo::new_leader(&"127.0.0.1:4321".parse().unwrap());
|
||||
|
||||
let crdt = Crdt::new(node.clone()).expect("Crdt::new");
|
||||
assert_eq!(crdt.alive.len(), 0);
|
||||
|
||||
let obj = Arc::new(RwLock::new(crdt));
|
||||
|
||||
let request = Protocol::RequestUpdates(1, node.clone());
|
||||
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
|
||||
|
||||
let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
|
||||
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
|
||||
|
||||
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
|
||||
Crdt::handle_protocol(request, &obj, &window, &recycler);
|
||||
|
||||
let me = obj.write().unwrap();
|
||||
|
||||
// |node| and |node_with_same_addr| should not be in me.alive, but
|
||||
// |node_with_diff_addr| should now be.
|
||||
assert!(!me.alive.contains_key(&node.id));
|
||||
assert!(!me.alive.contains_key(&node_with_same_addr.id));
|
||||
assert!(me.alive[&node_with_diff_addr.id] > 0);
|
||||
}
|
||||
}
|
||||
|
@@ -16,6 +16,7 @@ use transaction::Transaction;
|
||||
|
||||
pub const TIME_SLICE: u64 = 60;
|
||||
pub const REQUEST_CAP: u64 = 1_000_000;
|
||||
pub const DRONE_PORT: u16 = 9900;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||
pub enum DroneRequest {
|
||||
|
@@ -20,7 +20,8 @@ impl<'a, W: Write> EntryWriter<'a, W> {
|
||||
|
||||
fn write_entry(writer: &mut W, entry: &Entry) -> io::Result<()> {
|
||||
let serialized = serde_json::to_string(entry).unwrap();
|
||||
writeln!(writer, "{}", serialized)
|
||||
writeln!(writer, "{}", serialized)?;
|
||||
writer.flush()
|
||||
}
|
||||
|
||||
pub fn write_entries<I>(writer: &mut W, entries: I) -> io::Result<()>
|
||||
|
@@ -212,6 +212,7 @@ impl Packets {
|
||||
Ok((nrecv, from)) => {
|
||||
p.meta.size = nrecv;
|
||||
p.meta.set_addr(&from);
|
||||
trace!("got {} bytes from {}", nrecv, from);
|
||||
if i == 0 {
|
||||
socket.set_nonblocking(true)?;
|
||||
}
|
||||
@@ -405,6 +406,7 @@ impl Blob {
|
||||
Ok((nrecv, from)) => {
|
||||
p.meta.size = nrecv;
|
||||
p.meta.set_addr(&from);
|
||||
trace!("got {} bytes from {}", nrecv, from);
|
||||
if i == 0 {
|
||||
socket.set_nonblocking(true)?;
|
||||
}
|
||||
|
@@ -60,16 +60,16 @@ impl ReplicateStage {
|
||||
if res.is_err() {
|
||||
error!("process_entries {} {:?}", blobs_len, res);
|
||||
}
|
||||
let _ = res?;
|
||||
let now = timing::timestamp();
|
||||
if now - *last_vote > VOTE_TIMEOUT_MS {
|
||||
let height = res?;
|
||||
let last_id = bank.last_id();
|
||||
let shared_blob = blob_recycler.allocate();
|
||||
let (vote, addr) = {
|
||||
let mut wcrdt = crdt.write().unwrap();
|
||||
//TODO: doesn't seem like there is a synchronous call to get height and id
|
||||
info!("replicate_stage {} {:?}", height, &last_id[..8]);
|
||||
wcrdt.new_vote(height, last_id)
|
||||
info!("replicate_stage {:?}", &last_id[..8]);
|
||||
wcrdt.new_vote(last_id)
|
||||
}?;
|
||||
{
|
||||
let mut blob = shared_blob.write().unwrap();
|
||||
|
@@ -126,7 +126,7 @@ pub fn responder(
|
||||
//TODO, we would need to stick block authentication before we create the
|
||||
//window.
|
||||
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
|
||||
trace!("receiving on {}", sock.local_addr().unwrap());
|
||||
trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap());
|
||||
let dq = Blob::recv_from(recycler, sock)?;
|
||||
if !dq.is_empty() {
|
||||
s.send(dq)?;
|
||||
|
Reference in New Issue
Block a user