Move vote into ReplicateStage after process_entries

This commit is contained in:
Tyera Eulberg
2018-09-10 09:32:52 -06:00
committed by Greg Fitzgerald
parent 8f0e0c4440
commit 751dd7eebb
3 changed files with 24 additions and 18 deletions

View File

@ -6,23 +6,22 @@ use crdt::Crdt;
use entry::EntryReceiver; use entry::EntryReceiver;
use ledger::{Block, LedgerWriter}; use ledger::{Block, LedgerWriter};
use log::Level; use log::Level;
use packet::BlobRecycler;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use signature::Keypair; use signature::Keypair;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::responder; use streamer::{responder, BlobSender};
use vote_stage::VoteStage; use vote_stage::send_validator_vote;
pub struct ReplicateStage { pub struct ReplicateStage {
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
vote_stage: VoteStage,
} }
impl ReplicateStage { impl ReplicateStage {
@ -30,8 +29,11 @@ impl ReplicateStage {
fn replicate_requests( fn replicate_requests(
bank: &Arc<Bank>, bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
window_receiver: &EntryReceiver, window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>, ledger_writer: Option<&mut LedgerWriter>,
keypair: &Arc<Keypair>,
vote_blob_sender: &BlobSender,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
//coalesce all the available entries into a single vote //coalesce all the available entries into a single vote
@ -42,6 +44,11 @@ impl ReplicateStage {
let res = bank.process_entries(&entries); let res = bank.process_entries(&entries);
if let Err(err) = send_validator_vote(bank, keypair, crdt, blob_recycler, vote_blob_sender)
{
info!("Vote failed: {:?}", err);
}
{ {
let mut wcrdt = crdt.write().unwrap(); let mut wcrdt = crdt.write().unwrap();
wcrdt.insert_votes(&entries.votes()); wcrdt.insert_votes(&entries.votes());
@ -66,23 +73,27 @@ impl ReplicateStage {
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
window_receiver: EntryReceiver, window_receiver: EntryReceiver,
ledger_path: Option<&str>, ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel(); let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver); let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver);
let vote_stage =
VoteStage::new(keypair, bank.clone(), crdt.clone(), vote_blob_sender, exit);
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap());
let keypair = Arc::new(keypair);
let blob_recycler = BlobRecycler::default();
let t_replicate = Builder::new() let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string()) .name("solana-replicate-stage".to_string())
.spawn(move || loop { .spawn(move || loop {
if let Err(e) = if let Err(e) = Self::replicate_requests(
Self::replicate_requests(&bank, &crdt, &window_receiver, ledger_writer.as_mut()) &bank,
{ &crdt,
&blob_recycler,
&window_receiver,
ledger_writer.as_mut(),
&keypair,
&vote_blob_sender,
) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -93,10 +104,7 @@ impl ReplicateStage {
let thread_hdls = vec![t_responder, t_replicate]; let thread_hdls = vec![t_responder, t_replicate];
ReplicateStage { ReplicateStage { thread_hdls }
thread_hdls,
vote_stage,
}
} }
} }
@ -107,7 +115,6 @@ impl Service for ReplicateStage {
for thread_hdl in self.thread_hdls { for thread_hdl in self.thread_hdls {
thread_hdl.join()?; thread_hdl.join()?;
} }
self.vote_stage.join()?;
Ok(()) Ok(())
} }
} }

View File

@ -104,7 +104,6 @@ impl Tvu {
crdt, crdt,
blob_window_receiver, blob_window_receiver,
ledger_path, ledger_path,
exit,
); );
Tvu { Tvu {

View File

@ -153,7 +153,7 @@ pub fn send_leader_vote(
Ok(()) Ok(())
} }
fn send_validator_vote( pub fn send_validator_vote(
bank: &Arc<Bank>, bank: &Arc<Bank>,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
crdt: &Arc<RwLock<Crdt>>, crdt: &Arc<RwLock<Crdt>>,