From 6eb09a69016752976a88d4838e708c056faabb77 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 28 Feb 2019 02:14:37 -0700 Subject: [PATCH] Trigger blockstream on full-slot notification (clean up superfluous stuff) --- src/blockstream.rs | 11 --- src/blockstream_service.rs | 174 ++++++++++++++++--------------------- src/entry.rs | 27 +----- src/replay_stage.rs | 87 +++++-------------- src/storage_stage.rs | 18 ++-- src/tvu.rs | 10 +-- 6 files changed, 113 insertions(+), 214 deletions(-) diff --git a/src/blockstream.rs b/src/blockstream.rs index 3d61cb2429..5f462de6fe 100644 --- a/src/blockstream.rs +++ b/src/blockstream.rs @@ -78,7 +78,6 @@ pub trait BlockstreamEvents { #[derive(Debug)] pub struct Blockstream { pub output: T, - pub queued_block: Option, } impl BlockstreamEvents for Blockstream @@ -131,7 +130,6 @@ impl SocketBlockstream { pub fn new(socket: String) -> Self { Blockstream { output: EntrySocket { socket }, - queued_block: None, } } } @@ -142,7 +140,6 @@ impl MockBlockstream { pub fn new(_: String) -> Self { Blockstream { output: EntryVec::new(), - queued_block: None, } } @@ -151,14 +148,6 @@ impl MockBlockstream { } } -#[derive(Debug)] -pub struct BlockData { - pub slot: u64, - pub tick_height: u64, - pub id: Hash, - pub leader_id: Pubkey, -} - #[cfg(test)] mod test { use super::*; diff --git a/src/blockstream_service.rs b/src/blockstream_service.rs index 9a9cbd8049..2a52235ae8 100644 --- a/src/blockstream_service.rs +++ b/src/blockstream_service.rs @@ -2,16 +2,17 @@ //! using the `blockstream` module, providing client services such as a block explorer with //! real-time access to entries. +use crate::blockstream::BlockstreamEvents; #[cfg(test)] use crate::blockstream::MockBlockstream as Blockstream; #[cfg(not(test))] use crate::blockstream::SocketBlockstream as Blockstream; -use crate::blockstream::{BlockData, BlockstreamEvents}; -use crate::entry::{EntryReceiver, EntrySender}; +use crate::blocktree::Blocktree; use crate::result::{Error, Result}; use crate::service::Service; +use solana_sdk::pubkey::Pubkey; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, RecvTimeoutError}; +use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -23,11 +24,11 @@ pub struct BlockstreamService { impl BlockstreamService { #[allow(clippy::new_ret_no_self)] pub fn new( - ledger_entry_receiver: EntryReceiver, + slot_full_receiver: Receiver<(u64, Pubkey)>, + blocktree: Arc, blockstream_socket: String, exit: Arc, - ) -> (Self, EntryReceiver) { - let (blockstream_sender, blockstream_receiver) = channel(); + ) -> Self { let mut blockstream = Blockstream::new(blockstream_socket); let t_blockstream = Builder::new() .name("solana-blockstream".to_string()) @@ -35,11 +36,9 @@ impl BlockstreamService { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = Self::process_entries( - &ledger_entry_receiver, - &blockstream_sender, - &mut blockstream, - ) { + if let Err(e) = + Self::process_entries(&slot_full_receiver, &blocktree, &mut blockstream) + { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -48,51 +47,50 @@ impl BlockstreamService { } }) .unwrap(); - (Self { t_blockstream }, blockstream_receiver) + Self { t_blockstream } } fn process_entries( - ledger_entry_receiver: &EntryReceiver, - blockstream_sender: &EntrySender, + slot_full_receiver: &Receiver<(u64, Pubkey)>, + blocktree: &Arc, blockstream: &mut Blockstream, ) -> Result<()> { let timeout = Duration::new(1, 0); - let entries_with_meta = ledger_entry_receiver.recv_timeout(timeout)?; + let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?; - for entry_meta in &entries_with_meta { - if entry_meta.entry.is_tick() && blockstream.queued_block.is_some() { - let queued_block = blockstream.queued_block.as_ref(); - let block_slot = queued_block.unwrap().slot; - let block_tick_height = queued_block.unwrap().tick_height; - let block_id = queued_block.unwrap().id; - let block_leader = queued_block.unwrap().leader_id; - blockstream - .emit_block_event(block_slot, block_tick_height, block_leader, block_id) - .unwrap_or_else(|e| { - debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); - }); - blockstream.queued_block = None; + let entries = blocktree.get_slot_entries(slot, 0, None).unwrap(); + let blocktree_meta = blocktree.meta(slot).unwrap().unwrap(); + let _parent_slot = if slot == 0 { + None + } else { + Some(blocktree_meta.parent_slot) + }; + let ticks_per_slot = entries + .iter() + .filter(|entry| entry.is_tick()) + .fold(0, |acc, _| acc + 1); + let mut tick_height = if slot > 0 { + ticks_per_slot * slot - 1 + } else { + 0 + }; + + for (i, entry) in entries.iter().enumerate() { + if entry.is_tick() { + tick_height += 1; } blockstream - .emit_entry_event( - entry_meta.slot, - entry_meta.tick_height, - entry_meta.slot_leader, - &entry_meta.entry, - ) + .emit_entry_event(slot, tick_height, slot_leader, &entry) .unwrap_or_else(|e| { debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); }); - if entry_meta.is_end_of_slot { - blockstream.queued_block = Some(BlockData { - slot: entry_meta.slot, - tick_height: entry_meta.tick_height, - id: entry_meta.entry.id, - leader_id: entry_meta.slot_leader, - }); + if i == entries.len() - 1 { + blockstream + .emit_block_event(slot, tick_height, slot_leader, entry.id) + .unwrap_or_else(|e| { + debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); + }); } } - - blockstream_sender.send(entries_with_meta)?; Ok(()) } } @@ -108,76 +106,62 @@ impl Service for BlockstreamService { #[cfg(test)] mod test { use super::*; - use crate::entry::{Entry, EntryMeta}; + use crate::blocktree::{create_new_ledger, get_tmp_ledger_path}; + use crate::entry::{create_ticks, Entry}; use chrono::{DateTime, FixedOffset}; use serde_json::Value; + use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; + use std::sync::mpsc::channel; #[test] fn test_blockstream_service_process_entries() { let ticks_per_slot = 5; let leader_id = Keypair::new().pubkey(); + // Set up genesis block and blocktree + let (mut genesis_block, _mint_keypair) = GenesisBlock::new(1000); + genesis_block.ticks_per_slot = ticks_per_slot; + let (ledger_path, _last_id) = { + let ledger_path = get_tmp_ledger_path(&format!("{}-{}", file!(), line!())); + let last_id = create_new_ledger(&ledger_path, &genesis_block).unwrap(); + (ledger_path, last_id) + }; + let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap(); + // Set up blockstream let mut blockstream = Blockstream::new("test_stream".to_string()); - // Set up dummy channels to host an BlockstreamService - let (ledger_entry_sender, ledger_entry_receiver) = channel(); - let (blockstream_sender, blockstream_receiver) = channel(); + // Set up dummy channel to receive a full-slot notification + let (slot_full_sender, slot_full_receiver) = channel(); + + // Create entries - 4 ticks + 1 populated entry + 1 tick + let mut entries = create_ticks(4, Hash::default()); - let mut last_id = Hash::default(); - let mut entries = Vec::new(); - let mut expected_entries = Vec::new(); - let mut expected_tick_heights = Vec::new(); - for x in 0..6 { - let entry = Entry::new(&mut last_id, 1, vec![]); //just ticks - last_id = entry.id; - let slot_height = x / ticks_per_slot; - let parent_slot = if slot_height > 0 { - Some(slot_height - 1) - } else { - None - }; - let entry_meta = EntryMeta { - tick_height: x, - slot: slot_height, - slot_leader: leader_id, - is_end_of_slot: x == ticks_per_slot - 1, - parent_slot, - entry, - }; - expected_entries.push(entry_meta.clone()); - expected_tick_heights.push(x); - entries.push(entry_meta); - } let keypair = Keypair::new(); + let mut last_id = entries[3].id; let tx = SystemTransaction::new_account(&keypair, keypair.pubkey(), 1, Hash::default(), 0); let entry = Entry::new(&mut last_id, 1, vec![tx]); - let entry_meta = EntryMeta { - tick_height: ticks_per_slot - 1, - slot: 0, - slot_leader: leader_id, - is_end_of_slot: true, - parent_slot: None, - entry, - }; - expected_entries.insert(ticks_per_slot as usize, entry_meta.clone()); - expected_tick_heights.insert( - ticks_per_slot as usize, - ticks_per_slot - 1, // Populated entries should share the tick height of the previous tick. - ); - entries.insert(ticks_per_slot as usize, entry_meta); + last_id = entry.id; + entries.push(entry); + let final_tick = create_ticks(1, last_id); + entries.extend_from_slice(&final_tick); - ledger_entry_sender.send(entries).unwrap(); + let expected_entries = entries.clone(); + let expected_tick_heights = [5, 6, 7, 8, 8, 9]; + + blocktree.write_entries(1, 0, 0, &entries).unwrap(); + + slot_full_sender.send((1, leader_id)).unwrap(); BlockstreamService::process_entries( - &ledger_entry_receiver, - &blockstream_sender, + &slot_full_receiver, + &Arc::new(blocktree), &mut blockstream, ) .unwrap(); - assert_eq!(blockstream.entries().len(), 8); + assert_eq!(blockstream.entries().len(), 7); let (entry_events, block_events): (Vec, Vec) = blockstream .entries() @@ -203,18 +187,14 @@ mod test { // `serde_json::from_str` does not work for populated Entries. // Remove this `if` when fixed. let entry: Entry = serde_json::from_value(entry_obj).unwrap(); - assert_eq!(entry, expected_entries[i].entry); + assert_eq!(entry, expected_entries[i]); } } for json in block_events { let slot = json["s"].as_u64().unwrap(); - assert_eq!(0, slot); + assert_eq!(1, slot); let height = json["h"].as_u64().unwrap(); - assert_eq!(ticks_per_slot - 1, height); + assert_eq!(2 * ticks_per_slot - 1, height); } - - // Ensure entries pass through stage unadulterated - let recv_entries = blockstream_receiver.recv().unwrap(); - assert_eq!(expected_entries, recv_entries); } } diff --git a/src/entry.rs b/src/entry.rs index b9084ac742..c6b99e850e 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -21,31 +21,8 @@ use std::mem::size_of; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, RwLock}; -pub type EntrySender = Sender>; -pub type EntryReceiver = Receiver>; - -#[derive(Debug, PartialEq, Eq, Clone)] -pub struct EntryMeta { - pub tick_height: u64, - pub slot: u64, - pub slot_leader: Pubkey, - pub is_end_of_slot: bool, - pub parent_slot: Option, - pub entry: Entry, -} - -impl EntryMeta { - pub fn new(entry: Entry) -> Self { - Self { - tick_height: 0, - slot: 0, - slot_leader: Pubkey::default(), - is_end_of_slot: false, - parent_slot: None, - entry, - } - } -} +pub type EntrySender = Sender>; +pub type EntryReceiver = Receiver>; /// Each Entry contains three pieces of data. The `num_hashes` field is the number /// of hashes performed since the previous entry. The `id` field is the result diff --git a/src/replay_stage.rs b/src/replay_stage.rs index e5a77cb845..b48b97ba5b 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -4,7 +4,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::blocktree_processor::{self, BankForksInfo}; use crate::cluster_info::ClusterInfo; -use crate::entry::{Entry, EntryMeta, EntryReceiver, EntrySender, EntrySlice}; +use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_schedule_utils; use crate::packet::BlobError; use crate::result::{Error, Result}; @@ -60,6 +60,7 @@ impl ReplayStage { bank: &Arc, cluster_info: &Arc>, voting_keypair: &Option>, + forward_entry_sender: &EntrySender, current_blob_index: &mut u64, last_entry_id: &mut Hash, subscriptions: &Arc, @@ -146,6 +147,12 @@ impl ReplayStage { ); let entries_len = entries.len() as u64; + // TODO: In line with previous behavior, this will write all the entries even if + // an error occurred processing one of the entries (causing the rest of the entries to + // not be processed). + if entries_len != 0 { + forward_entry_sender.send(entries)?; + } *current_blob_index += entries_len; res?; @@ -168,12 +175,12 @@ impl ReplayStage { to_leader_sender: &TvuRotationSender, ledger_signal_receiver: Receiver, subscriptions: &Arc, - ) -> (Self, EntryReceiver) + ) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver) where T: 'static + KeypairUtil + Send + Sync, { let (forward_entry_sender, forward_entry_receiver) = channel(); - // let (_slot_full_sender, _slot_full_receiver) = channel(); + let (slot_full_sender, slot_full_receiver) = channel(); let exit_ = exit.clone(); let to_leader_sender = to_leader_sender.clone(); let subscriptions_ = subscriptions.clone(); @@ -301,22 +308,12 @@ impl ReplayStage { }; if !entries.is_empty() { - // if let Err(e) = Self::forward_entries( - // entries.clone(), - // slot, - // current_leader_id, - // bank.ticks_per_slot(), - // bank.tick_height(), - // &blocktree, - // &forward_entry_sender, - // ) { - // error!("{} forward_entries failed: {:?}", my_id, e); - // } if let Err(e) = Self::process_entries( entries, &bank, &cluster_info, &voting_keypair, + &forward_entry_sender, &mut current_blob_index, &mut last_entry_id, &subscriptions_, @@ -330,15 +327,8 @@ impl ReplayStage { // We've reached the end of a slot, reset our state and check // for leader rotation if max_tick_height_for_slot == current_tick_height { - let entries_to_stream = blocktree.get_slot_entries(slot, 0, None).unwrap(); - if let Err(e) = Self::forward_entries( - entries_to_stream, - slot, - current_leader_id, - &blocktree, - &forward_entry_sender, - ) { - error!("{} forward_entries failed: {:?}", my_id, e); + if let Err(e) = slot_full_sender.send((slot, current_leader_id)) { + error!("{} slot_full alert failed: {:?}", my_id, e); } // Check for leader rotation @@ -408,7 +398,11 @@ impl ReplayStage { }) .unwrap(); - (Self { t_replay, exit }, forward_entry_receiver) + ( + Self { t_replay, exit }, + slot_full_receiver, + forward_entry_receiver, + ) } pub fn close(self) -> thread::Result<()> { @@ -448,44 +442,6 @@ impl ReplayStage { let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error"); next_slots.first().cloned() } - - fn forward_entries( - entries: Vec, - slot: u64, - slot_leader: Pubkey, - blocktree: &Arc, - forward_entry_sender: &EntrySender, - ) -> Result<()> { - let blocktree_meta = blocktree.meta(slot).unwrap().unwrap(); - let parent_slot = if slot == 0 { - None - } else { - Some(blocktree_meta.parent_slot) - }; - let ticks_per_slot = entries - .iter() - .filter(|entry| entry.is_tick()) - .fold(0, |acc, _| acc + 1); - let mut tick_height = ticks_per_slot * slot - 1; - let mut entries_with_meta = Vec::new(); - for entry in entries.into_iter() { - if entry.is_tick() { - tick_height += 1; - } - let is_end_of_slot = (tick_height + 1) % ticks_per_slot == 0; - let entry_meta = EntryMeta { - tick_height, - slot, - slot_leader, - is_end_of_slot, - parent_slot, - entry, - }; - entries_with_meta.push(entry_meta); - } - forward_entry_sender.send(entries_with_meta)?; - Ok(()) - } } impl Service for ReplayStage { @@ -541,7 +497,7 @@ mod test { let last_entry_id = bank_forks_info[0].last_entry_id; let blocktree = Arc::new(blocktree); - let (replay_stage, ledger_writer_recv) = ReplayStage::new( + let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( my_keypair.pubkey(), Some(voting_keypair.clone()), blocktree.clone(), @@ -566,7 +522,7 @@ mod test { .recv() .expect("Expected to receive an entry on the ledger writer receiver"); - assert_eq!(next_tick[0], received_tick[0].entry); + assert_eq!(next_tick[0], received_tick[0]); replay_stage .close() @@ -583,6 +539,7 @@ mod test { let my_node = Node::new_localhost_with_pubkey(my_id); // Set up the cluster info let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); + let (forward_entry_sender, _forward_entry_receiver) = channel(); let mut last_entry_id = Hash::default(); let mut current_blob_index = 0; let mut last_id = Hash::default(); @@ -600,6 +557,7 @@ mod test { &bank, &cluster_info_me, &voting_keypair, + &forward_entry_sender, &mut current_blob_index, &mut last_entry_id, &Arc::new(RpcSubscriptions::default()), @@ -622,6 +580,7 @@ mod test { &bank, &cluster_info_me, &voting_keypair, + &forward_entry_sender, &mut current_blob_index, &mut last_entry_id, &Arc::new(RpcSubscriptions::default()), diff --git a/src/storage_stage.rs b/src/storage_stage.rs index 540c63d680..a508393c0c 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -362,11 +362,7 @@ impl StorageStage { tx_sender: &TransactionSender, ) -> Result<()> { let timeout = Duration::new(1, 0); - let entries: Vec = entry_receiver - .recv_timeout(timeout)? - .iter() - .map(|entry_meta| entry_meta.entry.clone()) - .collect(); + let entries: Vec = entry_receiver.recv_timeout(timeout)?; for entry in entries { // Go through the transactions, find votes, and use them to update // the storage_keys with their signatures. @@ -450,7 +446,7 @@ impl Service for StorageStage { mod tests { use crate::blocktree::{create_new_tmp_ledger, Blocktree}; use crate::cluster_info::{ClusterInfo, NodeInfo}; - use crate::entry::{make_tiny_test_entries, Entry, EntryMeta}; + use crate::entry::{make_tiny_test_entries, Entry}; use crate::service::Service; use crate::storage_stage::StorageState; use crate::storage_stage::NUM_IDENTITIES; @@ -529,8 +525,7 @@ mod tests { STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - let entries_meta: Vec = entries.into_iter().map(EntryMeta::new).collect(); - storage_entry_sender.send(entries_meta.clone()).unwrap(); + storage_entry_sender.send(entries.clone()).unwrap(); let keypair = Keypair::new(); let hash = Hash::default(); @@ -539,7 +534,7 @@ mod tests { assert_eq!(result, Hash::default()); for _ in 0..9 { - storage_entry_sender.send(entries_meta.clone()).unwrap(); + storage_entry_sender.send(entries.clone()).unwrap(); } for _ in 0..5 { result = storage_state.get_mining_result(&signature); @@ -592,8 +587,7 @@ mod tests { STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - let entries_meta: Vec = entries.into_iter().map(EntryMeta::new).collect(); - storage_entry_sender.send(entries_meta.clone()).unwrap(); + storage_entry_sender.send(entries.clone()).unwrap(); let mut reference_keys; { @@ -605,7 +599,7 @@ mod tests { let keypair = Keypair::new(); let vote_tx = VoteTransaction::new_vote(&keypair, 123456, Hash::default(), 1); vote_txs.push(vote_tx); - let vote_entries = vec![EntryMeta::new(Entry::new(&Hash::default(), 1, vote_txs))]; + let vote_entries = vec![Entry::new(&Hash::default(), 1, vote_txs)]; storage_entry_sender.send(vote_entries).unwrap(); for _ in 0..5 { diff --git a/src/tvu.rs b/src/tvu.rs index 2dd4a41335..37066ee153 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -117,7 +117,7 @@ impl Tvu { exit.clone(), ); - let (replay_stage, mut previous_receiver) = ReplayStage::new( + let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new( keypair.pubkey(), voting_keypair, blocktree.clone(), @@ -131,12 +131,12 @@ impl Tvu { ); let blockstream_service = if blockstream.is_some() { - let (blockstream_service, blockstream_receiver) = BlockstreamService::new( - previous_receiver, + let blockstream_service = BlockstreamService::new( + slot_full_receiver, + blocktree.clone(), blockstream.unwrap().to_string(), exit.clone(), ); - previous_receiver = blockstream_receiver; Some(blockstream_service) } else { None @@ -144,7 +144,7 @@ impl Tvu { let storage_stage = StorageStage::new( storage_state, - previous_receiver, + forward_entry_receiver, Some(blocktree), &keypair, &exit.clone(),