From 8cc030ef8472e9a70d41894c37bead3eda3954d8 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 3 Sep 2018 00:22:47 -1000 Subject: [PATCH] Use Vec instead of VecDeque for SharedBlobs --- src/crdt.rs | 11 ++++------- src/ledger.rs | 11 +++++------ src/packet.rs | 29 +++++++++++------------------ src/replicate_stage.rs | 2 +- src/retransmit_stage.rs | 2 +- src/streamer.rs | 9 ++++----- src/tvu.rs | 5 ++--- src/vote_stage.rs | 7 +++---- src/window.rs | 36 +++++++++++++++++------------------- src/write_stage.rs | 3 +-- 10 files changed, 49 insertions(+), 66 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index 1a0f5d3638..d30fd64cfb 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -27,7 +27,6 @@ use result::{Error, Result}; use signature::{Keypair, KeypairUtil, Pubkey}; use std; use std::collections::HashMap; -use std::collections::VecDeque; use std::io::Cursor; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -796,9 +795,7 @@ impl Crdt { // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have let blob = to_blob(req, remote_gossip_addr, blob_recycler)?; - let mut q: VecDeque = VecDeque::new(); - q.push_back(blob); - blob_sender.send(q)?; + blob_sender.send(vec![blob])?; Ok(()) } /// TODO: This is obviously the wrong way to do this. Need to implement leader selection @@ -1188,8 +1185,8 @@ impl Crdt { while let Ok(mut more) = requests_receiver.try_recv() { reqs.append(&mut more); } - let mut resps = VecDeque::new(); - while let Some(req) = reqs.pop_front() { + let mut resps = Vec::new(); + for req in reqs { if let Some(resp) = Self::handle_blob( obj, window, @@ -1197,7 +1194,7 @@ impl Crdt { blob_recycler, &req.read().unwrap(), ) { - resps.push_back(resp); + resps.push(resp); } blob_recycler.recycle(req); } diff --git a/src/ledger.rs b/src/ledger.rs index 455d189823..4249cb9278 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -10,7 +10,6 @@ use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; use signature::Pubkey; -use std::collections::VecDeque; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; @@ -413,7 +412,7 @@ pub fn read_ledger( pub trait Block { /// Verifies the hashes and counts of a slice of transactions are all consistent. fn verify(&self, start_hash: &Hash) -> bool; - fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque); + fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut Vec); fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>; } @@ -434,10 +433,10 @@ impl Block for [Entry] { }) } - fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque) { + fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut Vec) { for entry in self { let blob = entry.to_blob(blob_recycler, None, None, None); - q.push_back(blob); + q.push(blob); } } @@ -448,7 +447,7 @@ impl Block for [Entry] { } } -pub fn reconstruct_entries_from_blobs(blobs: VecDeque) -> Result> { +pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result> { let mut entries: Vec = Vec::with_capacity(blobs.len()); for blob in blobs { @@ -635,7 +634,7 @@ mod tests { let entries = make_test_entries(); let blob_recycler = BlobRecycler::default(); - let mut blob_q = VecDeque::new(); + let mut blob_q = Vec::new(); entries.to_blobs(&blob_recycler, &mut blob_q); assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries); diff --git a/src/packet.rs b/src/packet.rs index 181f8c7410..06b16058df 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -6,7 +6,6 @@ use log::Level; use result::{Error, Result}; use serde::Serialize; use signature::Pubkey; -use std::collections::VecDeque; use std::fmt; use std::io; use std::mem::size_of; @@ -16,7 +15,7 @@ use std::sync::{Arc, Mutex, RwLock}; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; -pub type SharedBlobs = VecDeque; +pub type SharedBlobs = Vec; pub type PacketRecycler = Recycler; pub type BlobRecycler = Recycler; @@ -336,9 +335,9 @@ pub fn to_blobs( rsps: Vec<(T, SocketAddr)>, blob_recycler: &BlobRecycler, ) -> Result { - let mut blobs = VecDeque::new(); + let mut blobs = Vec::new(); for (resp, rsp_addr) in rsps { - blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?); + blobs.push(to_blob(resp, rsp_addr, blob_recycler)?); } Ok(blobs) } @@ -437,7 +436,7 @@ impl Blob { self.set_data_size(new_size as u64).unwrap(); } pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result { - let mut v = VecDeque::new(); + let mut v = Vec::new(); //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll // * block on the socket until it's readable @@ -471,12 +470,12 @@ impl Blob { } } } - v.push_back(r); + v.push(r); } Ok(v) } - pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: &mut SharedBlobs) -> Result<()> { - while let Some(r) = v.pop_front() { + pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: SharedBlobs) -> Result<()> { + for r in v { { let p = r.read().expect("'r' read lock in pub fn send_to"); let a = p.meta.addr(); @@ -501,7 +500,6 @@ mod tests { BLOB_HEADER_SIZE, NUM_PACKETS, }; use request::Request; - use std::collections::VecDeque; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -613,18 +611,13 @@ mod tests { let p = r.allocate(); p.write().unwrap().meta.set_addr(&addr); p.write().unwrap().meta.size = 1024; - let mut v = VecDeque::new(); - v.push_back(p); - assert_eq!(v.len(), 1); - Blob::send_to(&r, &sender, &mut v).unwrap(); + let v = vec![p]; + Blob::send_to(&r, &sender, v).unwrap(); trace!("send_to"); - assert_eq!(v.len(), 0); - let mut rv = Blob::recv_from(&r, &reader).unwrap(); + let rv = Blob::recv_from(&r, &reader).unwrap(); trace!("recv_from"); assert_eq!(rv.len(), 1); - let rp = rv.pop_front().unwrap(); - assert_eq!(rp.write().unwrap().meta.size, 1024); - r.recycle(rp); + assert_eq!(rv[0].write().unwrap().meta.size, 1024); } #[cfg(all(feature = "ipv6", test))] diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index b0cf34aeae..917eba5042 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -43,7 +43,7 @@ impl ReplicateStage { let res = bank.process_entries(entries.clone()); - while let Some(blob) = blobs.pop_front() { + for blob in blobs { blob_recycler.recycle(blob); } diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 3272d20083..c3515bbee9 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -32,7 +32,7 @@ fn retransmit( Crdt::retransmit(&crdt, b, sock)?; } } - while let Some(b) = dq.pop_front() { + for b in dq { recycler.recycle(b); } Ok(()) diff --git a/src/streamer.rs b/src/streamer.rs index cf657710a1..646ed60557 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -64,8 +64,8 @@ pub fn receiver( fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { let timer = Duration::new(1, 0); - let mut msgs = r.recv_timeout(timer)?; - Blob::send_to(recycler, sock, &mut msgs)?; + let msgs = r.recv_timeout(timer)?; + Blob::send_to(recycler, sock, msgs)?; Ok(()) } @@ -144,7 +144,6 @@ pub fn blob_receiver( #[cfg(test)] mod test { use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; - use std::collections::VecDeque; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -198,7 +197,7 @@ mod test { resp_recycler.clone(), r_responder, ); - let mut msgs = VecDeque::new(); + let mut msgs = Vec::new(); for i in 0..10 { let b = resp_recycler.allocate(); { @@ -207,7 +206,7 @@ mod test { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } - msgs.push_back(b); + msgs.push(b); } s_responder.send(msgs).expect("send"); t_responder diff --git a/src/tvu.rs b/src/tvu.rs index f23ec0204b..a84648f699 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -154,7 +154,6 @@ pub mod tests { use packet::BlobRecycler; use service::Service; use signature::{Keypair, KeypairUtil}; - use std::collections::VecDeque; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; @@ -248,7 +247,7 @@ pub mod tests { ); let mut alice_ref_balance = starting_balance; - let mut msgs = VecDeque::new(); + let mut msgs = Vec::new(); let mut cur_hash = Hash::default(); let mut blob_id = 0; let num_transfers = 10; @@ -287,7 +286,7 @@ pub mod tests { w.set_size(serialized_entry.len()); w.meta.set_addr(&replicate_addr); } - msgs.push_back(b); + msgs.push(b); } } diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 0398ee32d7..85dfe58065 100755 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -12,7 +12,6 @@ use packet::{BlobRecycler, SharedBlob}; use result::Result; use service::Service; use signature::Keypair; -use std::collections::VecDeque; use std::result; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -136,7 +135,7 @@ pub fn send_leader_vote( if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler) { - vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; + vote_blob_sender.send(vec![shared_blob])?; let finality_ms = now - super_majority_timestamp; *last_valid_validator_timestamp = super_majority_timestamp; @@ -170,7 +169,7 @@ fn send_validator_vote( if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler) { inc_new_counter_info!("replicate-vote_sent", 1); - vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; + vote_blob_sender.send(vec![shared_blob])?; } Ok(()) } @@ -388,7 +387,7 @@ pub mod tests { assert!(vote_blob.is_ok()); // vote should be valid - let blob = vote_blob.unwrap().pop_front().unwrap(); + let blob = &vote_blob.unwrap()[0]; let tx = deserialize(&(blob.read().unwrap().data)).unwrap(); assert!(bank.process_transaction(&tx).is_ok()); } diff --git a/src/window.rs b/src/window.rs index d6a098317e..532ac29f24 100644 --- a/src/window.rs +++ b/src/window.rs @@ -12,7 +12,6 @@ use rand::{thread_rng, Rng}; use result::{Error, Result}; use signature::Pubkey; use std::cmp; -use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::AtomicUsize; @@ -158,7 +157,7 @@ fn add_block_to_retransmit_queue( b: &SharedBlob, leader_id: Pubkey, recycler: &BlobRecycler, - retransmit_queue: &mut VecDeque, + retransmit_queue: &mut Vec, ) { let p = b .read() @@ -193,7 +192,7 @@ fn add_block_to_retransmit_queue( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - retransmit_queue.push_back(nv); + retransmit_queue.push(nv); } } @@ -208,7 +207,7 @@ fn retransmit_all_leader_blocks( window: &SharedWindow, pending_retransmits: &mut bool, ) -> Result<()> { - let mut retransmit_queue: VecDeque = VecDeque::new(); + let mut retransmit_queue: Vec = Vec::new(); if let Some(leader) = maybe_leader { let leader_id = leader.id; for b in dq { @@ -364,7 +363,7 @@ fn process_blob( // window[k].data is None, end of received break; } - consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window")); + consume_queue.push(window[k].data.clone().expect("clone in fn recv_window")); *consumed += 1; } } @@ -450,8 +449,8 @@ fn recv_window( let mut pixs = Vec::new(); //send a contiguous set of blocks - let mut consume_queue = VecDeque::new(); - while let Some(b) = dq.pop_front() { + let mut consume_queue = Vec::new(); + for b in dq { let (pix, meta_size) = { let p = b.write().unwrap(); (p.get_index()?, p.meta.size) @@ -614,7 +613,7 @@ pub fn new_window_from_entries( blob_recycler: &BlobRecycler, ) -> SharedWindow { // convert to blobs - let mut blobs = VecDeque::new(); + let mut blobs = Vec::new(); ledger_tail.to_blobs(&blob_recycler, &mut blobs); // flatten deque to vec @@ -684,7 +683,6 @@ mod test { use crdt::{Crdt, Node}; use logger; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; - use std::collections::VecDeque; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -741,7 +739,7 @@ mod test { resp_recycler.clone(), r_responder, ); - let mut msgs = VecDeque::new(); + let mut msgs = Vec::new(); for i in 0..10 { let b = resp_recycler.allocate(); { @@ -750,7 +748,7 @@ mod test { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } - msgs.push_back(b); + msgs.push(b); } s_responder.send(msgs).expect("send"); t_responder @@ -821,7 +819,7 @@ mod test { resp_recycler.clone(), r_responder, ); - let mut msgs = VecDeque::new(); + let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; let b = resp_recycler.allocate(); @@ -833,7 +831,7 @@ mod test { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&tn.info.contact_info.ncp); } - msgs.push_back(b); + msgs.push(b); } s_responder.send(msgs).expect("send"); t_responder @@ -891,7 +889,7 @@ mod test { resp_recycler.clone(), r_responder, ); - let mut msgs = VecDeque::new(); + let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; let b = resp_recycler.allocate(); @@ -903,7 +901,7 @@ mod test { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&tn.info.contact_info.ncp); } - msgs.push_back(b); + msgs.push(b); } s_responder.send(msgs).expect("send"); t_responder @@ -954,7 +952,7 @@ mod test { resp_recycler.clone(), r_responder, ); - let mut msgs = VecDeque::new(); + let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; let b = resp_recycler.allocate(); @@ -966,7 +964,7 @@ mod test { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&tn.info.contact_info.ncp); } - msgs.push_back(b); + msgs.push(b); } s_responder.send(msgs).expect("send"); @@ -974,7 +972,7 @@ mod test { subs.write().unwrap().set_leader(me_id); - let mut msgs1 = VecDeque::new(); + let mut msgs1 = Vec::new(); for v in 1..5 { let i = 9 + v; let b = resp_recycler.allocate(); @@ -986,7 +984,7 @@ mod test { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&tn.info.contact_info.ncp); } - msgs1.push_back(b); + msgs1.push(b); } s_responder.send(msgs1).expect("send"); t_responder diff --git a/src/write_stage.rs b/src/write_stage.rs index 3cdd1d4b07..560d11acf6 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -12,7 +12,6 @@ use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::Keypair; -use std::collections::VecDeque; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; @@ -55,7 +54,7 @@ impl WriteStage { //on a valid last id trace!("New blobs? {}", entries.len()); - let mut blobs = VecDeque::new(); + let mut blobs = Vec::new(); entries.to_blobs(blob_recycler, &mut blobs); if !blobs.is_empty() {