2018-05-12 10:53:25 -06:00
|
|
|
//! The `rpu` module implements the Request Processing Unit, a
|
2018-05-08 17:32:50 -06:00
|
|
|
//! 5-stage transaction processing pipeline in software.
|
2018-03-29 12:20:54 -06:00
|
|
|
|
2018-05-09 14:56:34 -06:00
|
|
|
use accounting_stage::AccountingStage;
|
2018-04-28 00:31:20 -07:00
|
|
|
use crdt::{Crdt, ReplicatedData};
|
2018-05-11 22:36:16 -06:00
|
|
|
use entry_writer::EntryWriter;
|
2018-03-26 21:07:11 -07:00
|
|
|
use packet;
|
2018-05-12 10:31:28 -06:00
|
|
|
use request_stage::{RequestProcessor, RequestStage};
|
2018-03-10 22:09:17 -06:00
|
|
|
use result::Result;
|
2018-05-11 21:51:37 -06:00
|
|
|
use sig_verify_stage::SigVerifyStage;
|
2018-05-11 11:38:52 -07:00
|
|
|
use std::io::Write;
|
2018-05-10 15:30:18 -06:00
|
|
|
use std::net::UdpSocket;
|
2018-03-22 14:05:23 -06:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2018-05-11 23:37:44 -06:00
|
|
|
use std::sync::mpsc::channel;
|
2018-04-18 12:02:54 -07:00
|
|
|
use std::sync::{Arc, Mutex, RwLock};
|
2018-03-10 22:09:17 -06:00
|
|
|
use std::thread::{spawn, JoinHandle};
|
2018-03-26 22:03:26 -06:00
|
|
|
use streamer;
|
2018-02-28 10:07:54 -07:00
|
|
|
|
2018-05-12 10:53:25 -06:00
|
|
|
pub struct Rpu {
|
2018-05-11 23:37:44 -06:00
|
|
|
accounting_stage: Arc<AccountingStage>,
|
|
|
|
request_processor: Arc<RequestProcessor>,
|
2018-03-29 13:18:08 -06:00
|
|
|
}
|
|
|
|
|
2018-05-12 10:53:25 -06:00
|
|
|
impl Rpu {
|
|
|
|
/// Create a new Rpu that wraps the given Accountant.
|
2018-05-09 16:14:40 -06:00
|
|
|
pub fn new(accounting_stage: AccountingStage) -> Self {
|
2018-05-11 23:07:44 -06:00
|
|
|
let request_processor = RequestProcessor::new(accounting_stage.accountant.clone());
|
2018-05-12 10:53:25 -06:00
|
|
|
Rpu {
|
2018-05-11 23:37:44 -06:00
|
|
|
accounting_stage: Arc::new(accounting_stage),
|
|
|
|
request_processor: Arc::new(request_processor),
|
2018-04-17 17:31:52 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-11 20:50:50 -06:00
|
|
|
pub fn write_service<W: Write + Send + 'static>(
|
2018-05-11 23:51:35 -06:00
|
|
|
accounting_stage: Arc<AccountingStage>,
|
|
|
|
request_processor: Arc<RequestProcessor>,
|
2018-04-28 00:31:20 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
broadcast: streamer::BlobSender,
|
|
|
|
blob_recycler: packet::BlobRecycler,
|
2018-05-09 15:27:33 -06:00
|
|
|
writer: Mutex<W>,
|
2018-05-07 21:44:44 -06:00
|
|
|
) -> JoinHandle<()> {
|
|
|
|
spawn(move || loop {
|
2018-05-11 23:51:35 -06:00
|
|
|
let entry_writer = EntryWriter::new(&accounting_stage, &request_processor);
|
2018-05-11 22:36:16 -06:00
|
|
|
let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer);
|
2018-05-07 21:44:44 -06:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
2018-05-11 20:18:04 -06:00
|
|
|
info!("broadcat_service exiting");
|
2018-05-07 21:44:44 -06:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2018-05-11 23:51:35 -06:00
|
|
|
pub fn drain_service(
|
|
|
|
accounting_stage: Arc<AccountingStage>,
|
|
|
|
request_processor: Arc<RequestProcessor>,
|
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
) -> JoinHandle<()> {
|
2018-05-11 22:36:16 -06:00
|
|
|
spawn(move || {
|
2018-05-11 23:51:35 -06:00
|
|
|
let entry_writer = EntryWriter::new(&accounting_stage, &request_processor);
|
2018-05-11 22:36:16 -06:00
|
|
|
loop {
|
|
|
|
let _ = entry_writer.drain_entries();
|
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
info!("drain_service exiting");
|
|
|
|
break;
|
|
|
|
}
|
2018-04-28 00:31:20 -07:00
|
|
|
}
|
|
|
|
})
|
2018-02-28 14:16:50 -07:00
|
|
|
}
|
|
|
|
|
2018-05-12 10:53:25 -06:00
|
|
|
/// Create a UDP microservice that forwards messages the given Rpu.
|
2018-04-17 19:26:19 -07:00
|
|
|
/// This service is the network leader
|
2018-03-29 12:20:54 -06:00
|
|
|
/// Set `exit` to shutdown its threads.
|
2018-04-28 00:31:20 -07:00
|
|
|
pub fn serve<W: Write + Send + 'static>(
|
2018-05-12 00:19:12 -06:00
|
|
|
&self,
|
2018-04-28 00:31:20 -07:00
|
|
|
me: ReplicatedData,
|
2018-05-11 16:41:35 -06:00
|
|
|
requests_socket: UdpSocket,
|
2018-04-28 00:31:20 -07:00
|
|
|
gossip: UdpSocket,
|
2018-03-22 14:05:23 -06:00
|
|
|
exit: Arc<AtomicBool>,
|
2018-04-28 00:31:20 -07:00
|
|
|
writer: W,
|
2018-03-23 21:49:28 -06:00
|
|
|
) -> Result<Vec<JoinHandle<()>>> {
|
2018-04-28 00:31:20 -07:00
|
|
|
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
|
|
|
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
|
|
|
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());
|
|
|
|
|
2018-03-10 22:09:17 -06:00
|
|
|
// make sure we are on the same interface
|
2018-05-11 16:41:35 -06:00
|
|
|
let mut local = requests_socket.local_addr()?;
|
2018-03-10 22:09:17 -06:00
|
|
|
local.set_port(0);
|
2018-02-28 10:07:54 -07:00
|
|
|
|
2018-04-02 19:32:58 -07:00
|
|
|
let packet_recycler = packet::PacketRecycler::default();
|
|
|
|
let (packet_sender, packet_receiver) = channel();
|
2018-05-11 16:41:35 -06:00
|
|
|
let t_receiver = streamer::receiver(
|
|
|
|
requests_socket,
|
|
|
|
exit.clone(),
|
|
|
|
packet_recycler.clone(),
|
|
|
|
packet_sender,
|
|
|
|
)?;
|
2018-03-26 21:07:11 -07:00
|
|
|
|
2018-05-11 21:51:37 -06:00
|
|
|
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
|
2018-03-26 21:07:11 -07:00
|
|
|
|
2018-05-11 20:50:50 -06:00
|
|
|
let blob_recycler = packet::BlobRecycler::default();
|
2018-05-12 10:31:28 -06:00
|
|
|
let request_stage = RequestStage::new(
|
2018-05-12 00:19:12 -06:00
|
|
|
self.request_processor.clone(),
|
|
|
|
self.accounting_stage.clone(),
|
2018-05-11 20:50:50 -06:00
|
|
|
exit.clone(),
|
2018-05-11 21:51:37 -06:00
|
|
|
sig_verify_stage.output,
|
2018-05-11 20:50:50 -06:00
|
|
|
packet_recycler.clone(),
|
|
|
|
blob_recycler.clone(),
|
|
|
|
);
|
|
|
|
|
2018-04-28 00:31:20 -07:00
|
|
|
let (broadcast_sender, broadcast_receiver) = channel();
|
2018-05-11 20:50:50 -06:00
|
|
|
let t_write = Self::write_service(
|
2018-05-12 00:19:12 -06:00
|
|
|
self.accounting_stage.clone(),
|
|
|
|
self.request_processor.clone(),
|
2018-05-11 20:11:25 -06:00
|
|
|
exit.clone(),
|
|
|
|
broadcast_sender,
|
|
|
|
blob_recycler.clone(),
|
|
|
|
Mutex::new(writer),
|
|
|
|
);
|
2018-04-28 00:31:20 -07:00
|
|
|
|
|
|
|
let broadcast_socket = UdpSocket::bind(local)?;
|
|
|
|
let t_broadcast = streamer::broadcaster(
|
|
|
|
broadcast_socket,
|
|
|
|
exit.clone(),
|
|
|
|
crdt.clone(),
|
|
|
|
blob_recycler.clone(),
|
|
|
|
broadcast_receiver,
|
|
|
|
);
|
|
|
|
|
2018-05-11 20:50:50 -06:00
|
|
|
let respond_socket = UdpSocket::bind(local.clone())?;
|
2018-05-11 20:11:25 -06:00
|
|
|
let t_responder = streamer::responder(
|
|
|
|
respond_socket,
|
2018-04-28 00:31:20 -07:00
|
|
|
exit.clone(),
|
|
|
|
blob_recycler.clone(),
|
2018-05-12 10:31:28 -06:00
|
|
|
request_stage.output,
|
2018-04-28 00:31:20 -07:00
|
|
|
);
|
|
|
|
|
2018-05-07 16:49:15 -07:00
|
|
|
let mut threads = vec![
|
2018-04-28 00:31:20 -07:00
|
|
|
t_receiver,
|
|
|
|
t_responder,
|
2018-05-12 10:31:28 -06:00
|
|
|
request_stage.thread_hdl,
|
2018-05-11 20:50:50 -06:00
|
|
|
t_write,
|
2018-04-28 00:31:20 -07:00
|
|
|
t_gossip,
|
|
|
|
t_listen,
|
|
|
|
t_broadcast,
|
2018-05-07 16:49:15 -07:00
|
|
|
];
|
2018-05-11 21:51:37 -06:00
|
|
|
threads.extend(sig_verify_stage.thread_hdls.into_iter());
|
2018-05-07 16:49:15 -07:00
|
|
|
Ok(threads)
|
2018-03-26 21:07:11 -07:00
|
|
|
}
|
2018-04-11 14:05:29 -06:00
|
|
|
}
|