diff --git a/src/bank.rs b/src/bank.rs index d9f3182895..491723ebe2 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -19,6 +19,7 @@ use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; use std::time::Instant; +use streamer::WINDOW_SIZE; use timing::duration_as_us; use transaction::{Instruction, Plan, Transaction}; @@ -306,10 +307,7 @@ impl Bank { } /// Process an ordered list of entries. - pub fn process_entries(&self, entries: I) -> Result - where - I: IntoIterator, - { + pub fn process_entries(&self, entries: Vec) -> Result { let mut entry_count = 0; for entry in entries { entry_count += 1; @@ -348,7 +346,7 @@ impl Bank { } /// Process a full ledger. - pub fn process_ledger(&self, entries: I) -> Result + pub fn process_ledger(&self, entries: I) -> Result<(u64, Vec)> where I: IntoIterator, { @@ -364,20 +362,39 @@ impl Bank { let entry1 = entries .next() .expect("invalid ledger: need at least 2 entries"); - let tx = &entry1.transactions[0]; - let deposit = if let Instruction::NewContract(contract) = &tx.instruction { - contract.plan.final_payment() - } else { - None - }.expect("invalid ledger, needs to start with a contract"); + { + let tx = &entry1.transactions[0]; + let deposit = if let Instruction::NewContract(contract) = &tx.instruction { + contract.plan.final_payment() + } else { + None + }.expect("invalid ledger, needs to start with a contract"); - self.apply_payment(&deposit, &mut self.balances.write().unwrap()); + self.apply_payment(&deposit, &mut self.balances.write().unwrap()); + } self.register_entry_id(&entry0.id); self.register_entry_id(&entry1.id); let mut entry_count = 2; - entry_count += self.process_blocks(entries)?; - Ok(entry_count) + let mut tail = Vec::with_capacity(WINDOW_SIZE as usize); + let mut next = Vec::with_capacity(WINDOW_SIZE as usize); + + for block in &entries.into_iter().chunks(WINDOW_SIZE as usize) { + tail = next; + next = block.collect(); + entry_count += self.process_blocks(next.clone())?; + } + + tail.append(&mut next); + + if tail.len() < WINDOW_SIZE as usize { + tail.insert(0, entry1); + if tail.len() < WINDOW_SIZE as usize { + tail.insert(0, entry0); + } + } + + Ok((entry_count, tail)) } /// Process a Witness Signature. Any payment plans waiting on this signature @@ -483,9 +500,9 @@ mod tests { use super::*; use bincode::serialize; use entry::next_entry; + use entry::Entry; use entry_writer::{self, EntryWriter}; use hash::hash; - use ledger::next_entries; use signature::KeyPairUtil; use std::io::{BufReader, Cursor, Seek, SeekFrom}; @@ -720,25 +737,52 @@ mod tests { assert_eq!(bank.get_balance(&mint.pubkey()), 1); } - fn create_sample_block(mint: &Mint) -> impl Iterator { - let keypair = KeyPair::new(); - let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id()); - next_entries(&mint.last_id(), 0, vec![tx]).into_iter() + fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator { + let mut entries = Vec::with_capacity(length); + let mut hash = mint.last_id(); + let mut cur_hashes = 0; + for _ in 0..length { + let keypair = KeyPair::new(); + let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id()); + let entry = Entry::new_mut(&mut hash, &mut cur_hashes, vec![tx], false); + entries.push(entry); + } + entries.into_iter() } - - fn create_sample_ledger() -> (impl Iterator, PublicKey) { - let mint = Mint::new(2); + fn create_sample_ledger(length: usize) -> (impl Iterator, PublicKey) { + let mint = Mint::new(1 + length as i64); let genesis = mint.create_entries(); - let block = create_sample_block(&mint); + let block = create_sample_block(&mint, length); (genesis.into_iter().chain(block), mint.pubkey()) } #[test] fn test_process_ledger() { - let (ledger, pubkey) = create_sample_ledger(); + let (ledger, pubkey) = create_sample_ledger(1); + let (ledger, dup) = ledger.tee(); let bank = Bank::default(); - bank.process_ledger(ledger).unwrap(); + let (ledger_height, tail) = bank.process_ledger(ledger).unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); + assert_eq!(ledger_height, 3); + assert_eq!(tail.len(), 3); + assert_eq!(tail, dup.collect_vec()); + let last_entry = &tail[tail.len() - 1]; + assert_eq!(bank.last_id(), last_entry.id); + } + + #[test] + fn test_process_ledger_around_window_size() { + let window_size = WINDOW_SIZE as usize; + for entry_count in window_size - 1..window_size + 1 { + let (ledger, pubkey) = create_sample_ledger(entry_count); + let bank = Bank::default(); + let (ledger_height, tail) = bank.process_ledger(ledger).unwrap(); + assert_eq!(bank.get_balance(&pubkey), 1); + assert_eq!(ledger_height, entry_count as u64 + 2); + assert!(tail.len() <= window_size); + let last_entry = &tail[tail.len() - 1]; + assert_eq!(bank.last_id(), last_entry.id); + } } // Write the given entries to a file and then return a file iterator to them. @@ -753,7 +797,7 @@ mod tests { #[test] fn test_process_ledger_from_file() { - let (ledger, pubkey) = create_sample_ledger(); + let (ledger, pubkey) = create_sample_ledger(1); let ledger = to_file_iter(ledger); let bank = Bank::default(); @@ -765,7 +809,7 @@ mod tests { fn test_process_ledger_from_files() { let mint = Mint::new(2); let genesis = to_file_iter(mint.create_entries().into_iter()); - let block = to_file_iter(create_sample_block(&mint)); + let block = to_file_iter(create_sample_block(&mint, 1)); let bank = Bank::default(); bank.process_ledger(genesis.chain(block)).unwrap(); diff --git a/src/crdt.rs b/src/crdt.rs index 4beb3073b8..e7338c4277 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -388,7 +388,7 @@ impl Crdt { //filter myself false } else if v.replicate_addr == daddr { - //filter nodes that are not listening + trace!("broadcast skip not listening {:x}", v.debug_id()); false } else { trace!("broadcast node {}", v.replicate_addr); @@ -400,7 +400,7 @@ impl Crdt { warn!("crdt too small"); Err(CrdtError::TooSmall)?; } - trace!("nodes table {}", nodes.len()); + trace!("broadcast nodes {}", nodes.len()); // enumerate all the blobs in the window, those are the indices // transmit them to nodes, starting from a different node @@ -414,7 +414,7 @@ impl Crdt { orders.push((window_l[k].clone(), nodes[is % nodes.len()])); } - trace!("orders table {}", orders.len()); + trace!("broadcast orders table {}", orders.len()); let errs: Vec<_> = orders .into_iter() .map(|(b, v)| { @@ -471,13 +471,14 @@ impl Crdt { trace!("skip retransmit to leader {:?}", v.id); false } else if v.replicate_addr == daddr { - trace!("skip nodes that are not listening {:?}", v.id); + trace!("retransmit skip not listening {:x}", v.debug_id()); false } else { true } }) .collect(); + trace!("retransmit orders {}", orders.len()); let errs: Vec<_> = orders .par_iter() .map(|v| { @@ -757,6 +758,11 @@ impl Crdt { } return Some(out); + } else { + info!( + "requested ix {} != blob_ix {}, outside window!", + ix, blob_ix + ); } } else { assert!(window.read().unwrap()[pos].is_none()); diff --git a/src/drone.rs b/src/drone.rs index 88ae672d0b..6c67587f96 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -273,6 +273,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(), diff --git a/src/fullnode.rs b/src/fullnode.rs index 080745a6cf..26ba6eae53 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -2,11 +2,14 @@ use bank::Bank; use crdt::{Crdt, ReplicatedData, TestNode}; +use entry::Entry; use entry_writer; +use ledger::Block; use ncp::Ncp; use packet::BlobRecycler; use rpu::Rpu; use service::Service; +use std::collections::VecDeque; use std::fs::{File, OpenOptions}; use std::io::{sink, stdin, stdout, BufReader}; use std::io::{Read, Write}; @@ -51,9 +54,9 @@ impl FullNode { }; let reader = BufReader::new(infile); let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry")); - info!("processing ledger..."); - let entry_height = bank.process_ledger(entries).expect("process_ledger"); + info!("processing ledger..."); + let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); @@ -74,6 +77,7 @@ impl FullNode { let server = FullNode::new_validator( bank, entry_height, + Some(ledger_tail), node, network_entry_point, exit.clone(), @@ -100,6 +104,7 @@ impl FullNode { let server = FullNode::new_leader( bank, entry_height, + Some(ledger_tail), //Some(Duration::from_millis(1000)), None, node, @@ -113,6 +118,27 @@ impl FullNode { server } } + + fn new_window( + ledger_tail: Option>, + entry_height: u64, + crdt: &Arc>, + blob_recycler: &BlobRecycler, + ) -> streamer::Window { + match ledger_tail { + Some(ledger_tail) => { + // convert to blobs + let mut blobs = VecDeque::new(); + ledger_tail.to_blobs(&blob_recycler, &mut blobs); + + // flatten deque to vec + let blobs: Vec<_> = blobs.into_iter().collect(); + streamer::initialized_window(&crdt, blobs, entry_height) + } + None => streamer::default_window(), + } + } + /// Create a server instance acting as a leader. /// /// ```text @@ -140,6 +166,7 @@ impl FullNode { pub fn new_leader( bank: Bank, entry_height: u64, + ledger_tail: Option>, tick_duration: Option, node: TestNode, exit: Arc, @@ -166,7 +193,9 @@ impl FullNode { ); thread_hdls.extend(tpu.thread_hdls()); let crdt = Arc::new(RwLock::new(Crdt::new(node.data))); - let window = streamer::default_window(); + + let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); + let ncp = Ncp::new( crdt.clone(), window.clone(), @@ -221,6 +250,7 @@ impl FullNode { pub fn new_validator( bank: Bank, entry_height: u64, + ledger_tail: Option>, node: TestNode, entry_point: ReplicatedData, exit: Arc, @@ -239,7 +269,11 @@ impl FullNode { crdt.write() .expect("'crdt' write lock before insert() in pub fn replicate") .insert(&entry_point); - let window = streamer::default_window(); + + let blob_recycler = BlobRecycler::default(); + + let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler); + let ncp = Ncp::new( crdt.clone(), window.clone(), @@ -292,7 +326,7 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone()); + let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone()); exit.store(true, Ordering::Relaxed); for t in v.thread_hdls { t.join().unwrap(); diff --git a/src/streamer.rs b/src/streamer.rs index ea8eb8aef4..e45ba0861b 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -462,6 +462,44 @@ pub fn default_window() -> Window { Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize])) } +/// Initialize a rebroadcast window with most recent Entry blobs +/// * `crdt` - gossip instance, used to set blob ids +/// * `blobs` - up to WINDOW_SIZE most recent blobs +/// * `entry_height` - current entry height +pub fn initialized_window( + crdt: &Arc>, + blobs: Vec, + entry_height: u64, +) -> Window { + let window = default_window(); + + { + let mut win = window.write().unwrap(); + assert!(blobs.len() <= win.len()); + + debug!( + "initialized window entry_height:{} blobs_len:{}", + entry_height, + blobs.len() + ); + + // Index the blobs + let mut received = entry_height - blobs.len() as u64; + Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window"); + + // populate the window, offset by implied index + for b in blobs { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix % WINDOW_SIZE) as usize; + trace!("caching {} at {}", ix, pos); + assert!(win[pos].is_none()); + win[pos] = Some(b); + } + } + + window +} + pub fn window( crdt: Arc>, window: Window, diff --git a/src/thin_client.rs b/src/thin_client.rs index ff0d4a1bc2..3782ca0252 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -290,6 +290,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(), @@ -329,6 +330,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(), @@ -380,6 +382,7 @@ mod tests { let server = FullNode::new_leader( bank, 0, + None, Some(Duration::from_millis(30)), leader, exit.clone(), diff --git a/tests/multinode.rs b/tests/multinode.rs index 1437c448ff..8de9d60ef1 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -306,6 +306,7 @@ fn test_multi_node_dynamic_network() { Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); + info!("{:x} LEADER", leader_data.debug_id()); let threads = server.thread_hdls(); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); @@ -328,6 +329,7 @@ fn test_multi_node_dynamic_network() { Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); + info!("{:x} VALIDATOR", rd.debug_id()); (rd, exit, val) }) .collect();