diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index a040740834..f748c0a9e8 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -98,7 +98,7 @@ impl AccountantSkel { let msgs = r_reader.recv_timeout(timer)?; let msgs_ = msgs.clone(); let msgs__ = msgs.clone(); - let rsps = streamer::allocate(recycler.clone()); + let rsps = streamer::allocate(&recycler); let rsps_ = rsps.clone(); let l = msgs__.read().unwrap().packets.len(); rsps.write() @@ -124,7 +124,7 @@ impl AccountantSkel { ursps.packets.resize(num, streamer::Packet::default()); } s_sender.send(rsps_)?; - streamer::recycle(recycler, msgs_); + streamer::recycle(&recycler, msgs_); Ok(()) } diff --git a/src/streamer.rs b/src/streamer.rs index 817c070c72..14e1105883 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -65,8 +65,8 @@ impl Packet { } pub fn set_addr(&mut self, a: &SocketAddr) { - match a { - &SocketAddr::V4(v4) => { + match *a { + SocketAddr::V4(v4) => { let ip = v4.ip().octets(); self.addr[0] = ip[0] as u16; self.addr[1] = ip[1] as u16; @@ -74,7 +74,7 @@ impl Packet { self.addr[3] = ip[3] as u16; self.port = a.port(); } - &SocketAddr::V6(v6) => { + SocketAddr::V6(v6) => { self.addr = v6.ip().segments(); self.port = a.port(); self.v6 = true; @@ -102,7 +102,7 @@ impl PacketData { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; - for p in self.packets.iter_mut() { + for p in &mut self.packets { p.size = 0; match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { @@ -131,7 +131,7 @@ impl PacketData { Ok(()) } fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { - for p in self.packets.iter() { + for p in &self.packets { let a = p.get_addr(); socket.send_to(&p.data[0..p.size], &a)?; //TODO(anatoly): wtf do we do about errors? @@ -141,35 +141,35 @@ impl PacketData { } } -pub fn allocate(recycler: Recycler) -> SharedPacketData { +pub fn allocate(recycler: &Recycler) -> SharedPacketData { let mut gc = recycler.lock().expect("lock"); gc.pop() .unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new()))) } -pub fn recycle(recycler: Recycler, msgs: SharedPacketData) { +pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { let mut gc = recycler.lock().expect("lock"); gc.push(msgs); } fn recv_loop( sock: &UdpSocket, - exit: Arc, - recycler: Recycler, - channel: Sender, + exit: &Arc, + recycler: &Recycler, + channel: &Sender, ) -> Result<()> { loop { - let msgs = allocate(recycler.clone()); + let msgs = allocate(recycler); let msgs_ = msgs.clone(); loop { - match msgs.write().unwrap().read_from(&sock) { + match msgs.write().unwrap().read_from(sock) { Ok(()) => { channel.send(msgs_)?; break; } Err(_) => { if exit.load(Ordering::Relaxed) { - recycle(recycler.clone(), msgs_); + recycle(recycler, msgs_); return Ok(()); } } @@ -187,12 +187,12 @@ pub fn receiver( let timer = Duration::new(1, 0); sock.set_read_timeout(Some(timer))?; Ok(spawn(move || { - let _ = recv_loop(&sock, exit, recycler, channel); + let _ = recv_loop(&sock, &exit, &recycler, &channel); () })) } -fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r.recv_timeout(timer)?; let msgs_ = msgs.clone(); @@ -209,7 +209,7 @@ pub fn sender( r: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - if recv_send(&sock, recycler.clone(), &r).is_err() && exit.load(Ordering::Relaxed) { + if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { break; } }) @@ -229,9 +229,9 @@ mod bench { use result::Result; use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; - fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc) -> JoinHandle<()> { + fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = allocate(recycler.clone()); + let msgs = allocate(recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for w in msgs.write().unwrap().packets.iter_mut() { w.size = PACKET_SIZE; @@ -248,7 +248,7 @@ mod bench { } fn sinc( - recycler: Recycler, + recycler: &Recycler, exit: Arc, rvs: Arc>, r: Receiver, @@ -262,7 +262,7 @@ mod bench { Ok(msgs) => { let msgs_ = msgs.clone(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycle(recycler.clone(), msgs_); + recycle(recycler, msgs_); } _ => (), } @@ -275,10 +275,10 @@ mod bench { let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; - let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); - let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); - let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); + let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?; + let t_producer1 = producer(&addr, &recycler, exit.clone()); + let t_producer2 = producer(&addr, &recycler, exit.clone()); + let t_producer3 = producer(&addr, &recycler, exit.clone()); let rvs = Arc::new(Mutex::new(0)); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader); @@ -341,7 +341,7 @@ mod test { let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let (s_sender, r_sender) = channel(); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); - let msgs = allocate(recycler.clone()); + let msgs = allocate(&recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { w.data[0] = i as u8; @@ -372,7 +372,7 @@ mod test { let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let (s_sender, r_sender) = channel(); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); - let msgs = allocate(recycler.clone()); + let msgs = allocate(&recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { w.data[0] = i as u8;