This commit is contained in:
Anatoly Yakovenko
2018-05-22 15:17:59 -07:00
parent 26272a3600
commit a638ec5911
4 changed files with 23 additions and 64 deletions

View File

@ -15,6 +15,8 @@ pub mod mint;
pub mod packet; pub mod packet;
pub mod plan; pub mod plan;
pub mod record_stage; pub mod record_stage;
pub mod request_replicator;
pub mod replicate_stage;
pub mod recorder; pub mod recorder;
pub mod request; pub mod request;
pub mod request_processor; pub mod request_processor;

View File

@ -1,22 +1,11 @@
//! The `replicate_stage` replicates transactions broadcast by the leader. //! The `replicate_stage` replicates transactions broadcast by the leader.
use bank::Bank;
use banking_stage::BankingStage;
use crdt::{Crdt, ReplicatedData};
use hash::Hash;
use ledger;
use packet; use packet;
use record_stage::RecordStage; use request_replicator::RequestReplicator;
use result::Result;
use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer; use streamer;
use write_stage::WriteStage;
pub struct ReplicateStage { pub struct ReplicateStage {
pub thread_hdl: JoinHandle<()>, pub thread_hdl: JoinHandle<()>,
@ -24,13 +13,13 @@ pub struct ReplicateStage {
impl ReplicateStage { impl ReplicateStage {
pub fn new(request_replicator: RequestReplicator, exit: Arc<AtomicBool>, window_receiver: streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler) -> Self { pub fn new(request_replicator: RequestReplicator, exit: Arc<AtomicBool>, window_receiver: streamer::BlobReceiver, blob_recycler: packet::BlobRecycler) -> Self {
let thread_hdl = spawn(move || loop { let thread_hdl = spawn(move || loop {
let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler);
if e.is_err() && s_exit.load(Ordering::Relaxed) { if e.is_err() && exit.load(Ordering::Relaxed) {
break; break;
} }
}); });
ReplicateStage{thread_hdl}; ReplicateStage{thread_hdl}
} }
} }

View File

@ -2,32 +2,22 @@
//! by the leader. //! by the leader.
use bank::Bank; use bank::Bank;
use banking_stage::BankingStage;
use crdt::{Crdt, ReplicatedData};
use hash::Hash;
use ledger; use ledger;
use packet; use packet;
use record_stage::RecordStage;
use result::Result; use result::Result;
use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use write_stage::WriteStage; use std::sync::Arc;
pub struct RequestReplicator { pub struct RequestReplicator {
bank: Arc<Bank>, bank: Arc<Bank>,
} }
impl Tvu { impl RequestReplicator {
/// Create a new Tvu that wraps the given Bank. /// Create a new Tvu that wraps the given Bank.
pub fn new(bank: Bank) -> Self { pub fn new(bank: Arc<Bank>) -> Self {
RequestReplicator { RequestReplicator {
bank: Arc::new(bank), bank: bank,
} }
} }

View File

@ -5,16 +5,17 @@ use bank::Bank;
use banking_stage::BankingStage; use banking_stage::BankingStage;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use hash::Hash; use hash::Hash;
use ledger;
use packet; use packet;
use record_stage::RecordStage; use record_stage::RecordStage;
use request_replicator::RequestReplicator;
use replicate_stage::ReplicateStage;
use result::Result; use result::Result;
use sig_verify_stage::SigVerifyStage; use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use write_stage::WriteStage; use write_stage::WriteStage;
@ -35,27 +36,6 @@ impl Tvu {
} }
} }
/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Tvu,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
let res = obj.bank.process_verified_entries(entries);
if res.is_err() {
error!("process_verified_entries {} {:?}", blobs.len(), res);
}
res?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}
/// This service receives messages from a leader in the network and processes the transactions /// This service receives messages from a leader in the network and processes the transactions
/// on the bank state. /// on the bank state.
/// # Arguments /// # Arguments
@ -132,14 +112,13 @@ impl Tvu {
retransmit_sender, retransmit_sender,
); );
let tvu = obj.clone(); let request_replicator = RequestReplicator::new(obj.bank.clone());
let s_exit = exit.clone(); let replicate_stage = ReplicateStage::new(
let t_replicator = spawn(move || loop { request_replicator,
let e = Self::replicate_state(&tvu, &window_receiver, &blob_recycler); exit.clone(),
if e.is_err() && s_exit.load(Ordering::Relaxed) { window_receiver,
break; blob_recycler.clone(),
} );
});
//serve pipeline //serve pipeline
// make sure we are on the same interface // make sure we are on the same interface
@ -178,7 +157,7 @@ impl Tvu {
t_blob_receiver, t_blob_receiver,
t_retransmit, t_retransmit,
t_window, t_window,
t_replicator, replicate_stage.thread_hdl,
t_gossip, t_gossip,
t_listen, t_listen,
//serve threads //serve threads
@ -375,5 +354,4 @@ mod tests {
t_l_gossip.join().expect("join"); t_l_gossip.join().expect("join");
t_l_listen.join().expect("join"); t_l_listen.join().expect("join");
} }
} }