From 0d4a2c5eb0ab98ea165278746fab1efae2a9b133 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Wed, 18 Sep 2019 12:16:22 -0700 Subject: [PATCH] simplify poh recorder => broadcast channel (#5940) * simplify poh recorder broadcast channel * fixup * fixup --- banking_bench/src/main.rs | 12 +- core/benches/banking_stage.rs | 11 +- core/src/banking_stage.rs | 52 ++++---- core/src/broadcast_stage.rs | 14 +- .../broadcast_fake_blobs_run.rs | 13 +- core/src/broadcast_stage/broadcast_utils.rs | 120 +++++++----------- .../fail_entry_verification_broadcast_run.rs | 17 +-- .../broadcast_stage/standard_broadcast_run.rs | 10 +- core/src/poh_recorder.rs | 65 +++++----- core/src/poh_service.rs | 47 ++++--- core/src/tpu.rs | 4 +- programs/bpf/Cargo.lock | 6 +- 12 files changed, 168 insertions(+), 203 deletions(-) diff --git a/banking_bench/src/main.rs b/banking_bench/src/main.rs index 544d350121..18d49fea43 100644 --- a/banking_bench/src/main.rs +++ b/banking_bench/src/main.rs @@ -14,7 +14,7 @@ use solana_core::cluster_info::Node; use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use solana_core::packet::to_packets_chunked; use solana_core::poh_recorder::PohRecorder; -use solana_core::poh_recorder::WorkingBankEntries; +use solana_core::poh_recorder::WorkingBankEntry; use solana_core::service::Service; use solana_measure::measure::Measure; use solana_runtime::bank::Bank; @@ -33,7 +33,7 @@ use std::thread::sleep; use std::time::{Duration, Instant}; fn check_txs( - receiver: &Arc>, + receiver: &Arc>, ref_tx_count: usize, poh_recorder: &Arc>, ) -> bool { @@ -41,11 +41,9 @@ fn check_txs( let now = Instant::now(); let mut no_bank = false; loop { - let entries = receiver.recv_timeout(Duration::from_millis(10)); - if let Ok((_, entries)) = entries { - for (entry, _) in &entries { - total += entry.transactions.len(); - } + if let Ok((_bank, (entry, _tick_count))) = receiver.recv_timeout(Duration::from_millis(10)) + { + total += entry.transactions.len(); } if total >= ref_tx_count { break; diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 9ec955cddc..01154e1d97 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -17,7 +17,7 @@ use solana_core::entry::next_hash; use solana_core::entry::Entry; use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use solana_core::packet::to_packets_chunked; -use solana_core::poh_recorder::WorkingBankEntries; +use solana_core::poh_recorder::WorkingBankEntry; use solana_core::service::Service; use solana_core::test_tx::test_tx; use solana_runtime::bank::Bank; @@ -38,15 +38,12 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use test::Bencher; -fn check_txs(receiver: &Arc>, ref_tx_count: usize) { +fn check_txs(receiver: &Arc>, ref_tx_count: usize) { let mut total = 0; let now = Instant::now(); loop { - let entries = receiver.recv_timeout(Duration::new(1, 0)); - if let Ok((_, entries)) = entries { - for (entry, _) in &entries { - total += entry.transactions.len(); - } + if let Ok((_bank, (entry, _tick_height))) = receiver.recv_timeout(Duration::new(1, 0)) { + total += entry.transactions.len(); } if total >= ref_tx_count { break; diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 6455527a1e..9029f2cc9c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -7,7 +7,7 @@ use crate::entry::hash_transactions; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::packet::PACKETS_PER_BATCH; use crate::packet::{Packet, Packets}; -use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; +use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}; use crate::poh_service::PohService; use crate::result::{Error, Result}; use crate::service::Service; @@ -929,7 +929,7 @@ pub fn create_test_recorder( Arc, Arc>, PohService, - Receiver, + Receiver, ) { let exit = Arc::new(AtomicBool::new(false)); let poh_config = Arc::new(PohConfig::default()); @@ -1038,7 +1038,7 @@ mod tests { trace!("getting entries"); let entries: Vec<_> = entry_receiver .iter() - .flat_map(|x| x.1.into_iter().map(|e| e.0)) + .map(|(_bank, (entry, _tick_height))| entry) .collect(); trace!("done"); assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1); @@ -1126,19 +1126,17 @@ mod tests { bank.process_transaction(&fund_tx).unwrap(); //receive entries + ticks for _ in 0..10 { - let ventries: Vec> = entry_receiver + let entries: Vec = entry_receiver .iter() - .map(|x| x.1.into_iter().map(|e| e.0).collect()) + .map(|(_bank, (entry, _tick_height))| entry) .collect(); - for entries in &ventries { - for entry in entries { - bank.process_transactions(&entry.transactions) - .iter() - .for_each(|x| assert_eq!(*x, Ok(()))); - } - assert!(entries.verify(&blockhash)); - blockhash = entries.last().unwrap().hash; + assert!(entries.verify(&blockhash)); + blockhash = entries.last().unwrap().hash; + for entry in entries { + bank.process_transactions(&entry.transactions) + .iter() + .for_each(|x| assert_eq!(*x, Ok(()))); } if bank.get_balance(&to) == 1 { @@ -1239,7 +1237,7 @@ mod tests { // check that the balance is what we expect. let entries: Vec<_> = entry_receiver .iter() - .flat_map(|x| x.1.into_iter().map(|e| e.0)) + .map(|(_bank, (entry, _tick_height))| entry) .collect(); let bank = Bank::new(&genesis_block); @@ -1304,8 +1302,8 @@ mod tests { &results, &poh_recorder, ); - let (_, entries) = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].0.transactions.len(), transactions.len()); + let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + assert_eq!(entry.transactions.len(), transactions.len()); // InstructionErrors should still be recorded results[0] = Err(TransactionError::InstructionError( @@ -1320,8 +1318,8 @@ mod tests { ); res.unwrap(); assert!(retryable.is_empty()); - let (_, entries) = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].0.transactions.len(), transactions.len()); + let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + assert_eq!(entry.transactions.len(), transactions.len()); // Other TransactionErrors should not be recorded results[0] = Err(TransactionError::AccountNotFound); @@ -1333,8 +1331,8 @@ mod tests { ); res.unwrap(); assert!(retryable.is_empty()); - let (_, entries) = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1); + let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + assert_eq!(entry.transactions.len(), transactions.len() - 1); // Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions), // record_transactions should throw MaxHeightReached and return the set of retryable @@ -1629,14 +1627,12 @@ mod tests { let mut done = false; // read entries until I find mine, might be ticks... - while let Ok((_, entries)) = entry_receiver.recv() { - for (entry, _) in entries { - if !entry.is_tick() { - trace!("got entry"); - assert_eq!(entry.transactions.len(), transactions.len()); - assert_eq!(bank.get_balance(&pubkey), 1); - done = true; - } + while let Ok((_bank, (entry, _tick_height))) = entry_receiver.recv() { + if !entry.is_tick() { + trace!("got entry"); + assert_eq!(entry.transactions.len(), transactions.len()); + assert_eq!(bank.get_balance(&pubkey), 1); + done = true; } if done { break; diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 74ae60cdc4..4a91bb3341 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -4,7 +4,7 @@ use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastR use self::standard_broadcast_run::StandardBroadcastRun; use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError}; -use crate::poh_recorder::WorkingBankEntries; +use crate::poh_recorder::WorkingBankEntry; use crate::result::{Error, Result}; use crate::service::Service; use crate::staking_utils; @@ -40,7 +40,7 @@ impl BroadcastStageType { &self, sock: UdpSocket, cluster_info: Arc>, - receiver: Receiver, + receiver: Receiver, exit_sender: &Arc, blocktree: &Arc, ) -> BroadcastStage { @@ -79,7 +79,7 @@ trait BroadcastRun { fn run( &mut self, cluster_info: &Arc>, - receiver: &Receiver, + receiver: &Receiver, sock: &UdpSocket, blocktree: &Arc, ) -> Result<()>; @@ -112,7 +112,7 @@ impl BroadcastStage { fn run( sock: &UdpSocket, cluster_info: &Arc>, - receiver: &Receiver, + receiver: &Receiver, blocktree: &Arc, mut broadcast_stage_run: impl BroadcastRun, ) -> BroadcastStageReturnType { @@ -152,7 +152,7 @@ impl BroadcastStage { fn new( sock: UdpSocket, cluster_info: Arc>, - receiver: Receiver, + receiver: Receiver, exit_sender: &Arc, blocktree: &Arc, broadcast_stage_run: impl BroadcastRun + Send + 'static, @@ -213,7 +213,7 @@ mod test { fn setup_dummy_broadcast_service( leader_pubkey: &Pubkey, ledger_path: &Path, - entry_receiver: Receiver, + entry_receiver: Receiver, ) -> MockBroadcastStage { // Make the database ledger let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap()); @@ -280,7 +280,7 @@ mod test { let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); for (i, tick) in ticks.into_iter().enumerate() { entry_sender - .send((bank.clone(), vec![(tick, i as u64 + 1)])) + .send((bank.clone(), (tick, i as u64 + 1))) .expect("Expect successful send to broadcast service"); } } diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs index 6e4fe818ee..401da166bb 100644 --- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs @@ -20,12 +20,12 @@ impl BroadcastRun for BroadcastFakeBlobsRun { fn run( &mut self, cluster_info: &Arc>, - receiver: &Receiver, + receiver: &Receiver, sock: &UdpSocket, blocktree: &Arc, ) -> Result<()> { // 1) Pull entries from banking stage - let receive_results = broadcast_utils::recv_slot_shreds(receiver)?; + let receive_results = broadcast_utils::recv_slot_entries(receiver)?; let bank = receive_results.bank.clone(); let last_tick = receive_results.last_tick; @@ -36,8 +36,9 @@ impl BroadcastRun for BroadcastFakeBlobsRun { .map(|meta| meta.consumed) .unwrap_or(0); + let num_entries = receive_results.entries.len(); let (_, shred_bufs, _) = broadcast_utils::entries_to_shreds( - receive_results.ventries, + receive_results.entries, bank.slot(), receive_results.last_tick, bank.max_tick_height(), @@ -52,12 +53,12 @@ impl BroadcastRun for BroadcastFakeBlobsRun { self.last_blockhash = bank.parent().unwrap().last_blockhash(); } - let fake_ventries: Vec<_> = (0..receive_results.num_entries) - .map(|_| vec![(Entry::new(&self.last_blockhash, 0, vec![]), 0)]) + let fake_entries: Vec<_> = (0..num_entries) + .map(|_| Entry::new(&self.last_blockhash, 0, vec![])) .collect(); let (_fake_shreds, fake_shred_bufs, _) = broadcast_utils::entries_to_shreds( - fake_ventries, + fake_entries, bank.slot(), receive_results.last_tick, bank.max_tick_height(), diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs index bb3df6ddcf..f354a70aaf 100644 --- a/core/src/broadcast_stage/broadcast_utils.rs +++ b/core/src/broadcast_stage/broadcast_utils.rs @@ -1,5 +1,5 @@ use crate::entry::Entry; -use crate::poh_recorder::WorkingBankEntries; +use crate::poh_recorder::WorkingBankEntry; use crate::result::Result; use crate::shred::{Shred, ShredInfo, Shredder, RECOMMENDED_FEC_RATE}; use solana_runtime::bank::Bank; @@ -9,110 +9,82 @@ use std::sync::Arc; use std::time::{Duration, Instant}; pub(super) struct ReceiveResults { - pub ventries: Vec>, - pub num_entries: usize, + pub entries: Vec, pub time_elapsed: Duration, pub bank: Arc, pub last_tick: u64, } -impl ReceiveResults { - pub fn new( - ventries: Vec>, - num_entries: usize, - time_elapsed: Duration, - bank: Arc, - last_tick: u64, - ) -> Self { - Self { - ventries, - num_entries, - time_elapsed, - bank, - last_tick, - } - } -} - -pub(super) fn recv_slot_shreds(receiver: &Receiver) -> Result { +pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result { let timer = Duration::new(1, 0); - let (mut bank, entries) = receiver.recv_timeout(timer)?; + let (bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?; let recv_start = Instant::now(); + + let mut entries = vec![entry]; + let mut slot = bank.slot(); let mut max_tick_height = bank.max_tick_height(); - let mut num_entries = entries.len(); - let mut ventries = Vec::new(); - let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0); - ventries.push(entries); assert!(last_tick <= max_tick_height); + if last_tick != max_tick_height { - while let Ok((same_bank, entries)) = receiver.try_recv() { + while let Ok((bank, (entry, tick_height))) = receiver.try_recv() { // If the bank changed, that implies the previous slot was interrupted and we do not have to // broadcast its entries. - if same_bank.slot() != bank.slot() { - num_entries = 0; - ventries.clear(); - bank = same_bank.clone(); + if bank.slot() != slot { + entries.clear(); + slot = bank.slot(); max_tick_height = bank.max_tick_height(); } - num_entries += entries.len(); - last_tick = entries.last().map(|v| v.1).unwrap_or(0); - ventries.push(entries); - assert!(last_tick <= max_tick_height,); + last_tick = tick_height; + entries.push(entry); + + assert!(last_tick <= max_tick_height); if last_tick == max_tick_height { break; } } } - let recv_end = recv_start.elapsed(); - let receive_results = ReceiveResults::new(ventries, num_entries, recv_end, bank, last_tick); - Ok(receive_results) + let time_elapsed = recv_start.elapsed(); + Ok(ReceiveResults { + entries, + time_elapsed, + bank, + last_tick, + }) } pub(super) fn entries_to_shreds( - ventries: Vec>, - slot: u64, + entries: Vec, last_tick: u64, + slot: u64, bank_max_tick: u64, keypair: &Arc, - mut latest_shred_index: u64, + latest_shred_index: u64, parent_slot: u64, ) -> (Vec, Vec, u64) { - let mut all_shred_bufs = vec![]; - let mut all_shreds = vec![]; - let num_ventries = ventries.len(); - ventries - .into_iter() - .enumerate() - .for_each(|(i, entries_tuple)| { - let (entries, _): (Vec<_>, Vec<_>) = entries_tuple.into_iter().unzip(); - //entries - let mut shredder = Shredder::new( - slot, - parent_slot, - RECOMMENDED_FEC_RATE, - keypair, - latest_shred_index as u32, - ) - .expect("Expected to create a new shredder"); + let mut shredder = Shredder::new( + slot, + parent_slot, + RECOMMENDED_FEC_RATE, + keypair, + latest_shred_index as u32, + ) + .expect("Expected to create a new shredder"); - bincode::serialize_into(&mut shredder, &entries) - .expect("Expect to write all entries to shreds"); + bincode::serialize_into(&mut shredder, &entries) + .expect("Expect to write all entries to shreds"); - if i == (num_ventries - 1) && last_tick == bank_max_tick { - shredder.finalize_slot(); - } else { - shredder.finalize_data(); - } + if last_tick == bank_max_tick { + shredder.finalize_slot(); + } else { + shredder.finalize_data(); + } - let (mut shreds, mut shred_bufs): (Vec, Vec) = - shredder.shred_tuples.into_iter().unzip(); + let (shreds, shred_infos): (Vec, Vec) = + shredder.shred_tuples.into_iter().unzip(); - trace!("Inserting {:?} shreds in blocktree", shreds.len()); - latest_shred_index = u64::from(shredder.index); - all_shreds.append(&mut shreds); - all_shred_bufs.append(&mut shred_bufs); - }); - (all_shreds, all_shred_bufs, latest_shred_index) + trace!("Inserting {:?} shreds in blocktree", shreds.len()); + + (shreds, shred_infos, u64::from(shredder.index)) } diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 765380355a..d5529084d0 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -13,25 +13,20 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { fn run( &mut self, cluster_info: &Arc>, - receiver: &Receiver, + receiver: &Receiver, sock: &UdpSocket, blocktree: &Arc, ) -> Result<()> { // 1) Pull entries from banking stage - let mut receive_results = broadcast_utils::recv_slot_shreds(receiver)?; + let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?; let bank = receive_results.bank.clone(); let last_tick = receive_results.last_tick; // 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry // in the slot to make verification fail on validators if last_tick == bank.max_tick_height() { - let mut last_entry = receive_results - .ventries - .last_mut() - .unwrap() - .last_mut() - .unwrap(); - last_entry.0.hash = Hash::default(); + let mut last_entry = receive_results.entries.last_mut().unwrap(); + last_entry.hash = Hash::default(); } let keypair = &cluster_info.read().unwrap().keypair.clone(); @@ -42,9 +37,9 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { .unwrap_or(0); let (shreds, shred_infos, _) = broadcast_utils::entries_to_shreds( - receive_results.ventries, - bank.slot(), + receive_results.entries, last_tick, + bank.slot(), bank.max_tick_height(), keypair, latest_blob_index, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index bedda4c06c..f99899df33 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -52,14 +52,14 @@ impl BroadcastRun for StandardBroadcastRun { fn run( &mut self, cluster_info: &Arc>, - receiver: &Receiver, + receiver: &Receiver, sock: &UdpSocket, blocktree: &Arc, ) -> Result<()> { // 1) Pull entries from banking stage - let receive_results = broadcast_utils::recv_slot_shreds(receiver)?; + let receive_results = broadcast_utils::recv_slot_entries(receiver)?; let receive_elapsed = receive_results.time_elapsed; - let num_entries = receive_results.num_entries; + let num_entries = receive_results.entries.len(); let bank = receive_results.bank.clone(); let last_tick = receive_results.last_tick; inc_new_counter_info!("broadcast_service-entries_received", num_entries); @@ -80,9 +80,9 @@ impl BroadcastRun for StandardBroadcastRun { }; let (all_shreds, shred_infos, latest_shred_index) = entries_to_shreds( - receive_results.ventries, - bank.slot(), + receive_results.entries, last_tick, + bank.slot(), bank.max_tick_height(), keypair, latest_shred_index, diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 49c64f562a..80ee717976 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -24,7 +24,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing; use solana_sdk::transaction::Transaction; use std::cmp; -use std::sync::mpsc::{channel, Receiver, Sender, SyncSender}; +use std::sync::mpsc::{channel, Receiver, SendError, Sender, SyncSender}; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -38,7 +38,7 @@ pub enum PohRecorderError { MinHeightNotReached, } -pub type WorkingBankEntries = (Arc, Vec<(Entry, u64)>); +pub type WorkingBankEntry = (Arc, (Entry, u64)); #[derive(Clone)] pub struct WorkingBank { @@ -55,7 +55,7 @@ pub struct PohRecorder { start_tick: u64, // first tick this recorder will observe tick_cache: Vec<(Entry, u64)>, working_bank: Option, - sender: Sender, + sender: Sender, start_leader_at_tick: Option, last_leader_tick: u64, // zero if none grace_ticks: u64, @@ -254,7 +254,9 @@ impl PohRecorder { .iter() .take_while(|x| x.1 <= working_bank.max_tick_height) .count(); - let send_result = if entry_count > 0 { + let mut send_result: std::result::Result<(), SendError> = Ok(()); + + if entry_count > 0 { trace!( "flush_cache: bank_slot: {} tick_height: {} max: {} sending: {}", working_bank.bank.slot(), @@ -262,15 +264,15 @@ impl PohRecorder { working_bank.max_tick_height, entry_count, ); - let cache = &self.tick_cache[..entry_count]; - for t in cache { - working_bank.bank.register_tick(&t.0.hash); + + for tick in &self.tick_cache[..entry_count] { + working_bank.bank.register_tick(&tick.0.hash); + send_result = self.sender.send((working_bank.bank.clone(), tick.clone())); + if send_result.is_err() { + break; + } } - self.sender - .send((working_bank.bank.clone(), cache.to_vec())) - } else { - Ok(()) - }; + } if self.tick_height >= working_bank.max_tick_height { info!( "poh_record: max_tick_height {} reached, clearing working_bank {}", @@ -360,7 +362,7 @@ impl PohRecorder { transactions, }; self.sender - .send((working_bank.bank.clone(), vec![(entry, self.tick_height)]))?; + .send((working_bank.bank.clone(), (entry, self.tick_height)))?; return Ok(()); } // record() might fail if the next PoH hash needs to be a tick. But that's ok, tick() @@ -381,7 +383,7 @@ impl PohRecorder { clear_bank_signal: Option>, leader_schedule_cache: &Arc, poh_config: &Arc, - ) -> (Self, Receiver) { + ) -> (Self, Receiver) { let poh = Arc::new(Mutex::new(Poh::new( last_entry_hash, poh_config.hashes_per_tick, @@ -425,7 +427,7 @@ impl PohRecorder { blocktree: &Arc, leader_schedule_cache: &Arc, poh_config: &Arc, - ) -> (Self, Receiver) { + ) -> (Self, Receiver) { Self::new_with_clear_signal( tick_height, last_entry_hash, @@ -603,9 +605,12 @@ mod tests { poh_recorder.tick(); assert_eq!(poh_recorder.tick_height, 3); assert_eq!(poh_recorder.tick_cache.len(), 0); - let (bank_, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 3); - assert_eq!(bank_.slot(), bank.slot()); + let mut num_entries = 0; + while let Ok((wbank, (_entry, _tick_height))) = entry_receiver.try_recv() { + assert_eq!(wbank.slot(), bank.slot()); + num_entries += 1; + } + assert_eq!(num_entries, 3); assert!(poh_recorder.working_bank.is_none()); } Blocktree::destroy(&ledger_path).unwrap(); @@ -649,8 +654,11 @@ mod tests { assert_eq!(poh_recorder.tick_height, 5); assert!(poh_recorder.working_bank.is_none()); - let (_, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 3); + let mut num_entries = 0; + while let Ok(_) = entry_receiver.try_recv() { + num_entries += 1; + } + assert_eq!(num_entries, 3); } Blocktree::destroy(&ledger_path).unwrap(); } @@ -771,11 +779,10 @@ mod tests { assert_eq!(poh_recorder.tick_cache.len(), 0); //tick in the cache + entry - let (_b, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 1); - assert!(e[0].0.is_tick()); - let (_b, e) = entry_receiver.recv().expect("recv 2"); - assert!(!e[0].0.is_tick()); + let (_bank, (e, _tick_height)) = entry_receiver.recv().expect("recv 1"); + assert!(e.is_tick()); + let (_bank, (e, _tick_height)) = entry_receiver.recv().expect("recv 2"); + assert!(!e.is_tick()); } Blocktree::destroy(&ledger_path).unwrap(); } @@ -816,10 +823,10 @@ mod tests { .record(bank.slot(), h1, vec![tx.clone()]) .is_err()); - let (_bank, e) = entry_receiver.recv().expect("recv 1"); - assert_eq!(e.len(), 2); - assert!(e[0].0.is_tick()); - assert!(e[1].0.is_tick()); + let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + assert!(entry.is_tick()); + let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); + assert!(entry.is_tick()); } Blocktree::destroy(&ledger_path).unwrap(); } diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 40c927fd54..798b37ab6a 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -162,34 +162,33 @@ mod tests { let mut need_partial = true; while need_tick || need_entry || need_partial { - for entry in entry_receiver.recv().unwrap().1 { - let entry = &entry.0; - if entry.is_tick() { - assert!( - entry.num_hashes <= poh_config.hashes_per_tick.unwrap(), - format!( - "{} <= {}", - entry.num_hashes, - poh_config.hashes_per_tick.unwrap() - ) - ); + let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); - if entry.num_hashes == poh_config.hashes_per_tick.unwrap() { - need_tick = false; - } else { - need_partial = false; - } + if entry.is_tick() { + assert!( + entry.num_hashes <= poh_config.hashes_per_tick.unwrap(), + format!( + "{} <= {}", + entry.num_hashes, + poh_config.hashes_per_tick.unwrap() + ) + ); - hashes += entry.num_hashes; - - assert_eq!(hashes, poh_config.hashes_per_tick.unwrap()); - - hashes = 0; + if entry.num_hashes == poh_config.hashes_per_tick.unwrap() { + need_tick = false; } else { - assert!(entry.num_hashes >= 1); - need_entry = false; - hashes += entry.num_hashes; + need_partial = false; } + + hashes += entry.num_hashes; + + assert_eq!(hashes, poh_config.hashes_per_tick.unwrap()); + + hashes = 0; + } else { + assert!(entry.num_hashes >= 1); + need_entry = false; + hashes += entry.num_hashes; } } exit.store(true, Ordering::Relaxed); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index bb035f7c3d..69167c11ef 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -7,7 +7,7 @@ use crate::broadcast_stage::{BroadcastStage, BroadcastStageType}; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; -use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; +use crate::poh_recorder::{PohRecorder, WorkingBankEntry}; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; use crossbeam_channel::unbounded; @@ -30,7 +30,7 @@ impl Tpu { pub fn new( cluster_info: &Arc>, poh_recorder: &Arc>, - entry_receiver: Receiver, + entry_receiver: Receiver, transactions_sockets: Vec, tpu_forwards_sockets: Vec, broadcast_socket: UdpSocket, diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 8236680bf6..1e5f8b88ed 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -560,7 +560,7 @@ dependencies = [ [[package]] name = "hex" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1442,7 +1442,7 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "generic-array 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)", - "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2006,7 +2006,7 @@ dependencies = [ "checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" "checksum hash32 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "12d790435639c06a7b798af9e1e331ae245b7ef915b92f70a39b4cf8c00686af" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" -"checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" +"checksum hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "023b39be39e3a2da62a94feb433e91e8bcd37676fbc8bea371daf52b7a769a3e" "checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"