From 669164bada9e469122f4a619b7a2215635435fb4 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sun, 1 Jul 2018 14:31:13 -0600 Subject: [PATCH] Boot EntryWriter's Mutex Finally! --- src/bin/genesis.rs | 5 ++--- src/entry_writer.rs | 21 ++++++++++----------- src/write_stage.rs | 8 ++++---- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index bc0698cd41..f927ecb007 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -10,7 +10,6 @@ use solana::mint::Mint; use std::error; use std::io::{stdin, stdout, Read}; use std::process::exit; -use std::sync::Mutex; fn main() -> Result<(), Box> { if is(Stream::Stdin) { @@ -26,7 +25,7 @@ fn main() -> Result<(), Box> { } let mint: Mint = serde_json::from_str(&buffer)?; - let writer = Mutex::new(stdout()); - EntryWriter::write_entries(&writer, &mint.create_entries())?; + let mut writer = stdout(); + EntryWriter::write_entries(&mut writer, &mint.create_entries())?; Ok(()) } diff --git a/src/entry_writer.rs b/src/entry_writer.rs index d8d8d73b79..0932653974 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -6,40 +6,39 @@ use bank::Bank; use entry::Entry; use serde_json; use std::io::{self, Write}; -use std::sync::Mutex; pub struct EntryWriter<'a, W> { bank: &'a Bank, - writer: Mutex, + writer: W, } impl<'a, W: Write> EntryWriter<'a, W> { /// Create a new Tpu that wraps the given Bank. - pub fn new(bank: &'a Bank, writer: Mutex) -> Self { + pub fn new(bank: &'a Bank, writer: W) -> Self { EntryWriter { bank, writer } } - fn write_entry(writer: &Mutex, entry: &Entry) -> io::Result<()> { + fn write_entry(writer: &mut W, entry: &Entry) -> io::Result<()> { let serialized = serde_json::to_string(&entry).unwrap(); - writeln!(writer.lock().unwrap(), "{}", serialized) + writeln!(writer, "{}", serialized) } - pub fn write_entries(writer: &Mutex, entries: &[Entry]) -> io::Result<()> { + pub fn write_entries(writer: &mut W, entries: &[Entry]) -> io::Result<()> { for entry in entries { Self::write_entry(writer, entry)?; } Ok(()) } - fn write_and_register_entry(&self, entry: &Entry) -> io::Result<()> { + fn write_and_register_entry(&mut self, entry: &Entry) -> io::Result<()> { trace!("write_and_register_entry entry"); if !entry.has_more { self.bank.register_entry_id(&entry.id); } - Self::write_entry(&self.writer, entry) + Self::write_entry(&mut self.writer, entry) } - pub fn write_and_register_entries(&self, entries: &[Entry]) -> io::Result<()> { + pub fn write_and_register_entries(&mut self, entries: &[Entry]) -> io::Result<()> { for entry in entries { self.write_and_register_entry(&entry)?; } @@ -61,8 +60,8 @@ mod tests { let mint = Mint::new(1); let bank = Bank::new(&mint); - let writer = Mutex::new(io::sink()); - let entry_writer = EntryWriter::new(&bank, writer); + let writer = io::sink(); + let mut entry_writer = EntryWriter::new(&bank, writer); let keypair = KeyPair::new(); let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id()); diff --git a/src/write_stage.rs b/src/write_stage.rs index cdcd221c45..ecdf773d3c 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -12,7 +12,7 @@ use std::collections::VecDeque; use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::{Builder, JoinHandle}; use std::time::Duration; use streamer::{BlobReceiver, BlobSender}; @@ -26,7 +26,7 @@ impl WriteStage { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( - entry_writer: &EntryWriter, + entry_writer: &mut EntryWriter, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, entry_receiver: &Receiver>, @@ -55,10 +55,10 @@ impl WriteStage { let thread_hdl = Builder::new() .name("solana-writer".to_string()) .spawn(move || { - let entry_writer = EntryWriter::new(&bank, Mutex::new(writer)); + let mut entry_writer = EntryWriter::new(&bank, writer); loop { let _ = Self::write_and_send_entries( - &entry_writer, + &mut entry_writer, &blob_sender, &blob_recycler, &entry_receiver,