Changed the window_service in Replicator to send entries instead of blobs (#1302)

This commit is contained in:
carllin
2018-09-21 16:50:58 -07:00
committed by GitHub
parent e49b8f0ce7
commit 5ab38afa51
2 changed files with 11 additions and 12 deletions

View File

@ -49,7 +49,7 @@ impl Replicator {
let (fetch_stage, blob_fetch_receiver) = let (fetch_stage, blob_fetch_receiver) =
BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
let (blob_window_sender, blob_window_receiver) = channel(); let (entry_window_sender, entry_window_receiver) = channel();
// todo: pull blobs off the retransmit_receiver and recycle them? // todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
let t_window = window_service( let t_window = window_service(
@ -57,12 +57,12 @@ impl Replicator {
shared_window.clone(), shared_window.clone(),
entry_height, entry_height,
blob_fetch_receiver, blob_fetch_receiver,
blob_window_sender, entry_window_sender,
retransmit_sender, retransmit_sender,
repair_socket, repair_socket,
); );
let store_ledger_stage = StoreLedgerStage::new(blob_window_receiver, ledger_path); let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path);
let ncp = Ncp::new( let ncp = Ncp::new(
&crdt, &crdt,

View File

@ -1,7 +1,8 @@
//! The `store_ledger` stores the ledger from received blobs for storage nodes //! The `store_ledger` stores the ledger from received entries for storage nodes
use counter::Counter; use counter::Counter;
use ledger::{reconstruct_entries_from_blobs, LedgerWriter}; use entry::EntryReceiver;
use ledger::LedgerWriter;
use log::Level; use log::Level;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
@ -9,24 +10,22 @@ use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::RecvTimeoutError;
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver;
pub struct StoreLedgerStage { pub struct StoreLedgerStage {
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
} }
impl StoreLedgerStage { impl StoreLedgerStage {
/// Process entry blobs, already in order /// Process entries, already in order
fn store_requests( fn store_requests(
window_receiver: &BlobReceiver, window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>, ledger_writer: Option<&mut LedgerWriter>,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let mut blobs = window_receiver.recv_timeout(timer)?; let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() { while let Ok(mut more) = window_receiver.try_recv() {
blobs.append(&mut more); entries.append(&mut more);
} }
let entries = reconstruct_entries_from_blobs(blobs.clone())?;
inc_new_counter_info!( inc_new_counter_info!(
"store-transactions", "store-transactions",
@ -40,7 +39,7 @@ impl StoreLedgerStage {
Ok(()) Ok(())
} }
pub fn new(window_receiver: BlobReceiver, ledger_path: Option<&str>) -> Self { pub fn new(window_receiver: EntryReceiver, ledger_path: Option<&str>) -> Self {
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, true).unwrap()); let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, true).unwrap());
let t_store_requests = Builder::new() let t_store_requests = Builder::new()