@ -16,6 +16,7 @@ pub mod packet;
|
||||
pub mod plan;
|
||||
pub mod record_stage;
|
||||
pub mod recorder;
|
||||
pub mod replicate_stage;
|
||||
pub mod request;
|
||||
pub mod request_processor;
|
||||
pub mod request_stage;
|
||||
|
52
src/replicate_stage.rs
Normal file
52
src/replicate_stage.rs
Normal file
@ -0,0 +1,52 @@
|
||||
//! The `replicate_stage` replicates transactions broadcast by the leader.
|
||||
|
||||
use bank::Bank;
|
||||
use ledger;
|
||||
use packet;
|
||||
use result::Result;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use streamer;
|
||||
|
||||
pub struct ReplicateStage {
|
||||
pub thread_hdl: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl ReplicateStage {
|
||||
/// Process verified blobs, already in order
|
||||
fn replicate_requests(
|
||||
bank: &Arc<Bank>,
|
||||
verified_receiver: &streamer::BlobReceiver,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
|
||||
let res = bank.process_verified_entries(entries);
|
||||
if res.is_err() {
|
||||
error!("process_verified_entries {} {:?}", blobs.len(), res);
|
||||
}
|
||||
res?;
|
||||
for blob in blobs {
|
||||
blob_recycler.recycle(blob);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
bank: Arc<Bank>,
|
||||
exit: Arc<AtomicBool>,
|
||||
window_receiver: streamer::BlobReceiver,
|
||||
blob_recycler: packet::BlobRecycler,
|
||||
) -> Self {
|
||||
let thread_hdl = spawn(move || loop {
|
||||
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
|
||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
});
|
||||
ReplicateStage { thread_hdl }
|
||||
}
|
||||
}
|
44
src/tvu.rs
44
src/tvu.rs
@ -5,16 +5,16 @@ use bank::Bank;
|
||||
use banking_stage::BankingStage;
|
||||
use crdt::{Crdt, ReplicatedData};
|
||||
use hash::Hash;
|
||||
use ledger;
|
||||
use packet;
|
||||
use record_stage::RecordStage;
|
||||
use replicate_stage::ReplicateStage;
|
||||
use result::Result;
|
||||
use sig_verify_stage::SigVerifyStage;
|
||||
use std::net::UdpSocket;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
use streamer;
|
||||
use write_stage::WriteStage;
|
||||
@ -35,27 +35,6 @@ impl Tvu {
|
||||
}
|
||||
}
|
||||
|
||||
/// Process verified blobs, already in order
|
||||
/// Respond with a signed hash of the state
|
||||
fn replicate_state(
|
||||
obj: &Tvu,
|
||||
verified_receiver: &streamer::BlobReceiver,
|
||||
blob_recycler: &packet::BlobRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
|
||||
let res = obj.bank.process_verified_entries(entries);
|
||||
if res.is_err() {
|
||||
error!("process_verified_entries {} {:?}", blobs.len(), res);
|
||||
}
|
||||
res?;
|
||||
for blob in blobs {
|
||||
blob_recycler.recycle(blob);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This service receives messages from a leader in the network and processes the transactions
|
||||
/// on the bank state.
|
||||
/// # Arguments
|
||||
@ -132,14 +111,12 @@ impl Tvu {
|
||||
retransmit_sender,
|
||||
);
|
||||
|
||||
let tvu = obj.clone();
|
||||
let s_exit = exit.clone();
|
||||
let t_replicator = spawn(move || loop {
|
||||
let e = Self::replicate_state(&tvu, &window_receiver, &blob_recycler);
|
||||
if e.is_err() && s_exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
});
|
||||
let replicate_stage = ReplicateStage::new(
|
||||
obj.bank.clone(),
|
||||
exit.clone(),
|
||||
window_receiver,
|
||||
blob_recycler.clone(),
|
||||
);
|
||||
|
||||
//serve pipeline
|
||||
// make sure we are on the same interface
|
||||
@ -178,7 +155,7 @@ impl Tvu {
|
||||
t_blob_receiver,
|
||||
t_retransmit,
|
||||
t_window,
|
||||
t_replicator,
|
||||
replicate_stage.thread_hdl,
|
||||
t_gossip,
|
||||
t_listen,
|
||||
//serve threads
|
||||
@ -375,5 +352,4 @@ mod tests {
|
||||
t_l_gossip.join().expect("join");
|
||||
t_l_listen.join().expect("join");
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user