Compare commits

..

13 Commits

Author SHA1 Message Date
Rob Walker
0696f9f497 flush writer, makes partial deserialization a bit less likely 2018-07-18 22:53:33 -07:00
Michael Vines
b2ea2455e2 Disable rolling updates 2018-07-18 21:16:03 -07:00
Michael Vines
3f659a69fd Prevent nodes from gossiping with themselves with different ids 2018-07-18 19:38:38 -07:00
anatoly yakovenko
2c62be951f boot invalid height (#688) 2018-07-18 18:10:53 -07:00
Stephen Akridge
2348733d6c remove drone port magic number 2018-07-19 02:01:23 +02:00
pgarg66
cc229b535d Remote multinode script cleanup (#683)
- Create a known_hosts file if it doesn't exist
  Otherwise ssh-keygen exits
- Move some common rsync code to common_start_setup
- Build the project before deploying it
2018-07-18 16:02:05 -07:00
anatoly yakovenko
7f810a29ff Purge leader (#687)
* purge leader

* fixup!

* fixup!
2018-07-18 14:39:43 -07:00
Michael Vines
fc1dfd86d2 Disable coverage again :-/ 2018-07-18 12:54:50 -07:00
Michael Vines
5deb34e5bd Little more trace! logging 2018-07-18 12:54:50 -07:00
Michael Vines
39df087902 Permit more than the requested amount of nodes 2018-07-18 12:07:50 -07:00
Greg Fitzgerald
6ff46540b6 Install llvm-cov on nightly to revive coverage
Towards #433
2018-07-18 12:52:13 -04:00
Michael Vines
dbab8792e4 Use real default value 2018-07-18 08:23:59 -07:00
Michael Vines
4eb676afaa Tunnel SOLANA_DEFAULT_METRICS_RATE into Snap nodes 2018-07-18 08:23:59 -07:00
15 changed files with 200 additions and 68 deletions

View File

@@ -1,3 +1,6 @@
FROM rustlang/rust:nightly
RUN cargo install --force clippy cargo-cov
RUN cargo install --force clippy cargo-cov && \
echo deb http://ftp.debian.org/debian stretch-backports main >> /etc/apt/sources.list && \
apt update && \
apt install -y llvm-6.0

View File

@@ -27,6 +27,6 @@ ls -l target/cov/report/index.html
if [[ -z "$CODECOV_TOKEN" ]]; then
echo CODECOV_TOKEN undefined
else
bash <(curl -s https://codecov.io/bash) -x 'llvm-cov gcov'
bash <(curl -s https://codecov.io/bash) -x 'llvm-cov-6.0 gcov'
fi

View File

@@ -5,9 +5,12 @@
# This script must be run by a user/machine that has successfully authenticated
# with GCP and has sufficient permission.
#
cd "$(dirname "$0")/.."
# TODO: Switch over to rolling updates
ROLLING_UPDATE=false
#ROLLING_UPDATE=true
if [[ -z $SOLANA_METRICS_CONFIG ]]; then
echo Error: SOLANA_METRICS_CONFIG environment variable is unset
exit 1
@@ -26,7 +29,7 @@ edge)
;;
beta)
publicUrl=testnet.solana.com
publicIp=# # Use default value
publicIp="" # Use default value
;;
*)
echo Error: Unknown SOLANA_SNAP_CHANNEL=$SOLANA_SNAP_CHANNEL
@@ -61,6 +64,28 @@ wait_for_node() {
fi
}
if ! $ROLLING_UPDATE; then
count=1
for info in "${vmlist[@]}"; do
nodePosition="($count/${#vmlist[*]})"
vmName=${info%:*}
vmZone=${info#*:}
echo "--- Shutting down $vmName in zone $vmZone $nodePosition"
gcloud compute ssh "$vmName" --zone "$vmZone" \
--ssh-flag="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" \
--command="echo sudo snap remove solana" &
if [[ $((count % 10)) = 0 ]]; then
# Slow down deployment to avoid triggering GCP login
# quota limits (each |ssh| counts as a login)
sleep 3
fi
count=$((count + 1))
done
wait
fi
echo "--- Refreshing leader for $publicUrl"
leader=true
@@ -76,13 +101,18 @@ for info in "${vmlist[@]}"; do
(
SECONDS=0
echo "--- $vmName in zone $vmZone $nodePosition"
commonNodeConfig="\
rust-log=$RUST_LOG \
default-metrics-rate=$SOLANA_DEFAULT_METRICS_RATE \
metrics-config=$SOLANA_METRICS_CONFIG \
"
if $leader; then
nodeConfig="mode=leader+drone metrics-config=$SOLANA_METRICS_CONFIG"
nodeConfig="mode=leader+drone $commonNodeConfig"
if [[ -n $SOLANA_CUDA ]]; then
nodeConfig="$nodeConfig enable-cuda=1"
fi
else
nodeConfig="mode=validator metrics-config=$SOLANA_METRICS_CONFIG leader-address=$publicIp"
nodeConfig="mode=validator leader-address=$publicIp $commonNodeConfig"
fi
set -x

View File

@@ -35,6 +35,7 @@ if [[ -d "$SNAP" ]]; then # Running inside a Linux Snap?
mkdir -p "$SNAP_DATA"/{drone,leader,validator}
SOLANA_METRICS_CONFIG="$(snapctl get metrics-config)"
SOLANA_DEFAULT_METRICS_RATE="$(snapctl get default-metrics-rate)"
SOLANA_CUDA="$(snapctl get enable-cuda)"
RUST_LOG="$(snapctl get rust-log)"

View File

@@ -79,7 +79,7 @@ common_start_setup() {
# Killing sshguard for now. TODO: Find a better solution
# sshguard is blacklisting IP address after ssh-keyscan and ssh login attempts
ssh -n -f "$remote_user@$ip_addr" " \
ssh "$remote_user@$ip_addr" " \
set -ex; \
sudo service sshguard stop; \
sudo apt-get --assume-yes install rsync libssl-dev; \
@@ -92,6 +92,7 @@ common_start_setup() {
rsync -vPrz "$ssh_keys"/id_rsa "$remote_user@$ip_addr":~/.ssh/
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/
rsync -vPrz "$ssh_keys"/id_rsa.pub "$remote_user@$ip_addr":~/.ssh/authorized_keys
rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
} >>log/"$ip_addr".log
fi
}
@@ -101,7 +102,6 @@ start_leader() {
{
rsync -vPrz ~/.cargo/bin/solana* "$remote_user@$ip_addr":~/.cargo/bin/
rsync -vPrz ./multinode-demo "$remote_user@$ip_addr":~/solana/
rsync -vPrz ./fetch-perf-libs.sh "$remote_user@$ip_addr":~/solana/
ssh -n -f "$remote_user@$ip_addr" 'cd solana; FORCE=1 ./multinode-demo/remote_leader.sh'
} >>log/"$1".log
@@ -114,7 +114,6 @@ start_leader() {
start_validator() {
common_start_setup "$1"
ssh "$remote_user@$ip_addr" "rsync -vPrz ""$remote_user@$leader_ip"":~/solana/multinode-demo ~/solana/" >>log/"$1".log
ssh -n -f "$remote_user@$ip_addr" "cd solana; FORCE=1 ./multinode-demo/remote_validator.sh $leader_ip" >>log/"$1".log
}
@@ -128,9 +127,6 @@ start_all_nodes() {
mkdir -p log
for ip_addr in "${ip_addr_array[@]}"; do
ssh-keygen -R "$ip_addr" >log/local.log
ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
if ((!count)); then
# Start the leader on the first node
echo "Leader node $ip_addr, killing previous instance and restarting"
@@ -159,6 +155,9 @@ stop_all_nodes() {
SECONDS=0
local count=0
for ip_addr in "${ip_addr_array[@]}"; do
ssh-keygen -R "$ip_addr" >log/local.log
ssh-keyscan "$ip_addr" >>~/.ssh/known_hosts 2>/dev/null
echo "Stopping node[$count] $ip_addr. Remote user $remote_user"
ssh -n -f "$remote_user@$ip_addr" " \
@@ -176,7 +175,7 @@ stop_all_nodes() {
}
if [[ $command == "start" ]]; then
#build_project
build_project
stop_all_nodes
start_all_nodes
elif [[ $command == "stop" ]]; then

View File

@@ -6,7 +6,8 @@ chmod 600 ~/.ssh/authorized_keys ~/.ssh/id_rsa
PATH="$HOME"/.cargo/bin:"$PATH"
ssh-keygen -R "$1"
touch ~/.ssh/known_hosts
ssh-keygen -R "$1" 2>/dev/null
ssh-keyscan "$1" >>~/.ssh/known_hosts 2>/dev/null
rsync -vPrz "$1":~/.cargo/bin/solana* ~/.cargo/bin/

View File

@@ -9,7 +9,7 @@ use bincode::serialize;
use clap::{App, Arg};
use rayon::prelude::*;
use solana::crdt::{Crdt, NodeInfo};
use solana::drone::DroneRequest;
use solana::drone::{DroneRequest, DRONE_PORT};
use solana::fullnode::Config;
use solana::hash::Hash;
use solana::nat::{udp_public_bind, udp_random_bind};
@@ -220,12 +220,13 @@ fn main() {
}
let mut drone_addr = leader.contact_info.tpu;
drone_addr.set_port(9900);
drone_addr.set_port(DRONE_PORT);
let signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let validators = converge(&leader, &signal, num_nodes, &mut c_threads);
assert_eq!(validators.len(), num_nodes);
println!("Network has {} node(s)", validators.len());
assert!(validators.len() >= num_nodes);
let mut client = mk_client(&leader);
@@ -417,6 +418,12 @@ fn converge(
println!("CONVERGED!");
rv.extend(v.into_iter());
break;
} else {
println!(
"{} node(s) discovered (looking for {} or more)",
v.len(),
num_nodes
);
}
sleep(Duration::new(1, 0));
}

View File

@@ -10,7 +10,7 @@ extern crate tokio_io;
use bincode::deserialize;
use clap::{App, Arg};
use solana::crdt::NodeInfo;
use solana::drone::{Drone, DroneRequest};
use solana::drone::{Drone, DroneRequest, DRONE_PORT};
use solana::fullnode::Config;
use solana::metrics::set_panic_hook;
use solana::signature::read_keypair;
@@ -85,7 +85,7 @@ fn main() {
request_cap = None;
}
let drone_addr: SocketAddr = "0.0.0.0:9900".parse().unwrap();
let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();
let drone = Arc::new(Mutex::new(Drone::new(
mint_keypair,

View File

@@ -10,7 +10,7 @@ extern crate solana;
use bincode::serialize;
use clap::{App, Arg, SubCommand};
use solana::crdt::NodeInfo;
use solana::drone::DroneRequest;
use solana::drone::{DroneRequest, DRONE_PORT};
use solana::fullnode::Config;
use solana::signature::{read_keypair, KeyPair, KeyPairUtil, PublicKey, Signature};
use solana::thin_client::ThinClient;
@@ -164,7 +164,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
})?;
let mut drone_addr = leader.contact_info.tpu;
drone_addr.set_port(9900);
drone_addr.set_port(DRONE_PORT);
let command = match matches.subcommand() {
("airdrop", Some(airdrop_matches)) => {

View File

@@ -120,8 +120,6 @@ pub struct ContactInfo {
pub struct LedgerState {
/// last verified hash that was submitted to the leader
pub last_id: Hash,
/// last verified entry count, always increasing
pub entry_height: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@@ -166,7 +164,6 @@ impl NodeInfo {
leader_id: PublicKey::default(),
ledger_state: LedgerState {
last_id: Hash::default(),
entry_height: 0,
},
}
}
@@ -429,6 +426,7 @@ impl Crdt {
let _ = self.table.insert(v.id, v.clone());
let _ = self.local.insert(v.id, self.update_index);
inc_new_counter!("crdt-update-count", 1);
self.update_liveness(v.id);
} else {
trace!(
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
@@ -438,7 +436,6 @@ impl Crdt {
self.table[&v.id].version
);
}
self.update_liveness(v.id);
}
fn update_liveness(&mut self, id: PublicKey) {
@@ -458,35 +455,20 @@ impl Crdt {
/// challenging part is that we are on a permissionless network
pub fn purge(&mut self, now: u64) {
if self.table.len() <= MIN_TABLE_SIZE {
trace!("purge: skipped: table too small: {}", self.table.len());
return;
}
if self.leader_data().is_none() {
trace!("purge: skipped: no leader_data");
return;
}
let leader_id = self.leader_data().unwrap().id;
let limit = GOSSIP_PURGE_MILLIS;
let dead_ids: Vec<PublicKey> = self.alive
.iter()
.filter_map(|(&k, v)| {
if k != self.me && (now - v) > limit {
if leader_id == k {
info!(
"{:x}: PURGE LEADER {:x} {}",
self.debug_id(),
make_debug_id(&k),
now - v
);
Some(k)
} else {
info!(
"{:x}: PURGE {:x} {}",
self.debug_id(),
make_debug_id(&k),
now - v
);
Some(k)
}
Some(k)
} else {
trace!(
"{:x} purge skipped {:x} {} {}",
@@ -508,9 +490,19 @@ impl Crdt {
self.remote.remove(id);
self.local.remove(id);
self.external_liveness.remove(id);
info!("{:x}: PURGE {:x}", self.debug_id(), make_debug_id(id));
for map in self.external_liveness.values_mut() {
map.remove(id);
}
if *id == leader_id {
info!(
"{:x}: PURGE LEADER {:x}",
self.debug_id(),
make_debug_id(id),
);
inc_new_counter!("crdt-purge-purged_leader", 1, 1);
self.set_leader(PublicKey::default());
}
}
}
@@ -777,12 +769,11 @@ impl Crdt {
Ok((v.contact_info.ncp, req))
}
pub fn new_vote(&mut self, height: u64, last_id: Hash) -> Result<(Vote, SocketAddr)> {
pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> {
let mut me = self.my_data().clone();
let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone();
me.version += 1;
me.ledger_state.last_id = last_id;
me.ledger_state.entry_height = height;
let vote = Vote {
version: me.version,
contact_info_version: me.contact_info.version,
@@ -993,11 +984,35 @@ impl Crdt {
blob: &Blob,
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler),
Err(_) => {
warn!("deserialize crdt packet failed");
None
}
}
}
fn handle_protocol(
request: Protocol,
obj: &Arc<RwLock<Self>>,
window: &Window,
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
match request {
// TODO sigverify these
Ok(Protocol::RequestUpdates(v, from_rd)) => {
trace!("RequestUpdates {}", v);
Protocol::RequestUpdates(v, from_rd) => {
let addr = from_rd.contact_info.ncp;
trace!("RequestUpdates {} from {}", v, addr);
let me = obj.read().unwrap();
if addr == me.table[&me.me].contact_info.ncp {
warn!(
"RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}",
me.debug_id(),
make_debug_id(&from_rd.id)
);
inc_new_counter!("crdt-window-request-loopback", 1);
return None;
}
// only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = me.get_updates_since(v);
let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect();
@@ -1005,7 +1020,11 @@ impl Crdt {
trace!("get updates since response {} {}", v, data.len());
let len = data.len();
let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
obj.write().unwrap().insert(&from_rd);
{
let mut me = obj.write().unwrap();
me.insert(&from_rd);
me.update_liveness(from_rd.id);
}
if len < 1 {
let me = obj.read().unwrap();
trace!(
@@ -1029,19 +1048,19 @@ impl Crdt {
None
}
}
Ok(Protocol::ReceiveUpdates(from, ups, data, external_liveness)) => {
Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
trace!(
"ReceivedUpdates {:x} {} {}",
"ReceivedUpdates from={:x} update_index={} len={}",
make_debug_id(&from),
ups,
update_index,
data.len()
);
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data, &external_liveness);
.apply_updates(from, update_index, &data, &external_liveness);
None
}
Ok(Protocol::RequestWindowIndex(from, ix)) => {
Protocol::RequestWindowIndex(from, ix) => {
//TODO this doesn't depend on CRDT module, can be moved
//but we are using the listen thread to service these request
//TODO verify from is signed
@@ -1066,10 +1085,6 @@ impl Crdt {
}
Self::run_window_request(&window, &me, &from, ix, blob_recycler)
}
Err(_) => {
warn!("deserialize crdt packet failed");
None
}
}
}
@@ -1236,14 +1251,14 @@ impl TestNode {
#[cfg(test)]
mod tests {
use crdt::{
parse_port_or_addr, Crdt, CrdtError, NodeInfo, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS,
MIN_TABLE_SIZE,
parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS,
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
};
use hash::Hash;
use logger;
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
use signature::{KeyPair, KeyPairUtil, PublicKey};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
@@ -1375,12 +1390,25 @@ mod tests {
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0);
assert!(!crdt.alive.contains_key(&d.id));
d.version = 2;
crdt.insert(&d);
let liveness = crdt.alive[&d.id];
assert_eq!(crdt.table[&d.id].version, 2);
d.version = 1;
crdt.insert(&d);
assert_eq!(crdt.table[&d.id].version, 2);
assert_eq!(liveness, crdt.alive[&d.id]);
// Ensure liveness will be updated for version 3
sleep(Duration::from_millis(1));
d.version = 3;
crdt.insert(&d);
assert_eq!(crdt.table[&d.id].version, 3);
assert!(liveness < crdt.alive[&d.id]);
}
#[test]
fn test_new_vote() {
@@ -1391,12 +1419,12 @@ mod tests {
let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
assert_ne!(d.id, leader.id);
assert_matches!(
crdt.new_vote(0, Hash::default()).err(),
crdt.new_vote(Hash::default()).err(),
Some(Error::CrdtError(CrdtError::NoLeader))
);
crdt.insert(&leader);
assert_matches!(
crdt.new_vote(0, Hash::default()).err(),
crdt.new_vote(Hash::default()).err(),
Some(Error::CrdtError(CrdtError::NoLeader))
);
crdt.set_leader(leader.id);
@@ -1406,7 +1434,7 @@ mod tests {
contact_info_version: 0,
};
let expected = (v, crdt.table[&leader.id].contact_info.tpu);
assert_eq!(crdt.new_vote(0, Hash::default()).unwrap(), expected);
assert_eq!(crdt.new_vote(Hash::default()).unwrap(), expected);
}
#[test]
@@ -1740,6 +1768,28 @@ mod tests {
let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.contact_info.ncp);
}
#[test]
fn purge_leader_test() {
logger::setup();
let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt.id);
crdt.insert(&nxt);
crdt.set_leader(nxt.id);
let now = crdt.alive[&nxt.id];
let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap());
crdt.insert(&nxt2);
while now == crdt.alive[&nxt2.id] {
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
nxt2.version = nxt2.version + 1;
crdt.insert(&nxt2);
}
let len = crdt.table.len() as u64;
crdt.purge(now + GOSSIP_PURGE_MILLIS + 1);
assert_eq!(len as usize - 1, crdt.table.len());
assert_eq!(crdt.my_data().leader_id, PublicKey::default());
}
/// test window requests respond with the right blob, and do not overrun
#[test]
@@ -1831,4 +1881,41 @@ mod tests {
crdt.update_leader();
assert_eq!(crdt.my_data().leader_id, leader1.id);
}
/// Validates that a node sending Protocol::RequestUpdates gets its
/// liveness updated, but not if the node sends Protocol::RequestUpdates
/// to itself.
#[test]
fn protocol_requestupdate_alive() {
logger::setup();
let window = default_window();
let recycler = BlobRecycler::default();
let node = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let node_with_same_addr = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_ne!(node.id, node_with_same_addr.id);
let node_with_diff_addr = NodeInfo::new_leader(&"127.0.0.1:4321".parse().unwrap());
let crdt = Crdt::new(node.clone()).expect("Crdt::new");
assert_eq!(crdt.alive.len(), 0);
let obj = Arc::new(RwLock::new(crdt));
let request = Protocol::RequestUpdates(1, node.clone());
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none());
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
Crdt::handle_protocol(request, &obj, &window, &recycler);
let me = obj.write().unwrap();
// |node| and |node_with_same_addr| should not be in me.alive, but
// |node_with_diff_addr| should now be.
assert!(!me.alive.contains_key(&node.id));
assert!(!me.alive.contains_key(&node_with_same_addr.id));
assert!(me.alive[&node_with_diff_addr.id] > 0);
}
}

View File

@@ -16,6 +16,7 @@ use transaction::Transaction;
pub const TIME_SLICE: u64 = 60;
pub const REQUEST_CAP: u64 = 1_000_000;
pub const DRONE_PORT: u16 = 9900;
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum DroneRequest {

View File

@@ -20,7 +20,8 @@ impl<'a, W: Write> EntryWriter<'a, W> {
fn write_entry(writer: &mut W, entry: &Entry) -> io::Result<()> {
let serialized = serde_json::to_string(entry).unwrap();
writeln!(writer, "{}", serialized)
writeln!(writer, "{}", serialized)?;
writer.flush()
}
pub fn write_entries<I>(writer: &mut W, entries: I) -> io::Result<()>

View File

@@ -212,6 +212,7 @@ impl Packets {
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
if i == 0 {
socket.set_nonblocking(true)?;
}
@@ -405,6 +406,7 @@ impl Blob {
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
trace!("got {} bytes from {}", nrecv, from);
if i == 0 {
socket.set_nonblocking(true)?;
}

View File

@@ -60,16 +60,16 @@ impl ReplicateStage {
if res.is_err() {
error!("process_entries {} {:?}", blobs_len, res);
}
let _ = res?;
let now = timing::timestamp();
if now - *last_vote > VOTE_TIMEOUT_MS {
let height = res?;
let last_id = bank.last_id();
let shared_blob = blob_recycler.allocate();
let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap();
//TODO: doesn't seem like there is a synchronous call to get height and id
info!("replicate_stage {} {:?}", height, &last_id[..8]);
wcrdt.new_vote(height, last_id)
info!("replicate_stage {:?}", &last_id[..8]);
wcrdt.new_vote(last_id)
}?;
{
let mut blob = shared_blob.write().unwrap();

View File

@@ -126,7 +126,7 @@ pub fn responder(
//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
trace!("receiving on {}", sock.local_addr().unwrap());
trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap());
let dq = Blob::recv_from(recycler, sock)?;
if !dq.is_empty() {
s.send(dq)?;