Enable Crdt debug messages to debug validators
This commit is contained in:
committed by
sakridge
parent
a6857dbaaa
commit
8331aab26a
38
src/crdt.rs
38
src/crdt.rs
@ -34,9 +34,9 @@ use std::net::{IpAddr, SocketAddr, UdpSocket};
|
|||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::{sleep, Builder, JoinHandle};
|
use std::thread::{sleep, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex};
|
use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex};
|
||||||
use timing::timestamp;
|
use timing::{duration_as_ms, timestamp};
|
||||||
use transaction::Vote;
|
use transaction::Vote;
|
||||||
|
|
||||||
/// milliseconds we sleep for between gossip requests
|
/// milliseconds we sleep for between gossip requests
|
||||||
@ -417,7 +417,8 @@ impl Crdt {
|
|||||||
self.insert_vote(&v.0, &v.1, v.2);
|
self.insert_vote(&v.0, &v.1, v.2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn insert(&mut self, v: &NodeInfo) {
|
|
||||||
|
pub fn insert(&mut self, v: &NodeInfo) -> usize {
|
||||||
// TODO check that last_verified types are always increasing
|
// TODO check that last_verified types are always increasing
|
||||||
//update the peer table
|
//update the peer table
|
||||||
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
|
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
|
||||||
@ -436,8 +437,8 @@ impl Crdt {
|
|||||||
self.update_index += 1;
|
self.update_index += 1;
|
||||||
let _ = self.table.insert(v.id, v.clone());
|
let _ = self.table.insert(v.id, v.clone());
|
||||||
let _ = self.local.insert(v.id, self.update_index);
|
let _ = self.local.insert(v.id, self.update_index);
|
||||||
inc_new_counter_info!("crdt-update-count", 1);
|
|
||||||
self.update_liveness(v.id);
|
self.update_liveness(v.id);
|
||||||
|
1
|
||||||
} else {
|
} else {
|
||||||
trace!(
|
trace!(
|
||||||
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
|
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
|
||||||
@ -446,6 +447,7 @@ impl Crdt {
|
|||||||
v.version,
|
v.version,
|
||||||
self.table[&v.id].version
|
self.table[&v.id].version
|
||||||
);
|
);
|
||||||
|
0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -901,9 +903,11 @@ impl Crdt {
|
|||||||
trace!("got updates {}", data.len());
|
trace!("got updates {}", data.len());
|
||||||
// TODO we need to punish/spam resist here
|
// TODO we need to punish/spam resist here
|
||||||
// sig verify the whole update and slash anyone who sends a bad update
|
// sig verify the whole update and slash anyone who sends a bad update
|
||||||
|
let mut insert_total = 0;
|
||||||
for v in data {
|
for v in data {
|
||||||
self.insert(&v);
|
insert_total += self.insert(&v);
|
||||||
}
|
}
|
||||||
|
inc_new_counter_info!("crdt-update-count", insert_total);
|
||||||
|
|
||||||
for (pk, external_remote_index) in external_liveness {
|
for (pk, external_remote_index) in external_liveness {
|
||||||
let remote_entry = if let Some(v) = self.remote.get(pk) {
|
let remote_entry = if let Some(v) = self.remote.get(pk) {
|
||||||
@ -1118,6 +1122,7 @@ impl Crdt {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
|
Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
|
||||||
|
let now = Instant::now();
|
||||||
trace!(
|
trace!(
|
||||||
"ReceivedUpdates from={:x} update_index={} len={}",
|
"ReceivedUpdates from={:x} update_index={} len={}",
|
||||||
make_debug_id(&from),
|
make_debug_id(&from),
|
||||||
@ -1127,9 +1132,16 @@ impl Crdt {
|
|||||||
obj.write()
|
obj.write()
|
||||||
.expect("'obj' write lock in ReceiveUpdates")
|
.expect("'obj' write lock in ReceiveUpdates")
|
||||||
.apply_updates(from, update_index, &data, &external_liveness);
|
.apply_updates(from, update_index, &data, &external_liveness);
|
||||||
|
|
||||||
|
report_time_spent(
|
||||||
|
"ReceiveUpdates",
|
||||||
|
&now.elapsed(),
|
||||||
|
&format!(" len: {}", data.len()),
|
||||||
|
);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
Protocol::RequestWindowIndex(from, ix) => {
|
Protocol::RequestWindowIndex(from, ix) => {
|
||||||
|
let now = Instant::now();
|
||||||
//TODO this doesn't depend on CRDT module, can be moved
|
//TODO this doesn't depend on CRDT module, can be moved
|
||||||
//but we are using the listen thread to service these request
|
//but we are using the listen thread to service these request
|
||||||
//TODO verify from is signed
|
//TODO verify from is signed
|
||||||
@ -1152,7 +1164,14 @@ impl Crdt {
|
|||||||
inc_new_counter_info!("crdt-window-request-address-eq", 1);
|
inc_new_counter_info!("crdt-window-request-address-eq", 1);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler)
|
let res =
|
||||||
|
Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler);
|
||||||
|
report_time_spent(
|
||||||
|
"RequestWindowIndex",
|
||||||
|
&now.elapsed(),
|
||||||
|
&format!(" ix: {}", ix),
|
||||||
|
);
|
||||||
|
res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1343,6 +1362,13 @@ impl TestNode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn report_time_spent(label: &str, time: &Duration, extra: &str) {
|
||||||
|
let count = duration_as_ms(time);
|
||||||
|
if count > 5 {
|
||||||
|
info!("{} took: {} ms {}", label, count, extra);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crdt::{
|
use crdt::{
|
||||||
|
@ -17,7 +17,8 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::{Builder, JoinHandle};
|
use std::thread::{Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
use timing::duration_as_ms;
|
||||||
|
|
||||||
pub const WINDOW_SIZE: u64 = 2 * 1024;
|
pub const WINDOW_SIZE: u64 = 2 * 1024;
|
||||||
pub type PacketReceiver = Receiver<SharedPackets>;
|
pub type PacketReceiver = Receiver<SharedPackets>;
|
||||||
@ -250,7 +251,7 @@ fn repair_window(
|
|||||||
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
|
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
|
||||||
if !reqs.is_empty() {
|
if !reqs.is_empty() {
|
||||||
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
|
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
|
||||||
debug!(
|
info!(
|
||||||
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
|
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
|
||||||
debug_id,
|
debug_id,
|
||||||
*times,
|
*times,
|
||||||
@ -496,7 +497,8 @@ fn recv_window(
|
|||||||
while let Ok(mut nq) = r.try_recv() {
|
while let Ok(mut nq) = r.try_recv() {
|
||||||
dq.append(&mut nq)
|
dq.append(&mut nq)
|
||||||
}
|
}
|
||||||
inc_new_counter_info!("streamer-recv_window-recv", dq.len());
|
let now = Instant::now();
|
||||||
|
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
|
||||||
debug!(
|
debug!(
|
||||||
"{:x}: RECV_WINDOW {} {}: got packets {}",
|
"{:x}: RECV_WINDOW {} {}: got packets {}",
|
||||||
debug_id,
|
debug_id,
|
||||||
@ -515,6 +517,7 @@ fn recv_window(
|
|||||||
retransmit,
|
retransmit,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
let mut pixs = Vec::new();
|
||||||
//send a contiguous set of blocks
|
//send a contiguous set of blocks
|
||||||
let mut consume_queue = VecDeque::new();
|
let mut consume_queue = VecDeque::new();
|
||||||
while let Some(b) = dq.pop_front() {
|
while let Some(b) = dq.pop_front() {
|
||||||
@ -522,6 +525,7 @@ fn recv_window(
|
|||||||
let p = b.write().expect("'b' write lock in fn recv_window");
|
let p = b.write().expect("'b' write lock in fn recv_window");
|
||||||
(p.get_index()?, p.meta.size)
|
(p.get_index()?, p.meta.size)
|
||||||
};
|
};
|
||||||
|
pixs.push(pix);
|
||||||
|
|
||||||
if !blob_idx_in_window(debug_id, pix, *consumed, received) {
|
if !blob_idx_in_window(debug_id, pix, *consumed, received) {
|
||||||
recycler.recycle(b);
|
recycler.recycle(b);
|
||||||
@ -543,10 +547,14 @@ fn recv_window(
|
|||||||
if log_enabled!(Level::Trace) {
|
if log_enabled!(Level::Trace) {
|
||||||
trace!("{}", print_window(debug_id, window, *consumed));
|
trace!("{}", print_window(debug_id, window, *consumed));
|
||||||
}
|
}
|
||||||
trace!(
|
info!(
|
||||||
"{:x}: sending consume_queue.len: {}",
|
"{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
|
||||||
debug_id,
|
debug_id,
|
||||||
consume_queue.len()
|
*consumed,
|
||||||
|
*received,
|
||||||
|
consume_queue.len(),
|
||||||
|
pixs,
|
||||||
|
duration_as_ms(&now.elapsed())
|
||||||
);
|
);
|
||||||
if !consume_queue.is_empty() {
|
if !consume_queue.is_empty() {
|
||||||
debug!(
|
debug!(
|
||||||
|
Reference in New Issue
Block a user