Trigger blockstream on full-slot notification (clean up superfluous stuff)

This commit is contained in:
Tyera Eulberg
2019-02-28 02:14:37 -07:00
committed by Greg Fitzgerald
parent e04d2379df
commit 6eb09a6901
6 changed files with 113 additions and 214 deletions

View File

@ -78,7 +78,6 @@ pub trait BlockstreamEvents {
#[derive(Debug)]
pub struct Blockstream<T: EntryWriter> {
pub output: T,
pub queued_block: Option<BlockData>,
}
impl<T> BlockstreamEvents for Blockstream<T>
@ -131,7 +130,6 @@ impl SocketBlockstream {
pub fn new(socket: String) -> Self {
Blockstream {
output: EntrySocket { socket },
queued_block: None,
}
}
}
@ -142,7 +140,6 @@ impl MockBlockstream {
pub fn new(_: String) -> Self {
Blockstream {
output: EntryVec::new(),
queued_block: None,
}
}
@ -151,14 +148,6 @@ impl MockBlockstream {
}
}
#[derive(Debug)]
pub struct BlockData {
pub slot: u64,
pub tick_height: u64,
pub id: Hash,
pub leader_id: Pubkey,
}
#[cfg(test)]
mod test {
use super::*;

View File

@ -2,16 +2,17 @@
//! using the `blockstream` module, providing client services such as a block explorer with
//! real-time access to entries.
use crate::blockstream::BlockstreamEvents;
#[cfg(test)]
use crate::blockstream::MockBlockstream as Blockstream;
#[cfg(not(test))]
use crate::blockstream::SocketBlockstream as Blockstream;
use crate::blockstream::{BlockData, BlockstreamEvents};
use crate::entry::{EntryReceiver, EntrySender};
use crate::blocktree::Blocktree;
use crate::result::{Error, Result};
use crate::service::Service;
use solana_sdk::pubkey::Pubkey;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
@ -23,11 +24,11 @@ pub struct BlockstreamService {
impl BlockstreamService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
ledger_entry_receiver: EntryReceiver,
slot_full_receiver: Receiver<(u64, Pubkey)>,
blocktree: Arc<Blocktree>,
blockstream_socket: String,
exit: Arc<AtomicBool>,
) -> (Self, EntryReceiver) {
let (blockstream_sender, blockstream_receiver) = channel();
) -> Self {
let mut blockstream = Blockstream::new(blockstream_socket);
let t_blockstream = Builder::new()
.name("solana-blockstream".to_string())
@ -35,11 +36,9 @@ impl BlockstreamService {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::process_entries(
&ledger_entry_receiver,
&blockstream_sender,
&mut blockstream,
) {
if let Err(e) =
Self::process_entries(&slot_full_receiver, &blocktree, &mut blockstream)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -48,51 +47,50 @@ impl BlockstreamService {
}
})
.unwrap();
(Self { t_blockstream }, blockstream_receiver)
Self { t_blockstream }
}
fn process_entries(
ledger_entry_receiver: &EntryReceiver,
blockstream_sender: &EntrySender,
slot_full_receiver: &Receiver<(u64, Pubkey)>,
blocktree: &Arc<Blocktree>,
blockstream: &mut Blockstream,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries_with_meta = ledger_entry_receiver.recv_timeout(timeout)?;
let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?;
for entry_meta in &entries_with_meta {
if entry_meta.entry.is_tick() && blockstream.queued_block.is_some() {
let queued_block = blockstream.queued_block.as_ref();
let block_slot = queued_block.unwrap().slot;
let block_tick_height = queued_block.unwrap().tick_height;
let block_id = queued_block.unwrap().id;
let block_leader = queued_block.unwrap().leader_id;
let entries = blocktree.get_slot_entries(slot, 0, None).unwrap();
let blocktree_meta = blocktree.meta(slot).unwrap().unwrap();
let _parent_slot = if slot == 0 {
None
} else {
Some(blocktree_meta.parent_slot)
};
let ticks_per_slot = entries
.iter()
.filter(|entry| entry.is_tick())
.fold(0, |acc, _| acc + 1);
let mut tick_height = if slot > 0 {
ticks_per_slot * slot - 1
} else {
0
};
for (i, entry) in entries.iter().enumerate() {
if entry.is_tick() {
tick_height += 1;
}
blockstream
.emit_block_event(block_slot, block_tick_height, block_leader, block_id)
.emit_entry_event(slot, tick_height, slot_leader, &entry)
.unwrap_or_else(|e| {
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
});
blockstream.queued_block = None;
}
if i == entries.len() - 1 {
blockstream
.emit_entry_event(
entry_meta.slot,
entry_meta.tick_height,
entry_meta.slot_leader,
&entry_meta.entry,
)
.emit_block_event(slot, tick_height, slot_leader, entry.id)
.unwrap_or_else(|e| {
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
});
if entry_meta.is_end_of_slot {
blockstream.queued_block = Some(BlockData {
slot: entry_meta.slot,
tick_height: entry_meta.tick_height,
id: entry_meta.entry.id,
leader_id: entry_meta.slot_leader,
});
}
}
blockstream_sender.send(entries_with_meta)?;
Ok(())
}
}
@ -108,76 +106,62 @@ impl Service for BlockstreamService {
#[cfg(test)]
mod test {
use super::*;
use crate::entry::{Entry, EntryMeta};
use crate::blocktree::{create_new_ledger, get_tmp_ledger_path};
use crate::entry::{create_ticks, Entry};
use chrono::{DateTime, FixedOffset};
use serde_json::Value;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use std::sync::mpsc::channel;
#[test]
fn test_blockstream_service_process_entries() {
let ticks_per_slot = 5;
let leader_id = Keypair::new().pubkey();
// Set up genesis block and blocktree
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(1000);
genesis_block.ticks_per_slot = ticks_per_slot;
let (ledger_path, _last_id) = {
let ledger_path = get_tmp_ledger_path(&format!("{}-{}", file!(), line!()));
let last_id = create_new_ledger(&ledger_path, &genesis_block).unwrap();
(ledger_path, last_id)
};
let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap();
// Set up blockstream
let mut blockstream = Blockstream::new("test_stream".to_string());
// Set up dummy channels to host an BlockstreamService
let (ledger_entry_sender, ledger_entry_receiver) = channel();
let (blockstream_sender, blockstream_receiver) = channel();
// Set up dummy channel to receive a full-slot notification
let (slot_full_sender, slot_full_receiver) = channel();
// Create entries - 4 ticks + 1 populated entry + 1 tick
let mut entries = create_ticks(4, Hash::default());
let mut last_id = Hash::default();
let mut entries = Vec::new();
let mut expected_entries = Vec::new();
let mut expected_tick_heights = Vec::new();
for x in 0..6 {
let entry = Entry::new(&mut last_id, 1, vec![]); //just ticks
last_id = entry.id;
let slot_height = x / ticks_per_slot;
let parent_slot = if slot_height > 0 {
Some(slot_height - 1)
} else {
None
};
let entry_meta = EntryMeta {
tick_height: x,
slot: slot_height,
slot_leader: leader_id,
is_end_of_slot: x == ticks_per_slot - 1,
parent_slot,
entry,
};
expected_entries.push(entry_meta.clone());
expected_tick_heights.push(x);
entries.push(entry_meta);
}
let keypair = Keypair::new();
let mut last_id = entries[3].id;
let tx = SystemTransaction::new_account(&keypair, keypair.pubkey(), 1, Hash::default(), 0);
let entry = Entry::new(&mut last_id, 1, vec![tx]);
let entry_meta = EntryMeta {
tick_height: ticks_per_slot - 1,
slot: 0,
slot_leader: leader_id,
is_end_of_slot: true,
parent_slot: None,
entry,
};
expected_entries.insert(ticks_per_slot as usize, entry_meta.clone());
expected_tick_heights.insert(
ticks_per_slot as usize,
ticks_per_slot - 1, // Populated entries should share the tick height of the previous tick.
);
entries.insert(ticks_per_slot as usize, entry_meta);
last_id = entry.id;
entries.push(entry);
let final_tick = create_ticks(1, last_id);
entries.extend_from_slice(&final_tick);
ledger_entry_sender.send(entries).unwrap();
let expected_entries = entries.clone();
let expected_tick_heights = [5, 6, 7, 8, 8, 9];
blocktree.write_entries(1, 0, 0, &entries).unwrap();
slot_full_sender.send((1, leader_id)).unwrap();
BlockstreamService::process_entries(
&ledger_entry_receiver,
&blockstream_sender,
&slot_full_receiver,
&Arc::new(blocktree),
&mut blockstream,
)
.unwrap();
assert_eq!(blockstream.entries().len(), 8);
assert_eq!(blockstream.entries().len(), 7);
let (entry_events, block_events): (Vec<Value>, Vec<Value>) = blockstream
.entries()
@ -203,18 +187,14 @@ mod test {
// `serde_json::from_str` does not work for populated Entries.
// Remove this `if` when fixed.
let entry: Entry = serde_json::from_value(entry_obj).unwrap();
assert_eq!(entry, expected_entries[i].entry);
assert_eq!(entry, expected_entries[i]);
}
}
for json in block_events {
let slot = json["s"].as_u64().unwrap();
assert_eq!(0, slot);
assert_eq!(1, slot);
let height = json["h"].as_u64().unwrap();
assert_eq!(ticks_per_slot - 1, height);
}
// Ensure entries pass through stage unadulterated
let recv_entries = blockstream_receiver.recv().unwrap();
assert_eq!(expected_entries, recv_entries);
assert_eq!(2 * ticks_per_slot - 1, height);
}
}
}

View File

@ -21,31 +21,8 @@ use std::mem::size_of;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
pub type EntrySender = Sender<Vec<EntryMeta>>;
pub type EntryReceiver = Receiver<Vec<EntryMeta>>;
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct EntryMeta {
pub tick_height: u64,
pub slot: u64,
pub slot_leader: Pubkey,
pub is_end_of_slot: bool,
pub parent_slot: Option<u64>,
pub entry: Entry,
}
impl EntryMeta {
pub fn new(entry: Entry) -> Self {
Self {
tick_height: 0,
slot: 0,
slot_leader: Pubkey::default(),
is_end_of_slot: false,
parent_slot: None,
entry,
}
}
}
pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;
/// Each Entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The `id` field is the result

View File

@ -4,7 +4,7 @@ use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::blocktree_processor::{self, BankForksInfo};
use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryMeta, EntryReceiver, EntrySender, EntrySlice};
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_schedule_utils;
use crate::packet::BlobError;
use crate::result::{Error, Result};
@ -60,6 +60,7 @@ impl ReplayStage {
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
voting_keypair: &Option<Arc<T>>,
forward_entry_sender: &EntrySender,
current_blob_index: &mut u64,
last_entry_id: &mut Hash,
subscriptions: &Arc<RpcSubscriptions>,
@ -146,6 +147,12 @@ impl ReplayStage {
);
let entries_len = entries.len() as u64;
// TODO: In line with previous behavior, this will write all the entries even if
// an error occurred processing one of the entries (causing the rest of the entries to
// not be processed).
if entries_len != 0 {
forward_entry_sender.send(entries)?;
}
*current_blob_index += entries_len;
res?;
@ -168,12 +175,12 @@ impl ReplayStage {
to_leader_sender: &TvuRotationSender,
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
) -> (Self, EntryReceiver)
) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver)
where
T: 'static + KeypairUtil + Send + Sync,
{
let (forward_entry_sender, forward_entry_receiver) = channel();
// let (_slot_full_sender, _slot_full_receiver) = channel();
let (slot_full_sender, slot_full_receiver) = channel();
let exit_ = exit.clone();
let to_leader_sender = to_leader_sender.clone();
let subscriptions_ = subscriptions.clone();
@ -301,22 +308,12 @@ impl ReplayStage {
};
if !entries.is_empty() {
// if let Err(e) = Self::forward_entries(
// entries.clone(),
// slot,
// current_leader_id,
// bank.ticks_per_slot(),
// bank.tick_height(),
// &blocktree,
// &forward_entry_sender,
// ) {
// error!("{} forward_entries failed: {:?}", my_id, e);
// }
if let Err(e) = Self::process_entries(
entries,
&bank,
&cluster_info,
&voting_keypair,
&forward_entry_sender,
&mut current_blob_index,
&mut last_entry_id,
&subscriptions_,
@ -330,15 +327,8 @@ impl ReplayStage {
// We've reached the end of a slot, reset our state and check
// for leader rotation
if max_tick_height_for_slot == current_tick_height {
let entries_to_stream = blocktree.get_slot_entries(slot, 0, None).unwrap();
if let Err(e) = Self::forward_entries(
entries_to_stream,
slot,
current_leader_id,
&blocktree,
&forward_entry_sender,
) {
error!("{} forward_entries failed: {:?}", my_id, e);
if let Err(e) = slot_full_sender.send((slot, current_leader_id)) {
error!("{} slot_full alert failed: {:?}", my_id, e);
}
// Check for leader rotation
@ -408,7 +398,11 @@ impl ReplayStage {
})
.unwrap();
(Self { t_replay, exit }, forward_entry_receiver)
(
Self { t_replay, exit },
slot_full_receiver,
forward_entry_receiver,
)
}
pub fn close(self) -> thread::Result<()> {
@ -448,44 +442,6 @@ impl ReplayStage {
let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
next_slots.first().cloned()
}
fn forward_entries(
entries: Vec<Entry>,
slot: u64,
slot_leader: Pubkey,
blocktree: &Arc<Blocktree>,
forward_entry_sender: &EntrySender,
) -> Result<()> {
let blocktree_meta = blocktree.meta(slot).unwrap().unwrap();
let parent_slot = if slot == 0 {
None
} else {
Some(blocktree_meta.parent_slot)
};
let ticks_per_slot = entries
.iter()
.filter(|entry| entry.is_tick())
.fold(0, |acc, _| acc + 1);
let mut tick_height = ticks_per_slot * slot - 1;
let mut entries_with_meta = Vec::new();
for entry in entries.into_iter() {
if entry.is_tick() {
tick_height += 1;
}
let is_end_of_slot = (tick_height + 1) % ticks_per_slot == 0;
let entry_meta = EntryMeta {
tick_height,
slot,
slot_leader,
is_end_of_slot,
parent_slot,
entry,
};
entries_with_meta.push(entry_meta);
}
forward_entry_sender.send(entries_with_meta)?;
Ok(())
}
}
impl Service for ReplayStage {
@ -541,7 +497,7 @@ mod test {
let last_entry_id = bank_forks_info[0].last_entry_id;
let blocktree = Arc::new(blocktree);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(voting_keypair.clone()),
blocktree.clone(),
@ -566,7 +522,7 @@ mod test {
.recv()
.expect("Expected to receive an entry on the ledger writer receiver");
assert_eq!(next_tick[0], received_tick[0].entry);
assert_eq!(next_tick[0], received_tick[0]);
replay_stage
.close()
@ -583,6 +539,7 @@ mod test {
let my_node = Node::new_localhost_with_pubkey(my_id);
// Set up the cluster info
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
let (forward_entry_sender, _forward_entry_receiver) = channel();
let mut last_entry_id = Hash::default();
let mut current_blob_index = 0;
let mut last_id = Hash::default();
@ -600,6 +557,7 @@ mod test {
&bank,
&cluster_info_me,
&voting_keypair,
&forward_entry_sender,
&mut current_blob_index,
&mut last_entry_id,
&Arc::new(RpcSubscriptions::default()),
@ -622,6 +580,7 @@ mod test {
&bank,
&cluster_info_me,
&voting_keypair,
&forward_entry_sender,
&mut current_blob_index,
&mut last_entry_id,
&Arc::new(RpcSubscriptions::default()),

View File

@ -362,11 +362,7 @@ impl StorageStage {
tx_sender: &TransactionSender,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries: Vec<Entry> = entry_receiver
.recv_timeout(timeout)?
.iter()
.map(|entry_meta| entry_meta.entry.clone())
.collect();
let entries: Vec<Entry> = entry_receiver.recv_timeout(timeout)?;
for entry in entries {
// Go through the transactions, find votes, and use them to update
// the storage_keys with their signatures.
@ -450,7 +446,7 @@ impl Service for StorageStage {
mod tests {
use crate::blocktree::{create_new_tmp_ledger, Blocktree};
use crate::cluster_info::{ClusterInfo, NodeInfo};
use crate::entry::{make_tiny_test_entries, Entry, EntryMeta};
use crate::entry::{make_tiny_test_entries, Entry};
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::storage_stage::NUM_IDENTITIES;
@ -529,8 +525,7 @@ mod tests {
STORAGE_ROTATE_TEST_COUNT,
&cluster_info,
);
let entries_meta: Vec<EntryMeta> = entries.into_iter().map(EntryMeta::new).collect();
storage_entry_sender.send(entries_meta.clone()).unwrap();
storage_entry_sender.send(entries.clone()).unwrap();
let keypair = Keypair::new();
let hash = Hash::default();
@ -539,7 +534,7 @@ mod tests {
assert_eq!(result, Hash::default());
for _ in 0..9 {
storage_entry_sender.send(entries_meta.clone()).unwrap();
storage_entry_sender.send(entries.clone()).unwrap();
}
for _ in 0..5 {
result = storage_state.get_mining_result(&signature);
@ -592,8 +587,7 @@ mod tests {
STORAGE_ROTATE_TEST_COUNT,
&cluster_info,
);
let entries_meta: Vec<EntryMeta> = entries.into_iter().map(EntryMeta::new).collect();
storage_entry_sender.send(entries_meta.clone()).unwrap();
storage_entry_sender.send(entries.clone()).unwrap();
let mut reference_keys;
{
@ -605,7 +599,7 @@ mod tests {
let keypair = Keypair::new();
let vote_tx = VoteTransaction::new_vote(&keypair, 123456, Hash::default(), 1);
vote_txs.push(vote_tx);
let vote_entries = vec![EntryMeta::new(Entry::new(&Hash::default(), 1, vote_txs))];
let vote_entries = vec![Entry::new(&Hash::default(), 1, vote_txs)];
storage_entry_sender.send(vote_entries).unwrap();
for _ in 0..5 {

View File

@ -117,7 +117,7 @@ impl Tvu {
exit.clone(),
);
let (replay_stage, mut previous_receiver) = ReplayStage::new(
let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new(
keypair.pubkey(),
voting_keypair,
blocktree.clone(),
@ -131,12 +131,12 @@ impl Tvu {
);
let blockstream_service = if blockstream.is_some() {
let (blockstream_service, blockstream_receiver) = BlockstreamService::new(
previous_receiver,
let blockstream_service = BlockstreamService::new(
slot_full_receiver,
blocktree.clone(),
blockstream.unwrap().to_string(),
exit.clone(),
);
previous_receiver = blockstream_receiver;
Some(blockstream_service)
} else {
None
@ -144,7 +144,7 @@ impl Tvu {
let storage_stage = StorageStage::new(
storage_state,
previous_receiver,
forward_entry_receiver,
Some(blocktree),
&keypair,
&exit.clone(),