@ -1,283 +0,0 @@
|
|||||||
//! The `blockstream` module provides a method for streaming entries out via a
|
|
||||||
//! local unix socket, to provide client services such as a block explorer with
|
|
||||||
//! real-time access to entries.
|
|
||||||
|
|
||||||
use bincode::serialize;
|
|
||||||
use chrono::{SecondsFormat, Utc};
|
|
||||||
use serde_json::json;
|
|
||||||
use solana_ledger::entry::Entry;
|
|
||||||
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
|
|
||||||
use std::cell::RefCell;
|
|
||||||
use std::io::Result;
|
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
|
|
||||||
pub trait EntryWriter: std::fmt::Debug {
|
|
||||||
fn write(&self, payload: String) -> Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
|
||||||
pub struct EntryVec {
|
|
||||||
values: RefCell<Vec<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EntryWriter for EntryVec {
|
|
||||||
fn write(&self, payload: String) -> Result<()> {
|
|
||||||
self.values.borrow_mut().push(payload);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EntryVec {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
EntryVec {
|
|
||||||
values: RefCell::new(Vec::new()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn entries(&self) -> Vec<String> {
|
|
||||||
self.values.borrow().clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct EntrySocket {
|
|
||||||
unix_socket: PathBuf,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EntryWriter for EntrySocket {
|
|
||||||
#[cfg(not(windows))]
|
|
||||||
fn write(&self, payload: String) -> Result<()> {
|
|
||||||
use std::io::prelude::*;
|
|
||||||
use std::net::Shutdown;
|
|
||||||
use std::os::unix::net::UnixStream;
|
|
||||||
|
|
||||||
const MESSAGE_TERMINATOR: &str = "\n";
|
|
||||||
|
|
||||||
let mut socket = UnixStream::connect(&self.unix_socket)?;
|
|
||||||
socket.write_all(payload.as_bytes())?;
|
|
||||||
socket.write_all(MESSAGE_TERMINATOR.as_bytes())?;
|
|
||||||
socket.shutdown(Shutdown::Write)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
#[cfg(windows)]
|
|
||||||
fn write(&self, _payload: String) -> Result<()> {
|
|
||||||
Err(std::io::Error::new(
|
|
||||||
std::io::ErrorKind::Other,
|
|
||||||
"EntryWriter::write() not implemented for windows",
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait BlockstreamEvents {
|
|
||||||
fn emit_entry_event(
|
|
||||||
&self,
|
|
||||||
slot: Slot,
|
|
||||||
tick_height: u64,
|
|
||||||
leader_pubkey: &Pubkey,
|
|
||||||
entries: &Entry,
|
|
||||||
) -> Result<()>;
|
|
||||||
fn emit_block_event(
|
|
||||||
&self,
|
|
||||||
slot: Slot,
|
|
||||||
tick_height: u64,
|
|
||||||
leader_pubkey: &Pubkey,
|
|
||||||
blockhash: Hash,
|
|
||||||
) -> Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Blockstream<T: EntryWriter> {
|
|
||||||
pub output: T,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> BlockstreamEvents for Blockstream<T>
|
|
||||||
where
|
|
||||||
T: EntryWriter,
|
|
||||||
{
|
|
||||||
fn emit_entry_event(
|
|
||||||
&self,
|
|
||||||
slot: Slot,
|
|
||||||
tick_height: u64,
|
|
||||||
leader_pubkey: &Pubkey,
|
|
||||||
entry: &Entry,
|
|
||||||
) -> Result<()> {
|
|
||||||
let transactions: Vec<Vec<u8>> = serialize_transactions(entry);
|
|
||||||
let stream_entry = json!({
|
|
||||||
"num_hashes": entry.num_hashes,
|
|
||||||
"hash": entry.hash,
|
|
||||||
"transactions": transactions
|
|
||||||
});
|
|
||||||
let json_entry = serde_json::to_string(&stream_entry)?;
|
|
||||||
let payload = format!(
|
|
||||||
r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":"{:?}","entry":{}}}"#,
|
|
||||||
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
|
|
||||||
slot,
|
|
||||||
tick_height,
|
|
||||||
leader_pubkey,
|
|
||||||
json_entry,
|
|
||||||
);
|
|
||||||
self.output.write(payload)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn emit_block_event(
|
|
||||||
&self,
|
|
||||||
slot: Slot,
|
|
||||||
tick_height: u64,
|
|
||||||
leader_pubkey: &Pubkey,
|
|
||||||
blockhash: Hash,
|
|
||||||
) -> Result<()> {
|
|
||||||
let payload = format!(
|
|
||||||
r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":"{:?}","hash":"{:?}"}}"#,
|
|
||||||
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
|
|
||||||
slot,
|
|
||||||
tick_height,
|
|
||||||
leader_pubkey,
|
|
||||||
blockhash,
|
|
||||||
);
|
|
||||||
self.output.write(payload)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type SocketBlockstream = Blockstream<EntrySocket>;
|
|
||||||
|
|
||||||
impl SocketBlockstream {
|
|
||||||
pub fn new(unix_socket: &Path) -> Self {
|
|
||||||
Blockstream {
|
|
||||||
output: EntrySocket {
|
|
||||||
unix_socket: unix_socket.to_path_buf(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type MockBlockstream = Blockstream<EntryVec>;
|
|
||||||
|
|
||||||
impl MockBlockstream {
|
|
||||||
pub fn new(_: &Path) -> Self {
|
|
||||||
Blockstream {
|
|
||||||
output: EntryVec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn entries(&self) -> Vec<String> {
|
|
||||||
self.output.entries()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_transactions(entry: &Entry) -> Vec<Vec<u8>> {
|
|
||||||
entry
|
|
||||||
.transactions
|
|
||||||
.iter()
|
|
||||||
.map(|tx| serialize(&tx).unwrap())
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use super::*;
|
|
||||||
use chrono::{DateTime, FixedOffset};
|
|
||||||
use serde_json::Value;
|
|
||||||
use solana_sdk::hash::Hash;
|
|
||||||
use solana_sdk::signature::{Keypair, Signer};
|
|
||||||
use solana_sdk::system_transaction;
|
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_serialize_transactions() {
|
|
||||||
let entry = Entry::new(&Hash::default(), 1, vec![]);
|
|
||||||
let empty_vec: Vec<Vec<u8>> = vec![];
|
|
||||||
assert_eq!(serialize_transactions(&entry), empty_vec);
|
|
||||||
|
|
||||||
let keypair0 = Keypair::new();
|
|
||||||
let keypair1 = Keypair::new();
|
|
||||||
let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
|
|
||||||
let tx1 = system_transaction::transfer(&keypair1, &keypair0.pubkey(), 2, Hash::default());
|
|
||||||
let serialized_tx0 = serialize(&tx0).unwrap();
|
|
||||||
let serialized_tx1 = serialize(&tx1).unwrap();
|
|
||||||
let entry = Entry::new(&Hash::default(), 1, vec![tx0, tx1]);
|
|
||||||
assert_eq!(
|
|
||||||
serialize_transactions(&entry),
|
|
||||||
vec![serialized_tx0, serialized_tx1]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_blockstream() -> () {
|
|
||||||
let blockstream = MockBlockstream::new(&PathBuf::from("test_stream"));
|
|
||||||
let ticks_per_slot = 5;
|
|
||||||
|
|
||||||
let mut blockhash = Hash::default();
|
|
||||||
let mut entries = Vec::new();
|
|
||||||
let mut expected_entries = Vec::new();
|
|
||||||
|
|
||||||
let tick_height_initial = 1;
|
|
||||||
let tick_height_final = tick_height_initial + ticks_per_slot + 2;
|
|
||||||
let mut curr_slot = 0;
|
|
||||||
let leader_pubkey = Pubkey::new_rand();
|
|
||||||
|
|
||||||
for tick_height in tick_height_initial..=tick_height_final {
|
|
||||||
if tick_height == 5 {
|
|
||||||
blockstream
|
|
||||||
.emit_block_event(curr_slot, tick_height, &leader_pubkey, blockhash)
|
|
||||||
.unwrap();
|
|
||||||
curr_slot += 1;
|
|
||||||
}
|
|
||||||
let entry = Entry::new(&mut blockhash, 1, vec![]); // just ticks
|
|
||||||
blockhash = entry.hash;
|
|
||||||
blockstream
|
|
||||||
.emit_entry_event(curr_slot, tick_height, &leader_pubkey, &entry)
|
|
||||||
.unwrap();
|
|
||||||
expected_entries.push(entry.clone());
|
|
||||||
entries.push(entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
blockstream.entries().len() as u64,
|
|
||||||
// one entry per tick (1..=N+2) is +3, plus one block
|
|
||||||
ticks_per_slot + 3 + 1
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut j = 0;
|
|
||||||
let mut matched_entries = 0;
|
|
||||||
let mut matched_slots = HashSet::new();
|
|
||||||
let mut matched_blocks = HashSet::new();
|
|
||||||
|
|
||||||
for item in blockstream.entries() {
|
|
||||||
let json: Value = serde_json::from_str(&item).unwrap();
|
|
||||||
let dt_str = json["dt"].as_str().unwrap();
|
|
||||||
|
|
||||||
// Ensure `ts` field parses as valid DateTime
|
|
||||||
let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
|
|
||||||
|
|
||||||
let item_type = json["t"].as_str().unwrap();
|
|
||||||
match item_type {
|
|
||||||
"block" => {
|
|
||||||
let hash = json["hash"].to_string();
|
|
||||||
matched_blocks.insert(hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
"entry" => {
|
|
||||||
let slot = json["s"].as_u64().unwrap();
|
|
||||||
matched_slots.insert(slot);
|
|
||||||
let entry_obj = json["entry"].clone();
|
|
||||||
let entry: Entry = serde_json::from_value(entry_obj).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(entry, expected_entries[j]);
|
|
||||||
matched_entries += 1;
|
|
||||||
j += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => {
|
|
||||||
assert!(false, "unknown item type {}", item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(matched_entries, expected_entries.len());
|
|
||||||
assert_eq!(matched_slots.len(), 2);
|
|
||||||
assert_eq!(matched_blocks.len(), 1);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,228 +0,0 @@
|
|||||||
//! The `blockstream_service` implements optional streaming of entries and block metadata
|
|
||||||
//! using the `blockstream` module, providing client services such as a block explorer with
|
|
||||||
//! real-time access to entries.
|
|
||||||
|
|
||||||
use crate::blockstream::BlockstreamEvents;
|
|
||||||
#[cfg(test)]
|
|
||||||
use crate::blockstream::MockBlockstream as Blockstream;
|
|
||||||
#[cfg(not(test))]
|
|
||||||
use crate::blockstream::SocketBlockstream as Blockstream;
|
|
||||||
use crate::result::{Error, Result};
|
|
||||||
use solana_ledger::blockstore::Blockstore;
|
|
||||||
use solana_sdk::pubkey::Pubkey;
|
|
||||||
use std::path::Path;
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError};
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
pub struct BlockstreamService {
|
|
||||||
t_blockstream: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BlockstreamService {
|
|
||||||
#[allow(clippy::new_ret_no_self)]
|
|
||||||
pub fn new(
|
|
||||||
slot_full_receiver: Receiver<(u64, Pubkey)>,
|
|
||||||
blockstore: Arc<Blockstore>,
|
|
||||||
unix_socket: &Path,
|
|
||||||
exit: &Arc<AtomicBool>,
|
|
||||||
) -> Self {
|
|
||||||
let mut blockstream = Blockstream::new(unix_socket);
|
|
||||||
let exit = exit.clone();
|
|
||||||
let t_blockstream = Builder::new()
|
|
||||||
.name("solana-blockstream".to_string())
|
|
||||||
.spawn(move || loop {
|
|
||||||
if exit.load(Ordering::Relaxed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if let Err(e) =
|
|
||||||
Self::process_entries(&slot_full_receiver, &blockstore, &mut blockstream)
|
|
||||||
{
|
|
||||||
match e {
|
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
|
||||||
_ => info!("Error from process_entries: {:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
Self { t_blockstream }
|
|
||||||
}
|
|
||||||
fn process_entries(
|
|
||||||
slot_full_receiver: &Receiver<(u64, Pubkey)>,
|
|
||||||
blockstore: &Arc<Blockstore>,
|
|
||||||
blockstream: &mut Blockstream,
|
|
||||||
) -> Result<()> {
|
|
||||||
let timeout = Duration::new(1, 0);
|
|
||||||
let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?;
|
|
||||||
|
|
||||||
// Slot might not exist due to LedgerCleanupService, check first
|
|
||||||
let blockstore_meta = blockstore.meta(slot).unwrap();
|
|
||||||
if let Some(blockstore_meta) = blockstore_meta {
|
|
||||||
// Return error to main loop. Thread won't exit, will just log the error
|
|
||||||
let entries = blockstore.get_slot_entries(slot, 0, None)?;
|
|
||||||
let _parent_slot = if slot == 0 {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(blockstore_meta.parent_slot)
|
|
||||||
};
|
|
||||||
let ticks_per_slot = entries.iter().filter(|entry| entry.is_tick()).count() as u64;
|
|
||||||
let mut tick_height = ticks_per_slot * slot;
|
|
||||||
|
|
||||||
for (i, entry) in entries.iter().enumerate() {
|
|
||||||
if entry.is_tick() {
|
|
||||||
tick_height += 1;
|
|
||||||
}
|
|
||||||
blockstream
|
|
||||||
.emit_entry_event(slot, tick_height, &slot_leader, &entry)
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
|
|
||||||
});
|
|
||||||
if i == entries.len() - 1 {
|
|
||||||
blockstream
|
|
||||||
.emit_block_event(slot, tick_height, &slot_leader, entry.hash)
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
|
||||||
self.t_blockstream.join()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use super::*;
|
|
||||||
use bincode::{deserialize, serialize};
|
|
||||||
use chrono::{DateTime, FixedOffset};
|
|
||||||
use serde_json::Value;
|
|
||||||
use solana_ledger::create_new_tmp_ledger;
|
|
||||||
use solana_ledger::entry::{create_ticks, Entry};
|
|
||||||
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
|
|
||||||
use solana_sdk::hash::Hash;
|
|
||||||
use solana_sdk::signature::{Keypair, Signer};
|
|
||||||
use solana_sdk::system_transaction;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::mpsc::channel;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_blockstream_service_process_entries() {
|
|
||||||
let ticks_per_slot = 5;
|
|
||||||
let leader_pubkey = Pubkey::new_rand();
|
|
||||||
|
|
||||||
// Set up genesis config and blockstore
|
|
||||||
let GenesisConfigInfo {
|
|
||||||
mut genesis_config, ..
|
|
||||||
} = create_genesis_config(1000);
|
|
||||||
genesis_config.ticks_per_slot = ticks_per_slot;
|
|
||||||
|
|
||||||
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
|
|
||||||
let blockstore = Blockstore::open(&ledger_path).unwrap();
|
|
||||||
|
|
||||||
// Set up blockstream
|
|
||||||
let mut blockstream = Blockstream::new(&PathBuf::from("test_stream"));
|
|
||||||
|
|
||||||
// Set up dummy channel to receive a full-slot notification
|
|
||||||
let (slot_full_sender, slot_full_receiver) = channel();
|
|
||||||
|
|
||||||
// Create entries - 4 ticks + 1 populated entry + 1 tick
|
|
||||||
let mut entries = create_ticks(4, 0, Hash::default());
|
|
||||||
|
|
||||||
let keypair = Keypair::new();
|
|
||||||
let mut blockhash = entries[3].hash;
|
|
||||||
let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default());
|
|
||||||
let entry = Entry::new(&mut blockhash, 1, vec![tx]);
|
|
||||||
blockhash = entry.hash;
|
|
||||||
entries.push(entry);
|
|
||||||
let final_tick = create_ticks(1, 0, blockhash);
|
|
||||||
entries.extend_from_slice(&final_tick);
|
|
||||||
|
|
||||||
let expected_entries = entries.clone();
|
|
||||||
let expected_tick_heights = [6, 7, 8, 9, 9, 10];
|
|
||||||
|
|
||||||
blockstore
|
|
||||||
.write_entries(
|
|
||||||
1,
|
|
||||||
0,
|
|
||||||
0,
|
|
||||||
ticks_per_slot,
|
|
||||||
None,
|
|
||||||
true,
|
|
||||||
&Arc::new(Keypair::new()),
|
|
||||||
entries,
|
|
||||||
0,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
slot_full_sender.send((1, leader_pubkey)).unwrap();
|
|
||||||
BlockstreamService::process_entries(
|
|
||||||
&slot_full_receiver,
|
|
||||||
&Arc::new(blockstore),
|
|
||||||
&mut blockstream,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(blockstream.entries().len(), 7);
|
|
||||||
|
|
||||||
let (entry_events, block_events): (Vec<Value>, Vec<Value>) = blockstream
|
|
||||||
.entries()
|
|
||||||
.iter()
|
|
||||||
.map(|item| {
|
|
||||||
let json: Value = serde_json::from_str(&item).unwrap();
|
|
||||||
let dt_str = json["dt"].as_str().unwrap();
|
|
||||||
// Ensure `ts` field parses as valid DateTime
|
|
||||||
let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
|
|
||||||
json
|
|
||||||
})
|
|
||||||
.partition(|json| {
|
|
||||||
let item_type = json["t"].as_str().unwrap();
|
|
||||||
item_type == "entry"
|
|
||||||
});
|
|
||||||
for (i, json) in entry_events.iter().enumerate() {
|
|
||||||
let height = json["h"].as_u64().unwrap();
|
|
||||||
assert_eq!(height, expected_tick_heights[i]);
|
|
||||||
let entry_obj = json["entry"].clone();
|
|
||||||
let tx = entry_obj["transactions"].as_array().unwrap();
|
|
||||||
let entry: Entry;
|
|
||||||
if tx.len() == 0 {
|
|
||||||
entry = serde_json::from_value(entry_obj).unwrap();
|
|
||||||
} else {
|
|
||||||
let entry_json = entry_obj.as_object().unwrap();
|
|
||||||
entry = Entry {
|
|
||||||
num_hashes: entry_json.get("num_hashes").unwrap().as_u64().unwrap(),
|
|
||||||
hash: serde_json::from_value(entry_json.get("hash").unwrap().clone()).unwrap(),
|
|
||||||
transactions: entry_json
|
|
||||||
.get("transactions")
|
|
||||||
.unwrap()
|
|
||||||
.as_array()
|
|
||||||
.unwrap()
|
|
||||||
.into_iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(j, tx)| {
|
|
||||||
let tx_vec: Vec<u8> = serde_json::from_value(tx.clone()).unwrap();
|
|
||||||
// Check explicitly that transaction matches bincode-serialized format
|
|
||||||
assert_eq!(
|
|
||||||
tx_vec,
|
|
||||||
serialize(&expected_entries[i].transactions[j]).unwrap()
|
|
||||||
);
|
|
||||||
deserialize(&tx_vec).unwrap()
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
assert_eq!(entry, expected_entries[i]);
|
|
||||||
}
|
|
||||||
for json in block_events {
|
|
||||||
let slot = json["s"].as_u64().unwrap();
|
|
||||||
assert_eq!(1, slot);
|
|
||||||
let height = json["h"].as_u64().unwrap();
|
|
||||||
assert_eq!(2 * ticks_per_slot, height);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -14,8 +14,6 @@ mod deprecated;
|
|||||||
pub mod shred_fetch_stage;
|
pub mod shred_fetch_stage;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
pub mod contact_info;
|
pub mod contact_info;
|
||||||
pub mod blockstream;
|
|
||||||
pub mod blockstream_service;
|
|
||||||
pub mod cluster_info;
|
pub mod cluster_info;
|
||||||
pub mod cluster_slots;
|
pub mod cluster_slots;
|
||||||
pub mod consensus;
|
pub mod consensus;
|
||||||
|
@ -85,7 +85,6 @@ pub struct ReplayStageConfig {
|
|||||||
pub exit: Arc<AtomicBool>,
|
pub exit: Arc<AtomicBool>,
|
||||||
pub subscriptions: Arc<RpcSubscriptions>,
|
pub subscriptions: Arc<RpcSubscriptions>,
|
||||||
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
|
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||||
pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
|
|
||||||
pub latest_root_senders: Vec<Sender<Slot>>,
|
pub latest_root_senders: Vec<Sender<Slot>>,
|
||||||
pub accounts_hash_sender: Option<SnapshotPackageSender>,
|
pub accounts_hash_sender: Option<SnapshotPackageSender>,
|
||||||
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||||
@ -190,7 +189,6 @@ impl ReplayStage {
|
|||||||
exit,
|
exit,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
leader_schedule_cache,
|
leader_schedule_cache,
|
||||||
slot_full_senders,
|
|
||||||
latest_root_senders,
|
latest_root_senders,
|
||||||
accounts_hash_sender,
|
accounts_hash_sender,
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
@ -255,7 +253,6 @@ impl ReplayStage {
|
|||||||
&bank_forks,
|
&bank_forks,
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&slot_full_senders,
|
|
||||||
transaction_status_sender.clone(),
|
transaction_status_sender.clone(),
|
||||||
&verify_recyclers,
|
&verify_recyclers,
|
||||||
);
|
);
|
||||||
@ -796,7 +793,6 @@ impl ReplayStage {
|
|||||||
bank_forks: &Arc<RwLock<BankForks>>,
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
slot_full_senders: &[Sender<(u64, Pubkey)>],
|
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
verify_recyclers: &VerifyRecyclers,
|
verify_recyclers: &VerifyRecyclers,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
@ -846,7 +842,8 @@ impl ReplayStage {
|
|||||||
bank_progress.replay_progress.num_shreds,
|
bank_progress.replay_progress.num_shreds,
|
||||||
);
|
);
|
||||||
did_complete_bank = true;
|
did_complete_bank = true;
|
||||||
Self::process_completed_bank(my_pubkey, bank, slot_full_senders);
|
info!("bank frozen: {}", bank.slot());
|
||||||
|
bank.freeze();
|
||||||
} else {
|
} else {
|
||||||
trace!(
|
trace!(
|
||||||
"bank {} not completed tick_height: {}, max_tick_height: {}",
|
"bank {} not completed tick_height: {}, max_tick_height: {}",
|
||||||
@ -1183,20 +1180,6 @@ impl ReplayStage {
|
|||||||
progress.retain(|k, _| r_bank_forks.get(*k).is_some());
|
progress.retain(|k, _| r_bank_forks.get(*k).is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_completed_bank(
|
|
||||||
my_pubkey: &Pubkey,
|
|
||||||
bank: Arc<Bank>,
|
|
||||||
slot_full_senders: &[Sender<(u64, Pubkey)>],
|
|
||||||
) {
|
|
||||||
info!("bank frozen: {}", bank.slot());
|
|
||||||
bank.freeze();
|
|
||||||
slot_full_senders.iter().for_each(|sender| {
|
|
||||||
if let Err(e) = sender.send((bank.slot(), *bank.collector_id())) {
|
|
||||||
trace!("{} slot_full alert failed: {:?}", my_pubkey, e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn generate_new_bank_forks(
|
fn generate_new_bank_forks(
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
forks_lock: &RwLock<BankForks>,
|
forks_lock: &RwLock<BankForks>,
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
accounts_hash_verifier::AccountsHashVerifier,
|
accounts_hash_verifier::AccountsHashVerifier,
|
||||||
blockstream_service::BlockstreamService,
|
|
||||||
broadcast_stage::RetransmitSlotsSender,
|
broadcast_stage::RetransmitSlotsSender,
|
||||||
cluster_info::ClusterInfo,
|
cluster_info::ClusterInfo,
|
||||||
cluster_info_vote_listener::VoteTracker,
|
cluster_info_vote_listener::VoteTracker,
|
||||||
@ -34,7 +33,6 @@ use solana_sdk::{
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::{
|
use std::{
|
||||||
net::UdpSocket,
|
net::UdpSocket,
|
||||||
path::PathBuf,
|
|
||||||
sync::{
|
sync::{
|
||||||
atomic::AtomicBool,
|
atomic::AtomicBool,
|
||||||
mpsc::{channel, Receiver},
|
mpsc::{channel, Receiver},
|
||||||
@ -48,7 +46,6 @@ pub struct Tvu {
|
|||||||
sigverify_stage: SigVerifyStage,
|
sigverify_stage: SigVerifyStage,
|
||||||
retransmit_stage: RetransmitStage,
|
retransmit_stage: RetransmitStage,
|
||||||
replay_stage: ReplayStage,
|
replay_stage: ReplayStage,
|
||||||
blockstream_service: Option<BlockstreamService>,
|
|
||||||
ledger_cleanup_service: Option<LedgerCleanupService>,
|
ledger_cleanup_service: Option<LedgerCleanupService>,
|
||||||
storage_stage: StorageStage,
|
storage_stage: StorageStage,
|
||||||
accounts_hash_verifier: AccountsHashVerifier,
|
accounts_hash_verifier: AccountsHashVerifier,
|
||||||
@ -88,7 +85,6 @@ impl Tvu {
|
|||||||
sockets: Sockets,
|
sockets: Sockets,
|
||||||
blockstore: Arc<Blockstore>,
|
blockstore: Arc<Blockstore>,
|
||||||
storage_state: &StorageState,
|
storage_state: &StorageState,
|
||||||
blockstream_unix_socket: Option<&PathBuf>,
|
|
||||||
ledger_signal_receiver: Receiver<bool>,
|
ledger_signal_receiver: Receiver<bool>,
|
||||||
subscriptions: &Arc<RpcSubscriptions>,
|
subscriptions: &Arc<RpcSubscriptions>,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
@ -162,7 +158,6 @@ impl Tvu {
|
|||||||
tvu_config.shred_version,
|
tvu_config.shred_version,
|
||||||
);
|
);
|
||||||
|
|
||||||
let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
|
|
||||||
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
|
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
|
||||||
|
|
||||||
let (accounts_hash_sender, accounts_hash_receiver) = channel();
|
let (accounts_hash_sender, accounts_hash_receiver) = channel();
|
||||||
@ -183,7 +178,6 @@ impl Tvu {
|
|||||||
exit: exit.clone(),
|
exit: exit.clone(),
|
||||||
subscriptions: subscriptions.clone(),
|
subscriptions: subscriptions.clone(),
|
||||||
leader_schedule_cache: leader_schedule_cache.clone(),
|
leader_schedule_cache: leader_schedule_cache.clone(),
|
||||||
slot_full_senders: vec![blockstream_slot_sender],
|
|
||||||
latest_root_senders: vec![ledger_cleanup_slot_sender],
|
latest_root_senders: vec![ledger_cleanup_slot_sender],
|
||||||
accounts_hash_sender: Some(accounts_hash_sender),
|
accounts_hash_sender: Some(accounts_hash_sender),
|
||||||
block_commitment_cache,
|
block_commitment_cache,
|
||||||
@ -202,18 +196,6 @@ impl Tvu {
|
|||||||
retransmit_slots_sender,
|
retransmit_slots_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket {
|
|
||||||
let blockstream_service = BlockstreamService::new(
|
|
||||||
blockstream_slot_receiver,
|
|
||||||
blockstore.clone(),
|
|
||||||
blockstream_unix_socket,
|
|
||||||
&exit,
|
|
||||||
);
|
|
||||||
Some(blockstream_service)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let ledger_cleanup_service = tvu_config.max_ledger_slots.map(|max_ledger_slots| {
|
let ledger_cleanup_service = tvu_config.max_ledger_slots.map(|max_ledger_slots| {
|
||||||
LedgerCleanupService::new(
|
LedgerCleanupService::new(
|
||||||
ledger_cleanup_slot_receiver,
|
ledger_cleanup_slot_receiver,
|
||||||
@ -239,7 +221,6 @@ impl Tvu {
|
|||||||
sigverify_stage,
|
sigverify_stage,
|
||||||
retransmit_stage,
|
retransmit_stage,
|
||||||
replay_stage,
|
replay_stage,
|
||||||
blockstream_service,
|
|
||||||
ledger_cleanup_service,
|
ledger_cleanup_service,
|
||||||
storage_stage,
|
storage_stage,
|
||||||
accounts_hash_verifier,
|
accounts_hash_verifier,
|
||||||
@ -251,9 +232,6 @@ impl Tvu {
|
|||||||
self.fetch_stage.join()?;
|
self.fetch_stage.join()?;
|
||||||
self.sigverify_stage.join()?;
|
self.sigverify_stage.join()?;
|
||||||
self.storage_stage.join()?;
|
self.storage_stage.join()?;
|
||||||
if self.blockstream_service.is_some() {
|
|
||||||
self.blockstream_service.unwrap().join()?;
|
|
||||||
}
|
|
||||||
if self.ledger_cleanup_service.is_some() {
|
if self.ledger_cleanup_service.is_some() {
|
||||||
self.ledger_cleanup_service.unwrap().join()?;
|
self.ledger_cleanup_service.unwrap().join()?;
|
||||||
}
|
}
|
||||||
@ -319,7 +297,6 @@ pub mod tests {
|
|||||||
},
|
},
|
||||||
blockstore,
|
blockstore,
|
||||||
&StorageState::default(),
|
&StorageState::default(),
|
||||||
None,
|
|
||||||
l_receiver,
|
l_receiver,
|
||||||
&Arc::new(RpcSubscriptions::new(&exit)),
|
&Arc::new(RpcSubscriptions::new(&exit)),
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
|
@ -64,7 +64,6 @@ pub struct ValidatorConfig {
|
|||||||
pub expected_genesis_hash: Option<Hash>,
|
pub expected_genesis_hash: Option<Hash>,
|
||||||
pub expected_shred_version: Option<u16>,
|
pub expected_shred_version: Option<u16>,
|
||||||
pub voting_disabled: bool,
|
pub voting_disabled: bool,
|
||||||
pub blockstream_unix_socket: Option<PathBuf>,
|
|
||||||
pub storage_slots_per_turn: u64,
|
pub storage_slots_per_turn: u64,
|
||||||
pub account_paths: Vec<PathBuf>,
|
pub account_paths: Vec<PathBuf>,
|
||||||
pub rpc_config: JsonRpcConfig,
|
pub rpc_config: JsonRpcConfig,
|
||||||
@ -89,7 +88,6 @@ impl Default for ValidatorConfig {
|
|||||||
expected_genesis_hash: None,
|
expected_genesis_hash: None,
|
||||||
expected_shred_version: None,
|
expected_shred_version: None,
|
||||||
voting_disabled: false,
|
voting_disabled: false,
|
||||||
blockstream_unix_socket: None,
|
|
||||||
storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN,
|
storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN,
|
||||||
max_ledger_slots: None,
|
max_ledger_slots: None,
|
||||||
account_paths: Vec::new(),
|
account_paths: Vec::new(),
|
||||||
@ -418,7 +416,6 @@ impl Validator {
|
|||||||
},
|
},
|
||||||
blockstore.clone(),
|
blockstore.clone(),
|
||||||
&storage_state,
|
&storage_state,
|
||||||
config.blockstream_unix_socket.as_ref(),
|
|
||||||
ledger_signal_receiver,
|
ledger_signal_receiver,
|
||||||
&subscriptions,
|
&subscriptions,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
|
@ -121,35 +121,7 @@ thread apply all bt
|
|||||||
|
|
||||||
This will dump all the threads stack traces into gdb.txt
|
This will dump all the threads stack traces into gdb.txt
|
||||||
|
|
||||||
### Blockstreamer
|
## Developer Testnet
|
||||||
|
|
||||||
Solana supports a node type called a _blockstreamer_. This validator variation is intended for applications that need to observe the data plane without participating in transaction validation or ledger replication.
|
|
||||||
|
|
||||||
A blockstreamer runs without a vote signer, and can optionally stream ledger entries out to a Unix domain socket as they are processed. The JSON-RPC service still functions as on any other node.
|
|
||||||
|
|
||||||
To run a blockstreamer, include the argument `no-signer` and \(optional\) `blockstream` socket location:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
$ NDEBUG=1 ./multinode-demo/validator-x.sh --no-signer --blockstream <SOCKET>
|
|
||||||
```
|
|
||||||
|
|
||||||
The stream will output a series of JSON objects:
|
|
||||||
|
|
||||||
* An Entry event JSON object is sent when each ledger entry is processed, with the following fields:
|
|
||||||
* `dt`, the system datetime, as RFC3339-formatted string
|
|
||||||
* `t`, the event type, always "entry"
|
|
||||||
* `s`, the slot height, as unsigned 64-bit integer
|
|
||||||
* `h`, the tick height, as unsigned 64-bit integer
|
|
||||||
* `entry`, the entry, as JSON object
|
|
||||||
* A Block event JSON object is sent when a block is complete, with the following fields:
|
|
||||||
* `dt`, the system datetime, as RFC3339-formatted string
|
|
||||||
* `t`, the event type, always "block"
|
|
||||||
* `s`, the slot height, as unsigned 64-bit integer
|
|
||||||
* `h`, the tick height, as unsigned 64-bit integer
|
|
||||||
* `l`, the slot leader id, as base-58 encoded string
|
|
||||||
* `hash`, the [blockhash](terminology.md#blockhash), as base-58 encoded string
|
|
||||||
|
|
||||||
## Public Testnet
|
|
||||||
|
|
||||||
In this example the client connects to our public testnet. To run validators on the testnet you would need to open udp ports `8000-10000`.
|
In this example the client connects to our public testnet. To run validators on the testnet you would need to open udp ports `8000-10000`.
|
||||||
|
|
||||||
|
@ -29,7 +29,6 @@ Start a validator with no stake
|
|||||||
|
|
||||||
OPTIONS:
|
OPTIONS:
|
||||||
--ledger PATH - store ledger under this PATH
|
--ledger PATH - store ledger under this PATH
|
||||||
--blockstream PATH - open blockstream at this unix domain socket location
|
|
||||||
--init-complete-file FILE - create this file, if it doesn't already exist, once node initialization is complete
|
--init-complete-file FILE - create this file, if it doesn't already exist, once node initialization is complete
|
||||||
--label LABEL - Append the given label to the configuration files, useful when running
|
--label LABEL - Append the given label to the configuration files, useful when running
|
||||||
multiple validators in the same workspace
|
multiple validators in the same workspace
|
||||||
@ -60,9 +59,6 @@ while [[ -n $1 ]]; do
|
|||||||
airdrops_enabled=0
|
airdrops_enabled=0
|
||||||
shift
|
shift
|
||||||
# solana-validator options
|
# solana-validator options
|
||||||
elif [[ $1 = --blockstream ]]; then
|
|
||||||
args+=("$1" "$2")
|
|
||||||
shift 2
|
|
||||||
elif [[ $1 = --expected-genesis-hash ]]; then
|
elif [[ $1 = --expected-genesis-hash ]]; then
|
||||||
args+=("$1" "$2")
|
args+=("$1" "$2")
|
||||||
shift 2
|
shift 2
|
||||||
|
14
run.sh
14
run.sh
@ -29,17 +29,6 @@ $ok || {
|
|||||||
exit 1
|
exit 1
|
||||||
}
|
}
|
||||||
|
|
||||||
blockstreamSocket=/tmp/solana-blockstream.sock # Default to location used by the block explorer
|
|
||||||
while [[ -n $1 ]]; do
|
|
||||||
if [[ $1 = --blockstream ]]; then
|
|
||||||
blockstreamSocket=$2
|
|
||||||
shift 2
|
|
||||||
else
|
|
||||||
echo "Unknown argument: $1"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
export RUST_LOG=${RUST_LOG:-solana=info} # if RUST_LOG is unset, default to info
|
export RUST_LOG=${RUST_LOG:-solana=info} # if RUST_LOG is unset, default to info
|
||||||
export RUST_BACKTRACE=1
|
export RUST_BACKTRACE=1
|
||||||
dataDir=$PWD/config/"$(basename "$0" .sh)"
|
dataDir=$PWD/config/"$(basename "$0" .sh)"
|
||||||
@ -108,9 +97,6 @@ args=(
|
|||||||
--enable-rpc-get-confirmed-block
|
--enable-rpc-get-confirmed-block
|
||||||
--init-complete-file "$dataDir"/init-completed
|
--init-complete-file "$dataDir"/init-completed
|
||||||
)
|
)
|
||||||
if [[ -n $blockstreamSocket ]]; then
|
|
||||||
args+=(--blockstream "$blockstreamSocket")
|
|
||||||
fi
|
|
||||||
solana-validator "${args[@]}" &
|
solana-validator "${args[@]}" &
|
||||||
validator=$!
|
validator=$!
|
||||||
|
|
||||||
|
@ -356,14 +356,6 @@ pub fn main() {
|
|||||||
|
|
||||||
let matches = App::new(crate_name!()).about(crate_description!())
|
let matches = App::new(crate_name!()).about(crate_description!())
|
||||||
.version(solana_clap_utils::version!())
|
.version(solana_clap_utils::version!())
|
||||||
.arg(
|
|
||||||
Arg::with_name("blockstream_unix_socket")
|
|
||||||
.long("blockstream")
|
|
||||||
.takes_value(true)
|
|
||||||
.hidden(true) // Don't document this argument to discourage its use
|
|
||||||
.value_name("UNIX DOMAIN SOCKET")
|
|
||||||
.help("Stream entries to this unix domain socket path")
|
|
||||||
)
|
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name)
|
Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name)
|
||||||
.long(SKIP_SEED_PHRASE_VALIDATION_ARG.long)
|
.long(SKIP_SEED_PHRASE_VALIDATION_ARG.long)
|
||||||
@ -719,9 +711,6 @@ pub fn main() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut validator_config = ValidatorConfig {
|
let mut validator_config = ValidatorConfig {
|
||||||
blockstream_unix_socket: matches
|
|
||||||
.value_of("blockstream_unix_socket")
|
|
||||||
.map(PathBuf::from),
|
|
||||||
dev_sigverify_disabled: matches.is_present("dev_no_sigverify"),
|
dev_sigverify_disabled: matches.is_present("dev_no_sigverify"),
|
||||||
dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
|
dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
|
||||||
expected_genesis_hash: matches
|
expected_genesis_hash: matches
|
||||||
|
Reference in New Issue
Block a user