only submit to influx when we log

test accumilated value logging

lots of counters

higher influx rate

fix counter name

replicate-transactions
This commit is contained in:
Anatoly Yakovenko
2018-07-10 12:37:39 -07:00
committed by Greg Fitzgerald
parent 0c6d2ef1f4
commit 03a8a5ed55
7 changed files with 79 additions and 47 deletions

View File

@ -125,7 +125,7 @@ impl BankingStage {
reqs_len, reqs_len,
(reqs_len as f32) / (total_time_s) (reqs_len as f32) / (total_time_s)
); );
inc_counter!(COUNTER, count, proc_start); inc_counter!(COUNTER, count);
Ok(()) Ok(())
} }
} }

View File

@ -1,14 +1,14 @@
use influx_db_client as influxdb; use influx_db_client as influxdb;
use metrics; use metrics;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use timing; use timing;
const INFLUX_RATE: usize = 100;
pub struct Counter { pub struct Counter {
pub name: &'static str, pub name: &'static str,
/// total accumulated value /// total accumulated value
pub counts: AtomicUsize, pub counts: AtomicUsize,
pub nanos: AtomicUsize,
pub times: AtomicUsize, pub times: AtomicUsize,
/// last accumulated value logged /// last accumulated value logged
pub lastlog: AtomicUsize, pub lastlog: AtomicUsize,
@ -20,7 +20,6 @@ macro_rules! create_counter {
Counter { Counter {
name: $name, name: $name,
counts: AtomicUsize::new(0), counts: AtomicUsize::new(0),
nanos: AtomicUsize::new(0),
times: AtomicUsize::new(0), times: AtomicUsize::new(0),
lastlog: AtomicUsize::new(0), lastlog: AtomicUsize::new(0),
lograte: $lograte, lograte: $lograte,
@ -29,38 +28,32 @@ macro_rules! create_counter {
} }
macro_rules! inc_counter { macro_rules! inc_counter {
($name:expr, $count:expr, $start:expr) => { ($name:expr, $count:expr) => {
unsafe { $name.inc($count, $start.elapsed()) }; unsafe { $name.inc($count) };
}; };
} }
impl Counter { impl Counter {
pub fn inc(&mut self, events: usize, dur: Duration) { pub fn inc(&mut self, events: usize) {
let total = dur.as_secs() * 1_000_000_000 + dur.subsec_nanos() as u64;
let counts = self.counts.fetch_add(events, Ordering::Relaxed); let counts = self.counts.fetch_add(events, Ordering::Relaxed);
let nanos = self.nanos.fetch_add(total as usize, Ordering::Relaxed);
let times = self.times.fetch_add(1, Ordering::Relaxed); let times = self.times.fetch_add(1, Ordering::Relaxed);
let lastlog = self.lastlog.load(Ordering::Relaxed); let lastlog = self.lastlog.load(Ordering::Relaxed);
if times % self.lograte == 0 && times > 0 { if times % self.lograte == 0 && times > 0 {
info!( info!(
"COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"nanos\": {}, \"samples\": {}, \"rate\": {}, \"now\": {}}}", "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}}}",
self.name, self.name,
counts, counts,
nanos,
times, times,
counts as f64 * 1e9 / nanos as f64,
timing::timestamp(), timing::timestamp(),
); );
}
if times % INFLUX_RATE == 0 && times > 0 {
metrics::submit( metrics::submit(
influxdb::Point::new(&format!("counter_{}", self.name)) influxdb::Point::new(&format!("counter_{}", self.name))
.add_field( .add_field(
"count", "count",
influxdb::Value::Integer(counts as i64 - lastlog as i64), influxdb::Value::Integer(counts as i64 - lastlog as i64),
) )
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&dur) as i64),
)
.to_owned(), .to_owned(),
); );
self.lastlog self.lastlog
@ -72,28 +65,25 @@ impl Counter {
mod tests { mod tests {
use counter::Counter; use counter::Counter;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
#[test] #[test]
fn test_counter() { fn test_counter() {
static mut COUNTER: Counter = create_counter!("test", 100); static mut COUNTER: Counter = create_counter!("test", 100);
let start = Instant::now();
let count = 1; let count = 1;
inc_counter!(COUNTER, count, start); inc_counter!(COUNTER, count);
unsafe { unsafe {
assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1); assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1);
assert_ne!(COUNTER.nanos.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1); assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1);
assert_eq!(COUNTER.lograte, 100); assert_eq!(COUNTER.lograte, 100);
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 0); assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.name, "test"); assert_eq!(COUNTER.name, "test");
} }
for _ in 0..199 { for _ in 0..199 {
inc_counter!(COUNTER, 2, start); inc_counter!(COUNTER, 2);
} }
unsafe { unsafe {
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 199); assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 199);
} }
inc_counter!(COUNTER, 2, start); inc_counter!(COUNTER, 2);
unsafe { unsafe {
assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399); assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399);
} }

View File

@ -16,6 +16,7 @@
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter;
use hash::Hash; use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink; use pnet_datalink as datalink;
@ -28,7 +29,7 @@ use std::collections::HashMap;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::Cursor; use std::io::Cursor;
use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle}; use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -36,6 +37,8 @@ use streamer::{BlobReceiver, BlobSender, Window};
use timing::timestamp; use timing::timestamp;
use transaction::Vote; use transaction::Vote;
const LOG_RATE: usize = 10;
/// milliseconds we sleep for between gossip requests /// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100; const GOSSIP_SLEEP_MILLIS: u64 = 100;
const GOSSIP_PURGE_MILLIS: u64 = 15000; const GOSSIP_PURGE_MILLIS: u64 = 15000;
@ -337,6 +340,8 @@ impl Crdt {
} }
} }
pub fn insert_votes(&mut self, votes: Vec<(PublicKey, Vote, Hash)>) { pub fn insert_votes(&mut self, votes: Vec<(PublicKey, Vote, Hash)>) {
static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE);
inc_counter!(COUNTER_VOTE, votes.len());
if votes.len() > 0 { if votes.len() > 0 {
info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len()); info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len());
} }
@ -360,6 +365,8 @@ impl Crdt {
self.update_index += 1; self.update_index += 1;
let _ = self.table.insert(v.id.clone(), v.clone()); let _ = self.table.insert(v.id.clone(), v.clone());
let _ = self.local.insert(v.id, self.update_index); let _ = self.local.insert(v.id, self.update_index);
static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE);
inc_counter!(COUNTER_UPDATE, 1);
} else { } else {
trace!( trace!(
"{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}", "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}",
@ -431,6 +438,9 @@ impl Crdt {
}) })
.collect(); .collect();
static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE);
inc_counter!(COUNTER_PURGE, dead_ids.len());
for id in dead_ids.iter() { for id in dead_ids.iter() {
self.alive.remove(id); self.alive.remove(id);
self.table.remove(id); self.table.remove(id);
@ -884,15 +894,24 @@ impl Crdt {
outblob.meta.set_addr(&from.contact_info.tvu_window); outblob.meta.set_addr(&from.contact_info.tvu_window);
outblob.set_id(sender_id).expect("blob set_id"); outblob.set_id(sender_id).expect("blob set_id");
} }
static mut COUNTER_REQ_WINDOW_PASS: Counter =
create_counter!("crdt-window-request-pass", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_PASS, 1);
return Some(out); return Some(out);
} else { } else {
static mut COUNTER_REQ_WINDOW_OUTSIDE: Counter =
create_counter!("crdt-window-request-outside", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_OUTSIDE, 1);
info!( info!(
"requested ix {} != blob_ix {}, outside window!", "requested ix {} != blob_ix {}, outside window!",
ix, blob_ix ix, blob_ix
); );
} }
} else { } else {
static mut COUNTER_REQ_WINDOW_FAIL: Counter =
create_counter!("crdt-window-request-fail", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW_FAIL, 1);
assert!(window.read().unwrap()[pos].is_none()); assert!(window.read().unwrap()[pos].is_none());
info!( info!(
"{:x}: failed RequestWindowIndex {:x} {} {}", "{:x}: failed RequestWindowIndex {:x} {} {}",
@ -971,6 +990,9 @@ impl Crdt {
//TODO verify from is signed //TODO verify from is signed
obj.write().unwrap().insert(&from); obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone(); let me = obj.read().unwrap().my_data().clone();
static mut COUNTER_REQ_WINDOW: Counter =
create_counter!("crdt-window-request-recv", LOG_RATE);
inc_counter!(COUNTER_REQ_WINDOW, 1);
trace!( trace!(
"{:x}:received RequestWindowIndex {:x} {} ", "{:x}:received RequestWindowIndex {:x} {} ",
me.debug_id(), me.debug_id(),

View File

@ -12,7 +12,6 @@ use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
pub type SharedPackets = Arc<RwLock<Packets>>; pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>; pub type SharedBlob = Arc<RwLock<Blob>>;
@ -20,6 +19,7 @@ pub type SharedBlobs = VecDeque<SharedBlob>;
pub type PacketRecycler = Recycler<Packets>; pub type PacketRecycler = Recycler<Packets>;
pub type BlobRecycler = Recycler<Blob>; pub type BlobRecycler = Recycler<Blob>;
const LOG_RATE: usize = 10;
pub const NUM_PACKETS: usize = 1024 * 8; pub const NUM_PACKETS: usize = 1024 * 8;
pub const BLOB_SIZE: usize = 64 * 1024; pub const BLOB_SIZE: usize = 64 * 1024;
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE; pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
@ -188,7 +188,7 @@ impl<T: Default> Recycler<T> {
impl Packets { impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> { fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
static mut COUNTER: Counter = create_counter!("packets", 10); static mut COUNTER: Counter = create_counter!("packets", LOG_RATE);
self.packets.resize(NUM_PACKETS, Packet::default()); self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0; let mut i = 0;
//DOCUMENTED SIDE-EFFECT //DOCUMENTED SIDE-EFFECT
@ -198,13 +198,12 @@ impl Packets {
// * read until it fails // * read until it fails
// * set it back to blocking before returning // * set it back to blocking before returning
socket.set_nonblocking(false)?; socket.set_nonblocking(false)?;
let mut start = Instant::now();
for p in &mut self.packets { for p in &mut self.packets {
p.meta.size = 0; p.meta.size = 0;
trace!("receiving on {}", socket.local_addr().unwrap()); trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) { match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
inc_counter!(COUNTER, i, start); inc_counter!(COUNTER, i);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break; break;
} }
@ -216,7 +215,6 @@ impl Packets {
p.meta.size = nrecv; p.meta.size = nrecv;
p.meta.set_addr(&from); p.meta.set_addr(&from);
if i == 0 { if i == 0 {
start = Instant::now();
socket.set_nonblocking(true)?; socket.set_nonblocking(true)?;
} }
} }

View File

@ -2,6 +2,7 @@
use bank::Bank; use bank::Bank;
use bincode::serialize; use bincode::serialize;
use counter::Counter;
use crdt::Crdt; use crdt::Crdt;
use ledger; use ledger;
use packet::BlobRecycler; use packet::BlobRecycler;
@ -10,6 +11,7 @@ use service::Service;
use signature::KeyPair; use signature::KeyPair;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -25,6 +27,7 @@ pub struct ReplicateStage {
} }
const VOTE_TIMEOUT_MS: u64 = 1000; const VOTE_TIMEOUT_MS: u64 = 1000;
const LOG_RATE: usize = 10;
impl ReplicateStage { impl ReplicateStage {
/// Process entry blobs, already in order /// Process entry blobs, already in order
@ -46,6 +49,12 @@ impl ReplicateStage {
let blobs_len = blobs.len(); let blobs_len = blobs.len();
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?; let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
let votes = entries_to_votes(&entries); let votes = entries_to_votes(&entries);
static mut COUNTER_REPLICATE: Counter = create_counter!("replicate-transactions", LOG_RATE);
inc_counter!(
COUNTER_REPLICATE,
entries.iter().map(|x| x.transactions.len()).sum()
);
let res = bank.process_entries(entries); let res = bank.process_entries(entries);
if res.is_err() { if res.is_err() {
error!("process_entries {} {:?}", blobs_len, res); error!("process_entries {} {:?}", blobs_len, res);

View File

@ -8,7 +8,6 @@ use counter::Counter;
use packet::{Packet, SharedPackets}; use packet::{Packet, SharedPackets};
use std::mem::size_of; use std::mem::size_of;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::time::Instant;
use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET};
pub const TX_OFFSET: usize = 0; pub const TX_OFFSET: usize = 0;
@ -71,7 +70,6 @@ fn batch_size(batches: &Vec<SharedPackets>) -> usize {
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> { pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use rayon::prelude::*; use rayon::prelude::*;
static mut COUNTER: Counter = create_counter!("ed25519_verify", 1); static mut COUNTER: Counter = create_counter!("ed25519_verify", 1);
let start = Instant::now();
let count = batch_size(batches); let count = batch_size(batches);
info!("CPU ECDSA for {}", batch_size(batches)); info!("CPU ECDSA for {}", batch_size(batches));
let rv = batches let rv = batches
@ -85,7 +83,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
.collect() .collect()
}) })
.collect(); .collect();
inc_counter!(COUNTER, count, start); inc_counter!(COUNTER, count);
rv rv
} }
@ -93,7 +91,6 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> { pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use packet::PACKET_DATA_SIZE; use packet::PACKET_DATA_SIZE;
static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1); static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1);
let start = Instant::now();
let count = batch_size(batches); let count = batch_size(batches);
info!("CUDA ECDSA for {}", batch_size(batches)); info!("CUDA ECDSA for {}", batch_size(batches));
let mut out = Vec::new(); let mut out = Vec::new();
@ -153,7 +150,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
num += 1; num += 1;
} }
} }
inc_counter!(COUNTER, count, start); inc_counter!(COUNTER, count);
rvs rvs
} }

View File

@ -1,5 +1,6 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//! //!
use counter::Counter;
use crdt::{Crdt, CrdtError, ReplicatedData}; use crdt::{Crdt, CrdtError, ReplicatedData};
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
@ -11,12 +12,13 @@ use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::mem; use std::mem;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
const LOG_RATE: usize = 10;
pub const WINDOW_SIZE: u64 = 2 * 1024; pub const WINDOW_SIZE: u64 = 2 * 1024;
pub type PacketReceiver = Receiver<SharedPackets>; pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>; pub type PacketSender = Sender<SharedPackets>;
@ -217,6 +219,9 @@ fn repair_window(
let reqs = find_next_missing(locked_window, crdt, consumed, received)?; let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
if reqs.len() > 0 { if reqs.len() > 0 {
static mut COUNTER_REPAIR: Counter =
create_counter!("streamer-repair_window-repair", LOG_RATE);
inc_counter!(COUNTER_REPAIR, reqs.len());
debug!( debug!(
"{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}", "{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}",
debug_id, debug_id,
@ -259,6 +264,8 @@ fn recv_window(
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq) dq.append(&mut nq)
} }
static mut COUNTER_RECV: Counter = create_counter!("streamer-recv_window-recv", LOG_RATE);
inc_counter!(COUNTER_RECV, dq.len());
debug!( debug!(
"{:x}: RECV_WINDOW {} {}: got packets {}", "{:x}: RECV_WINDOW {} {}: got packets {}",
debug_id, debug_id,
@ -268,7 +275,7 @@ fn recv_window(
); );
{ {
//retransmit all leader blocks //retransmit all leader blocks
let mut retransmitq = VecDeque::new(); let mut retransmit_queue = VecDeque::new();
if let Some(leader) = maybe_leader { if let Some(leader) = maybe_leader {
for b in &dq { for b in &dq {
let p = b.read().expect("'b' read lock in fn recv_window"); let p = b.read().expect("'b' read lock in fn recv_window");
@ -297,25 +304,28 @@ fn recv_window(
mnv.meta.size = sz; mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]); mnv.data[..sz].copy_from_slice(&p.data[..sz]);
} }
retransmitq.push_back(nv); retransmit_queue.push_back(nv);
} }
} }
} else { } else {
warn!("{:x}: no leader to retransmit from", debug_id); warn!("{:x}: no leader to retransmit from", debug_id);
} }
if !retransmitq.is_empty() { if !retransmit_queue.is_empty() {
debug!( debug!(
"{:x}: RECV_WINDOW {} {}: retransmit {}", "{:x}: RECV_WINDOW {} {}: retransmit {}",
debug_id, debug_id,
*consumed, *consumed,
*received, *received,
retransmitq.len(), retransmit_queue.len(),
); );
retransmit.send(retransmitq)?; static mut COUNTER_RETRANSMIT: Counter =
create_counter!("streamer-recv_window-retransmit", LOG_RATE);
inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len());
retransmit.send(retransmit_queue)?;
} }
} }
//send a contiguous set of blocks //send a contiguous set of blocks
let mut contq = VecDeque::new(); let mut consume_queue = VecDeque::new();
while let Some(b) = dq.pop_front() { while let Some(b) = dq.pop_front() {
let (pix, meta_size) = { let (pix, meta_size) = {
let p = b.write().expect("'b' write lock in fn recv_window"); let p = b.write().expect("'b' write lock in fn recv_window");
@ -386,7 +396,7 @@ fn recv_window(
} }
} }
if !is_coding { if !is_coding {
contq.push_back(window[k].clone().expect("clone in fn recv_window")); consume_queue.push_back(window[k].clone().expect("clone in fn recv_window"));
*consumed += 1; *consumed += 1;
} else { } else {
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
@ -416,17 +426,20 @@ fn recv_window(
} }
} }
print_window(debug_id, locked_window, *consumed); print_window(debug_id, locked_window, *consumed);
trace!("sending contq.len: {}", contq.len()); trace!("sending consume_queue.len: {}", consume_queue.len());
if !contq.is_empty() { if !consume_queue.is_empty() {
debug!( debug!(
"{:x}: RECV_WINDOW {} {}: forwarding contq {}", "{:x}: RECV_WINDOW {} {}: forwarding consume_queue {}",
debug_id, debug_id,
*consumed, *consumed,
*received, *received,
contq.len(), consume_queue.len(),
); );
trace!("sending contq.len: {}", contq.len()); trace!("sending consume_queue.len: {}", consume_queue.len());
s.send(contq)?; static mut COUNTER_CONSUME: Counter =
create_counter!("streamer-recv_window-consume", LOG_RATE);
inc_counter!(COUNTER_CONSUME, consume_queue.len());
s.send(consume_queue)?;
} }
Ok(()) Ok(())
} }
@ -592,6 +605,9 @@ fn broadcast(
// Index the blobs // Index the blobs
Crdt::index_blobs(&me, &blobs, receive_index)?; Crdt::index_blobs(&me, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast // keep the cache of blobs that are broadcast
static mut COUNTER_BROADCAST: Counter =
create_counter!("streamer-broadcast-sent", LOG_RATE);
inc_counter!(COUNTER_BROADCAST, blobs.len());
{ {
let mut win = window.write().unwrap(); let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len()); assert!(blobs.len() <= win.len());