diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 56c4711640..ff571509a0 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -49,8 +49,6 @@ impl Default for Config { } impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. - /// Discard input packets using `packet_recycler` to minimize memory - /// allocations in a previous stage such as the `fetch_stage`. pub fn new( bank: Arc, verified_receiver: Receiver)>>, @@ -215,7 +213,7 @@ impl BankingStage { let count = mms.iter().map(|x| x.1.len()).sum(); let proc_start = Instant::now(); for (msgs, vers) in mms { - let transactions = Self::deserialize_transactions(&msgs.read()); + let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); reqs_len += transactions.len(); debug!("transactions received {}", transactions.len()); @@ -275,7 +273,7 @@ mod tests { use bank::Bank; use ledger::Block; use mint::Mint; - use packet::{to_packets, PacketRecycler}; + use packet::to_packets; use signature::{Keypair, KeypairUtil}; use std::thread::sleep; use system_transaction::SystemTransaction; @@ -342,8 +340,7 @@ mod tests { let tx_anf = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash); // send 'em over - let recycler = PacketRecycler::default(); - let packets = to_packets(&recycler, &[tx, tx_no_ver, tx_anf]); + let packets = to_packets(&[tx, tx_no_ver, tx_anf]); // glad they all fit assert_eq!(packets.len(), 1); @@ -373,7 +370,6 @@ mod tests { // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let bank = Arc::new(Bank::new(&mint)); - let recycler = PacketRecycler::default(); let (verified_sender, verified_receiver) = channel(); let (banking_stage, entry_receiver) = BankingStage::new(bank.clone(), verified_receiver, Default::default()); @@ -382,14 +378,14 @@ mod tests { let alice = Keypair::new(); let tx = Transaction::system_new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); - let packets = to_packets(&recycler, &[tx]); + let packets = to_packets(&[tx]); verified_sender .send(vec![(packets[0].clone(), vec![1u8])]) .unwrap(); // Process a second batch that spends one of those tokens. let tx = Transaction::system_new(&alice, mint.pubkey(), 1, mint.last_id()); - let packets = to_packets(&recycler, &[tx]); + let packets = to_packets(&[tx]); verified_sender .send(vec![(packets[0].clone(), vec![1u8])]) .unwrap(); diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index 5189c652b3..149cc97d50 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -3,7 +3,7 @@ extern crate solana; use clap::{App, Arg}; use solana::netutil::bind_to; -use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE}; +use solana::packet::{Packet, SharedPackets, BLOB_SIZE, PACKET_DATA_SIZE}; use solana::result::Result; use solana::streamer::{receiver, PacketReceiver}; use std::cmp::max; @@ -16,12 +16,12 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::SystemTime; -fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc) -> JoinHandle<()> { +fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = recycler.allocate(); + let msgs = SharedPackets::default(); let msgs_ = msgs.clone(); - msgs.write().packets.resize(10, Packet::default()); - for w in &mut msgs.write().packets { + msgs.write().unwrap().packets.resize(10, Packet::default()); + for w in &mut msgs.write().unwrap().packets { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } @@ -30,7 +30,7 @@ fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc) return; } let mut num = 0; - for p in &msgs_.read().packets { + for p in &msgs_.read().unwrap().packets { let a = p.meta.addr(); assert!(p.meta.size < BLOB_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -47,7 +47,7 @@ fn sink(exit: Arc, rvs: Arc, r: PacketReceiver) -> Join } let timer = Duration::new(1, 0); if let Ok(msgs) = r.recv_timeout(timer) { - rvs.fetch_add(msgs.read().packets.len(), Ordering::Relaxed); + rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed); } }) } @@ -72,7 +72,6 @@ fn main() -> Result<()> { let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let exit = Arc::new(AtomicBool::new(false)); - let pack_recycler = PacketRecycler::default(); let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); @@ -93,9 +92,9 @@ fn main() -> Result<()> { )); } - let t_producer1 = producer(&addr, &pack_recycler, exit.clone()); - let t_producer2 = producer(&addr, &pack_recycler, exit.clone()); - let t_producer3 = producer(&addr, &pack_recycler, exit.clone()); + let t_producer1 = producer(&addr, exit.clone()); + let t_producer2 = producer(&addr, exit.clone()); + let t_producer3 = producer(&addr, exit.clone()); let rvs = Arc::new(AtomicUsize::new(0)); let sink_threads: Vec<_> = read_channels diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 7220ab2185..c638f27292 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -7,7 +7,7 @@ use entry::Entry; use erasure; use ledger::Block; use log::Level; -use packet::{BlobRecycler, SharedBlobs}; +use packet::SharedBlobs; use rayon::prelude::*; use result::{Error, Result}; use service::Service; @@ -32,7 +32,6 @@ fn broadcast( node_info: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, - recycler: &BlobRecycler, receiver: &Receiver>, sock: &UdpSocket, transmit_index: &mut WindowIndex, @@ -54,7 +53,7 @@ fn broadcast( let to_blobs_start = Instant::now(); let dq: SharedBlobs = ventries .into_par_iter() - .flat_map(|p| p.to_blobs(recycler)) + .flat_map(|p| p.to_blobs()) .collect(); let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); @@ -85,19 +84,29 @@ fn broadcast( let mut win = window.write().unwrap(); assert!(blobs.len() <= win.len()); for b in &blobs { - let ix = b.read().get_index().expect("blob index"); + let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; if let Some(x) = win[pos].data.take() { - trace!("{} popped {} at {}", id, x.read().get_index().unwrap(), pos); + trace!( + "{} popped {} at {}", + id, + x.read().unwrap().get_index().unwrap(), + pos + ); } if let Some(x) = win[pos].coding.take() { - trace!("{} popped {} at {}", id, x.read().get_index().unwrap(), pos); + trace!( + "{} popped {} at {}", + id, + x.read().unwrap().get_index().unwrap(), + pos + ); } trace!("{} null {}", id, pos); } for b in &blobs { - let ix = b.read().get_index().expect("blob index"); + let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; trace!("{} caching {} at {}", id, ix, pos); assert!(win[pos].data.is_none()); @@ -111,7 +120,6 @@ fn broadcast( erasure::generate_coding( &id, &mut window.write().unwrap(), - recycler, *receive_index, blobs_len, &mut transmit_index.coding, @@ -174,7 +182,6 @@ impl BroadcastStage { crdt: &Arc>, window: &SharedWindow, entry_height: u64, - recycler: &BlobRecycler, receiver: &Receiver>, ) -> BroadcastStageReturnType { let mut transmit_index = WindowIndex { @@ -211,7 +218,6 @@ impl BroadcastStage { &me, &broadcast_table, &window, - &recycler, &receiver, &sock, &mut transmit_index, @@ -239,7 +245,6 @@ impl BroadcastStage { /// * `exit` - Boolean to signal system exit. /// * `crdt` - CRDT structure /// * `window` - Cache of blobs that we have broadcast - /// * `recycler` - Blob recycler. /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `exit_sender` - Set to true when this stage exits, allows rest of Tpu to exit cleanly. Otherwise, /// when a Tpu stage closes, it only closes the stages that come after it. The stages @@ -256,12 +261,11 @@ impl BroadcastStage { receiver: Receiver>, exit_sender: Arc, ) -> Self { - let recycler = BlobRecycler::default(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_sender); - Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver) + Self::run(&sock, &crdt, &window, entry_height, &receiver) }).unwrap(); BroadcastStage { thread_hdl } @@ -356,7 +360,7 @@ mod tests { let window = shared_window.read().unwrap(); window.iter().fold(0, |m, w_slot| { if let Some(ref blob) = w_slot.data { - cmp::max(m, blob.read().get_index().unwrap()) + cmp::max(m, blob.read().unwrap().get_index().unwrap()) } else { m } diff --git a/src/crdt.rs b/src/crdt.rs index a798806cb0..25c770bd11 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -20,7 +20,7 @@ use hash::Hash; use ledger::LedgerWindow; use log::Level; use netutil::{bind_in_range, bind_to, multi_bind_in_range}; -use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; +use packet::{to_blob, Blob, SharedBlob, BLOB_SIZE}; use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::{Error, Result}; @@ -606,7 +606,7 @@ impl Crdt { // only leader should be broadcasting assert!(me.leader_id != v.id); let bl = b.unwrap(); - let blob = bl.read(); + let blob = bl.read().unwrap(); //TODO profile this, may need multiple sockets for par_iter trace!( "{}: BROADCAST idx: {} sz: {} to {},{} coding: {}", @@ -658,9 +658,10 @@ impl Crdt { (s.my_data().clone(), s.table.values().cloned().collect()) }; blob.write() + .unwrap() .set_id(me.id) .expect("set_id in pub fn retransmit"); - let rblob = blob.read(); + let rblob = blob.read().unwrap(); let orders: Vec<_> = table .iter() .filter(|v| { @@ -814,11 +815,7 @@ impl Crdt { } /// At random pick a node and try to get updated changes from them - fn run_gossip( - obj: &Arc>, - blob_sender: &BlobSender, - blob_recycler: &BlobRecycler, - ) -> Result<()> { + fn run_gossip(obj: &Arc>, blob_sender: &BlobSender) -> Result<()> { //TODO we need to keep track of stakes and weight the selection by stake size //TODO cache sockets @@ -831,7 +828,7 @@ impl Crdt { // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have - let blob = to_blob(req, remote_gossip_addr, blob_recycler)?; + let blob = to_blob(req, remote_gossip_addr)?; blob_sender.send(vec![blob])?; Ok(()) } @@ -918,12 +915,11 @@ impl Crdt { blob_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { - let blob_recycler = BlobRecycler::default(); Builder::new() .name("solana-gossip".to_string()) .spawn(move || loop { let start = timestamp(); - let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); + let _ = Self::run_gossip(&obj, &blob_sender); if exit.load(Ordering::Relaxed) { return; } @@ -945,11 +941,10 @@ impl Crdt { ledger_window: &mut Option<&mut LedgerWindow>, me: &NodeInfo, ix: u64, - blob_recycler: &BlobRecycler, ) -> Option { let pos = (ix as usize) % window.read().unwrap().len(); if let Some(ref mut blob) = &mut window.write().unwrap()[pos].data { - let mut wblob = blob.write(); + let mut wblob = blob.write().unwrap(); let blob_ix = wblob.get_index().expect("run_window_request get_index"); if blob_ix == ix { let num_retransmits = wblob.meta.num_retransmits; @@ -968,11 +963,11 @@ impl Crdt { sender_id = me.id } - let out = blob_recycler.allocate(); + let out = SharedBlob::default(); // copy to avoid doing IO inside the lock { - let mut outblob = out.write(); + let mut outblob = out.write().unwrap(); let sz = wblob.meta.size; outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); @@ -998,7 +993,6 @@ impl Crdt { inc_new_counter_info!("crdt-window-request-ledger", 1); let out = entry.to_blob( - blob_recycler, Some(ix), Some(me.id), // causes retransmission if I'm the leader Some(from_addr), @@ -1025,18 +1019,12 @@ impl Crdt { obj: &Arc>, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, - blob_recycler: &BlobRecycler, blob: &Blob, ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { - Ok(request) => Crdt::handle_protocol( - obj, - &blob.meta.addr(), - request, - window, - ledger_window, - blob_recycler, - ), + Ok(request) => { + Crdt::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window) + } Err(_) => { warn!("deserialize crdt packet failed"); None @@ -1050,7 +1038,6 @@ impl Crdt { request: Protocol, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, - blob_recycler: &BlobRecycler, ) -> Option { match request { // TODO sigverify these @@ -1119,7 +1106,7 @@ impl Crdt { } else { let rsp = Protocol::ReceiveUpdates(from_id, ups, data, liveness); - if let Ok(r) = to_blob(rsp, from.contact_info.ncp, &blob_recycler) { + if let Ok(r) = to_blob(rsp, from.contact_info.ncp) { trace!( "sending updates me {} len {} to {} {}", id, @@ -1176,15 +1163,8 @@ impl Crdt { let me = me.read().unwrap().my_data().clone(); inc_new_counter_info!("crdt-window-request-recv", 1); trace!("{}: received RequestWindowIndex {} {} ", me.id, from.id, ix,); - let res = Self::run_window_request( - &from, - &from_addr, - &window, - ledger_window, - &me, - ix, - blob_recycler, - ); + let res = + Self::run_window_request(&from, &from_addr, &window, ledger_window, &me, ix); report_time_spent( "RequestWindowIndex", &now.elapsed(), @@ -1200,7 +1180,6 @@ impl Crdt { obj: &Arc>, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, - blob_recycler: &BlobRecycler, requests_receiver: &BlobReceiver, response_sender: &BlobSender, ) -> Result<()> { @@ -1212,8 +1191,7 @@ impl Crdt { } let mut resps = Vec::new(); for req in reqs { - if let Some(resp) = - Self::handle_blob(obj, window, ledger_window, blob_recycler, &req.read()) + if let Some(resp) = Self::handle_blob(obj, window, ledger_window, &req.read().unwrap()) { resps.push(resp); } @@ -1230,7 +1208,6 @@ impl Crdt { exit: Arc, ) -> JoinHandle<()> { let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap()); - let blob_recycler = BlobRecycler::default(); Builder::new() .name("solana-listen".to_string()) @@ -1239,7 +1216,6 @@ impl Crdt { &me, &window, &mut ledger_window.as_mut(), - &blob_recycler, &requests_receiver, &response_sender, ); @@ -1408,7 +1384,7 @@ mod tests { use hash::{hash, Hash}; use ledger::{LedgerWindow, LedgerWriter}; use logger; - use packet::BlobRecycler; + use packet::SharedBlob; use result::Error; use signature::{Keypair, KeypairUtil}; use solana_program_interface::pubkey::Pubkey; @@ -1661,9 +1637,9 @@ mod tests { } assert!(rv.len() > 0); for i in rv.iter() { - if i.read().meta.addr() == nxt1.contact_info.ncp { + if i.read().unwrap().meta.addr() == nxt1.contact_info.ncp { one = true; - } else if i.read().meta.addr() == nxt2.contact_info.ncp { + } else if i.read().unwrap().meta.addr() == nxt2.contact_info.ncp { two = true; } else { //unexpected request @@ -1760,43 +1736,18 @@ mod tests { socketaddr!("127.0.0.1:1237"), socketaddr!("127.0.0.1:1238"), ); - let recycler = BlobRecycler::default(); - let rv = Crdt::run_window_request( - &me, - &socketaddr_any!(), - &window, - &mut None, - &me, - 0, - &recycler, - ); + let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); assert!(rv.is_none()); - let out = recycler.allocate(); - out.write().meta.size = 200; + let out = SharedBlob::default(); + out.write().unwrap().meta.size = 200; window.write().unwrap()[0].data = Some(out); - let rv = Crdt::run_window_request( - &me, - &socketaddr_any!(), - &window, - &mut None, - &me, - 0, - &recycler, - ); + let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0); assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob - assert_eq!(v.read().meta.size, 200); + assert_eq!(v.read().unwrap().meta.size, 200); let len = window.read().unwrap().len() as u64; - let rv = Crdt::run_window_request( - &me, - &socketaddr_any!(), - &window, - &mut None, - &me, - len, - &recycler, - ); + let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, len); assert!(rv.is_none()); fn tmp_ledger(name: &str) -> String { @@ -1825,7 +1776,6 @@ mod tests { &mut Some(&mut ledger_window), &me, 1, - &recycler, ); assert!(rv.is_some()); @@ -1842,22 +1792,13 @@ mod tests { let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let recycler = BlobRecycler::default(); - // Simulate handling a repair request from mock_peer - let rv = Crdt::run_window_request( - &mock_peer, - &socketaddr_any!(), - &window, - &mut None, - &me, - 0, - &recycler, - ); + let rv = + Crdt::run_window_request(&mock_peer, &socketaddr_any!(), &window, &mut None, &me, 0); assert!(rv.is_none()); - let blob = recycler.allocate(); + let blob = SharedBlob::default(); let blob_size = 200; - blob.write().meta.size = blob_size; + blob.write().unwrap().meta.size = blob_size; window.write().unwrap()[0].data = Some(blob); let num_requests: u32 = 64; @@ -1869,9 +1810,8 @@ mod tests { &mut None, &me, 0, - &recycler, ).unwrap(); - let blob = shared_blob.read(); + let blob = shared_blob.read().unwrap(); // Test we copied the blob assert_eq!(blob.meta.size, blob_size); @@ -1944,7 +1884,6 @@ mod tests { fn protocol_requestupdate_alive() { logger::setup(); let window = Arc::new(RwLock::new(default_window())); - let recycler = BlobRecycler::default(); let node = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let node_with_same_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); @@ -1958,37 +1897,18 @@ mod tests { let request = Protocol::RequestUpdates(1, node.clone()); assert!( - Crdt::handle_protocol( - &obj, - &node.contact_info.ncp, - request, - &window, - &mut None, - &recycler - ).is_none() + Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) + .is_none() ); let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); assert!( - Crdt::handle_protocol( - &obj, - &node.contact_info.ncp, - request, - &window, - &mut None, - &recycler - ).is_none() + Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,) + .is_none() ); let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); - Crdt::handle_protocol( - &obj, - &node.contact_info.ncp, - request, - &window, - &mut None, - &recycler, - ); + Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None); let me = obj.write().unwrap(); diff --git a/src/entry.rs b/src/entry.rs index 7f07aea81f..e3ec34fd8a 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -5,7 +5,7 @@ use bincode::{serialize_into, serialized_size}; use budget_transaction::BudgetTransaction; use hash::Hash; -use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE}; +use packet::{SharedBlob, BLOB_DATA_SIZE}; use poh::Poh; use rayon::prelude::*; use solana_program_interface::pubkey::Pubkey; @@ -70,14 +70,13 @@ impl Entry { pub fn to_blob( &self, - blob_recycler: &BlobRecycler, idx: Option, id: Option, addr: Option<&SocketAddr>, ) -> SharedBlob { - let blob = blob_recycler.allocate(); + let blob = SharedBlob::default(); { - let mut blob_w = blob.write(); + let mut blob_w = blob.write().unwrap(); let pos = { let mut out = Cursor::new(blob_w.data_mut()); serialize_into(&mut out, &self).expect("failed to serialize output"); diff --git a/src/erasure.rs b/src/erasure.rs index 52a2322ab3..d99d8afb13 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -1,5 +1,5 @@ // Support erasure coding -use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE}; +use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE}; use solana_program_interface::pubkey::Pubkey; use std::cmp; use std::mem; @@ -217,7 +217,6 @@ pub fn decode_blocks( pub fn generate_coding( id: &Pubkey, window: &mut [WindowSlot], - recycler: &BlobRecycler, receive_index: u64, num_blobs: usize, transmit_index_coding: &mut u64, @@ -285,7 +284,7 @@ pub fn generate_coding( let n = i % window.len(); assert!(window[n].coding.is_none()); - window[n].coding = Some(recycler.allocate()); + window[n].coding = Some(SharedBlob::default()); let coding = window[n].coding.clone().unwrap(); let mut coding_wl = coding.write(); @@ -408,17 +407,10 @@ fn find_missing( // Recover a missing block into window // missing blocks should be None or old... -// Use recycler to allocate new ones. // If not enough coding or data blocks are present to restore // any of the blocks, the block is skipped. // Side effect: old blobs in a block are None'd -pub fn recover( - id: &Pubkey, - recycler: &BlobRecycler, - window: &mut [WindowSlot], - start_idx: u64, - start: usize, -) -> Result<()> { +pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize) -> Result<()> { let block_start = start - (start % NUM_DATA); let block_start_idx = start_idx - (start_idx % NUM_DATA as u64); @@ -478,7 +470,7 @@ pub fn recover( } blobs.push(b); } else { - let n = recycler.allocate(); + let n = SharedBlob::default(); window[j].data = Some(n.clone()); // mark the missing memory blobs.push(n); @@ -499,7 +491,7 @@ pub fn recover( } blobs.push(b); } else { - let n = recycler.allocate(); + let n = SharedBlob::default(); window[j].coding = Some(n.clone()); //mark the missing memory blobs.push(n); @@ -602,7 +594,7 @@ mod test { use crdt; use erasure; use logger; - use packet::{BlobRecycler, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; + use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE}; use rand::{thread_rng, Rng}; use signature::{Keypair, KeypairUtil}; use solana_program_interface::pubkey::Pubkey; @@ -698,11 +690,7 @@ mod test { } const WINDOW_SIZE: usize = 64; - fn generate_window( - blob_recycler: &BlobRecycler, - offset: usize, - num_blobs: usize, - ) -> Vec { + fn generate_window(offset: usize, num_blobs: usize) -> Vec { let mut window = vec![ WindowSlot { data: None, @@ -713,7 +701,7 @@ mod test { ]; let mut blobs = Vec::with_capacity(num_blobs); for i in 0..num_blobs { - let b = blob_recycler.allocate(); + let b = SharedBlob::default(); let b_ = b.clone(); let mut w = b.write(); // generate a random length, multiple of 4 between 8 and 32 @@ -764,36 +752,13 @@ mod test { } } - fn pollute_recycler(blob_recycler: &BlobRecycler) { - let mut blobs = Vec::with_capacity(WINDOW_SIZE * 2); - for _ in 0..WINDOW_SIZE * 10 { - let blob = blob_recycler.allocate(); - { - let mut b_l = blob.write(); - - for i in 0..BLOB_SIZE { - b_l.data[i] = thread_rng().gen(); - } - // some of the blobs should previously been used for coding - if thread_rng().gen_bool(erasure::NUM_CODING as f64 / erasure::NUM_DATA as f64) { - b_l.set_coding().unwrap(); - } - } - blobs.push(blob); - } - } - #[test] pub fn test_window_recover_basic() { logger::setup(); - let blob_recycler = BlobRecycler::default(); - - pollute_recycler(&blob_recycler); - // Generate a window let offset = 0; let num_blobs = erasure::NUM_DATA + 2; - let mut window = generate_window(&blob_recycler, WINDOW_SIZE, num_blobs); + let mut window = generate_window(WINDOW_SIZE, num_blobs); for slot in &window { if let Some(blob) = &slot.data { @@ -809,14 +774,8 @@ mod test { let mut index = (erasure::NUM_DATA + 2) as u64; let id = Pubkey::default(); assert!( - erasure::generate_coding( - &id, - &mut window, - &blob_recycler, - offset as u64, - num_blobs, - &mut index - ).is_ok() + erasure::generate_coding(&id, &mut window, offset as u64, num_blobs, &mut index) + .is_ok() ); assert_eq!(index, (erasure::NUM_DATA - erasure::NUM_CODING) as u64); @@ -835,15 +794,7 @@ mod test { scramble_window_tails(&mut window, num_blobs); // Recover it from coding - assert!( - erasure::recover( - &id, - &blob_recycler, - &mut window, - (offset + WINDOW_SIZE) as u64, - offset, - ).is_ok() - ); + assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok()); println!("** after-recover:"); print_window(&window); @@ -880,15 +831,7 @@ mod test { print_window(&window); // Recover it from coding - assert!( - erasure::recover( - &id, - &blob_recycler, - &mut window, - (offset + WINDOW_SIZE) as u64, - offset, - ).is_ok() - ); + assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok()); println!("** after-recover:"); print_window(&window); @@ -923,15 +866,7 @@ mod test { print_window(&window); // Recover it from coding - assert!( - erasure::recover( - &id, - &blob_recycler, - &mut window, - (offset + WINDOW_SIZE) as u64, - offset, - ).is_ok() - ); + assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok()); println!("** after-recover:"); print_window(&window); @@ -968,11 +903,10 @@ mod test { // #[ignore] // pub fn test_window_recover() { // logger::setup(); - // let blob_recycler = BlobRecycler::default(); // let offset = 4; // let data_len = 16; // let num_blobs = erasure::NUM_DATA + 2; - // let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, offset, num_blobs); + // let (mut window, blobs_len) = generate_window(data_len, offset, num_blobs); // println!("** after-gen:"); // print_window(&window); // assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok()); @@ -989,7 +923,7 @@ mod test { // window_l0.write().unwrap().data[0] = 55; // println!("** after-nulling:"); // print_window(&window); - // assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok()); + // assert!(erasure::recover(&mut window, offset, offset + blobs_len).is_ok()); // println!("** after-restore:"); // print_window(&window); // let window_l = window[offset + 1].clone().unwrap(); diff --git a/src/fullnode.rs b/src/fullnode.rs index f701761a49..f3ec04d160 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -552,7 +552,7 @@ mod tests { use crdt::Node; use fullnode::{Fullnode, FullnodeReturnType}; use ledger::genesis; - use packet::{make_consecutive_blobs, BlobRecycler}; + use packet::make_consecutive_blobs; use service::Service; use signature::{Keypair, KeypairUtil}; use std::cmp; @@ -658,7 +658,6 @@ mod tests { ); // Send blobs to the validator from our mock leader - let resp_recycler = BlobRecycler::default(); let t_responder = { let (s_responder, r_responder) = channel(); let blob_sockets: Vec> = leader_node @@ -685,15 +684,11 @@ mod tests { .expect("expected at least one genesis entry") .id; let tvu_address = &validator_info.contact_info.tvu; - let msgs = make_consecutive_blobs( - leader_id, - total_blobs_to_send, - last_id, - &tvu_address, - &resp_recycler, - ).into_iter() - .rev() - .collect(); + let msgs = + make_consecutive_blobs(leader_id, total_blobs_to_send, last_id, &tvu_address) + .into_iter() + .rev() + .collect(); s_responder.send(msgs).expect("send"); t_responder }; diff --git a/src/ledger.rs b/src/ledger.rs index 0267711db9..d639843bf2 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -10,7 +10,7 @@ use hash::Hash; use log::Level::Trace; #[cfg(test)] use mint::Mint; -use packet::{self, SharedBlob, BLOB_DATA_SIZE}; +use packet::{SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; #[cfg(test)] @@ -403,14 +403,8 @@ pub fn read_ledger( pub trait Block { /// Verifies the hashes and counts of a slice of transactions are all consistent. fn verify(&self, start_hash: &Hash) -> bool; - fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec; - fn to_blobs_with_id( - &self, - blob_recycler: &packet::BlobRecycler, - id: Pubkey, - start_id: u64, - addr: &SocketAddr, - ) -> Vec; + fn to_blobs(&self) -> Vec; + fn to_blobs_with_id(&self, id: Pubkey, start_id: u64, addr: &SocketAddr) -> Vec; fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>; } @@ -431,28 +425,16 @@ impl Block for [Entry] { }) } - fn to_blobs_with_id( - &self, - blob_recycler: &packet::BlobRecycler, - id: Pubkey, - start_idx: u64, - addr: &SocketAddr, - ) -> Vec { + fn to_blobs_with_id(&self, id: Pubkey, start_idx: u64, addr: &SocketAddr) -> Vec { self.iter() .enumerate() - .map(|(i, entry)| { - entry.to_blob( - blob_recycler, - Some(start_idx + i as u64), - Some(id), - Some(&addr), - ) - }).collect() + .map(|(i, entry)| entry.to_blob(Some(start_idx + i as u64), Some(id), Some(&addr))) + .collect() } - fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec { + fn to_blobs(&self) -> Vec { let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - self.to_blobs_with_id(blob_recycler, Pubkey::default(), 0, &default_addr) + self.to_blobs_with_id(Pubkey::default(), 0, &default_addr) } fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> { @@ -471,7 +453,7 @@ pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result; -pub type SharedBlob = recycler::Recyclable; +pub type SharedPackets = Arc>; +pub type SharedBlob = Arc>; pub type SharedBlobs = Vec; -pub type PacketRecycler = recycler::Recycler; -pub type BlobRecycler = recycler::Recycler; pub const NUM_PACKETS: usize = 1024 * 8; pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers @@ -67,12 +65,6 @@ impl Default for Packet { } } -impl recycler::Reset for Packet { - fn reset(&mut self) { - self.meta = Meta::default(); - } -} - impl Meta { pub fn addr(&self) -> SocketAddr { if !self.v6 { @@ -127,14 +119,6 @@ impl Default for Packets { } } -impl recycler::Reset for Packets { - fn reset(&mut self) { - for i in 0..self.packets.len() { - self.packets[i].reset(); - } - } -} - #[derive(Clone)] pub struct Blob { pub data: [u8; BLOB_SIZE], @@ -162,13 +146,6 @@ impl Default for Blob { } } -impl recycler::Reset for Blob { - fn reset(&mut self) { - self.meta = Meta::default(); - self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]); - } -} - #[derive(Debug)] pub enum BlobError { /// the Blob's meta and data are not self-consistent @@ -226,16 +203,15 @@ impl Packets { } } -pub fn to_packets_chunked( - r: &PacketRecycler, - xs: &[T], - chunks: usize, -) -> Vec { +pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { let mut out = vec![]; for x in xs.chunks(chunks) { - let mut p = r.allocate(); - p.write().packets.resize(x.len(), Default::default()); - for (i, o) in x.iter().zip(p.write().packets.iter_mut()) { + let mut p = SharedPackets::default(); + p.write() + .unwrap() + .packets + .resize(x.len(), Default::default()); + for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) { let v = serialize(&i).expect("serialize request"); let len = v.len(); o.data[..len].copy_from_slice(&v); @@ -246,18 +222,14 @@ pub fn to_packets_chunked( out } -pub fn to_packets(r: &PacketRecycler, xs: &[T]) -> Vec { - to_packets_chunked(r, xs, NUM_PACKETS) +pub fn to_packets(xs: &[T]) -> Vec { + to_packets_chunked(xs, NUM_PACKETS) } -pub fn to_blob( - resp: T, - rsp_addr: SocketAddr, - blob_recycler: &BlobRecycler, -) -> Result { - let blob = blob_recycler.allocate(); +pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { + let blob = SharedBlob::default(); { - let mut b = blob.write(); + let mut b = blob.write().unwrap(); let v = serialize(&resp)?; let len = v.len(); assert!(len <= BLOB_SIZE); @@ -268,13 +240,10 @@ pub fn to_blob( Ok(blob) } -pub fn to_blobs( - rsps: Vec<(T, SocketAddr)>, - blob_recycler: &BlobRecycler, -) -> Result { +pub fn to_blobs(rsps: Vec<(T, SocketAddr)>) -> Result { let mut blobs = Vec::new(); for (resp, rsp_addr) in rsps { - blobs.push(to_blob(resp, rsp_addr, blob_recycler)?); + blobs.push(to_blob(resp, rsp_addr)?); } Ok(blobs) } @@ -374,7 +343,7 @@ impl Blob { } pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { - let mut p = r.write(); + let mut p = r.write().unwrap(); trace!("receiving on {}", socket.local_addr().unwrap()); let (nrecv, from) = socket.recv_from(&mut p.data)?; @@ -384,7 +353,7 @@ impl Blob { Ok(()) } - pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result { + pub fn recv_from(socket: &UdpSocket) -> Result { let mut v = Vec::new(); //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -394,7 +363,7 @@ impl Blob { // * set it back to blocking before returning socket.set_nonblocking(false)?; for i in 0..NUM_BLOBS { - let r = re.allocate(); + let r = SharedBlob::default(); match Blob::recv_blob(socket, &r) { Err(_) if i > 0 => { @@ -418,7 +387,7 @@ impl Blob { pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> { for r in v { { - let p = r.read(); + let p = r.read().unwrap(); let a = p.meta.addr(); if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { warn!( @@ -439,7 +408,6 @@ pub fn make_consecutive_blobs( num_blobs_to_make: u64, start_hash: Hash, addr: &SocketAddr, - resp_recycler: &BlobRecycler, ) -> SharedBlobs { let mut last_hash = start_hash; let mut num_hashes = 0; @@ -447,7 +415,7 @@ pub fn make_consecutive_blobs( for _ in 0..num_blobs_to_make { all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![])); } - let mut new_blobs = all_entries.to_blobs_with_id(&resp_recycler, me_id, 0, addr); + let mut new_blobs = all_entries.to_blobs_with_id(me_id, 0, addr); new_blobs.truncate(num_blobs_to_make as usize); new_blobs } @@ -455,10 +423,9 @@ pub fn make_consecutive_blobs( #[cfg(test)] mod tests { use packet::{ - to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, BLOB_HEADER_SIZE, - NUM_PACKETS, PACKET_DATA_SIZE, + to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS, + PACKET_DATA_SIZE, }; - use recycler::Reset; use request::Request; use std::io; use std::io::Write; @@ -470,16 +437,15 @@ mod tests { let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = sender.local_addr().unwrap(); - let r = PacketRecycler::default(); - let p = r.allocate(); - p.write().packets.resize(10, Packet::default()); - for m in p.write().packets.iter_mut() { + let p = SharedPackets::default(); + p.write().unwrap().packets.resize(10, Packet::default()); + for m in p.write().unwrap().packets.iter_mut() { m.meta.set_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } - p.read().send_to(&sender).unwrap(); - p.write().recv_from(&reader).unwrap(); - for m in p.write().packets.iter_mut() { + p.read().unwrap().send_to(&sender).unwrap(); + p.write().unwrap().recv_from(&reader).unwrap(); + for m in p.write().unwrap().packets.iter_mut() { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.addr(), saddr); } @@ -488,19 +454,18 @@ mod tests { #[test] fn test_to_packets() { let tx = Request::GetTransactionCount; - let re = PacketRecycler::default(); - let rv = to_packets(&re, &vec![tx.clone(); 1]); + let rv = to_packets(&vec![tx.clone(); 1]); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().packets.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), 1); - let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS]); + let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().packets.len(), NUM_PACKETS); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS + 1]); + let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]); assert_eq!(rv.len(), 2); - assert_eq!(rv[0].read().packets.len(), NUM_PACKETS); - assert_eq!(rv[1].read().packets.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + assert_eq!(rv[1].read().unwrap().packets.len(), 1); } #[test] @@ -509,17 +474,16 @@ mod tests { let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let r = BlobRecycler::default(); - let p = r.allocate(); - p.write().meta.set_addr(&addr); - p.write().meta.size = 1024; + let p = SharedBlob::default(); + p.write().unwrap().meta.set_addr(&addr); + p.write().unwrap().meta.size = 1024; let v = vec![p]; Blob::send_to(&sender, v).unwrap(); trace!("send_to"); - let rv = Blob::recv_from(&r, &reader).unwrap(); + let rv = Blob::recv_from(&reader).unwrap(); trace!("recv_from"); assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().meta.size, 1024); + assert_eq!(rv[0].read().unwrap().meta.size, 1024); } #[cfg(all(feature = "ipv6", test))] @@ -528,14 +492,13 @@ mod tests { let reader = UdpSocket::bind("[::1]:0").expect("bind"); let addr = reader.local_addr().unwrap(); let sender = UdpSocket::bind("[::1]:0").expect("bind"); - let r = BlobRecycler::default(); - let p = r.allocate(); - p.as_mut().meta.set_addr(&addr); - p.as_mut().meta.size = 1024; + let p = SharedBlob::default(); + p.as_mut().unwrap().meta.set_addr(&addr); + p.as_mut().unwrap().meta.size = 1024; let mut v = VecDeque::default(); v.push_back(p); Blob::send_to(&r, &sender, &mut v).unwrap(); - let mut rv = Blob::recv_from(&r, &reader).unwrap(); + let mut rv = Blob::recv_from(&reader).unwrap(); let rp = rv.pop_front().unwrap(); assert_eq!(rp.as_mut().meta.size, 1024); } @@ -554,8 +517,6 @@ mod tests { b.data_mut()[0] = 1; assert_eq!(b.data()[0], 1); assert_eq!(b.get_index().unwrap(), ::max_value()); - b.reset(); - assert!(b.data[..BLOB_HEADER_SIZE].starts_with(&[0u8; BLOB_HEADER_SIZE])); assert_eq!(b.meta, Meta::default()); } diff --git a/src/recycler.rs b/src/recycler.rs deleted file mode 100644 index f7a738d0bf..0000000000 --- a/src/recycler.rs +++ /dev/null @@ -1,170 +0,0 @@ -use std::fmt; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; - -/// A function that leaves the given type in the same state as Default, -/// but starts with an existing type instead of allocating a new one. -pub trait Reset { - fn reset(&mut self); -} - -/// An value that's returned to its heap once dropped. -pub struct Recyclable { - val: Arc>, - landfill: Arc>>>>, -} - -impl Recyclable { - pub fn read(&self) -> RwLockReadGuard { - self.val.read().unwrap() - } - pub fn write(&self) -> RwLockWriteGuard { - self.val.write().unwrap() - } -} - -impl Drop for Recyclable { - fn drop(&mut self) { - if Arc::strong_count(&self.val) == 1 { - // this isn't thread safe, it will allow some concurrent drops to leak and not recycle - // if that happens the allocator will end up allocating from the heap - self.landfill.lock().unwrap().push(self.val.clone()); - } - } -} - -impl Clone for Recyclable { - fn clone(&self) -> Self { - Recyclable { - val: self.val.clone(), - landfill: self.landfill.clone(), - } - } -} - -impl fmt::Debug for Recyclable { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Recyclable {:?}", &self.read()) - } -} - -/// An object to minimize memory allocations. Use `allocate()` -/// to get recyclable values of type `T`. When those recyclables -/// are dropped, they're returned to the recycler. The next time -/// `allocate()` is called, the value will be pulled from the -/// recycler instead being allocated from memory. - -pub struct Recycler { - landfill: Arc>>>>, -} -impl Clone for Recycler { - fn clone(&self) -> Self { - Recycler { - landfill: self.landfill.clone(), - } - } -} - -impl Default for Recycler { - fn default() -> Self { - Recycler { - landfill: Arc::new(Mutex::new(vec![])), - } - } -} - -impl Recycler { - pub fn allocate(&self) -> Recyclable { - let val = self - .landfill - .lock() - .unwrap() - .pop() - .map(|val| { - val.write().unwrap().reset(); - val - }).unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))); - Recyclable { - val, - landfill: self.landfill.clone(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::mem; - use std::sync::mpsc::channel; - - #[derive(Default)] - struct Foo { - x: u8, - } - - impl Reset for Foo { - fn reset(&mut self) { - self.x = 0; - } - } - - #[test] - fn test_allocate() { - let recycler: Recycler = Recycler::default(); - let r = recycler.allocate(); - assert_eq!(r.read().x, 0); - } - - #[test] - fn test_recycle() { - let recycler: Recycler = Recycler::default(); - - { - let foo = recycler.allocate(); - foo.write().x = 1; - } - assert_eq!(recycler.landfill.lock().unwrap().len(), 1); - - let foo = recycler.allocate(); - assert_eq!(foo.read().x, 0); - assert_eq!(recycler.landfill.lock().unwrap().len(), 0); - } - #[test] - fn test_channel() { - let recycler: Recycler = Recycler::default(); - let (sender, receiver) = channel(); - { - let foo = recycler.allocate(); - foo.write().x = 1; - sender.send(foo).unwrap(); - assert_eq!(recycler.landfill.lock().unwrap().len(), 0); - } - { - let foo = receiver.recv().unwrap(); - assert_eq!(foo.read().x, 1); - assert_eq!(recycler.landfill.lock().unwrap().len(), 0); - } - assert_eq!(recycler.landfill.lock().unwrap().len(), 1); - } - #[test] - fn test_window() { - let recycler: Recycler = Recycler::default(); - let mut window = vec![None]; - let (sender, receiver) = channel(); - { - // item is in the window while its in the pipeline - // which is used to serve requests from other threads - let item = recycler.allocate(); - item.write().x = 1; - window[0] = Some(item); - sender.send(window[0].clone().unwrap()).unwrap(); - } - { - let foo = receiver.recv().unwrap(); - assert_eq!(foo.read().x, 1); - let old = mem::replace(&mut window[0], None).unwrap(); - assert_eq!(old.read().x, 1); - } - // only one thing should be in the landfill at the end - assert_eq!(recycler.landfill.lock().unwrap().len(), 1); - } -} diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index ba1527bb6e..532bc5dc5c 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -6,7 +6,6 @@ use crdt::Crdt; use entry::EntryReceiver; use ledger::{Block, LedgerWriter}; use log::Level; -use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::Keypair; @@ -48,7 +47,6 @@ impl ReplicateStage { fn replicate_requests( bank: &Arc, crdt: &Arc>, - blob_recycler: &BlobRecycler, window_receiver: &EntryReceiver, ledger_writer: Option<&mut LedgerWriter>, keypair: &Arc, @@ -64,7 +62,7 @@ impl ReplicateStage { let res = bank.process_entries(&entries); if let Some(sender) = vote_blob_sender { - send_validator_vote(bank, keypair, crdt, blob_recycler, sender)?; + send_validator_vote(bank, keypair, crdt, sender)?; } { @@ -100,7 +98,6 @@ impl ReplicateStage { let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); let keypair = Arc::new(keypair); - let blob_recycler = BlobRecycler::default(); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) @@ -120,7 +117,6 @@ impl ReplicateStage { if let Err(e) = Self::replicate_requests( &bank, &crdt, - &blob_recycler, &window_receiver, ledger_writer.as_mut(), &keypair, diff --git a/src/request_stage.rs b/src/request_stage.rs index 77c905d49b..d17141e097 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -3,7 +3,7 @@ use bincode::deserialize; use counter::Counter; use log::Level; -use packet::{to_blobs, BlobRecycler, Packets, SharedPackets}; +use packet::{to_blobs, Packets, SharedPackets}; use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; @@ -38,7 +38,6 @@ impl RequestStage { request_processor: &RequestProcessor, packet_receiver: &Receiver, blob_sender: &BlobSender, - blob_recycler: &BlobRecycler, ) -> Result<()> { let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?; @@ -51,7 +50,7 @@ impl RequestStage { let mut reqs_len = 0; let proc_start = Instant::now(); for msgs in batch { - let reqs: Vec<_> = Self::deserialize_requests(&msgs.read()) + let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) .into_iter() .filter_map(|x| x) .collect(); @@ -59,7 +58,7 @@ impl RequestStage { let rsps = request_processor.process_requests(reqs); - let blobs = to_blobs(rsps, blob_recycler)?; + let blobs = to_blobs(rsps)?; if !blobs.is_empty() { info!("process: sending blobs: {}", blobs.len()); //don't wake up the other side if there is nothing @@ -85,7 +84,6 @@ impl RequestStage { ) -> (Self, BlobReceiver) { let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); - let blob_recycler = BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-request-stage".to_string()) @@ -94,7 +92,6 @@ impl RequestStage { &request_processor_, &packet_receiver, &blob_sender, - &blob_recycler, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, diff --git a/src/sigverify.rs b/src/sigverify.rs index 28bc0376cc..e0eb235c7c 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -73,7 +73,10 @@ fn verify_packet_disabled(_packet: &Packet) -> u8 { } fn batch_size(batches: &[SharedPackets]) -> usize { - batches.iter().map(|p| p.read().packets.len()).sum() + batches + .iter() + .map(|p| p.read().unwrap().packets.len()) + .sum() } #[cfg(not(feature = "cuda"))] @@ -87,8 +90,14 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec> { info!("CPU ECDSA for {}", batch_size(batches)); let rv = batches .into_par_iter() - .map(|p| p.read().packets.par_iter().map(verify_packet).collect()) - .collect(); + .map(|p| { + p.read() + .unwrap() + .packets + .par_iter() + .map(verify_packet) + .collect() + }).collect(); inc_new_counter_info!("ed25519_verify_cpu", count); rv } @@ -101,6 +110,7 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec> { .into_par_iter() .map(|p| { p.read() + .unwrap() .packets .par_iter() .map(verify_packet_disabled) @@ -196,7 +206,7 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec> { #[cfg(test)] mod tests { use bincode::serialize; - use packet::{Packet, PacketRecycler}; + use packet::{Packet, SharedPackets}; use sigverify; use system_transaction::{memfind, test_tx}; use transaction::Transaction; @@ -228,15 +238,18 @@ mod tests { } // generate packet vector - let packet_recycler = PacketRecycler::default(); let batches: Vec<_> = (0..2) .map(|_| { - let packets = packet_recycler.allocate(); - packets.write().packets.resize(0, Default::default()); + let packets = SharedPackets::default(); + packets + .write() + .unwrap() + .packets + .resize(0, Default::default()); for _ in 0..n { - packets.write().packets.push(packet.clone()); + packets.write().unwrap().packets.push(packet.clone()); } - assert_eq!(packets.read().packets.len(), n); + assert_eq!(packets.read().unwrap().packets.len(), n); packets }).collect(); assert_eq!(batches.len(), 2); diff --git a/src/streamer.rs b/src/streamer.rs index 22042ae218..7854dc56ed 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -2,7 +2,7 @@ //! use influx_db_client as influxdb; use metrics; -use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlobs, SharedPackets}; +use packet::{Blob, SharedBlobs, SharedPackets}; use result::{Error, Result}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -20,22 +20,21 @@ pub type BlobReceiver = Receiver; fn recv_loop( sock: &UdpSocket, exit: &Arc, - re: &PacketRecycler, channel: &PacketSender, channel_tag: &'static str, ) -> Result<()> { loop { - let msgs = re.allocate(); + let msgs = SharedPackets::default(); loop { // Check for exit signal, even if socket is busy // (for instance the leader trasaction socket) if exit.load(Ordering::Relaxed) { return Ok(()); } - let result = msgs.write().recv_from(sock); + let result = msgs.write().unwrap().recv_from(sock); match result { Ok(()) => { - let len = msgs.read().packets.len(); + let len = msgs.read().unwrap().packets.len(); metrics::submit( influxdb::Point::new(channel_tag) .add_field("count", influxdb::Value::Integer(len as i64)) @@ -57,14 +56,13 @@ pub fn receiver( sender_tag: &'static str, ) -> JoinHandle<()> { let res = sock.set_read_timeout(Some(Duration::new(1, 0))); - let recycler = PacketRecycler::default(); if res.is_err() { panic!("streamer::receiver set_read_timeout error"); } Builder::new() .name("solana-receiver".to_string()) .spawn(move || { - let _ = recv_loop(&sock, &exit, &recycler, &packet_sender, sender_tag); + let _ = recv_loop(&sock, &exit, &packet_sender, sender_tag); () }).unwrap() } @@ -81,11 +79,11 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, let msgs = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); trace!("got msgs"); - let mut len = msgs.read().packets.len(); + let mut len = msgs.read().unwrap().packets.len(); let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { trace!("got more msgs"); - len += more.read().packets.len(); + len += more.read().unwrap().packets.len(); batch.push(more); if len > 100_000 { @@ -112,9 +110,9 @@ pub fn responder(name: &'static str, sock: Arc, r: BlobReceiver) -> J //TODO, we would need to stick block authentication before we create the //window. -fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> { +fn recv_blobs(sock: &UdpSocket, s: &BlobSender) -> Result<()> { trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap()); - let dq = Blob::recv_from(recycler, sock)?; + let dq = Blob::recv_from(sock)?; if !dq.is_empty() { s.send(dq)?; } @@ -127,20 +125,19 @@ pub fn blob_receiver(sock: Arc, exit: Arc, s: BlobSender) let timer = Duration::new(1, 0); sock.set_read_timeout(Some(timer)) .expect("set socket timeout"); - let recycler = BlobRecycler::default(); Builder::new() .name("solana-blob_receiver".to_string()) .spawn(move || loop { if exit.load(Ordering::Relaxed) { break; } - let _ = recv_blobs(&recycler, &sock, &s); + let _ = recv_blobs(&sock, &s); }).unwrap() } #[cfg(test)] mod test { - use packet::{Blob, BlobRecycler, Packet, Packets, PACKET_DATA_SIZE}; + use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -155,7 +152,7 @@ mod test { for _t in 0..5 { let timer = Duration::new(1, 0); match r.recv_timeout(timer) { - Ok(m) => *num += m.read().packets.len(), + Ok(m) => *num += m.read().unwrap().packets.len(), _ => info!("get_msgs error"), } if *num == 10 { @@ -177,7 +174,6 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader, "streamer-test"); let t_responder = { @@ -185,9 +181,9 @@ mod test { let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let mut msgs = Vec::new(); for i in 0..10 { - let mut b = resp_recycler.allocate(); + let mut b = SharedBlob::default(); { - let mut w = b.write(); + let mut w = b.write().unwrap(); w.data[0] = i as u8; w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); diff --git a/src/tvu.rs b/src/tvu.rs index 002be45f36..6383aa4d20 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -157,7 +157,7 @@ pub mod tests { use logger; use mint::Mint; use ncp::Ncp; - use packet::BlobRecycler; + use packet::SharedBlob; use service::Service; use signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; @@ -209,7 +209,6 @@ pub mod tests { // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to // simulate target peer - let recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let blob_sockets: Vec> = target2 .sockets @@ -279,9 +278,9 @@ pub mod tests { alice_ref_balance -= transfer_amount; for entry in vec![entry0, entry1] { - let mut b = recycler.allocate(); + let mut b = SharedBlob::default(); { - let mut w = b.write(); + let mut w = b.write().unwrap(); w.set_index(blob_id).unwrap(); blob_id += 1; w.set_id(leader_id).unwrap(); diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 504b680678..8343224d23 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -9,7 +9,7 @@ use hash::Hash; use influx_db_client as influxdb; use log::Level; use metrics; -use packet::{BlobRecycler, SharedBlob}; +use packet::SharedBlob; use result::Result; use signature::Keypair; use solana_program_interface::pubkey::Pubkey; @@ -31,9 +31,8 @@ pub fn create_new_signed_vote_blob( last_id: &Hash, keypair: &Keypair, crdt: &Arc>, - blob_recycler: &BlobRecycler, ) -> Result { - let shared_blob = blob_recycler.allocate(); + let shared_blob = SharedBlob::default(); let (vote, addr) = { let mut wcrdt = crdt.write().unwrap(); //TODO: doesn't seem like there is a synchronous call to get height and id @@ -42,7 +41,7 @@ pub fn create_new_signed_vote_blob( }?; let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0); { - let mut blob = shared_blob.write(); + let mut blob = shared_blob.write().unwrap(); let bytes = serialize(&tx)?; let len = bytes.len(); blob.data[..len].copy_from_slice(&bytes); @@ -109,7 +108,6 @@ pub fn send_leader_vote( keypair: &Keypair, bank: &Arc, crdt: &Arc>, - blob_recycler: &BlobRecycler, vote_blob_sender: &BlobSender, last_vote: &mut u64, last_valid_validator_timestamp: &mut u64, @@ -125,9 +123,7 @@ pub fn send_leader_vote( last_vote, last_valid_validator_timestamp, ) { - if let Ok(shared_blob) = - create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler) - { + if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) { vote_blob_sender.send(vec![shared_blob])?; let finality_ms = now - super_majority_timestamp; @@ -152,11 +148,10 @@ pub fn send_validator_vote( bank: &Arc, keypair: &Arc, crdt: &Arc>, - blob_recycler: &BlobRecycler, vote_blob_sender: &BlobSender, ) -> Result<()> { let last_id = bank.last_id(); - if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler) { + if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) { inc_new_counter_info!("replicate-vote_sent", 1); vote_blob_sender.send(vec![shared_blob])?; @@ -228,7 +223,6 @@ pub mod tests { leader_crdt.insert_vote(&validator.id, &vote, entry.id); } let leader = Arc::new(RwLock::new(leader_crdt)); - let blob_recycler = BlobRecycler::default(); let (vote_blob_sender, vote_blob_receiver) = channel(); let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1; let mut last_valid_validator_timestamp = 0; @@ -237,7 +231,6 @@ pub mod tests { &mint.keypair(), &bank, &leader, - &blob_recycler, &vote_blob_sender, &mut last_vote, &mut last_valid_validator_timestamp, @@ -277,7 +270,6 @@ pub mod tests { &mint.keypair(), &bank, &leader, - &blob_recycler, &vote_blob_sender, &mut last_vote, &mut last_valid_validator_timestamp, @@ -292,7 +284,7 @@ pub mod tests { // vote should be valid let blob = &vote_blob.unwrap()[0]; - let tx = deserialize(&(blob.read().data)).unwrap(); + let tx = deserialize(&(blob.read().unwrap().data)).unwrap(); assert!(bank.process_transaction(&tx).is_ok()); } diff --git a/src/window.rs b/src/window.rs index 339a0a53cf..ae4406feb1 100644 --- a/src/window.rs +++ b/src/window.rs @@ -7,7 +7,7 @@ use entry::Entry; use erasure; use ledger::{reconstruct_entries_from_blobs, Block}; use log::Level; -use packet::{BlobRecycler, SharedBlob}; +use packet::SharedBlob; use result::Result; use solana_program_interface::pubkey::Pubkey; use std::cmp; @@ -28,7 +28,7 @@ pub struct WindowSlot { impl WindowSlot { fn blob_index(&self) -> Option { match self.data { - Some(ref blob) => blob.read().get_index().ok(), + Some(ref blob) => blob.read().unwrap().get_index().ok(), None => None, } } @@ -70,7 +70,6 @@ pub trait WindowUtil { blob: SharedBlob, pix: u64, consume_queue: &mut Vec, - recycler: &BlobRecycler, consumed: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, @@ -200,7 +199,6 @@ impl WindowUtil for Window { blob: SharedBlob, pix: u64, consume_queue: &mut Vec, - recycler: &BlobRecycler, consumed: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, @@ -208,7 +206,7 @@ impl WindowUtil for Window { ) { let w = (pix % WINDOW_SIZE) as usize; - let is_coding = blob.read().is_coding(); + let is_coding = blob.read().unwrap().is_coding(); // insert a newly received blob into a window slot, clearing out and recycling any previous // blob unless the incoming blob is a duplicate (based on idx) @@ -221,7 +219,7 @@ impl WindowUtil for Window { c_or_d: &str, ) -> bool { if let Some(old) = mem::replace(window_slot, Some(blob)) { - let is_dup = old.read().get_index().unwrap() == pix; + let is_dup = old.read().unwrap().get_index().unwrap() == pix; trace!( "{}: occupied {} window slot {:}, is_dup: {}", id, @@ -250,21 +248,9 @@ impl WindowUtil for Window { self[w].leader_unknown = leader_unknown; *pending_retransmits = true; - #[cfg(not(feature = "erasure"))] - { - // suppress warning: unused variable: `recycler` - let _ = recycler; - } #[cfg(feature = "erasure")] { - if erasure::recover( - id, - recycler, - self, - *consumed, - (*consumed % WINDOW_SIZE) as usize, - ).is_err() - { + if erasure::recover(id, self, *consumed, (*consumed % WINDOW_SIZE) as usize).is_err() { trace!("{}: erasure::recover failed", id); } } @@ -289,7 +275,7 @@ impl WindowUtil for Window { let k_data_blob; let k_data_slot = &mut self[k].data; if let Some(blob) = k_data_slot { - if blob.read().get_index().unwrap() < *consumed { + if blob.read().unwrap().get_index().unwrap() < *consumed { // window wrap-around, end of received break; } @@ -389,7 +375,7 @@ pub fn index_blobs( trace!("{}: INDEX_BLOBS {}", node_info.id, blobs.len()); for (i, b) in blobs.iter().enumerate() { // only leader should be broadcasting - let mut blob = b.write(); + let mut blob = b.write().unwrap(); blob.set_id(node_info.id) .expect("set_id in pub fn broadcast"); blob.set_index(*receive_index + i as u64) @@ -426,7 +412,7 @@ pub fn initialized_window( // populate the window, offset by implied index let diff = cmp::max(blobs.len() as isize - window.len() as isize, 0) as usize; for b in blobs.into_iter().skip(diff) { - let ix = b.read().get_index().expect("blob index"); + let ix = b.read().unwrap().get_index().expect("blob index"); let pos = (ix % WINDOW_SIZE) as usize; trace!("{} caching {} at {}", id, ix, pos); assert!(window[pos].data.is_none()); @@ -442,14 +428,13 @@ pub fn new_window_from_entries( node_info: &NodeInfo, ) -> Window { // convert to blobs - let blob_recycler = BlobRecycler::default(); - let blobs = ledger_tail.to_blobs(&blob_recycler); + let blobs = ledger_tail.to_blobs(); initialized_window(&node_info, blobs, entry_height) } #[cfg(test)] mod test { - use packet::{Blob, BlobRecycler, Packet, Packets, PACKET_DATA_SIZE}; + use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use solana_program_interface::pubkey::Pubkey; use std::io; use std::io::Write; @@ -465,7 +450,7 @@ mod test { for _t in 0..5 { let timer = Duration::new(1, 0); match r.recv_timeout(timer) { - Ok(m) => *num += m.read().packets.len(), + Ok(m) => *num += m.read().unwrap().packets.len(), e => info!("error {:?}", e), } if *num == 10 { @@ -487,7 +472,6 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = receiver( Arc::new(read), @@ -500,9 +484,9 @@ mod test { let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let mut msgs = Vec::new(); for i in 0..10 { - let mut b = resp_recycler.allocate(); + let mut b = SharedBlob::default(); { - let mut w = b.write(); + let mut w = b.write().unwrap(); w.data[0] = i as u8; w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); diff --git a/src/window_service.rs b/src/window_service.rs index 74f418b736..e5628f640f 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -4,7 +4,7 @@ use counter::Counter; use crdt::{Crdt, NodeInfo}; use entry::EntrySender; use log::Level; -use packet::{BlobRecycler, SharedBlob}; +use packet::SharedBlob; use rand::{thread_rng, Rng}; use result::{Error, Result}; use solana_program_interface::pubkey::Pubkey; @@ -49,10 +49,9 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { fn add_block_to_retransmit_queue( b: &SharedBlob, leader_id: Pubkey, - recycler: &BlobRecycler, retransmit_queue: &mut Vec, ) { - let p = b.read(); + let p = b.read().unwrap(); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window trace!( @@ -73,11 +72,9 @@ fn add_block_to_retransmit_queue( //otherwise we get into races with which thread //should do the recycling // - //a better abstraction would be to recycle when the blob - //is dropped via a weakref to the recycler - let nv = recycler.allocate(); + let nv = SharedBlob::default(); { - let mut mnv = nv.write(); + let mut mnv = nv.write().unwrap(); let sz = p.meta.size; mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); @@ -91,7 +88,6 @@ fn retransmit_all_leader_blocks( maybe_leader: Option, dq: &[SharedBlob], id: &Pubkey, - recycler: &BlobRecycler, consumed: u64, received: u64, retransmit: &BlobSender, @@ -101,7 +97,7 @@ fn retransmit_all_leader_blocks( if let Some(leader) = maybe_leader { let leader_id = leader.id; for b in dq { - add_block_to_retransmit_queue(b, leader_id, recycler, &mut retransmit_queue); + add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue); } if *pending_retransmits { @@ -113,12 +109,7 @@ fn retransmit_all_leader_blocks( *pending_retransmits = false; if w.leader_unknown { if let Some(ref b) = w.data { - add_block_to_retransmit_queue( - b, - leader_id, - recycler, - &mut retransmit_queue, - ); + add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue); w.leader_unknown = false; } } @@ -146,7 +137,6 @@ fn recv_window( window: &SharedWindow, id: &Pubkey, crdt: &Arc>, - recycler: &BlobRecycler, consumed: &mut u64, received: &mut u64, max_ix: u64, @@ -183,7 +173,6 @@ fn recv_window( maybe_leader, &dq, id, - recycler, *consumed, *received, retransmit, @@ -195,7 +184,7 @@ fn recv_window( let mut consume_queue = Vec::new(); for b in dq { let (pix, meta_size) = { - let p = b.read(); + let p = b.read().unwrap(); (p.get_index()?, p.meta.size) }; pixs.push(pix); @@ -219,7 +208,6 @@ fn recv_window( b, pix, &mut consume_queue, - recycler, consumed, leader_unknown, pending_retransmits, @@ -276,7 +264,6 @@ pub fn window_service( leader_rotation_interval = rcrdt.get_leader_rotation_interval(); } let mut pending_retransmits = false; - let recycler = BlobRecycler::default(); trace!("{}: RECV_WINDOW started", id); loop { if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 { @@ -297,7 +284,6 @@ pub fn window_service( &window, &id, &crdt, - &recycler, &mut consumed, &mut received, max_entry_height, @@ -354,7 +340,7 @@ mod test { use entry::Entry; use hash::Hash; use logger; - use packet::{make_consecutive_blobs, BlobRecycler, PACKET_DATA_SIZE}; + use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE}; use signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -390,7 +376,6 @@ mod test { crdt_me.set_leader(me_id); let subs = Arc::new(RwLock::new(crdt_me)); - let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, r_window) = channel(); @@ -416,15 +401,11 @@ mod test { let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut num_blobs_to_make = 10; let gossip_address = &tn.info.contact_info.ncp; - let msgs = make_consecutive_blobs( - me_id, - num_blobs_to_make, - Hash::default(), - &gossip_address, - &resp_recycler, - ).into_iter() - .rev() - .collect();; + let msgs = + make_consecutive_blobs(me_id, num_blobs_to_make, Hash::default(), &gossip_address) + .into_iter() + .rev() + .collect();; s_responder.send(msgs).expect("send"); t_responder }; @@ -452,7 +433,6 @@ mod test { let me_id = crdt_me.my_data().id; let subs = Arc::new(RwLock::new(crdt_me)); - let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); @@ -478,9 +458,9 @@ mod test { let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; - let b = resp_recycler.allocate(); + let b = SharedBlob::default(); { - let mut w = b.write(); + let mut w = b.write().unwrap(); w.set_index(i).unwrap(); w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); @@ -509,7 +489,6 @@ mod test { let me_id = crdt_me.my_data().id; let subs = Arc::new(RwLock::new(crdt_me)); - let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); @@ -535,9 +514,9 @@ mod test { let mut msgs = Vec::new(); for v in 0..10 { let i = 9 - v; - let b = resp_recycler.allocate(); + let b = SharedBlob::default(); { - let mut w = b.write(); + let mut w = b.write().unwrap(); w.set_index(i).unwrap(); w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); @@ -555,9 +534,9 @@ mod test { let mut msgs1 = Vec::new(); for v in 1..5 { let i = 9 + v; - let b = resp_recycler.allocate(); + let b = SharedBlob::default(); { - let mut w = b.write(); + let mut w = b.write().unwrap(); w.set_index(i).unwrap(); w.set_id(me_id).unwrap(); assert_eq!(i, w.get_index().unwrap()); @@ -631,7 +610,6 @@ mod test { let subs = Arc::new(RwLock::new(crdt_me)); - let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); @@ -667,15 +645,11 @@ mod test { let extra_blobs = leader_rotation_interval; let total_blobs_to_send = my_leader_begin_epoch * leader_rotation_interval + extra_blobs; - let msgs = make_consecutive_blobs( - me_id, - total_blobs_to_send, - Hash::default(), - &ncp_address, - &resp_recycler, - ).into_iter() - .rev() - .collect();; + let msgs = + make_consecutive_blobs(me_id, total_blobs_to_send, Hash::default(), &ncp_address) + .into_iter() + .rev() + .collect();; s_responder.send(msgs).expect("send"); t_responder }; diff --git a/src/write_stage.rs b/src/write_stage.rs index d4474dd11d..6f03962d1a 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -8,7 +8,6 @@ use crdt::Crdt; use entry::Entry; use ledger::{Block, LedgerWriter}; use log::Level; -use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::Keypair; @@ -207,7 +206,6 @@ impl WriteStage { leader_rotation_interval = rcrdt.get_leader_rotation_interval(); } let mut entry_height = entry_height; - let blob_recycler = BlobRecycler::default(); loop { // Note that entry height is not zero indexed, it starts at 1, so the // old leader is in power up to and including entry height @@ -258,7 +256,6 @@ impl WriteStage { &keypair, &bank, &crdt, - &blob_recycler, &vote_blob_sender, &mut last_vote, &mut last_valid_validator_timestamp, diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index da90aa93f1..a6f52f38b5 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -7,7 +7,7 @@ use rayon::iter::*; use solana::crdt::{Crdt, Node}; use solana::logger; use solana::ncp::Ncp; -use solana::packet::{Blob, BlobRecycler}; +use solana::packet::{Blob, SharedBlob}; use solana::result; use solana::service::Service; use std::net::UdpSocket; @@ -159,9 +159,8 @@ pub fn crdt_retransmit() -> result::Result<()> { sleep(Duration::new(1, 0)); } assert!(done); - let r = BlobRecycler::default(); - let b = r.allocate(); - b.write().meta.size = 10; + let b = SharedBlob::default(); + b.write().unwrap().meta.size = 10; Crdt::retransmit(&c1, &b, &tn1)?; let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter()