Use a Drop trait to keep track of lifetimes for recycled objects.

* Move recycler instances to the point of allocation
* Sinks no longer need to call `recycle` (see the sketch below)
* Remove the recycler arguments from all the APIs that no longer need them
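
The new `recycler` module referenced by this diff is not shown here. As a point of reference only, below is a minimal sketch of how a `Drop`-based `Recycler`/`Recyclable` pair could be shaped. The names (`Recycler`, `Recyclable`, `Reset`, `allocate`, `read`, `write`) mirror the ones used in the diff, but the bodies are illustrative assumptions, not the module's actual implementation.

```rust
// Illustrative sketch only: the real `recycler` module may differ.
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Re-initialize the important parts of an object in place, similar to
/// `Default`, but without a fresh allocation.
pub trait Reset {
    fn reset(&mut self);
}

/// A pool of reusable objects.
pub struct Recycler<T: Default + Reset> {
    gc: Arc<Mutex<Vec<T>>>,
}

impl<T: Default + Reset> Default for Recycler<T> {
    fn default() -> Self {
        Recycler {
            gc: Arc::new(Mutex::new(vec![])),
        }
    }
}

impl<T: Default + Reset> Recycler<T> {
    /// Pop a previously recycled object from the pool, or allocate a new one.
    pub fn allocate(&self) -> Recyclable<T> {
        let obj = self.gc.lock().unwrap().pop().unwrap_or_default();
        Recyclable {
            inner: Some(Arc::new(RwLock::new(obj))),
            gc: self.gc.clone(),
        }
    }
}

/// A shared, lockable handle that returns its object to the pool on drop.
pub struct Recyclable<T: Default + Reset> {
    inner: Option<Arc<RwLock<T>>>, // `Option` so `drop` can take ownership back
    gc: Arc<Mutex<Vec<T>>>,        // handle back into the pool
}

impl<T: Default + Reset> Clone for Recyclable<T> {
    fn clone(&self) -> Self {
        Recyclable {
            inner: self.inner.clone(),
            gc: self.gc.clone(),
        }
    }
}

impl<T: Default + Reset> Recyclable<T> {
    // `read()`/`write()` absorb the lock-poisoning unwrap, which is why the
    // call sites in this diff drop their `.unwrap()` calls.
    pub fn read(&self) -> RwLockReadGuard<T> {
        self.inner.as_ref().unwrap().read().unwrap()
    }
    pub fn write(&self) -> RwLockWriteGuard<T> {
        self.inner.as_ref().unwrap().write().unwrap()
    }
}

impl<T: Default + Reset> Drop for Recyclable<T> {
    fn drop(&mut self) {
        // Only the last live clone can unwrap the Arc. It resets the object
        // and pushes it back into the pool, so no caller needs `recycle`.
        if let Some(arc) = self.inner.take() {
            if let Ok(lock) = Arc::try_unwrap(arc) {
                let mut obj = lock.into_inner().unwrap();
                obj.reset();
                self.gc.lock().unwrap().push(obj);
            }
        }
    }
}
```

With a shape like this, `SharedPackets` and `SharedBlob` become `Recyclable` aliases, lock access no longer needs `.unwrap()`, and the explicit `recycle(...)` calls removed throughout this diff become unnecessary: the object flows back to the pool when its last clone is dropped.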
Anatoly Yakovenko
2018-09-18 08:02:57 -07:00
committed by Greg Fitzgerald
parent 6732a9078d
commit 431692d9d0
32 changed files with 414 additions and 733 deletions

@@ -4,6 +4,7 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use counter::Counter;
use log::Level;
use recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use recycler;
use result::{Error, Result};
use serde::Serialize;
use signature::Pubkey;
@@ -11,14 +12,13 @@ use std::fmt;
use std::io;
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::AtomicUsize;
pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>;
pub type SharedPackets = recycler::Recyclable<Packets>;
pub type SharedBlob = recycler::Recyclable<Blob>;
pub type SharedBlobs = Vec<SharedBlob>;
pub type PacketRecycler = Recycler<Packets>;
pub type BlobRecycler = Recycler<Blob>;
pub type PacketRecycler = recycler::Recycler<Packets>;
pub type BlobRecycler = recycler::Recycler<Blob>;
pub const NUM_PACKETS: usize = 1024 * 8;
pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers
@@ -63,14 +63,7 @@ impl Default for Packet {
}
}
pub trait Reset {
// Reset trait is an object that can re-initialize important parts
// of itself, similar to Default, but not necessarily a full clear
// also, we do it in-place.
fn reset(&mut self);
}
impl Reset for Packet {
impl recycler::Reset for Packet {
fn reset(&mut self) {
self.meta = Meta::default();
}
@@ -130,7 +123,7 @@ impl Default for Packets {
}
}
impl Reset for Packets {
impl recycler::Reset for Packets {
fn reset(&mut self) {
for i in 0..self.packets.len() {
self.packets[i].reset();
@@ -165,7 +158,7 @@ impl Default for Blob {
}
}
impl Reset for Blob {
impl recycler::Reset for Blob {
fn reset(&mut self) {
self.meta = Meta::default();
self.data[..BLOB_HEADER_SIZE].copy_from_slice(&[0u8; BLOB_HEADER_SIZE]);
@@ -178,118 +171,6 @@ pub enum BlobError {
BadState,
}
pub struct Recycler<T> {
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
gc: Arc<Mutex<Vec<(Arc<RwLock<T>>, &'static str)>>>,
allocated_count: Arc<AtomicUsize>,
recycled_count: Arc<AtomicUsize>,
reuse_count: Arc<AtomicUsize>,
skipped_count: Arc<AtomicUsize>,
name: String,
}
impl<T: Default> Default for Recycler<T> {
fn default() -> Recycler<T> {
Recycler {
gc: Arc::new(Mutex::new(vec![])),
allocated_count: Arc::new(AtomicUsize::new(0)),
recycled_count: Arc::new(AtomicUsize::new(0)),
reuse_count: Arc::new(AtomicUsize::new(0)),
skipped_count: Arc::new(AtomicUsize::new(0)),
name: format!("? sz: {}", size_of::<T>()).to_string(),
}
}
}
impl<T: Default> Clone for Recycler<T> {
fn clone(&self) -> Recycler<T> {
Recycler {
gc: self.gc.clone(),
allocated_count: self.allocated_count.clone(),
recycled_count: self.recycled_count.clone(),
reuse_count: self.reuse_count.clone(),
skipped_count: self.skipped_count.clone(),
name: self.name.clone(),
}
}
}
fn inc_counter(x: &AtomicUsize) {
x.fetch_add(1, Ordering::Relaxed);
}
impl<T: Default + Reset> Recycler<T> {
pub fn set_name(&mut self, name: &'static str) {
self.name = name.to_string();
}
pub fn allocate(&self) -> Arc<RwLock<T>> {
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
let gc_count = gc.len();
loop {
if let Some((x, who)) = gc.pop() {
// Only return the item if this recycler is the last reference to it.
// Remove this check once `T` holds a Weak reference back to this
// recycler and implements `Drop`. At the time of this writing, Weak can't
// be passed across threads ('alloc' is a nightly-only API), and so our
// reference-counted recyclables are awkwardly being recycled by hand,
// which allows this race condition to exist.
if Arc::strong_count(&x) > 1 {
// Commenting out this message, is annoying for known use case of
// validator hanging onto a blob in the window, but also sending it over
// to retransmmit_request
//
// warn!("Recycled item still in use. Booting it.");
trace!(
"{} Recycled item from \"{}\" still in use. {} Booting it.",
self.name,
who,
Arc::strong_count(&x)
);
inc_counter(&self.skipped_count);
continue;
}
{
let mut w = x.write().unwrap();
w.reset();
}
inc_counter(&self.reuse_count);
return x;
} else {
inc_counter(&self.allocated_count);
if self.allocated_count.load(Ordering::Relaxed) % 2048 == 0 {
self.print_stats(gc_count);
}
return Arc::new(RwLock::new(Default::default()));
}
}
}
fn print_stats(&self, gc_count: usize) {
info!(
"{} recycler stats: allocated: {} reused: {} skipped: {} recycled: {} gc_count: {}",
self.name,
self.allocated_count.load(Ordering::Relaxed),
self.reuse_count.load(Ordering::Relaxed),
self.skipped_count.load(Ordering::Relaxed),
self.recycled_count.load(Ordering::Relaxed),
gc_count
);
}
pub fn recycle(&self, x: Arc<RwLock<T>>, who: &'static str) {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
inc_counter(&self.recycled_count);
if self.recycled_count.load(Ordering::Relaxed) % 2048 == 0 {
self.print_stats(0);
}
gc.push((x, who));
}
}
impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(NUM_PACKETS, Packet::default());
@@ -348,12 +229,9 @@ pub fn to_packets_chunked<T: Serialize>(
) -> Vec<SharedPackets> {
let mut out = vec![];
for x in xs.chunks(chunks) {
let p = r.allocate();
p.write()
.unwrap()
.packets
.resize(x.len(), Default::default());
for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) {
let mut p = r.allocate();
p.write().packets.resize(x.len(), Default::default());
for (i, o) in x.iter().zip(p.write().packets.iter_mut()) {
let v = serialize(&i).expect("serialize request");
let len = v.len();
o.data[..len].copy_from_slice(&v);
@@ -375,7 +253,7 @@ pub fn to_blob<T: Serialize>(
) -> Result<SharedBlob> {
let blob = blob_recycler.allocate();
{
let mut b = blob.write().unwrap();
let mut b = blob.write();
let v = serialize(&resp)?;
let len = v.len();
assert!(len <= BLOB_SIZE);
@@ -492,7 +370,7 @@ impl Blob {
}
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
let mut p = r.write();
trace!("receiving on {}", socket.local_addr().unwrap());
let (nrecv, from) = socket.recv_from(&mut p.data)?;
@@ -517,14 +395,12 @@ impl Blob {
match Blob::recv_blob(socket, &r) {
Err(_) if i > 0 => {
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
re.recycle(r, "Bob::recv_from::i>0");
break;
}
Err(e) => {
if e.kind() != io::ErrorKind::WouldBlock {
info!("recv_from err {:?}", e);
}
re.recycle(r, "Blob::recv_from::empty");
return Err(Error::IO(e));
}
Ok(()) => if i == 0 {
@@ -535,10 +411,10 @@ impl Blob {
}
Ok(v)
}
pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: SharedBlobs) -> Result<()> {
pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> {
for r in v {
{
let p = r.read().expect("'r' read lock in pub fn send_to");
let p = r.read();
let a = p.meta.addr();
if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) {
warn!(
@@ -548,7 +424,6 @@ impl Blob {
Err(e)?;
}
}
re.recycle(r, "send_to");
}
Ok(())
}
@@ -557,87 +432,15 @@ impl Blob {
#[cfg(test)]
mod tests {
use packet::{
to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, Recycler, Reset,
BLOB_HEADER_SIZE, NUM_PACKETS, PACKET_DATA_SIZE,
to_packets, Blob, BlobRecycler, Meta, Packet, PacketRecycler, Packets, BLOB_HEADER_SIZE,
NUM_PACKETS, PACKET_DATA_SIZE,
};
use recycler::Reset;
use request::Request;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::Arc;
#[test]
pub fn packet_recycler_test() {
let r = PacketRecycler::default();
let p = r.allocate();
r.recycle(p, "recycler_test");
assert_eq!(r.gc.lock().unwrap().len(), 1);
let _ = r.allocate();
assert_eq!(r.gc.lock().unwrap().len(), 0);
}
impl Reset for u8 {
fn reset(&mut self) {
*self = Default::default();
}
}
#[test]
pub fn test_leaked_recyclable() {
// Ensure that the recycler won't return an item
// that is still referenced outside the recycler.
let r = Recycler::<u8>::default();
let x0 = r.allocate();
r.recycle(x0.clone(), "leaked_recyclable:1");
assert_eq!(Arc::strong_count(&x0), 2);
assert_eq!(r.gc.lock().unwrap().len(), 1);
let x1 = r.allocate();
assert_eq!(Arc::strong_count(&x1), 1);
assert_eq!(r.gc.lock().unwrap().len(), 0);
}
#[test]
pub fn test_leaked_recyclable_recursion() {
// In the case of a leaked recyclable, ensure the recycler drops its lock before recursing.
let r = Recycler::<u8>::default();
let x0 = r.allocate();
let x1 = r.allocate();
r.recycle(x0, "leaked_recyclable_recursion:1"); // <-- allocate() of this will require locking the recycler's stack.
r.recycle(x1.clone(), "leaked_recyclable_recursion:2"); // <-- allocate() of this will cause it to be dropped and recurse.
assert_eq!(Arc::strong_count(&x1), 2);
assert_eq!(r.gc.lock().unwrap().len(), 2);
r.allocate(); // Ensure lock is released before recursing.
assert_eq!(r.gc.lock().unwrap().len(), 0);
}
#[test]
pub fn test_recycling_is_happening() {
// Test the case in allocate() which should return a re-used object and not allocate a new
// one.
let r = PacketRecycler::default();
let x0 = r.allocate();
{
x0.write().unwrap().packets.resize(1, Packet::default());
}
r.recycle(x0, "recycle");
let x1 = r.allocate();
assert_ne!(
x1.read().unwrap().packets.len(),
Packets::default().packets.len()
);
}
#[test]
pub fn blob_recycler_test() {
let r = BlobRecycler::default();
let p = r.allocate();
r.recycle(p, "blob_recycler_test");
assert_eq!(r.gc.lock().unwrap().len(), 1);
let _ = r.allocate();
assert_eq!(r.gc.lock().unwrap().len(), 0);
}
#[test]
pub fn packet_send_recv() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
@@ -646,14 +449,14 @@ mod tests {
let saddr = sender.local_addr().unwrap();
let r = PacketRecycler::default();
let p = r.allocate();
p.write().unwrap().packets.resize(10, Packet::default());
for m in p.write().unwrap().packets.iter_mut() {
p.write().packets.resize(10, Packet::default());
for m in p.write().packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = PACKET_DATA_SIZE;
}
p.read().unwrap().send_to(&sender).unwrap();
p.write().unwrap().recv_from(&reader).unwrap();
for m in p.write().unwrap().packets.iter_mut() {
p.read().send_to(&sender).unwrap();
p.write().recv_from(&reader).unwrap();
for m in p.write().packets.iter_mut() {
assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr);
}
@@ -667,16 +470,16 @@ mod tests {
let re = PacketRecycler::default();
let rv = to_packets(&re, &vec![tx.clone(); 1]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().unwrap().packets.len(), 1);
assert_eq!(rv[0].read().packets.len(), 1);
let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS]);
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
assert_eq!(rv[0].read().packets.len(), NUM_PACKETS);
let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS + 1]);
assert_eq!(rv.len(), 2);
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
assert_eq!(rv[0].read().packets.len(), NUM_PACKETS);
assert_eq!(rv[1].read().packets.len(), 1);
}
#[test]
@@ -687,15 +490,15 @@ mod tests {
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let r = BlobRecycler::default();
let p = r.allocate();
p.write().unwrap().meta.set_addr(&addr);
p.write().unwrap().meta.size = 1024;
p.write().meta.set_addr(&addr);
p.write().meta.size = 1024;
let v = vec![p];
Blob::send_to(&r, &sender, v).unwrap();
Blob::send_to(&sender, v).unwrap();
trace!("send_to");
let rv = Blob::recv_from(&r, &reader).unwrap();
trace!("recv_from");
assert_eq!(rv.len(), 1);
assert_eq!(rv[0].write().unwrap().meta.size, 1024);
assert_eq!(rv[0].read().meta.size, 1024);
}
#[cfg(all(feature = "ipv6", test))]
@@ -706,14 +509,14 @@ mod tests {
let sender = UdpSocket::bind("[::1]:0").expect("bind");
let r = BlobRecycler::default();
let p = r.allocate();
p.write().unwrap().meta.set_addr(&addr);
p.write().unwrap().meta.size = 1024;
p.as_mut().meta.set_addr(&addr);
p.as_mut().meta.size = 1024;
let mut v = VecDeque::default();
v.push_back(p);
Blob::send_to(&r, &sender, &mut v).unwrap();
let mut rv = Blob::recv_from(&r, &reader).unwrap();
let rp = rv.pop_front().unwrap();
assert_eq!(rp.write().unwrap().meta.size, 1024);
assert_eq!(rp.as_mut().meta.size, 1024);
r.recycle(rp, "blob_ip6_send_recv");
}