Clippy review

This commit is contained in:
Greg Fitzgerald
2018-03-22 14:31:58 -06:00
parent 383d445ba1
commit fad7ff8bf0
2 changed files with 28 additions and 28 deletions

View File

@ -98,7 +98,7 @@ impl AccountantSkel {
let msgs = r_reader.recv_timeout(timer)?; let msgs = r_reader.recv_timeout(timer)?;
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
let msgs__ = msgs.clone(); let msgs__ = msgs.clone();
let rsps = streamer::allocate(recycler.clone()); let rsps = streamer::allocate(&recycler);
let rsps_ = rsps.clone(); let rsps_ = rsps.clone();
let l = msgs__.read().unwrap().packets.len(); let l = msgs__.read().unwrap().packets.len();
rsps.write() rsps.write()
@ -124,7 +124,7 @@ impl AccountantSkel {
ursps.packets.resize(num, streamer::Packet::default()); ursps.packets.resize(num, streamer::Packet::default());
} }
s_sender.send(rsps_)?; s_sender.send(rsps_)?;
streamer::recycle(recycler, msgs_); streamer::recycle(&recycler, msgs_);
Ok(()) Ok(())
} }

View File

@ -65,8 +65,8 @@ impl Packet {
} }
pub fn set_addr(&mut self, a: &SocketAddr) { pub fn set_addr(&mut self, a: &SocketAddr) {
match a { match *a {
&SocketAddr::V4(v4) => { SocketAddr::V4(v4) => {
let ip = v4.ip().octets(); let ip = v4.ip().octets();
self.addr[0] = ip[0] as u16; self.addr[0] = ip[0] as u16;
self.addr[1] = ip[1] as u16; self.addr[1] = ip[1] as u16;
@ -74,7 +74,7 @@ impl Packet {
self.addr[3] = ip[3] as u16; self.addr[3] = ip[3] as u16;
self.port = a.port(); self.port = a.port();
} }
&SocketAddr::V6(v6) => { SocketAddr::V6(v6) => {
self.addr = v6.ip().segments(); self.addr = v6.ip().segments();
self.port = a.port(); self.port = a.port();
self.v6 = true; self.v6 = true;
@ -102,7 +102,7 @@ impl PacketData {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> { fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(BLOCK_SIZE, Packet::default()); self.packets.resize(BLOCK_SIZE, Packet::default());
let mut i = 0; let mut i = 0;
for p in self.packets.iter_mut() { for p in &mut self.packets {
p.size = 0; p.size = 0;
match socket.recv_from(&mut p.data) { match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
@ -131,7 +131,7 @@ impl PacketData {
Ok(()) Ok(())
} }
fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> {
for p in self.packets.iter() { for p in &self.packets {
let a = p.get_addr(); let a = p.get_addr();
socket.send_to(&p.data[0..p.size], &a)?; socket.send_to(&p.data[0..p.size], &a)?;
//TODO(anatoly): wtf do we do about errors? //TODO(anatoly): wtf do we do about errors?
@ -141,35 +141,35 @@ impl PacketData {
} }
} }
pub fn allocate(recycler: Recycler) -> SharedPacketData { pub fn allocate(recycler: &Recycler) -> SharedPacketData {
let mut gc = recycler.lock().expect("lock"); let mut gc = recycler.lock().expect("lock");
gc.pop() gc.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new()))) .unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new())))
} }
pub fn recycle(recycler: Recycler, msgs: SharedPacketData) { pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) {
let mut gc = recycler.lock().expect("lock"); let mut gc = recycler.lock().expect("lock");
gc.push(msgs); gc.push(msgs);
} }
fn recv_loop( fn recv_loop(
sock: &UdpSocket, sock: &UdpSocket,
exit: Arc<AtomicBool>, exit: &Arc<AtomicBool>,
recycler: Recycler, recycler: &Recycler,
channel: Sender, channel: &Sender,
) -> Result<()> { ) -> Result<()> {
loop { loop {
let msgs = allocate(recycler.clone()); let msgs = allocate(recycler);
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
loop { loop {
match msgs.write().unwrap().read_from(&sock) { match msgs.write().unwrap().read_from(sock) {
Ok(()) => { Ok(()) => {
channel.send(msgs_)?; channel.send(msgs_)?;
break; break;
} }
Err(_) => { Err(_) => {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
recycle(recycler.clone(), msgs_); recycle(recycler, msgs_);
return Ok(()); return Ok(());
} }
} }
@ -187,12 +187,12 @@ pub fn receiver(
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?; sock.set_read_timeout(Some(timer))?;
Ok(spawn(move || { Ok(spawn(move || {
let _ = recv_loop(&sock, exit, recycler, channel); let _ = recv_loop(&sock, &exit, &recycler, &channel);
() ()
})) }))
} }
fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> { fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let msgs = r.recv_timeout(timer)?; let msgs = r.recv_timeout(timer)?;
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
@ -209,7 +209,7 @@ pub fn sender(
r: Receiver, r: Receiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
if recv_send(&sock, recycler.clone(), &r).is_err() && exit.load(Ordering::Relaxed) { if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
break; break;
} }
}) })
@ -229,9 +229,9 @@ mod bench {
use result::Result; use result::Result;
use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE};
fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc<AtomicBool>) -> JoinHandle<()> { fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = allocate(recycler.clone()); let msgs = allocate(recycler);
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for w in msgs.write().unwrap().packets.iter_mut() { for w in msgs.write().unwrap().packets.iter_mut() {
w.size = PACKET_SIZE; w.size = PACKET_SIZE;
@ -248,7 +248,7 @@ mod bench {
} }
fn sinc( fn sinc(
recycler: Recycler, recycler: &Recycler,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>, rvs: Arc<Mutex<usize>>,
r: Receiver, r: Receiver,
@ -262,7 +262,7 @@ mod bench {
Ok(msgs) => { Ok(msgs) => {
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
recycle(recycler.clone(), msgs_); recycle(recycler, msgs_);
} }
_ => (), _ => (),
} }
@ -275,10 +275,10 @@ mod bench {
let recycler = Arc::new(Mutex::new(Vec::new())); let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?;
let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); let t_producer1 = producer(&addr, &recycler, exit.clone());
let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); let t_producer2 = producer(&addr, &recycler, exit.clone());
let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); let t_producer3 = producer(&addr, &recycler, exit.clone());
let rvs = Arc::new(Mutex::new(0)); let rvs = Arc::new(Mutex::new(0));
let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader);
@ -341,7 +341,7 @@ mod test {
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel(); let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
let msgs = allocate(recycler.clone()); let msgs = allocate(&recycler);
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
w.data[0] = i as u8; w.data[0] = i as u8;
@ -372,7 +372,7 @@ mod test {
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel(); let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
let msgs = allocate(recycler.clone()); let msgs = allocate(&recycler);
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
w.data[0] = i as u8; w.data[0] = i as u8;