diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d67edc5958..e5f1e4702a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -432,7 +432,7 @@ pub fn create_test_recorder( ) { let exit = Arc::new(AtomicBool::new(false)); let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); (exit, poh_recorder, poh_service, entry_receiver) @@ -641,7 +641,7 @@ mod tests { }; let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); poh_recorder.lock().unwrap().set_working_bank(working_bank); @@ -694,7 +694,7 @@ mod tests { max_tick_height: bank.tick_height() + 1, }; let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); poh_recorder.lock().unwrap().set_working_bank(working_bank); diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index f3d7fc6329..ef9ee0fd8e 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -107,7 +107,7 @@ impl Fullnode { bank.last_blockhash(), ); let (poh_recorder, entry_receiver) = - PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + PohRecorder::new(bank.tick_height(), bank.last_blockhash(), bank.slot()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit); poh_recorder.lock().unwrap().clear_bank_signal = diff --git a/core/src/leader_schedule_utils.rs b/core/src/leader_schedule_utils.rs index 63c17525cd..4e0d6bf2e6 100644 --- a/core/src/leader_schedule_utils.rs +++ b/core/src/leader_schedule_utils.rs @@ -43,8 +43,8 @@ pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 { bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1 } -pub fn tick_height_to_slot(bank: &Bank, tick_height: u64) -> u64 { - tick_height / bank.ticks_per_slot() +pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 { + tick_height / ticks_per_slot } #[cfg(test)] diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 7ae89ea442..0df9058e17 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -37,10 +37,11 @@ pub struct WorkingBank { pub struct PohRecorder { pub poh: Poh, + pub clear_bank_signal: Option>, + start_slot: u64, tick_cache: Vec<(Entry, u64)>, working_bank: Option, sender: Sender, - pub clear_bank_signal: Option>, } impl PohRecorder { @@ -57,14 +58,20 @@ impl PohRecorder { self.poh.hash(); } + pub fn start_slot(&self) -> u64 { + self.start_slot + } + pub fn bank(&self) -> Option> { self.working_bank.clone().map(|w| w.bank) } + pub fn tick_height(&self) -> u64 { self.poh.tick_height } + // synchronize PoH with a bank - pub fn reset(&mut self, tick_height: u64, blockhash: Hash) { + pub fn reset(&mut self, tick_height: u64, blockhash: Hash, start_slot: u64) { self.clear_bank(); let existing = self.tick_cache.iter().any(|(entry, entry_tick_height)| { if entry.hash == blockhash { @@ -85,6 +92,7 @@ impl PohRecorder { self.poh.hash, self.poh.tick_height, blockhash, tick_height, ); std::mem::swap(&mut cache, &mut self.tick_cache); + self.start_slot = start_slot; self.poh = Poh::new(blockhash, tick_height); } @@ -180,7 +188,11 @@ impl PohRecorder { /// A recorder to synchronize PoH with the following data structures /// * bank - the LastId's queue is updated on `tick` and `record` events /// * sender - the Entry channel that outputs to the ledger - pub fn new(tick_height: u64, last_entry_hash: Hash) -> (Self, Receiver) { + pub fn new( + tick_height: u64, + last_entry_hash: Hash, + start_slot: u64, + ) -> (Self, Receiver) { let poh = Poh::new(last_entry_hash, tick_height); let (sender, receiver) = channel(); ( @@ -190,6 +202,7 @@ impl PohRecorder { working_bank: None, sender, clear_bank_signal: None, + start_slot, }, receiver, ) @@ -241,7 +254,7 @@ mod tests { #[test] fn test_poh_recorder_no_zero_tick() { let prev_hash = Hash::default(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); assert_eq!(poh_recorder.tick_cache[0].1, 1); @@ -251,7 +264,7 @@ mod tests { #[test] fn test_poh_recorder_tick_height_is_last_tick() { let prev_hash = Hash::default(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); @@ -261,10 +274,10 @@ mod tests { #[test] fn test_poh_recorder_reset_clears_cache() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); - poh_recorder.reset(0, Hash::default()); + poh_recorder.reset(0, Hash::default(), 0); assert_eq!(poh_recorder.tick_cache.len(), 0); } @@ -273,7 +286,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash, 0); let working_bank = WorkingBank { bank, @@ -291,7 +304,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); let working_bank = WorkingBank { bank: bank.clone(), @@ -321,7 +334,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); poh_recorder.tick(); poh_recorder.tick(); @@ -349,7 +362,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); let working_bank = WorkingBank { bank, @@ -369,7 +382,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); let working_bank = WorkingBank { bank, @@ -398,7 +411,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); let working_bank = WorkingBank { bank, @@ -424,7 +437,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash, 0); let working_bank = WorkingBank { bank, @@ -443,28 +456,30 @@ mod tests { #[test] fn test_reset_current() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); - poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash); + poh_recorder.reset(poh_recorder.poh.tick_height, poh_recorder.poh.hash, 0); assert_eq!(poh_recorder.tick_cache.len(), 2); } #[test] fn test_reset_with_cached() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); poh_recorder.reset( poh_recorder.tick_cache[0].1, poh_recorder.tick_cache[0].0.hash, + 0, ); assert_eq!(poh_recorder.tick_cache.len(), 2); poh_recorder.reset( poh_recorder.tick_cache[1].1, poh_recorder.tick_cache[1].0.hash, + 0, ); assert_eq!(poh_recorder.tick_cache.len(), 2); } @@ -472,7 +487,7 @@ mod tests { #[test] #[should_panic] fn test_reset_with_cached_bad_height() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); @@ -480,18 +495,19 @@ mod tests { poh_recorder.reset( poh_recorder.tick_cache[0].1, poh_recorder.tick_cache[1].0.hash, + 0, ); } #[test] fn test_reset_to_new_value() { - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); poh_recorder.tick(); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 3); assert_eq!(poh_recorder.poh.tick_height, 3); - poh_recorder.reset(1, hash(b"hello")); + poh_recorder.reset(1, hash(b"hello"), 0); assert_eq!(poh_recorder.tick_cache.len(), 0); poh_recorder.tick(); assert_eq!(poh_recorder.poh.tick_height, 2); @@ -501,14 +517,14 @@ mod tests { fn test_reset_clear_bank() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); let working_bank = WorkingBank { bank, min_tick_height: 2, max_tick_height: 3, }; poh_recorder.set_working_bank(working_bank); - poh_recorder.reset(1, hash(b"hello")); + poh_recorder.reset(1, hash(b"hello"), 0); assert!(poh_recorder.working_bank.is_none()); } @@ -516,7 +532,7 @@ mod tests { pub fn test_clear_signal() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default(), 0); let (sender, receiver) = sync_channel(1); poh_recorder.set_bank(&bank); poh_recorder.clear_bank_signal = Some(sender); diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index caf337fd0a..2f97209993 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -110,7 +110,8 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (poh_recorder, entry_receiver) = PohRecorder::new(bank.tick_height(), prev_hash); + let (poh_recorder, entry_receiver) = + PohRecorder::new(bank.tick_height(), prev_hash, bank.slot()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let exit = Arc::new(AtomicBool::new(false)); let working_bank = WorkingBank { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 9fa1f60a35..026fb28222 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -76,6 +76,7 @@ impl ReplayStage { let poh_recorder = poh_recorder.clone(); let my_id = *my_id; let vote_account = *vote_account; + let mut ticks_per_slot = 0; // Start the replay stage loop let t_replay = Builder::new() @@ -92,10 +93,11 @@ impl ReplayStage { Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap()); let active_banks = bank_forks.read().unwrap().active_banks(); trace!("active banks {:?}", active_banks); - let mut votable: Vec = vec![]; + let mut votable: Vec> = vec![]; let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some(); for bank_slot in &active_banks { let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); + ticks_per_slot = bank.ticks_per_slot(); if bank.collector_id() != my_id { Self::replay_blocktree_into_bank( &bank, @@ -108,49 +110,63 @@ impl ReplayStage { if bank.tick_height() == max_tick_height { bank.freeze(); info!("bank frozen {}", bank.slot()); - votable.push(*bank_slot); progress.remove(bank_slot); if let Err(e) = slot_full_sender.send((bank.slot(), bank.collector_id())) { info!("{} slot_full alert failed: {:?}", my_id, e); } + votable.push(bank); } } + + if ticks_per_slot == 0 { + let frozen_banks = bank_forks.read().unwrap().frozen_banks(); + let bank = frozen_banks.values().next().unwrap(); + ticks_per_slot = bank.ticks_per_slot(); + } + // TODO: fork selection // vote on the latest one for now - votable.sort(); + votable.sort_by(|b1, b2| b1.slot().cmp(&b2.slot())); - if let Some(latest_slot_vote) = votable.last() { - let parent = bank_forks - .read() - .unwrap() - .get(*latest_slot_vote) - .unwrap() - .clone(); - - subscriptions.notify_subscribers(&parent); + if let Some(bank) = votable.last() { + subscriptions.notify_subscribers(&bank); if let Some(ref voting_keypair) = voting_keypair { let keypair = voting_keypair.as_ref(); let vote = VoteTransaction::new_vote( &vote_account, keypair, - *latest_slot_vote, - parent.last_blockhash(), + bank.slot(), + bank.last_blockhash(), 0, ); cluster_info.write().unwrap().push_vote(vote); } - poh_recorder - .lock() - .unwrap() - .reset(parent.tick_height(), parent.last_blockhash()); + poh_recorder.lock().unwrap().reset( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + ); is_tpu_bank_active = false; } if !is_tpu_bank_active { - Self::start_leader(&my_id, &bank_forks, &poh_recorder, &cluster_info); + assert!(ticks_per_slot > 0); + let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); + let poh_slot = leader_schedule_utils::tick_height_to_slot( + ticks_per_slot, + poh_tick_height + 1, + ); + Self::start_leader( + &my_id, + &bank_forks, + &poh_recorder, + &cluster_info, + &blocktree, + poh_slot, + ); } inc_new_counter_info!( @@ -179,58 +195,50 @@ impl ReplayStage { bank_forks: &Arc>, poh_recorder: &Arc>, cluster_info: &Arc>, + blocktree: &Blocktree, + poh_slot: u64, ) { - let frozen = bank_forks.read().unwrap().frozen_banks(); + trace!("{} checking poh slot {}", my_id, poh_slot); + if blocktree.meta(poh_slot).unwrap().is_some() { + // We've already broadcasted entries for this slot, skip it + return; + } + if bank_forks.read().unwrap().get(poh_slot).is_none() { + let frozen = bank_forks.read().unwrap().frozen_banks(); + let parent_slot = poh_recorder.lock().unwrap().start_slot(); + assert!(frozen.contains_key(&parent_slot)); + let parent = &frozen[&parent_slot]; - // TODO: fork selection - let mut newest_frozen: Vec<(&u64, &Arc)> = frozen.iter().collect(); - newest_frozen.sort_by_key(|x| *x.0); - if let Some((_, parent)) = newest_frozen.last() { - let poh_tick_height = poh_recorder.lock().unwrap().tick_height(); - let poh_slot = leader_schedule_utils::tick_height_to_slot(parent, poh_tick_height + 1); - trace!("checking poh slot for leader {}", poh_slot); - if frozen.get(&poh_slot).is_some() { - // Already been a leader for this slot, skip it - return; - } - if bank_forks.read().unwrap().get(poh_slot).is_none() { - leader_schedule_utils::slot_leader_at(poh_slot, parent) - .map(|next_leader| { - debug!( - "me: {} leader {} at poh slot {}", - my_id, next_leader, poh_slot - ); - cluster_info.write().unwrap().set_leader(&next_leader); - if next_leader == *my_id { - debug!("starting tpu for slot {}", poh_slot); - let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); - bank_forks.write().unwrap().insert(poh_slot, tpu_bank); - if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { - assert_eq!( - bank_forks.read().unwrap().working_bank().slot(), - tpu_bank.slot() - ); - debug!( - "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", - my_id, - tpu_bank.slot(), - next_leader - ); - poh_recorder.lock().unwrap().set_bank(&tpu_bank); - inc_new_counter_info!( - "replay_stage-new_leader", - tpu_bank.slot() as usize - ); - } + leader_schedule_utils::slot_leader_at(poh_slot, parent) + .map(|next_leader| { + debug!( + "me: {} leader {} at poh slot {}", + my_id, next_leader, poh_slot + ); + cluster_info.write().unwrap().set_leader(&next_leader); + if next_leader == *my_id { + debug!("{} starting tpu for slot {}", my_id, poh_slot); + let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); + bank_forks.write().unwrap().insert(poh_slot, tpu_bank); + if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { + assert_eq!( + bank_forks.read().unwrap().working_bank().slot(), + tpu_bank.slot() + ); + debug!( + "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", + my_id, + tpu_bank.slot(), + next_leader + ); + poh_recorder.lock().unwrap().set_bank(&tpu_bank); } - }) - .or_else(|| { - error!("No next leader found"); - None - }); - } - } else { - error!("No frozen banks available!"); + } + }) + .or_else(|| { + error!("{} No next leader found", my_id); + None + }); } } pub fn replay_blocktree_into_bank( diff --git a/tests/tvu.rs b/tests/tvu.rs index 2aa8a742e8..1d71ddc4a4 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -2,13 +2,12 @@ extern crate solana; use log::*; -use solana::bank_forks::BankForks; use solana::banking_stage::create_test_recorder; -use solana::blocktree::{get_tmp_ledger_path, Blocktree}; -use solana::blocktree_processor::BankForksInfo; +use solana::blocktree::{create_new_tmp_ledger, Blocktree}; use solana::cluster_info::{ClusterInfo, Node}; use solana::entry::next_entry_mut; use solana::entry::EntrySlice; +use solana::fullnode; use solana::gossip_service::GossipService; use solana::packet::index_blobs; use solana::rpc_subscriptions::RpcSubscriptions; @@ -17,7 +16,6 @@ use solana::storage_stage::StorageState; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::streamer; use solana::tvu::{Sockets, Tvu}; -use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; @@ -74,44 +72,37 @@ fn test_replay() { r_responder, ); - let starting_balance = 10_000; - let (mut genesis_block, mint_keypair) = GenesisBlock::new(starting_balance); + let total_balance = 10_000; + let leader_balance = 100; + let starting_mint_balance = total_balance - leader_balance; + let (genesis_block, mint_keypair) = + GenesisBlock::new_with_leader(total_balance, &leader.info.id, leader_balance); + let (blocktree_path, blockhash) = create_new_tmp_ledger!(&genesis_block); - // TODO: Fix this test so it always works with the default GenesisBlock configuration - genesis_block.ticks_per_slot = 64; - - let ticks_per_slot = genesis_block.ticks_per_slot; let tvu_addr = target1.info.tvu; - let bank = Bank::new(&genesis_block); - let blockhash = bank.last_blockhash(); - let bank_forks = BankForks::new(0, bank); - let bank_forks_info = vec![BankForksInfo { - bank_slot: 0, - entry_height: 0, - }]; - + let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = + fullnode::new_banks_from_blocktree(&blocktree_path, None); let bank = bank_forks.working_bank(); - assert_eq!(bank.get_balance(&mint_keypair.pubkey()), starting_balance); + assert_eq!( + bank.get_balance(&mint_keypair.pubkey()), + starting_mint_balance + ); // start cluster_info1 + let bank_forks = Arc::new(RwLock::new(bank_forks)); let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone()); cluster_info1.insert_info(leader.info.clone()); let cref1 = Arc::new(RwLock::new(cluster_info1)); let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, &exit); - let blocktree_path = get_tmp_ledger_path!(); - - let (blocktree, ledger_signal_receiver) = - Blocktree::open_with_config_signal(&blocktree_path, ticks_per_slot) - .expect("Expected to successfully open ledger"); let voting_keypair = Keypair::new(); let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), - &Arc::new(RwLock::new(bank_forks)), + &bank_forks, &bank_forks_info, &cref1, { @@ -131,7 +122,7 @@ fn test_replay() { &exit, ); - let mut alice_ref_balance = starting_balance; + let mut mint_ref_balance = starting_mint_balance; let mut msgs = Vec::new(); let mut blob_idx = 0; let num_transfers = 10; @@ -153,12 +144,12 @@ fn test_replay() { let entry1 = next_entry_mut(&mut cur_hash, i + num_transfers, vec![tx0]); let entry_tick2 = next_entry_mut(&mut cur_hash, i + 1, vec![]); - alice_ref_balance -= transfer_amount; + mint_ref_balance -= transfer_amount; transfer_amount -= 1; // Sneaky: change transfer_amount slightly to avoid DuplicateSignature errors let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2]; let blobs = entries.to_shared_blobs(); - index_blobs(&blobs, &leader.info.id, blob_idx, 0, 0); + index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0); blob_idx += blobs.len() as u64; blobs .iter() @@ -176,11 +167,12 @@ fn test_replay() { trace!("got msg"); } - let alice_balance = bank.get_balance(&mint_keypair.pubkey()); - assert_eq!(alice_balance, alice_ref_balance); + let working_bank = bank_forks.read().unwrap().working_bank(); + let final_mint_balance = working_bank.get_balance(&mint_keypair.pubkey()); + assert_eq!(final_mint_balance, mint_ref_balance); - let bob_balance = bank.get_balance(&bob_keypair.pubkey()); - assert_eq!(bob_balance, starting_balance - alice_ref_balance); + let bob_balance = working_bank.get_balance(&bob_keypair.pubkey()); + assert_eq!(bob_balance, starting_mint_balance - mint_ref_balance); exit.store(true, Ordering::Relaxed); poh_service_exit.store(true, Ordering::Relaxed);