From c50ac96f7577f23f897e6def9208c034ad404c59 Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 21 Sep 2018 16:01:24 -0700 Subject: [PATCH] Moved deserialization of blobs to entries from replicate_stage to window_service (#1287) --- src/entry.rs | 4 +++ src/replicate_stage.rs | 16 ++++----- src/retransmit_stage.rs | 11 +++--- src/window.rs | 30 ++++++++++++----- src/window_service.rs | 75 +++++++++++++++++++++++++++-------------- 5 files changed, 90 insertions(+), 46 deletions(-) diff --git a/src/entry.rs b/src/entry.rs index 03d569a5bd..365c382499 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -9,8 +9,12 @@ use rayon::prelude::*; use signature::Pubkey; use std::io::Cursor; use std::net::SocketAddr; +use std::sync::mpsc::{Receiver, Sender}; use transaction::Transaction; +pub type EntrySender = Sender>; +pub type EntryReceiver = Receiver>; + /// Each Entry contains three pieces of data. The `num_hashes` field is the number /// of hashes performed since the previous entry. The `id` field is the result /// of hashing `id` from the previous entry `num_hashes` times. The `transactions` diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index bcb35863b9..85e93fd657 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -3,7 +3,8 @@ use bank::Bank; use counter::Counter; use crdt::Crdt; -use ledger::{reconstruct_entries_from_blobs, Block, LedgerWriter}; +use entry::EntryReceiver; +use ledger::{Block, LedgerWriter}; use log::Level; use result::{Error, Result}; use service::Service; @@ -16,7 +17,7 @@ use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::{responder, BlobReceiver}; +use streamer::responder; use vote_stage::VoteStage; pub struct ReplicateStage { @@ -29,16 +30,15 @@ impl ReplicateStage { fn replicate_requests( bank: &Arc, crdt: &Arc>, - window_receiver: &BlobReceiver, + window_receiver: &EntryReceiver, ledger_writer: Option<&mut LedgerWriter>, ) -> Result<()> { let timer = Duration::new(1, 0); - //coalesce all the available blobs into a single vote - let mut blobs = window_receiver.recv_timeout(timer)?; + //coalesce all the available entries into a single vote + let mut entries = window_receiver.recv_timeout(timer)?; while let Ok(mut more) = window_receiver.try_recv() { - blobs.append(&mut more); + entries.append(&mut more); } - let entries = reconstruct_entries_from_blobs(blobs)?; let res = bank.process_entries(entries.clone()); @@ -64,7 +64,7 @@ impl ReplicateStage { keypair: Arc, bank: Arc, crdt: Arc>, - window_receiver: BlobReceiver, + window_receiver: EntryReceiver, ledger_path: Option<&str>, exit: Arc, ) -> Self { diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 1f18aad194..bd15dd3409 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -2,13 +2,14 @@ use counter::Counter; use crdt::Crdt; +use entry::Entry; use log::Level; use result::{Error, Result}; use service::Service; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::channel; use std::sync::mpsc::RecvTimeoutError; +use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -68,23 +69,23 @@ impl RetransmitStage { retransmit_socket: Arc, repair_socket: Arc, fetch_stage_receiver: BlobReceiver, - ) -> (Self, BlobReceiver) { + ) -> (Self, Receiver>) { let (retransmit_sender, retransmit_receiver) = channel(); let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver); - let (blob_sender, blob_receiver) = channel(); + let (entry_sender, entry_receiver) = channel(); let t_window = window_service( crdt.clone(), window, entry_height, fetch_stage_receiver, - blob_sender, + entry_sender, retransmit_sender, repair_socket, ); let thread_hdls = vec![t_retransmit, t_window]; - (RetransmitStage { thread_hdls }, blob_receiver) + (RetransmitStage { thread_hdls }, entry_receiver) } } diff --git a/src/window.rs b/src/window.rs index e2f0591bc3..a725f670ae 100644 --- a/src/window.rs +++ b/src/window.rs @@ -5,9 +5,9 @@ use crdt::{Crdt, NodeInfo}; use entry::Entry; #[cfg(feature = "erasure")] use erasure; -use ledger::Block; +use ledger::{reconstruct_entries_from_blobs, Block}; use log::Level; -use packet::{BlobRecycler, SharedBlob, SharedBlobs}; +use packet::{BlobRecycler, SharedBlob}; use result::Result; use signature::Pubkey; use std::cmp; @@ -67,7 +67,7 @@ pub trait WindowUtil { id: &Pubkey, blob: SharedBlob, pix: u64, - consume_queue: &mut SharedBlobs, + consume_queue: &mut Vec, recycler: &BlobRecycler, consumed: &mut u64, leader_unknown: bool, @@ -180,7 +180,7 @@ impl WindowUtil for Window { id: &Pubkey, blob: SharedBlob, pix: u64, - consume_queue: &mut SharedBlobs, + consume_queue: &mut Vec, recycler: &BlobRecycler, consumed: &mut u64, leader_unknown: bool, @@ -254,19 +254,33 @@ impl WindowUtil for Window { let k = (*consumed % WINDOW_SIZE) as usize; trace!("{}: k: {} consumed: {}", id, k, *consumed,); - if let Some(blob) = &self[k].data { + let k_data_blob; + let k_data_slot = &mut self[k].data; + if let Some(blob) = k_data_slot { if blob.read().get_index().unwrap() < *consumed { // window wrap-around, end of received break; } + k_data_blob = (*blob).clone(); } else { // self[k].data is None, end of received break; } - let slot = self[k].clone(); - if let Some(r) = slot.data { - consume_queue.push(r) + + // Check that we can get the entries from this blob + match reconstruct_entries_from_blobs(vec![k_data_blob]) { + Ok(entries) => { + consume_queue.extend(entries); + } + Err(_) => { + // If the blob can't be deserialized, then remove it from the + // window and exit. *k_data_slot cannot be None at this point, + // so it's safe to unwrap. + k_data_slot.take(); + break; + } } + *consumed += 1; } } diff --git a/src/window_service.rs b/src/window_service.rs index 064884c8c4..ea549cc0ec 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -2,6 +2,7 @@ //! use counter::Counter; use crdt::{Crdt, NodeInfo}; +use entry::EntrySender; use log::Level; use packet::{BlobRecycler, SharedBlob}; use rand::{thread_rng, Rng}; @@ -144,7 +145,7 @@ fn recv_window( consumed: &mut u64, received: &mut u64, r: &BlobReceiver, - s: &BlobSender, + s: &EntrySender, retransmit: &BlobSender, pending_retransmits: &mut bool, ) -> Result<()> { @@ -232,7 +233,7 @@ pub fn window_service( window: SharedWindow, entry_height: u64, r: BlobReceiver, - s: BlobSender, + s: EntrySender, retransmit: BlobSender, repair_socket: Arc, ) -> JoinHandle<()> { @@ -295,25 +296,54 @@ pub fn window_service( #[cfg(test)] mod test { use crdt::{Crdt, Node}; + use entry::Entry; + use hash::Hash; use logger; - use packet::{BlobRecycler, PACKET_DATA_SIZE}; - use std::net::UdpSocket; + use packet::{BlobRecycler, SharedBlobs, PACKET_DATA_SIZE}; + use recorder::Recorder; + use signature::Pubkey; + use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; + use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::time::Duration; - use streamer::{blob_receiver, responder, BlobReceiver}; + use streamer::{blob_receiver, responder}; use window::default_window; use window_service::{repair_backoff, window_service}; - fn get_blobs(r: BlobReceiver, num: &mut usize) { + fn make_consecutive_blobs( + me_id: Pubkey, + mut num_blobs_to_make: u64, + start_hash: Hash, + addr: &SocketAddr, + resp_recycler: &BlobRecycler, + ) -> SharedBlobs { + let mut msgs = Vec::new(); + let mut recorder = Recorder::new(start_hash); + while num_blobs_to_make != 0 { + let new_entries = recorder.record(vec![]); + let mut new_blobs: SharedBlobs = new_entries + .iter() + .enumerate() + .map(|(i, e)| { + let blob_index = num_blobs_to_make - i as u64 - 1; + let new_blob = + e.to_blob(&resp_recycler, Some(blob_index), Some(me_id), Some(addr)); + assert_eq!(blob_index, new_blob.read().get_index().unwrap()); + new_blob + }).collect(); + new_blobs.truncate(num_blobs_to_make as usize); + num_blobs_to_make -= new_blobs.len() as u64; + msgs.extend(new_blobs); + } + msgs + } + + fn get_entries(r: Receiver>, num: &mut usize) { for _t in 0..5 { let timer = Duration::new(1, 0); match r.recv_timeout(timer) { Ok(m) => { - for (i, v) in m.iter().enumerate() { - assert_eq!(v.read().get_index().unwrap() as usize, *num + i); - } *num += m.len(); } e => info!("error {:?}", e), @@ -355,26 +385,21 @@ mod test { tn.sockets.replicate.into_iter().map(Arc::new).collect(); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); - let mut msgs = Vec::new(); - for v in 0..10 { - let i = 9 - v; - let b = resp_recycler.allocate(); - { - let mut w = b.write(); - w.set_index(i).unwrap(); - w.set_id(me_id).unwrap(); - assert_eq!(i, w.get_index().unwrap()); - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.info.contact_info.ncp); - } - msgs.push(b); - } + let mut num_blobs_to_make = 10; + let gossip_address = &tn.info.contact_info.ncp; + let msgs = make_consecutive_blobs( + me_id, + num_blobs_to_make, + Hash::default(), + &gossip_address, + &resp_recycler, + ); s_responder.send(msgs).expect("send"); t_responder }; let mut num = 0; - get_blobs(r_window, &mut num); + get_entries(r_window, &mut num); assert_eq!(num, 10); let mut q = r_retransmit.recv().unwrap(); while let Ok(mut nq) = r_retransmit.try_recv() {