diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 8b73cba6c5..c0033abeca 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -2,12 +2,13 @@ //! use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; use crate::counter::Counter; +use crate::db_ledger::DbLedger; use crate::entry::Entry; #[cfg(feature = "erasure")] use crate::erasure; - +use crate::leader_scheduler::LeaderScheduler; use crate::ledger::Block; -use crate::packet::{index_blobs, SharedBlobs}; +use crate::packet::{index_blobs, SharedBlob}; use crate::result::{Error, Result}; use crate::service::Service; use crate::window::{SharedWindow, WindowIndex, WindowUtil}; @@ -27,12 +28,13 @@ use std::time::{Duration, Instant}; pub enum BroadcastServiceReturnType { LeaderRotation, ChannelDisconnected, + ExitSignal, } #[allow(clippy::too_many_arguments)] fn broadcast( + db_ledger: &Arc>, max_tick_height: Option, - tick_height: &mut u64, leader_id: Pubkey, node_info: &NodeInfo, broadcast_table: &[NodeInfo], @@ -41,7 +43,7 @@ fn broadcast( sock: &UdpSocket, transmit_index: &mut WindowIndex, receive_index: &mut u64, - leader_slot: u64, + leader_scheduler: &Arc>, ) -> Result<()> { let id = node_info.id; let timer = Duration::new(1, 0); @@ -50,36 +52,41 @@ fn broadcast( let mut num_entries = entries.len(); let mut ventries = Vec::new(); ventries.push(entries); + + let mut contains_last_tick = false; while let Ok(entries) = receiver.try_recv() { num_entries += entries.len(); ventries.push(entries); } + + if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) { + contains_last_tick |= Some(last.tick_height + 1) == max_tick_height && last.is_tick(); + } + inc_new_counter_info!("broadcast_service-entries_received", num_entries); let to_blobs_start = Instant::now(); - let num_ticks: u64 = ventries - .iter() - .flatten() - .map(|entry| (entry.is_tick()) as u64) - .sum(); - *tick_height += num_ticks; + // Generate the slot heights for all the entries inside ventries + let slot_heights = generate_slots(&ventries, leader_scheduler); - let dq: SharedBlobs = ventries + let blobs_vec: Vec<_> = ventries .into_par_iter() .flat_map(|p| p.to_blobs()) .collect(); - let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); + let blobs_slot_heights: Vec<(SharedBlob, u64)> = + blobs_vec.into_iter().zip(slot_heights).collect(); - // flatten deque to vec - let blobs_vec: SharedBlobs = dq.into_iter().collect(); + let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); let blobs_chunking = Instant::now(); // We could receive more blobs than window slots so // break them up into window-sized chunks to process let window_size = window.read().unwrap().window_size(); - let blobs_chunked = blobs_vec.chunks(window_size as usize).map(|x| x.to_vec()); + let blobs_chunked = blobs_slot_heights + .chunks(window_size as usize) + .map(|x| x.to_vec()); let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed()); let broadcast_start = Instant::now(); @@ -87,14 +94,14 @@ fn broadcast( let blobs_len = blobs.len(); trace!("{}: broadcast blobs.len: {}", id, blobs_len); - index_blobs(&blobs, &node_info.id, *receive_index, leader_slot); + index_blobs(blobs.iter(), &node_info.id, *receive_index); // keep the cache of blobs that are broadcast inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); { let mut win = window.write().unwrap(); assert!(blobs.len() <= win.len()); - for b in &blobs { + for (b, _) in &blobs { let ix = b.read().unwrap().index().expect("blob index"); let pos = (ix % 
window_size) as usize; if let Some(x) = win[pos].data.take() { @@ -116,12 +123,18 @@ fn broadcast( trace!("{} null {}", id, pos); } - for b in &blobs { - let ix = b.read().unwrap().index().expect("blob index"); - let pos = (ix % window_size) as usize; - trace!("{} caching {} at {}", id, ix, pos); - assert!(win[pos].data.is_none()); - win[pos].data = Some(b.clone()); + for (b, slot) in &blobs { + { + let ix = b.read().unwrap().index().expect("blob index"); + let pos = (ix % window_size) as usize; + trace!("{} caching {} at {}", id, ix, pos); + assert!(win[pos].data.is_none()); + win[pos].data = Some(b.clone()); + } + db_ledger + .write() + .unwrap() + .write_shared_blobs(*slot, vec![b])?; } } @@ -141,7 +154,7 @@ fn broadcast( // Send blobs out from the window ClusterInfo::broadcast( - Some(*tick_height) == max_tick_height, + contains_last_tick, leader_id, &node_info, &broadcast_table, @@ -174,6 +187,30 @@ fn broadcast( Ok(()) } +fn generate_slots( + ventries: &[Vec], + leader_scheduler: &Arc>, +) -> Vec { + // Generate the slot heights for all the entries inside ventries + let r_leader_scheduler = leader_scheduler.read().unwrap(); + ventries + .iter() + .flat_map(|p| { + let slot_heights: Vec = p + .iter() + .map(|e| { + let (_, slot) = r_leader_scheduler + .get_scheduled_leader(e.tick_height + 1) + .expect("Leader schedule should never be unknown while indexing blobs"); + slot + }) + .collect(); + + slot_heights + }) + .collect() +} + // Implement a destructor for the BroadcastService3 thread to signal it exited // even on panics struct Finalizer { @@ -198,14 +235,15 @@ pub struct BroadcastService { impl BroadcastService { fn run( + db_ledger: &Arc>, sock: &UdpSocket, cluster_info: &Arc>, window: &SharedWindow, entry_height: u64, - leader_slot: u64, + leader_scheduler: &Arc>, receiver: &Receiver>, max_tick_height: Option, - tick_height: u64, + exit_signal: &Arc, ) -> BroadcastServiceReturnType { let mut transmit_index = WindowIndex { data: entry_height, @@ -213,14 +251,16 @@ impl BroadcastService { }; let mut receive_index = entry_height; let me = cluster_info.read().unwrap().my_data().clone(); - let mut tick_height_ = tick_height; loop { + if exit_signal.load(Ordering::Relaxed) { + return BroadcastServiceReturnType::ExitSignal; + } let broadcast_table = cluster_info.read().unwrap().tvu_peers(); inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); let leader_id = cluster_info.read().unwrap().leader_id(); if let Err(e) = broadcast( + db_ledger, max_tick_height, - &mut tick_height_, leader_id, &me, &broadcast_table, @@ -229,7 +269,7 @@ impl BroadcastService { &sock, &mut transmit_index, &mut receive_index, - leader_slot, + leader_scheduler, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { @@ -261,35 +301,39 @@ impl BroadcastService { /// WriteStage is the last stage in the pipeline), which will then close Broadcast service, /// which will then close FetchStage in the Tpu, and then the rest of the Tpu, /// completing the cycle. 
+ #[allow(clippy::too_many_arguments, clippy::new_ret_no_self)] pub fn new( + db_ledger: Arc>, sock: UdpSocket, cluster_info: Arc>, window: SharedWindow, entry_height: u64, - leader_slot: u64, + leader_scheduler: Arc>, receiver: Receiver>, max_tick_height: Option, - tick_height: u64, exit_sender: Arc, - ) -> Self { + ) -> (Self, Arc) { + let exit_signal = Arc::new(AtomicBool::new(false)); + let exit_signal_ = exit_signal.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_sender); Self::run( + &db_ledger, &sock, &cluster_info, &window, entry_height, - leader_slot, + &leader_scheduler, &receiver, max_tick_height, - tick_height, + &exit_signal_, ) }) .unwrap(); - Self { thread_hdl } + (Self { thread_hdl }, exit_signal) } } @@ -300,3 +344,142 @@ impl Service for BroadcastService { self.thread_hdl.join() } } + +#[cfg(test)] +mod test { + use super::*; + use crate::cluster_info::{ClusterInfo, Node}; + use crate::db_ledger::DbLedger; + use crate::ledger::create_ticks; + use crate::ledger::get_tmp_ledger_path; + use crate::service::Service; + use crate::window::new_window; + use solana_sdk::hash::Hash; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::sync::atomic::AtomicBool; + use std::sync::mpsc::channel; + use std::sync::mpsc::Sender; + use std::sync::{Arc, RwLock}; + use std::thread::sleep; + use std::time::Duration; + + struct DummyBroadcastService { + db_ledger: Arc>, + broadcast_service: BroadcastService, + entry_sender: Sender>, + exit_signal: Arc, + } + + fn setup_dummy_broadcast_service( + leader_pubkey: Pubkey, + ledger_path: &str, + leader_scheduler: Arc>, + entry_height: u64, + max_tick_height: u64, + ) -> DummyBroadcastService { + // Make the database ledger + let db_ledger = Arc::new(RwLock::new(DbLedger::open(ledger_path).unwrap())); + + // Make the leader node and scheduler + let leader_info = Node::new_localhost_with_pubkey(leader_pubkey); + + // Make a node to broadcast to + let buddy_keypair = Keypair::new(); + let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey()); + + // Fill the cluster_info with the buddy's info + let mut cluster_info = ClusterInfo::new(leader_info.info.clone()); + cluster_info.insert_info(broadcast_buddy.info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + + let window = new_window(32 * 1024); + let shared_window = Arc::new(RwLock::new(window)); + let (entry_sender, entry_receiver) = channel(); + let exit_sender = Arc::new(AtomicBool::new(false)); + + // Start up the broadcast stage + let (broadcast_service, exit_signal) = BroadcastService::new( + db_ledger.clone(), + leader_info.sockets.broadcast, + cluster_info, + shared_window, + entry_height, + leader_scheduler, + entry_receiver, + Some(max_tick_height), + exit_sender, + ); + + DummyBroadcastService { + db_ledger, + broadcast_service, + entry_sender, + exit_signal, + } + } + + #[test] + fn test_broadcast_ledger() { + let ledger_path = get_tmp_ledger_path("test_broadcast"); + { + // Create the leader scheduler + let leader_keypair = Keypair::new(); + let mut leader_scheduler = + LeaderScheduler::from_bootstrap_leader(leader_keypair.pubkey()); + + // Mock the tick height to look like the tick height right after a leader transition + leader_scheduler.last_seed_height = Some(leader_scheduler.bootstrap_height); + leader_scheduler.set_leader_schedule(vec![leader_keypair.pubkey()]); + leader_scheduler.use_only_bootstrap_leader = false; + let start_tick_height = 
leader_scheduler.bootstrap_height; + let max_tick_height = start_tick_height + leader_scheduler.last_seed_height.unwrap(); + let entry_height = 2 * start_tick_height; + + let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); + let broadcast_service = setup_dummy_broadcast_service( + leader_keypair.pubkey(), + &ledger_path, + leader_scheduler.clone(), + entry_height, + max_tick_height, + ); + + let ticks = create_ticks( + (max_tick_height - start_tick_height) as usize, + Hash::default(), + ); + for (i, mut tick) in ticks.into_iter().enumerate() { + // Simulate the tick heights generated in poh.rs + tick.tick_height = start_tick_height + i as u64; + broadcast_service + .entry_sender + .send(vec![tick]) + .expect("Expect successful send to broadcast service"); + } + + sleep(Duration::from_millis(2000)); + let r_db = broadcast_service.db_ledger.read().unwrap(); + for i in 0..max_tick_height - start_tick_height { + let (_, slot) = leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(start_tick_height + i + 1) + .expect("Leader should exist"); + let result = r_db + .data_cf + .get_by_slot_index(&r_db.db, slot, entry_height + i) + .unwrap(); + + assert!(result.is_some()); + } + + broadcast_service.exit_signal.store(true, Ordering::Relaxed); + broadcast_service + .broadcast_service + .join() + .expect("Expect successful join of broadcast service"); + } + + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); + } +} diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 84f34a24ff..6b5f42ddfb 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -1,4 +1,4 @@ -//! The `ledger` module provides functions for parallel verification of the +//! The `db_ledger` module provides functions for parallel verification of the //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. @@ -88,6 +88,10 @@ pub struct SlotMeta { pub consumed: u64, // The entry height of the highest blob received for this slot. pub received: u64, + // The slot the blob with index == "consumed" is in + pub consumed_slot: u64, + // The slot the blob with index == "received" is in + pub received_slot: u64, } impl SlotMeta { @@ -95,6 +99,8 @@ impl SlotMeta { SlotMeta { consumed: 0, received: 0, + consumed_slot: 0, + received_slot: 0, } } } @@ -205,6 +211,10 @@ impl ErasureCf { DataCf::key(slot_height, index) } + pub fn slot_height_from_key(key: &[u8]) -> Result { + DataCf::slot_height_from_key(key) + } + pub fn index_from_key(key: &[u8]) -> Result { DataCf::index_from_key(key) } @@ -319,7 +329,7 @@ impl DbLedger { pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result> { let slot_height = DataCf::slot_height_from_key(key)?; - let meta_key = MetaCf::key(slot_height); + let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); let mut should_write_meta = false; @@ -344,6 +354,7 @@ impl DbLedger { // so received = index + 1 for the same blob. if index >= meta.received { meta.received = index + 1; + meta.received_slot = slot_height; should_write_meta = true; } @@ -362,18 +373,33 @@ impl DbLedger { // Find the next consecutive block of blobs. // TODO: account for consecutive blocks that // span multiple slots + + let mut current_slot = slot_height; loop { index += 1; - let key = DataCf::key(slot_height, index); - if let Some(blob_data) = self.data_cf.get(&self.db, &key)? 
{ - let serialized_entry_data = &blob_data[BLOB_HEADER_SIZE..]; - let entry: Entry = deserialize(serialized_entry_data) - .expect("Ledger should only contain well formed data"); - consumed_queue.push(entry); - meta.consumed += 1; - } else { - break; - } + let key = DataCf::key(current_slot, index); + let blob_data = { + if let Some(blob_data) = self.data_cf.get(&self.db, &key)? { + blob_data + } else if meta.consumed < meta.received { + let key = DataCf::key(current_slot + 1, index); + if let Some(blob_data) = self.data_cf.get(&self.db, &key)? { + current_slot += 1; + meta.consumed_slot = current_slot; + blob_data + } else { + break; + } + } else { + break; + } + }; + + let serialized_entry_data = &blob_data[BLOB_HEADER_SIZE..]; + let entry: Entry = deserialize(serialized_entry_data) + .expect("Ledger should only contain well formed data"); + consumed_queue.push(entry); + meta.consumed += 1; } } @@ -398,8 +424,9 @@ impl DbLedger { start_index: u64, num_blobs: u64, buf: &mut [u8], + slot_height: u64, ) -> Result<(u64, u64)> { - let start_key = DataCf::key(DEFAULT_SLOT_HEIGHT, start_index); + let start_key = DataCf::key(slot_height, start_index); let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?; db_iterator.seek(&start_key); let mut total_blobs = 0; @@ -492,7 +519,7 @@ impl Iterator for EntryIterator { } } -pub fn write_entries_to_ledger(ledger_paths: &[&str], entries: I) +pub fn write_entries_to_ledger(ledger_paths: &[&str], entries: I, slot_height: u64) where I: IntoIterator, I::Item: Borrow, @@ -502,7 +529,7 @@ where let mut db_ledger = DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); db_ledger - .write_entries(DEFAULT_SLOT_HEIGHT, entries.by_ref()) + .write_entries(slot_height, entries.by_ref()) .expect("Expected successful write of genesis entries"); } } @@ -537,7 +564,7 @@ mod tests { // Test meta column family let meta = SlotMeta::new(); - let meta_key = MetaCf::key(0); + let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); ledger.meta_cf.put(&ledger.db, &meta_key, &meta).unwrap(); let result = ledger .meta_cf @@ -586,13 +613,14 @@ mod tests { let shared_blobs = make_tiny_test_entries(10).to_blobs(); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); + let slot = DEFAULT_SLOT_HEIGHT; let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes"); let mut ledger = DbLedger::open(&ledger_path).unwrap(); - ledger.write_blobs(DEFAULT_SLOT_HEIGHT, &blobs).unwrap(); + ledger.write_blobs(slot, &blobs).unwrap(); let mut buf = [0; 1024]; - let (num_blobs, bytes) = ledger.get_blob_bytes(0, 1, &mut buf).unwrap(); + let (num_blobs, bytes) = ledger.get_blob_bytes(0, 1, &mut buf, slot).unwrap(); let bytes = bytes as usize; assert_eq!(num_blobs, 1); { @@ -600,7 +628,7 @@ mod tests { assert_eq!(blob_data, &blobs[0].data[..bytes]); } - let (num_blobs, bytes2) = ledger.get_blob_bytes(0, 2, &mut buf).unwrap(); + let (num_blobs, bytes2) = ledger.get_blob_bytes(0, 2, &mut buf, slot).unwrap(); let bytes2 = bytes2 as usize; assert_eq!(num_blobs, 2); assert!(bytes2 > bytes); @@ -614,19 +642,19 @@ mod tests { // buf size part-way into blob[1], should just return blob[0] let mut buf = vec![0; bytes + 1]; - let (num_blobs, bytes3) = ledger.get_blob_bytes(0, 2, &mut buf).unwrap(); + let (num_blobs, bytes3) = ledger.get_blob_bytes(0, 2, &mut buf, slot).unwrap(); assert_eq!(num_blobs, 1); let bytes3 = bytes3 as usize; assert_eq!(bytes3, bytes); let mut buf = 
vec![0; bytes2 - 1]; - let (num_blobs, bytes4) = ledger.get_blob_bytes(0, 2, &mut buf).unwrap(); + let (num_blobs, bytes4) = ledger.get_blob_bytes(0, 2, &mut buf, slot).unwrap(); assert_eq!(num_blobs, 1); let bytes4 = bytes4 as usize; assert_eq!(bytes4, bytes); let mut buf = vec![0; bytes * 2]; - let (num_blobs, bytes6) = ledger.get_blob_bytes(9, 1, &mut buf).unwrap(); + let (num_blobs, bytes6) = ledger.get_blob_bytes(9, 1, &mut buf, slot).unwrap(); assert_eq!(num_blobs, 1); let bytes6 = bytes6 as usize; @@ -636,7 +664,7 @@ mod tests { } // Read out of range - assert!(ledger.get_blob_bytes(20, 2, &mut buf).is_err()); + assert!(ledger.get_blob_bytes(20, 2, &mut buf, slot).is_err()); // Destroying database without closing it first is undefined behavior drop(ledger); @@ -696,7 +724,7 @@ mod tests { let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_multiple"); let ledger = DbLedger::open(&ledger_path).unwrap(); - // Insert first blob, check for consecutive returned blobs + // Insert blobs in reverse, check for consecutive returned blobs for i in (0..num_blobs).rev() { let result = ledger .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i]) @@ -721,6 +749,50 @@ mod tests { DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } + #[test] + fn test_insert_data_blobs_slots() { + let num_blobs = 10; + let entries = make_tiny_test_entries(num_blobs); + let shared_blobs = entries.to_blobs(); + let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); + let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); + + let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_slots"); + let ledger = DbLedger::open(&ledger_path).unwrap(); + + // Insert last blob into next slot + let result = ledger + .insert_data_blob( + &DataCf::key(DEFAULT_SLOT_HEIGHT + 1, (num_blobs - 1) as u64), + blobs.last().unwrap(), + ) + .unwrap(); + assert_eq!(result.len(), 0); + + // Insert blobs into first slot, check for consecutive blobs + for i in (0..num_blobs - 1).rev() { + let result = ledger + .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i]) + .unwrap(); + let meta = ledger + .meta_cf + .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .unwrap() + .expect("Expected metadata object to exist"); + if i != 0 { + assert_eq!(result.len(), 0); + assert!(meta.consumed == 0 && meta.received == num_blobs as u64); + } else { + assert_eq!(result, entries); + assert!(meta.consumed == num_blobs as u64 && meta.received == num_blobs as u64); + } + } + + // Destroying database without closing it first is undefined behavior + drop(ledger); + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); + } + #[test] pub fn test_iteration_order() { let slot = 0; diff --git a/src/db_window.rs b/src/db_window.rs index 4e3c743555..3514c1217a 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -22,7 +22,6 @@ use std::sync::{Arc, RwLock}; pub const MAX_REPAIR_LENGTH: usize = 128; pub fn repair( - slot: u64, db_ledger: &DbLedger, cluster_info: &Arc>, id: &Pubkey, @@ -33,7 +32,9 @@ pub fn repair( ) -> Result)>> { let rcluster_info = cluster_info.read().unwrap(); let mut is_next_leader = false; - let meta = db_ledger.meta_cf.get(&db_ledger.db, &MetaCf::key(slot))?; + let meta = db_ledger + .meta_cf + .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?; if meta.is_none() { return Ok(vec![]); } @@ -84,7 +85,7 @@ pub fn repair( }; let idxs = find_missing_data_indexes( - slot, + 
DEFAULT_SLOT_HEIGHT, db_ledger, consumed, max_repair_entry_height - 1, @@ -219,7 +220,7 @@ pub fn find_missing_coding_indexes( pub fn retransmit_all_leader_blocks( dq: &[SharedBlob], - leader_scheduler: &LeaderScheduler, + leader_scheduler: &Arc<RwLock<LeaderScheduler>>, retransmit: &BlobSender, ) -> Result<()> { let mut retransmit_queue: Vec<SharedBlob> = Vec::new(); @@ -227,7 +228,7 @@ pub fn retransmit_all_leader_blocks( // Check if the blob is from the scheduled leader for its slot. If so, // add to the retransmit_queue if let Ok(slot) = b.read().unwrap().slot() { - if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) { + if let Some(leader_id) = leader_scheduler.read().unwrap().get_leader_for_slot(slot) { add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue); } } @@ -271,8 +272,8 @@ pub fn add_blob_to_retransmit_queue( /// starting from consumed is thereby formed, add that continuous /// range of blobs to a queue to be sent on to the next stage. pub fn process_blob( - leader_scheduler: &LeaderScheduler, - db_ledger: &mut DbLedger, + leader_scheduler: &Arc<RwLock<LeaderScheduler>>, + db_ledger: &Arc<RwLock<DbLedger>>, blob: &SharedBlob, max_ix: u64, pix: u64, @@ -287,11 +288,10 @@ pub fn process_blob( // leader rotation enabled // Github issue: https://github.com/solana-labs/solana/issues/1899. let slot = blob.read().unwrap().slot()?; - let leader = leader_scheduler.get_leader_for_slot(slot); + let leader = leader_scheduler.read().unwrap().get_leader_for_slot(slot); // TODO: Once the original leader signature is added to the blob, make sure that // the blob was originally generated by the expected leader for this slot - if leader.is_none() { return Ok(()); } @@ -301,15 +301,21 @@ let erasure_key = ErasureCf::key(slot, pix); let rblob = &blob.read().unwrap(); let size = rblob.size()?; - db_ledger.erasure_cf.put( - &db_ledger.db, - &erasure_key, - &rblob.data[..BLOB_HEADER_SIZE + size], - )?; + { + let w_db = db_ledger.write().unwrap(); + w_db.erasure_cf.put( + &w_db.db, + &erasure_key, + &rblob.data[..BLOB_HEADER_SIZE + size], + )?; + } vec![] } else { let data_key = DataCf::key(slot, pix); - db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())? + db_ledger + .write() + .unwrap() + .insert_data_blob(&data_key, &blob.read().unwrap())? }; #[cfg(feature = "erasure")] @@ -317,7 +323,7 @@ // If write_shared_blobs() of these recovered blobs fails, don't return // because consumed_entries might be nonempty from earlier, and tick height needs to // be updated. Hopefully we can recover these blobs next time successfully. - if let Err(e) = try_erasure(db_ledger, slot, consume_queue) { + if let Err(e) = try_erasure(db_ledger, consume_queue) { trace!( "erasure::recover failed to write recovered coding blobs. Err: {:?}", e @@ -333,10 +339,12 @@ // we only want up to a certain index // then stop if max_ix != 0 && !consumed_entries.is_empty() { - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &MetaCf::key(slot))? - .expect("Expect metadata to exist if consumed entries is nonzero"); + let meta = { + let r_db = db_ledger.read().unwrap(); + r_db.meta_cf + .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))? 
+ .expect("Expect metadata to exist if consumed entries is nonzero") + }; let consumed = meta.consumed; @@ -374,23 +382,31 @@ pub fn calculate_max_repair_entry_height( } #[cfg(feature = "erasure")] -fn try_erasure(db_ledger: &mut DbLedger, slot: u64, consume_queue: &mut Vec) -> Result<()> { - let meta = db_ledger.meta_cf.get(&db_ledger.db, &MetaCf::key(slot))?; +fn try_erasure(db_ledger: &Arc>, consume_queue: &mut Vec) -> Result<()> { + let meta = { + let r_db = db_ledger.read().unwrap(); + r_db.meta_cf + .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))? + }; + if let Some(meta) = meta { - let (data, coding) = erasure::recover(db_ledger, slot, meta.consumed)?; + let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?; for c in coding { let cl = c.read().unwrap(); - let erasure_key = - ErasureCf::key(slot, cl.index().expect("Recovered blob must set index")); + let erasure_key = ErasureCf::key( + meta.consumed_slot, + cl.index().expect("Recovered blob must set index"), + ); let size = cl.size().expect("Recovered blob must set size"); - db_ledger.erasure_cf.put( - &db_ledger.db, - &erasure_key, - &cl.data[..BLOB_HEADER_SIZE + size], - )?; + let r_db = db_ledger.read().unwrap(); + r_db.erasure_cf + .put(&r_db.db, &erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?; } - let entries = db_ledger.write_shared_blobs(slot, data)?; + let entries = db_ledger + .write() + .unwrap() + .write_shared_blobs(meta.consumed_slot, data)?; consume_queue.extend(entries); } @@ -416,7 +432,7 @@ mod test { use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use std::time::Duration; fn get_msgs(r: PacketReceiver, num: &mut usize) { @@ -500,7 +516,8 @@ mod test { pub fn test_retransmit() { let leader = Keypair::new().pubkey(); let nonleader = Keypair::new().pubkey(); - let leader_scheduler = LeaderScheduler::from_bootstrap_leader(leader); + let leader_scheduler = + Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(leader))); let blob = SharedBlob::default(); let (blob_sender, blob_receiver) = channel(); @@ -714,12 +731,15 @@ mod test { // Generate the db_ledger from the window let ledger_path = get_tmp_ledger_path("test_try_erasure"); - let mut db_ledger = - generate_db_ledger_from_window(&ledger_path, &window, slot_height, false); + let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( + &ledger_path, + &window, + slot_height, + false, + ))); let mut consume_queue = vec![]; - try_erasure(&mut db_ledger, slot_height, &mut consume_queue) - .expect("Expected successful erasure attempt"); + try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt"); window[erase_offset].data = erased_data; let data_blobs: Vec<_> = window[erase_offset..end_index] @@ -730,10 +750,11 @@ mod test { assert_eq!(consume_queue, expected); let erased_coding_l = erased_coding.read().unwrap(); + let r_db = db_ledger.read().unwrap(); assert_eq!( - &db_ledger + &r_db .erasure_cf - .get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64) + .get_by_slot_index(&r_db.db, slot_height, erase_offset as u64) .unwrap() .unwrap()[BLOB_HEADER_SIZE..], &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize], diff --git a/src/erasure.rs b/src/erasure.rs index 45d0ac2445..11c8e5d2ea 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -351,7 +351,7 @@ pub fn generate_coding( // Recover the missing data and coding blobs from the input ledger. 
Returns a vector // of the recovered missing data blobs and a vector of the recovered coding blobs pub fn recover( - db_ledger: &mut DbLedger, + db_ledger: &Arc>, slot: u64, start_idx: u64, ) -> Result<(Vec, Vec)> { @@ -367,11 +367,22 @@ pub fn recover( block_end_idx ); - let data_missing = - find_missing_data_indexes(slot, db_ledger, block_start_idx, block_end_idx, NUM_DATA).len(); - let coding_missing = - find_missing_coding_indexes(slot, db_ledger, coding_start_idx, block_end_idx, NUM_CODING) - .len(); + let data_missing = find_missing_data_indexes( + slot, + &db_ledger.read().unwrap(), + block_start_idx, + block_end_idx, + NUM_DATA, + ) + .len(); + let coding_missing = find_missing_coding_indexes( + slot, + &db_ledger.read().unwrap(), + coding_start_idx, + block_end_idx, + NUM_CODING, + ) + .len(); // if we're not missing data, or if we have too much missing but have enough coding if data_missing == 0 { @@ -405,9 +416,10 @@ pub fn recover( // Add the data blobs we have into the recovery vector, mark the missing ones for i in block_start_idx..block_end_idx { - let result = db_ledger - .data_cf - .get_by_slot_index(&db_ledger.db, slot, i)?; + let result = { + let r_db = db_ledger.read().unwrap(); + r_db.data_cf.get_by_slot_index(&r_db.db, slot, i)? + }; categorize_blob( &result, @@ -420,9 +432,10 @@ pub fn recover( // Add the coding blobs we have into the recovery vector, mark the missing ones for i in coding_start_idx..block_end_idx { - let result = db_ledger - .erasure_cf - .get_by_slot_index(&db_ledger.db, slot, i)?; + let result = { + let r_db = db_ledger.read().unwrap(); + r_db.erasure_cf.get_by_slot_index(&r_db.db, slot, i)? + }; categorize_blob( &result, @@ -515,9 +528,10 @@ pub fn recover( // Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct // the blobs again for i in coding_start_idx..block_end_idx { - db_ledger - .erasure_cf - .delete_by_slot_index(&db_ledger.db, slot, i)?; + { + let r_db = db_ledger.read().unwrap(); + r_db.erasure_cf.delete_by_slot_index(&r_db.db, slot, i)?; + } } return Ok((vec![], vec![])); } @@ -562,6 +576,7 @@ pub mod test { use rand::{thread_rng, Rng}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::sync::{Arc, RwLock}; #[test] pub fn test_coding() { @@ -749,7 +764,12 @@ pub mod test { blobs.push(b_); } - index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, slot); + { + // Make some dummy slots + let slot_tick_heights: Vec<(&SharedBlob, u64)> = + blobs.iter().zip(vec![slot; blobs.len()]).collect(); + index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); + } for b in blobs { let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; @@ -770,7 +790,13 @@ pub mod test { let entries = make_tiny_test_entries(num_blobs); let blobs = entries.to_blobs(); - index_blobs(&blobs, &Keypair::new().pubkey(), offset as u64, 13); + { + // Make some dummy slots + let slot_tick_heights: Vec<(&SharedBlob, u64)> = + blobs.iter().zip(vec![0; blobs.len()]).collect(); + index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64); + } + for b in blobs.into_iter() { let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE; @@ -815,11 +841,15 @@ pub mod test { // Generate the db_ledger from the window let ledger_path = get_tmp_ledger_path("test_window_recover_basic"); - let mut db_ledger = - generate_db_ledger_from_window(&ledger_path, &window, DEFAULT_SLOT_HEIGHT, true); + let db_ledger = 
Arc::new(RwLock::new(generate_db_ledger_from_window( + &ledger_path, + &window, + DEFAULT_SLOT_HEIGHT, + true, + ))); // Recover it from coding - let (recovered_data, recovered_coding) = recover(&mut db_ledger, 0, offset as u64) + let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64) .expect("Expected successful recovery of erased blobs"); assert!(recovered_coding.is_empty()); @@ -866,11 +896,15 @@ pub mod test { window[erase_offset].data = None; window[erase_offset].coding = None; let ledger_path = get_tmp_ledger_path("test_window_recover_basic2"); - let mut db_ledger = - generate_db_ledger_from_window(&ledger_path, &window, DEFAULT_SLOT_HEIGHT, true); + let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( + &ledger_path, + &window, + DEFAULT_SLOT_HEIGHT, + true, + ))); // Recover it from coding - let (recovered_data, recovered_coding) = recover(&mut db_ledger, 0, offset as u64) + let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64) .expect("Expected successful recovery of erased blobs"); { diff --git a/src/fullnode.rs b/src/fullnode.rs index 0eb95161c3..19eda84f29 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -4,7 +4,7 @@ use crate::bank::Bank; use crate::broadcast_service::BroadcastService; use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; use crate::counter::Counter; -use crate::db_ledger::{write_entries_to_ledger, DbLedger}; +use crate::db_ledger::{write_entries_to_ledger, DbLedger, DEFAULT_SLOT_HEIGHT}; use crate::gossip_service::GossipService; use crate::leader_scheduler::LeaderScheduler; use crate::ledger::read_ledger; @@ -260,7 +260,7 @@ impl Fullnode { } // Get the scheduled leader - let (scheduled_leader, leader_slot) = bank + let (scheduled_leader, _) = bank .get_current_leader() .expect("Leader not known after processing bank"); @@ -327,7 +327,8 @@ impl Fullnode { scheduled_leader, ); - let broadcast_service = BroadcastService::new( + let (broadcast_service, _) = BroadcastService::new( + db_ledger.clone(), node.sockets .broadcast .try_clone() @@ -335,10 +336,9 @@ impl Fullnode { cluster_info.clone(), shared_window.clone(), entry_height, - leader_slot, + bank.leader_scheduler.clone(), entry_receiver, max_tick_height, - bank.tick_height(), tpu_exit, ); let leader_state = LeaderServices::new(tpu, broadcast_service); @@ -401,7 +401,7 @@ impl Fullnode { let new_bank = Arc::new(new_bank); let (scheduled_leader, _) = new_bank .get_current_leader() - .expect("Scheduled leader should exist after rebuilding bank"); + .expect("Scheduled leader id should be calculated after rebuilding bank"); (new_bank, scheduled_leader, entry_height, last_id) }; @@ -494,17 +494,17 @@ impl Fullnode { self.keypair.pubkey(), ); - let broadcast_service = BroadcastService::new( + let (broadcast_service, _) = BroadcastService::new( + self.db_ledger.clone(), self.broadcast_socket .try_clone() .expect("Failed to clone broadcast socket"), self.cluster_info.clone(), self.shared_window.clone(), entry_height, - 0, // TODO: get real leader slot from leader_scheduler + self.bank.leader_scheduler.clone(), blob_receiver, max_tick_height, - tick_height, tpu_exit, ); let leader_state = LeaderServices::new(tpu, broadcast_service); @@ -614,7 +614,7 @@ impl Fullnode { .expect("opening ledger") .map(|entry| entry.unwrap()); - write_entries_to_ledger(&[ledger_path], ledger_entries); + write_entries_to_ledger(&[ledger_path], ledger_entries, DEFAULT_SLOT_HEIGHT); let db = DbLedger::open(ledger_path).expect("Expected to successfully open database 
ledger"); Arc::new(RwLock::new(db)) @@ -938,8 +938,10 @@ mod tests { } } - validator.close().expect("Expected node to close"); - bootstrap_leader.close().expect("Expected node to close"); + validator.close().expect("Expected leader node to close"); + bootstrap_leader + .close() + .expect("Expected validator node to close"); } for path in ledger_paths { DbLedger::destroy(&path).expect("Expected successful database destruction"); diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 71aa1e32e1..16a52b49b8 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -295,6 +295,11 @@ impl LeaderScheduler { self.get_scheduled_leader(tick_height).map(|(id, _)| id) } + #[cfg(test)] + pub fn set_leader_schedule(&mut self, schedule: Vec) { + self.leader_schedule = schedule; + } + // Maps the nth slot (where n == slot_height) to the tick height of // the first tick for that slot fn slot_height_to_first_tick_height(&self, slot_height: u64) -> u64 { diff --git a/src/packet.rs b/src/packet.rs index 6a787ac5db..da49f42c22 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -14,6 +14,7 @@ use serde::Serialize; use solana_sdk::hash::Hash; pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; +use std::borrow::Borrow; use std::cmp; use std::fmt; use std::io; @@ -436,13 +437,19 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slot: u64) { +pub fn index_blobs(blobs: I, id: &Pubkey, mut index: u64) +where + I: IntoIterator, + J: Borrow<(K, u64)>, + K: Borrow, +{ // enumerate all the blobs, those are the indices for b in blobs { - let mut blob = b.write().unwrap(); + let (b, slot) = b.borrow(); + let mut blob = b.borrow().write().unwrap(); blob.set_index(index).expect("set_index"); - blob.set_slot(slot).expect("set_slot"); + blob.set_slot(*slot).expect("set_slot"); blob.set_id(id).expect("set_id"); blob.set_flags(0).unwrap(); diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 758e76c514..72d3c94a88 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -90,15 +90,16 @@ impl ReplayStage { "replicate_stage-verify-duration", duration_as_ms(&now.elapsed()) as usize ); + let (current_leader, _) = bank .get_current_leader() - .expect("Scheduled leader id should never be unknown while processing entries"); + .expect("Scheduled leader should be calculated by this point"); for (i, entry) in entries.iter().enumerate() { res = bank.process_entry(&entry); let my_id = keypair.pubkey(); let (scheduled_leader, _) = bank .get_current_leader() - .expect("Scheduled leader id should never be unknown while processing entries"); + .expect("Scheduled leader should be calculated by this point"); // TODO: Remove this soon once we boot the leader from ClusterInfo if scheduled_leader != current_leader { @@ -179,7 +180,7 @@ impl ReplayStage { loop { let (leader_id, _) = bank .get_current_leader() - .expect("Scheduled leader id should never be unknown at this point"); + .expect("Scheduled leader should be calculated by this point"); if leader_id == keypair.pubkey() { return Some(ReplayStageReturnType::LeaderRotation( diff --git a/src/window_service.rs b/src/window_service.rs index 1289e84b45..d8f2994849 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -14,7 +14,7 @@ use rand::{thread_rng, Rng}; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; -use std::borrow::{Borrow, BorrowMut}; +use std::borrow::Borrow; use std::net::UdpSocket; use 
std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::mpsc::RecvTimeoutError; @@ -52,9 +52,9 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { #[allow(clippy::too_many_arguments)] fn recv_window( - db_ledger: &mut DbLedger, + db_ledger: &Arc<RwLock<DbLedger>>, id: &Pubkey, - leader_scheduler: &LeaderScheduler, + leader_scheduler: &Arc<RwLock<LeaderScheduler>>, tick_height: &mut u64, max_ix: u64, r: &BlobReceiver, @@ -149,9 +149,9 @@ pub fn window_service( trace!("{}: RECV_WINDOW started", id); loop { if let Err(e) = recv_window( - db_ledger.write().unwrap().borrow_mut(), + &db_ledger, &id, - leader_scheduler.read().unwrap().borrow(), + &leader_scheduler, &mut tick_height_, max_entry_height, &r, @@ -207,7 +207,6 @@ pub fn window_service( trace!("{} let's repair! times = {}", id, times); let reqs = repair( - DEFAULT_SLOT_HEIGHT, db_ledger.read().unwrap().borrow(), &cluster_info, &id, diff --git a/tests/multinode.rs b/tests/multinode.rs index 467cd02186..5557a5ef62 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1056,8 +1056,10 @@ fn test_leader_validator_basic() { } // Shut down - validator.close().unwrap(); - leader.close().unwrap(); + validator + .close() + .expect("Expected successful validator close"); + leader.close().expect("Expected successful leader close"); // Check the ledger of the validator to make sure the entry height is correct // and that the old leader and the new leader's ledgers agree up to the point diff --git a/tests/replicator.rs b/tests/replicator.rs index 27f05b41c9..51a79124e7 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -7,7 +7,6 @@ use solana::db_ledger::DbLedger; use solana::fullnode::Fullnode; use solana::leader_scheduler::LeaderScheduler; use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger}; -use solana::logger; use solana::replicator::Replicator; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; @@ -19,7 +18,6 @@ use std::time::Duration; #[test] fn test_replicator_startup() { - logger::setup(); info!("starting replicator test"); let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger"); @@ -103,7 +101,7 @@ fn test_replicator_startup() { } assert!(num_entries > 0); replicator.close(); - leader.exit(); + leader.close().expect("Expected successful node closure"); } DbLedger::destroy(&leader_ledger_path).expect("Expected successful database destruction");