From 5ab38afa51ff24a989ddb2c730a81231edf401e6 Mon Sep 17 00:00:00 2001 From: carllin Date: Fri, 21 Sep 2018 16:50:58 -0700 Subject: [PATCH] Changed the window_service in Replicator to send entries instead of blobs (#1302) --- src/replicator.rs | 6 +++--- src/store_ledger_stage.rs | 17 ++++++++--------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/replicator.rs b/src/replicator.rs index edaa8bd027..e70b69f32d 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -49,7 +49,7 @@ impl Replicator { let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); - let (blob_window_sender, blob_window_receiver) = channel(); + let (entry_window_sender, entry_window_receiver) = channel(); // todo: pull blobs off the retransmit_receiver and recycle them? let (retransmit_sender, retransmit_receiver) = channel(); let t_window = window_service( @@ -57,12 +57,12 @@ impl Replicator { shared_window.clone(), entry_height, blob_fetch_receiver, - blob_window_sender, + entry_window_sender, retransmit_sender, repair_socket, ); - let store_ledger_stage = StoreLedgerStage::new(blob_window_receiver, ledger_path); + let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); let ncp = Ncp::new( &crdt, diff --git a/src/store_ledger_stage.rs b/src/store_ledger_stage.rs index e18f1d882d..4f4a23f064 100644 --- a/src/store_ledger_stage.rs +++ b/src/store_ledger_stage.rs @@ -1,7 +1,8 @@ -//! The `store_ledger` stores the ledger from received blobs for storage nodes +//! The `store_ledger` stores the ledger from received entries for storage nodes use counter::Counter; -use ledger::{reconstruct_entries_from_blobs, LedgerWriter}; +use entry::EntryReceiver; +use ledger::LedgerWriter; use log::Level; use result::{Error, Result}; use service::Service; @@ -9,24 +10,22 @@ use std::sync::atomic::AtomicUsize; use std::sync::mpsc::RecvTimeoutError; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::BlobReceiver; pub struct StoreLedgerStage { thread_hdls: Vec>, } impl StoreLedgerStage { - /// Process entry blobs, already in order + /// Process entries, already in order fn store_requests( - window_receiver: &BlobReceiver, + window_receiver: &EntryReceiver, ledger_writer: Option<&mut LedgerWriter>, ) -> Result<()> { let timer = Duration::new(1, 0); - let mut blobs = window_receiver.recv_timeout(timer)?; + let mut entries = window_receiver.recv_timeout(timer)?; while let Ok(mut more) = window_receiver.try_recv() { - blobs.append(&mut more); + entries.append(&mut more); } - let entries = reconstruct_entries_from_blobs(blobs.clone())?; inc_new_counter_info!( "store-transactions", @@ -40,7 +39,7 @@ impl StoreLedgerStage { Ok(()) } - pub fn new(window_receiver: BlobReceiver, ledger_path: Option<&str>) -> Self { + pub fn new(window_receiver: EntryReceiver, ledger_path: Option<&str>) -> Self { let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, true).unwrap()); let t_store_requests = Builder::new()