Mutex<bool> -> AtomicBool

This commit is contained in:
Greg Fitzgerald
2018-03-22 14:05:23 -06:00
parent fde320e2f2
commit 803dcb0800
5 changed files with 25 additions and 20 deletions

View File

@ -9,6 +9,7 @@ use result::Result;
use streamer; use streamer;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::default::Default; use std::default::Default;
@ -131,7 +132,7 @@ impl AccountantSkel {
pub fn serve( pub fn serve(
obj: Arc<Mutex<AccountantSkel>>, obj: Arc<Mutex<AccountantSkel>>,
addr: &str, addr: &str,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
) -> Result<[Arc<JoinHandle<()>>; 3]> { ) -> Result<[Arc<JoinHandle<()>>; 3]> {
let read = UdpSocket::bind(addr)?; let read = UdpSocket::bind(addr)?;
// make sure we are on the same interface // make sure we are on the same interface
@ -152,7 +153,7 @@ impl AccountantSkel {
let e = me.lock() let e = me.lock()
.unwrap() .unwrap()
.process(&r_reader, &s_sender, recycler.clone()); .process(&r_reader, &s_sender, recycler.clone());
if e.is_err() && *exit.lock().unwrap() { if e.is_err() && exit.load(Ordering::Relaxed) {
break; break;
} }
}, },

View File

@ -127,6 +127,7 @@ mod tests {
use mint::Mint; use mint::Mint;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
#[test] #[test]
fn test_accountant_stub() { fn test_accountant_stub() {
@ -135,7 +136,7 @@ mod tests {
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let acc = Accountant::new(&alice, Some(30)); let acc = Accountant::new(&alice, Some(30));
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc))); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc)));
let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
sleep(Duration::from_millis(30)); sleep(Duration::from_millis(30));
@ -147,7 +148,7 @@ mod tests {
.unwrap(); .unwrap();
acc.wait_on_signature(&sig, &last_id).unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500); assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
for t in threads.iter() { for t in threads.iter() {
match Arc::try_unwrap((*t).clone()) { match Arc::try_unwrap((*t).clone()) {
Ok(j) => j.join().expect("join"), Ok(j) => j.join().expect("join"),

View File

@ -10,7 +10,7 @@ use silk::hash::Hash;
use std::io::stdin; use std::io::stdin;
fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event {
Event::Transaction(Transaction::new(&from, to, tokens, last_id)) Event::Transaction(Transaction::new(from, to, tokens, last_id))
} }
fn main() { fn main() {

View File

@ -5,6 +5,7 @@ use silk::accountant_skel::AccountantSkel;
use silk::accountant::Accountant; use silk::accountant::Accountant;
use std::io::{self, BufRead}; use std::io::{self, BufRead};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicBool;
fn main() { fn main() {
let addr = "127.0.0.1:8000"; let addr = "127.0.0.1:8000";
@ -14,7 +15,7 @@ fn main() {
.lines() .lines()
.map(|line| serde_json::from_str(&line.unwrap()).unwrap()); .map(|line| serde_json::from_str(&line.unwrap()).unwrap());
let acc = Accountant::new_from_entries(entries, Some(1000)); let acc = Accountant::new_from_entries(entries, Some(1000));
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(acc))); let skel = Arc::new(Mutex::new(AccountantSkel::new(acc)));
eprintln!("Listening on {}", addr); eprintln!("Listening on {}", addr);
let _threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); let _threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap();

View File

@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc; use std::sync::mpsc;
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::Duration;
@ -153,7 +154,7 @@ pub fn recycle(recycler: Recycler, msgs: SharedPacketData) {
fn recv_loop( fn recv_loop(
sock: &UdpSocket, sock: &UdpSocket,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
recycler: Recycler, recycler: Recycler,
channel: Sender, channel: Sender,
) -> Result<()> { ) -> Result<()> {
@ -167,7 +168,7 @@ fn recv_loop(
break; break;
} }
Err(_) => { Err(_) => {
if *exit.lock().unwrap() { if exit.load(Ordering::Relaxed) {
recycle(recycler.clone(), msgs_); recycle(recycler.clone(), msgs_);
return Ok(()); return Ok(());
} }
@ -179,7 +180,7 @@ fn recv_loop(
pub fn receiver( pub fn receiver(
sock: UdpSocket, sock: UdpSocket,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
recycler: Recycler, recycler: Recycler,
channel: Sender, channel: Sender,
) -> Result<JoinHandle<()>> { ) -> Result<JoinHandle<()>> {
@ -203,12 +204,12 @@ fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> {
pub fn sender( pub fn sender(
sock: UdpSocket, sock: UdpSocket,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
recycler: Recycler, recycler: Recycler,
r: Receiver, r: Receiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
if recv_send(&sock, recycler.clone(), &r).is_err() && *exit.lock().unwrap() { if recv_send(&sock, recycler.clone(), &r).is_err() && exit.load(Ordering::Relaxed) {
break; break;
} }
}) })
@ -228,7 +229,7 @@ mod bench {
use result::Result; use result::Result;
use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE};
fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc<Mutex<bool>>) -> JoinHandle<()> { fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = allocate(recycler.clone()); let msgs = allocate(recycler.clone());
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
@ -237,7 +238,7 @@ mod bench {
w.set_addr(&addr); w.set_addr(&addr);
} }
spawn(move || loop { spawn(move || loop {
if *exit.lock().unwrap() { if exit.load(Ordering::Relaxed) {
return; return;
} }
let mut num = 0; let mut num = 0;
@ -248,12 +249,12 @@ mod bench {
fn sinc( fn sinc(
recycler: Recycler, recycler: Recycler,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>, rvs: Arc<Mutex<usize>>,
r: Receiver, r: Receiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
if *exit.lock().unwrap() { if exit.load(Ordering::Relaxed) {
return; return;
} }
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
@ -270,7 +271,7 @@ mod bench {
fn run_streamer_bench() -> Result<()> { fn run_streamer_bench() -> Result<()> {
let read = UdpSocket::bind("127.0.0.1:0")?; let read = UdpSocket::bind("127.0.0.1:0")?;
let addr = read.local_addr()?; let addr = read.local_addr()?;
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let recycler = Arc::new(Mutex::new(Vec::new())); let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
@ -291,7 +292,7 @@ mod bench {
let ftime = (time as f64) / 10000000000f64; let ftime = (time as f64) / 10000000000f64;
let fcount = (end_val - start_val) as f64; let fcount = (end_val - start_val) as f64;
println!("performance: {:?}", fcount / ftime); println!("performance: {:?}", fcount / ftime);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
t_reader.join()?; t_reader.join()?;
t_producer1.join()?; t_producer1.join()?;
t_producer2.join()?; t_producer2.join()?;
@ -310,6 +311,7 @@ mod test {
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::time::Duration; use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::io::Write; use std::io::Write;
use std::io; use std::io;
@ -351,7 +353,7 @@ mod test {
let mut num = 0; let mut num = 0;
get_msgs(r_reader, &mut num); get_msgs(r_reader, &mut num);
assert_eq!(num, 10); assert_eq!(num, 10);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join"); t_receiver.join().expect("join");
t_sender.join().expect("join"); t_sender.join().expect("join");
} }
@ -364,7 +366,7 @@ mod test {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap(); let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let recycler = Arc::new(Mutex::new(Vec::new())); let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
@ -382,7 +384,7 @@ mod test {
let mut num = 0; let mut num = 0;
get_msgs(r_reader, &mut num); get_msgs(r_reader, &mut num);
assert_eq!(num, 10); assert_eq!(num, 10);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join"); t_receiver.join().expect("join");
t_sender.join().expect("join"); t_sender.join().expect("join");
} }