Apply most of clippy's feedback

Author:    Greg Fitzgerald
Date:      2018-07-11 14:40:46 -06:00
Committer: Greg Fitzgerald
Parent:    f98e9aba48
Commit:    73ae3c3301
24 changed files with 133 additions and 146 deletions

View File

@@ -134,7 +134,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
         let bank = Arc::new(Bank::new(&mint));
         let verified_setup: Vec<_> =
-            to_packets_chunked(&packet_recycler, setup_transactions.clone(), tx)
+            to_packets_chunked(&packet_recycler, &setup_transactions.clone(), tx)
                 .into_iter()
                 .map(|x| {
                     let len = (*x).read().unwrap().packets.len();
@@ -153,7 +153,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
         check_txs(verified_setup_len, &signal_receiver, num_src_accounts);
-        let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), 192)
+        let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192)
             .into_iter()
             .map(|x| {
                 let len = (*x).read().unwrap().packets.len();
@@ -201,7 +201,7 @@ fn bench_banking_stage_single_from(bencher: &mut Bencher) {
     bencher.iter(move || {
         let bank = Arc::new(Bank::new(&mint));
-        let verified: Vec<_> = to_packets_chunked(&packet_recycler, transactions.clone(), tx)
+        let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), tx)
             .into_iter()
             .map(|x| {
                 let len = (*x).read().unwrap().packets.len();

View File

@@ -120,11 +120,7 @@ impl Bank {
     /// Commit funds to the `payment.to` party.
     fn apply_payment(&self, payment: &Payment, balances: &mut HashMap<PublicKey, i64>) {
-        if balances.contains_key(&payment.to) {
-            *balances.get_mut(&payment.to).unwrap() += payment.tokens;
-        } else {
-            balances.insert(payment.to, payment.tokens);
-        }
+        *balances.entry(payment.to).or_insert(0) += payment.tokens;
     }
     /// Return the last entry ID registered.
@@ -511,7 +507,7 @@ impl Bank {
         let bals = self.balances
             .read()
             .expect("'balances' read lock in get_balance");
-        bals.get(pubkey).map(|x| *x).unwrap_or(0)
+        bals.get(pubkey).cloned().unwrap_or(0)
     }
     pub fn transaction_count(&self) -> usize {
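
Note: the `apply_payment` rewrite above is the pattern clippy's `map_entry` lint suggests, replacing a contains_key/insert branch with the `HashMap` entry API. A minimal standalone sketch of the idiom (the `credit` helper and `u64` keys are illustrative, not project code):

    use std::collections::HashMap;

    // Add `tokens` to `to`'s balance, inserting a zero balance first if absent.
    fn credit(balances: &mut HashMap<u64, i64>, to: u64, tokens: i64) {
        *balances.entry(to).or_insert(0) += tokens;
    }

    fn main() {
        let mut balances = HashMap::new();
        credit(&mut balances, 42, 100);
        credit(&mut balances, 42, 25);
        assert_eq!(balances[&42], 125);
    }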

View File

@@ -129,7 +129,7 @@ fn generate_and_send_txs(
             leader.contact_info.tpu
         );
         for tx in txs {
-            client.transfer_signed(tx.clone()).unwrap();
+            client.transfer_signed(tx).unwrap();
         }
     });
     println!(

View File

@@ -159,7 +159,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
         // Return u32 b/c the weighted sampling API from rand::distributions
         // only takes u32 for weights
-        if weighted_vote >= std::u32::MAX as f64 {
+        if weighted_vote >= f64::from(std::u32::MAX) {
             return std::u32::MAX;
         }
@@ -173,7 +173,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
 impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
     fn choose_peer<'b>(&self, options: Vec<&'b NodeInfo>) -> Result<&'b NodeInfo> {
-        if options.len() < 1 {
+        if options.is_empty() {
             Err(CrdtError::TooSmall)?;
         }
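
Note: two routine lints show up in this hunk: `cast_lossless` prefers the infallible `f64::from(u32)` conversion over `as f64`, and `len_zero` prefers `is_empty()` over comparing `len()` with zero. A small standalone illustration (the `clamp_to_u32` helper is hypothetical):

    fn clamp_to_u32(weighted_vote: f64) -> u32 {
        // f64::from(u32) is lossless, so clippy prefers it to `as f64`.
        if weighted_vote >= f64::from(std::u32::MAX) {
            return std::u32::MAX;
        }
        weighted_vote as u32
    }

    fn main() {
        let peers: Vec<&str> = vec![];
        // `is_empty()` states the intent more directly than `len() < 1`.
        assert!(peers.is_empty());
        assert_eq!(clamp_to_u32(1e20), std::u32::MAX);
        assert_eq!(clamp_to_u32(7.0), 7);
    }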

View File

@@ -56,7 +56,7 @@ pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
     let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
     if let Some(addrstr) = optstr {
         if let Ok(port) = addrstr.parse() {
-            let mut addr = daddr.clone();
+            let mut addr = daddr;
             addr.set_port(port);
             addr
         } else if let Ok(addr) = addrstr.parse() {
@@ -173,12 +173,12 @@ impl NodeInfo {
         make_debug_id(&self.id)
     }
     fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
-        let mut nxt_addr = addr.clone();
+        let mut nxt_addr = *addr;
         nxt_addr.set_port(addr.port() + nxt);
         nxt_addr
     }
     pub fn new_leader_with_pubkey(pubkey: PublicKey, bind_addr: &SocketAddr) -> Self {
-        let transactions_addr = bind_addr.clone();
+        let transactions_addr = *bind_addr;
         let gossip_addr = Self::next_port(&bind_addr, 1);
         let replicate_addr = Self::next_port(&bind_addr, 2);
         let requests_addr = Self::next_port(&bind_addr, 3);
@@ -201,9 +201,9 @@ impl NodeInfo {
         NodeInfo::new(
             PublicKey::default(),
             gossip_addr,
-            daddr.clone(),
-            daddr.clone(),
-            daddr.clone(),
+            daddr,
+            daddr,
+            daddr,
             daddr,
         )
     }
@@ -341,19 +341,19 @@ impl Crdt {
     fn update_leader_liveness(&mut self) {
         //TODO: (leaders should vote)
         //until then we pet their liveness every time we see some votes from anyone
-        let ld = self.leader_data().map(|x| x.id.clone());
+        let ld = self.leader_data().map(|x| x.id);
         trace!("leader_id {:?}", ld);
         if let Some(leader_id) = ld {
             self.update_liveness(leader_id);
         }
     }
-    pub fn insert_votes(&mut self, votes: Vec<(PublicKey, Vote, Hash)>) {
+    pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) {
         static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE);
         inc_counter!(COUNTER_VOTE, votes.len());
-        if votes.len() > 0 {
+        if !votes.is_empty() {
             info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len());
         }
-        for v in &votes {
+        for v in votes {
             self.insert_vote(&v.0, &v.1, v.2);
         }
     }
@@ -371,7 +371,7 @@ impl Crdt {
         );
         self.update_index += 1;
-        let _ = self.table.insert(v.id.clone(), v.clone());
+        let _ = self.table.insert(v.id, v.clone());
         let _ = self.local.insert(v.id, self.update_index);
         static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE);
         inc_counter!(COUNTER_UPDATE, 1);
@@ -449,7 +449,7 @@ impl Crdt {
         static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE);
         inc_counter!(COUNTER_PURGE, dead_ids.len());
-        for id in dead_ids.iter() {
+        for id in &dead_ids {
             self.alive.remove(id);
             self.table.remove(id);
             self.remote.remove(id);
@@ -461,11 +461,7 @@ impl Crdt {
         }
     }
-    pub fn index_blobs(
-        me: &NodeInfo,
-        blobs: &Vec<SharedBlob>,
-        receive_index: &mut u64,
-    ) -> Result<()> {
+    pub fn index_blobs(me: &NodeInfo, blobs: &[SharedBlob], receive_index: &mut u64) -> Result<()> {
         // enumerate all the blobs, those are the indices
         trace!("{:x}: INDEX_BLOBS {}", me.debug_id(), blobs.len());
         for (i, b) in blobs.iter().enumerate() {
@@ -518,13 +514,13 @@ impl Crdt {
     /// We need to avoid having obj locked while doing any io, such as the `send_to`
     pub fn broadcast(
         me: &NodeInfo,
-        broadcast_table: &Vec<NodeInfo>,
+        broadcast_table: &[NodeInfo],
         window: &Window,
         s: &UdpSocket,
         transmit_index: &mut u64,
         received_index: u64,
     ) -> Result<()> {
-        if broadcast_table.len() < 1 {
+        if broadcast_table.is_empty() {
             warn!("{:x}:not enough peers in crdt table", me.debug_id());
             Err(CrdtError::TooSmall)?;
         }
@@ -676,7 +672,7 @@ impl Crdt {
             Err(CrdtError::TooSmall)?;
         }
         let n = (Self::random() as usize) % valid.len();
-        let addr = valid[n].contact_info.ncp.clone();
+        let addr = valid[n].contact_info.ncp;
         let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
         let out = serialize(&req)?;
         Ok((addr, out))
@@ -768,7 +764,7 @@ impl Crdt {
         }
         let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect();
         let my_id = self.debug_id();
-        for x in sorted.iter() {
+        for x in &sorted {
             trace!(
                 "{:x}: sorted leaders {:x} votes: {}",
                 my_id,
@@ -784,10 +780,8 @@ impl Crdt {
     /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet
     fn update_leader(&mut self) {
         if let Some(leader_id) = self.top_leader() {
-            if self.my_data().leader_id != leader_id {
-                if self.table.get(&leader_id).is_some() {
-                    self.set_leader(leader_id);
-                }
+            if self.my_data().leader_id != leader_id && self.table.get(&leader_id).is_some() {
+                self.set_leader(leader_id);
             }
         }
     }
@@ -822,7 +816,9 @@ impl Crdt {
                 continue;
             }
-            let liveness_entry = self.external_liveness.entry(*pk).or_insert(HashMap::new());
+            let liveness_entry = self.external_liveness
+                .entry(*pk)
+                .or_insert_with(HashMap::new);
             let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index);
             if *external_remote_index > peer_index {
                 liveness_entry.insert(from, *external_remote_index);
@@ -854,7 +850,7 @@ impl Crdt {
             obj.write().unwrap().purge(timestamp());
             //TODO: possibly tune this parameter
             //we saw a deadlock passing an obj.read().unwrap().timeout into sleep
-            let _ = obj.write().unwrap().update_leader();
+            obj.write().unwrap().update_leader();
             let elapsed = timestamp() - start;
             if GOSSIP_SLEEP_MILLIS > elapsed {
                 let time_left = GOSSIP_SLEEP_MILLIS - elapsed;
@@ -948,10 +944,7 @@ impl Crdt {
         let me = obj.read().unwrap();
         // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
         let (from, ups, data) = me.get_updates_since(v);
-        let external_liveness = me.remote
-            .iter()
-            .map(|(k, v)| (k.clone(), v.clone()))
-            .collect();
+        let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect();
         drop(me);
         trace!("get updates since response {} {}", v, data.len());
         let len = data.len();
@@ -1091,6 +1084,12 @@ pub struct TestNode {
     pub sockets: Sockets,
 }
+impl Default for TestNode {
+    fn default() -> Self {
+        Self::new()
+    }
+}
 impl TestNode {
     pub fn new() -> Self {
         let pubkey = KeyPair::new().pubkey();
@@ -1115,7 +1114,7 @@ impl TestNode {
             repair.local_addr().unwrap(),
         );
         TestNode {
-            data: data,
+            data,
             sockets: Sockets {
                 gossip,
                 gossip_send,
@@ -1130,19 +1129,19 @@ impl TestNode {
         }
     }
     pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode {
-        let mut local_gossip_addr = bind_addr.clone();
+        let mut local_gossip_addr = bind_addr;
         local_gossip_addr.set_port(data.contact_info.ncp.port());
-        let mut local_replicate_addr = bind_addr.clone();
+        let mut local_replicate_addr = bind_addr;
         local_replicate_addr.set_port(data.contact_info.tvu.port());
-        let mut local_requests_addr = bind_addr.clone();
+        let mut local_requests_addr = bind_addr;
         local_requests_addr.set_port(data.contact_info.rpu.port());
-        let mut local_transactions_addr = bind_addr.clone();
+        let mut local_transactions_addr = bind_addr;
         local_transactions_addr.set_port(data.contact_info.tpu.port());
-        let mut local_repair_addr = bind_addr.clone();
+        let mut local_repair_addr = bind_addr;
         local_repair_addr.set_port(data.contact_info.tvu_window.port());
         let transaction = UdpSocket::bind(local_transactions_addr).unwrap();
@@ -1160,7 +1159,7 @@ impl TestNode {
         let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
         let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
         TestNode {
-            data: data,
+            data,
             sockets: Sockets {
                 gossip,
                 gossip_send,
@@ -1300,7 +1299,7 @@ mod tests {
         };
         sleep(Duration::from_millis(100));
         let votes = vec![(d.id.clone(), vote_new_version_old_addrs, Hash::default())];
-        crdt.insert_votes(votes);
+        crdt.insert_votes(&votes);
         let updated = crdt.alive[&leader.id];
         //should be accepted, since the update is for the same address field as the one we know
        assert_eq!(crdt.table[&d.id].version, 1);
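
Note: the `external_liveness` change follows clippy's `or_fun_call` lint: `or_insert(HashMap::new())` constructs a map on every call even when the key already exists, while `or_insert_with(HashMap::new)` only builds one when needed. A standalone sketch with illustrative names:

    use std::collections::HashMap;

    fn main() {
        let mut external_liveness: HashMap<&str, HashMap<&str, u64>> = HashMap::new();

        // The constructor only runs if "node-a" is missing from the map.
        let entry = external_liveness
            .entry("node-a")
            .or_insert_with(HashMap::new);
        entry.insert("peer-1", 7);

        assert_eq!(external_liveness["node-a"]["peer-1"], 7);
    }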

View File

@@ -112,7 +112,7 @@ impl Drone {
                 airdrop_request_amount,
                 client_public_key,
             } => {
-                request_amount = airdrop_request_amount.clone();
+                request_amount = airdrop_request_amount;
                 tx = Transaction::new(
                     &self.mint_keypair,
                     client_public_key,
@@ -136,7 +136,7 @@ impl Drone {
                 )
                 .to_owned(),
             );
-            client.transfer_signed(tx)
+            client.transfer_signed(&tx)
         } else {
             Err(Error::new(ErrorKind::Other, "token limit reached"))
         }

View File

@@ -144,7 +144,7 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -
 /// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`.
 pub fn next_entry(start_hash: &Hash, num_hashes: u64, transactions: Vec<Transaction>) -> Entry {
-    assert!(num_hashes > 0 || transactions.len() == 0);
+    assert!(num_hashes > 0 || transactions.is_empty());
     Entry {
         num_hashes,
         id: next_hash(start_hash, num_hashes, &transactions),

View File

@@ -99,7 +99,7 @@ impl FullNode {
             "starting... local gossip address: {} (advertising {})",
             local_gossip_addr, node.data.contact_info.ncp
         );
-        let requests_addr = node.data.contact_info.rpu.clone();
+        let requests_addr = node.data.contact_info.rpu;
         let exit = Arc::new(AtomicBool::new(false));
         if !leader {
             let testnet_addr = network_entry_for_validator.expect("validator requires entry");
@@ -121,7 +121,7 @@ impl FullNode {
             );
             server
         } else {
-            node.data.leader_id = node.data.id.clone();
+            node.data.leader_id = node.data.id;
             let outfile_for_leader: Box<Write + Send> = match outfile_for_leader {
                 Some(OutFile::Path(file)) => Box::new(
                     OpenOptions::new()
@@ -218,11 +218,11 @@ impl FullNode {
         let blob_recycler = BlobRecycler::default();
         let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
         let (tpu, blob_receiver) = Tpu::new(
-            bank.clone(),
-            crdt.clone(),
+            &bank.clone(),
+            &crdt.clone(),
             tick_duration,
             node.sockets.transaction,
-            blob_recycler.clone(),
+            &blob_recycler.clone(),
             exit.clone(),
             writer,
         );

View File

@@ -9,6 +9,6 @@ static INIT: Once = ONCE_INIT;
 /// Setup function that is only run once, even if called multiple times.
 pub fn setup() {
     INIT.call_once(|| {
-        let _ = env_logger::init();
+        env_logger::init();
     });
 }

View File

@@ -36,10 +36,11 @@ impl InfluxDbMetricsWriter {
     }
     fn build_client() -> Option<influxdb::Client> {
-        let host = env::var("INFLUX_HOST").unwrap_or("https://metrics.solana.com:8086".to_string());
-        let db = env::var("INFLUX_DATABASE").unwrap_or("scratch".to_string());
-        let username = env::var("INFLUX_USERNAME").unwrap_or("scratch_writer".to_string());
-        let password = env::var("INFLUX_PASSWORD").unwrap_or("topsecret".to_string());
+        let host = env::var("INFLUX_HOST")
+            .unwrap_or_else(|_| "https://metrics.solana.com:8086".to_string());
+        let db = env::var("INFLUX_DATABASE").unwrap_or_else(|_| "scratch".to_string());
+        let username = env::var("INFLUX_USERNAME").unwrap_or_else(|_| "scratch_writer".to_string());
+        let password = env::var("INFLUX_PASSWORD").unwrap_or_else(|_| "topsecret".to_string());
         debug!("InfluxDB host={} db={} username={}", host, db, username);
         let mut client = influxdb::Client::new_with_option(host, db, None)
@@ -120,13 +121,11 @@ impl MetricsAgent {
             }
             let now = Instant::now();
-            if now.duration_since(last_write_time) >= write_frequency {
-                if !points.is_empty() {
-                    debug!("run: writing {} points", points.len());
-                    writer.write(points);
-                    points = Vec::new();
-                    last_write_time = now;
-                }
+            if now.duration_since(last_write_time) >= write_frequency && !points.is_empty() {
+                debug!("run: writing {} points", points.len());
+                writer.write(points);
+                points = Vec::new();
+                last_write_time = now;
             }
         }
         trace!("run: exit");
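
Note: the InfluxDB settings apply the same `or_fun_call` reasoning: `unwrap_or(s.to_string())` allocates the default string even when the variable is set, whereas `unwrap_or_else` defers that allocation to the error path. A minimal sketch (only the env var name and default come from the diff):

    use std::env;

    fn main() {
        // The default string is only allocated if INFLUX_HOST is unset.
        let host = env::var("INFLUX_HOST")
            .unwrap_or_else(|_| "https://metrics.solana.com:8086".to_string());
        println!("reporting to {}", host);
    }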

View File

@@ -92,7 +92,7 @@ pub fn udp_public_bind(label: &str, startport: u16, endport: u16) -> UdpSocketPa
         //
         // TODO: Remove the |sender| socket and deal with the downstream changes to
         // the UDP signalling
-        let mut local_addr_sender = local_addr.clone();
+        let mut local_addr_sender = local_addr;
         local_addr_sender.set_port(public_addr.port());
         UdpSocket::bind(local_addr_sender).unwrap()
     };

View File

@@ -240,7 +240,7 @@ impl Packets {
 pub fn to_packets_chunked<T: Serialize>(
     r: &PacketRecycler,
-    xs: Vec<T>,
+    xs: &[T],
     chunks: usize,
 ) -> Vec<SharedPackets> {
     let mut out = vec![];
@@ -258,10 +258,10 @@ pub fn to_packets_chunked<T: Serialize>(
         }
         out.push(p);
     }
-    return out;
+    out
 }
-pub fn to_packets<T: Serialize>(r: &PacketRecycler, xs: Vec<T>) -> Vec<SharedPackets> {
+pub fn to_packets<T: Serialize>(r: &PacketRecycler, xs: &[T]) -> Vec<SharedPackets> {
     to_packets_chunked(r, xs, NUM_PACKETS)
 }
@@ -347,7 +347,7 @@ impl Blob {
     }
     pub fn is_coding(&self) -> bool {
-        return (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0;
+        (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0
     }
     pub fn set_coding(&mut self) -> Result<()> {
@@ -524,15 +524,15 @@ mod tests {
     fn test_to_packets() {
         let tx = Request::GetTransactionCount;
         let re = PacketRecycler::default();
-        let rv = to_packets(&re, vec![tx.clone(); 1]);
+        let rv = to_packets(&re, &vec![tx.clone(); 1]);
         assert_eq!(rv.len(), 1);
         assert_eq!(rv[0].read().unwrap().packets.len(), 1);
-        let rv = to_packets(&re, vec![tx.clone(); NUM_PACKETS]);
+        let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS]);
         assert_eq!(rv.len(), 1);
         assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
-        let rv = to_packets(&re, vec![tx.clone(); NUM_PACKETS + 1]);
+        let rv = to_packets(&re, &vec![tx.clone(); NUM_PACKETS + 1]);
         assert_eq!(rv.len(), 2);
         assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
         assert_eq!(rv[1].read().unwrap().packets.len(), 1);
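
Note: `to_packets_chunked` and `to_packets` now take `&[T]` instead of `Vec<T>`, in line with clippy's advice to accept slices: callers can pass a `&Vec<T>`, an array, or a sub-slice without giving up ownership. A minimal sketch of the signature style (the `chunk_counts` helper is hypothetical):

    // Accepting &[T] lets callers pass &Vec<T>, arrays, or sub-slices.
    fn chunk_counts<T>(xs: &[T], chunk_size: usize) -> Vec<usize> {
        xs.chunks(chunk_size).map(|c| c.len()).collect()
    }

    fn main() {
        let txs = vec![1, 2, 3, 4, 5];
        assert_eq!(chunk_counts(&txs, 2), vec![2, 2, 1]);
        assert_eq!(chunk_counts(&txs[..4], 2), vec![2, 2]);
    }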

View File

@@ -32,7 +32,7 @@ impl RecordStage {
         start_hash: &Hash,
     ) -> (Self, Receiver<Vec<Entry>>) {
         let (entry_sender, entry_receiver) = channel();
-        let start_hash = start_hash.clone();
+        let start_hash = *start_hash;
         let thread_hdl = Builder::new()
             .name("solana-record-stage".to_string())
@@ -52,7 +52,7 @@ impl RecordStage {
         tick_duration: Duration,
     ) -> (Self, Receiver<Vec<Entry>>) {
         let (entry_sender, entry_receiver) = channel();
-        let start_hash = start_hash.clone();
+        let start_hash = *start_hash;
         let thread_hdl = Builder::new()
             .name("solana-record-stage".to_string())
@@ -60,13 +60,14 @@ impl RecordStage {
                 let mut recorder = Recorder::new(start_hash);
                 let start_time = Instant::now();
                 loop {
-                    if let Err(_) = Self::try_process_signals(
+                    if Self::try_process_signals(
                         &mut recorder,
                         start_time,
                         tick_duration,
                        &signal_receiver,
                        &entry_sender,
-                    ) {
+                    ).is_err()
+                    {
                        return;
                    }
                    recorder.hash();
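
Note: the loop swaps `if let Err(_) = ...` for `.is_err()`, clippy's `redundant_pattern_matching` suggestion, since the error value is discarded anyway. A tiny standalone illustration:

    fn try_step(fail: bool) -> Result<(), ()> {
        if fail {
            Err(())
        } else {
            Ok(())
        }
    }

    fn main() {
        // `.is_err()` reads more directly than `if let Err(_) = ...`
        // when the error itself is never inspected.
        if try_step(true).is_err() {
            println!("stopping");
        }
    }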

View File

@@ -66,7 +66,7 @@ impl ReplicateStage {
         let shared_blob = blob_recycler.allocate();
         let (vote, addr) = {
             let mut wcrdt = crdt.write().unwrap();
-            wcrdt.insert_votes(votes);
+            wcrdt.insert_votes(&votes);
             //TODO: doesn't seem like there is a synchronous call to get height and id
             info!("replicate_stage {} {:?}", height, &last_id[..8]);
             wcrdt.new_vote(height, last_id)

View File

@@ -59,7 +59,7 @@ fn verify_packet(packet: &Packet) -> u8 {
     ).is_ok() as u8
 }
-fn batch_size(batches: &Vec<SharedPackets>) -> usize {
+fn batch_size(batches: &[SharedPackets]) -> usize {
     batches
         .iter()
         .map(|p| p.read().unwrap().packets.len())

View File

@@ -17,27 +17,27 @@ use std::time::Instant;
 use streamer::{self, PacketReceiver};
 use timing;
+pub type VerifiedPackets = Vec<(SharedPackets, Vec<u8>)>;
 pub struct SigVerifyStage {
     thread_hdls: Vec<JoinHandle<()>>,
 }
 impl SigVerifyStage {
-    pub fn new(
-        packet_receiver: Receiver<SharedPackets>,
-    ) -> (Self, Receiver<Vec<(SharedPackets, Vec<u8>)>>) {
+    pub fn new(packet_receiver: Receiver<SharedPackets>) -> (Self, Receiver<VerifiedPackets>) {
         let (verified_sender, verified_receiver) = channel();
         let thread_hdls = Self::verifier_services(packet_receiver, verified_sender);
         (SigVerifyStage { thread_hdls }, verified_receiver)
     }
-    fn verify_batch(batch: Vec<SharedPackets>) -> Vec<(SharedPackets, Vec<u8>)> {
+    fn verify_batch(batch: Vec<SharedPackets>) -> VerifiedPackets {
         let r = sigverify::ed25519_verify(&batch);
         batch.into_iter().zip(r).collect()
     }
     fn verifier(
         recvr: &Arc<Mutex<PacketReceiver>>,
-        sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
+        sendr: &Arc<Mutex<Sender<VerifiedPackets>>>,
     ) -> Result<()> {
         let (batch, len) =
             streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
@@ -74,7 +74,7 @@ impl SigVerifyStage {
     fn verifier_service(
         packet_receiver: Arc<Mutex<PacketReceiver>>,
-        verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
+        verified_sender: Arc<Mutex<Sender<VerifiedPackets>>>,
     ) -> JoinHandle<()> {
         spawn(move || loop {
             if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) {
@@ -89,7 +89,7 @@ impl SigVerifyStage {
     fn verifier_services(
         packet_receiver: PacketReceiver,
-        verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
+        verified_sender: Sender<VerifiedPackets>,
     ) -> Vec<JoinHandle<()>> {
         let sender = Arc::new(Mutex::new(verified_sender));
         let receiver = Arc::new(Mutex::new(packet_receiver));
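
Note: the new `VerifiedPackets` alias addresses clippy's `type_complexity` lint by naming the repeated `Vec<(SharedPackets, Vec<u8>)>` type once. The same technique with simplified stand-in types:

    use std::sync::mpsc::{channel, Receiver, Sender};

    // Stand-ins for SharedPackets and the per-packet verification flags.
    type Packets = Vec<u8>;
    type VerifiedPackets = Vec<(Packets, Vec<u8>)>;

    // The alias keeps channel signatures readable.
    fn make_pipe() -> (Sender<VerifiedPackets>, Receiver<VerifiedPackets>) {
        channel()
    }

    fn main() {
        let (tx, rx) = make_pipe();
        tx.send(vec![(vec![1, 2, 3], vec![1])]).unwrap();
        assert_eq!(rx.recv().unwrap().len(), 1);
    }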

View File

@@ -170,7 +170,7 @@ fn find_next_missing(
     let reqs: Vec<_> = (*consumed..*received)
         .filter_map(|pix| {
             let i = (pix % WINDOW_SIZE) as usize;
-            if let &None = &window[i] {
+            if window[i].is_none() {
                 let val = crdt.read().unwrap().window_index_request(pix as u64);
                 if let Ok((to, req)) = val {
                     return Some((to, req));
@@ -223,7 +223,7 @@ fn repair_window(
     let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
     trace!("{:x}: repair_window missing: {}", debug_id, reqs.len());
-    if reqs.len() > 0 {
+    if !reqs.is_empty() {
         static mut COUNTER_REPAIR: Counter =
             create_counter!("streamer-repair_window-repair", LOG_RATE);
         inc_counter!(COUNTER_REPAIR, reqs.len());
@@ -389,7 +389,7 @@ fn recv_window(
             break;
         }
         let mut is_coding = false;
-        if let &Some(ref cblob) = &window[k] {
+        if let Some(ref cblob) = window[k] {
            let cblob_r = cblob
                .read()
                .expect("blob read lock for flogs streamer::window");
@@ -461,16 +461,14 @@ fn print_window(debug_id: u64, locked_window: &Window, consumed: u64) {
                    "_"
                } else if v.is_none() {
                    "0"
-                } else {
-                    if let &Some(ref cblob) = &v {
-                        if cblob.read().unwrap().is_coding() {
-                            "C"
-                        } else {
-                            "1"
-                        }
-                    } else {
-                        "0"
-                    }
+                } else if let Some(ref cblob) = v {
+                    if cblob.read().unwrap().is_coding() {
+                        "C"
+                    } else {
+                        "1"
+                    }
+                } else {
+                    "0"
                }
            })
            .collect();
@@ -575,7 +573,7 @@ pub fn window(
 fn broadcast(
     me: &NodeInfo,
-    broadcast_table: &Vec<NodeInfo>,
+    broadcast_table: &[NodeInfo],
     window: &Window,
     recycler: &BlobRecycler,
     r: &BlobReceiver,

View File

@@ -39,7 +39,7 @@ impl ThinClient {
         transactions_addr: SocketAddr,
         transactions_socket: UdpSocket,
     ) -> Self {
-        let client = ThinClient {
+        ThinClient {
             requests_addr,
             requests_socket,
             transactions_addr,
@@ -48,8 +48,7 @@ impl ThinClient {
             transaction_count: 0,
             balances: HashMap::new(),
             signature_status: false,
-        };
-        client
+        }
     }
     pub fn recv_response(&self) -> io::Result<Response> {
@@ -60,8 +59,8 @@ impl ThinClient {
         deserialize(&buf).or_else(|_| Err(io::Error::new(io::ErrorKind::Other, "deserialize")))
     }
-    pub fn process_response(&mut self, resp: Response) {
-        match resp {
+    pub fn process_response(&mut self, resp: &Response) {
+        match *resp {
             Response::Balance { key, val } => {
                 trace!("Response balance {:?} {:?}", key, val);
                 self.balances.insert(key, val);
@@ -76,13 +75,10 @@ impl ThinClient {
             }
             Response::SignatureStatus { signature_status } => {
                 self.signature_status = signature_status;
-                match signature_status {
-                    true => {
-                        trace!("Response found signature");
-                    }
-                    false => {
-                        trace!("Response signature not found");
-                    }
-                }
+                if signature_status {
+                    trace!("Response found signature");
+                } else {
+                    trace!("Response signature not found");
+                }
             }
         }
@@ -90,7 +86,7 @@ impl ThinClient {
     /// Send a signed Transaction to the server for processing. This method
     /// does not wait for a response.
-    pub fn transfer_signed(&self, tx: Transaction) -> io::Result<usize> {
+    pub fn transfer_signed(&self, tx: &Transaction) -> io::Result<usize> {
         let data = serialize(&tx).expect("serialize Transaction in pub fn transfer_signed");
         self.transactions_socket
             .send_to(&data, &self.transactions_addr)
@@ -107,7 +103,7 @@ impl ThinClient {
         let now = Instant::now();
         let tx = Transaction::new(keypair, to, n, *last_id);
         let sig = tx.sig;
-        let result = self.transfer_signed(tx).map(|_| sig);
+        let result = self.transfer_signed(&tx).map(|_| sig);
         metrics::submit(
             influxdb::Point::new("thinclient")
                 .add_tag("op", influxdb::Value::String("transfer".to_string()))
@@ -137,12 +133,12 @@ impl ThinClient {
             if let Response::Balance { key, .. } = &resp {
                 done = key == pubkey;
             }
-            self.process_response(resp);
+            self.process_response(&resp);
         }
         self.balances
             .get(pubkey)
-            .map(|x| *x)
-            .ok_or(io::Error::new(io::ErrorKind::Other, "nokey"))
+            .cloned()
+            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "nokey"))
     }
     /// Request the transaction count. If the response packet is dropped by the network,
@@ -160,10 +156,10 @@ impl ThinClient {
             if let Ok(resp) = self.recv_response() {
                 info!("recv_response {:?}", resp);
-                if let &Response::TransactionCount { .. } = &resp {
+                if let Response::TransactionCount { .. } = resp {
                     done = true;
                 }
-                self.process_response(resp);
+                self.process_response(&resp);
             }
         }
         self.transaction_count
@@ -184,10 +180,10 @@ impl ThinClient {
             match self.recv_response() {
                 Ok(resp) => {
-                    if let &Response::LastId { .. } = &resp {
+                    if let Response::LastId { .. } = resp {
                         done = true;
                     }
-                    self.process_response(resp);
+                    self.process_response(&resp);
                 }
                 Err(e) => {
                     debug!("thin_client get_last_id error: {}", e);
@@ -232,10 +228,10 @@ impl ThinClient {
             .expect("buffer error in pub fn get_last_id");
         if let Ok(resp) = self.recv_response() {
-            if let &Response::SignatureStatus { .. } = &resp {
+            if let Response::SignatureStatus { .. } = resp {
                 done = true;
             }
-            self.process_response(resp);
+            self.process_response(&resp);
         }
     }
     metrics::submit(
@@ -355,7 +351,7 @@ mod tests {
         let tx = Transaction::new(&alice.keypair(), bob_pubkey, 500, last_id);
-        let _sig = client.transfer_signed(tx).unwrap();
+        let _sig = client.transfer_signed(&tx).unwrap();
         let last_id = client.get_last_id();
@@ -364,7 +360,7 @@ mod tests {
             contract.tokens = 502;
            contract.plan = Plan::Budget(Budget::new_payment(502, bob_pubkey));
        }
-        let _sig = client.transfer_signed(tr2).unwrap();
+        let _sig = client.transfer_signed(&tr2).unwrap();
        let balance = client.poll_get_balance(&bob_pubkey);
        assert_eq!(balance.unwrap(), 500);
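
Note: `poll_get_balance` now uses `.cloned()` plus `.ok_or_else(...)`; `ok_or(expr)` would build the `io::Error` eagerly even when a balance is present, while `ok_or_else` constructs it only on the `None` path. A sketch of the pattern (the `lookup` helper is hypothetical):

    use std::collections::HashMap;
    use std::io;

    fn lookup(balances: &HashMap<String, i64>, key: &str) -> io::Result<i64> {
        balances
            .get(key)
            .cloned()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "nokey"))
    }

    fn main() {
        let mut balances = HashMap::new();
        balances.insert("bob".to_string(), 500);
        assert_eq!(lookup(&balances, "bob").unwrap(), 500);
        assert!(lookup(&balances, "alice").is_err());
    }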

View File

@@ -3,20 +3,20 @@ use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};
 pub fn duration_as_us(d: &Duration) -> u64 {
-    return (d.as_secs() * 1000 * 1000) + (d.subsec_nanos() as u64 / 1_000);
+    (d.as_secs() * 1000 * 1000) + (u64::from(d.subsec_nanos()) / 1_000)
 }
 pub fn duration_as_ms(d: &Duration) -> u64 {
-    return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);
+    (d.as_secs() * 1000) + (u64::from(d.subsec_nanos()) / 1_000_000)
 }
 pub fn duration_as_s(d: &Duration) -> f32 {
-    return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0);
+    d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0)
 }
 pub fn timestamp() -> u64 {
     let now = SystemTime::now()
         .duration_since(UNIX_EPOCH)
         .expect("create timestamp in timing");
-    return duration_as_ms(&now);
+    duration_as_ms(&now)
 }
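
Note: the timing helpers drop the explicit `return` (clippy's `needless_return`) and switch to `u64::from(u32)` (`cast_lossless`), a widening conversion that cannot truncate. The same shape as a runnable sketch:

    use std::time::Duration;

    fn duration_as_ms(d: &Duration) -> u64 {
        // The last expression is the return value; u64::from is a lossless widening.
        (d.as_secs() * 1000) + (u64::from(d.subsec_nanos()) / 1_000_000)
    }

    fn main() {
        assert_eq!(duration_as_ms(&Duration::new(2, 500_000_000)), 2500);
    }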

View File

@@ -52,11 +52,11 @@ pub struct Tpu {
 impl Tpu {
     pub fn new<W: Write + Send + 'static>(
-        bank: Arc<Bank>,
-        crdt: Arc<RwLock<Crdt>>,
+        bank: &Arc<Bank>,
+        crdt: &Arc<RwLock<Crdt>>,
         tick_duration: Option<Duration>,
         transactions_socket: UdpSocket,
-        blob_recycler: BlobRecycler,
+        blob_recycler: &BlobRecycler,
         exit: Arc<AtomicBool>,
         writer: W,
     ) -> (Self, BlobReceiver) {
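
Note: `Tpu::new` (and `WindowStage::new` below) now borrow `&Arc<...>` rather than taking the `Arc` by value; the constructor clones whatever handle it actually keeps, and ownership stays with the caller. A minimal sketch of that convention (the `Bank` and `Stage` stand-ins are illustrative):

    use std::sync::Arc;

    struct Bank {
        tokens: i64,
    }

    struct Stage {
        bank: Arc<Bank>,
    }

    impl Stage {
        // Borrow the Arc; clone inside only because this struct keeps a handle.
        fn new(bank: &Arc<Bank>) -> Self {
            Stage { bank: bank.clone() }
        }
    }

    fn main() {
        let bank = Arc::new(Bank { tokens: 1_000 });
        let stage = Stage::new(&bank);
        assert_eq!(stage.bank.tokens, 1_000);
        assert_eq!(Arc::strong_count(&bank), 2);
    }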

View File

@@ -89,11 +89,11 @@ impl Tvu {
         //the packets coming out of blob_receiver need to be sent to the GPU and verified
         //then sent to the window, which does the erasure coding reconstruction
         let (window_stage, blob_window_receiver) = WindowStage::new(
-            crdt.clone(),
+            &crdt.clone(),
             window,
             entry_height,
             retransmit_socket,
-            blob_recycler.clone(),
+            &blob_recycler.clone(),
             blob_fetch_receiver,
         );

View File

@@ -3,17 +3,15 @@ use hash::Hash;
 use signature::PublicKey;
 use transaction::{Instruction, Vote};
-pub fn entries_to_votes(entries: &Vec<Entry>) -> Vec<(PublicKey, Vote, Hash)> {
+pub fn entries_to_votes(entries: &[Entry]) -> Vec<(PublicKey, Vote, Hash)> {
     entries
         .iter()
         .flat_map(|entry| {
             let vs: Vec<(PublicKey, Vote, Hash)> = entry
                 .transactions
                 .iter()
-                .filter_map(|tx| match &tx.instruction {
-                    &Instruction::NewVote(ref vote) => {
-                        Some((tx.from.clone(), vote.clone(), tx.last_id.clone()))
-                    }
+                .filter_map(|tx| match tx.instruction {
+                    Instruction::NewVote(ref vote) => Some((tx.from, vote.clone(), tx.last_id)),
                     _ => None,
                 })
                 .collect();

View File

@@ -15,11 +15,11 @@ pub struct WindowStage {
 impl WindowStage {
     pub fn new(
-        crdt: Arc<RwLock<Crdt>>,
+        crdt: &Arc<RwLock<Crdt>>,
         window: Window,
         entry_height: u64,
         retransmit_socket: UdpSocket,
-        blob_recycler: BlobRecycler,
+        blob_recycler: &BlobRecycler,
         fetch_stage_receiver: BlobReceiver,
     ) -> (Self, BlobReceiver) {
         let (retransmit_sender, retransmit_receiver) = channel();

View File

@@ -35,7 +35,7 @@ impl WriteStage {
     ) -> Result<()> {
         let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
         let votes = entries_to_votes(&entries);
-        crdt.write().unwrap().insert_votes(votes);
+        crdt.write().unwrap().insert_votes(&votes);
         entry_writer.write_and_register_entries(&entries)?;
         trace!("New blobs? {}", entries.len());
         let mut blobs = VecDeque::new();