simplify poh recorder => broadcast channel (#5940)

* simplify poh recorder broadcast channel

* fixup

* fixup
Rob Walker
2019-09-18 12:16:22 -07:00
committed by GitHub
parent 64f23ab26a
commit 0d4a2c5eb0
12 changed files with 168 additions and 203 deletions
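The change in one picture: the PoH recorder's channel to the broadcast stage used to carry a whole batch of entries per message, and now carries exactly one entry plus its tick height, so every consumer becomes a plain receive loop instead of nested iteration over batches. A minimal consumer sketch under the new type (illustrative only, not part of the patch; the helper name is made up, the type comes from core/src/poh_recorder.rs below):

use solana_core::poh_recorder::WorkingBankEntry;
use std::sync::mpsc::Receiver;

// Illustrative sketch only: drain the recorder's channel until `target`
// transactions have been seen. Each message is now a single
// (bank, (entry, tick_height)) tuple instead of a (bank, Vec<(Entry, u64)>) batch.
fn count_received_txs(receiver: &Receiver<WorkingBankEntry>, target: usize) -> usize {
    let mut total = 0;
    while total < target {
        match receiver.recv() {
            Ok((_bank, (entry, _tick_height))) => total += entry.transactions.len(),
            Err(_) => break, // all senders dropped
        }
    }
    total
}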

View File

@@ -14,7 +14,7 @@ use solana_core::cluster_info::Node;
 use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo};
 use solana_core::packet::to_packets_chunked;
 use solana_core::poh_recorder::PohRecorder;
-use solana_core::poh_recorder::WorkingBankEntries;
+use solana_core::poh_recorder::WorkingBankEntry;
 use solana_core::service::Service;
 use solana_measure::measure::Measure;
 use solana_runtime::bank::Bank;
@@ -33,7 +33,7 @@ use std::thread::sleep;
 use std::time::{Duration, Instant};
 fn check_txs(
-    receiver: &Arc<Receiver<WorkingBankEntries>>,
+    receiver: &Arc<Receiver<WorkingBankEntry>>,
     ref_tx_count: usize,
     poh_recorder: &Arc<Mutex<PohRecorder>>,
 ) -> bool {
@@ -41,12 +41,10 @@ fn check_txs(
     let now = Instant::now();
     let mut no_bank = false;
     loop {
-        let entries = receiver.recv_timeout(Duration::from_millis(10));
-        if let Ok((_, entries)) = entries {
-            for (entry, _) in &entries {
-                total += entry.transactions.len();
-            }
-        }
+        if let Ok((_bank, (entry, _tick_count))) = receiver.recv_timeout(Duration::from_millis(10))
+        {
+            total += entry.transactions.len();
+        }
         if total >= ref_tx_count {
             break;
         }

View File

@@ -17,7 +17,7 @@ use solana_core::entry::next_hash;
 use solana_core::entry::Entry;
 use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo};
 use solana_core::packet::to_packets_chunked;
-use solana_core::poh_recorder::WorkingBankEntries;
+use solana_core::poh_recorder::WorkingBankEntry;
 use solana_core::service::Service;
 use solana_core::test_tx::test_tx;
 use solana_runtime::bank::Bank;
@@ -38,16 +38,13 @@ use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 use test::Bencher;
-fn check_txs(receiver: &Arc<Receiver<WorkingBankEntries>>, ref_tx_count: usize) {
+fn check_txs(receiver: &Arc<Receiver<WorkingBankEntry>>, ref_tx_count: usize) {
     let mut total = 0;
     let now = Instant::now();
     loop {
-        let entries = receiver.recv_timeout(Duration::new(1, 0));
-        if let Ok((_, entries)) = entries {
-            for (entry, _) in &entries {
-                total += entry.transactions.len();
-            }
-        }
+        if let Ok((_bank, (entry, _tick_height))) = receiver.recv_timeout(Duration::new(1, 0)) {
+            total += entry.transactions.len();
+        }
         if total >= ref_tx_count {
             break;
         }

View File

@@ -7,7 +7,7 @@ use crate::entry::hash_transactions;
 use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::packet::PACKETS_PER_BATCH;
 use crate::packet::{Packet, Packets};
-use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
+use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry};
 use crate::poh_service::PohService;
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -929,7 +929,7 @@ pub fn create_test_recorder(
     Arc<AtomicBool>,
     Arc<Mutex<PohRecorder>>,
     PohService,
-    Receiver<WorkingBankEntries>,
+    Receiver<WorkingBankEntry>,
 ) {
     let exit = Arc::new(AtomicBool::new(false));
     let poh_config = Arc::new(PohConfig::default());
@@ -1038,7 +1038,7 @@ mod tests {
         trace!("getting entries");
         let entries: Vec<_> = entry_receiver
             .iter()
-            .flat_map(|x| x.1.into_iter().map(|e| e.0))
+            .map(|(_bank, (entry, _tick_height))| entry)
             .collect();
         trace!("done");
         assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1);
@@ -1126,20 +1126,18 @@ mod tests {
         bank.process_transaction(&fund_tx).unwrap();
         //receive entries + ticks
         for _ in 0..10 {
-            let ventries: Vec<Vec<Entry>> = entry_receiver
+            let entries: Vec<Entry> = entry_receiver
                 .iter()
-                .map(|x| x.1.into_iter().map(|e| e.0).collect())
+                .map(|(_bank, (entry, _tick_height))| entry)
                 .collect();
-            for entries in &ventries {
-                for entry in entries {
-                    bank.process_transactions(&entry.transactions)
-                        .iter()
-                        .for_each(|x| assert_eq!(*x, Ok(())));
-                }
-                assert!(entries.verify(&blockhash));
-                blockhash = entries.last().unwrap().hash;
-            }
+            assert!(entries.verify(&blockhash));
+            blockhash = entries.last().unwrap().hash;
+            for entry in entries {
+                bank.process_transactions(&entry.transactions)
+                    .iter()
+                    .for_each(|x| assert_eq!(*x, Ok(())));
+            }
             if bank.get_balance(&to) == 1 {
                 break;
@@ -1239,7 +1237,7 @@ mod tests {
         // check that the balance is what we expect.
         let entries: Vec<_> = entry_receiver
             .iter()
-            .flat_map(|x| x.1.into_iter().map(|e| e.0))
+            .map(|(_bank, (entry, _tick_height))| entry)
             .collect();
         let bank = Bank::new(&genesis_block);
@@ -1304,8 +1302,8 @@ mod tests {
            &results,
            &poh_recorder,
        );
-        let (_, entries) = entry_receiver.recv().unwrap();
-        assert_eq!(entries[0].0.transactions.len(), transactions.len());
+        let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
+        assert_eq!(entry.transactions.len(), transactions.len());
         // InstructionErrors should still be recorded
         results[0] = Err(TransactionError::InstructionError(
@@ -1320,8 +1318,8 @@ mod tests {
         );
         res.unwrap();
         assert!(retryable.is_empty());
-        let (_, entries) = entry_receiver.recv().unwrap();
-        assert_eq!(entries[0].0.transactions.len(), transactions.len());
+        let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
+        assert_eq!(entry.transactions.len(), transactions.len());
         // Other TransactionErrors should not be recorded
         results[0] = Err(TransactionError::AccountNotFound);
@@ -1333,8 +1331,8 @@ mod tests {
         );
         res.unwrap();
         assert!(retryable.is_empty());
-        let (_, entries) = entry_receiver.recv().unwrap();
-        assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1);
+        let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
+        assert_eq!(entry.transactions.len(), transactions.len() - 1);
         // Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions),
         // record_transactions should throw MaxHeightReached and return the set of retryable
@@ -1629,15 +1627,13 @@ mod tests {
         let mut done = false;
         // read entries until I find mine, might be ticks...
-        while let Ok((_, entries)) = entry_receiver.recv() {
-            for (entry, _) in entries {
-                if !entry.is_tick() {
-                    trace!("got entry");
-                    assert_eq!(entry.transactions.len(), transactions.len());
-                    assert_eq!(bank.get_balance(&pubkey), 1);
-                    done = true;
-                }
-            }
+        while let Ok((_bank, (entry, _tick_height))) = entry_receiver.recv() {
+            if !entry.is_tick() {
+                trace!("got entry");
+                assert_eq!(entry.transactions.len(), transactions.len());
+                assert_eq!(bank.get_balance(&pubkey), 1);
+                done = true;
+            }
             if done {
                 break;
             }

View File

@@ -4,7 +4,7 @@ use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastR
 use self::standard_broadcast_run::StandardBroadcastRun;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::{ClusterInfo, ClusterInfoError};
-use crate::poh_recorder::WorkingBankEntries;
+use crate::poh_recorder::WorkingBankEntry;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
@@ -40,7 +40,7 @@ impl BroadcastStageType {
         &self,
         sock: UdpSocket,
         cluster_info: Arc<RwLock<ClusterInfo>>,
-        receiver: Receiver<WorkingBankEntries>,
+        receiver: Receiver<WorkingBankEntry>,
         exit_sender: &Arc<AtomicBool>,
         blocktree: &Arc<Blocktree>,
     ) -> BroadcastStage {
@@ -79,7 +79,7 @@ trait BroadcastRun {
     fn run(
         &mut self,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntries>,
+        receiver: &Receiver<WorkingBankEntry>,
         sock: &UdpSocket,
         blocktree: &Arc<Blocktree>,
     ) -> Result<()>;
@@ -112,7 +112,7 @@ impl BroadcastStage {
     fn run(
         sock: &UdpSocket,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntries>,
+        receiver: &Receiver<WorkingBankEntry>,
         blocktree: &Arc<Blocktree>,
         mut broadcast_stage_run: impl BroadcastRun,
     ) -> BroadcastStageReturnType {
@@ -152,7 +152,7 @@ impl BroadcastStage {
     fn new(
         sock: UdpSocket,
         cluster_info: Arc<RwLock<ClusterInfo>>,
-        receiver: Receiver<WorkingBankEntries>,
+        receiver: Receiver<WorkingBankEntry>,
         exit_sender: &Arc<AtomicBool>,
         blocktree: &Arc<Blocktree>,
         broadcast_stage_run: impl BroadcastRun + Send + 'static,
@@ -213,7 +213,7 @@ mod test {
     fn setup_dummy_broadcast_service(
         leader_pubkey: &Pubkey,
         ledger_path: &Path,
-        entry_receiver: Receiver<WorkingBankEntries>,
+        entry_receiver: Receiver<WorkingBankEntry>,
     ) -> MockBroadcastStage {
         // Make the database ledger
         let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap());
@@ -280,7 +280,7 @@ mod test {
         let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default());
         for (i, tick) in ticks.into_iter().enumerate() {
             entry_sender
-                .send((bank.clone(), vec![(tick, i as u64 + 1)]))
+                .send((bank.clone(), (tick, i as u64 + 1)))
                 .expect("Expect successful send to broadcast service");
         }
     }

View File

@@ -20,12 +20,12 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
     fn run(
         &mut self,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntries>,
+        receiver: &Receiver<WorkingBankEntry>,
         sock: &UdpSocket,
         blocktree: &Arc<Blocktree>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
-        let receive_results = broadcast_utils::recv_slot_shreds(receiver)?;
+        let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
         let bank = receive_results.bank.clone();
         let last_tick = receive_results.last_tick;
@@ -36,8 +36,9 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
             .map(|meta| meta.consumed)
             .unwrap_or(0);
+        let num_entries = receive_results.entries.len();
         let (_, shred_bufs, _) = broadcast_utils::entries_to_shreds(
-            receive_results.ventries,
+            receive_results.entries,
             bank.slot(),
             receive_results.last_tick,
             bank.max_tick_height(),
@@ -52,12 +53,12 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
             self.last_blockhash = bank.parent().unwrap().last_blockhash();
         }
-        let fake_ventries: Vec<_> = (0..receive_results.num_entries)
-            .map(|_| vec![(Entry::new(&self.last_blockhash, 0, vec![]), 0)])
+        let fake_entries: Vec<_> = (0..num_entries)
+            .map(|_| Entry::new(&self.last_blockhash, 0, vec![]))
             .collect();
         let (_fake_shreds, fake_shred_bufs, _) = broadcast_utils::entries_to_shreds(
-            fake_ventries,
+            fake_entries,
             bank.slot(),
             receive_results.last_tick,
             bank.max_tick_height(),

View File

@@ -1,5 +1,5 @@
 use crate::entry::Entry;
-use crate::poh_recorder::WorkingBankEntries;
+use crate::poh_recorder::WorkingBankEntry;
 use crate::result::Result;
 use crate::shred::{Shred, ShredInfo, Shredder, RECOMMENDED_FEC_RATE};
 use solana_runtime::bank::Bank;
@@ -9,85 +9,60 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 pub(super) struct ReceiveResults {
-    pub ventries: Vec<Vec<(Entry, u64)>>,
-    pub num_entries: usize,
+    pub entries: Vec<Entry>,
     pub time_elapsed: Duration,
     pub bank: Arc<Bank>,
     pub last_tick: u64,
 }
-impl ReceiveResults {
-    pub fn new(
-        ventries: Vec<Vec<(Entry, u64)>>,
-        num_entries: usize,
-        time_elapsed: Duration,
-        bank: Arc<Bank>,
-        last_tick: u64,
-    ) -> Self {
-        Self {
-            ventries,
-            num_entries,
-            time_elapsed,
-            bank,
-            last_tick,
-        }
-    }
-}
-pub(super) fn recv_slot_shreds(receiver: &Receiver<WorkingBankEntries>) -> Result<ReceiveResults> {
+pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
     let timer = Duration::new(1, 0);
-    let (mut bank, entries) = receiver.recv_timeout(timer)?;
+    let (bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?;
     let recv_start = Instant::now();
+    let mut entries = vec![entry];
+    let mut slot = bank.slot();
     let mut max_tick_height = bank.max_tick_height();
-    let mut num_entries = entries.len();
-    let mut ventries = Vec::new();
-    let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
-    ventries.push(entries);
     assert!(last_tick <= max_tick_height);
     if last_tick != max_tick_height {
-        while let Ok((same_bank, entries)) = receiver.try_recv() {
+        while let Ok((bank, (entry, tick_height))) = receiver.try_recv() {
             // If the bank changed, that implies the previous slot was interrupted and we do not have to
             // broadcast its entries.
-            if same_bank.slot() != bank.slot() {
-                num_entries = 0;
-                ventries.clear();
-                bank = same_bank.clone();
+            if bank.slot() != slot {
+                entries.clear();
+                slot = bank.slot();
                 max_tick_height = bank.max_tick_height();
             }
-            num_entries += entries.len();
-            last_tick = entries.last().map(|v| v.1).unwrap_or(0);
-            ventries.push(entries);
-            assert!(last_tick <= max_tick_height,);
+            last_tick = tick_height;
+            entries.push(entry);
+            assert!(last_tick <= max_tick_height);
             if last_tick == max_tick_height {
                 break;
             }
         }
     }
-    let recv_end = recv_start.elapsed();
-    let receive_results = ReceiveResults::new(ventries, num_entries, recv_end, bank, last_tick);
-    Ok(receive_results)
+    let time_elapsed = recv_start.elapsed();
+    Ok(ReceiveResults {
+        entries,
+        time_elapsed,
+        bank,
+        last_tick,
+    })
 }
 pub(super) fn entries_to_shreds(
-    ventries: Vec<Vec<(Entry, u64)>>,
+    entries: Vec<Entry>,
+    slot: u64,
     last_tick: u64,
-    slot: u64,
     bank_max_tick: u64,
     keypair: &Arc<Keypair>,
-    mut latest_shred_index: u64,
+    latest_shred_index: u64,
     parent_slot: u64,
 ) -> (Vec<Shred>, Vec<ShredInfo>, u64) {
-    let mut all_shred_bufs = vec![];
-    let mut all_shreds = vec![];
-    let num_ventries = ventries.len();
-    ventries
-        .into_iter()
-        .enumerate()
-        .for_each(|(i, entries_tuple)| {
-            let (entries, _): (Vec<_>, Vec<_>) = entries_tuple.into_iter().unzip();
-            //entries
     let mut shredder = Shredder::new(
         slot,
         parent_slot,
@@ -100,19 +75,16 @@ pub(super) fn entries_to_shreds(
     bincode::serialize_into(&mut shredder, &entries)
         .expect("Expect to write all entries to shreds");
-    if i == (num_ventries - 1) && last_tick == bank_max_tick {
+    if last_tick == bank_max_tick {
         shredder.finalize_slot();
     } else {
         shredder.finalize_data();
     }
-    let (mut shreds, mut shred_bufs): (Vec<Shred>, Vec<ShredInfo>) =
+    let (shreds, shred_infos): (Vec<Shred>, Vec<ShredInfo>) =
         shredder.shred_tuples.into_iter().unzip();
     trace!("Inserting {:?} shreds in blocktree", shreds.len());
-            latest_shred_index = u64::from(shredder.index);
-            all_shreds.append(&mut shreds);
-            all_shred_bufs.append(&mut shred_bufs);
-        });
-    (all_shreds, all_shred_bufs, latest_shred_index)
+    (shreds, shred_infos, u64::from(shredder.index))
 }
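Taken together, the two helpers above now do the batching that the channel no longer does: recv_slot_entries blocks up to one second for the first (bank, (entry, tick_height)) message, drains the rest with try_recv, discards accumulated entries whenever the slot changes, and stops once last_tick reaches the bank's max tick height; entries_to_shreds then serializes the flat Vec<Entry> through a single Shredder and finalizes the slot only when that last tick was received. A hypothetical call-site sketch, mirroring the updated broadcast runs below (receiver, keypair, latest_shred_index and parent_slot are assumed to already be in scope inside a BroadcastRun::run implementation):

// Sketch only, not part of the patch; mirrors the call sites in the broadcast runs further down.
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
let (shreds, shred_infos, next_shred_index) = broadcast_utils::entries_to_shreds(
    receive_results.entries,  // flat Vec<Entry>; the Vec<Vec<(Entry, u64)>> nesting is gone
    bank.slot(),              // note: slot now precedes last_tick in the argument list
    last_tick,
    bank.max_tick_height(),   // finalize_slot() fires only when last_tick reaches this
    keypair,
    latest_shred_index,
    parent_slot,
);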

View File

@@ -13,25 +13,20 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
     fn run(
         &mut self,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntries>,
+        receiver: &Receiver<WorkingBankEntry>,
         sock: &UdpSocket,
         blocktree: &Arc<Blocktree>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
-        let mut receive_results = broadcast_utils::recv_slot_shreds(receiver)?;
+        let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?;
         let bank = receive_results.bank.clone();
         let last_tick = receive_results.last_tick;
         // 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry
         // in the slot to make verification fail on validators
         if last_tick == bank.max_tick_height() {
-            let mut last_entry = receive_results
-                .ventries
-                .last_mut()
-                .unwrap()
-                .last_mut()
-                .unwrap();
-            last_entry.0.hash = Hash::default();
+            let mut last_entry = receive_results.entries.last_mut().unwrap();
+            last_entry.hash = Hash::default();
         }
         let keypair = &cluster_info.read().unwrap().keypair.clone();
@@ -42,9 +37,9 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             .unwrap_or(0);
         let (shreds, shred_infos, _) = broadcast_utils::entries_to_shreds(
-            receive_results.ventries,
+            receive_results.entries,
+            bank.slot(),
             last_tick,
-            bank.slot(),
             bank.max_tick_height(),
             keypair,
             latest_blob_index,

View File

@@ -52,14 +52,14 @@ impl BroadcastRun for StandardBroadcastRun {
     fn run(
         &mut self,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntries>,
+        receiver: &Receiver<WorkingBankEntry>,
         sock: &UdpSocket,
         blocktree: &Arc<Blocktree>,
     ) -> Result<()> {
         // 1) Pull entries from banking stage
-        let receive_results = broadcast_utils::recv_slot_shreds(receiver)?;
+        let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
         let receive_elapsed = receive_results.time_elapsed;
-        let num_entries = receive_results.num_entries;
+        let num_entries = receive_results.entries.len();
         let bank = receive_results.bank.clone();
         let last_tick = receive_results.last_tick;
         inc_new_counter_info!("broadcast_service-entries_received", num_entries);
@@ -80,9 +80,9 @@ impl BroadcastRun for StandardBroadcastRun {
         };
         let (all_shreds, shred_infos, latest_shred_index) = entries_to_shreds(
-            receive_results.ventries,
+            receive_results.entries,
+            bank.slot(),
             last_tick,
-            bank.slot(),
             bank.max_tick_height(),
             keypair,
             latest_shred_index,

View File

@@ -24,7 +24,7 @@ use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing;
 use solana_sdk::transaction::Transaction;
 use std::cmp;
-use std::sync::mpsc::{channel, Receiver, Sender, SyncSender};
+use std::sync::mpsc::{channel, Receiver, SendError, Sender, SyncSender};
 use std::sync::{Arc, Mutex};
 use std::time::Instant;
@@ -38,7 +38,7 @@ pub enum PohRecorderError {
     MinHeightNotReached,
 }
-pub type WorkingBankEntries = (Arc<Bank>, Vec<(Entry, u64)>);
+pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
 #[derive(Clone)]
 pub struct WorkingBank {
@@ -55,7 +55,7 @@ pub struct PohRecorder {
     start_tick: u64, // first tick this recorder will observe
     tick_cache: Vec<(Entry, u64)>,
     working_bank: Option<WorkingBank>,
-    sender: Sender<WorkingBankEntries>,
+    sender: Sender<WorkingBankEntry>,
     start_leader_at_tick: Option<u64>,
     last_leader_tick: u64, // zero if none
     grace_ticks: u64,
@@ -254,7 +254,9 @@ impl PohRecorder {
             .iter()
             .take_while(|x| x.1 <= working_bank.max_tick_height)
             .count();
-        let send_result = if entry_count > 0 {
+        let mut send_result: std::result::Result<(), SendError<WorkingBankEntry>> = Ok(());
+        if entry_count > 0 {
             trace!(
                 "flush_cache: bank_slot: {} tick_height: {} max: {} sending: {}",
                 working_bank.bank.slot(),
@@ -262,15 +264,15 @@ impl PohRecorder {
                 working_bank.max_tick_height,
                 entry_count,
             );
-            let cache = &self.tick_cache[..entry_count];
-            for t in cache {
-                working_bank.bank.register_tick(&t.0.hash);
+            for tick in &self.tick_cache[..entry_count] {
+                working_bank.bank.register_tick(&tick.0.hash);
+                send_result = self.sender.send((working_bank.bank.clone(), tick.clone()));
+                if send_result.is_err() {
+                    break;
+                }
             }
-            self.sender
-                .send((working_bank.bank.clone(), cache.to_vec()))
-        } else {
-            Ok(())
-        };
+        }
         if self.tick_height >= working_bank.max_tick_height {
             info!(
                 "poh_record: max_tick_height {} reached, clearing working_bank {}",
@@ -360,7 +362,7 @@ impl PohRecorder {
                 transactions,
             };
             self.sender
-                .send((working_bank.bank.clone(), vec![(entry, self.tick_height)]))?;
+                .send((working_bank.bank.clone(), (entry, self.tick_height)))?;
             return Ok(());
         }
         // record() might fail if the next PoH hash needs to be a tick. But that's ok, tick()
@@ -381,7 +383,7 @@ impl PohRecorder {
         clear_bank_signal: Option<SyncSender<bool>>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         poh_config: &Arc<PohConfig>,
-    ) -> (Self, Receiver<WorkingBankEntries>) {
+    ) -> (Self, Receiver<WorkingBankEntry>) {
         let poh = Arc::new(Mutex::new(Poh::new(
             last_entry_hash,
             poh_config.hashes_per_tick,
@@ -425,7 +427,7 @@ impl PohRecorder {
         blocktree: &Arc<Blocktree>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         poh_config: &Arc<PohConfig>,
-    ) -> (Self, Receiver<WorkingBankEntries>) {
+    ) -> (Self, Receiver<WorkingBankEntry>) {
         Self::new_with_clear_signal(
             tick_height,
             last_entry_hash,
@@ -603,9 +605,12 @@ mod tests {
             poh_recorder.tick();
             assert_eq!(poh_recorder.tick_height, 3);
             assert_eq!(poh_recorder.tick_cache.len(), 0);
-            let (bank_, e) = entry_receiver.recv().expect("recv 1");
-            assert_eq!(e.len(), 3);
-            assert_eq!(bank_.slot(), bank.slot());
+            let mut num_entries = 0;
+            while let Ok((wbank, (_entry, _tick_height))) = entry_receiver.try_recv() {
+                assert_eq!(wbank.slot(), bank.slot());
+                num_entries += 1;
+            }
+            assert_eq!(num_entries, 3);
             assert!(poh_recorder.working_bank.is_none());
         }
         Blocktree::destroy(&ledger_path).unwrap();
@@ -649,8 +654,11 @@ mod tests {
            assert_eq!(poh_recorder.tick_height, 5);
            assert!(poh_recorder.working_bank.is_none());
-            let (_, e) = entry_receiver.recv().expect("recv 1");
-            assert_eq!(e.len(), 3);
+            let mut num_entries = 0;
+            while let Ok(_) = entry_receiver.try_recv() {
+                num_entries += 1;
+            }
+            assert_eq!(num_entries, 3);
         }
         Blocktree::destroy(&ledger_path).unwrap();
     }
@@ -771,11 +779,10 @@ mod tests {
             assert_eq!(poh_recorder.tick_cache.len(), 0);
             //tick in the cache + entry
-            let (_b, e) = entry_receiver.recv().expect("recv 1");
-            assert_eq!(e.len(), 1);
-            assert!(e[0].0.is_tick());
-            let (_b, e) = entry_receiver.recv().expect("recv 2");
-            assert!(!e[0].0.is_tick());
+            let (_bank, (e, _tick_height)) = entry_receiver.recv().expect("recv 1");
+            assert!(e.is_tick());
+            let (_bank, (e, _tick_height)) = entry_receiver.recv().expect("recv 2");
+            assert!(!e.is_tick());
         }
         Blocktree::destroy(&ledger_path).unwrap();
     }
@@ -816,10 +823,10 @@ mod tests {
                 .record(bank.slot(), h1, vec![tx.clone()])
                 .is_err());
-            let (_bank, e) = entry_receiver.recv().expect("recv 1");
-            assert_eq!(e.len(), 2);
-            assert!(e[0].0.is_tick());
-            assert!(e[1].0.is_tick());
+            let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
+            assert!(entry.is_tick());
+            let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
+            assert!(entry.is_tick());
         }
         Blocktree::destroy(&ledger_path).unwrap();
     }

View File

@@ -162,8 +162,8 @@ mod tests {
         let mut need_partial = true;
         while need_tick || need_entry || need_partial {
-            for entry in entry_receiver.recv().unwrap().1 {
-                let entry = &entry.0;
+            let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
             if entry.is_tick() {
                 assert!(
                     entry.num_hashes <= poh_config.hashes_per_tick.unwrap(),
@@ -191,7 +191,6 @@ mod tests {
                 hashes += entry.num_hashes;
             }
         }
-        }
         exit.store(true, Ordering::Relaxed);
         let _ = poh_service.join().unwrap();
         let _ = entry_producer.join().unwrap();

View File

@@ -7,7 +7,7 @@ use crate::broadcast_stage::{BroadcastStage, BroadcastStageType};
 use crate::cluster_info::ClusterInfo;
 use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
 use crate::fetch_stage::FetchStage;
-use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
+use crate::poh_recorder::{PohRecorder, WorkingBankEntry};
 use crate::service::Service;
 use crate::sigverify_stage::SigVerifyStage;
 use crossbeam_channel::unbounded;
@@ -30,7 +30,7 @@ impl Tpu {
     pub fn new(
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        entry_receiver: Receiver<WorkingBankEntries>,
+        entry_receiver: Receiver<WorkingBankEntry>,
         transactions_sockets: Vec<UdpSocket>,
         tpu_forwards_sockets: Vec<UdpSocket>,
         broadcast_socket: UdpSocket,

View File

@@ -560,7 +560,7 @@ dependencies = [
 [[package]]
 name = "hex"
-version = "0.3.2"
+version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 [[package]]
@@ -1442,7 +1442,7 @@ dependencies = [
  "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)",
  "generic-array 0.13.2 (registry+https://github.com/rust-lang/crates.io-index)",
- "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2006,7 +2006,7 @@ dependencies = [
 "checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb"
 "checksum hash32 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "12d790435639c06a7b798af9e1e331ae245b7ef915b92f70a39b4cf8c00686af"
 "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
-"checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
+"checksum hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "023b39be39e3a2da62a94feb433e91e8bcd37676fbc8bea371daf52b7a769a3e"
 "checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
 "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e"
 "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"