From 533b3170a7a28d5a8d7c4dda7dbb488118ff5c48 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 24 Mar 2018 23:31:54 -0700 Subject: [PATCH] responder --- src/services.rs | 221 ------------------------------------------------ src/streamer.rs | 97 ++++++++++++++------- 2 files changed, 66 insertions(+), 252 deletions(-) delete mode 100644 src/services.rs diff --git a/src/services.rs b/src/services.rs deleted file mode 100644 index 25fc795207..0000000000 --- a/src/services.rs +++ /dev/null @@ -1,221 +0,0 @@ -//! Small services library with named ports -//! see test for usage - -use std::sync::{Arc, Mutex, RwLock}; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread::{spawn, JoinHandle}; -use std::time::Duration; -use streamer; -use result::Result; -use result::Error; - -pub enum Port { - Main, - PacketReader, - Accountant, - PacketSender, -} - -impl Port { - fn to_usize(self) -> usize { - match self { - Port::Main => 0, - Port::PacketReader => 1, - Port::Accountant => 2, - Port::PacketSender => 3, - } - } -} - -#[derive(Clone)] -pub enum Data { - Signal, - SharedPacketData(streamer::SharedPacketData), -} - -struct Locked { - ports: Vec>, - readers: Vec>>>, - threads: Vec>>>>, -} - -pub struct Services { - lock: Arc>, - exit: Arc, -} - -pub type Ports = Vec>; - -impl Services { - pub fn new() -> Services { - let (s1, r1) = channel(); - let (s2, r2) = channel(); - let (s3, r3) = channel(); - let (s4, r4) = channel(); - let (s5, r5) = channel(); - let locked = Locked { - ports: [s1, s2, s3, s4, s5].to_vec(), - readers: [ - Arc::new(Mutex::new(r1)), - Arc::new(Mutex::new(r2)), - Arc::new(Mutex::new(r3)), - Arc::new(Mutex::new(r4)), - Arc::new(Mutex::new(r5)), - ].to_vec(), - threads: [ - Arc::new(None), - Arc::new(None), - Arc::new(None), - Arc::new(None), - Arc::new(None), - ].to_vec(), - }; - let exit = Arc::new(AtomicBool::new(false)); - Services { - lock: Arc::new(RwLock::new(locked)), - exit: exit, - } - } - pub fn source(&self, port: Port, func: F) -> Result<()> - where - F: Send + 'static + Fn(&Ports) -> Result<()>, - { - let mut w = self.lock.write().unwrap(); - let pz = port.to_usize(); - if w.threads[pz].is_some() { - return Err(Error::Services); - } - let c_ports = w.ports.clone(); - let c_exit = self.exit.clone(); - let j = spawn(move || loop { - match func(&c_ports) { - Ok(()) => (), - e => return e, - } - if c_exit.load(Ordering::Relaxed) == true { - return Ok(()); - } - }); - w.threads[pz] = Arc::new(Some(j)); - return Ok(()); - } - pub fn listen(&mut self, port: Port, func: F) -> Result<()> - where - F: Send + 'static + Fn(&Ports, Data) -> Result<()>, - { - let mut w = self.lock.write().unwrap(); - let pz = port.to_usize(); - if w.threads[pz].is_some() { - return Err(Error::Services); - } - let recv_lock = w.readers[pz].clone(); - let c_ports = w.ports.clone(); - let c_exit = self.exit.clone(); - let j: JoinHandle> = spawn(move || loop { - let recv = recv_lock.lock().unwrap(); - let timer = Duration::new(0, 500000); - match recv.recv_timeout(timer) { - Ok(val) => func(&c_ports, val).expect("services listen"), - _ => (), - } - if c_exit.load(Ordering::Relaxed) { - return Ok(()); - } - }); - w.threads[pz] = Arc::new(Some(j)); - return Ok(()); - } - pub fn send(ports: &Ports, to: Port, m: Data) -> Result<()> { - ports[to.to_usize()] - .send(m) - .or_else(|_| Err(Error::SendError)) - } - pub fn join(&mut self) -> Result<()> { - let pz = Port::Main.to_usize(); - let recv = self.lock.write().unwrap().readers[pz].clone(); - recv.lock().unwrap().recv()?; - self.shutdown()?; - return Ok(()); - } - pub fn shutdown(&mut self) -> Result<()> { - self.exit.store(true, Ordering::Relaxed); - let r = self.lock.read().unwrap(); - for t in r.threads.iter() { - match Arc::try_unwrap((*t).clone()) { - Ok(Some(j)) => j.join()??, - _ => (), - }; - } - return Ok(()); - } -} - -#[cfg(test)] -mod test { - use services::Services; - use services::Port::{Accountant, Main, PacketReader}; - use services::Data::Signal; - use std::sync::{Arc, Mutex}; - - #[test] - fn test_init() { - let mut o = Services::new(); - assert_matches!(o.shutdown(), Ok(())); - } - #[test] - fn test_join() { - let mut o = Services::new(); - assert_matches!( - o.source(PacketReader, move |ports| Services::send( - ports, - Main, - Signal - )), - Ok(()) - ); - assert_matches!(o.join(), Ok(())); - } - #[test] - fn test_source() { - let mut o = Services::new(); - assert_matches!( - o.source(PacketReader, move |ports| Services::send( - ports, - Main, - Signal - )), - Ok(()) - ); - assert!(o.source(PacketReader, move |_ports| Ok(())).is_err()); - assert!(o.listen(PacketReader, move |_ports, _data| Ok(())).is_err()); - assert_matches!(o.join(), Ok(())); - } - #[test] - fn test_listen() { - let mut o = Services::new(); - let val = Arc::new(Mutex::new(false)); - assert_matches!( - o.source(PacketReader, move |ports| Services::send( - ports, - Accountant, - Signal - )), - Ok(()) - ); - let c_val = val.clone(); - assert_matches!( - o.listen(Accountant, move |ports, data| match data { - Signal => { - *c_val.lock().unwrap() = true; - Services::send(ports, Main, Signal) - } - _ => Ok(()), - }), - Ok(()) - ); - assert_matches!(o.join(), Ok(())); - assert_eq!(*val.lock().unwrap(), true); - } - -} diff --git a/src/streamer.rs b/src/streamer.rs index a7cea82c45..f187d68490 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -9,6 +9,9 @@ use result::{Error, Result}; const BLOCK_SIZE: usize = 1024 * 8; pub const PACKET_SIZE: usize = 256; +pub const RESP_SIZE: usize = 64 * 1024; +pub const NUM_RESP: usize = (BLOCK_SIZE * PACKET_SIZE) / RESP_SIZE; + #[derive(Clone, Default)] pub struct Meta { pub size: usize, @@ -85,21 +88,48 @@ impl Meta { } #[derive(Clone, Debug, Default)] -pub struct PacketData { +pub struct Packets { pub packets: Vec, } -pub type SharedPacketData = Arc>; -pub type Recycler = Arc>>; -pub type Receiver = mpsc::Receiver; -pub type Sender = mpsc::Sender; +#[derive(Clone, Debug, Default)] +pub struct Response { + pub resp: [u8; RESP_SIZE], + pub meta: Meta, +} -impl PacketData { - pub fn new() -> PacketData { - PacketData { +pub struct Responses { + pub responses: Vec, +} + + +pub type SharedPackets = Arc>; +pub type PacketRecycler = Arc>>; +pub type Receiver = mpsc::Receiver; +pub type Sender = mpsc::Sender; +pub type SharedResponses = Arc>; +pub type ResponseRecycler = Arc>>; +pub type Responder = mpsc::Sender; +pub type ResponseReceiver = mpsc::Receiver; + +impl Default for Responses { + pub fn default() -> Responses { + Responses { + packets: vec![Response::default(); NUM_RESP], + } + } +} + + +impl Default for Packets { + pub fn default() -> Packets { + Packets { packets: vec![Packet::default(); BLOCK_SIZE], } } +} + +impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; @@ -142,13 +172,17 @@ impl PacketData { } } -pub fn allocate(recycler: &Recycler) -> SharedPacketData { +pub fn allocate(recycler: &Arc>>) -> Arc> +where T: Default +{ let mut gc = recycler.lock().expect("lock"); gc.pop() - .unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new()))) + .unwrap_or_else(|| Arc::new(RwLock::new(T::default()))) } -pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { +pub fn recycle(recycler: &Arc>>, msgs: Arc>) +where T: Default +{ let mut gc = recycler.lock().expect("lock"); gc.push(msgs); } @@ -156,7 +190,7 @@ pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { fn recv_loop( sock: &UdpSocket, exit: &Arc, - recycler: &Recycler, + recycler: &PacketRecycler, channel: &Sender, ) -> Result<()> { loop { @@ -182,7 +216,7 @@ fn recv_loop( pub fn receiver( sock: UdpSocket, exit: Arc, - recycler: Recycler, + recycler: PacketRecycler, channel: Sender, ) -> Result> { let timer = Duration::new(1, 0); @@ -193,7 +227,7 @@ pub fn receiver( })) } -fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, recycler: &ResponseRecycler, r: &ResponseReceiver) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r.recv_timeout(timer)?; let msgs_ = msgs.clone(); @@ -203,11 +237,11 @@ fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> Ok(()) } -pub fn sender( +pub fn responder( sock: UdpSocket, exit: Arc, - recycler: Recycler, - r: Receiver, + recycler: ResponseRecycler, + r: ResponseReceiver, ) -> JoinHandle<()> { spawn(move || loop { if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { @@ -316,7 +350,7 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, sender, Packet, Receiver, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Receiver, PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -340,8 +374,8 @@ mod test { let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); - let (s_sender, r_sender) = channel(); - let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, exit.clone(), recycler.clone(), r_responder); let msgs = allocate(&recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { @@ -350,13 +384,13 @@ mod test { w.set_addr(&addr); assert_eq!(w.get_addr(), addr); } - s_sender.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); - t_sender.join().expect("join"); + t_responder.join().expect("join"); } #[test] pub fn streamer_debug() { @@ -368,25 +402,26 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let recycler = Arc::new(Mutex::new(Vec::new())); + let packet_recycler = Arc::new(Mutex::new(Vec::new())); + let resp_recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); - let (s_sender, r_sender) = channel(); - let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); - let msgs = allocate(&recycler); - msgs.write().unwrap().packets.resize(10, Packet::default()); - for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { + let t_receiver = receiver(read, exit.clone(), packet_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let msgs = allocate(&resp_recycler); + msgs.write().unwrap().responses.resize(10, Responses::default()); + for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { w.data[0] = i as u8; w.meta.size = PACKET_SIZE; w.meta.set_addr(&addr); assert_eq!(w.meta.get_addr(), addr); } - s_sender.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); - t_sender.join().expect("join"); + t_responder.join().expect("join"); } }