Remove recycler and it's usage

- The memory usage due to recycler was high, and incrementing with
  time.
This commit is contained in:
Pankaj Garg 2018-09-26 16:50:12 +00:00 committed by Greg Fitzgerald
parent 2e00be262e
commit e10574c64d
21 changed files with 241 additions and 677 deletions

View File

@ -49,8 +49,6 @@ impl Default for Config {
}
impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
/// Discard input packets using `packet_recycler` to minimize memory
/// allocations in a previous stage such as the `fetch_stage`.
pub fn new(
bank: Arc<Bank>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
@ -215,7 +213,7 @@ impl BankingStage {
let count = mms.iter().map(|x| x.1.len()).sum();
let proc_start = Instant::now();
for (msgs, vers) in mms {
let transactions = Self::deserialize_transactions(&msgs.read());
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
reqs_len += transactions.len();
debug!("transactions received {}", transactions.len());
@ -275,7 +273,7 @@ mod tests {
use bank::Bank;
use ledger::Block;
use mint::Mint;
use packet::{to_packets, PacketRecycler};
use packet::to_packets;
use signature::{Keypair, KeypairUtil};
use std::thread::sleep;
use system_transaction::SystemTransaction;
@ -342,8 +340,7 @@ mod tests {
let tx_anf = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash);
// send 'em over
let recycler = PacketRecycler::default();
let packets = to_packets(&recycler, &[tx, tx_no_ver, tx_anf]);
let packets = to_packets(&[tx, tx_no_ver, tx_anf]);
// glad they all fit
assert_eq!(packets.len(), 1);
@ -373,7 +370,6 @@ mod tests {
// Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2);
let bank = Arc::new(Bank::new(&mint));
let recycler = PacketRecycler::default();
let (verified_sender, verified_receiver) = channel();
let (banking_stage, entry_receiver) =
BankingStage::new(bank.clone(), verified_receiver, Default::default());
@ -382,14 +378,14 @@ mod tests {
let alice = Keypair::new();
let tx = Transaction::system_new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let packets = to_packets(&recycler, &[tx]);
let packets = to_packets(&[tx]);
verified_sender
.send(vec![(packets[0].clone(), vec![1u8])])
.unwrap();
// Process a second batch that spends one of those tokens.
let tx = Transaction::system_new(&alice, mint.pubkey(), 1, mint.last_id());
let packets = to_packets(&recycler, &[tx]);
let packets = to_packets(&[tx]);
verified_sender
.send(vec![(packets[0].clone(), vec![1u8])])
.unwrap();

View File

@ -3,7 +3,7 @@ extern crate solana;
use clap::{App, Arg};
use solana::netutil::bind_to;
use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE};
use solana::packet::{Packet, SharedPackets, BLOB_SIZE, PACKET_DATA_SIZE};
use solana::result::Result;
use solana::streamer::{receiver, PacketReceiver};
use std::cmp::max;
@ -16,12 +16,12 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use std::time::SystemTime;
fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc<AtomicBool>) -> JoinHandle<()> {
fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = recycler.allocate();
let msgs = SharedPackets::default();
let msgs_ = msgs.clone();
msgs.write().packets.resize(10, Packet::default());
for w in &mut msgs.write().packets {
msgs.write().unwrap().packets.resize(10, Packet::default());
for w in &mut msgs.write().unwrap().packets {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
@ -30,7 +30,7 @@ fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc<AtomicBool>)
return;
}
let mut num = 0;
for p in &msgs_.read().packets {
for p in &msgs_.read().unwrap().packets {
let a = p.meta.addr();
assert!(p.meta.size < BLOB_SIZE);
send.send_to(&p.data[..p.meta.size], &a).unwrap();
@ -47,7 +47,7 @@ fn sink(exit: Arc<AtomicBool>, rvs: Arc<AtomicUsize>, r: PacketReceiver) -> Join
}
let timer = Duration::new(1, 0);
if let Ok(msgs) = r.recv_timeout(timer) {
rvs.fetch_add(msgs.read().packets.len(), Ordering::Relaxed);
rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed);
}
})
}
@ -72,7 +72,6 @@ fn main() -> Result<()> {
let mut addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let exit = Arc::new(AtomicBool::new(false));
let pack_recycler = PacketRecycler::default();
let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
@ -93,9 +92,9 @@ fn main() -> Result<()> {
));
}
let t_producer1 = producer(&addr, &pack_recycler, exit.clone());
let t_producer2 = producer(&addr, &pack_recycler, exit.clone());
let t_producer3 = producer(&addr, &pack_recycler, exit.clone());
let t_producer1 = producer(&addr, exit.clone());
let t_producer2 = producer(&addr, exit.clone());
let t_producer3 = producer(&addr, exit.clone());
let rvs = Arc::new(AtomicUsize::new(0));
let sink_threads: Vec<_> = read_channels

View File

@ -7,7 +7,7 @@ use entry::Entry;
use erasure;
use ledger::Block;
use log::Level;
use packet::{BlobRecycler, SharedBlobs};
use packet::SharedBlobs;
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
@ -32,7 +32,6 @@ fn broadcast(
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
recycler: &BlobRecycler,
receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
@ -54,7 +53,7 @@ fn broadcast(
let to_blobs_start = Instant::now();
let dq: SharedBlobs = ventries
.into_par_iter()
.flat_map(|p| p.to_blobs(recycler))
.flat_map(|p| p.to_blobs())
.collect();
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
@ -85,19 +84,29 @@ fn broadcast(
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().get_index().expect("blob index");
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
if let Some(x) = win[pos].data.take() {
trace!("{} popped {} at {}", id, x.read().get_index().unwrap(), pos);
trace!(
"{} popped {} at {}",
id,
x.read().unwrap().get_index().unwrap(),
pos
);
}
if let Some(x) = win[pos].coding.take() {
trace!("{} popped {} at {}", id, x.read().get_index().unwrap(), pos);
trace!(
"{} popped {} at {}",
id,
x.read().unwrap().get_index().unwrap(),
pos
);
}
trace!("{} null {}", id, pos);
}
for b in &blobs {
let ix = b.read().get_index().expect("blob index");
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(win[pos].data.is_none());
@ -111,7 +120,6 @@ fn broadcast(
erasure::generate_coding(
&id,
&mut window.write().unwrap(),
recycler,
*receive_index,
blobs_len,
&mut transmit_index.coding,
@ -174,7 +182,6 @@ impl BroadcastStage {
crdt: &Arc<RwLock<Crdt>>,
window: &SharedWindow,
entry_height: u64,
recycler: &BlobRecycler,
receiver: &Receiver<Vec<Entry>>,
) -> BroadcastStageReturnType {
let mut transmit_index = WindowIndex {
@ -211,7 +218,6 @@ impl BroadcastStage {
&me,
&broadcast_table,
&window,
&recycler,
&receiver,
&sock,
&mut transmit_index,
@ -239,7 +245,6 @@ impl BroadcastStage {
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - CRDT structure
/// * `window` - Cache of blobs that we have broadcast
/// * `recycler` - Blob recycler.
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
/// * `exit_sender` - Set to true when this stage exits, allows rest of Tpu to exit cleanly. Otherwise,
/// when a Tpu stage closes, it only closes the stages that come after it. The stages
@ -256,12 +261,11 @@ impl BroadcastStage {
receiver: Receiver<Vec<Entry>>,
exit_sender: Arc<AtomicBool>,
) -> Self {
let recycler = BlobRecycler::default();
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_sender);
Self::run(&sock, &crdt, &window, entry_height, &recycler, &receiver)
Self::run(&sock, &crdt, &window, entry_height, &receiver)
}).unwrap();
BroadcastStage { thread_hdl }
@ -356,7 +360,7 @@ mod tests {
let window = shared_window.read().unwrap();
window.iter().fold(0, |m, w_slot| {
if let Some(ref blob) = w_slot.data {
cmp::max(m, blob.read().get_index().unwrap())
cmp::max(m, blob.read().unwrap().get_index().unwrap())
} else {
m
}

View File

@ -20,7 +20,7 @@ use hash::Hash;
use ledger::LedgerWindow;
use log::Level;
use netutil::{bind_in_range, bind_to, multi_bind_in_range};
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use packet::{to_blob, Blob, SharedBlob, BLOB_SIZE};
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use result::{Error, Result};
@ -606,7 +606,7 @@ impl Crdt {
// only leader should be broadcasting
assert!(me.leader_id != v.id);
let bl = b.unwrap();
let blob = bl.read();
let blob = bl.read().unwrap();
//TODO profile this, may need multiple sockets for par_iter
trace!(
"{}: BROADCAST idx: {} sz: {} to {},{} coding: {}",
@ -658,9 +658,10 @@ impl Crdt {
(s.my_data().clone(), s.table.values().cloned().collect())
};
blob.write()
.unwrap()
.set_id(me.id)
.expect("set_id in pub fn retransmit");
let rblob = blob.read();
let rblob = blob.read().unwrap();
let orders: Vec<_> = table
.iter()
.filter(|v| {
@ -814,11 +815,7 @@ impl Crdt {
}
/// At random pick a node and try to get updated changes from them
fn run_gossip(
obj: &Arc<RwLock<Self>>,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
) -> Result<()> {
fn run_gossip(obj: &Arc<RwLock<Self>>, blob_sender: &BlobSender) -> Result<()> {
//TODO we need to keep track of stakes and weight the selection by stake size
//TODO cache sockets
@ -831,7 +828,7 @@ impl Crdt {
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
let blob = to_blob(req, remote_gossip_addr)?;
blob_sender.send(vec![blob])?;
Ok(())
}
@ -918,12 +915,11 @@ impl Crdt {
blob_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let blob_recycler = BlobRecycler::default();
Builder::new()
.name("solana-gossip".to_string())
.spawn(move || loop {
let start = timestamp();
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
let _ = Self::run_gossip(&obj, &blob_sender);
if exit.load(Ordering::Relaxed) {
return;
}
@ -945,11 +941,10 @@ impl Crdt {
ledger_window: &mut Option<&mut LedgerWindow>,
me: &NodeInfo,
ix: u64,
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
let pos = (ix as usize) % window.read().unwrap().len();
if let Some(ref mut blob) = &mut window.write().unwrap()[pos].data {
let mut wblob = blob.write();
let mut wblob = blob.write().unwrap();
let blob_ix = wblob.get_index().expect("run_window_request get_index");
if blob_ix == ix {
let num_retransmits = wblob.meta.num_retransmits;
@ -968,11 +963,11 @@ impl Crdt {
sender_id = me.id
}
let out = blob_recycler.allocate();
let out = SharedBlob::default();
// copy to avoid doing IO inside the lock
{
let mut outblob = out.write();
let mut outblob = out.write().unwrap();
let sz = wblob.meta.size;
outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&wblob.data[..sz]);
@ -998,7 +993,6 @@ impl Crdt {
inc_new_counter_info!("crdt-window-request-ledger", 1);
let out = entry.to_blob(
blob_recycler,
Some(ix),
Some(me.id), // causes retransmission if I'm the leader
Some(from_addr),
@ -1025,18 +1019,12 @@ impl Crdt {
obj: &Arc<RwLock<Self>>,
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
blob_recycler: &BlobRecycler,
blob: &Blob,
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
Ok(request) => Crdt::handle_protocol(
obj,
&blob.meta.addr(),
request,
window,
ledger_window,
blob_recycler,
),
Ok(request) => {
Crdt::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window)
}
Err(_) => {
warn!("deserialize crdt packet failed");
None
@ -1050,7 +1038,6 @@ impl Crdt {
request: Protocol,
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
match request {
// TODO sigverify these
@ -1119,7 +1106,7 @@ impl Crdt {
} else {
let rsp = Protocol::ReceiveUpdates(from_id, ups, data, liveness);
if let Ok(r) = to_blob(rsp, from.contact_info.ncp, &blob_recycler) {
if let Ok(r) = to_blob(rsp, from.contact_info.ncp) {
trace!(
"sending updates me {} len {} to {} {}",
id,
@ -1176,15 +1163,8 @@ impl Crdt {
let me = me.read().unwrap().my_data().clone();
inc_new_counter_info!("crdt-window-request-recv", 1);
trace!("{}: received RequestWindowIndex {} {} ", me.id, from.id, ix,);
let res = Self::run_window_request(
&from,
&from_addr,
&window,
ledger_window,
&me,
ix,
blob_recycler,
);
let res =
Self::run_window_request(&from, &from_addr, &window, ledger_window, &me, ix);
report_time_spent(
"RequestWindowIndex",
&now.elapsed(),
@ -1200,7 +1180,6 @@ impl Crdt {
obj: &Arc<RwLock<Self>>,
window: &SharedWindow,
ledger_window: &mut Option<&mut LedgerWindow>,
blob_recycler: &BlobRecycler,
requests_receiver: &BlobReceiver,
response_sender: &BlobSender,
) -> Result<()> {
@ -1212,8 +1191,7 @@ impl Crdt {
}
let mut resps = Vec::new();
for req in reqs {
if let Some(resp) =
Self::handle_blob(obj, window, ledger_window, blob_recycler, &req.read())
if let Some(resp) = Self::handle_blob(obj, window, ledger_window, &req.read().unwrap())
{
resps.push(resp);
}
@ -1230,7 +1208,6 @@ impl Crdt {
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap());
let blob_recycler = BlobRecycler::default();
Builder::new()
.name("solana-listen".to_string())
@ -1239,7 +1216,6 @@ impl Crdt {
&me,
&window,
&mut ledger_window.as_mut(),
&blob_recycler,
&requests_receiver,
&response_sender,
);
@ -1408,7 +1384,7 @@ mod tests {
use hash::{hash, Hash};
use ledger::{LedgerWindow, LedgerWriter};
use logger;
use packet::BlobRecycler;
use packet::SharedBlob;
use result::Error;
use signature::{Keypair, KeypairUtil};
use solana_program_interface::pubkey::Pubkey;
@ -1661,9 +1637,9 @@ mod tests {
}
assert!(rv.len() > 0);
for i in rv.iter() {
if i.read().meta.addr() == nxt1.contact_info.ncp {
if i.read().unwrap().meta.addr() == nxt1.contact_info.ncp {
one = true;
} else if i.read().meta.addr() == nxt2.contact_info.ncp {
} else if i.read().unwrap().meta.addr() == nxt2.contact_info.ncp {
two = true;
} else {
//unexpected request
@ -1760,43 +1736,18 @@ mod tests {
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
);
let recycler = BlobRecycler::default();
let rv = Crdt::run_window_request(
&me,
&socketaddr_any!(),
&window,
&mut None,
&me,
0,
&recycler,
);
let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0);
assert!(rv.is_none());
let out = recycler.allocate();
out.write().meta.size = 200;
let out = SharedBlob::default();
out.write().unwrap().meta.size = 200;
window.write().unwrap()[0].data = Some(out);
let rv = Crdt::run_window_request(
&me,
&socketaddr_any!(),
&window,
&mut None,
&me,
0,
&recycler,
);
let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, 0);
assert!(rv.is_some());
let v = rv.unwrap();
//test we copied the blob
assert_eq!(v.read().meta.size, 200);
assert_eq!(v.read().unwrap().meta.size, 200);
let len = window.read().unwrap().len() as u64;
let rv = Crdt::run_window_request(
&me,
&socketaddr_any!(),
&window,
&mut None,
&me,
len,
&recycler,
);
let rv = Crdt::run_window_request(&me, &socketaddr_any!(), &window, &mut None, &me, len);
assert!(rv.is_none());
fn tmp_ledger(name: &str) -> String {
@ -1825,7 +1776,6 @@ mod tests {
&mut Some(&mut ledger_window),
&me,
1,
&recycler,
);
assert!(rv.is_some());
@ -1842,22 +1792,13 @@ mod tests {
let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let recycler = BlobRecycler::default();
// Simulate handling a repair request from mock_peer
let rv = Crdt::run_window_request(
&mock_peer,
&socketaddr_any!(),
&window,
&mut None,
&me,
0,
&recycler,
);
let rv =
Crdt::run_window_request(&mock_peer, &socketaddr_any!(), &window, &mut None, &me, 0);
assert!(rv.is_none());
let blob = recycler.allocate();
let blob = SharedBlob::default();
let blob_size = 200;
blob.write().meta.size = blob_size;
blob.write().unwrap().meta.size = blob_size;
window.write().unwrap()[0].data = Some(blob);
let num_requests: u32 = 64;
@ -1869,9 +1810,8 @@ mod tests {
&mut None,
&me,
0,
&recycler,
).unwrap();
let blob = shared_blob.read();
let blob = shared_blob.read().unwrap();
// Test we copied the blob
assert_eq!(blob.meta.size, blob_size);
@ -1944,7 +1884,6 @@ mod tests {
fn protocol_requestupdate_alive() {
logger::setup();
let window = Arc::new(RwLock::new(default_window()));
let recycler = BlobRecycler::default();
let node = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let node_with_same_addr = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
@ -1958,37 +1897,18 @@ mod tests {
let request = Protocol::RequestUpdates(1, node.clone());
assert!(
Crdt::handle_protocol(
&obj,
&node.contact_info.ncp,
request,
&window,
&mut None,
&recycler
).is_none()
Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,)
.is_none()
);
let request = Protocol::RequestUpdates(1, node_with_same_addr.clone());
assert!(
Crdt::handle_protocol(
&obj,
&node.contact_info.ncp,
request,
&window,
&mut None,
&recycler
).is_none()
Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None,)
.is_none()
);
let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone());
Crdt::handle_protocol(
&obj,
&node.contact_info.ncp,
request,
&window,
&mut None,
&recycler,
);
Crdt::handle_protocol(&obj, &node.contact_info.ncp, request, &window, &mut None);
let me = obj.write().unwrap();

View File

@ -5,7 +5,7 @@
use bincode::{serialize_into, serialized_size};
use budget_transaction::BudgetTransaction;
use hash::Hash;
use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE};
use packet::{SharedBlob, BLOB_DATA_SIZE};
use poh::Poh;
use rayon::prelude::*;
use solana_program_interface::pubkey::Pubkey;
@ -70,14 +70,13 @@ impl Entry {
pub fn to_blob(
&self,
blob_recycler: &BlobRecycler,
idx: Option<u64>,
id: Option<Pubkey>,
addr: Option<&SocketAddr>,
) -> SharedBlob {
let blob = blob_recycler.allocate();
let blob = SharedBlob::default();
{
let mut blob_w = blob.write();
let mut blob_w = blob.write().unwrap();
let pos = {
let mut out = Cursor::new(blob_w.data_mut());
serialize_into(&mut out, &self).expect("failed to serialize output");

View File

@ -1,5 +1,5 @@
// Support erasure coding
use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE};
use solana_program_interface::pubkey::Pubkey;
use std::cmp;
use std::mem;
@ -217,7 +217,6 @@ pub fn decode_blocks(
pub fn generate_coding(
id: &Pubkey,
window: &mut [WindowSlot],
recycler: &BlobRecycler,
receive_index: u64,
num_blobs: usize,
transmit_index_coding: &mut u64,
@ -285,7 +284,7 @@ pub fn generate_coding(
let n = i % window.len();
assert!(window[n].coding.is_none());
window[n].coding = Some(recycler.allocate());
window[n].coding = Some(SharedBlob::default());
let coding = window[n].coding.clone().unwrap();
let mut coding_wl = coding.write();
@ -408,17 +407,10 @@ fn find_missing(
// Recover a missing block into window
// missing blocks should be None or old...
// Use recycler to allocate new ones.
// If not enough coding or data blocks are present to restore
// any of the blocks, the block is skipped.
// Side effect: old blobs in a block are None'd
pub fn recover(
id: &Pubkey,
recycler: &BlobRecycler,
window: &mut [WindowSlot],
start_idx: u64,
start: usize,
) -> Result<()> {
pub fn recover(id: &Pubkey, window: &mut [WindowSlot], start_idx: u64, start: usize) -> Result<()> {
let block_start = start - (start % NUM_DATA);
let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
@ -478,7 +470,7 @@ pub fn recover(
}
blobs.push(b);
} else {
let n = recycler.allocate();
let n = SharedBlob::default();
window[j].data = Some(n.clone());
// mark the missing memory
blobs.push(n);
@ -499,7 +491,7 @@ pub fn recover(
}
blobs.push(b);
} else {
let n = recycler.allocate();
let n = SharedBlob::default();
window[j].coding = Some(n.clone());
//mark the missing memory
blobs.push(n);
@ -602,7 +594,7 @@ mod test {
use crdt;
use erasure;
use logger;
use packet::{BlobRecycler, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
use rand::{thread_rng, Rng};
use signature::{Keypair, KeypairUtil};
use solana_program_interface::pubkey::Pubkey;
@ -698,11 +690,7 @@ mod test {
}
const WINDOW_SIZE: usize = 64;
fn generate_window(
blob_recycler: &BlobRecycler,
offset: usize,
num_blobs: usize,
) -> Vec<WindowSlot> {
fn generate_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
let mut window = vec![
WindowSlot {
data: None,
@ -713,7 +701,7 @@ mod test {
];
let mut blobs = Vec::with_capacity(num_blobs);
for i in 0..num_blobs {
let b = blob_recycler.allocate();
let b = SharedBlob::default();
let b_ = b.clone();
let mut w = b.write();
// generate a random length, multiple of 4 between 8 and 32
@ -764,36 +752,13 @@ mod test {
}
}
fn pollute_recycler(blob_recycler: &BlobRecycler) {
let mut blobs = Vec::with_capacity(WINDOW_SIZE * 2);
for _ in 0..WINDOW_SIZE * 10 {
let blob = blob_recycler.allocate();
{
let mut b_l = blob.write();
for i in 0..BLOB_SIZE {
b_l.data[i] = thread_rng().gen();
}
// some of the blobs should previously been used for coding
if thread_rng().gen_bool(erasure::NUM_CODING as f64 / erasure::NUM_DATA as f64) {
b_l.set_coding().unwrap();
}
}
blobs.push(blob);
}
}
#[test]
pub fn test_window_recover_basic() {
logger::setup();
let blob_recycler = BlobRecycler::default();
pollute_recycler(&blob_recycler);
// Generate a window
let offset = 0;
let num_blobs = erasure::NUM_DATA + 2;
let mut window = generate_window(&blob_recycler, WINDOW_SIZE, num_blobs);
let mut window = generate_window(WINDOW_SIZE, num_blobs);
for slot in &window {
if let Some(blob) = &slot.data {
@ -809,14 +774,8 @@ mod test {
let mut index = (erasure::NUM_DATA + 2) as u64;
let id = Pubkey::default();
assert!(
erasure::generate_coding(
&id,
&mut window,
&blob_recycler,
offset as u64,
num_blobs,
&mut index
).is_ok()
erasure::generate_coding(&id, &mut window, offset as u64, num_blobs, &mut index)
.is_ok()
);
assert_eq!(index, (erasure::NUM_DATA - erasure::NUM_CODING) as u64);
@ -835,15 +794,7 @@ mod test {
scramble_window_tails(&mut window, num_blobs);
// Recover it from coding
assert!(
erasure::recover(
&id,
&blob_recycler,
&mut window,
(offset + WINDOW_SIZE) as u64,
offset,
).is_ok()
);
assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok());
println!("** after-recover:");
print_window(&window);
@ -880,15 +831,7 @@ mod test {
print_window(&window);
// Recover it from coding
assert!(
erasure::recover(
&id,
&blob_recycler,
&mut window,
(offset + WINDOW_SIZE) as u64,
offset,
).is_ok()
);
assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok());
println!("** after-recover:");
print_window(&window);
@ -923,15 +866,7 @@ mod test {
print_window(&window);
// Recover it from coding
assert!(
erasure::recover(
&id,
&blob_recycler,
&mut window,
(offset + WINDOW_SIZE) as u64,
offset,
).is_ok()
);
assert!(erasure::recover(&id, &mut window, (offset + WINDOW_SIZE) as u64, offset,).is_ok());
println!("** after-recover:");
print_window(&window);
@ -968,11 +903,10 @@ mod test {
// #[ignore]
// pub fn test_window_recover() {
// logger::setup();
// let blob_recycler = BlobRecycler::default();
// let offset = 4;
// let data_len = 16;
// let num_blobs = erasure::NUM_DATA + 2;
// let (mut window, blobs_len) = generate_window(data_len, &blob_recycler, offset, num_blobs);
// let (mut window, blobs_len) = generate_window(data_len, offset, num_blobs);
// println!("** after-gen:");
// print_window(&window);
// assert!(erasure::generate_coding(&mut window, offset, blobs_len).is_ok());
@ -989,7 +923,7 @@ mod test {
// window_l0.write().unwrap().data[0] = 55;
// println!("** after-nulling:");
// print_window(&window);
// assert!(erasure::recover(&blob_recycler, &mut window, offset, offset + blobs_len).is_ok());
// assert!(erasure::recover(&mut window, offset, offset + blobs_len).is_ok());
// println!("** after-restore:");
// print_window(&window);
// let window_l = window[offset + 1].clone().unwrap();

View File

@ -552,7 +552,7 @@ mod tests {
use crdt::Node;
use fullnode::{Fullnode, FullnodeReturnType};
use ledger::genesis;
use packet::{make_consecutive_blobs, BlobRecycler};
use packet::make_consecutive_blobs;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::cmp;
@ -658,7 +658,6 @@ mod tests {
);
// Send blobs to the validator from our mock leader
let resp_recycler = BlobRecycler::default();
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> = leader_node
@ -685,15 +684,11 @@ mod tests {
.expect("expected at least one genesis entry")
.id;
let tvu_address = &validator_info.contact_info.tvu;
let msgs = make_consecutive_blobs(
leader_id,
total_blobs_to_send,
last_id,
&tvu_address,
&resp_recycler,
).into_iter()
.rev()
.collect();
let msgs =
make_consecutive_blobs(leader_id, total_blobs_to_send, last_id, &tvu_address)
.into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};

View File

@ -10,7 +10,7 @@ use hash::Hash;
use log::Level::Trace;
#[cfg(test)]
use mint::Mint;
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use packet::{SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
#[cfg(test)]
@ -403,14 +403,8 @@ pub fn read_ledger(
pub trait Block {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify(&self, start_hash: &Hash) -> bool;
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec<SharedBlob>;
fn to_blobs_with_id(
&self,
blob_recycler: &packet::BlobRecycler,
id: Pubkey,
start_id: u64,
addr: &SocketAddr,
) -> Vec<SharedBlob>;
fn to_blobs(&self) -> Vec<SharedBlob>;
fn to_blobs_with_id(&self, id: Pubkey, start_id: u64, addr: &SocketAddr) -> Vec<SharedBlob>;
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>;
}
@ -431,28 +425,16 @@ impl Block for [Entry] {
})
}
fn to_blobs_with_id(
&self,
blob_recycler: &packet::BlobRecycler,
id: Pubkey,
start_idx: u64,
addr: &SocketAddr,
) -> Vec<SharedBlob> {
fn to_blobs_with_id(&self, id: Pubkey, start_idx: u64, addr: &SocketAddr) -> Vec<SharedBlob> {
self.iter()
.enumerate()
.map(|(i, entry)| {
entry.to_blob(
blob_recycler,
Some(start_idx + i as u64),
Some(id),
Some(&addr),
)
}).collect()
.map(|(i, entry)| entry.to_blob(Some(start_idx + i as u64), Some(id), Some(&addr)))
.collect()
}
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler) -> Vec<SharedBlob> {
fn to_blobs(&self) -> Vec<SharedBlob> {
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
self.to_blobs_with_id(blob_recycler, Pubkey::default(), 0, &default_addr)
self.to_blobs_with_id(Pubkey::default(), 0, &default_addr)
}
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> {
@ -471,7 +453,7 @@ pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entr
for blob in blobs {
let entry = {
let msg = blob.read();
let msg = blob.read().unwrap();
let msg_size = msg.get_size()?;
deserialize(&msg.data()[..msg_size])
};
@ -586,7 +568,7 @@ mod tests {
use chrono::prelude::*;
use entry::{next_entry, Entry};
use hash::hash;
use packet::{BlobRecycler, BLOB_DATA_SIZE, PACKET_DATA_SIZE};
use packet::{to_blobs, BLOB_DATA_SIZE, PACKET_DATA_SIZE};
use signature::{Keypair, KeypairUtil};
use std;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -672,8 +654,7 @@ mod tests {
logger::setup();
let entries = make_test_entries();
let blob_recycler = BlobRecycler::default();
let blob_q = entries.to_blobs(&blob_recycler);
let blob_q = entries.to_blobs();
assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);
}
@ -682,9 +663,8 @@ mod tests {
fn test_bad_blobs_attack() {
use logger;
logger::setup();
let blob_recycler = BlobRecycler::default();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack!
let blobs_q = to_blobs(vec![(0, addr)]).unwrap(); // <-- attack!
assert!(reconstruct_entries_from_blobs(blobs_q).is_err());
}

View File

@ -41,7 +41,6 @@ pub mod payment_plan;
pub mod poh;
pub mod poh_recorder;
pub mod recvmmsg;
pub mod recycler;
pub mod replicate_stage;
pub mod replicator;
pub mod request;

View File

@ -8,7 +8,6 @@ use hash::Hash;
use ledger::{next_entries_mut, Block};
use log::Level;
use recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use recycler;
use result::{Error, Result};
use serde::Serialize;
use solana_program_interface::pubkey::Pubkey;
@ -17,12 +16,11 @@ use std::io;
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
pub type SharedPackets = recycler::Recyclable<Packets>;
pub type SharedBlob = recycler::Recyclable<Blob>;
pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>;
pub type SharedBlobs = Vec<SharedBlob>;
pub type PacketRecycler = recycler::Recycler<Packets>;
pub type BlobRecycler = recycler::Recycler<Blob>;
pub const NUM_PACKETS: usize = 1024 * 8;
pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers
@ -67,12 +65,6 @@ impl Default for Packet {
}
}
impl recycler::Reset for Packet {
fn reset(&mut self) {
self.meta = Meta::default();
}
}
impl Meta {
pub fn addr(&self) -> SocketAddr {
if !self.v6 {
@ -127,14 +119,6 @@ impl Default for Packets {
}
}
impl recycler::Reset for Packets {
fn reset(&mut self) {
for i in 0..self.packets.len() {
self.packets[i].reset();
}
}
}
#[derive(Clone)]
pub struct Blob {
pub data: [u8; BLOB_SIZE],
@ -162,13 +146,6 @@ impl Default for Blob {
}
}
impl recycler::Reset for Blob {
fn reset(&mut self) {
self.meta = Meta::default();
self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]);
}
}
#[derive(Debug)]
pub enum BlobError {
/// the Blob's meta and data are not self-consistent
@ -226,16 +203,15 @@ impl Packets {
}
}
pub fn to_packets_chunked<T: Serialize>(
r: &PacketRecycler,
xs: &[T],
chunks: usize,
) -> Vec<SharedPackets> {
pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<SharedPackets> {
let mut out = vec![];
for x in xs.chunks(chunks) {
let mut p = r.allocate();
p.write().packets.resize(x.len(), Default::default());
for (i, o) in x.iter().zip(p.write().packets.iter_mut()) {
let mut p = SharedPackets::default();
p.write()
.unwrap()
.packets
.resize(x.len(), Default::default());
for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) {
let v = serialize(&i).expect("serialize request");
let len = v.len();
o.data[..len].copy_from_slice(&v);
@ -246,18 +222,14 @@ pub fn to_packets_chunked<T: Serialize>(
out
}
pub fn to_packets<T: Serialize>(r: &PacketRecycler, xs: &[T]) -> Vec<SharedPackets> {
to_packets_chunked(r, xs, NUM_PACKETS)
pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<SharedPackets> {
to_packets_chunked(xs, NUM_PACKETS)
}
pub fn to_blob<T: Serialize>(
resp: T,
rsp_addr: SocketAddr,
blob_recycler: &BlobRecycler,
) -> Result<SharedBlob> {
let blob = blob_recycler.allocate();
pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<SharedBlob> {
let blob = SharedBlob::default();
{
let mut b = blob.write();
let mut b = blob.write().unwrap();
let v = serialize(&resp)?;
let len = v.len();
assert!(len <= BLOB_SIZE);
@ -268,13 +240,10 @@ pub fn to_blob<T: Serialize>(
Ok(blob)
}
pub fn to_blobs<T: Serialize>(
rsps: Vec<(T, SocketAddr)>,
blob_recycler: &BlobRecycler,
) -> Result<SharedBlobs> {
pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs> {
let mut blobs = Vec::new();
for (resp, rsp_addr) in rsps {
blobs.push(to_blob(resp, rsp_addr, blob_recycler)?);
blobs.push(to_blob(resp, rsp_addr)?);
}
Ok(blobs)
}
@ -374,7 +343,7 @@ impl Blob {
}
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
let mut p = r.write();
let mut p = r.write().unwrap();
trace!("receiving on {}", socket.local_addr().unwrap());
let (nrecv, from) = socket.recv_from(&mut p.data)?;
@ -384,7 +353,7 @@ impl Blob {
Ok(())
}
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<SharedBlobs> {
pub fn recv_from(socket: &UdpSocket) -> Result<SharedBlobs> {
let mut v = Vec::new();
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
@ -394,7 +363,7 @@ impl Blob {
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
for i in 0..NUM_BLOBS {
let r = re.allocate();
let r = SharedBlob::default();
match Blob::recv_blob(socket, &r) {
Err(_) if i > 0 => {
@ -418,7 +387,7 @@ impl Blob {
pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> {
for r in v {
{
let p = r.read();
let p = r.read().unwrap();
let a = p.meta.addr();
if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) {
warn!(
@ -439,7 +408,6 @@ pub fn make_consecutive_blobs(
num_blobs_to_make: u64,
start_hash: Hash,
addr: &SocketAddr,
resp_recycler: &BlobRecycler,
) -> SharedBlobs {
let mut last_hash = start_hash;
let mut num_hashes = 0;
@ -447,7 +415,7 @@ pub fn make_consecutive_blobs(
for _ in 0..num_blobs_to_make {
all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![]));
}
let mut new_blobs = all_entries.to_blobs_with_id(&resp_recycler, me_id, 0, addr);
let mut new_blobs = all_entries.to_blobs_with_id(me_id, 0, addr);
new_blobs.truncate(num_blobs_to_make as usize);
new_blobs
}
@ -455,10 +423,9 @@ pub fn make_consecutive_blobs(
#[cfg(test)]
mod tests {
use packet::{
to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, BLOB_HEADER_SIZE,
NUM_PACKETS, PACKET_DATA_SIZE,
to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS,
PACKET_DATA_SIZE,
};
use recycler::Reset;
use request::Request;
use std::io;
use std::io::Write;
@ -470,16 +437,15 @@ mod tests {
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = sender.local_addr().unwrap();
let r = PacketRecycler::default();
let p = r.allocate();
p.write().packets.resize(10, Packet::default());
for m in p.write().packets.iter_mut() {
let p = SharedPackets::default();
p.write().unwrap().packets.resize(10, Packet::default());
for m in p.write().unwrap().packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = PACKET_DATA_SIZE;
}
p.read().send_to(&sender).unwrap();
p.write().recv_from(&reader).unwrap();
for m in p.write().packets.iter_mut() {
p.read().unwrap().send_to(&sender).unwrap();
p.write().unwrap().recv_from(&reader).unwrap();
for m in p.write().unwrap().packets.iter_mut() {
assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr);
}
@ -488,19 +454,18 @@ mod tests {
#[test]
fn test_to_packets() {
let tx = Request::GetTransactionCount;
let re = PacketRecycler::default();
let rv = to_packets(&re, &vec![tx.clone(); 1]);
let rv = to_packets(&vec![tx.clone(); 1]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().packets.len(), 1);
assert_eq!(rv[0].read().unwrap().packets.len(), 1);
let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS]);
let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().packets.len(), NUM_PACKETS);
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS + 1]);
let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]);
assert_eq!(rv.len(), 2);
assert_eq!(rv[0].read().packets.len(), NUM_PACKETS);
assert_eq!(rv[1].read().packets.len(), 1);
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
}
#[test]
@ -509,17 +474,16 @@ mod tests {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let r = BlobRecycler::default();
let p = r.allocate();
p.write().meta.set_addr(&addr);
p.write().meta.size = 1024;
let p = SharedBlob::default();
p.write().unwrap().meta.set_addr(&addr);
p.write().unwrap().meta.size = 1024;
let v = vec![p];
Blob::send_to(&sender, v).unwrap();
trace!("send_to");
let rv = Blob::recv_from(&r, &reader).unwrap();
let rv = Blob::recv_from(&reader).unwrap();
trace!("recv_from");
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().meta.size, 1024);
assert_eq!(rv[0].read().unwrap().meta.size, 1024);
}
#[cfg(all(feature = "ipv6", test))]
@ -528,14 +492,13 @@ mod tests {
let reader = UdpSocket::bind("[::1]:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("[::1]:0").expect("bind");
let r = BlobRecycler::default();
let p = r.allocate();
p.as_mut().meta.set_addr(&addr);
p.as_mut().meta.size = 1024;
let p = SharedBlob::default();
p.as_mut().unwrap().meta.set_addr(&addr);
p.as_mut().unwrap().meta.size = 1024;
let mut v = VecDeque::default();
v.push_back(p);
Blob::send_to(&r, &sender, &mut v).unwrap();
let mut rv = Blob::recv_from(&r, &reader).unwrap();
let mut rv = Blob::recv_from(&reader).unwrap();
let rp = rv.pop_front().unwrap();
assert_eq!(rp.as_mut().meta.size, 1024);
}
@ -554,8 +517,6 @@ mod tests {
b.data_mut()[0] = 1;
assert_eq!(b.data()[0], 1);
assert_eq!(b.get_index().unwrap(), <u64>::max_value());
b.reset();
assert!(b.data[..BLOB_HEADER_SIZE].starts_with(&[0u8; BLOB_HEADER_SIZE]));
assert_eq!(b.meta, Meta::default());
}

View File

@ -1,170 +0,0 @@
use std::fmt;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
/// A function that leaves the given type in the same state as Default,
/// but starts with an existing type instead of allocating a new one.
pub trait Reset {
fn reset(&mut self);
}
/// An value that's returned to its heap once dropped.
pub struct Recyclable<T: Default + Reset> {
val: Arc<RwLock<T>>,
landfill: Arc<Mutex<Vec<Arc<RwLock<T>>>>>,
}
impl<T: Default + Reset> Recyclable<T> {
pub fn read(&self) -> RwLockReadGuard<T> {
self.val.read().unwrap()
}
pub fn write(&self) -> RwLockWriteGuard<T> {
self.val.write().unwrap()
}
}
impl<T: Default + Reset> Drop for Recyclable<T> {
fn drop(&mut self) {
if Arc::strong_count(&self.val) == 1 {
// this isn't thread safe, it will allow some concurrent drops to leak and not recycle
// if that happens the allocator will end up allocating from the heap
self.landfill.lock().unwrap().push(self.val.clone());
}
}
}
impl<T: Default + Reset> Clone for Recyclable<T> {
fn clone(&self) -> Self {
Recyclable {
val: self.val.clone(),
landfill: self.landfill.clone(),
}
}
}
impl<T: fmt::Debug + Default + Reset> fmt::Debug for Recyclable<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Recyclable {:?}", &self.read())
}
}
/// An object to minimize memory allocations. Use `allocate()`
/// to get recyclable values of type `T`. When those recyclables
/// are dropped, they're returned to the recycler. The next time
/// `allocate()` is called, the value will be pulled from the
/// recycler instead being allocated from memory.
pub struct Recycler<T: Default + Reset> {
landfill: Arc<Mutex<Vec<Arc<RwLock<T>>>>>,
}
impl<T: Default + Reset> Clone for Recycler<T> {
fn clone(&self) -> Self {
Recycler {
landfill: self.landfill.clone(),
}
}
}
impl<T: Default + Reset> Default for Recycler<T> {
fn default() -> Self {
Recycler {
landfill: Arc::new(Mutex::new(vec![])),
}
}
}
impl<T: Default + Reset> Recycler<T> {
pub fn allocate(&self) -> Recyclable<T> {
let val = self
.landfill
.lock()
.unwrap()
.pop()
.map(|val| {
val.write().unwrap().reset();
val
}).unwrap_or_else(|| Arc::new(RwLock::new(Default::default())));
Recyclable {
val,
landfill: self.landfill.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::mem;
use std::sync::mpsc::channel;
#[derive(Default)]
struct Foo {
x: u8,
}
impl Reset for Foo {
fn reset(&mut self) {
self.x = 0;
}
}
#[test]
fn test_allocate() {
let recycler: Recycler<Foo> = Recycler::default();
let r = recycler.allocate();
assert_eq!(r.read().x, 0);
}
#[test]
fn test_recycle() {
let recycler: Recycler<Foo> = Recycler::default();
{
let foo = recycler.allocate();
foo.write().x = 1;
}
assert_eq!(recycler.landfill.lock().unwrap().len(), 1);
let foo = recycler.allocate();
assert_eq!(foo.read().x, 0);
assert_eq!(recycler.landfill.lock().unwrap().len(), 0);
}
#[test]
fn test_channel() {
let recycler: Recycler<Foo> = Recycler::default();
let (sender, receiver) = channel();
{
let foo = recycler.allocate();
foo.write().x = 1;
sender.send(foo).unwrap();
assert_eq!(recycler.landfill.lock().unwrap().len(), 0);
}
{
let foo = receiver.recv().unwrap();
assert_eq!(foo.read().x, 1);
assert_eq!(recycler.landfill.lock().unwrap().len(), 0);
}
assert_eq!(recycler.landfill.lock().unwrap().len(), 1);
}
#[test]
fn test_window() {
let recycler: Recycler<Foo> = Recycler::default();
let mut window = vec![None];
let (sender, receiver) = channel();
{
// item is in the window while its in the pipeline
// which is used to serve requests from other threads
let item = recycler.allocate();
item.write().x = 1;
window[0] = Some(item);
sender.send(window[0].clone().unwrap()).unwrap();
}
{
let foo = receiver.recv().unwrap();
assert_eq!(foo.read().x, 1);
let old = mem::replace(&mut window[0], None).unwrap();
assert_eq!(old.read().x, 1);
}
// only one thing should be in the landfill at the end
assert_eq!(recycler.landfill.lock().unwrap().len(), 1);
}
}

View File

@ -6,7 +6,6 @@ use crdt::Crdt;
use entry::EntryReceiver;
use ledger::{Block, LedgerWriter};
use log::Level;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::Keypair;
@ -48,7 +47,6 @@ impl ReplicateStage {
fn replicate_requests(
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
window_receiver: &EntryReceiver,
ledger_writer: Option<&mut LedgerWriter>,
keypair: &Arc<Keypair>,
@ -64,7 +62,7 @@ impl ReplicateStage {
let res = bank.process_entries(&entries);
if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, keypair, crdt, blob_recycler, sender)?;
send_validator_vote(bank, keypair, crdt, sender)?;
}
{
@ -100,7 +98,6 @@ impl ReplicateStage {
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap());
let keypair = Arc::new(keypair);
let blob_recycler = BlobRecycler::default();
let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())
@ -120,7 +117,6 @@ impl ReplicateStage {
if let Err(e) = Self::replicate_requests(
&bank,
&crdt,
&blob_recycler,
&window_receiver,
ledger_writer.as_mut(),
&keypair,

View File

@ -3,7 +3,7 @@
use bincode::deserialize;
use counter::Counter;
use log::Level;
use packet::{to_blobs, BlobRecycler, Packets, SharedPackets};
use packet::{to_blobs, Packets, SharedPackets};
use rayon::prelude::*;
use request::Request;
use request_processor::RequestProcessor;
@ -38,7 +38,6 @@ impl RequestStage {
request_processor: &RequestProcessor,
packet_receiver: &Receiver<SharedPackets>,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
) -> Result<()> {
let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?;
@ -51,7 +50,7 @@ impl RequestStage {
let mut reqs_len = 0;
let proc_start = Instant::now();
for msgs in batch {
let reqs: Vec<_> = Self::deserialize_requests(&msgs.read())
let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap())
.into_iter()
.filter_map(|x| x)
.collect();
@ -59,7 +58,7 @@ impl RequestStage {
let rsps = request_processor.process_requests(reqs);
let blobs = to_blobs(rsps, blob_recycler)?;
let blobs = to_blobs(rsps)?;
if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len());
//don't wake up the other side if there is nothing
@ -85,7 +84,6 @@ impl RequestStage {
) -> (Self, BlobReceiver) {
let request_processor = Arc::new(request_processor);
let request_processor_ = request_processor.clone();
let blob_recycler = BlobRecycler::default();
let (blob_sender, blob_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-request-stage".to_string())
@ -94,7 +92,6 @@ impl RequestStage {
&request_processor_,
&packet_receiver,
&blob_sender,
&blob_recycler,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,

View File

@ -73,7 +73,10 @@ fn verify_packet_disabled(_packet: &Packet) -> u8 {
}
fn batch_size(batches: &[SharedPackets]) -> usize {
batches.iter().map(|p| p.read().packets.len()).sum()
batches
.iter()
.map(|p| p.read().unwrap().packets.len())
.sum()
}
#[cfg(not(feature = "cuda"))]
@ -87,8 +90,14 @@ pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
info!("CPU ECDSA for {}", batch_size(batches));
let rv = batches
.into_par_iter()
.map(|p| p.read().packets.par_iter().map(verify_packet).collect())
.collect();
.map(|p| {
p.read()
.unwrap()
.packets
.par_iter()
.map(verify_packet)
.collect()
}).collect();
inc_new_counter_info!("ed25519_verify_cpu", count);
rv
}
@ -101,6 +110,7 @@ pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
.into_par_iter()
.map(|p| {
p.read()
.unwrap()
.packets
.par_iter()
.map(verify_packet_disabled)
@ -196,7 +206,7 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
#[cfg(test)]
mod tests {
use bincode::serialize;
use packet::{Packet, PacketRecycler};
use packet::{Packet, SharedPackets};
use sigverify;
use system_transaction::{memfind, test_tx};
use transaction::Transaction;
@ -228,15 +238,18 @@ mod tests {
}
// generate packet vector
let packet_recycler = PacketRecycler::default();
let batches: Vec<_> = (0..2)
.map(|_| {
let packets = packet_recycler.allocate();
packets.write().packets.resize(0, Default::default());
let packets = SharedPackets::default();
packets
.write()
.unwrap()
.packets
.resize(0, Default::default());
for _ in 0..n {
packets.write().packets.push(packet.clone());
packets.write().unwrap().packets.push(packet.clone());
}
assert_eq!(packets.read().packets.len(), n);
assert_eq!(packets.read().unwrap().packets.len(), n);
packets
}).collect();
assert_eq!(batches.len(), 2);

View File

@ -2,7 +2,7 @@
//!
use influx_db_client as influxdb;
use metrics;
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlobs, SharedPackets};
use packet::{Blob, SharedBlobs, SharedPackets};
use result::{Error, Result};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
@ -20,22 +20,21 @@ pub type BlobReceiver = Receiver<SharedBlobs>;
fn recv_loop(
sock: &UdpSocket,
exit: &Arc<AtomicBool>,
re: &PacketRecycler,
channel: &PacketSender,
channel_tag: &'static str,
) -> Result<()> {
loop {
let msgs = re.allocate();
let msgs = SharedPackets::default();
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader trasaction socket)
if exit.load(Ordering::Relaxed) {
return Ok(());
}
let result = msgs.write().recv_from(sock);
let result = msgs.write().unwrap().recv_from(sock);
match result {
Ok(()) => {
let len = msgs.read().packets.len();
let len = msgs.read().unwrap().packets.len();
metrics::submit(
influxdb::Point::new(channel_tag)
.add_field("count", influxdb::Value::Integer(len as i64))
@ -57,14 +56,13 @@ pub fn receiver(
sender_tag: &'static str,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
let recycler = PacketRecycler::default();
if res.is_err() {
panic!("streamer::receiver set_read_timeout error");
}
Builder::new()
.name("solana-receiver".to_string())
.spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender, sender_tag);
let _ = recv_loop(&sock, &exit, &packet_sender, sender_tag);
()
}).unwrap()
}
@ -81,11 +79,11 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize,
let msgs = recvr.recv_timeout(timer)?;
let recv_start = Instant::now();
trace!("got msgs");
let mut len = msgs.read().packets.len();
let mut len = msgs.read().unwrap().packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
len += more.read().packets.len();
len += more.read().unwrap().packets.len();
batch.push(more);
if len > 100_000 {
@ -112,9 +110,9 @@ pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: BlobReceiver) -> J
//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
fn recv_blobs(sock: &UdpSocket, s: &BlobSender) -> Result<()> {
trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap());
let dq = Blob::recv_from(recycler, sock)?;
let dq = Blob::recv_from(sock)?;
if !dq.is_empty() {
s.send(dq)?;
}
@ -127,20 +125,19 @@ pub fn blob_receiver(sock: Arc<UdpSocket>, exit: Arc<AtomicBool>, s: BlobSender)
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))
.expect("set socket timeout");
let recycler = BlobRecycler::default();
Builder::new()
.name("solana-blob_receiver".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
let _ = recv_blobs(&sock, &s);
}).unwrap()
}
#[cfg(test)]
mod test {
use packet::{Blob, BlobRecycler, Packet, Packets, PACKET_DATA_SIZE};
use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use std::io;
use std::io::Write;
use std::net::UdpSocket;
@ -155,7 +152,7 @@ mod test {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().packets.len(),
Ok(m) => *num += m.read().unwrap().packets.len(),
_ => info!("get_msgs error"),
}
if *num == 10 {
@ -177,7 +174,6 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader, "streamer-test");
let t_responder = {
@ -185,9 +181,9 @@ mod test {
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
let mut msgs = Vec::new();
for i in 0..10 {
let mut b = resp_recycler.allocate();
let mut b = SharedBlob::default();
{
let mut w = b.write();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);

View File

@ -157,7 +157,7 @@ pub mod tests {
use logger;
use mint::Mint;
use ncp::Ncp;
use packet::BlobRecycler;
use packet::SharedBlob;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
@ -209,7 +209,6 @@ pub mod tests {
// setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to
// simulate target peer
let recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> = target2
.sockets
@ -279,9 +278,9 @@ pub mod tests {
alice_ref_balance -= transfer_amount;
for entry in vec![entry0, entry1] {
let mut b = recycler.allocate();
let mut b = SharedBlob::default();
{
let mut w = b.write();
let mut w = b.write().unwrap();
w.set_index(blob_id).unwrap();
blob_id += 1;
w.set_id(leader_id).unwrap();

View File

@ -9,7 +9,7 @@ use hash::Hash;
use influx_db_client as influxdb;
use log::Level;
use metrics;
use packet::{BlobRecycler, SharedBlob};
use packet::SharedBlob;
use result::Result;
use signature::Keypair;
use solana_program_interface::pubkey::Pubkey;
@ -31,9 +31,8 @@ pub fn create_new_signed_vote_blob(
last_id: &Hash,
keypair: &Keypair,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
) -> Result<SharedBlob> {
let shared_blob = blob_recycler.allocate();
let shared_blob = SharedBlob::default();
let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap();
//TODO: doesn't seem like there is a synchronous call to get height and id
@ -42,7 +41,7 @@ pub fn create_new_signed_vote_blob(
}?;
let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0);
{
let mut blob = shared_blob.write();
let mut blob = shared_blob.write().unwrap();
let bytes = serialize(&tx)?;
let len = bytes.len();
blob.data[..len].copy_from_slice(&bytes);
@ -109,7 +108,6 @@ pub fn send_leader_vote(
keypair: &Keypair,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
vote_blob_sender: &BlobSender,
last_vote: &mut u64,
last_valid_validator_timestamp: &mut u64,
@ -125,9 +123,7 @@ pub fn send_leader_vote(
last_vote,
last_valid_validator_timestamp,
) {
if let Ok(shared_blob) =
create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler)
{
if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) {
vote_blob_sender.send(vec![shared_blob])?;
let finality_ms = now - super_majority_timestamp;
@ -152,11 +148,10 @@ pub fn send_validator_vote(
bank: &Arc<Bank>,
keypair: &Arc<Keypair>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
vote_blob_sender: &BlobSender,
) -> Result<()> {
let last_id = bank.last_id();
if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler) {
if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt) {
inc_new_counter_info!("replicate-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?;
@ -228,7 +223,6 @@ pub mod tests {
leader_crdt.insert_vote(&validator.id, &vote, entry.id);
}
let leader = Arc::new(RwLock::new(leader_crdt));
let blob_recycler = BlobRecycler::default();
let (vote_blob_sender, vote_blob_receiver) = channel();
let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1;
let mut last_valid_validator_timestamp = 0;
@ -237,7 +231,6 @@ pub mod tests {
&mint.keypair(),
&bank,
&leader,
&blob_recycler,
&vote_blob_sender,
&mut last_vote,
&mut last_valid_validator_timestamp,
@ -277,7 +270,6 @@ pub mod tests {
&mint.keypair(),
&bank,
&leader,
&blob_recycler,
&vote_blob_sender,
&mut last_vote,
&mut last_valid_validator_timestamp,
@ -292,7 +284,7 @@ pub mod tests {
// vote should be valid
let blob = &vote_blob.unwrap()[0];
let tx = deserialize(&(blob.read().data)).unwrap();
let tx = deserialize(&(blob.read().unwrap().data)).unwrap();
assert!(bank.process_transaction(&tx).is_ok());
}

View File

@ -7,7 +7,7 @@ use entry::Entry;
use erasure;
use ledger::{reconstruct_entries_from_blobs, Block};
use log::Level;
use packet::{BlobRecycler, SharedBlob};
use packet::SharedBlob;
use result::Result;
use solana_program_interface::pubkey::Pubkey;
use std::cmp;
@ -28,7 +28,7 @@ pub struct WindowSlot {
impl WindowSlot {
fn blob_index(&self) -> Option<u64> {
match self.data {
Some(ref blob) => blob.read().get_index().ok(),
Some(ref blob) => blob.read().unwrap().get_index().ok(),
None => None,
}
}
@ -70,7 +70,6 @@ pub trait WindowUtil {
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
@ -200,7 +199,6 @@ impl WindowUtil for Window {
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
recycler: &BlobRecycler,
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
@ -208,7 +206,7 @@ impl WindowUtil for Window {
) {
let w = (pix % WINDOW_SIZE) as usize;
let is_coding = blob.read().is_coding();
let is_coding = blob.read().unwrap().is_coding();
// insert a newly received blob into a window slot, clearing out and recycling any previous
// blob unless the incoming blob is a duplicate (based on idx)
@ -221,7 +219,7 @@ impl WindowUtil for Window {
c_or_d: &str,
) -> bool {
if let Some(old) = mem::replace(window_slot, Some(blob)) {
let is_dup = old.read().get_index().unwrap() == pix;
let is_dup = old.read().unwrap().get_index().unwrap() == pix;
trace!(
"{}: occupied {} window slot {:}, is_dup: {}",
id,
@ -250,21 +248,9 @@ impl WindowUtil for Window {
self[w].leader_unknown = leader_unknown;
*pending_retransmits = true;
#[cfg(not(feature = "erasure"))]
{
// suppress warning: unused variable: `recycler`
let _ = recycler;
}
#[cfg(feature = "erasure")]
{
if erasure::recover(
id,
recycler,
self,
*consumed,
(*consumed % WINDOW_SIZE) as usize,
).is_err()
{
if erasure::recover(id, self, *consumed, (*consumed % WINDOW_SIZE) as usize).is_err() {
trace!("{}: erasure::recover failed", id);
}
}
@ -289,7 +275,7 @@ impl WindowUtil for Window {
let k_data_blob;
let k_data_slot = &mut self[k].data;
if let Some(blob) = k_data_slot {
if blob.read().get_index().unwrap() < *consumed {
if blob.read().unwrap().get_index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
@ -389,7 +375,7 @@ pub fn index_blobs(
trace!("{}: INDEX_BLOBS {}", node_info.id, blobs.len());
for (i, b) in blobs.iter().enumerate() {
// only leader should be broadcasting
let mut blob = b.write();
let mut blob = b.write().unwrap();
blob.set_id(node_info.id)
.expect("set_id in pub fn broadcast");
blob.set_index(*receive_index + i as u64)
@ -426,7 +412,7 @@ pub fn initialized_window(
// populate the window, offset by implied index
let diff = cmp::max(blobs.len() as isize - window.len() as isize, 0) as usize;
for b in blobs.into_iter().skip(diff) {
let ix = b.read().get_index().expect("blob index");
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix % WINDOW_SIZE) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(window[pos].data.is_none());
@ -442,14 +428,13 @@ pub fn new_window_from_entries(
node_info: &NodeInfo,
) -> Window {
// convert to blobs
let blob_recycler = BlobRecycler::default();
let blobs = ledger_tail.to_blobs(&blob_recycler);
let blobs = ledger_tail.to_blobs();
initialized_window(&node_info, blobs, entry_height)
}
#[cfg(test)]
mod test {
use packet::{Blob, BlobRecycler, Packet, Packets, PACKET_DATA_SIZE};
use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use solana_program_interface::pubkey::Pubkey;
use std::io;
use std::io::Write;
@ -465,7 +450,7 @@ mod test {
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => *num += m.read().packets.len(),
Ok(m) => *num += m.read().unwrap().packets.len(),
e => info!("error {:?}", e),
}
if *num == 10 {
@ -487,7 +472,6 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = receiver(
Arc::new(read),
@ -500,9 +484,9 @@ mod test {
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
let mut msgs = Vec::new();
for i in 0..10 {
let mut b = resp_recycler.allocate();
let mut b = SharedBlob::default();
{
let mut w = b.write();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);

View File

@ -4,7 +4,7 @@ use counter::Counter;
use crdt::{Crdt, NodeInfo};
use entry::EntrySender;
use log::Level;
use packet::{BlobRecycler, SharedBlob};
use packet::SharedBlob;
use rand::{thread_rng, Rng};
use result::{Error, Result};
use solana_program_interface::pubkey::Pubkey;
@ -49,10 +49,9 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
fn add_block_to_retransmit_queue(
b: &SharedBlob,
leader_id: Pubkey,
recycler: &BlobRecycler,
retransmit_queue: &mut Vec<SharedBlob>,
) {
let p = b.read();
let p = b.read().unwrap();
//TODO this check isn't safe against adverserial packets
//we need to maintain a sequence window
trace!(
@ -73,11 +72,9 @@ fn add_block_to_retransmit_queue(
//otherwise we get into races with which thread
//should do the recycling
//
//a better abstraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
let nv = SharedBlob::default();
{
let mut mnv = nv.write();
let mut mnv = nv.write().unwrap();
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
@ -91,7 +88,6 @@ fn retransmit_all_leader_blocks(
maybe_leader: Option<NodeInfo>,
dq: &[SharedBlob],
id: &Pubkey,
recycler: &BlobRecycler,
consumed: u64,
received: u64,
retransmit: &BlobSender,
@ -101,7 +97,7 @@ fn retransmit_all_leader_blocks(
if let Some(leader) = maybe_leader {
let leader_id = leader.id;
for b in dq {
add_block_to_retransmit_queue(b, leader_id, recycler, &mut retransmit_queue);
add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
}
if *pending_retransmits {
@ -113,12 +109,7 @@ fn retransmit_all_leader_blocks(
*pending_retransmits = false;
if w.leader_unknown {
if let Some(ref b) = w.data {
add_block_to_retransmit_queue(
b,
leader_id,
recycler,
&mut retransmit_queue,
);
add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
w.leader_unknown = false;
}
}
@ -146,7 +137,6 @@ fn recv_window(
window: &SharedWindow,
id: &Pubkey,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
consumed: &mut u64,
received: &mut u64,
max_ix: u64,
@ -183,7 +173,6 @@ fn recv_window(
maybe_leader,
&dq,
id,
recycler,
*consumed,
*received,
retransmit,
@ -195,7 +184,7 @@ fn recv_window(
let mut consume_queue = Vec::new();
for b in dq {
let (pix, meta_size) = {
let p = b.read();
let p = b.read().unwrap();
(p.get_index()?, p.meta.size)
};
pixs.push(pix);
@ -219,7 +208,6 @@ fn recv_window(
b,
pix,
&mut consume_queue,
recycler,
consumed,
leader_unknown,
pending_retransmits,
@ -276,7 +264,6 @@ pub fn window_service(
leader_rotation_interval = rcrdt.get_leader_rotation_interval();
}
let mut pending_retransmits = false;
let recycler = BlobRecycler::default();
trace!("{}: RECV_WINDOW started", id);
loop {
if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 {
@ -297,7 +284,6 @@ pub fn window_service(
&window,
&id,
&crdt,
&recycler,
&mut consumed,
&mut received,
max_entry_height,
@ -354,7 +340,7 @@ mod test {
use entry::Entry;
use hash::Hash;
use logger;
use packet::{make_consecutive_blobs, BlobRecycler, PACKET_DATA_SIZE};
use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE};
use signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
@ -390,7 +376,6 @@ mod test {
crdt_me.set_leader(me_id);
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, r_window) = channel();
@ -416,15 +401,11 @@ mod test {
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut num_blobs_to_make = 10;
let gossip_address = &tn.info.contact_info.ncp;
let msgs = make_consecutive_blobs(
me_id,
num_blobs_to_make,
Hash::default(),
&gossip_address,
&resp_recycler,
).into_iter()
.rev()
.collect();;
let msgs =
make_consecutive_blobs(me_id, num_blobs_to_make, Hash::default(), &gossip_address)
.into_iter()
.rev()
.collect();;
s_responder.send(msgs).expect("send");
t_responder
};
@ -452,7 +433,6 @@ mod test {
let me_id = crdt_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
@ -478,9 +458,9 @@ mod test {
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
let b = SharedBlob::default();
{
let mut w = b.write();
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
@ -509,7 +489,6 @@ mod test {
let me_id = crdt_me.my_data().id;
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
@ -535,9 +514,9 @@ mod test {
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
let b = SharedBlob::default();
{
let mut w = b.write();
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
@ -555,9 +534,9 @@ mod test {
let mut msgs1 = Vec::new();
for v in 1..5 {
let i = 9 + v;
let b = resp_recycler.allocate();
let b = SharedBlob::default();
{
let mut w = b.write();
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap());
@ -631,7 +610,6 @@ mod test {
let subs = Arc::new(RwLock::new(crdt_me));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
@ -667,15 +645,11 @@ mod test {
let extra_blobs = leader_rotation_interval;
let total_blobs_to_send =
my_leader_begin_epoch * leader_rotation_interval + extra_blobs;
let msgs = make_consecutive_blobs(
me_id,
total_blobs_to_send,
Hash::default(),
&ncp_address,
&resp_recycler,
).into_iter()
.rev()
.collect();;
let msgs =
make_consecutive_blobs(me_id, total_blobs_to_send, Hash::default(), &ncp_address)
.into_iter()
.rev()
.collect();;
s_responder.send(msgs).expect("send");
t_responder
};

View File

@ -8,7 +8,6 @@ use crdt::Crdt;
use entry::Entry;
use ledger::{Block, LedgerWriter};
use log::Level;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::Keypair;
@ -207,7 +206,6 @@ impl WriteStage {
leader_rotation_interval = rcrdt.get_leader_rotation_interval();
}
let mut entry_height = entry_height;
let blob_recycler = BlobRecycler::default();
loop {
// Note that entry height is not zero indexed, it starts at 1, so the
// old leader is in power up to and including entry height
@ -258,7 +256,6 @@ impl WriteStage {
&keypair,
&bank,
&crdt,
&blob_recycler,
&vote_blob_sender,
&mut last_vote,
&mut last_valid_validator_timestamp,

View File

@ -7,7 +7,7 @@ use rayon::iter::*;
use solana::crdt::{Crdt, Node};
use solana::logger;
use solana::ncp::Ncp;
use solana::packet::{Blob, BlobRecycler};
use solana::packet::{Blob, SharedBlob};
use solana::result;
use solana::service::Service;
use std::net::UdpSocket;
@ -159,9 +159,8 @@ pub fn crdt_retransmit() -> result::Result<()> {
sleep(Duration::new(1, 0));
}
assert!(done);
let r = BlobRecycler::default();
let b = r.allocate();
b.write().meta.size = 10;
let b = SharedBlob::default();
b.write().unwrap().meta.size = 10;
Crdt::retransmit(&c1, &b, &tn1)?;
let res: Vec<_> = [tn1, tn2, tn3]
.into_par_iter()