Use the default Pubkey formatter instead of debug_id()
This commit is contained in:
126
src/window.rs
126
src/window.rs
@@ -111,7 +111,7 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
|
||||
}
|
||||
|
||||
fn repair_window(
|
||||
debug_id: u64,
|
||||
id: &Pubkey,
|
||||
window: &SharedWindow,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
@@ -122,7 +122,7 @@ fn repair_window(
|
||||
) -> Option<Vec<(SocketAddr, Vec<u8>)>> {
|
||||
//exponential backoff
|
||||
if !repair_backoff(last, times, consumed) {
|
||||
trace!("{:x} !repair_backoff() times = {}", debug_id, times);
|
||||
trace!("{} !repair_backoff() times = {}", id, times);
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -135,8 +135,8 @@ fn repair_window(
|
||||
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!(
|
||||
"{:x}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
|
||||
debug_id,
|
||||
"{}: repair_window counter times: {} consumed: {} highest_lost: {} missing: {}",
|
||||
id,
|
||||
*times,
|
||||
consumed,
|
||||
highest_lost,
|
||||
@@ -144,7 +144,7 @@ fn repair_window(
|
||||
);
|
||||
|
||||
for (to, _) in reqs.clone() {
|
||||
trace!("{:x}: repair_window request to {}", debug_id, to);
|
||||
trace!("{}: repair_window request to {}", id, to);
|
||||
}
|
||||
}
|
||||
Some(reqs)
|
||||
@@ -199,7 +199,7 @@ fn add_block_to_retransmit_queue(
|
||||
fn retransmit_all_leader_blocks(
|
||||
maybe_leader: Option<NodeInfo>,
|
||||
dq: &[SharedBlob],
|
||||
debug_id: u64,
|
||||
id: &Pubkey,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: u64,
|
||||
received: u64,
|
||||
@@ -235,12 +235,12 @@ fn retransmit_all_leader_blocks(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("{:x}: no leader to retransmit from", debug_id);
|
||||
warn!("{}: no leader to retransmit from", id);
|
||||
}
|
||||
if !retransmit_queue.is_empty() {
|
||||
trace!(
|
||||
"{:x}: RECV_WINDOW {} {}: retransmit {}",
|
||||
debug_id,
|
||||
"{}: RECV_WINDOW {} {}: retransmit {}",
|
||||
id,
|
||||
consumed,
|
||||
received,
|
||||
retransmit_queue.len(),
|
||||
@@ -255,7 +255,7 @@ fn retransmit_all_leader_blocks(
|
||||
/// starting from consumed is thereby formed, add that continuous
|
||||
/// range of blobs to a queue to be sent on to the next stage.
|
||||
///
|
||||
/// * `debug_id` - this node's id in a useful-for-debug format
|
||||
/// * `id` - this node's id
|
||||
/// * `blob` - the blob to be processed into the window and rebroadcast
|
||||
/// * `pix` - the index of the blob, corresponds to
|
||||
/// the entry height of this blob
|
||||
@@ -266,7 +266,7 @@ fn retransmit_all_leader_blocks(
|
||||
/// * `consumed` - input/output, the entry-height to which this
|
||||
/// node has populated and rebroadcast entries
|
||||
fn process_blob(
|
||||
debug_id: u64,
|
||||
id: &Pubkey,
|
||||
blob: SharedBlob,
|
||||
pix: u64,
|
||||
consume_queue: &mut SharedBlobs,
|
||||
@@ -290,7 +290,7 @@ fn process_blob(
|
||||
// blob unless the incoming blob is a duplicate (based on idx)
|
||||
// returns whether the incoming is a duplicate blob
|
||||
fn insert_blob_is_dup(
|
||||
debug_id: u64,
|
||||
id: &Pubkey,
|
||||
blob: SharedBlob,
|
||||
pix: u64,
|
||||
window_slot: &mut Option<SharedBlob>,
|
||||
@@ -301,31 +301,24 @@ fn process_blob(
|
||||
let is_dup = old.read().unwrap().get_index().unwrap() == pix;
|
||||
recycler.recycle(old, "insert_blob_is_dup");
|
||||
trace!(
|
||||
"{:x}: occupied {} window slot {:}, is_dup: {}",
|
||||
debug_id,
|
||||
"{}: occupied {} window slot {:}, is_dup: {}",
|
||||
id,
|
||||
c_or_d,
|
||||
pix,
|
||||
is_dup
|
||||
);
|
||||
is_dup
|
||||
} else {
|
||||
trace!("{:x}: empty {} window slot {:}", debug_id, c_or_d, pix);
|
||||
trace!("{}: empty {} window slot {:}", id, c_or_d, pix);
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
// insert the new blob into the window, overwrite and recycle old (or duplicate) entry
|
||||
let is_duplicate = if is_coding {
|
||||
insert_blob_is_dup(
|
||||
debug_id,
|
||||
blob,
|
||||
pix,
|
||||
&mut window[w].coding,
|
||||
recycler,
|
||||
"coding",
|
||||
)
|
||||
insert_blob_is_dup(id, blob, pix, &mut window[w].coding, recycler, "coding")
|
||||
} else {
|
||||
insert_blob_is_dup(debug_id, blob, pix, &mut window[w].data, recycler, "data")
|
||||
insert_blob_is_dup(id, blob, pix, &mut window[w].data, recycler, "data")
|
||||
};
|
||||
|
||||
if is_duplicate {
|
||||
@@ -338,21 +331,21 @@ fn process_blob(
|
||||
#[cfg(feature = "erasure")]
|
||||
{
|
||||
if erasure::recover(
|
||||
debug_id,
|
||||
id,
|
||||
recycler,
|
||||
&mut window,
|
||||
*consumed,
|
||||
(*consumed % WINDOW_SIZE) as usize,
|
||||
).is_err()
|
||||
{
|
||||
trace!("{:x}: erasure::recover failed", debug_id);
|
||||
trace!("{}: erasure::recover failed", id);
|
||||
}
|
||||
}
|
||||
|
||||
// push all contiguous blobs into consumed queue, increment consumed
|
||||
loop {
|
||||
let k = (*consumed % WINDOW_SIZE) as usize;
|
||||
trace!("{:x}: k: {} consumed: {}", debug_id, k, *consumed,);
|
||||
trace!("{}: k: {} consumed: {}", id, k, *consumed,);
|
||||
|
||||
if let Some(blob) = &window[k].data {
|
||||
if blob.read().unwrap().get_index().unwrap() < *consumed {
|
||||
@@ -368,14 +361,14 @@ fn process_blob(
|
||||
}
|
||||
}
|
||||
|
||||
fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64) -> bool {
|
||||
fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool {
|
||||
// Prevent receive window from running over
|
||||
// Got a blob which has already been consumed, skip it
|
||||
// probably from a repair window request
|
||||
if pix < consumed {
|
||||
trace!(
|
||||
"{:x}: received: {} but older than consumed: {} skipping..",
|
||||
debug_id,
|
||||
"{}: received: {} but older than consumed: {} skipping..",
|
||||
id,
|
||||
pix,
|
||||
consumed
|
||||
);
|
||||
@@ -389,8 +382,8 @@ fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64
|
||||
|
||||
if pix >= consumed + WINDOW_SIZE {
|
||||
trace!(
|
||||
"{:x}: received: {} will overrun window: {} skipping..",
|
||||
debug_id,
|
||||
"{}: received: {} will overrun window: {} skipping..",
|
||||
id,
|
||||
pix,
|
||||
consumed + WINDOW_SIZE
|
||||
);
|
||||
@@ -403,7 +396,7 @@ fn blob_idx_in_window(debug_id: u64, pix: u64, consumed: u64, received: &mut u64
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
fn recv_window(
|
||||
debug_id: u64,
|
||||
id: &Pubkey,
|
||||
window: &SharedWindow,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
@@ -428,8 +421,8 @@ fn recv_window(
|
||||
let now = Instant::now();
|
||||
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
|
||||
trace!(
|
||||
"{:x}: RECV_WINDOW {} {}: got packets {}",
|
||||
debug_id,
|
||||
"{}: RECV_WINDOW {} {}: got packets {}",
|
||||
id,
|
||||
*consumed,
|
||||
*received,
|
||||
dq.len(),
|
||||
@@ -438,7 +431,7 @@ fn recv_window(
|
||||
retransmit_all_leader_blocks(
|
||||
maybe_leader,
|
||||
&dq,
|
||||
debug_id,
|
||||
id,
|
||||
recycler,
|
||||
*consumed,
|
||||
*received,
|
||||
@@ -457,15 +450,15 @@ fn recv_window(
|
||||
};
|
||||
pixs.push(pix);
|
||||
|
||||
if !blob_idx_in_window(debug_id, pix, *consumed, received) {
|
||||
if !blob_idx_in_window(&id, pix, *consumed, received) {
|
||||
recycler.recycle(b, "recv_window");
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("{:x} window pix: {} size: {}", debug_id, pix, meta_size);
|
||||
trace!("{} window pix: {} size: {}", id, pix, meta_size);
|
||||
|
||||
process_blob(
|
||||
debug_id,
|
||||
id,
|
||||
b,
|
||||
pix,
|
||||
&mut consume_queue,
|
||||
@@ -477,10 +470,10 @@ fn recv_window(
|
||||
);
|
||||
}
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("{}", print_window(debug_id, window, *consumed));
|
||||
trace!("{}", print_window(id, window, *consumed));
|
||||
trace!(
|
||||
"{:x}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
|
||||
debug_id,
|
||||
"{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
|
||||
id,
|
||||
*consumed,
|
||||
*received,
|
||||
consume_queue.len(),
|
||||
@@ -495,7 +488,7 @@ fn recv_window(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String {
|
||||
pub fn print_window(id: &Pubkey, window: &SharedWindow, consumed: u64) -> String {
|
||||
let pointer: Vec<_> = window
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -529,11 +522,11 @@ pub fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> Stri
|
||||
})
|
||||
.collect();
|
||||
format!(
|
||||
"\n{:x}: WINDOW ({}): {}\n{:x}: WINDOW ({}): {}",
|
||||
debug_id,
|
||||
"\n{}: WINDOW ({}): {}\n{}: WINDOW ({}): {}",
|
||||
id,
|
||||
consumed,
|
||||
pointer.join(""),
|
||||
debug_id,
|
||||
id,
|
||||
consumed,
|
||||
buf.join("")
|
||||
)
|
||||
@@ -552,7 +545,7 @@ pub fn index_blobs(
|
||||
receive_index: &mut u64,
|
||||
) -> Result<()> {
|
||||
// enumerate all the blobs, those are the indices
|
||||
trace!("{:x}: INDEX_BLOBS {}", node_info.debug_id(), blobs.len());
|
||||
trace!("{}: INDEX_BLOBS {}", node_info.id, blobs.len());
|
||||
for (i, b) in blobs.iter().enumerate() {
|
||||
// only leader should be broadcasting
|
||||
let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
|
||||
@@ -576,14 +569,14 @@ pub fn initialized_window(
|
||||
entry_height: u64,
|
||||
) -> SharedWindow {
|
||||
let window = default_window();
|
||||
let debug_id = node_info.debug_id();
|
||||
let id = node_info.id;
|
||||
|
||||
{
|
||||
let mut win = window.write().unwrap();
|
||||
|
||||
trace!(
|
||||
"{:x} initialized window entry_height:{} blobs_len:{}",
|
||||
debug_id,
|
||||
"{} initialized window entry_height:{} blobs_len:{}",
|
||||
id,
|
||||
entry_height,
|
||||
blobs.len()
|
||||
);
|
||||
@@ -597,7 +590,7 @@ pub fn initialized_window(
|
||||
for b in blobs.into_iter().skip(diff) {
|
||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||
let pos = (ix % WINDOW_SIZE) as usize;
|
||||
trace!("{:x} caching {} at {}", debug_id, ix, pos);
|
||||
trace!("{} caching {} at {}", id, ix, pos);
|
||||
assert!(win[pos].data.is_none());
|
||||
win[pos].data = Some(b);
|
||||
}
|
||||
@@ -634,12 +627,12 @@ pub fn window(
|
||||
let mut received = entry_height;
|
||||
let mut last = entry_height;
|
||||
let mut times = 0;
|
||||
let debug_id = crdt.read().unwrap().debug_id();
|
||||
let id = crdt.read().unwrap().id;
|
||||
let mut pending_retransmits = false;
|
||||
trace!("{:x}: RECV_WINDOW started", debug_id);
|
||||
trace!("{}: RECV_WINDOW started", id);
|
||||
loop {
|
||||
if let Err(e) = recv_window(
|
||||
debug_id,
|
||||
&id,
|
||||
&window,
|
||||
&crdt,
|
||||
&recycler,
|
||||
@@ -660,11 +653,11 @@ pub fn window(
|
||||
}
|
||||
}
|
||||
if let Some(reqs) = repair_window(
|
||||
debug_id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received,
|
||||
&id, &window, &crdt, &recycler, &mut last, &mut times, consumed, received,
|
||||
) {
|
||||
for (to, req) in reqs {
|
||||
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
|
||||
info!("{:x} repair req send_to({}) error {:?}", debug_id, to, e);
|
||||
info!("{} repair req send_to({}) error {:?}", id, to, e);
|
||||
0
|
||||
});
|
||||
}
|
||||
@@ -679,6 +672,7 @@ mod test {
|
||||
use crdt::{Crdt, Node};
|
||||
use logger;
|
||||
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
|
||||
use signature::Pubkey;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::net::UdpSocket;
|
||||
@@ -1023,30 +1017,26 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
fn wrap_blob_idx_in_window(
|
||||
debug_id: u64,
|
||||
pix: u64,
|
||||
consumed: u64,
|
||||
received: u64,
|
||||
) -> (bool, u64) {
|
||||
fn wrap_blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: u64) -> (bool, u64) {
|
||||
let mut received = received;
|
||||
let is_in_window = blob_idx_in_window(debug_id, pix, consumed, &mut received);
|
||||
let is_in_window = blob_idx_in_window(&id, pix, consumed, &mut received);
|
||||
(is_in_window, received)
|
||||
}
|
||||
#[test]
|
||||
pub fn blob_idx_in_window_test() {
|
||||
let id = Pubkey::default();
|
||||
assert_eq!(
|
||||
wrap_blob_idx_in_window(0, 90 + WINDOW_SIZE, 90, 100),
|
||||
wrap_blob_idx_in_window(&id, 90 + WINDOW_SIZE, 90, 100),
|
||||
(false, 90 + WINDOW_SIZE)
|
||||
);
|
||||
assert_eq!(
|
||||
wrap_blob_idx_in_window(0, 91 + WINDOW_SIZE, 90, 100),
|
||||
wrap_blob_idx_in_window(&id, 91 + WINDOW_SIZE, 90, 100),
|
||||
(false, 91 + WINDOW_SIZE)
|
||||
);
|
||||
assert_eq!(wrap_blob_idx_in_window(0, 89, 90, 100), (false, 100));
|
||||
assert_eq!(wrap_blob_idx_in_window(&id, 89, 90, 100), (false, 100));
|
||||
|
||||
assert_eq!(wrap_blob_idx_in_window(0, 91, 90, 100), (true, 100));
|
||||
assert_eq!(wrap_blob_idx_in_window(0, 101, 90, 100), (true, 101));
|
||||
assert_eq!(wrap_blob_idx_in_window(&id, 91, 90, 100), (true, 100));
|
||||
assert_eq!(wrap_blob_idx_in_window(&id, 101, 90, 100), (true, 101));
|
||||
}
|
||||
#[test]
|
||||
pub fn test_repair_backoff() {
|
||||
|
Reference in New Issue
Block a user