Compare commits

v0.7.0-rc. ... v0.7.0-rc.

13 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 0696f9f497 | |
| | b2ea2455e2 | |
| | 3f659a69fd | |
| | 2c62be951f | |
| | 2348733d6c | |
| | cc229b535d | |
| | 7f810a29ff | |
| | fc1dfd86d2 | |
| | 5deb34e5bd | |
| | 39df087902 | |
| | 6ff46540b6 | |
| | dbab8792e4 | |
| | 4eb676afaa | |
@@ -1,3 +1,6 @@
 FROM rustlang/rust:nightly

-RUN cargo install --force clippy cargo-cov
+RUN cargo install --force clippy cargo-cov && \
+    echo deb http://ftp.debian.org/debian stretch-backports main >> /etc/apt/sources.list && \
+    apt update && \
+    apt install -y llvm-6.0
@@ -27,6 +27,6 @@ ls -l target/cov/report/index.html
 if [[ -z "$CODECOV_TOKEN" ]]; then
   echo CODECOV_TOKEN undefined
 else
-  bash <(curl -s https://codecov.io/bash) -x 'llvm-cov gcov'
+  bash <(curl -s https://codecov.io/bash) -x 'llvm-cov-6.0 gcov'
 fi

@@ -5,9 +5,12 @@
 # This script must be run by a user/machine that has successfully authenticated
 # with GCP and has sufficient permission.
 #

 cd "$(dirname "$0")/.."

+# TODO: Switch over to rolling updates
+ROLLING_UPDATE=false
+#ROLLING_UPDATE=true
+
 if [[ -z $SOLANA_METRICS_CONFIG ]]; then
   echo Error: SOLANA_METRICS_CONFIG environment variable is unset
   exit 1
@@ -26,7 +29,7 @@ edge)
   ;;
 beta)
   publicUrl=testnet.solana.com
-  publicIp=# # Use default value
+  publicIp="" # Use default value
   ;;
 *)
   echo Error: Unknown SOLANA_SNAP_CHANNEL=$SOLANA_SNAP_CHANNEL
@@ -61,6 +64,28 @@ wait_for_node() {
   fi
 }

+if ! $ROLLING_UPDATE; then
+  count=1
+  for info in "${vmlist[@]}"; do
+    nodePosition="($count/${#vmlist[*]})"
+    vmName=${info%:*}
+    vmZone=${info#*:}
+    echo "--- Shutting down $vmName in zone $vmZone $nodePosition"
+    gcloud compute ssh "$vmName" --zone "$vmZone" \
+      --ssh-flag="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" \
+      --command="echo sudo snap remove solana" &
+
+    if [[ $((count % 10)) = 0 ]]; then
+      # Slow down deployment to avoid triggering GCP login
+      # quota limits (each |ssh| counts as a login)
+      sleep 3
+    fi
+
+    count=$((count + 1))
+  done
+
+  wait
+fi
+
 echo "--- Refreshing leader for $publicUrl"
 leader=true
@@ -76,13 +101,18 @@ for info in "${vmlist[@]}"; do
   (
     SECONDS=0
     echo "--- $vmName in zone $vmZone $nodePosition"
+    commonNodeConfig="\
+      rust-log=$RUST_LOG \
+      default-metrics-rate=$SOLANA_DEFAULT_METRICS_RATE \
+      metrics-config=$SOLANA_METRICS_CONFIG \
+    "
     if $leader; then
-      nodeConfig="mode=leader+drone metrics-config=$SOLANA_METRICS_CONFIG"
+      nodeConfig="mode=leader+drone $commonNodeConfig"
       if [[ -n $SOLANA_CUDA ]]; then
         nodeConfig="$nodeConfig enable-cuda=1"
       fi
     else
-      nodeConfig="mode=validator metrics-config=$SOLANA_METRICS_CONFIG leader-address=$publicIp"
+      nodeConfig="mode=validator leader-address=$publicIp $commonNodeConfig"
     fi

     set -x
@@ -35,6 +35,7 @@ if [[ -d "$SNAP" ]]; then # Running inside a Linux Snap?
   mkdir -p "$SNAP_DATA"/{drone,leader,validator}

   SOLANA_METRICS_CONFIG="$(snapctl get metrics-config)"
+  SOLANA_DEFAULT_METRICS_RATE="$(snapctl get default-metrics-rate)"
   SOLANA_CUDA="$(snapctl get enable-cuda)"
   RUST_LOG="$(snapctl get rust-log)"

@@ -79,7 +79,7 @@ common_start_setup() {

       # Killing sshguard for now. TODO: Find a better solution
       # sshguard is blacklisting IP address after ssh-keyscan and ssh login attempts
-      ssh -n -f "$remote_user@$ip_addr" " \
+      ssh "$remote_user@$ip_addr" " \
        set -ex; \
        sudo service sshguard stop; \
        sudo apt-get --assume-yes install rsync libssl-dev; \
@@ -92,6 +92,7 @@ common_start_setup() {
      rsync -vPrz "$ssh_keys"/id_rsa "$remote_user@$ip_addr":~/.ssh/
      rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/
      rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/authorized_keys
+      rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
     } >>log/"$ip_addr".log
   fi
 }
@@ -101,7 +102,6 @@ start_leader() {

   {
     rsync -vPrz ~/.cargo/bin/solana* "$remote_user@$ip_addr":~/.cargo/bin/
-    rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
     rsync -vPrz ./fetch-perf-libs.sh "$remote_user@$ip_addr":~/solana/
     ssh -n -f "$remote_user@$ip_addr" 'cd solana; FORCE=1 ./multinode-demo/remote_leader.sh'
   } >>log/"$1".log
@@ -114,7 +114,6 @@ start_leader() {
 start_validator() {
   common_start_setup "$1"

-  ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/solana/multinode-demo ~/solana/" >>log/"$1".log
   ssh -n -f "$remote_user@$ip_addr" "cd solana; FORCE=1 ./multinode-demo/remote_validator.sh $leader_ip" >>log/"$1".log
 }

@@ -128,9 +127,6 @@ start_all_nodes() {
   mkdir -p log

   for ip_addr in "${ip_addr_array[@]}"; do
-    ssh-keygen -R "$ip_addr" >log/local.log
-    ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
-
     if ((!count)); then
       # Start the leader on the first node
       echo "Leader node $ip_addr, killing previous instance and restarting"
@@ -159,6 +155,9 @@ stop_all_nodes() {
   SECONDS=0
   local count=0
   for ip_addr in "${ip_addr_array[@]}"; do
+    ssh-keygen -R "$ip_addr" >log/local.log
+    ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
+
     echo "Stopping node[$count] $ip_addr. Remote user $remote_user"

     ssh -n -f "$remote_user@$ip_addr" " \
@@ -176,7 +175,7 @@ stop_all_nodes() {
 }

 if [[ $command == "start" ]]; then
-  #build_project
+  build_project
   stop_all_nodes
   start_all_nodes
 elif [[ $command == "stop" ]]; then
@@ -6,7 +6,8 @@ chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa

 PATH="$HOME"/.cargo/bin:"$PATH"

-ssh-keygen -R "$1"
+touch ~/.ssh/known_hosts
+ssh-keygen -R "$1" 2>/dev/null
 ssh-keyscan "$1" >>~/.ssh/known_hosts 2>/dev/null

 rsync -vPrz "$1":~/.cargo/bin/solana* ~/.cargo/bin/
@@ -9,7 +9,7 @@ use bincode::serialize;
 use clap::{App, Arg};
 use rayon::prelude::*;
 use solana::crdt::{Crdt, NodeInfo};
-use solana::drone::DroneRequest;
+use solana::drone::{DroneRequest, DRONE_PORT};
 use solana::fullnode::Config;
 use solana::hash::Hash;
 use solana::nat::{udp_public_bind, udp_random_bind};
@@ -220,12 +220,13 @@ fn main() {
     }

     let mut drone_addr = leader.contact_info.tpu;
-    drone_addr.set_port(9900);
+    drone_addr.set_port(DRONE_PORT);

     let signal = Arc::new(AtomicBool::new(false));
     let mut c_threads = vec![];
     let validators = converge(&leader, &signal, num_nodes, &mut c_threads);
-    assert_eq!(validators.len(), num_nodes);
+    println!("Network has {} node(s)", validators.len());
+    assert!(validators.len() >= num_nodes);

     let mut client = mk_client(&leader);

@@ -417,6 +418,12 @@ fn converge(
             println!("CONVERGED!");
             rv.extend(v.into_iter());
             break;
+        } else {
+            println!(
+                "{} node(s) discovered (looking for {} or more)",
+                v.len(),
+                num_nodes
+            );
         }
         sleep(Duration::new(1, 0));
     }
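The hunk above, together with the earlier change from `assert_eq!(validators.len(), num_nodes)` to `assert!(validators.len() >= num_nodes)`, turns convergence into a lower-bound check with a progress log while waiting. A rough, self-contained sketch of that polling shape; `discover` below is a stub standing in for the real gossip query, not an actual API:

```rust
use std::thread::sleep;
use std::time::Duration;

// Stub for the gossip discovery step; the real client walks the Crdt table.
fn discover(round: u64) -> Vec<u64> {
    (0..round.min(3)).collect()
}

// Keep polling until at least `num_nodes` validators are visible, logging
// progress on each miss, as in the diff above.
fn converge(num_nodes: usize) -> Vec<u64> {
    let mut round = 0;
    loop {
        round += 1;
        let v = discover(round);
        if v.len() >= num_nodes {
            println!("CONVERGED!");
            return v;
        } else {
            println!(
                "{} node(s) discovered (looking for {} or more)",
                v.len(),
                num_nodes
            );
        }
        sleep(Duration::new(1, 0));
    }
}

fn main() {
    let validators = converge(2);
    println!("Network has {} node(s)", validators.len());
    assert!(validators.len() >= 2);
}
```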
@@ -10,7 +10,7 @@ extern crate tokio_io;
 use bincode::deserialize;
 use clap::{App, Arg};
 use solana::crdt::NodeInfo;
-use solana::drone::{Drone, DroneRequest};
+use solana::drone::{Drone, DroneRequest, DRONE_PORT};
 use solana::fullnode::Config;
 use solana::metrics::set_panic_hook;
 use solana::signature::read_keypair;
@@ -85,7 +85,7 @@ fn main() {
         request_cap = None;
     }

-    let drone_addr: SocketAddr = "0.0.0.0:9900".parse().unwrap();
+    let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();

     let drone = Arc::new(Mutex::new(Drone::new(
         mint_keypair,
@@ -10,7 +10,7 @@ extern crate solana;
 use bincode::serialize;
 use clap::{App, Arg, SubCommand};
 use solana::crdt::NodeInfo;
-use solana::drone::DroneRequest;
+use solana::drone::{DroneRequest, DRONE_PORT};
 use solana::fullnode::Config;
 use solana::signature::{read_keypair, KeyPair, KeyPairUtil, PublicKey, Signature};
 use solana::thin_client::ThinClient;
@@ -164,7 +164,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
     })?;

     let mut drone_addr = leader.contact_info.tpu;
-    drone_addr.set_port(9900);
+    drone_addr.set_port(DRONE_PORT);

     let command = match matches.subcommand() {
         ("airdrop", Some(airdrop_matches)) => {
src/crdt.rs (169 changed lines)
@@ -120,8 +120,6 @@ pub struct ContactInfo {
 pub struct LedgerState {
     /// last verified hash that was submitted to the leader
     pub last_id: Hash,
-    /// last verified entry count, always increasing
-    pub entry_height: u64,
 }

 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@@ -166,7 +164,6 @@ impl NodeInfo {
             leader_id: PublicKey::default(),
             ledger_state: LedgerState {
                 last_id: Hash::default(),
-                entry_height: 0,
             },
         }
     }
@@ -429,6 +426,7 @@ impl Crdt {
             let _ = self.table.insert(v.id, v.clone());
             let _ = self.local.insert(v.id, self.update_index);
             inc_new_counter!("crdt-update-count", 1);
+            self.update_liveness(v.id);
         } else {
             trace!(
                 "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
@@ -438,7 +436,6 @@ impl Crdt {
                 self.table[&v.id].version
             );
         }
-        self.update_liveness(v.id);
     }

     fn update_liveness(&mut self, id: PublicKey) {
@@ -458,35 +455,20 @@ impl Crdt {
     /// challenging part is that we are on a permissionless network
     pub fn purge(&mut self, now: u64) {
         if self.table.len() <= MIN_TABLE_SIZE {
+            trace!("purge: skipped: table too small: {}", self.table.len());
             return;
         }
         if self.leader_data().is_none() {
+            trace!("purge: skipped: no leader_data");
             return;
         }
         let leader_id = self.leader_data().unwrap().id;

         let limit = GOSSIP_PURGE_MILLIS;
         let dead_ids: Vec<PublicKey> = self.alive
             .iter()
             .filter_map(|(&k, v)| {
                 if k != self.me && (now - v) > limit {
-                    if leader_id == k {
-                        info!(
-                            "{:x}: PURGE LEADER {:x} {}",
-                            self.debug_id(),
-                            make_debug_id(&k),
-                            now - v
-                        );
                     Some(k)
-                    } else {
-                        info!(
-                            "{:x}: PURGE {:x} {}",
-                            self.debug_id(),
-                            make_debug_id(&k),
-                            now - v
-                        );
-                        Some(k)
-                    }
                 } else {
                     trace!(
                         "{:x} purge skipped {:x} {} {}",
@@ -508,9 +490,19 @@ impl Crdt {
             self.remote.remove(id);
             self.local.remove(id);
             self.external_liveness.remove(id);
+            info!("{:x}: PURGE {:x}", self.debug_id(), make_debug_id(id));
             for map in self.external_liveness.values_mut() {
                 map.remove(id);
             }
+            if *id == leader_id {
+                info!(
+                    "{:x}: PURGE LEADER {:x}",
+                    self.debug_id(),
+                    make_debug_id(id),
+                );
+                inc_new_counter!("crdt-purge-purged_leader", 1, 1);
+                self.set_leader(PublicKey::default());
+            }
         }
     }

@@ -777,12 +769,11 @@ impl Crdt {
         Ok((v.contact_info.ncp, req))
     }

-    pub fn new_vote(&mut self, height: u64, last_id: Hash) -> Result<(Vote, SocketAddr)> {
+    pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> {
         let mut me = self.my_data().clone();
         let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone();
         me.version += 1;
         me.ledger_state.last_id = last_id;
-        me.ledger_state.entry_height = height;
         let vote = Vote {
             version: me.version,
             contact_info_version: me.contact_info.version,
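Together with the `LedgerState` hunk earlier in this file, this drops `entry_height` from gossip: a vote now only bumps the node's version and records the latest `last_id`. A minimal sketch of the narrowed call shape using placeholder types; the real `Crdt::new_vote` also looks up the leader's address and can fail with `CrdtError::NoLeader`:

```rust
// Placeholder types; the real NodeInfo/Vote/Hash carry more fields.
type Hash = [u8; 32];

struct Vote {
    version: u64,
    contact_info_version: u64,
}

struct Node {
    version: u64,
    last_id: Hash,
    contact_info_version: u64,
}

impl Node {
    // After the change, callers such as replicate_stage pass only last_id;
    // no entry height is tracked or propagated.
    fn new_vote(&mut self, last_id: Hash) -> Vote {
        self.version += 1;
        self.last_id = last_id;
        Vote {
            version: self.version,
            contact_info_version: self.contact_info_version,
        }
    }
}

fn main() {
    let mut node = Node { version: 0, last_id: [0; 32], contact_info_version: 0 };
    let vote = node.new_vote([1; 32]);
    assert_eq!(vote.version, 1);
    assert_eq!(vote.contact_info_version, 0);
    println!("voted for last_id prefix {:?}", &node.last_id[..4]);
}
```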
@@ -993,11 +984,35 @@ impl Crdt {
         blob: &Blob,
     ) -> Option<SharedBlob> {
         match deserialize(&blob.data[..blob.meta.size]) {
+            Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler),
+            Err(_) => {
+                warn!("deserialize crdt packet failed");
+                None
+            }
+        }
+    }
+
+    fn handle_protocol(
+        request: Protocol,
+        obj: &Arc<RwLock<Self>>,
+        window: &Window,
+        blob_recycler: &BlobRecycler,
+    ) -> Option<SharedBlob> {
+        match request {
             // TODO sigverify these
-            Ok(Protocol::RequestUpdates(v, from_rd)) => {
-                trace!("RequestUpdates {}", v);
+            Protocol::RequestUpdates(v, from_rd) => {
                 let addr = from_rd.contact_info.ncp;
+                trace!("RequestUpdates {} from {}", v, addr);
                 let me = obj.read().unwrap();
+                if addr == me.table[&me.me].contact_info.ncp {
+                    warn!(
+                        "RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}",
+                        me.debug_id(),
+                        make_debug_id(&from_rd.id)
+                    );
+                    inc_new_counter!("crdt-window-request-loopback", 1);
+                    return None;
+                }
                 // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
                 let (from, ups, data) = me.get_updates_since(v);
                 let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect();
@@ -1005,7 +1020,11 @@ impl Crdt {
                 trace!("get updates since response {} {}", v, data.len());
                 let len = data.len();
                 let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
-                obj.write().unwrap().insert(&from_rd);
+                {
+                    let mut me = obj.write().unwrap();
+                    me.insert(&from_rd);
+                    me.update_liveness(from_rd.id);
+                }
                 if len < 1 {
                     let me = obj.read().unwrap();
                     trace!(
@@ -1029,19 +1048,19 @@ impl Crdt {
                     None
                 }
             }
-            Ok(Protocol::ReceiveUpdates(from, ups, data, external_liveness)) => {
+            Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
                 trace!(
-                    "ReceivedUpdates {:x} {} {}",
+                    "ReceivedUpdates from={:x} update_index={} len={}",
                     make_debug_id(&from),
-                    ups,
+                    update_index,
                     data.len()
                 );
                 obj.write()
                     .expect("'obj' write lock in ReceiveUpdates")
-                    .apply_updates(from, ups, &data, &external_liveness);
+                    .apply_updates(from, update_index, &data, &external_liveness);
                 None
             }
-            Ok(Protocol::RequestWindowIndex(from, ix)) => {
+            Protocol::RequestWindowIndex(from, ix) => {
                 //TODO this doesn't depend on CRDT module, can be moved
                 //but we are using the listen thread to service these request
                 //TODO verify from is signed
@@ -1066,10 +1085,6 @@ impl Crdt {
                 }
                 Self::run_window_request(&window, &me, &from, ix, blob_recycler)
             }
-            Err(_) => {
-                warn!("deserialize crdt packet failed");
-                None
-            }
         }
     }

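The last few hunks restructure packet handling: `reply_to` now only deserializes the blob and forwards the decoded `Protocol` value to a new `handle_protocol`, so the `Err(_)` arm moves up to the deserialization site and the match arms lose their `Ok(...)` wrappers. A rough, dependency-free sketch of that two-step shape with placeholder types (the byte format and `Reply` type below are invented for illustration; the real code uses `bincode` over blob payloads):

```rust
use std::convert::TryInto;

// Placeholder request type standing in for crdt::Protocol.
enum Protocol {
    RequestUpdates(u64),
    ReceiveUpdates(u64),
}

struct Reply(String);

// Step 1: decode raw bytes, then hand off. In the diff this is Crdt::reply_to,
// and decoding is bincode::deserialize over the blob payload.
fn reply_to(bytes: &[u8]) -> Option<Reply> {
    match decode(bytes) {
        Some(request) => handle_protocol(request),
        None => {
            eprintln!("deserialize packet failed");
            None
        }
    }
}

// Step 2: dispatch on the already-decoded request. Tests (like
// protocol_requestupdate_alive above) can call this directly without
// constructing network blobs.
fn handle_protocol(request: Protocol) -> Option<Reply> {
    match request {
        Protocol::RequestUpdates(v) => Some(Reply(format!("updates since {}", v))),
        Protocol::ReceiveUpdates(_) => None,
    }
}

// Toy wire format: a 1-byte tag followed by a little-endian u64.
fn decode(bytes: &[u8]) -> Option<Protocol> {
    let (&tag, rest) = bytes.split_first()?;
    let v = u64::from_le_bytes(rest.try_into().ok()?);
    match tag {
        0 => Some(Protocol::RequestUpdates(v)),
        1 => Some(Protocol::ReceiveUpdates(v)),
        _ => None,
    }
}

fn main() {
    let mut msg = vec![0u8];
    msg.extend_from_slice(&7u64.to_le_bytes());
    if let Some(Reply(text)) = reply_to(&msg) {
        println!("reply: {}", text);
    }
    assert!(reply_to(&[9]).is_none()); // malformed packet falls out as None
}
```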
@@ -1236,14 +1251,14 @@ impl TestNode {
 #[cfg(test)]
 mod tests {
     use crdt::{
-        parse_port_or_addr, Crdt, CrdtError, NodeInfo, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS,
-        MIN_TABLE_SIZE,
+        parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS,
+        GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
     };
     use hash::Hash;
     use logger;
     use packet::BlobRecycler;
     use result::Error;
-    use signature::{KeyPair, KeyPairUtil};
+    use signature::{KeyPair, KeyPairUtil, PublicKey};
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::mpsc::channel;
     use std::sync::{Arc, RwLock};
@@ -1375,12 +1390,25 @@ mod tests {
         assert_eq!(d.version, 0);
         let mut crdt = Crdt::new(d.clone()).unwrap();
         assert_eq!(crdt.table[&d.id].version, 0);
+        assert!(!crdt.alive.contains_key(&d.id));
+
         d.version = 2;
         crdt.insert(&d);
+        let liveness = crdt.alive[&d.id];
         assert_eq!(crdt.table[&d.id].version, 2);

         d.version = 1;
         crdt.insert(&d);
         assert_eq!(crdt.table[&d.id].version, 2);
+        assert_eq!(liveness, crdt.alive[&d.id]);
+
+        // Ensure liveness will be updated for version 3
+        sleep(Duration::from_millis(1));
+
+        d.version = 3;
+        crdt.insert(&d);
+        assert_eq!(crdt.table[&d.id].version, 3);
+        assert!(liveness < crdt.alive[&d.id]);
     }
     #[test]
     fn test_new_vote() {
@@ -1391,12 +1419,12 @@ mod tests {
         let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
         assert_ne!(d.id, leader.id);
         assert_matches!(
-            crdt.new_vote(0, Hash::default()).err(),
+            crdt.new_vote(Hash::default()).err(),
             Some(Error::CrdtError(CrdtError::NoLeader))
         );
         crdt.insert(&leader);
         assert_matches!(
-            crdt.new_vote(0, Hash::default()).err(),
+            crdt.new_vote(Hash::default()).err(),
             Some(Error::CrdtError(CrdtError::NoLeader))
         );
         crdt.set_leader(leader.id);
@@ -1406,7 +1434,7 @@ mod tests {
             contact_info_version: 0,
         };
         let expected = (v, crdt.table[&leader.id].contact_info.tpu);
-        assert_eq!(crdt.new_vote(0, Hash::default()).unwrap(), expected);
+        assert_eq!(crdt.new_vote(Hash::default()).unwrap(), expected);
     }

     #[test]
@@ -1740,6 +1768,28 @@ mod tests {
         let rv = crdt.gossip_request().unwrap();
         assert_eq!(rv.0, nxt.contact_info.ncp);
     }
+    #[test]
+    fn purge_leader_test() {
+        logger::setup();
+        let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
+        let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
+        let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
+        assert_ne!(me.id, nxt.id);
+        crdt.insert(&nxt);
+        crdt.set_leader(nxt.id);
+        let now = crdt.alive[&nxt.id];
+        let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
+        crdt.insert(&nxt2);
+        while now == crdt.alive[&nxt2.id] {
+            sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
+            nxt2.version = nxt2.version + 1;
+            crdt.insert(&nxt2);
+        }
+        let len = crdt.table.len() as u64;
+        crdt.purge(now + GOSSIP_PURGE_MILLIS + 1);
+        assert_eq!(len as usize - 1, crdt.table.len());
+        assert_eq!(crdt.my_data().leader_id, PublicKey::default());
+    }
+
     /// test window requests respond with the right blob, and do not overrun
     #[test]
@@ -1831,4 +1881,41 @@ mod tests {
         crdt.update_leader();
         assert_eq!(crdt.my_data().leader_id, leader1.id);
     }
+
+    /// Validates the node that sent Protocol::ReceiveUpdates gets its
+    /// liveness updated, but not if the node sends Protocol::ReceiveUpdates
+    /// to itself.
+    #[test]
+    fn protocol_requestupdate_alive() {
+        logger::setup();
+        let window = default_window();
+        let recycler = BlobRecycler::default();
+
+        let node = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
+        let node_with_same_addr = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
+        assert_ne!(node.id, node_with_same_addr.id);
+        let node_with_diff_addr = NodeInfo::new_leader(&"127.0.0.1:4321".parse().unwrap());
+
+        let crdt = Crdt::new(node.clone()).expect("Crdt::new");
+        assert_eq!(crdt.alive.len(), 0);
+
+        let obj = Arc::new(RwLock::new(crdt));
+
+        let request = Protocol::RequestUpdates(1, node.clone());
+        assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
+
+        let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
+        assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
+
+        let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
+        Crdt::handle_protocol(request, &obj, &window, &recycler);
+
+        let me = obj.write().unwrap();
+
+        // |node| and |node_with_same_addr| should not be in me.alive, but
+        // |node_with_diff_addr| should now be.
+        assert!(!me.alive.contains_key(&node.id));
+        assert!(!me.alive.contains_key(&node_with_same_addr.id));
+        assert!(me.alive[&node_with_diff_addr.id] > 0);
+    }
 }
@@ -16,6 +16,7 @@ use transaction::Transaction;

 pub const TIME_SLICE: u64 = 60;
 pub const REQUEST_CAP: u64 = 1_000_000;
+pub const DRONE_PORT: u16 = 9900;

 #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
 pub enum DroneRequest {
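This constant replaces the hard-coded `9900` that the client, drone, and wallet binaries used earlier in the diff. A small self-contained sketch of the two call patterns involved; the `10.0.0.1:8000` leader address is a placeholder, not a real deployment value:

```rust
use std::net::SocketAddr;

// Mirrors the new constant in src/drone.rs: one shared definition of the port.
pub const DRONE_PORT: u16 = 9900;

fn main() {
    // Wallet/client pattern: reuse the leader's TPU address but point at the
    // drone port instead of a magic number.
    let mut drone_addr: SocketAddr = "10.0.0.1:8000".parse().unwrap();
    drone_addr.set_port(DRONE_PORT);
    assert_eq!(drone_addr.port(), DRONE_PORT);

    // Drone binary pattern: bind on all interfaces at the same port.
    let bind_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();
    println!("clients target {}, drone binds {}", drone_addr, bind_addr);
}
```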
@@ -20,7 +20,8 @@ impl<'a, W: Write> EntryWriter<'a, W> {

     fn write_entry(writer: &mut W, entry: &Entry) -> io::Result<()> {
         let serialized = serde_json::to_string(entry).unwrap();
-        writeln!(writer, "{}", serialized)
+        writeln!(writer, "{}", serialized)?;
+        writer.flush()
     }

     pub fn write_entries<I>(writer: &mut W, entries: I) -> io::Result<()>
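The `write_entry` change above flushes after every entry, so a reader tailing the ledger stream sees each line as soon as it is written rather than whenever the buffer happens to fill. A standalone illustration of the same write-then-flush pattern over any `io::Write`; plain strings stand in for the JSON-serialized `Entry` values:

```rust
use std::io::{self, Write};

// Mirrors the shape of EntryWriter::write_entry after the diff: write one
// line, then flush so downstream readers are not stuck behind buffering.
fn write_record<W: Write>(writer: &mut W, record: &str) -> io::Result<()> {
    writeln!(writer, "{}", record)?;
    writer.flush()
}

fn main() -> io::Result<()> {
    let stdout = io::stdout();
    let mut out = io::BufWriter::new(stdout.lock());
    for record in ["entry-1", "entry-2"] {
        write_record(&mut out, record)?;
    }
    Ok(())
}
```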
@@ -212,6 +212,7 @@ impl Packets {
                 Ok((nrecv, from)) => {
                     p.meta.size = nrecv;
                     p.meta.set_addr(&from);
+                    trace!("got {} bytes from {}", nrecv, from);
                     if i == 0 {
                         socket.set_nonblocking(true)?;
                     }
@@ -405,6 +406,7 @@ impl Blob {
                 Ok((nrecv, from)) => {
                     p.meta.size = nrecv;
                     p.meta.set_addr(&from);
+                    trace!("got {} bytes from {}", nrecv, from);
                     if i == 0 {
                         socket.set_nonblocking(true)?;
                     }
@@ -60,16 +60,16 @@ impl ReplicateStage {
         if res.is_err() {
             error!("process_entries {} {:?}", blobs_len, res);
         }
+        let _ = res?;
         let now = timing::timestamp();
         if now - *last_vote > VOTE_TIMEOUT_MS {
-            let height = res?;
             let last_id = bank.last_id();
             let shared_blob = blob_recycler.allocate();
             let (vote, addr) = {
                 let mut wcrdt = crdt.write().unwrap();
                 //TODO: doesn't seem like there is a synchronous call to get height and id
-                info!("replicate_stage {} {:?}", height, &last_id[..8]);
-                wcrdt.new_vote(height, last_id)
+                info!("replicate_stage {:?}", &last_id[..8]);
+                wcrdt.new_vote(last_id)
             }?;
             {
                 let mut blob = shared_blob.write().unwrap();
@@ -126,7 +126,7 @@ pub fn responder(
 //TODO, we would need to stick block authentication before we create the
 //window.
 fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
-    trace!("receiving on {}", sock.local_addr().unwrap());
+    trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap());
     let dq = Blob::recv_from(recycler, sock)?;
     if !dq.is_empty() {
         s.send(dq)?;