Merge pull request #200 from jackson-sandland/153-panic-cleanup
issue #153 - panic cleanup
This commit is contained in:
@ -16,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature};
|
|||||||
use std::collections::hash_map::Entry::Occupied;
|
use std::collections::hash_map::Entry::Occupied;
|
||||||
use std::collections::{HashMap, HashSet, VecDeque};
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
use std::result;
|
use std::result;
|
||||||
use std::sync::RwLock;
|
|
||||||
use std::sync::atomic::{AtomicIsize, Ordering};
|
use std::sync::atomic::{AtomicIsize, Ordering};
|
||||||
|
use std::sync::RwLock;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
|
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
|
||||||
@ -34,13 +34,17 @@ pub type Result<T> = result::Result<T, AccountingError>;
|
|||||||
/// Commit funds to the 'to' party.
|
/// Commit funds to the 'to' party.
|
||||||
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
|
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
|
||||||
// First we check balances with a read lock to maximize potential parallelization.
|
// First we check balances with a read lock to maximize potential parallelization.
|
||||||
if balances.read().unwrap().contains_key(&payment.to) {
|
if balances
|
||||||
let bals = balances.read().unwrap();
|
.read()
|
||||||
|
.expect("'balances' read lock in apply_payment")
|
||||||
|
.contains_key(&payment.to)
|
||||||
|
{
|
||||||
|
let bals = balances.read().expect("'balances' read lock");
|
||||||
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
|
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
|
||||||
} else {
|
} else {
|
||||||
// Now we know the key wasn't present a nanosecond ago, but it might be there
|
// Now we know the key wasn't present a nanosecond ago, but it might be there
|
||||||
// by the time we aquire a write lock, so we'll have to check again.
|
// by the time we aquire a write lock, so we'll have to check again.
|
||||||
let mut bals = balances.write().unwrap();
|
let mut bals = balances.write().expect("'balances' write lock");
|
||||||
if bals.contains_key(&payment.to) {
|
if bals.contains_key(&payment.to) {
|
||||||
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
|
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
|
||||||
} else {
|
} else {
|
||||||
@ -84,27 +88,37 @@ impl Accountant {
|
|||||||
|
|
||||||
/// Return the last entry ID registered
|
/// Return the last entry ID registered
|
||||||
pub fn last_id(&self) -> Hash {
|
pub fn last_id(&self) -> Hash {
|
||||||
let last_ids = self.last_ids.read().unwrap();
|
let last_ids = self.last_ids.read().expect("'last_ids' read lock");
|
||||||
let last_item = last_ids.iter().last().expect("empty last_ids list");
|
let last_item = last_ids.iter().last().expect("empty 'last_ids' list");
|
||||||
last_item.0
|
last_item.0
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
|
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
|
||||||
if signatures.read().unwrap().contains(sig) {
|
if signatures
|
||||||
|
.read()
|
||||||
|
.expect("'signatures' read lock")
|
||||||
|
.contains(sig)
|
||||||
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
signatures.write().unwrap().insert(*sig);
|
signatures
|
||||||
|
.write()
|
||||||
|
.expect("'signatures' write lock")
|
||||||
|
.insert(*sig);
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
fn forget_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
|
fn forget_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
|
||||||
signatures.write().unwrap().remove(sig)
|
signatures
|
||||||
|
.write()
|
||||||
|
.expect("'signatures' write lock in forget_signature")
|
||||||
|
.remove(sig)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
|
fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
|
||||||
if let Some(entry) = self.last_ids
|
if let Some(entry) = self.last_ids
|
||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.expect("'last_ids' read lock in forget_signature_with_last_id")
|
||||||
.iter()
|
.iter()
|
||||||
.rev()
|
.rev()
|
||||||
.find(|x| x.0 == *last_id)
|
.find(|x| x.0 == *last_id)
|
||||||
@ -117,7 +131,7 @@ impl Accountant {
|
|||||||
fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
|
fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
|
||||||
if let Some(entry) = self.last_ids
|
if let Some(entry) = self.last_ids
|
||||||
.read()
|
.read()
|
||||||
.unwrap()
|
.expect("'last_ids' read lock in reserve_signature_with_last_id")
|
||||||
.iter()
|
.iter()
|
||||||
.rev()
|
.rev()
|
||||||
.find(|x| x.0 == *last_id)
|
.find(|x| x.0 == *last_id)
|
||||||
@ -132,7 +146,9 @@ impl Accountant {
|
|||||||
/// the oldest ones once its internal cache is full. Once boot, the
|
/// the oldest ones once its internal cache is full. Once boot, the
|
||||||
/// accountant will reject transactions using that `last_id`.
|
/// accountant will reject transactions using that `last_id`.
|
||||||
pub fn register_entry_id(&self, last_id: &Hash) {
|
pub fn register_entry_id(&self, last_id: &Hash) {
|
||||||
let mut last_ids = self.last_ids.write().unwrap();
|
let mut last_ids = self.last_ids
|
||||||
|
.write()
|
||||||
|
.expect("'last_ids' write lock in register_entry_id");
|
||||||
if last_ids.len() >= MAX_ENTRY_IDS {
|
if last_ids.len() >= MAX_ENTRY_IDS {
|
||||||
last_ids.pop_front();
|
last_ids.pop_front();
|
||||||
}
|
}
|
||||||
@ -142,7 +158,9 @@ impl Accountant {
|
|||||||
/// Deduct tokens from the 'from' address the account has sufficient
|
/// Deduct tokens from the 'from' address the account has sufficient
|
||||||
/// funds and isn't a duplicate.
|
/// funds and isn't a duplicate.
|
||||||
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> {
|
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> {
|
||||||
let bals = self.balances.read().unwrap();
|
let bals = self.balances
|
||||||
|
.read()
|
||||||
|
.expect("'balances' read lock in process_verified_transaction_debits");
|
||||||
let option = bals.get(&tr.from);
|
let option = bals.get(&tr.from);
|
||||||
|
|
||||||
if option.is_none() {
|
if option.is_none() {
|
||||||
@ -154,7 +172,7 @@ impl Accountant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let bal = option.unwrap();
|
let bal = option.expect("assignment of option to bal");
|
||||||
let current = bal.load(Ordering::Relaxed) as i64;
|
let current = bal.load(Ordering::Relaxed) as i64;
|
||||||
|
|
||||||
if current < tr.data.tokens {
|
if current < tr.data.tokens {
|
||||||
@ -178,12 +196,16 @@ impl Accountant {
|
|||||||
|
|
||||||
pub fn process_verified_transaction_credits(&self, tr: &Transaction) {
|
pub fn process_verified_transaction_credits(&self, tr: &Transaction) {
|
||||||
let mut plan = tr.data.plan.clone();
|
let mut plan = tr.data.plan.clone();
|
||||||
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
|
plan.apply_witness(&Witness::Timestamp(*self.last_time
|
||||||
|
.read()
|
||||||
|
.expect("timestamp creation in process_verified_transaction_credits")));
|
||||||
|
|
||||||
if let Some(ref payment) = plan.final_payment() {
|
if let Some(ref payment) = plan.final_payment() {
|
||||||
apply_payment(&self.balances, payment);
|
apply_payment(&self.balances, payment);
|
||||||
} else {
|
} else {
|
||||||
let mut pending = self.pending.write().unwrap();
|
let mut pending = self.pending
|
||||||
|
.write()
|
||||||
|
.expect("'pending' write lock in process_verified_transaction_credits");
|
||||||
pending.insert(tr.sig, plan);
|
pending.insert(tr.sig, plan);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -252,7 +274,11 @@ impl Accountant {
|
|||||||
|
|
||||||
/// Process a Witness Signature that has already been verified.
|
/// Process a Witness Signature that has already been verified.
|
||||||
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
|
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
|
||||||
if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) {
|
if let Occupied(mut e) = self.pending
|
||||||
|
.write()
|
||||||
|
.expect("write() in process_verified_sig")
|
||||||
|
.entry(tx_sig)
|
||||||
|
{
|
||||||
e.get_mut().apply_witness(&Witness::Signature(from));
|
e.get_mut().apply_witness(&Witness::Signature(from));
|
||||||
if let Some(ref payment) = e.get().final_payment() {
|
if let Some(ref payment) = e.get().final_payment() {
|
||||||
apply_payment(&self.balances, payment);
|
apply_payment(&self.balances, payment);
|
||||||
@ -267,13 +293,24 @@ impl Accountant {
|
|||||||
fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
|
fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
|
||||||
// If this is the first timestamp we've seen, it probably came from the genesis block,
|
// If this is the first timestamp we've seen, it probably came from the genesis block,
|
||||||
// so we'll trust it.
|
// so we'll trust it.
|
||||||
if *self.last_time.read().unwrap() == Utc.timestamp(0, 0) {
|
if *self.last_time
|
||||||
self.time_sources.write().unwrap().insert(from);
|
.read()
|
||||||
|
.expect("'last_time' read lock on first timestamp check")
|
||||||
|
== Utc.timestamp(0, 0)
|
||||||
|
{
|
||||||
|
self.time_sources
|
||||||
|
.write()
|
||||||
|
.expect("'time_sources' write lock on first timestamp")
|
||||||
|
.insert(from);
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.time_sources.read().unwrap().contains(&from) {
|
if self.time_sources
|
||||||
if dt > *self.last_time.read().unwrap() {
|
.read()
|
||||||
*self.last_time.write().unwrap() = dt;
|
.expect("'time_sources' read lock")
|
||||||
|
.contains(&from)
|
||||||
|
{
|
||||||
|
if dt > *self.last_time.read().expect("'last_time' read lock") {
|
||||||
|
*self.last_time.write().expect("'last_time' write lock") = dt;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@ -284,9 +321,13 @@ impl Accountant {
|
|||||||
|
|
||||||
// Hold 'pending' write lock until the end of this function. Otherwise another thread can
|
// Hold 'pending' write lock until the end of this function. Otherwise another thread can
|
||||||
// double-spend if it enters before the modified plan is removed from 'pending'.
|
// double-spend if it enters before the modified plan is removed from 'pending'.
|
||||||
let mut pending = self.pending.write().unwrap();
|
let mut pending = self.pending
|
||||||
|
.write()
|
||||||
|
.expect("'pending' write lock in process_verified_timestamp");
|
||||||
for (key, plan) in pending.iter_mut() {
|
for (key, plan) in pending.iter_mut() {
|
||||||
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap()));
|
plan.apply_witness(&Witness::Timestamp(*self.last_time
|
||||||
|
.read()
|
||||||
|
.expect("'last_time' read lock when creating timestamp")));
|
||||||
if let Some(ref payment) = plan.final_payment() {
|
if let Some(ref payment) = plan.final_payment() {
|
||||||
apply_payment(&self.balances, payment);
|
apply_payment(&self.balances, payment);
|
||||||
completed.push(key.clone());
|
completed.push(key.clone());
|
||||||
@ -341,7 +382,9 @@ impl Accountant {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
|
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
|
||||||
let bals = self.balances.read().unwrap();
|
let bals = self.balances
|
||||||
|
.read()
|
||||||
|
.expect("'balances' read lock in get_balance");
|
||||||
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
|
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,8 @@ use std::env;
|
|||||||
use std::io::{stdin, stdout, Read};
|
use std::io::{stdin, stdout, Read};
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
fn print_usage(program: &str, opts: Options) {
|
fn print_usage(program: &str, opts: Options) {
|
||||||
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
|
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);
|
||||||
|
58
src/crdt.rs
58
src/crdt.rs
@ -168,7 +168,7 @@ impl Crdt {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
|
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
|
||||||
// copy to avoid locking durring IO
|
// copy to avoid locking durring IO
|
||||||
let robj = obj.read().unwrap();
|
let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
|
||||||
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
|
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
|
||||||
(robj.table[&robj.me].clone(), cloned_table)
|
(robj.table[&robj.me].clone(), cloned_table)
|
||||||
};
|
};
|
||||||
@ -194,10 +194,10 @@ impl Crdt {
|
|||||||
.map(|((i, v), b)| {
|
.map(|((i, v), b)| {
|
||||||
// only leader should be broadcasting
|
// only leader should be broadcasting
|
||||||
assert!(me.current_leader_id != v.id);
|
assert!(me.current_leader_id != v.id);
|
||||||
let mut blob = b.write().unwrap();
|
let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
|
||||||
blob.set_id(me.id).expect("set_id");
|
blob.set_id(me.id).expect("set_id in pub fn broadcast");
|
||||||
blob.set_index(*transmit_index + i as u64)
|
blob.set_index(*transmit_index + i as u64)
|
||||||
.expect("set_index");
|
.expect("set_index in pub fn broadcast");
|
||||||
//TODO profile this, may need multiple sockets for par_iter
|
//TODO profile this, may need multiple sockets for par_iter
|
||||||
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
|
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
|
||||||
})
|
})
|
||||||
@ -219,10 +219,10 @@ impl Crdt {
|
|||||||
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
|
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
|
||||||
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
|
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
|
||||||
// copy to avoid locking durring IO
|
// copy to avoid locking durring IO
|
||||||
let s = obj.read().unwrap();
|
let s = obj.read().expect("'obj' read lock in pub fn retransmit");
|
||||||
(s.table[&s.me].clone(), s.table.values().cloned().collect())
|
(s.table[&s.me].clone(), s.table.values().cloned().collect())
|
||||||
};
|
};
|
||||||
let rblob = blob.read().unwrap();
|
let rblob = blob.read().expect("'blob' read lock in pub fn retransmit");
|
||||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||||
let orders: Vec<_> = table
|
let orders: Vec<_> = table
|
||||||
.iter()
|
.iter()
|
||||||
@ -261,9 +261,10 @@ impl Crdt {
|
|||||||
fn random() -> u64 {
|
fn random() -> u64 {
|
||||||
let rnd = SystemRandom::new();
|
let rnd = SystemRandom::new();
|
||||||
let mut buf = [0u8; 8];
|
let mut buf = [0u8; 8];
|
||||||
rnd.fill(&mut buf).unwrap();
|
rnd.fill(&mut buf).expect("rnd.fill in pub fn random");
|
||||||
let mut rdr = Cursor::new(&buf);
|
let mut rdr = Cursor::new(&buf);
|
||||||
rdr.read_u64::<LittleEndian>().unwrap()
|
rdr.read_u64::<LittleEndian>()
|
||||||
|
.expect("rdr.read_u64 in fn random")
|
||||||
}
|
}
|
||||||
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
|
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
|
||||||
//trace!("get updates since {}", v);
|
//trace!("get updates since {}", v);
|
||||||
@ -287,10 +288,19 @@ impl Crdt {
|
|||||||
return Err(Error::GeneralError);
|
return Err(Error::GeneralError);
|
||||||
}
|
}
|
||||||
let mut n = (Self::random() as usize) % self.table.len();
|
let mut n = (Self::random() as usize) % self.table.len();
|
||||||
while self.table.values().nth(n).unwrap().id == self.me {
|
while self.table
|
||||||
|
.values()
|
||||||
|
.nth(n)
|
||||||
|
.expect("'values().nth(n)' while loop in fn gossip_request")
|
||||||
|
.id == self.me
|
||||||
|
{
|
||||||
n = (Self::random() as usize) % self.table.len();
|
n = (Self::random() as usize) % self.table.len();
|
||||||
}
|
}
|
||||||
let v = self.table.values().nth(n).unwrap().clone();
|
let v = self.table
|
||||||
|
.values()
|
||||||
|
.nth(n)
|
||||||
|
.expect("'values().nth(n)' in fn gossip_request")
|
||||||
|
.clone();
|
||||||
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
||||||
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
||||||
Ok((v.gossip_addr, req))
|
Ok((v.gossip_addr, req))
|
||||||
@ -303,7 +313,9 @@ impl Crdt {
|
|||||||
|
|
||||||
// Lock the object only to do this operation and not for any longer
|
// Lock the object only to do this operation and not for any longer
|
||||||
// especially not when doing the `sock.send_to`
|
// especially not when doing the `sock.send_to`
|
||||||
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?;
|
let (remote_gossip_addr, req) = obj.read()
|
||||||
|
.expect("'obj' read lock in fn run_gossip")
|
||||||
|
.gossip_request()?;
|
||||||
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
||||||
// TODO this will get chatty, so we need to first ask for number of updates since
|
// TODO this will get chatty, so we need to first ask for number of updates since
|
||||||
// then only ask for specific data that we dont have
|
// then only ask for specific data that we dont have
|
||||||
@ -335,7 +347,11 @@ impl Crdt {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
//TODO this should be a tuned parameter
|
//TODO this should be a tuned parameter
|
||||||
sleep(obj.read().unwrap().timeout);
|
sleep(
|
||||||
|
obj.read()
|
||||||
|
.expect("'obj' read lock in pub fn gossip")
|
||||||
|
.timeout,
|
||||||
|
);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -353,18 +369,25 @@ impl Crdt {
|
|||||||
trace!("RequestUpdates {}", v);
|
trace!("RequestUpdates {}", v);
|
||||||
let addr = reqdata.gossip_addr;
|
let addr = reqdata.gossip_addr;
|
||||||
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
|
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
|
||||||
let (from, ups, data) = obj.read().unwrap().get_updates_since(v);
|
let (from, ups, data) = obj.read()
|
||||||
|
.expect("'obj' read lock in RequestUpdates")
|
||||||
|
.get_updates_since(v);
|
||||||
trace!("get updates since response {} {}", v, data.len());
|
trace!("get updates since response {} {}", v, data.len());
|
||||||
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
|
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
|
||||||
trace!("send_to {}", addr);
|
trace!("send_to {}", addr);
|
||||||
//TODO verify reqdata belongs to sender
|
//TODO verify reqdata belongs to sender
|
||||||
obj.write().unwrap().insert(reqdata);
|
obj.write()
|
||||||
sock.send_to(&rsp, addr).unwrap();
|
.expect("'obj' write lock in RequestUpdates")
|
||||||
|
.insert(reqdata);
|
||||||
|
sock.send_to(&rsp, addr)
|
||||||
|
.expect("'sock.send_to' in RequestUpdates");
|
||||||
trace!("send_to done!");
|
trace!("send_to done!");
|
||||||
}
|
}
|
||||||
Protocol::ReceiveUpdates(from, ups, data) => {
|
Protocol::ReceiveUpdates(from, ups, data) => {
|
||||||
trace!("ReceivedUpdates");
|
trace!("ReceivedUpdates");
|
||||||
obj.write().unwrap().apply_updates(from, ups, &data);
|
obj.write()
|
||||||
|
.expect("'obj' write lock in ReceiveUpdates")
|
||||||
|
.apply_updates(from, ups, &data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -374,7 +397,8 @@ impl Crdt {
|
|||||||
sock: UdpSocket,
|
sock: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap();
|
sock.set_read_timeout(Some(Duration::new(2, 0)))
|
||||||
|
.expect("'sock.set_read_timeout' in crdt.rs");
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
let _ = Self::run_listen(&obj, &sock);
|
let _ = Self::run_listen(&obj, &sock);
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
10
src/ecdsa.rs
10
src/ecdsa.rs
@ -59,7 +59,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
|
|||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|p| {
|
.map(|p| {
|
||||||
p.read()
|
p.read()
|
||||||
.unwrap()
|
.expect("'p' read lock in ed25519_verify")
|
||||||
.packets
|
.packets
|
||||||
.par_iter()
|
.par_iter()
|
||||||
.map(verify_packet)
|
.map(verify_packet)
|
||||||
@ -78,7 +78,11 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
|
|||||||
let mut rvs = Vec::new();
|
let mut rvs = Vec::new();
|
||||||
|
|
||||||
for packets in batches {
|
for packets in batches {
|
||||||
locks.push(packets.read().unwrap());
|
locks.push(
|
||||||
|
packets
|
||||||
|
.read()
|
||||||
|
.expect("'packets' read lock in pub fn ed25519_verify"),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
let mut num = 0;
|
let mut num = 0;
|
||||||
for p in locks {
|
for p in locks {
|
||||||
@ -135,8 +139,8 @@ mod tests {
|
|||||||
use packet::{Packet, Packets, SharedPackets};
|
use packet::{Packet, Packets, SharedPackets};
|
||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
use thin_client_service::Request;
|
use thin_client_service::Request;
|
||||||
use transaction::Transaction;
|
|
||||||
use transaction::test_tx;
|
use transaction::test_tx;
|
||||||
|
use transaction::Transaction;
|
||||||
|
|
||||||
fn make_packet_from_transaction(tr: Transaction) -> Packet {
|
fn make_packet_from_transaction(tr: Transaction) -> Packet {
|
||||||
let tx = serialize(&Request::Transaction(tr)).unwrap();
|
let tx = serialize(&Request::Transaction(tr)).unwrap();
|
||||||
|
@ -164,10 +164,14 @@ pub fn generate_coding(
|
|||||||
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
|
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
|
||||||
for i in consumed..consumed + NUM_DATA {
|
for i in consumed..consumed + NUM_DATA {
|
||||||
let n = i % window.len();
|
let n = i % window.len();
|
||||||
data_blobs.push(window[n].clone().unwrap());
|
data_blobs.push(
|
||||||
|
window[n]
|
||||||
|
.clone()
|
||||||
|
.expect("'data_blobs' arr in pub fn generate_coding"),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
for b in &data_blobs {
|
for b in &data_blobs {
|
||||||
data_locks.push(b.write().unwrap());
|
data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding"));
|
||||||
}
|
}
|
||||||
for (i, l) in data_locks.iter_mut().enumerate() {
|
for (i, l) in data_locks.iter_mut().enumerate() {
|
||||||
trace!("i: {} data: {}", i, l.data[0]);
|
trace!("i: {} data: {}", i, l.data[0]);
|
||||||
@ -180,10 +184,17 @@ pub fn generate_coding(
|
|||||||
for i in coding_start..coding_end {
|
for i in coding_start..coding_end {
|
||||||
let n = i % window.len();
|
let n = i % window.len();
|
||||||
window[n] = re.allocate();
|
window[n] = re.allocate();
|
||||||
coding_blobs.push(window[n].clone().unwrap());
|
coding_blobs.push(
|
||||||
|
window[n]
|
||||||
|
.clone()
|
||||||
|
.expect("'coding_blobs' arr in pub fn generate_coding"),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
for b in &coding_blobs {
|
for b in &coding_blobs {
|
||||||
coding_locks.push(b.write().unwrap());
|
coding_locks.push(
|
||||||
|
b.write()
|
||||||
|
.expect("'coding_locks' arr in pub fn generate_coding"),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
for (i, l) in coding_locks.iter_mut().enumerate() {
|
for (i, l) in coding_locks.iter_mut().enumerate() {
|
||||||
trace!("i: {} data: {}", i, l.data[0]);
|
trace!("i: {} data: {}", i, l.data[0]);
|
||||||
@ -231,7 +242,7 @@ pub fn recover(
|
|||||||
let j = i % window.len();
|
let j = i % window.len();
|
||||||
let mut b = &mut window[j];
|
let mut b = &mut window[j];
|
||||||
if b.is_some() {
|
if b.is_some() {
|
||||||
blobs.push(b.clone().unwrap());
|
blobs.push(b.clone().expect("'blobs' arr in pb fn recover"));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let n = re.allocate();
|
let n = re.allocate();
|
||||||
@ -244,7 +255,7 @@ pub fn recover(
|
|||||||
trace!("erasures: {:?}", erasures);
|
trace!("erasures: {:?}", erasures);
|
||||||
//lock everything
|
//lock everything
|
||||||
for b in &blobs {
|
for b in &blobs {
|
||||||
locks.push(b.write().unwrap());
|
locks.push(b.write().expect("'locks' arr in pb fn recover"));
|
||||||
}
|
}
|
||||||
for (i, l) in locks.iter_mut().enumerate() {
|
for (i, l) in locks.iter_mut().enumerate() {
|
||||||
if i >= NUM_DATA {
|
if i >= NUM_DATA {
|
||||||
|
@ -24,7 +24,7 @@ pub enum Event {
|
|||||||
impl Event {
|
impl Event {
|
||||||
/// Create and sign a new Witness Timestamp. Used for unit-testing.
|
/// Create and sign a new Witness Timestamp. Used for unit-testing.
|
||||||
pub fn new_timestamp(from: &KeyPair, dt: DateTime<Utc>) -> Self {
|
pub fn new_timestamp(from: &KeyPair, dt: DateTime<Utc>) -> Self {
|
||||||
let sign_data = serialize(&dt).unwrap();
|
let sign_data = serialize(&dt).expect("serialize 'dt' in pub fn new_timestamp");
|
||||||
let sig = Signature::clone_from_slice(from.sign(&sign_data).as_ref());
|
let sig = Signature::clone_from_slice(from.sign(&sign_data).as_ref());
|
||||||
Event::Timestamp {
|
Event::Timestamp {
|
||||||
from: from.pubkey(),
|
from: from.pubkey(),
|
||||||
@ -49,7 +49,10 @@ impl Event {
|
|||||||
match *self {
|
match *self {
|
||||||
Event::Transaction(ref tr) => tr.verify_sig(),
|
Event::Transaction(ref tr) => tr.verify_sig(),
|
||||||
Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig),
|
Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig),
|
||||||
Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).unwrap()),
|
Event::Timestamp { from, dt, sig } => sig.verify(
|
||||||
|
&from,
|
||||||
|
&serialize(&dt).expect("serialize 'dt' in pub fn verify"),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
//! The `hash` module provides functions for creating SHA-256 hashes.
|
//! The `hash` module provides functions for creating SHA-256 hashes.
|
||||||
|
|
||||||
use generic_array::GenericArray;
|
|
||||||
use generic_array::typenum::U32;
|
use generic_array::typenum::U32;
|
||||||
|
use generic_array::GenericArray;
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
pub type Hash = GenericArray<u8, U32>;
|
pub type Hash = GenericArray<u8, U32>;
|
||||||
|
@ -4,8 +4,8 @@
|
|||||||
use entry::Entry;
|
use entry::Entry;
|
||||||
use hash::Hash;
|
use hash::Hash;
|
||||||
use recorder::{ExitReason, Recorder, Signal};
|
use recorder::{ExitReason, Recorder, Signal};
|
||||||
use std::sync::Mutex;
|
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
|
||||||
|
use std::sync::Mutex;
|
||||||
use std::thread::{spawn, JoinHandle};
|
use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
@ -52,7 +52,10 @@ impl Historian {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
|
pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
|
||||||
self.output.lock().unwrap().try_recv()
|
self.output
|
||||||
|
.lock()
|
||||||
|
.expect("'output' lock in pub fn receive")
|
||||||
|
.try_recv()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
11
src/mint.rs
11
src/mint.rs
@ -1,7 +1,7 @@
|
|||||||
//! The `mint` module is a library for generating the chain's genesis block.
|
//! The `mint` module is a library for generating the chain's genesis block.
|
||||||
|
|
||||||
use entry::Entry;
|
|
||||||
use entry::create_entry;
|
use entry::create_entry;
|
||||||
|
use entry::Entry;
|
||||||
use event::Event;
|
use event::Event;
|
||||||
use hash::{hash, Hash};
|
use hash::{hash, Hash};
|
||||||
use ring::rand::SystemRandom;
|
use ring::rand::SystemRandom;
|
||||||
@ -19,8 +19,11 @@ pub struct Mint {
|
|||||||
impl Mint {
|
impl Mint {
|
||||||
pub fn new(tokens: i64) -> Self {
|
pub fn new(tokens: i64) -> Self {
|
||||||
let rnd = SystemRandom::new();
|
let rnd = SystemRandom::new();
|
||||||
let pkcs8 = KeyPair::generate_pkcs8(&rnd).unwrap().to_vec();
|
let pkcs8 = KeyPair::generate_pkcs8(&rnd)
|
||||||
let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap();
|
.expect("generate_pkcs8 in mint pub fn new")
|
||||||
|
.to_vec();
|
||||||
|
let keypair =
|
||||||
|
KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new");
|
||||||
let pubkey = keypair.pubkey();
|
let pubkey = keypair.pubkey();
|
||||||
Mint {
|
Mint {
|
||||||
pkcs8,
|
pkcs8,
|
||||||
@ -38,7 +41,7 @@ impl Mint {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn keypair(&self) -> KeyPair {
|
pub fn keypair(&self) -> KeyPair {
|
||||||
KeyPair::from_pkcs8(Input::from(&self.pkcs8)).unwrap()
|
KeyPair::from_pkcs8(Input::from(&self.pkcs8)).expect("from_pkcs8 in mint pub fn keypair")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn pubkey(&self) -> PublicKey {
|
pub fn pubkey(&self) -> PublicKey {
|
||||||
|
@ -156,12 +156,12 @@ impl<T: Default> Clone for Recycler<T> {
|
|||||||
|
|
||||||
impl<T: Default> Recycler<T> {
|
impl<T: Default> Recycler<T> {
|
||||||
pub fn allocate(&self) -> Arc<RwLock<T>> {
|
pub fn allocate(&self) -> Arc<RwLock<T>> {
|
||||||
let mut gc = self.gc.lock().expect("recycler lock");
|
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
|
||||||
gc.pop()
|
gc.pop()
|
||||||
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())))
|
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())))
|
||||||
}
|
}
|
||||||
pub fn recycle(&self, msgs: Arc<RwLock<T>>) {
|
pub fn recycle(&self, msgs: Arc<RwLock<T>>) {
|
||||||
let mut gc = self.gc.lock().expect("recycler lock");
|
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
|
||||||
gc.push(msgs);
|
gc.push(msgs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -264,7 +264,7 @@ impl Blob {
|
|||||||
for i in 0..NUM_BLOBS {
|
for i in 0..NUM_BLOBS {
|
||||||
let r = re.allocate();
|
let r = re.allocate();
|
||||||
{
|
{
|
||||||
let mut p = r.write().unwrap();
|
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
|
||||||
match socket.recv_from(&mut p.data) {
|
match socket.recv_from(&mut p.data) {
|
||||||
Err(_) if i > 0 => {
|
Err(_) if i > 0 => {
|
||||||
trace!("got {:?} messages", i);
|
trace!("got {:?} messages", i);
|
||||||
@ -294,7 +294,7 @@ impl Blob {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
while let Some(r) = v.pop_front() {
|
while let Some(r) = v.pop_front() {
|
||||||
{
|
{
|
||||||
let p = r.read().unwrap();
|
let p = r.read().expect("'r' read lock in pub fn send_to");
|
||||||
let a = p.meta.addr();
|
let a = p.meta.addr();
|
||||||
socket.send_to(&p.data[..p.meta.size], &a)?;
|
socket.send_to(&p.data[..p.meta.size], &a)?;
|
||||||
}
|
}
|
||||||
|
@ -78,9 +78,9 @@ mod tests {
|
|||||||
use std::io;
|
use std::io;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::mpsc::RecvError;
|
use std::sync::mpsc::RecvError;
|
||||||
use std::sync::mpsc::RecvTimeoutError;
|
use std::sync::mpsc::RecvTimeoutError;
|
||||||
use std::sync::mpsc::channel;
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
fn addr_parse_error() -> Result<SocketAddr> {
|
fn addr_parse_error() -> Result<SocketAddr> {
|
||||||
|
@ -24,8 +24,10 @@ impl KeyPairUtil for Ed25519KeyPair {
|
|||||||
/// Return a new ED25519 keypair
|
/// Return a new ED25519 keypair
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
let rng = rand::SystemRandom::new();
|
let rng = rand::SystemRandom::new();
|
||||||
let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).unwrap();
|
let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng)
|
||||||
signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).unwrap()
|
.expect("generate_pkcs8 in signature pb fn new");
|
||||||
|
signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes))
|
||||||
|
.expect("from_pcks8 in signature pb fn new")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the public key for the given keypair
|
/// Return the public key for the given keypair
|
||||||
|
@ -27,7 +27,10 @@ fn recv_loop(
|
|||||||
let msgs = re.allocate();
|
let msgs = re.allocate();
|
||||||
let msgs_ = msgs.clone();
|
let msgs_ = msgs.clone();
|
||||||
loop {
|
loop {
|
||||||
match msgs.write().unwrap().recv_from(sock) {
|
match msgs.write()
|
||||||
|
.expect("write lock in fn recv_loop")
|
||||||
|
.recv_from(sock)
|
||||||
|
{
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
channel.send(msgs_)?;
|
channel.send(msgs_)?;
|
||||||
break;
|
break;
|
||||||
@ -136,7 +139,10 @@ fn recv_window(
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let mut dq = r.recv_timeout(timer)?;
|
let mut dq = r.recv_timeout(timer)?;
|
||||||
let leader_id = crdt.read().unwrap().leader_data().id;
|
let leader_id = crdt.read()
|
||||||
|
.expect("'crdt' read lock in fn recv_window")
|
||||||
|
.leader_data()
|
||||||
|
.id;
|
||||||
while let Ok(mut nq) = r.try_recv() {
|
while let Ok(mut nq) = r.try_recv() {
|
||||||
dq.append(&mut nq)
|
dq.append(&mut nq)
|
||||||
}
|
}
|
||||||
@ -144,17 +150,17 @@ fn recv_window(
|
|||||||
//retransmit all leader blocks
|
//retransmit all leader blocks
|
||||||
let mut retransmitq = VecDeque::new();
|
let mut retransmitq = VecDeque::new();
|
||||||
for b in &dq {
|
for b in &dq {
|
||||||
let p = b.read().unwrap();
|
let p = b.read().expect("'b' read lock in fn recv_window");
|
||||||
//TODO this check isn't safe against adverserial packets
|
//TODO this check isn't safe against adverserial packets
|
||||||
//we need to maintain a sequence window
|
//we need to maintain a sequence window
|
||||||
trace!(
|
trace!(
|
||||||
"idx: {} addr: {:?} id: {:?} leader: {:?}",
|
"idx: {} addr: {:?} id: {:?} leader: {:?}",
|
||||||
p.get_index().unwrap(),
|
p.get_index().expect("get_index in fn recv_window"),
|
||||||
p.get_id().unwrap(),
|
p.get_id().expect("get_id in trace! fn recv_window"),
|
||||||
p.meta.addr(),
|
p.meta.addr(),
|
||||||
leader_id
|
leader_id
|
||||||
);
|
);
|
||||||
if p.get_id().unwrap() == leader_id {
|
if p.get_id().expect("get_id in fn recv_window") == leader_id {
|
||||||
//TODO
|
//TODO
|
||||||
//need to copy the retransmited blob
|
//need to copy the retransmited blob
|
||||||
//otherwise we get into races with which thread
|
//otherwise we get into races with which thread
|
||||||
@ -164,7 +170,7 @@ fn recv_window(
|
|||||||
//is dropped via a weakref to the recycler
|
//is dropped via a weakref to the recycler
|
||||||
let nv = recycler.allocate();
|
let nv = recycler.allocate();
|
||||||
{
|
{
|
||||||
let mut mnv = nv.write().unwrap();
|
let mut mnv = nv.write().expect("recycler write lock in fn recv_window");
|
||||||
let sz = p.meta.size;
|
let sz = p.meta.size;
|
||||||
mnv.meta.size = sz;
|
mnv.meta.size = sz;
|
||||||
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
||||||
@ -180,7 +186,7 @@ fn recv_window(
|
|||||||
let mut contq = VecDeque::new();
|
let mut contq = VecDeque::new();
|
||||||
while let Some(b) = dq.pop_front() {
|
while let Some(b) = dq.pop_front() {
|
||||||
let b_ = b.clone();
|
let b_ = b.clone();
|
||||||
let p = b.write().unwrap();
|
let p = b.write().expect("'b' write lock in fn recv_window");
|
||||||
let pix = p.get_index()? as usize;
|
let pix = p.get_index()? as usize;
|
||||||
let w = pix % NUM_BLOBS;
|
let w = pix % NUM_BLOBS;
|
||||||
//TODO, after the block are authenticated
|
//TODO, after the block are authenticated
|
||||||
@ -199,7 +205,7 @@ fn recv_window(
|
|||||||
if window[k].is_none() {
|
if window[k].is_none() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
contq.push_back(window[k].clone().unwrap());
|
contq.push_back(window[k].clone().expect("clone in fn recv_window"));
|
||||||
window[k] = None;
|
window[k] = None;
|
||||||
*consumed += 1;
|
*consumed += 1;
|
||||||
}
|
}
|
||||||
@ -457,8 +463,8 @@ mod test {
|
|||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer::{BlobReceiver, PacketReceiver};
|
|
||||||
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
|
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
|
||||||
|
use streamer::{BlobReceiver, PacketReceiver};
|
||||||
|
|
||||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||||
for _t in 0..5 {
|
for _t in 0..5 {
|
||||||
|
@ -40,7 +40,7 @@ impl ThinClient {
|
|||||||
pub fn init(&self) {
|
pub fn init(&self) {
|
||||||
let subscriptions = vec![Subscription::EntryInfo];
|
let subscriptions = vec![Subscription::EntryInfo];
|
||||||
let req = Request::Subscribe { subscriptions };
|
let req = Request::Subscribe { subscriptions };
|
||||||
let data = serialize(&req).expect("serialize Subscribe");
|
let data = serialize(&req).expect("serialize Subscribe in thin_client");
|
||||||
trace!("subscribing to {}", self.addr);
|
trace!("subscribing to {}", self.addr);
|
||||||
let _res = self.socket.send_to(&data, &self.addr);
|
let _res = self.socket.send_to(&data, &self.addr);
|
||||||
}
|
}
|
||||||
@ -50,7 +50,7 @@ impl ThinClient {
|
|||||||
info!("start recv_from");
|
info!("start recv_from");
|
||||||
self.socket.recv_from(&mut buf)?;
|
self.socket.recv_from(&mut buf)?;
|
||||||
info!("end recv_from");
|
info!("end recv_from");
|
||||||
let resp = deserialize(&buf).expect("deserialize balance");
|
let resp = deserialize(&buf).expect("deserialize balance in thin_client");
|
||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -72,7 +72,7 @@ impl ThinClient {
|
|||||||
/// does not wait for a response.
|
/// does not wait for a response.
|
||||||
pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> {
|
pub fn transfer_signed(&self, tr: Transaction) -> io::Result<usize> {
|
||||||
let req = Request::Transaction(tr);
|
let req = Request::Transaction(tr);
|
||||||
let data = serialize(&req).unwrap();
|
let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed");
|
||||||
self.socket.send_to(&data, &self.addr)
|
self.socket.send_to(&data, &self.addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,10 +95,10 @@ impl ThinClient {
|
|||||||
pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
|
pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
|
||||||
info!("get_balance");
|
info!("get_balance");
|
||||||
let req = Request::GetBalance { key: *pubkey };
|
let req = Request::GetBalance { key: *pubkey };
|
||||||
let data = serialize(&req).expect("serialize GetBalance");
|
let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance");
|
||||||
self.socket
|
self.socket
|
||||||
.send_to(&data, &self.addr)
|
.send_to(&data, &self.addr)
|
||||||
.expect("buffer error");
|
.expect("buffer error in pub fn get_balance");
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
while !done {
|
while !done {
|
||||||
let resp = self.recv_response()?;
|
let resp = self.recv_response()?;
|
||||||
@ -124,7 +124,8 @@ impl ThinClient {
|
|||||||
// Wait for at least one EntryInfo.
|
// Wait for at least one EntryInfo.
|
||||||
let mut done = false;
|
let mut done = false;
|
||||||
while !done {
|
while !done {
|
||||||
let resp = self.recv_response().expect("recv response");
|
let resp = self.recv_response()
|
||||||
|
.expect("recv_response in pub fn transaction_count");
|
||||||
if let &Response::EntryInfo(_) = &resp {
|
if let &Response::EntryInfo(_) = &resp {
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
@ -132,14 +133,18 @@ impl ThinClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Then take the rest.
|
// Then take the rest.
|
||||||
self.socket.set_nonblocking(true).expect("set nonblocking");
|
self.socket
|
||||||
|
.set_nonblocking(true)
|
||||||
|
.expect("set_nonblocking in pub fn transaction_count");
|
||||||
loop {
|
loop {
|
||||||
match self.recv_response() {
|
match self.recv_response() {
|
||||||
Err(_) => break,
|
Err(_) => break,
|
||||||
Ok(resp) => self.process_response(resp),
|
Ok(resp) => self.process_response(resp),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.socket.set_nonblocking(false).expect("set blocking");
|
self.socket
|
||||||
|
.set_nonblocking(false)
|
||||||
|
.expect("set_nonblocking in pub fn transaction_count");
|
||||||
self.num_events
|
self.num_events
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,8 @@ pub fn duration_as_s(d: &Duration) -> f32 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn timestamp() -> u64 {
|
pub fn timestamp() -> u64 {
|
||||||
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
let now = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.expect("create timestamp in timing");
|
||||||
return duration_as_ms(&now);
|
return duration_as_ms(&now);
|
||||||
}
|
}
|
||||||
|
36
src/tpu.rs
36
src/tpu.rs
@ -12,8 +12,8 @@ use rand::{thread_rng, Rng};
|
|||||||
use result::Result;
|
use result::Result;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io::Write;
|
|
||||||
use std::io::sink;
|
use std::io::sink;
|
||||||
|
use std::io::Write;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, Sender};
|
use std::sync::mpsc::{channel, Sender};
|
||||||
@ -48,10 +48,10 @@ impl Tpu {
|
|||||||
.accountant
|
.accountant
|
||||||
.register_entry_id(&entry.id);
|
.register_entry_id(&entry.id);
|
||||||
writeln!(
|
writeln!(
|
||||||
writer.lock().unwrap(),
|
writer.lock().expect("'writer' lock in fn fn write_entry"),
|
||||||
"{}",
|
"{}",
|
||||||
serde_json::to_string(&entry).unwrap()
|
serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry")
|
||||||
).unwrap();
|
).expect("writeln! in fn write_entry");
|
||||||
self.thin_client_service
|
self.thin_client_service
|
||||||
.notify_entry_info_subscribers(&entry);
|
.notify_entry_info_subscribers(&entry);
|
||||||
}
|
}
|
||||||
@ -62,11 +62,16 @@ impl Tpu {
|
|||||||
let entry = self.accounting_stage
|
let entry = self.accounting_stage
|
||||||
.output
|
.output
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.expect("'ouput' lock in fn receive_all")
|
||||||
.recv_timeout(Duration::new(1, 0))?;
|
.recv_timeout(Duration::new(1, 0))?;
|
||||||
self.write_entry(writer, &entry);
|
self.write_entry(writer, &entry);
|
||||||
l.push(entry);
|
l.push(entry);
|
||||||
while let Ok(entry) = self.accounting_stage.output.lock().unwrap().try_recv() {
|
while let Ok(entry) = self.accounting_stage
|
||||||
|
.output
|
||||||
|
.lock()
|
||||||
|
.expect("'output' lock in fn write_entries")
|
||||||
|
.try_recv()
|
||||||
|
{
|
||||||
self.write_entry(writer, &entry);
|
self.write_entry(writer, &entry);
|
||||||
l.push(entry);
|
l.push(entry);
|
||||||
}
|
}
|
||||||
@ -130,7 +135,10 @@ impl Tpu {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let r = ecdsa::ed25519_verify(&batch);
|
let r = ecdsa::ed25519_verify(&batch);
|
||||||
let res = batch.into_iter().zip(r).collect();
|
let res = batch.into_iter().zip(r).collect();
|
||||||
sendr.lock().unwrap().send(res)?;
|
sendr
|
||||||
|
.lock()
|
||||||
|
.expect("lock in fn verify_batch in tpu")
|
||||||
|
.send(res)?;
|
||||||
// TODO: fix error handling here?
|
// TODO: fix error handling here?
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -139,7 +147,9 @@ impl Tpu {
|
|||||||
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
|
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
|
||||||
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
|
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (batch, len) = streamer::recv_batch(&recvr.lock().unwrap())?;
|
let (batch, len) =
|
||||||
|
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let batch_len = batch.len();
|
let batch_len = batch.len();
|
||||||
let rand_id = thread_rng().gen_range(0, 100);
|
let rand_id = thread_rng().gen_range(0, 100);
|
||||||
@ -150,7 +160,7 @@ impl Tpu {
|
|||||||
rand_id
|
rand_id
|
||||||
);
|
);
|
||||||
|
|
||||||
Self::verify_batch(batch, sendr).unwrap();
|
Self::verify_batch(batch, sendr).expect("verify_batch in fn verifier");
|
||||||
|
|
||||||
let total_time_ms = timing::duration_as_ms(&now.elapsed());
|
let total_time_ms = timing::duration_as_ms(&now.elapsed());
|
||||||
let total_time_s = timing::duration_as_s(&now.elapsed());
|
let total_time_s = timing::duration_as_s(&now.elapsed());
|
||||||
@ -314,8 +324,12 @@ impl Tpu {
|
|||||||
) -> Result<Vec<JoinHandle<()>>> {
|
) -> Result<Vec<JoinHandle<()>>> {
|
||||||
//replicate pipeline
|
//replicate pipeline
|
||||||
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
||||||
crdt.write().unwrap().set_leader(leader.id);
|
crdt.write()
|
||||||
crdt.write().unwrap().insert(leader);
|
.expect("'crdt' write lock in pub fn replicate")
|
||||||
|
.set_leader(leader.id);
|
||||||
|
crdt.write()
|
||||||
|
.expect("'crdt' write lock before insert() in pub fn replicate")
|
||||||
|
.insert(leader);
|
||||||
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
||||||
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());
|
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());
|
||||||
|
|
||||||
|
@ -70,7 +70,7 @@ impl Transaction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn get_sign_data(&self) -> Vec<u8> {
|
fn get_sign_data(&self) -> Vec<u8> {
|
||||||
serialize(&(&self.data)).unwrap()
|
serialize(&(&self.data)).expect("serialize TransactionData in fn get_sign_data")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sign this transaction.
|
/// Sign this transaction.
|
||||||
|
Reference in New Issue
Block a user