Fix resetting PohRecorder to wrong bank (#3553)

* Check whether future slot already has transmission
This commit is contained in:
carllin
2019-03-29 20:00:36 -07:00
committed by GitHub
parent 5646daa820
commit f886b3b12b
9 changed files with 1260 additions and 961 deletions

View File

@ -1,10 +1,13 @@
#![feature(test)] #![feature(test)]
extern crate test; extern crate test;
#[macro_use]
extern crate solana;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use solana::banking_stage::{create_test_recorder, BankingStage}; use solana::banking_stage::{create_test_recorder, BankingStage};
use solana::blocktree::{get_tmp_ledger_path, Blocktree};
use solana::cluster_info::ClusterInfo; use solana::cluster_info::ClusterInfo;
use solana::cluster_info::Node; use solana::cluster_info::Node;
use solana::packet::to_packets_chunked; use solana::packet::to_packets_chunked;
@ -104,33 +107,41 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) })
.collect(); .collect();
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); let ledger_path = get_tmp_ledger_path!();
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); {
let cluster_info = Arc::new(RwLock::new(cluster_info)); let blocktree = Arc::new(
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
poh_recorder.lock().unwrap().set_bank(&bank); );
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash(); let mut id = genesis_block.hash();
for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) {
id = hash(&id.as_ref()); id = hash(&id.as_ref());
bank.register_tick(&id); bank.register_tick(&id);
}
let half_len = verified.len() / 2;
let mut start = 0;
bencher.iter(move || {
// make sure the transactions are still valid
bank.register_tick(&genesis_block.hash());
for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap();
} }
check_txs(&signal_receiver, txes / 2);
bank.clear_signatures(); let half_len = verified.len() / 2;
start += half_len; let mut start = 0;
start %= verified.len(); bencher.iter(move || {
}); // make sure the transactions are still valid
exit.store(true, Ordering::Relaxed); bank.register_tick(&genesis_block.hash());
poh_service.join().unwrap(); for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes / 2);
bank.clear_signatures();
start += half_len;
start %= verified.len();
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
Blocktree::destroy(&ledger_path).unwrap();
} }
#[bench] #[bench]
@ -211,31 +222,40 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) })
.collect(); .collect();
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
poh_recorder.lock().unwrap().set_bank(&bank);
let mut id = genesis_block.hash(); let ledger_path = get_tmp_ledger_path!();
for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { {
id = hash(&id.as_ref()); let blocktree = Arc::new(
bank.register_tick(&id); Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
} );
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
poh_recorder.lock().unwrap().set_bank(&bank);
let half_len = verified.len() / 2; let mut id = genesis_block.hash();
let mut start = 0; for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) {
bencher.iter(move || { id = hash(&id.as_ref());
// make sure the transactions are still valid bank.register_tick(&id);
bank.register_tick(&genesis_block.hash());
for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap();
} }
check_txs(&signal_receiver, txes / 2);
bank.clear_signatures(); let half_len = verified.len() / 2;
start += half_len; let mut start = 0;
start %= verified.len(); bencher.iter(move || {
}); // make sure the transactions are still valid
exit.store(true, Ordering::Relaxed); bank.register_tick(&genesis_block.hash());
poh_service.join().unwrap(); for v in verified[start..start + half_len].chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap();
}
check_txs(&signal_receiver, txes / 2);
bank.clear_signatures();
start += half_len;
start %= verified.len();
});
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
Blocktree::destroy(&ledger_path).unwrap();
} }

View File

@ -1,7 +1,7 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used //! The `banking_stage` processes Transaction messages. It is intended to be used
//! to contruct a software pipeline. The stage uses all available CPU cores and //! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU. //! can do its processing in parallel with signature verification on the GPU.
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::entry; use crate::entry;
use crate::entry::{hash_transactions, Entry}; use crate::entry::{hash_transactions, Entry};
@ -472,6 +472,7 @@ impl Service for BankingStage {
pub fn create_test_recorder( pub fn create_test_recorder(
bank: &Arc<Bank>, bank: &Arc<Bank>,
blocktree: &Arc<Blocktree>,
) -> ( ) -> (
Arc<AtomicBool>, Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>, Arc<Mutex<PohRecorder>>,
@ -486,6 +487,7 @@ pub fn create_test_recorder(
Some(4), Some(4),
bank.ticks_per_slot(), bank.ticks_per_slot(),
&Pubkey::default(), &Pubkey::default(),
blocktree,
); );
poh_recorder.set_bank(&bank); poh_recorder.set_bank(&bank);
@ -498,10 +500,12 @@ pub fn create_test_recorder(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::cluster_info::Node; use crate::cluster_info::Node;
use crate::entry::EntrySlice; use crate::entry::EntrySlice;
use crate::packet::to_packets; use crate::packet::to_packets;
use crate::poh_recorder::WorkingBank; use crate::poh_recorder::WorkingBank;
use crate::{get_tmp_ledger_path, tmp_ledger_name};
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::instruction::InstructionError; use solana_sdk::instruction::InstructionError;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
@ -514,14 +518,22 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (exit, poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank); let ledger_path = get_tmp_ledger_path!();
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); {
let cluster_info = Arc::new(RwLock::new(cluster_info)); let blocktree = Arc::new(
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
drop(verified_sender); );
exit.store(true, Ordering::Relaxed); let (exit, poh_recorder, poh_service, _entry_receiever) =
banking_stage.join().unwrap(); create_test_recorder(&bank, &blocktree);
poh_service.join().unwrap(); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
drop(verified_sender);
exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap();
poh_service.join().unwrap();
}
Blocktree::destroy(&ledger_path).unwrap();
} }
#[test] #[test]
@ -532,27 +544,35 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let ledger_path = get_tmp_ledger_path!();
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); {
let cluster_info = Arc::new(RwLock::new(cluster_info)); let blocktree = Arc::new(
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
trace!("sending bank"); );
sleep(Duration::from_millis(600)); let (exit, poh_recorder, poh_service, entry_receiver) =
drop(verified_sender); create_test_recorder(&bank, &blocktree);
exit.store(true, Ordering::Relaxed); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
poh_service.join().unwrap(); let cluster_info = Arc::new(RwLock::new(cluster_info));
drop(poh_recorder); let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
trace!("sending bank");
sleep(Duration::from_millis(600));
drop(verified_sender);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);
trace!("getting entries"); trace!("getting entries");
let entries: Vec<_> = entry_receiver let entries: Vec<_> = entry_receiver
.iter() .iter()
.flat_map(|x| x.1.into_iter().map(|e| e.0)) .flat_map(|x| x.1.into_iter().map(|e| e.0))
.collect(); .collect();
trace!("done"); trace!("done");
assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1); assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1);
assert!(entries.verify(&start_hash)); assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash());
banking_stage.join().unwrap(); banking_stage.join().unwrap();
}
Blocktree::destroy(&ledger_path).unwrap();
} }
#[test] #[test]
@ -562,76 +582,84 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash(); let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let ledger_path = get_tmp_ledger_path!();
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); {
let cluster_info = Arc::new(RwLock::new(cluster_info)); let blocktree = Arc::new(
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, entry_receiver) =
create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
// fund another account so we can send 2 good transactions in a single batch. // fund another account so we can send 2 good transactions in a single batch.
let keypair = Keypair::new(); let keypair = Keypair::new();
let fund_tx = let fund_tx =
SystemTransaction::new_account(&mint_keypair, &keypair.pubkey(), 2, start_hash, 0); SystemTransaction::new_account(&mint_keypair, &keypair.pubkey(), 2, start_hash, 0);
bank.process_transaction(&fund_tx).unwrap(); bank.process_transaction(&fund_tx).unwrap();
// good tx // good tx
let to = Keypair::new().pubkey(); let to = Keypair::new().pubkey();
let tx = SystemTransaction::new_account(&mint_keypair, &to, 1, start_hash, 0); let tx = SystemTransaction::new_account(&mint_keypair, &to, 1, start_hash, 0);
// good tx, but no verify // good tx, but no verify
let to2 = Keypair::new().pubkey(); let to2 = Keypair::new().pubkey();
let tx_no_ver = SystemTransaction::new_account(&keypair, &to2, 2, start_hash, 0); let tx_no_ver = SystemTransaction::new_account(&keypair, &to2, 2, start_hash, 0);
// bad tx, AccountNotFound // bad tx, AccountNotFound
let keypair = Keypair::new(); let keypair = Keypair::new();
let to3 = Keypair::new().pubkey(); let to3 = Keypair::new().pubkey();
let tx_anf = SystemTransaction::new_account(&keypair, &to3, 1, start_hash, 0); let tx_anf = SystemTransaction::new_account(&keypair, &to3, 1, start_hash, 0);
// send 'em over // send 'em over
let packets = to_packets(&[tx_no_ver, tx_anf, tx]); let packets = to_packets(&[tx_no_ver, tx_anf, tx]);
// glad they all fit // glad they all fit
assert_eq!(packets.len(), 1); assert_eq!(packets.len(), 1);
verified_sender // no_ver, anf, tx verified_sender // no_ver, anf, tx
.send(vec![(packets[0].clone(), vec![0u8, 1u8, 1u8])]) .send(vec![(packets[0].clone(), vec![0u8, 1u8, 1u8])])
.unwrap(); .unwrap();
drop(verified_sender); drop(verified_sender);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();
drop(poh_recorder); drop(poh_recorder);
let mut blockhash = start_hash; let mut blockhash = start_hash;
let bank = Bank::new(&genesis_block); let bank = Bank::new(&genesis_block);
bank.process_transaction(&fund_tx).unwrap(); bank.process_transaction(&fund_tx).unwrap();
//receive entries + ticks //receive entries + ticks
for _ in 0..10 { for _ in 0..10 {
let ventries: Vec<Vec<Entry>> = entry_receiver let ventries: Vec<Vec<Entry>> = entry_receiver
.iter() .iter()
.map(|x| x.1.into_iter().map(|e| e.0).collect()) .map(|x| x.1.into_iter().map(|e| e.0).collect())
.collect(); .collect();
for entries in &ventries { for entries in &ventries {
for entry in entries { for entry in entries {
bank.process_transactions(&entry.transactions) bank.process_transactions(&entry.transactions)
.iter() .iter()
.for_each(|x| assert_eq!(*x, Ok(()))); .for_each(|x| assert_eq!(*x, Ok(())));
}
assert!(entries.verify(&blockhash));
blockhash = entries.last().unwrap().hash;
} }
assert!(entries.verify(&blockhash));
blockhash = entries.last().unwrap().hash; if bank.get_balance(&to) == 1 {
break;
}
sleep(Duration::from_millis(200));
} }
if bank.get_balance(&to) == 1 { assert_eq!(bank.get_balance(&to), 1);
break; assert_eq!(bank.get_balance(&to2), 0);
}
sleep(Duration::from_millis(200)); drop(entry_receiver);
banking_stage.join().unwrap();
} }
Blocktree::destroy(&ledger_path).unwrap();
assert_eq!(bank.get_balance(&to), 1);
assert_eq!(bank.get_balance(&to2), 0);
drop(entry_receiver);
banking_stage.join().unwrap();
} }
#[test] #[test]
@ -671,43 +699,57 @@ mod tests {
.send(vec![(packets[0].clone(), vec![1u8])]) .send(vec![(packets[0].clone(), vec![1u8])])
.unwrap(); .unwrap();
let entry_receiver = { let ledger_path = get_tmp_ledger_path!();
// start a banking_stage to eat verified receiver {
let bank = Arc::new(Bank::new(&genesis_block)); let entry_receiver = {
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); // start a banking_stage to eat verified receiver
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let bank = Arc::new(Bank::new(&genesis_block));
let cluster_info = Arc::new(RwLock::new(cluster_info)); let blocktree = Arc::new(
let _banking_stage = Blocktree::open(&ledger_path)
BankingStage::new_num_threads(&cluster_info, &poh_recorder, verified_receiver, 1); .expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, entry_receiver) =
create_test_recorder(&bank, &blocktree);
let cluster_info =
ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new_num_threads(
&cluster_info,
&poh_recorder,
verified_receiver,
1,
);
// wait for banking_stage to eat the packets // wait for banking_stage to eat the packets
while bank.get_balance(&alice.pubkey()) != 1 { while bank.get_balance(&alice.pubkey()) != 1 {
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
} }
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap(); poh_service.join().unwrap();
entry_receiver entry_receiver
}; };
drop(verified_sender); drop(verified_sender);
// consume the entire entry_receiver, feed it into a new bank // consume the entire entry_receiver, feed it into a new bank
// check that the balance is what we expect. // check that the balance is what we expect.
let entries: Vec<_> = entry_receiver let entries: Vec<_> = entry_receiver
.iter()
.flat_map(|x| x.1.into_iter().map(|e| e.0))
.collect();
let bank = Bank::new(&genesis_block);
for entry in &entries {
bank.process_transactions(&entry.transactions)
.iter() .iter()
.for_each(|x| assert_eq!(*x, Ok(()))); .flat_map(|x| x.1.into_iter().map(|e| e.0))
} .collect();
// Assert the user holds one lamport, not two. If the stage only outputs one let bank = Bank::new(&genesis_block);
// entry, then the second transaction will be rejected, because it drives for entry in &entries {
// the account balance below zero before the credit is added. bank.process_transactions(&entry.transactions)
assert_eq!(bank.get_balance(&alice.pubkey()), 1); .iter()
.for_each(|x| assert_eq!(*x, Ok(())));
}
// Assert the user holds one lamport, not two. If the stage only outputs one
// entry, then the second transaction will be rejected, because it drives
// the account balance below zero before the credit is added.
assert_eq!(bank.get_balance(&alice.pubkey()), 1);
}
Blocktree::destroy(&ledger_path).unwrap();
} }
#[test] #[test]
@ -719,47 +761,53 @@ mod tests {
min_tick_height: bank.tick_height(), min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX, max_tick_height: std::u64::MAX,
}; };
let ledger_path = get_tmp_ledger_path!();
{
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
None,
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let (poh_recorder, entry_receiver) = PohRecorder::new( poh_recorder.lock().unwrap().set_working_bank(working_bank);
bank.tick_height(), let pubkey = Keypair::new().pubkey();
bank.last_blockhash(),
bank.slot(),
None,
bank.ticks_per_slot(),
&Pubkey::default(),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank); let transactions = vec![
let pubkey = Keypair::new().pubkey(); SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0),
];
let transactions = vec![ let mut results = vec![Ok(()), Ok(())];
SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
SystemTransaction::new_move(&mint_keypair, &pubkey, 1, genesis_block.hash(), 0), .unwrap();
]; let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len());
let mut results = vec![Ok(()), Ok(())]; // InstructionErrors should still be recorded
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) results[0] = Err(TransactionError::InstructionError(
.unwrap(); 1,
let (_, entries) = entry_receiver.recv().unwrap(); InstructionError::new_result_with_negative_lamports(),
assert_eq!(entries[0].0.transactions.len(), transactions.len()); ));
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
.unwrap();
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len());
// InstructionErrors should still be recorded // Other TransactionErrors should not be recorded
results[0] = Err(TransactionError::InstructionError( results[0] = Err(TransactionError::AccountNotFound);
1, BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
InstructionError::new_result_with_negative_lamports(), .unwrap();
)); let (_, entries) = entry_receiver.recv().unwrap();
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1);
.unwrap(); }
let (_, entries) = entry_receiver.recv().unwrap(); Blocktree::destroy(&ledger_path).unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len());
// Other TransactionErrors should not be recorded
results[0] = Err(TransactionError::AccountNotFound);
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
.unwrap();
let (_, entries) = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1);
} }
#[test] #[test]
@ -782,53 +830,61 @@ mod tests {
min_tick_height: bank.tick_height(), min_tick_height: bank.tick_height(),
max_tick_height: bank.tick_height() + 1, max_tick_height: bank.tick_height() + 1,
}; };
let (poh_recorder, entry_receiver) = PohRecorder::new( let ledger_path = get_tmp_ledger_path!();
bank.tick_height(), {
bank.last_blockhash(), let blocktree =
bank.slot(), Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
Some(4), let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.ticks_per_slot(), bank.tick_height(),
&pubkey, bank.last_blockhash(),
); bank.slot(),
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); Some(4),
bank.ticks_per_slot(),
&pubkey,
&Arc::new(blocktree),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder)
poh_recorder.lock().unwrap().tick(); .unwrap();
poh_recorder.lock().unwrap().tick();
let mut done = false; let mut done = false;
// read entries until I find mine, might be ticks... // read entries until I find mine, might be ticks...
while let Ok((_, entries)) = entry_receiver.recv() { while let Ok((_, entries)) = entry_receiver.recv() {
for (entry, _) in entries { for (entry, _) in entries {
if !entry.is_tick() { if !entry.is_tick() {
trace!("got entry"); trace!("got entry");
assert_eq!(entry.transactions.len(), transactions.len()); assert_eq!(entry.transactions.len(), transactions.len());
assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(bank.get_balance(&pubkey), 1);
done = true; done = true;
}
}
if done {
break;
} }
} }
if done { trace!("done ticking");
break;
} assert_eq!(done, true);
let transactions = vec![SystemTransaction::new_move(
&mint_keypair,
&pubkey,
2,
genesis_block.hash(),
0,
)];
assert_matches!(
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
);
assert_eq!(bank.get_balance(&pubkey), 1);
} }
trace!("done ticking"); Blocktree::destroy(&ledger_path).unwrap();
assert_eq!(done, true);
let transactions = vec![SystemTransaction::new_move(
&mint_keypair,
&pubkey,
2,
genesis_block.hash(),
0,
)];
assert_matches!(
BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder),
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
);
assert_eq!(bank.get_balance(&pubkey), 1);
} }
} }

View File

@ -105,13 +105,15 @@ impl Fullnode {
bank.tick_height(), bank.tick_height(),
bank.last_blockhash(), bank.last_blockhash(),
); );
let blocktree = Arc::new(blocktree);
let (poh_recorder, entry_receiver) = PohRecorder::new( let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(), bank.tick_height(),
bank.last_blockhash(), bank.last_blockhash(),
bank.slot(), bank.slot(),
leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank), leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank, Some(&blocktree)),
bank.ticks_per_slot(), bank.ticks_per_slot(),
&id, &id,
&blocktree,
); );
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit);
@ -130,7 +132,6 @@ impl Fullnode {
node.sockets.gossip.local_addr().unwrap() node.sockets.gossip.local_addr().unwrap()
); );
let blocktree = Arc::new(blocktree);
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
node.info.wallclock = timestamp(); node.info.wallclock = timestamp();

View File

@ -1,3 +1,4 @@
use crate::blocktree::Blocktree;
use crate::leader_schedule::LeaderSchedule; use crate::leader_schedule::LeaderSchedule;
use crate::staking_utils; use crate::staking_utils;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
@ -44,7 +45,12 @@ pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option<Pubkey> {
} }
/// Return the next slot after the given current_slot that the given node will be leader /// Return the next slot after the given current_slot that the given node will be leader
pub fn next_leader_slot(pubkey: &Pubkey, mut current_slot: u64, bank: &Bank) -> Option<u64> { pub fn next_leader_slot(
pubkey: &Pubkey,
mut current_slot: u64,
bank: &Bank,
blocktree: Option<&Blocktree>,
) -> Option<u64> {
let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1); let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
while let Some(leader_schedule) = leader_schedule(epoch, bank) { while let Some(leader_schedule) = leader_schedule(epoch, bank) {
// clippy thinks I should do this: // clippy thinks I should do this:
@ -59,6 +65,15 @@ pub fn next_leader_slot(pubkey: &Pubkey, mut current_slot: u64, bank: &Bank) ->
for i in start_index..bank.get_slots_in_epoch(epoch) { for i in start_index..bank.get_slots_in_epoch(epoch) {
current_slot += 1; current_slot += 1;
if *pubkey == leader_schedule[i] { if *pubkey == leader_schedule[i] {
if let Some(blocktree) = blocktree {
if let Some(meta) = blocktree.meta(current_slot).unwrap() {
// We have already sent a blob for this slot, so skip it
if meta.received > 0 {
continue;
}
}
}
return Some(current_slot); return Some(current_slot);
} }
} }
@ -82,6 +97,8 @@ pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::make_slot_entries;
use crate::staking_utils; use crate::staking_utils;
use crate::voting_keypair::tests::new_vote_account_with_delegate; use crate::voting_keypair::tests::new_vote_account_with_delegate;
use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS}; use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS};
@ -101,13 +118,14 @@ mod tests {
let bank = Bank::new(&genesis_block); let bank = Bank::new(&genesis_block);
assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey); assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey);
assert_eq!(next_leader_slot(&pubkey, 0, &bank), Some(1)); assert_eq!(next_leader_slot(&pubkey, 0, &bank, None), Some(1));
assert_eq!(next_leader_slot(&pubkey, 1, &bank), Some(2)); assert_eq!(next_leader_slot(&pubkey, 1, &bank, None), Some(2));
assert_eq!( assert_eq!(
next_leader_slot( next_leader_slot(
&pubkey, &pubkey,
2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2 2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2
&bank &bank,
None
), ),
None None
); );
@ -116,12 +134,81 @@ mod tests {
next_leader_slot( next_leader_slot(
&Keypair::new().pubkey(), // not in leader_schedule &Keypair::new().pubkey(), // not in leader_schedule
0, 0,
&bank &bank,
None
), ),
None None
); );
} }
#[test]
fn test_next_leader_slot_blocktree() {
let pubkey = Keypair::new().pubkey();
let mut genesis_block = GenesisBlock::new_with_leader(
BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
)
.0;
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey);
// Check that the next leader slot after 0 is slot 1
assert_eq!(
next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(1)
);
// Write a blob into slot 2 that chains to slot 1,
// but slot 1 is empty so should not be skipped
let (blobs, _) = make_slot_entries(2, 1, 1);
blocktree.write_blobs(&blobs[..]).unwrap();
assert_eq!(
next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(1)
);
// Write a blob into slot 1
let (blobs, _) = make_slot_entries(1, 0, 1);
// Check that slot 1 and 2 are skipped
blocktree.write_blobs(&blobs[..]).unwrap();
assert_eq!(
next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(3)
);
// Integrity checks
assert_eq!(
next_leader_slot(
&pubkey,
2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2
&bank,
Some(&blocktree)
),
None
);
assert_eq!(
next_leader_slot(
&Keypair::new().pubkey(), // not in leader_schedule
0,
&bank,
Some(&blocktree)
),
None
);
}
Blocktree::destroy(&ledger_path).unwrap();
}
#[test] #[test]
fn test_next_leader_slot_next_epoch() { fn test_next_leader_slot_next_epoch() {
let pubkey = Keypair::new().pubkey(); let pubkey = Keypair::new().pubkey();
@ -169,8 +256,8 @@ mod tests {
expected_slot += index; expected_slot += index;
assert_eq!( assert_eq!(
next_leader_slot(&delegate_id, 0, &bank), next_leader_slot(&delegate_id, 0, &bank, None),
Some(expected_slot) Some(expected_slot),
); );
} }

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,5 @@
//! The `poh_service` module implements a service that records the passing of //! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream //! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::service::Service; use crate::service::Service;
use solana_sdk::timing::NUM_TICKS_PER_SECOND; use solana_sdk::timing::NUM_TICKS_PER_SECOND;
@ -98,6 +97,7 @@ impl Service for PohService {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::poh_recorder::WorkingBank; use crate::poh_recorder::WorkingBank;
use crate::result::Result; use crate::result::Result;
use crate::test_tx::test_tx; use crate::test_tx::test_tx;
@ -111,87 +111,94 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let prev_hash = bank.last_blockhash(); let prev_hash = bank.last_blockhash();
let (poh_recorder, entry_receiver) = PohRecorder::new( let ledger_path = get_tmp_ledger_path!();
bank.tick_height(), {
prev_hash, let blocktree =
bank.slot(), Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger");
Some(4), let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.ticks_per_slot(), bank.tick_height(),
&Pubkey::default(), prev_hash,
); bank.slot(),
let poh_recorder = Arc::new(Mutex::new(poh_recorder)); Some(4),
let exit = Arc::new(AtomicBool::new(false)); bank.ticks_per_slot(),
let working_bank = WorkingBank { &Pubkey::default(),
bank: bank.clone(), &Arc::new(blocktree),
min_tick_height: bank.tick_height(), );
max_tick_height: std::u64::MAX, let poh_recorder = Arc::new(Mutex::new(poh_recorder));
}; let exit = Arc::new(AtomicBool::new(false));
let working_bank = WorkingBank {
bank: bank.clone(),
min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX,
};
let entry_producer: JoinHandle<Result<()>> = { let entry_producer: JoinHandle<Result<()>> = {
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let exit = exit.clone(); let exit = exit.clone();
Builder::new() Builder::new()
.name("solana-poh-service-entry_producer".to_string()) .name("solana-poh-service-entry_producer".to_string())
.spawn(move || { .spawn(move || {
loop { loop {
// send some data // send some data
let h1 = hash(b"hello world!"); let h1 = hash(b"hello world!");
let tx = test_tx(); let tx = test_tx();
poh_recorder poh_recorder
.lock() .lock()
.unwrap() .unwrap()
.record(bank.slot(), h1, vec![tx]) .record(bank.slot(), h1, vec![tx])
.unwrap(); .unwrap();
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break Ok(()); break Ok(());
}
} }
} })
}) .unwrap()
.unwrap() };
};
const HASHES_PER_TICK: u64 = 2; const HASHES_PER_TICK: u64 = 2;
let poh_service = PohService::new( let poh_service = PohService::new(
poh_recorder.clone(), poh_recorder.clone(),
&PohServiceConfig::Tick(HASHES_PER_TICK as usize), &PohServiceConfig::Tick(HASHES_PER_TICK as usize),
&exit, &exit,
); );
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
// get some events // get some events
let mut hashes = 0; let mut hashes = 0;
let mut need_tick = true; let mut need_tick = true;
let mut need_entry = true; let mut need_entry = true;
let mut need_partial = true; let mut need_partial = true;
while need_tick || need_entry || need_partial { while need_tick || need_entry || need_partial {
for entry in entry_receiver.recv().unwrap().1 { for entry in entry_receiver.recv().unwrap().1 {
let entry = &entry.0; let entry = &entry.0;
if entry.is_tick() { if entry.is_tick() {
assert!(entry.num_hashes <= HASHES_PER_TICK); assert!(entry.num_hashes <= HASHES_PER_TICK);
if entry.num_hashes == HASHES_PER_TICK { if entry.num_hashes == HASHES_PER_TICK {
need_tick = false; need_tick = false;
} else {
need_partial = false;
}
hashes += entry.num_hashes;
assert_eq!(hashes, HASHES_PER_TICK);
hashes = 0;
} else { } else {
need_partial = false; assert!(entry.num_hashes >= 1);
need_entry = false;
hashes += entry.num_hashes - 1;
} }
hashes += entry.num_hashes;
assert_eq!(hashes, HASHES_PER_TICK);
hashes = 0;
} else {
assert!(entry.num_hashes >= 1);
need_entry = false;
hashes += entry.num_hashes - 1;
} }
} }
exit.store(true, Ordering::Relaxed);
let _ = poh_service.join().unwrap();
let _ = entry_producer.join().unwrap();
} }
exit.store(true, Ordering::Relaxed); Blocktree::destroy(&ledger_path).unwrap();
let _ = poh_service.join().unwrap();
let _ = entry_producer.join().unwrap();
} }
} }

View File

@ -147,7 +147,13 @@ impl ReplayStage {
&cluster_info, &cluster_info,
); );
Self::reset_poh_recorder(&my_id, &bank, &poh_recorder, ticks_per_slot); Self::reset_poh_recorder(
&my_id,
&blocktree,
&bank,
&poh_recorder,
ticks_per_slot,
);
is_tpu_bank_active = false; is_tpu_bank_active = false;
} }
@ -171,7 +177,6 @@ impl ReplayStage {
&bank_forks, &bank_forks,
&poh_recorder, &poh_recorder,
&cluster_info, &cluster_info,
&blocktree,
poh_slot, poh_slot,
reached_leader_tick, reached_leader_tick,
grace_ticks, grace_ticks,
@ -200,35 +205,11 @@ impl ReplayStage {
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
blocktree: &Blocktree,
poh_slot: u64, poh_slot: u64,
reached_leader_tick: bool, reached_leader_tick: bool,
grace_ticks: u64, grace_ticks: u64,
) { ) {
trace!("{} checking poh slot {}", my_id, poh_slot); trace!("{} checking poh slot {}", my_id, poh_slot);
if blocktree.meta(poh_slot).unwrap().is_some() {
// We've already broadcasted entries for this slot, skip it
// Since we are skipping our leader slot, let's tell poh recorder when we should be
// leader again
if reached_leader_tick {
let _ = bank_forks.read().unwrap().get(poh_slot).map(|bank| {
let next_leader_slot =
leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank);
let mut poh = poh_recorder.lock().unwrap();
let start_slot = poh.start_slot();
poh.reset(
bank.tick_height(),
bank.last_blockhash(),
start_slot,
next_leader_slot,
bank.ticks_per_slot(),
);
});
}
return;
}
if bank_forks.read().unwrap().get(poh_slot).is_none() { if bank_forks.read().unwrap().get(poh_slot).is_none() {
let parent_slot = poh_recorder.lock().unwrap().start_slot(); let parent_slot = poh_recorder.lock().unwrap().start_slot();
let parent = { let parent = {
@ -332,11 +313,13 @@ impl ReplayStage {
fn reset_poh_recorder( fn reset_poh_recorder(
my_id: &Pubkey, my_id: &Pubkey,
blocktree: &Blocktree,
bank: &Arc<Bank>, bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
ticks_per_slot: u64, ticks_per_slot: u64,
) { ) {
let next_leader_slot = leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank); let next_leader_slot =
leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank, Some(blocktree));
poh_recorder.lock().unwrap().reset( poh_recorder.lock().unwrap().reset(
bank.tick_height(), bank.tick_height(),
bank.last_blockhash(), bank.last_blockhash(),
@ -635,7 +618,8 @@ mod test {
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let blocktree = Arc::new(blocktree); let blocktree = Arc::new(blocktree);
let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blocktree);
let (ledger_writer_sender, ledger_writer_receiver) = channel(); let (ledger_writer_sender, ledger_writer_receiver) = channel();
let (replay_stage, _slot_full_receiver) = ReplayStage::new( let (replay_stage, _slot_full_receiver) = ReplayStage::new(
&my_keypair.pubkey(), &my_keypair.pubkey(),

View File

@ -208,8 +208,10 @@ pub mod tests {
let blocktree_path = get_tmp_ledger_path!(); let blocktree_path = get_tmp_ledger_path!();
let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path) let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path)
.expect("Expected to successfully open ledger"); .expect("Expected to successfully open ledger");
let blocktree = Arc::new(blocktree);
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blocktree);
let voting_keypair = Keypair::new(); let voting_keypair = Keypair::new();
let (storage_entry_sender, storage_entry_receiver) = channel(); let (storage_entry_sender, storage_entry_receiver) = channel();
let tvu = Tvu::new( let tvu = Tvu::new(
@ -225,7 +227,7 @@ pub mod tests {
fetch: target1.sockets.tvu, fetch: target1.sockets.tvu,
} }
}, },
Arc::new(blocktree), blocktree,
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
&StorageState::default(), &StorageState::default(),
None, None,

View File

@ -99,95 +99,98 @@ fn test_replay() {
let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit); let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit);
let voting_keypair = Keypair::new(); let voting_keypair = Keypair::new();
let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = let blocktree = Arc::new(blocktree);
create_test_recorder(&bank); {
let (storage_sender, storage_receiver) = channel(); let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
let tvu = Tvu::new( create_test_recorder(&bank, &blocktree);
&voting_keypair.pubkey(), let (storage_sender, storage_receiver) = channel();
Some(Arc::new(voting_keypair)), let tvu = Tvu::new(
&bank_forks, &voting_keypair.pubkey(),
&bank_forks_info, Some(Arc::new(voting_keypair)),
&cref1, &bank_forks,
{ &bank_forks_info,
Sockets { &cref1,
repair: target1.sockets.repair, {
retransmit: target1.sockets.retransmit, Sockets {
fetch: target1.sockets.tvu, repair: target1.sockets.repair,
} retransmit: target1.sockets.retransmit,
}, fetch: target1.sockets.tvu,
Arc::new(blocktree), }
STORAGE_ROTATE_TEST_COUNT, },
&StorageState::default(), blocktree,
None, STORAGE_ROTATE_TEST_COUNT,
ledger_signal_receiver, &StorageState::default(),
&Arc::new(RpcSubscriptions::default()), None,
&poh_recorder, ledger_signal_receiver,
storage_sender, &Arc::new(RpcSubscriptions::default()),
storage_receiver, &poh_recorder,
&exit, storage_sender,
); storage_receiver,
&exit,
let mut mint_ref_balance = starting_mint_balance;
let mut msgs = Vec::new();
let mut blob_idx = 0;
let num_transfers = 10;
let mut transfer_amount = 501;
let bob_keypair = Keypair::new();
let mut cur_hash = blockhash;
for i in 0..num_transfers {
let entry0 = next_entry_mut(&mut cur_hash, i, vec![]);
let entry_tick0 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
let tx0 = SystemTransaction::new_account(
&mint_keypair,
&bob_keypair.pubkey(),
transfer_amount,
blockhash,
0,
); );
let entry_tick1 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]);
let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
mint_ref_balance -= transfer_amount; let mut mint_ref_balance = starting_mint_balance;
transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors let mut msgs = Vec::new();
let mut blob_idx = 0;
let num_transfers = 10;
let mut transfer_amount = 501;
let bob_keypair = Keypair::new();
let mut cur_hash = blockhash;
for i in 0..num_transfers {
let entry0 = next_entry_mut(&mut cur_hash, i, vec![]);
let entry_tick0 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; let tx0 = SystemTransaction::new_account(
let blobs = entries.to_shared_blobs(); &mint_keypair,
index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0); &bob_keypair.pubkey(),
blob_idx += blobs.len() as u64; transfer_amount,
blobs blockhash,
.iter() 0,
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr)); );
msgs.extend(blobs.into_iter()); let entry_tick1 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]);
let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]);
mint_ref_balance -= transfer_amount;
transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
let blobs = entries.to_shared_blobs();
index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0);
blob_idx += blobs.len() as u64;
blobs
.iter()
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));
msgs.extend(blobs.into_iter());
}
// send the blobs into the socket
s_responder.send(msgs).expect("send");
drop(s_responder);
// receive retransmitted messages
let timer = Duration::new(1, 0);
while let Ok(_msg) = r_reader.recv_timeout(timer) {
trace!("got msg");
}
let working_bank = bank_forks.read().unwrap().working_bank();
let final_mint_balance = working_bank.get_balance(&mint_keypair.pubkey());
assert_eq!(final_mint_balance, mint_ref_balance);
let bob_balance = working_bank.get_balance(&bob_keypair.pubkey());
assert_eq!(bob_balance, starting_mint_balance - mint_ref_balance);
exit.store(true, Ordering::Relaxed);
poh_service_exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
tvu.join().unwrap();
dr_l.join().unwrap();
dr_2.join().unwrap();
dr_1.join().unwrap();
t_receiver.join().unwrap();
t_responder.join().unwrap();
} }
// send the blobs into the socket
s_responder.send(msgs).expect("send");
drop(s_responder);
// receive retransmitted messages
let timer = Duration::new(1, 0);
while let Ok(_msg) = r_reader.recv_timeout(timer) {
trace!("got msg");
}
let working_bank = bank_forks.read().unwrap().working_bank();
let final_mint_balance = working_bank.get_balance(&mint_keypair.pubkey());
assert_eq!(final_mint_balance, mint_ref_balance);
let bob_balance = working_bank.get_balance(&bob_keypair.pubkey());
assert_eq!(bob_balance, starting_mint_balance - mint_ref_balance);
exit.store(true, Ordering::Relaxed);
poh_service_exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
tvu.join().unwrap();
dr_l.join().unwrap();
dr_2.join().unwrap();
dr_1.join().unwrap();
t_receiver.join().unwrap();
t_responder.join().unwrap();
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&blocktree_path); let _ignored = remove_dir_all(&blocktree_path);
} }