diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index bbf9df9025..f3eacfe613 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -103,7 +103,6 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { verified_receiver, Default::default(), &mint.last_id(), - 0, None, ); @@ -204,7 +203,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { verified_receiver, Default::default(), &mint.last_id(), - 0, None, ); diff --git a/src/bank.rs b/src/bank.rs index 37d1b474a3..49dd79ab48 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -40,7 +40,6 @@ use token_program::TokenProgram; use tokio::prelude::Future; use transaction::Transaction; use vote_program::VoteProgram; -use window::WINDOW_SIZE; /// The number of most recent `last_id` values that the bank will track the signatures /// of. Once the bank discards a `last_id`, it will reject any transactions that use @@ -906,36 +905,6 @@ impl Bank { Ok(()) } - /// Process an ordered list of entries, populating a circular buffer "tail" - /// as we go. - fn process_entries_tail( - &self, - entries: &[Entry], - tail: &mut Vec, - tail_idx: &mut usize, - ) -> Result { - let mut entry_count = 0; - - for entry in entries { - if tail.len() > *tail_idx { - tail[*tail_idx] = entry.clone(); - } else { - tail.push(entry.clone()); - } - *tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize; - - entry_count += 1; - // TODO: We prepare for implementing voting contract by making the associated - // process_entries functions aware of the vote-tracking structure inside - // the leader scheduler. Next we will extract the vote tracking structure - // out of the leader scheduler, and into the bank, and remove the leader - // scheduler from these banking functions. - self.process_entry(entry)?; - } - - Ok(entry_count) - } - /// Process an ordered list of entries. pub fn process_entries(&self, entries: &[Entry]) -> Result<()> { self.par_process_entries(entries) @@ -998,44 +967,56 @@ impl Bank { Ok(()) } + /// Process an ordered list of entries, populating a circular buffer "tail" + /// as we go. + fn process_block(&self, entries: &[Entry]) -> Result<()> { + for entry in entries { + // TODO: We prepare for implementing voting contract by making the associated + // process_entries functions aware of the vote-tracking structure inside + // the leader scheduler. Next we will extract the vote tracking structure + // out of the leader scheduler, and into the bank, and remove the leader + // scheduler from these banking functions. + self.process_entry(entry)?; + } + + Ok(()) + } + /// Append entry blocks to the ledger, verifying them along the way. - fn process_blocks( + fn process_ledger_blocks( &self, start_hash: Hash, + entry_height: u64, entries: I, - tail: &mut Vec, - tail_idx: &mut usize, - ) -> Result + ) -> Result<(u64, Hash)> where I: IntoIterator, { + // these magic numbers are from genesis of the mint, could pull them + // back out of this loop. + let mut entry_height = entry_height; + let mut last_id = start_hash; + // Ledger verification needs to be parallelized, but we can't pull the whole // thing into memory. We therefore chunk it. - let mut entry_height = *tail_idx as u64; - - for entry in &tail[0..*tail_idx] { - if entry.is_tick() { - *self.tick_height.lock().unwrap() += 1; - } - } - - let mut id = start_hash; for block in &entries.into_iter().chunks(VERIFY_BLOCK_SIZE) { let block: Vec<_> = block.collect(); - if !block.verify(&id) { + + if !block.verify(&last_id) { warn!("Ledger proof of history failed at entry: {}", entry_height); return Err(BankError::LedgerVerificationFailed); } - id = block.last().unwrap().id; - let entry_count = self.process_entries_tail(&block, tail, tail_idx)?; - entry_height += entry_count; + self.process_block(&block)?; + + last_id = block.last().unwrap().id; + entry_height += block.len() as u64; } - Ok(entry_height) + Ok((entry_height, last_id)) } /// Process a full ledger. - pub fn process_ledger(&self, entries: I) -> Result<(u64, u64, Vec)> + pub fn process_ledger(&self, entries: I) -> Result<(u64, Hash)> where I: IntoIterator, { @@ -1071,20 +1052,8 @@ impl Bank { } self.register_entry_id(&entry0.id); self.register_entry_id(&entry1.id); - let entry1_id = entry1.id; - let mut tail = Vec::with_capacity(WINDOW_SIZE as usize); - tail.push(entry0); - tail.push(entry1); - let mut tail_idx = 2; - let entry_height = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?; - - // check if we need to rotate tail - if tail.len() == WINDOW_SIZE as usize { - tail.rotate_left(tail_idx) - } - - Ok((*self.tick_height.lock().unwrap(), entry_height, tail)) + Ok(self.process_ledger_blocks(entry1.id, 2, entries)?) } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -1682,42 +1651,12 @@ mod tests { #[test] fn test_process_ledger_simple() { let (ledger, pubkey) = create_sample_ledger(1); - let (ledger, dup) = ledger.tee(); let bank = Bank::default(); - let (tick_height, ledger_height, tail) = bank.process_ledger(ledger).unwrap(); + let (ledger_height, last_id) = bank.process_ledger(ledger).unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, 4); - assert_eq!(tick_height, 2); - assert_eq!(tail.len(), 4); - assert_eq!(tail, dup.collect_vec()); - let last_entry = &tail[tail.len() - 1]; - // last entry is a tick - assert_eq!(0, last_entry.transactions.len()); - // tick is registered - assert_eq!(bank.last_id(), last_entry.id); - } - - #[test] - fn test_process_ledger_around_window_size() { - // TODO: put me back in when Criterion is up - // for _ in 0..10 { - // let (ledger, _) = create_sample_ledger(WINDOW_SIZE as usize); - // let bank = Bank::default(); - // let (_, _) = bank.process_ledger(ledger).unwrap(); - // } - - let window_size = 128; - for entry_count in window_size - 3..window_size + 2 { - let (ledger, pubkey) = create_sample_ledger(entry_count); - let bank = Bank::default(); - let (tick_height, ledger_height, tail) = bank.process_ledger(ledger).unwrap(); - assert_eq!(bank.get_balance(&pubkey), 1); - assert_eq!(ledger_height, entry_count as u64 + 3); - assert_eq!(tick_height, 2); - assert!(tail.len() <= WINDOW_SIZE as usize); - let last_entry = &tail[tail.len() - 1]; - assert_eq!(bank.last_id(), last_entry.id); - } + assert_eq!(bank.get_tick_height(), 1); + assert_eq!(bank.last_id(), last_id); } // Write the given entries to a file and then return a file iterator to them. diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 9944cd1da5..a83977ff00 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -48,7 +48,6 @@ impl BankingStage { verified_receiver: Receiver, config: Config, last_entry_id: &Hash, - tick_height: u64, max_tick_height: Option, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); @@ -57,7 +56,6 @@ impl BankingStage { bank.clone(), entry_sender, *last_entry_id, - tick_height, max_tick_height, false, vec![], @@ -267,7 +265,6 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), - 0, None, ); drop(verified_sender); @@ -286,7 +283,6 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), - 0, None, ); drop(entry_receiver); @@ -306,7 +302,6 @@ mod tests { verified_receiver, Config::Sleep(Duration::from_millis(1)), &bank.last_id(), - 0, None, ); sleep(Duration::from_millis(500)); @@ -333,7 +328,6 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), - 0, None, ); @@ -388,7 +382,6 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), - 0, None, ); @@ -442,7 +435,6 @@ mod tests { verified_receiver, Default::default(), &bank.last_id(), - 0, Some(max_tick_height), ); assert_eq!( diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index e2ab5a9496..83da063d0f 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use timing::duration_as_ms; -use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE}; +use window::{self, SharedWindow, WindowIndex, WindowUtil}; #[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStageReturnType { @@ -71,16 +71,16 @@ fn broadcast( let blobs_chunking = Instant::now(); // We could receive more blobs than window slots so // break them up into window-sized chunks to process - let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); + let window_size = window.read().unwrap().window_size(); + let blobs_chunked = blobs_vec.chunks(window_size as usize).map(|x| x.to_vec()); let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed()); - trace!("{}", window.read().unwrap().print(&id, *receive_index)); - let broadcast_start = Instant::now(); for mut blobs in blobs_chunked { let blobs_len = blobs.len(); trace!("{}: broadcast blobs.len: {}", id, blobs_len); + // TODO: move all this into window.rs // Index the blobs window::index_blobs(node_info, &blobs, receive_index) .expect("index blobs for initial window"); @@ -92,7 +92,7 @@ fn broadcast( assert!(blobs.len() <= win.len()); for b in &blobs { let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix % WINDOW_SIZE) as usize; + let pos = (ix % window_size) as usize; if let Some(x) = win[pos].data.take() { trace!( "{} popped {} at {}", @@ -114,7 +114,7 @@ fn broadcast( } for b in &blobs { let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix % WINDOW_SIZE) as usize; + let pos = (ix % window_size) as usize; trace!("{} caching {} at {}", id, ix, pos); assert!(win[pos].data.is_none()); win[pos].data = Some(b.clone()); diff --git a/src/drone.rs b/src/drone.rs index 3cd46b7901..0b9cf1288d 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -329,13 +329,13 @@ mod tests { let ledger_path = get_tmp_ledger_path("send_airdrop"); let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, 0, - 0, - &[], + &last_id, leader, None, &ledger_path, diff --git a/src/fullnode.rs b/src/fullnode.rs index 142ff80de6..578fbf8782 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -3,7 +3,6 @@ use bank::Bank; use broadcast_stage::BroadcastStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; -use entry::Entry; use hash::Hash; use leader_scheduler::LeaderScheduler; use ledger::read_ledger; @@ -21,7 +20,7 @@ use std::thread::Result; use tpu::{Tpu, TpuReturnType}; use tvu::{Tvu, TvuReturnType}; use untrusted::Input; -use window; +use window::{new_window, SharedWindow}; pub enum NodeRole { Leader(LeaderServices), @@ -96,7 +95,7 @@ pub struct Fullnode { cluster_info: Arc>, ledger_path: String, sigverify_disabled: bool, - shared_window: window::SharedWindow, + shared_window: SharedWindow, replicate_socket: Vec, repair_socket: UdpSocket, retransmit_socket: UdpSocket, @@ -142,7 +141,8 @@ impl Fullnode { let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); info!("creating bank..."); - let (bank, tick_height, entry_height, ledger_tail) = + + let (bank, entry_height, last_id) = Self::new_bank_from_ledger(ledger_path, leader_scheduler); info!("creating networking stack..."); @@ -160,9 +160,8 @@ impl Fullnode { keypair, vote_account_keypair, bank, - tick_height, entry_height, - &ledger_tail, + &last_id, node, leader_info.as_ref(), ledger_path, @@ -243,9 +242,8 @@ impl Fullnode { keypair: Arc, vote_account_keypair: Arc, bank: Bank, - tick_height: u64, entry_height: u64, - ledger_tail: &[Entry], + last_id: &Hash, node: Node, bootstrap_leader_info_option: Option<&NodeInfo>, ledger_path: &str, @@ -267,12 +265,7 @@ impl Fullnode { .expect("Failed to clone respond socket"), )); - let last_entry_id = &ledger_tail - .last() - .expect("Expected at least one entry in the ledger") - .id; - - let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info); + let window = new_window(32 * 1024); let shared_window = Arc::new(RwLock::new(window)); let cluster_info = Arc::new(RwLock::new( ClusterInfo::new(node.info).expect("ClusterInfo::new"), @@ -330,7 +323,7 @@ impl Fullnode { } else { let max_tick_height = { let ls_lock = bank.leader_scheduler.read().unwrap(); - ls_lock.max_height_for_leader(tick_height) + ls_lock.max_height_for_leader(bank.get_tick_height()) }; // Start in leader mode. let (tpu, entry_receiver, tpu_exit) = Tpu::new( @@ -343,9 +336,8 @@ impl Fullnode { .collect(), ledger_path, sigverify_disabled, - tick_height, max_tick_height, - last_entry_id, + last_id, ); let broadcast_stage = BroadcastStage::new( @@ -358,7 +350,7 @@ impl Fullnode { entry_height, entry_receiver, bank.leader_scheduler.clone(), - tick_height, + bank.get_tick_height(), tpu_exit, ); let leader_state = LeaderServices::new(tpu, broadcast_stage); @@ -414,10 +406,10 @@ impl Fullnode { // Clear the leader scheduler new_leader_scheduler.reset(); - let (new_bank, scheduled_leader, tick_height, entry_height, last_entry_id) = { + let (new_bank, scheduled_leader, entry_height, last_entry_id) = { // TODO: We can avoid building the bank again once RecordStage is // integrated with BankingStage - let (new_bank, tick_height, entry_height, ledger_tail) = Self::new_bank_from_ledger( + let (new_bank, entry_height, last_id) = Self::new_bank_from_ledger( &self.ledger_path, Arc::new(RwLock::new(new_leader_scheduler)), ); @@ -427,16 +419,7 @@ impl Fullnode { .get_current_leader() .expect("Scheduled leader should exist after rebuilding bank"); - ( - new_bank, - scheduled_leader, - tick_height, - entry_height, - ledger_tail - .last() - .expect("Expected at least one entry in the ledger") - .id, - ) + (new_bank, scheduled_leader, entry_height, last_id) }; self.cluster_info @@ -467,6 +450,7 @@ impl Fullnode { // in the active set, then the leader scheduler will pick the same leader again, so // check for that if scheduled_leader == self.keypair.pubkey() { + let tick_height = self.bank.get_tick_height(); self.validator_to_leader(tick_height, entry_height, last_entry_id); Ok(()) } else { @@ -495,7 +479,7 @@ impl Fullnode { } } - fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) { + fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_id: Hash) { self.cluster_info .write() .unwrap() @@ -515,12 +499,11 @@ impl Fullnode { .collect(), &self.ledger_path, self.sigverify_disabled, - tick_height, max_tick_height, // We pass the last_entry_id from the replicate stage because we can't trust that // the window didn't overwrite the slot at for the last entry that the replicate stage // processed. We also want to avoid reading processing the ledger for the last id. - &last_entry_id, + &last_id, ); let broadcast_stage = BroadcastStage::new( @@ -596,19 +579,19 @@ impl Fullnode { pub fn new_bank_from_ledger( ledger_path: &str, leader_scheduler: Arc>, - ) -> (Bank, u64, u64, Vec) { + ) -> (Bank, u64, Hash) { let mut bank = Bank::new_with_builtin_programs(); bank.leader_scheduler = leader_scheduler; let entries = read_ledger(ledger_path, true).expect("opening ledger"); let entries = entries .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); info!("processing ledger..."); - let (tick_height, entry_height, ledger_tail) = - bank.process_ledger(entries).expect("process_ledger"); + + let (entry_height, last_id) = bank.process_ledger(entries).expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); - (bank, tick_height, entry_height, ledger_tail) + (bank, entry_height, last_id) } pub fn get_leader_scheduler(&self) -> &Arc> { @@ -707,13 +690,13 @@ mod tests { ))); bank.leader_scheduler = leader_scheduler; + let last_id = bank.last_id(); let v = Fullnode::new_with_bank( Arc::new(keypair), Arc::new(Keypair::new()), bank, - 0, entry_height, - &genesis_entries, + &last_id, tn, Some(&entry), &validator_ledger_path, @@ -737,21 +720,19 @@ mod tests { let mut bank = Bank::new(&mint); let entry = tn.info.clone(); - let genesis_entries = &mint.create_entries(); - let entry_height = genesis_entries.len() as u64; - let leader_scheduler = Arc::new(RwLock::new( LeaderScheduler::from_bootstrap_leader(entry.id), )); bank.leader_scheduler = leader_scheduler; + let entry_height = mint.create_entries().len() as u64; + let last_id = bank.last_id(); Fullnode::new_with_bank( Arc::new(keypair), Arc::new(Keypair::new()), bank, - 0, entry_height, - &genesis_entries, + &last_id, tn, Some(&entry), &validator_ledger_path, @@ -788,6 +769,7 @@ mod tests { let initial_tick_height = genesis_entries .iter() + .skip(2) .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); // Create the common leader scheduling configuration @@ -873,6 +855,7 @@ mod tests { let genesis_tick_height = genesis_entries .iter() + .skip(2) .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64) + num_ending_ticks as u64; ledger_writer.write_entries(&active_set_entries).unwrap(); @@ -970,6 +953,7 @@ mod tests { make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); let initial_tick_height = genesis_entries .iter() + .skip(2) .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let active_set_entries_len = active_set_entries.len() as u64; @@ -1053,12 +1037,12 @@ mod tests { // Check the validator ledger for the correct entry + tick heights, we should've // transitioned after tick_height = bootstrap_height. - let (_, tick_height, entry_height, _) = Fullnode::new_bank_from_ledger( + let (bank, entry_height, _) = Fullnode::new_bank_from_ledger( &validator_ledger_path, Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))), ); - assert_eq!(tick_height, bootstrap_height); + assert_eq!(bank.get_tick_height(), bootstrap_height); assert_eq!( entry_height, // Only the first genesis entry has num_hashes = 0, every other entry diff --git a/src/ledger.rs b/src/ledger.rs index f8f0aeee5c..34deeb0f59 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -27,7 +27,6 @@ use std::path::Path; use transaction::Transaction; use vote_program::Vote; use vote_transaction::VoteTransaction; -use window::WINDOW_SIZE; // // A persistent ledger is 2 files: @@ -78,6 +77,10 @@ pub struct LedgerWindow { pub const LEDGER_DATA_FILE: &str = "data"; const LEDGER_INDEX_FILE: &str = "index"; +const LEDGER_BUF_COUNT: usize = 32 * 1024; +const LEDGER_DATA_BUF_SIZE: usize = LEDGER_BUF_COUNT * BLOB_DATA_SIZE; +const LEDGER_INDEX_BUF_SIZE: usize = LEDGER_BUF_COUNT * SIZEOF_U64 as usize; + // use a CONST because there's a cast, and we don't want "sizeof:: as u64"... const SIZEOF_U64: u64 = size_of::() as u64; @@ -113,9 +116,9 @@ impl LedgerWindow { let ledger_path = Path::new(&ledger_path); let index = File::open(ledger_path.join(LEDGER_INDEX_FILE))?; - let index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index); + let index = BufReader::with_capacity(LEDGER_INDEX_BUF_SIZE, index); let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?; - let data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data); + let data = BufReader::with_capacity(LEDGER_DATA_BUF_SIZE, data); Ok(LedgerWindow { index, data }) } @@ -185,10 +188,10 @@ pub fn verify_ledger(ledger_path: &str) -> io::Result<()> { format!("index is not a multiple of {} bytes long", SIZEOF_U64), ))?; } - let mut index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index); + let mut index = BufReader::with_capacity(LEDGER_INDEX_BUF_SIZE, index); let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?; - let mut data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data); + let mut data = BufReader::with_capacity(LEDGER_DATA_BUF_SIZE, data); let mut last_data_offset = 0; let mut index_offset = 0; @@ -503,6 +506,7 @@ impl Block for [Entry] { } } +// TODO: move this to the right file, entry.rs? pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result> { let mut entries: Vec = Vec::with_capacity(blobs.len()); diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index 96acf66e1f..1588b8099e 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -82,12 +82,11 @@ impl PohRecorder { bank: Arc, sender: Sender>, last_entry_id: Hash, - tick_height: u64, max_tick_height: Option, is_virtual: bool, virtual_tick_entries: Vec, ) -> Self { - let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height))); + let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, bank.get_tick_height()))); let virtual_tick_entries = Arc::new(Mutex::new(virtual_tick_entries)); PohRecorder { poh, @@ -156,8 +155,7 @@ mod tests { let bank = Arc::new(Bank::new(&mint)); let last_id = bank.last_id(); let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = - PohRecorder::new(bank, entry_sender, last_id, 0, None, false, vec![]); + let mut poh_recorder = PohRecorder::new(bank, entry_sender, last_id, None, false, vec![]); //send some data let h1 = hash(b"hello world!"); diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index c6c6f9b1b4..79f0a2205c 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -276,6 +276,7 @@ mod test { last_id = active_set_entries.last().unwrap().id; let initial_tick_height = genesis_entries .iter() + .skip(2) .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64); let active_set_entries_len = active_set_entries.len() as u64; let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; @@ -300,7 +301,7 @@ mod test { Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))); // Set up the bank - let (bank, _, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); + let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); // Set up the replicate stage let (entry_sender, entry_receiver) = channel(); diff --git a/src/replicator.rs b/src/replicator.rs index ac53ce737c..4d77dcb315 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -79,7 +79,8 @@ impl Replicator { network_addr: Option, done: Arc, ) -> (Replicator, NodeInfo) { - let window = window::new_window_from_entries(&[], entry_height, &node.info); + const REPLICATOR_WINDOW_SIZE: usize = 32 * 1024; + let window = window::new_window(REPLICATOR_WINDOW_SIZE); let shared_window = Arc::new(RwLock::new(window)); let cluster_info = Arc::new(RwLock::new( diff --git a/src/rpc.rs b/src/rpc.rs index 7ba02da438..d2d93a282e 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -610,22 +610,19 @@ mod tests { let serial_tx = serialize(&tx).unwrap(); let rpc_port = 22222; // Needs to be distinct known number to not conflict with other tests - let genesis_entries = &alice.create_entries(); - let entry_height = genesis_entries.len() as u64; - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let entry_height = alice.create_entries().len() as u64; let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, - 0, entry_height, - &genesis_entries, + &last_id, leader, None, &ledger_path, diff --git a/src/settings.rs.foo b/src/settings.rs.foo new file mode 100644 index 0000000000..a14a9bb98b --- /dev/null +++ b/src/settings.rs.foo @@ -0,0 +1,30 @@ +//! The `config` module pulls together global configuration +//! +use config::{Config, Value}; +use serde::de::Deserialize; +use std::sync::RwLock; + +lazy_static! { + static ref SETTINGS: RwLock = { + let settings = RwLock::new(Config::default()); + { + let mut settings = settings.write().unwrap(); + + // defaults go here + settings.set_default("window_size", 32*1024).unwrap(); + + } + settings + }; +} + +pub fn get<'de, T: Deserialize<'de>>(key: &'de str) -> T { + SETTINGS.read().unwrap().get(key).unwrap() +} + +pub fn set(key: &str, value: T) -> () +where + T: Into, +{ + SETTINGS.write().unwrap().set(key, value).unwrap(); +} diff --git a/src/thin_client.rs b/src/thin_client.rs index 4feb155de5..e77b6ed891 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -502,13 +502,13 @@ mod tests { ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, 0, - 0, - &[], + &last_id, leader, None, &ledger_path, @@ -555,13 +555,13 @@ mod tests { ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, 0, - 0, - &[], + &last_id, leader, None, &ledger_path, @@ -616,21 +616,19 @@ mod tests { let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("client_check_signature", &alice); - let genesis_entries = &alice.create_entries(); - let entry_height = genesis_entries.len() as u64; - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let entry_height = alice.create_entries().len() as u64; + let last_id = bank.last_id(); let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, - 0, entry_height, - &genesis_entries, + &last_id, leader, None, &ledger_path, @@ -686,21 +684,19 @@ mod tests { let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("zero_balance_check", &alice); - let genesis_entries = &alice.create_entries(); - let entry_height = genesis_entries.len() as u64; - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); + let entry_height = alice.create_entries().len() as u64; let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, - 0, entry_height, - &genesis_entries, + &last_id, leader, None, &ledger_path, diff --git a/src/tpu.rs b/src/tpu.rs index 531be39591..9b7d43463b 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -61,7 +61,6 @@ impl Tpu { transactions_sockets: Vec, ledger_path: &str, sigverify_disabled: bool, - tick_height: u64, max_tick_height: Option, last_entry_id: &Hash, ) -> (Self, Receiver>, Arc) { @@ -77,7 +76,6 @@ impl Tpu { verified_receiver, tick_duration, last_entry_id, - tick_height, max_tick_height, ); diff --git a/src/wallet.rs b/src/wallet.rs index c82525d923..cd236d42c8 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1092,13 +1092,14 @@ mod tests { ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); + let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, 0, - 0, - &[], + &last_id, leader, None, &ledger_path, @@ -1167,21 +1168,19 @@ mod tests { let rpc_port = 11111; // Needs to be distinct known number to not conflict with other tests - let genesis_entries = &alice.create_entries(); - let entry_height = genesis_entries.len() as u64; - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); + let entry_height = alice.create_entries().len() as u64; let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, - 0, entry_height, - &genesis_entries, + &last_id, leader, None, &ledger_path, @@ -1258,13 +1257,13 @@ mod tests { ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, 0, - 0, - &[], + &last_id, leader, None, &ledger_path, @@ -1377,19 +1376,18 @@ mod tests { let mut config_payer = WalletConfig::default(); let mut config_witness = WalletConfig::default(); let rpc_port = 11223; // Needs to be distinct known number to not conflict with other tests - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_data.id, ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, 0, - 0, - &[], + &last_id, leader, None, &ledger_path, @@ -1506,13 +1504,13 @@ mod tests { ))); bank.leader_scheduler = leader_scheduler; let vote_account_keypair = Arc::new(Keypair::new()); + let last_id = bank.last_id(); let server = Fullnode::new_with_bank( leader_keypair, vote_account_keypair, bank, 0, - 0, - &[], + &last_id, leader, None, &ledger_path, diff --git a/src/window.rs b/src/window.rs index 970354afa9..bff981ee9d 100644 --- a/src/window.rs +++ b/src/window.rs @@ -6,7 +6,7 @@ use entry::Entry; #[cfg(feature = "erasure")] use erasure; use leader_scheduler::LeaderScheduler; -use ledger::{reconstruct_entries_from_blobs, Block}; +use ledger::reconstruct_entries_from_blobs; use log::Level; use packet::SharedBlob; use result::Result; @@ -17,8 +17,6 @@ use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; -pub const WINDOW_SIZE: u64 = 32 * 1024; - #[derive(Default, Clone)] pub struct WindowSlot { pub data: Option, @@ -52,6 +50,8 @@ pub trait WindowUtil { /// Finds available slots, clears them, and returns their indices. fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec; + fn window_size(&self) -> u64; + #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] fn repair( &mut self, @@ -79,13 +79,15 @@ pub trait WindowUtil { leader_unknown: bool, pending_retransmits: &mut bool, ); + + fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool; } impl WindowUtil for Window { fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec { (consumed..received) .filter_map(|pix| { - let i = (pix % WINDOW_SIZE) as usize; + let i = (pix % self.window_size()) as usize; if let Some(blob_idx) = self[i].blob_index() { if blob_idx == pix { return None; @@ -96,6 +98,43 @@ impl WindowUtil for Window { }).collect() } + fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool { + // Prevent receive window from running over + // Got a blob which has already been consumed, skip it + // probably from a repair window request + if pix < consumed { + trace!( + "{}: received: {} but older than consumed: {} skipping..", + id, + pix, + consumed + ); + false + } else { + // received always has to be updated even if we don't accept the packet into + // the window. The worst case here is the server *starts* outside + // the window, none of the packets it receives fits in the window + // and repair requests (which are based on received) are never generated + *received = cmp::max(pix, *received); + + if pix >= consumed + self.window_size() { + trace!( + "{}: received: {} will overrun window: {} skipping..", + id, + pix, + consumed + self.window_size() + ); + false + } else { + true + } + } + } + + fn window_size(&self) -> u64 { + self.len() as u64 + } + fn repair( &mut self, cluster_info: &Arc>, @@ -144,7 +183,14 @@ impl WindowUtil for Window { let num_peers = rcluster_info.table.len() as u64; let max_repair = if max_entry_height == 0 { - calculate_max_repair(num_peers, consumed, received, times, is_next_leader) + calculate_max_repair( + num_peers, + consumed, + received, + times, + is_next_leader, + self.window_size(), + ) } else { max_entry_height + 1 }; @@ -181,7 +227,7 @@ impl WindowUtil for Window { .iter() .enumerate() .map(|(i, _v)| { - if i == (consumed % WINDOW_SIZE) as usize { + if i == (consumed % self.window_size()) as usize { "V" } else { " " @@ -237,7 +283,7 @@ impl WindowUtil for Window { leader_unknown: bool, pending_retransmits: &mut bool, ) { - let w = (pix % WINDOW_SIZE) as usize; + let w = (pix % self.window_size()) as usize; let is_coding = blob.read().unwrap().is_coding(); @@ -283,14 +329,15 @@ impl WindowUtil for Window { #[cfg(feature = "erasure")] { - if erasure::recover(id, self, *consumed, (*consumed % WINDOW_SIZE) as usize).is_err() { + let window_size = self.window_size(); + if erasure::recover(id, self, *consumed, (*consumed % window_size) as usize).is_err() { trace!("{}: erasure::recover failed", id); } } // push all contiguous blobs into consumed queue, increment consumed loop { - let k = (*consumed % WINDOW_SIZE) as usize; + let k = (*consumed % self.window_size()) as usize; trace!("{}: k: {} consumed: {}", id, k, *consumed,); let k_data_blob; @@ -334,6 +381,7 @@ fn calculate_max_repair( received: u64, times: usize, is_next_leader: bool, + window_size: u64, ) -> u64 { // Calculate the highest blob index that this node should have already received // via avalanche. The avalanche splits data stream into nodes and each node retransmits @@ -350,44 +398,15 @@ fn calculate_max_repair( // This check prevents repairing a blob that will cause window to roll over. Even if // the highes_lost blob is actually missing, asking to repair it might cause our // current window to move past other missing blobs - cmp::min(consumed + WINDOW_SIZE - 1, max_repair) + cmp::min(consumed + window_size - 1, max_repair) } -pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool { - // Prevent receive window from running over - // Got a blob which has already been consumed, skip it - // probably from a repair window request - if pix < consumed { - trace!( - "{}: received: {} but older than consumed: {} skipping..", - id, - pix, - consumed - ); - false - } else { - // received always has to be updated even if we don't accept the packet into - // the window. The worst case here is the server *starts* outside - // the window, none of the packets it receives fits in the window - // and repair requests (which are based on received) are never generated - *received = cmp::max(pix, *received); - - if pix >= consumed + WINDOW_SIZE { - trace!( - "{}: received: {} will overrun window: {} skipping..", - id, - pix, - consumed + WINDOW_SIZE - ); - false - } else { - true - } - } +pub fn new_window(window_size: usize) -> Window { + (0..window_size).map(|_| WindowSlot::default()).collect() } pub fn default_window() -> Window { - (0..WINDOW_SIZE).map(|_| WindowSlot::default()).collect() + (0..2048).map(|_| WindowSlot::default()).collect() } pub fn index_blobs( @@ -410,52 +429,6 @@ pub fn index_blobs( Ok(()) } -/// Initialize a rebroadcast window with most recent Entry blobs -/// * `cluster_info` - gossip instance, used to set blob ids -/// * `blobs` - up to WINDOW_SIZE most recent blobs -/// * `entry_height` - current entry height -pub fn initialized_window( - node_info: &NodeInfo, - blobs: Vec, - entry_height: u64, -) -> Window { - let mut window = default_window(); - let id = node_info.id; - - trace!( - "{} initialized window entry_height:{} blobs_len:{}", - id, - entry_height, - blobs.len() - ); - - // Index the blobs - let mut received = entry_height - blobs.len() as u64; - index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window"); - - // populate the window, offset by implied index - let diff = cmp::max(blobs.len() as isize - window.len() as isize, 0) as usize; - for b in blobs.into_iter().skip(diff) { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix % WINDOW_SIZE) as usize; - trace!("{} caching {} at {}", id, ix, pos); - assert!(window[pos].data.is_none()); - window[pos].data = Some(b); - } - - window -} - -pub fn new_window_from_entries( - ledger_tail: &[Entry], - entry_height: u64, - node_info: &NodeInfo, -) -> Window { - // convert to blobs - let blobs = ledger_tail.to_blobs(); - initialized_window(&node_info, blobs, entry_height) -} - #[cfg(test)] mod test { use packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; @@ -468,7 +441,7 @@ mod test { use std::sync::Arc; use std::time::Duration; use streamer::{receiver, responder, PacketReceiver}; - use window::{blob_idx_in_window, calculate_max_repair, WINDOW_SIZE}; + use window::{calculate_max_repair, new_window, Window, WindowUtil}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { @@ -530,59 +503,79 @@ mod test { } #[test] - pub fn calculate_max_repair_test() { - assert_eq!(calculate_max_repair(0, 10, 90, 0, false), 90); - assert_eq!(calculate_max_repair(15, 10, 90, 32, false), 90); - assert_eq!(calculate_max_repair(15, 10, 90, 0, false), 75); - assert_eq!(calculate_max_repair(90, 10, 90, 0, false), 10); - assert_eq!(calculate_max_repair(90, 10, 50, 0, false), 10); - assert_eq!(calculate_max_repair(90, 10, 99, 0, false), 10); - assert_eq!(calculate_max_repair(90, 10, 101, 0, false), 11); + pub fn test_calculate_max_repair() { + const WINDOW_SIZE: u64 = 200; + + assert_eq!(calculate_max_repair(0, 10, 90, 0, false, WINDOW_SIZE), 90); + assert_eq!(calculate_max_repair(15, 10, 90, 32, false, WINDOW_SIZE), 90); + assert_eq!(calculate_max_repair(15, 10, 90, 0, false, WINDOW_SIZE), 75); + assert_eq!(calculate_max_repair(90, 10, 90, 0, false, WINDOW_SIZE), 10); + assert_eq!(calculate_max_repair(90, 10, 50, 0, false, WINDOW_SIZE), 10); + assert_eq!(calculate_max_repair(90, 10, 99, 0, false, WINDOW_SIZE), 10); + assert_eq!(calculate_max_repair(90, 10, 101, 0, false, WINDOW_SIZE), 11); assert_eq!( - calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false), + calculate_max_repair(90, 10, 95 + WINDOW_SIZE, 0, false, WINDOW_SIZE), WINDOW_SIZE + 5 ); assert_eq!( - calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false), + calculate_max_repair(90, 10, 99 + WINDOW_SIZE, 0, false, WINDOW_SIZE), WINDOW_SIZE + 9 ); assert_eq!( - calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false), + calculate_max_repair(90, 10, 100 + WINDOW_SIZE, 0, false, WINDOW_SIZE), WINDOW_SIZE + 9 ); assert_eq!( - calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false), + calculate_max_repair(90, 10, 120 + WINDOW_SIZE, 0, false, WINDOW_SIZE), WINDOW_SIZE + 9 ); assert_eq!( - calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false), + calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, false, WINDOW_SIZE), WINDOW_SIZE ); assert_eq!( - calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true), + calculate_max_repair(50, 100, 50 + WINDOW_SIZE, 0, true, WINDOW_SIZE), 50 + WINDOW_SIZE ); } - fn wrap_blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: u64) -> (bool, u64) { + fn wrap_blob_idx_in_window( + window: &Window, + id: &Pubkey, + pix: u64, + consumed: u64, + received: u64, + ) -> (bool, u64) { let mut received = received; - let is_in_window = blob_idx_in_window(&id, pix, consumed, &mut received); + let is_in_window = window.blob_idx_in_window(&id, pix, consumed, &mut received); (is_in_window, received) } #[test] - pub fn blob_idx_in_window_test() { + pub fn test_blob_idx_in_window() { let id = Pubkey::default(); + const WINDOW_SIZE: u64 = 200; + let window = new_window(WINDOW_SIZE as usize); + assert_eq!( - wrap_blob_idx_in_window(&id, 90 + WINDOW_SIZE, 90, 100), + wrap_blob_idx_in_window(&window, &id, 90 + WINDOW_SIZE, 90, 100), (false, 90 + WINDOW_SIZE) ); assert_eq!( - wrap_blob_idx_in_window(&id, 91 + WINDOW_SIZE, 90, 100), + wrap_blob_idx_in_window(&window, &id, 91 + WINDOW_SIZE, 90, 100), (false, 91 + WINDOW_SIZE) ); - assert_eq!(wrap_blob_idx_in_window(&id, 89, 90, 100), (false, 100)); + assert_eq!( + wrap_blob_idx_in_window(&window, &id, 89, 90, 100), + (false, 100) + ); - assert_eq!(wrap_blob_idx_in_window(&id, 91, 90, 100), (true, 100)); - assert_eq!(wrap_blob_idx_in_window(&id, 101, 90, 100), (true, 101)); + assert_eq!( + wrap_blob_idx_in_window(&window, &id, 91, 90, 100), + (true, 100) + ); + assert_eq!( + wrap_blob_idx_in_window(&window, &id, 101, 90, 100), + (true, 101) + ); } } diff --git a/src/window_service.rs b/src/window_service.rs index af90ed6c89..12d94ef2f7 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -19,7 +19,7 @@ use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; use streamer::{BlobReceiver, BlobSender}; use timing::duration_as_ms; -use window::{blob_idx_in_window, SharedWindow, WindowUtil}; +use window::{SharedWindow, WindowUtil}; pub const MAX_REPAIR_BACKOFF: usize = 128; @@ -206,7 +206,11 @@ fn recv_window( }; pixs.push(pix); - if !blob_idx_in_window(&id, pix, *consumed, received) { + if !window + .read() + .unwrap() + .blob_idx_in_window(&id, pix, *consumed, received) + { continue; } diff --git a/tests/multinode.rs b/tests/multinode.rs index a68d35ca05..dacb2b0787 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -24,7 +24,7 @@ use solana::system_transaction::SystemTransaction; use solana::thin_client::ThinClient; use solana::timing::{duration_as_ms, duration_as_s}; use solana::transaction::Transaction; -use solana::window::{default_window, WINDOW_SIZE}; +use solana::window::default_window; use solana_sdk::pubkey::Pubkey; use std::collections::{HashSet, VecDeque}; use std::env; @@ -126,7 +126,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // write a bunch more ledger into leader's ledger, this should populate his window // and force him to respond to repair from the ledger window { - let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize); + let entries = make_tiny_test_entries(alice.last_id(), 100); let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); writer.write_entries(&entries).unwrap(); @@ -897,12 +897,12 @@ fn test_leader_to_validator_transition() { // Check the ledger to make sure it's the right height, we should've // transitioned after tick_height == bootstrap_height - let (_, tick_height, _, _) = Fullnode::new_bank_from_ledger( + let (bank, _, _) = Fullnode::new_bank_from_ledger( &leader_ledger_path, Arc::new(RwLock::new(LeaderScheduler::default())), ); - assert_eq!(tick_height, bootstrap_height); + assert_eq!(bank.get_tick_height(), bootstrap_height); // Shut down ncp.close().unwrap();