This commit is contained in:
Anatoly Yakovenko
2018-05-22 15:30:46 -07:00
parent bbe89df2ff
commit 021953d59a
4 changed files with 27 additions and 48 deletions

View File

@ -19,7 +19,6 @@ pub mod recorder;
pub mod replicate_stage; pub mod replicate_stage;
pub mod request; pub mod request;
pub mod request_processor; pub mod request_processor;
pub mod request_replicator;
pub mod request_stage; pub mod request_stage;
pub mod result; pub mod result;
pub mod rpu; pub mod rpu;

View File

@ -1,10 +1,13 @@
//! The `replicate_stage` replicates transactions broadcast by the leader. //! The `replicate_stage` replicates transactions broadcast by the leader.
use bank::Bank;
use ledger;
use packet; use packet;
use request_replicator::RequestReplicator; use result::Result;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer; use streamer;
pub struct ReplicateStage { pub struct ReplicateStage {
@ -12,14 +15,34 @@ pub struct ReplicateStage {
} }
impl ReplicateStage { impl ReplicateStage {
/// Process verified blobs, already in order
///
/// Waits up to one second for a batch of blobs from `verified_receiver`,
/// reconstructs ledger entries from them, applies the entries to `bank`,
/// then returns the blobs to `blob_recycler`.
///
/// # Errors
/// Propagates the receive error (e.g. timeout/disconnect) from
/// `recv_timeout`, and any error from `bank.process_verified_entries`
/// (which is also logged with the blob count before being propagated).
fn replicate_requests(
bank: &Arc<Bank>,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
// 1-second timeout so the caller's loop can periodically observe its
// exit flag instead of blocking forever on an idle channel.
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
let res = bank.process_verified_entries(entries);
if res.is_err() {
error!("process_verified_entries {} {:?}", blobs.len(), res);
}
// NOTE(review): `res?` returns before the recycle loop below, so on a
// bank error the blobs are dropped rather than recycled — confirm that
// is intended and not a recycler leak.
res?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}
pub fn new( pub fn new(
request_replicator: RequestReplicator, bank: Arc<Bank>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
window_receiver: streamer::BlobReceiver, window_receiver: streamer::BlobReceiver,
blob_recycler: packet::BlobRecycler, blob_recycler: packet::BlobRecycler,
) -> Self { ) -> Self {
let thread_hdl = spawn(move || loop { let thread_hdl = spawn(move || loop {
let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) { if e.is_err() && exit.load(Ordering::Relaxed) {
break; break;
} }

View File

@ -1,41 +0,0 @@
//! The `request_replicator` is part of `replicator_stage` which replicates transactions broadcast
//! by the leader.
use bank::Bank;
use ledger;
use packet;
use result::Result;
use std::sync::Arc;
use std::time::Duration;
use streamer;
// NOTE(review): this type is the pre-refactor form removed by this commit;
// its `replicate_requests` logic was moved into `ReplicateStage` as an
// associated function taking `&Arc<Bank>` directly.
pub struct RequestReplicator {
// Shared ledger state that received entries are applied to.
bank: Arc<Bank>,
}
impl RequestReplicator {
/// Create a new Tvu that wraps the given Bank.
pub fn new(bank: Arc<Bank>) -> Self {
RequestReplicator { bank: bank }
}
/// Process verified blobs, already in order
///
/// Waits up to one second for blobs from `verified_receiver`,
/// reconstructs ledger entries, applies them to the wrapped bank, and
/// recycles the blobs. Receive errors and bank errors are propagated;
/// bank errors are also logged with the blob count.
pub fn replicate_requests(
&self,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
// Bounded wait so the calling loop can check its exit condition.
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
let res = self.bank.process_verified_entries(entries);
if res.is_err() {
error!("process_verified_entries {} {:?}", blobs.len(), res);
}
// NOTE(review): on error `res?` returns before recycling, so the
// blobs are dropped instead of reused — verify this was intended.
res?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}
}

View File

@ -8,7 +8,6 @@ use hash::Hash;
use packet; use packet;
use record_stage::RecordStage; use record_stage::RecordStage;
use replicate_stage::ReplicateStage; use replicate_stage::ReplicateStage;
use request_replicator::RequestReplicator;
use result::Result; use result::Result;
use sig_verify_stage::SigVerifyStage; use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -112,9 +111,8 @@ impl Tvu {
retransmit_sender, retransmit_sender,
); );
let request_replicator = RequestReplicator::new(obj.bank.clone());
let replicate_stage = ReplicateStage::new( let replicate_stage = ReplicateStage::new(
request_replicator, obj.bank.clone(),
exit.clone(), exit.clone(),
window_receiver, window_receiver,
blob_recycler.clone(), blob_recycler.clone(),