Moved deserialization of blobs to entries from replicate_stage to window_service (#1287)

This commit is contained in:
carllin
2018-09-21 16:01:24 -07:00
committed by GitHub
parent a9355c33b2
commit c50ac96f75
5 changed files with 90 additions and 46 deletions

View File

@ -9,8 +9,12 @@ use rayon::prelude::*;
use signature::Pubkey; use signature::Pubkey;
use std::io::Cursor; use std::io::Cursor;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, Sender};
use transaction::Transaction; use transaction::Transaction;
pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;
/// Each Entry contains three pieces of data. The `num_hashes` field is the number /// Each Entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The `id` field is the result /// of hashes performed since the previous entry. The `id` field is the result
/// of hashing `id` from the previous entry `num_hashes` times. The `transactions` /// of hashing `id` from the previous entry `num_hashes` times. The `transactions`

View File

@ -3,7 +3,8 @@
use bank::Bank; use bank::Bank;
use counter::Counter; use counter::Counter;
use crdt::Crdt; use crdt::Crdt;
use ledger::{reconstruct_entries_from_blobs, Block, LedgerWriter}; use entry::EntryReceiver;
use ledger::{Block, LedgerWriter};
use log::Level; use log::Level;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
@ -16,7 +17,7 @@ use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::{responder, BlobReceiver}; use streamer::responder;
use vote_stage::VoteStage; use vote_stage::VoteStage;
pub struct ReplicateStage { pub struct ReplicateStage {
@ -29,16 +30,15 @@ impl ReplicateStage {
fn replicate_requests( fn replicate_requests(
bank: &Arc<Bank>, bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
window_receiver: &BlobReceiver, window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>, ledger_writer: Option<&mut LedgerWriter>,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote //coalesce all the available entries into a single vote
let mut blobs = window_receiver.recv_timeout(timer)?; let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() { while let Ok(mut more) = window_receiver.try_recv() {
blobs.append(&mut more); entries.append(&mut more);
} }
let entries = reconstruct_entries_from_blobs(blobs)?;
let res = bank.process_entries(entries.clone()); let res = bank.process_entries(entries.clone());
@ -64,7 +64,7 @@ impl ReplicateStage {
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
bank: Arc<Bank>, bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
window_receiver: BlobReceiver, window_receiver: EntryReceiver,
ledger_path: Option<&str>, ledger_path: Option<&str>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {

View File

@ -2,13 +2,14 @@
use counter::Counter; use counter::Counter;
use crdt::Crdt; use crdt::Crdt;
use entry::Entry;
use log::Level; use log::Level;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
@ -68,23 +69,23 @@ impl RetransmitStage {
retransmit_socket: Arc<UdpSocket>, retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver, fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) { ) -> (Self, Receiver<Vec<Entry>>) {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver); let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver);
let (blob_sender, blob_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let t_window = window_service( let t_window = window_service(
crdt.clone(), crdt.clone(),
window, window,
entry_height, entry_height,
fetch_stage_receiver, fetch_stage_receiver,
blob_sender, entry_sender,
retransmit_sender, retransmit_sender,
repair_socket, repair_socket,
); );
let thread_hdls = vec![t_retransmit, t_window]; let thread_hdls = vec![t_retransmit, t_window];
(RetransmitStage { thread_hdls }, blob_receiver) (RetransmitStage { thread_hdls }, entry_receiver)
} }
} }

View File

@ -5,9 +5,9 @@ use crdt::{Crdt, NodeInfo};
use entry::Entry; use entry::Entry;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use erasure; use erasure;
use ledger::Block; use ledger::{reconstruct_entries_from_blobs, Block};
use log::Level; use log::Level;
use packet::{BlobRecycler, SharedBlob, SharedBlobs}; use packet::{BlobRecycler, SharedBlob};
use result::Result; use result::Result;
use signature::Pubkey; use signature::Pubkey;
use std::cmp; use std::cmp;
@ -67,7 +67,7 @@ pub trait WindowUtil {
id: &Pubkey, id: &Pubkey,
blob: SharedBlob, blob: SharedBlob,
pix: u64, pix: u64,
consume_queue: &mut SharedBlobs, consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler, recycler: &BlobRecycler,
consumed: &mut u64, consumed: &mut u64,
leader_unknown: bool, leader_unknown: bool,
@ -180,7 +180,7 @@ impl WindowUtil for Window {
id: &Pubkey, id: &Pubkey,
blob: SharedBlob, blob: SharedBlob,
pix: u64, pix: u64,
consume_queue: &mut SharedBlobs, consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler, recycler: &BlobRecycler,
consumed: &mut u64, consumed: &mut u64,
leader_unknown: bool, leader_unknown: bool,
@ -254,19 +254,33 @@ impl WindowUtil for Window {
let k = (*consumed % WINDOW_SIZE) as usize; let k = (*consumed % WINDOW_SIZE) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,); trace!("{}: k: {} consumed: {}", id, k, *consumed,);
if let Some(blob) = &self[k].data { let k_data_blob;
let k_data_slot = &mut self[k].data;
if let Some(blob) = k_data_slot {
if blob.read().get_index().unwrap() < *consumed { if blob.read().get_index().unwrap() < *consumed {
// window wrap-around, end of received // window wrap-around, end of received
break; break;
} }
k_data_blob = (*blob).clone();
} else { } else {
// self[k].data is None, end of received // self[k].data is None, end of received
break; break;
} }
let slot = self[k].clone();
if let Some(r) = slot.data { // Check that we can get the entries from this blob
consume_queue.push(r) match reconstruct_entries_from_blobs(vec![k_data_blob]) {
Ok(entries) => {
consume_queue.extend(entries);
}
Err(_) => {
// If the blob can't be deserialized, then remove it from the
// window and exit. *k_data_slot cannot be None at this point,
// so it's safe to unwrap.
k_data_slot.take();
break;
}
} }
*consumed += 1; *consumed += 1;
} }
} }

View File

@ -2,6 +2,7 @@
//! //!
use counter::Counter; use counter::Counter;
use crdt::{Crdt, NodeInfo}; use crdt::{Crdt, NodeInfo};
use entry::EntrySender;
use log::Level; use log::Level;
use packet::{BlobRecycler, SharedBlob}; use packet::{BlobRecycler, SharedBlob};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
@ -144,7 +145,7 @@ fn recv_window(
consumed: &mut u64, consumed: &mut u64,
received: &mut u64, received: &mut u64,
r: &BlobReceiver, r: &BlobReceiver,
s: &BlobSender, s: &EntrySender,
retransmit: &BlobSender, retransmit: &BlobSender,
pending_retransmits: &mut bool, pending_retransmits: &mut bool,
) -> Result<()> { ) -> Result<()> {
@ -232,7 +233,7 @@ pub fn window_service(
window: SharedWindow, window: SharedWindow,
entry_height: u64, entry_height: u64,
r: BlobReceiver, r: BlobReceiver,
s: BlobSender, s: EntrySender,
retransmit: BlobSender, retransmit: BlobSender,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
@ -295,25 +296,54 @@ pub fn window_service(
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crdt::{Crdt, Node}; use crdt::{Crdt, Node};
use entry::Entry;
use hash::Hash;
use logger; use logger;
use packet::{BlobRecycler, PACKET_DATA_SIZE}; use packet::{BlobRecycler, SharedBlobs, PACKET_DATA_SIZE};
use std::net::UdpSocket; use recorder::Recorder;
use signature::Pubkey;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use streamer::{blob_receiver, responder, BlobReceiver}; use streamer::{blob_receiver, responder};
use window::default_window; use window::default_window;
use window_service::{repair_backoff, window_service}; use window_service::{repair_backoff, window_service};
fn get_blobs(r: BlobReceiver, num: &mut usize) { fn make_consecutive_blobs(
me_id: Pubkey,
mut num_blobs_to_make: u64,
start_hash: Hash,
addr: &SocketAddr,
resp_recycler: &BlobRecycler,
) -> SharedBlobs {
let mut msgs = Vec::new();
let mut recorder = Recorder::new(start_hash);
while num_blobs_to_make != 0 {
let new_entries = recorder.record(vec![]);
let mut new_blobs: SharedBlobs = new_entries
.iter()
.enumerate()
.map(|(i, e)| {
let blob_index = num_blobs_to_make - i as u64 - 1;
let new_blob =
e.to_blob(&resp_recycler, Some(blob_index), Some(me_id), Some(addr));
assert_eq!(blob_index, new_blob.read().get_index().unwrap());
new_blob
}).collect();
new_blobs.truncate(num_blobs_to_make as usize);
num_blobs_to_make -= new_blobs.len() as u64;
msgs.extend(new_blobs);
}
msgs
}
fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
for _t in 0..5 { for _t in 0..5 {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
match r.recv_timeout(timer) { match r.recv_timeout(timer) {
Ok(m) => { Ok(m) => {
for (i, v) in m.iter().enumerate() {
assert_eq!(v.read().get_index().unwrap() as usize, *num + i);
}
*num += m.len(); *num += m.len();
} }
e => info!("error {:?}", e), e => info!("error {:?}", e),
@ -355,26 +385,21 @@ mod test {
tn.sockets.replicate.into_iter().map(Arc::new).collect(); tn.sockets.replicate.into_iter().map(Arc::new).collect();
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut msgs = Vec::new(); let mut num_blobs_to_make = 10;
for v in 0..10 { let gossip_address = &tn.info.contact_info.ncp;
let i = 9 - v; let msgs = make_consecutive_blobs(
let b = resp_recycler.allocate(); me_id,
{ num_blobs_to_make,
let mut w = b.write(); Hash::default(),
w.set_index(i).unwrap(); &gossip_address,
w.set_id(me_id).unwrap(); &resp_recycler,
assert_eq!(i, w.get_index().unwrap()); );
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push(b);
}
s_responder.send(msgs).expect("send"); s_responder.send(msgs).expect("send");
t_responder t_responder
}; };
let mut num = 0; let mut num = 0;
get_blobs(r_window, &mut num); get_entries(r_window, &mut num);
assert_eq!(num, 10); assert_eq!(num, 10);
let mut q = r_retransmit.recv().unwrap(); let mut q = r_retransmit.recv().unwrap();
while let Ok(mut nq) = r_retransmit.try_recv() { while let Ok(mut nq) = r_retransmit.try_recv() {