From d8f68653387fcfbcdfcc225ce48d244fcba46292 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 21 Feb 2019 16:16:09 -0700 Subject: [PATCH] Rename EntryStream to Blockstream --- book/src/api-node.md | 2 +- fullnode/src/main.rs | 8 +- multinode-demo/bootstrap-leader.sh | 10 +-- multinode-demo/fullnode.sh | 14 +-- net/remote/remote-node.sh | 2 +- run.sh | 10 +-- src/{entry_stream.rs => blockstream.rs} | 39 +++++---- ...stream_stage.rs => blockstream_service.rs} | 85 +++++++++---------- src/fullnode.rs | 6 +- src/lib.rs | 4 +- src/tvu.rs | 22 ++--- 11 files changed, 100 insertions(+), 102 deletions(-) rename src/{entry_stream.rs => blockstream.rs} (90%) rename src/{entry_stream_stage.rs => blockstream_service.rs} (75%) diff --git a/book/src/api-node.md b/book/src/api-node.md index c2cf0a64f5..21023b26d5 100644 --- a/book/src/api-node.md +++ b/book/src/api-node.md @@ -12,7 +12,7 @@ To run an api node, include the argument `no-signer` and (optional) `entry-stream` socket location: ```bash -$ ./multinode-demo/fullnode-x.sh --no-signer --entry-stream +$ ./multinode-demo/fullnode-x.sh --no-signer --blockstream ``` The stream will output a series of JSON objects: diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs index fe914e3189..048a4be9b6 100644 --- a/fullnode/src/main.rs +++ b/fullnode/src/main.rs @@ -129,11 +129,11 @@ fn main() { let matches = App::new("solana-fullnode") .version(crate_version!()) .arg( - Arg::with_name("entry_stream") - .long("entry-stream") + Arg::with_name("blockstream") + .long("blockstream") .takes_value(true) .value_name("UNIX DOMAIN SOCKET") - .help("Open entry stream at this unix domain socket location") + .help("Open blockstream at this unix domain socket location") ) .arg( Arg::with_name("identity") @@ -238,7 +238,7 @@ fn main() { ) }; let init_complete_file = matches.value_of("init_complete_file"); - fullnode_config.entry_stream = matches.value_of("entry_stream").map(|s| s.to_string()); + fullnode_config.blockstream = matches.value_of("blockstream").map(|s| s.to_string()); let keypair = Arc::new(keypair); let mut node = Node::new_with_external_ip(keypair.pubkey(), &gossip); diff --git a/multinode-demo/bootstrap-leader.sh b/multinode-demo/bootstrap-leader.sh index fd5bd4b163..4b1be20b45 100755 --- a/multinode-demo/bootstrap-leader.sh +++ b/multinode-demo/bootstrap-leader.sh @@ -29,7 +29,7 @@ else program="$solana_fullnode" fi -maybe_entry_stream= +maybe_blockstream= maybe_init_complete_file= maybe_no_leader_rotation= @@ -37,8 +37,8 @@ while [[ -n $1 ]]; do if [[ $1 = --init-complete-file ]]; then maybe_init_complete_file="--init-complete-file $2" shift 2 - elif [[ $1 = --entry-stream ]]; then - maybe_entry_stream="$1 $2" + elif [[ $1 = --blockstream ]]; then + maybe_blockstream="$1 $2" shift 2 elif [[ $1 = --no-leader-rotation ]]; then maybe_no_leader_rotation="--no-leader-rotation" @@ -61,9 +61,9 @@ tune_system trap 'kill "$pid" && wait "$pid"' INT TERM $solana_ledger_tool --ledger "$SOLANA_CONFIG_DIR"/bootstrap-leader-ledger verify -# shellcheck disable=SC2086 # Don't want to double quote maybe_entry_stream or maybe_init_complete_file +# shellcheck disable=SC2086 # Don't want to double quote maybe_blockstream or maybe_init_complete_file $program \ - $maybe_entry_stream \ + $maybe_blockstream \ $maybe_init_complete_file \ $maybe_no_leader_rotation \ --identity "$SOLANA_CONFIG_DIR"/bootstrap-leader.json \ diff --git a/multinode-demo/fullnode.sh b/multinode-demo/fullnode.sh index 75bf6252bb..d41b4b5721 100755 --- a/multinode-demo/fullnode.sh +++ 
b/multinode-demo/fullnode.sh
@@ -21,14 +21,14 @@ usage() {
     echo
   fi
   cat <<EOF
-pub struct EntryStream<T: EntryWriter> {
+pub struct Blockstream<T: EntryWriter> {
     pub output: T,
     pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
-    pub queued_block: Option<EntryStreamBlock>,
+    pub queued_block: Option<BlockData>,
 }
 
-impl<T> EntryStreamHandler for EntryStream<T>
+impl<T> BlockstreamEvents for Blockstream<T>
 where
     T: EntryWriter,
 {
@@ -127,11 +127,11 @@ where
     }
 }
 
-pub type SocketEntryStream = EntryStream<EntrySocket>;
+pub type SocketBlockstream = Blockstream<EntrySocket>;
 
-impl SocketEntryStream {
+impl SocketBlockstream {
     pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
-        EntryStream {
+        Blockstream {
             output: EntrySocket { socket },
             leader_scheduler,
             queued_block: None,
@@ -139,11 +139,11 @@ impl SocketEntryStream {
     }
 }
 
-pub type MockEntryStream = EntryStream<EntryVec>;
+pub type MockBlockstream = Blockstream<EntryVec>;
 
-impl MockEntryStream {
+impl MockBlockstream {
     pub fn new(_: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
-        EntryStream {
+        Blockstream {
             output: EntryVec::new(),
             leader_scheduler,
             queued_block: None,
@@ -156,7 +156,7 @@
     }
 
 #[derive(Debug)]
-pub struct EntryStreamBlock {
+pub struct BlockData {
     pub slot: u64,
     pub tick_height: u64,
     pub id: Hash,
@@ -175,7 +175,7 @@ mod test {
     use std::collections::HashSet;
 
     #[test]
-    fn test_entry_stream() -> () {
+    fn test_blockstream() -> () {
         // Set up bank and leader_scheduler
         let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
         let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
@@ -183,9 +183,8 @@ mod test {
         let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank);
         let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
 
-        // Set up entry stream
-        let entry_stream =
-            MockEntryStream::new("test_stream".to_string(), leader_scheduler.clone());
+        // Set up blockstream
+        let blockstream = MockBlockstream::new("test_stream".to_string(), leader_scheduler.clone());
 
         let ticks_per_slot = leader_scheduler.read().unwrap().ticks_per_slot;
         let mut last_id = Hash::default();
@@ -215,14 +214,14 @@
                 .unwrap()
                 .tick_height_to_slot(tick_height);
             if curr_slot != previous_slot {
-                entry_stream
+                blockstream
                     .emit_block_event(previous_slot, tick_height - 1, &leader_id, last_id)
                     .unwrap();
             }
             let entry = Entry::new(&mut last_id, 1, vec![]); // just ticks
             last_id = entry.id;
             previous_slot = curr_slot;
-            entry_stream
+            blockstream
                 .emit_entry_event(curr_slot, tick_height, &leader_id, &entry)
                 .unwrap();
             expected_entries.push(entry.clone());
@@ -230,7 +229,7 @@
         }
 
         assert_eq!(
-            entry_stream.entries().len() as u64,
+            blockstream.entries().len() as u64,
             // one entry per tick (0..=N+2) is +3, plus one block
             ticks_per_slot + 3 + 1
         );
@@ -240,7 +239,7 @@
         let mut matched_slots = HashSet::new();
         let mut matched_blocks = HashSet::new();
 
-        for item in entry_stream.entries() {
+        for item in blockstream.entries() {
             let json: Value = serde_json::from_str(&item).unwrap();
             let dt_str = json["dt"].as_str().unwrap();
diff --git a/src/entry_stream_stage.rs b/src/blockstream_service.rs
similarity index 75%
rename from src/entry_stream_stage.rs
rename to src/blockstream_service.rs
index 0cf37186f9..1f719e6db9 100644
--- a/src/entry_stream_stage.rs
+++ b/src/blockstream_service.rs
@@ -1,13 +1,13 @@
-//! The `entry_stream_stage` implements optional streaming of entries using the
-//! `entry_stream` module, providing client services such as a block explorer with
+//! The `blockstream_service` implements optional streaming of entries and block metadata
+//! using the `blockstream` module, providing client services such as a block explorer with
 //! real-time access to entries.
 
-use crate::entry::{EntryReceiver, EntrySender};
 #[cfg(test)]
-use crate::entry_stream::MockEntryStream as EntryStream;
+use crate::blockstream::MockBlockstream as Blockstream;
 #[cfg(not(test))]
-use crate::entry_stream::SocketEntryStream as EntryStream;
-use crate::entry_stream::{EntryStreamBlock, EntryStreamHandler};
+use crate::blockstream::SocketBlockstream as Blockstream;
+use crate::blockstream::{BlockData, BlockstreamEvents};
+use crate::entry::{EntryReceiver, EntrySender};
 use crate::leader_scheduler::LeaderScheduler;
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -17,32 +17,32 @@
 use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Duration;
 
-pub struct EntryStreamStage {
-    t_entry_stream: JoinHandle<()>,
+pub struct BlockstreamService {
+    t_blockstream: JoinHandle<()>,
 }
 
-impl EntryStreamStage {
+impl BlockstreamService {
     #[allow(clippy::new_ret_no_self)]
     pub fn new(
         ledger_entry_receiver: EntryReceiver,
-        entry_stream_socket: String,
+        blockstream_socket: String,
         mut tick_height: u64,
         leader_scheduler: Arc<RwLock<LeaderScheduler>>,
         exit: Arc<AtomicBool>,
     ) -> (Self, EntryReceiver) {
-        let (entry_stream_sender, entry_stream_receiver) = channel();
-        let mut entry_stream = EntryStream::new(entry_stream_socket, leader_scheduler);
-        let t_entry_stream = Builder::new()
-            .name("solana-entry-stream".to_string())
+        let (blockstream_sender, blockstream_receiver) = channel();
+        let mut blockstream = Blockstream::new(blockstream_socket, leader_scheduler);
+        let t_blockstream = Builder::new()
+            .name("solana-blockstream".to_string())
             .spawn(move || loop {
                 if exit.load(Ordering::Relaxed) {
                     break;
                 }
                 if let Err(e) = Self::process_entries(
                     &ledger_entry_receiver,
-                    &entry_stream_sender,
+                    &blockstream_sender,
                     &mut tick_height,
-                    &mut entry_stream,
+                    &mut blockstream,
                 ) {
                     match e {
                         Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@@ -52,17 +52,17 @@
                 }
             })
             .unwrap();
-        (Self { t_entry_stream }, entry_stream_receiver)
+        (Self { t_blockstream }, blockstream_receiver)
     }
 
     fn process_entries(
         ledger_entry_receiver: &EntryReceiver,
-        entry_stream_sender: &EntrySender,
+        blockstream_sender: &EntrySender,
         tick_height: &mut u64,
-        entry_stream: &mut EntryStream,
+        blockstream: &mut Blockstream,
     ) -> Result<()> {
         let timeout = Duration::new(1, 0);
         let entries = ledger_entry_receiver.recv_timeout(timeout)?;
-        let leader_scheduler = entry_stream.leader_scheduler.read().unwrap();
+        let leader_scheduler = blockstream.leader_scheduler.read().unwrap();
 
         for entry in &entries {
             if entry.is_tick() {
@@ -74,25 +74,25 @@
                 .map(|leader| leader.to_string())
                 .unwrap_or_else(|| "None".to_string());
 
-            if entry.is_tick() && entry_stream.queued_block.is_some() {
-                let queued_block = entry_stream.queued_block.as_ref();
-                let block_slot = queued_block.unwrap().slot;
-                let block_tick_height = queued_block.unwrap().tick_height;
-                let block_id = queued_block.unwrap().id;
-                entry_stream
+            if entry.is_tick() && blockstream.queued_block.is_some() {
+                let queued_block = blockstream.queued_block.as_ref();
+                let block_slot = queued_block.unwrap().slot;
+                let block_tick_height = queued_block.unwrap().tick_height;
+                let block_id = queued_block.unwrap().id;
+                blockstream
                     .emit_block_event(block_slot, block_tick_height, &leader_id, block_id)
                     .unwrap_or_else(|e| {
-                        debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
+                        debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
                     });
-                entry_stream.queued_block = None;
+                blockstream.queued_block = None;
             }
-            entry_stream
+            blockstream
                 .emit_entry_event(slot, *tick_height, &leader_id, &entry)
                 .unwrap_or_else(|e| {
-                    debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
+                    debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
                 });
             if 0 == leader_scheduler.num_ticks_left_in_slot(*tick_height) {
-                entry_stream.queued_block = Some(EntryStreamBlock {
+                blockstream.queued_block = Some(BlockData {
                     slot,
                     tick_height: *tick_height,
                     id: entry.id,
@@ -100,16 +100,16 @@
             }
         }
 
-        entry_stream_sender.send(entries)?;
+        blockstream_sender.send(entries)?;
         Ok(())
     }
 }
 
-impl Service for EntryStreamStage {
+impl Service for BlockstreamService {
     type JoinReturnType = ();
 
     fn join(self) -> thread::Result<()> {
-        self.t_entry_stream.join()
+        self.t_blockstream.join()
     }
 }
 
@@ -127,7 +127,7 @@ mod test {
     use solana_sdk::system_transaction::SystemTransaction;
 
     #[test]
-    fn test_entry_stream_stage_process_entries() {
+    fn test_blockstream_stage_process_entries() {
         // Set up the bank and leader_scheduler
         let ticks_per_slot = 5;
         let starting_tick_height = 1;
@@ -138,13 +138,12 @@
         let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank);
         let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
 
-        // Set up entry stream
-        let mut entry_stream =
-            EntryStream::new("test_stream".to_string(), leader_scheduler.clone());
+        // Set up blockstream
+        let mut blockstream = Blockstream::new("test_stream".to_string(), leader_scheduler.clone());
 
-        // Set up dummy channels to host an EntryStreamStage
+        // Set up dummy channels to host a BlockstreamService
         let (ledger_entry_sender, ledger_entry_receiver) = channel();
-        let (entry_stream_sender, entry_stream_receiver) = channel();
+        let (blockstream_sender, blockstream_receiver) = channel();
 
         let mut last_id = Hash::default();
         let mut entries = Vec::new();
@@ -168,16 +167,16 @@
         entries.insert(ticks_per_slot as usize, entry);
         ledger_entry_sender.send(entries).unwrap();
 
-        EntryStreamStage::process_entries(
+        BlockstreamService::process_entries(
             &ledger_entry_receiver,
-            &entry_stream_sender,
+            &blockstream_sender,
             &mut (starting_tick_height - 1),
-            &mut entry_stream,
+            &mut blockstream,
         )
         .unwrap();
-        assert_eq!(entry_stream.entries().len(), 8);
+        assert_eq!(blockstream.entries().len(), 8);
 
-        let (entry_events, block_events): (Vec<Value>, Vec<Value>) = entry_stream
+        let (entry_events, block_events): (Vec<Value>, Vec<Value>) = blockstream
             .entries()
             .iter()
             .map(|item| {
@@ -212,7 +211,7 @@
         }
 
         // Ensure entries pass through stage unadulterated
-        let recv_entries = entry_stream_receiver.recv().unwrap();
+        let recv_entries = blockstream_receiver.recv().unwrap();
         assert_eq!(expected_entries, recv_entries);
     }
 }
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 6b9d15a11a..108a64f1ca 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -64,7 +64,7 @@ pub enum FullnodeReturnType {
 pub struct FullnodeConfig {
     pub sigverify_disabled: bool,
     pub voting_disabled: bool,
-    pub entry_stream: Option<String>,
+    pub blockstream: Option<String>,
     pub storage_rotate_count: u64,
     pub leader_scheduler_config: LeaderSchedulerConfig,
     pub tick_config: PohServiceConfig,
@@ -78,7 +78,7 @@ impl Default for FullnodeConfig {
         Self {
             sigverify_disabled: false,
             voting_disabled: false,
-            entry_stream: None,
+            blockstream: None,
             storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
             leader_scheduler_config: LeaderSchedulerConfig::default(),
             tick_config: PohServiceConfig::default(),
@@ -270,7 +270,7 @@ impl Fullnode {
             config.storage_rotate_count,
             &rotation_sender,
             &storage_state,
-            config.entry_stream.as_ref(),
+            config.blockstream.as_ref(),
             ledger_signal_receiver,
             leader_scheduler.clone(),
             &subscriptions,
diff --git a/src/lib.rs b/src/lib.rs
index 02c0772b4d..611dd2af3e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,13 +26,13 @@ pub mod crds_gossip_push;
 pub mod crds_value;
 #[macro_use]
 pub mod contact_info;
+pub mod blockstream;
+pub mod blockstream_service;
 pub mod blocktree;
 pub mod blocktree_processor;
 pub mod cluster_info;
 pub mod db_window;
 pub mod entry;
-pub mod entry_stream;
-pub mod entry_stream_stage;
 #[cfg(feature = "erasure")]
 pub mod erasure;
 pub mod fetch_stage;
diff --git a/src/tvu.rs b/src/tvu.rs
index 1b9f117186..574d6be54f 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -14,9 +14,9 @@
 use crate::bank_forks::BankForks;
 use crate::blob_fetch_stage::BlobFetchStage;
+use crate::blockstream_service::BlockstreamService;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::ClusterInfo;
-use crate::entry_stream_stage::EntryStreamStage;
 use crate::leader_scheduler::LeaderScheduler;
 use crate::replay_stage::ReplayStage;
 use crate::retransmit_stage::RetransmitStage;
@@ -45,7 +45,7 @@ pub struct Tvu {
     fetch_stage: BlobFetchStage,
     retransmit_stage: RetransmitStage,
     replay_stage: ReplayStage,
-    entry_stream_stage: Option<EntryStreamStage>,
+    blockstream_service: Option<BlockstreamService>,
     storage_stage: StorageStage,
     exit: Arc<AtomicBool>,
 }
@@ -78,7 +78,7 @@ impl Tvu {
         storage_rotate_count: u64,
         to_leader_sender: &TvuRotationSender,
         storage_state: &StorageState,
-        entry_stream: Option<&String>,
+        blockstream: Option<&String>,
         ledger_signal_receiver: Receiver<bool>,
         leader_scheduler: Arc<RwLock<LeaderScheduler>>,
         subscriptions: &Arc<RpcSubscriptions>,
@@ -134,16 +134,16 @@
             subscriptions,
         );
 
-        let entry_stream_stage = if entry_stream.is_some() {
-            let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
+        let blockstream_service = if blockstream.is_some() {
+            let (blockstream_service, blockstream_receiver) = BlockstreamService::new(
                 previous_receiver,
-                entry_stream.unwrap().to_string(),
+                blockstream.unwrap().to_string(),
                 bank.tick_height(),
                 leader_scheduler,
                 exit.clone(),
             );
-            previous_receiver = entry_stream_receiver;
-            Some(entry_stream_stage)
+            previous_receiver = blockstream_receiver;
+            Some(blockstream_service)
         } else {
             None
         };
@@ -163,7 +163,7 @@
             fetch_stage,
             retransmit_stage,
             replay_stage,
-            entry_stream_stage,
+            blockstream_service,
             storage_stage,
             exit,
         }
@@ -193,8 +193,8 @@ impl Service for Tvu {
         self.retransmit_stage.join()?;
         self.fetch_stage.join()?;
         self.storage_stage.join()?;
-        if self.entry_stream_stage.is_some() {
-            self.entry_stream_stage.unwrap().join()?;
+        if self.blockstream_service.is_some() {
+            self.blockstream_service.unwrap().join()?;
         }
         self.replay_stage.join()?;
         Ok(())
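Note for reviewers trying the rename end to end: per the api-node.md change above, the fullnode still treats the `--blockstream` argument as a unix domain socket location and emits a series of JSON objects on it. Below is a minimal stand-alone consumer sketch, assuming the fullnode connects to that socket as a client and writes one JSON object per line; the socket path, the connection direction, and the field handling are illustrative assumptions for this sketch, not something this patch defines.

```rust
// Hypothetical consumer for the socket passed to the fullnode via
// `--blockstream <UNIX DOMAIN SOCKET>`. Assumptions (not guaranteed by this
// patch): the fullnode connects as a client and writes newline-delimited JSON.
// Requires the `serde_json` crate.
use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixListener;

fn main() -> std::io::Result<()> {
    let socket_path = "/tmp/solana-blockstream.sock"; // illustrative path
    let _ = std::fs::remove_file(socket_path); // ignore error if it does not exist
    let listener = UnixListener::bind(socket_path)?;

    for stream in listener.incoming() {
        let reader = BufReader::new(stream?);
        for line in reader.lines() {
            let line = line?;
            // Each line is expected to hold one entry or block event; "dt" is
            // the timestamp field the blockstream.rs tests above check for.
            match serde_json::from_str::<serde_json::Value>(&line) {
                Ok(event) => println!("event at {}: {}", event["dt"], event),
                Err(err) => eprintln!("skipping malformed line: {}", err),
            }
        }
    }
    Ok(())
}
```

Only the `dt` field is relied on here, since that is the only field exercised directly in the blockstream.rs test; a real block explorer would additionally distinguish the events produced by `emit_entry_event` from those produced by `emit_block_event`.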