From 918d04e3f00a253f10d1a8413f8d3b4a21af5fc0 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Fri, 12 Mar 2021 21:44:06 +0800 Subject: [PATCH] Add more slot update notifications (#15734) * Add more slot update notifications * fix merge * Address feedback and add integration test * switch to datapoint * remove unused shred method * fix clippy * new thread for rpc completed slots * remove extra constant * fixes * rely on channel closing * fix check --- client/src/rpc_response.rs | 58 +++++++++++- core/src/cluster_slots_service.rs | 18 +++- core/src/lib.rs | 1 + .../optimistically_confirmed_bank_tracker.rs | 22 ++++- core/src/replay_stage.rs | 34 ++++++- core/src/retransmit_stage.rs | 93 +++++++++---------- core/src/rpc_completed_slots_service.rs | 33 +++++++ core/src/rpc_subscriptions.rs | 5 + core/src/sigverify_shreds.rs | 21 +---- core/src/tvu.rs | 8 +- core/src/validator.rs | 10 +- core/tests/rpc.rs | 69 +++++++++++++- ledger/src/blockstore.rs | 17 ++-- ledger/src/shred.rs | 12 +++ runtime/src/bank.rs | 37 ++++++++ 15 files changed, 340 insertions(+), 98 deletions(-) create mode 100644 core/src/rpc_completed_slots_service.rs diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index f51afd956b..a6d14e6b33 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -101,13 +101,63 @@ pub struct SlotInfo { pub root: Slot, } +#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SlotTransactionStats { + pub num_transaction_entries: u64, + pub num_successful_transactions: u64, + pub num_failed_transactions: u64, + pub max_transactions_per_entry: u64, +} + #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase", tag = "type")] pub enum SlotUpdate { - OptimisticConfirmation { slot: Slot, timestamp: u64 }, - FirstShredReceived { slot: Slot, timestamp: u64 }, - Frozen { slot: Slot, timestamp: u64 }, - Root { slot: Slot, timestamp: u64 }, + FirstShredReceived { + slot: Slot, + timestamp: u64, + }, + Completed { + slot: Slot, + timestamp: u64, + }, + CreatedBank { + slot: Slot, + parent: Slot, + timestamp: u64, + }, + Frozen { + slot: Slot, + timestamp: u64, + stats: SlotTransactionStats, + }, + Dead { + slot: Slot, + timestamp: u64, + err: String, + }, + OptimisticConfirmation { + slot: Slot, + timestamp: u64, + }, + Root { + slot: Slot, + timestamp: u64, + }, +} + +impl SlotUpdate { + pub fn slot(&self) -> Slot { + match self { + Self::FirstShredReceived { slot, .. } => *slot, + Self::Completed { slot, .. } => *slot, + Self::CreatedBank { slot, .. } => *slot, + Self::Frozen { slot, .. } => *slot, + Self::Dead { slot, .. } => *slot, + Self::OptimisticConfirmation { slot, .. } => *slot, + Self::Root { slot, .. } => *slot, + } + } } #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs index 6e7a73b312..db77bc5cdb 100644 --- a/core/src/cluster_slots_service.rs +++ b/core/src/cluster_slots_service.rs @@ -6,9 +6,9 @@ use solana_sdk::{clock::Slot, pubkey::Pubkey}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, + mpsc::RecvTimeoutError, {Arc, RwLock}, }, - thread::sleep, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }; @@ -79,6 +79,14 @@ impl ClusterSlotsService { if exit.load(Ordering::Relaxed) { break; } + let slots = match completed_slots_receiver.recv_timeout(Duration::from_millis(200)) { + Ok(slots) => Some(slots), + Err(RecvTimeoutError::Timeout) => None, + Err(RecvTimeoutError::Disconnected) => { + warn!("Cluster slots service - sender disconnected"); + break; + } + }; let new_root = bank_forks.read().unwrap().root(); let id = cluster_info.id(); let mut lowest_slot_elapsed = Measure::start("lowest_slot_elapsed"); @@ -87,7 +95,9 @@ impl ClusterSlotsService { lowest_slot_elapsed.stop(); let mut update_completed_slots_elapsed = Measure::start("update_completed_slots_elapsed"); - Self::update_completed_slots(&completed_slots_receiver, &cluster_info); + if let Some(slots) = slots { + Self::update_completed_slots(slots, &completed_slots_receiver, &cluster_info); + } cluster_slots.update(new_root, &cluster_info, &bank_forks); update_completed_slots_elapsed.stop(); @@ -113,20 +123,20 @@ impl ClusterSlotsService { cluster_slots_service_timing = ClusterSlotsServiceTiming::default(); last_stats = Instant::now(); } - sleep(Duration::from_millis(200)); } } fn update_completed_slots( + mut slots: Vec, completed_slots_receiver: &CompletedSlotsReceiver, cluster_info: &ClusterInfo, ) { - let mut slots: Vec = vec![]; while let Ok(mut more) = completed_slots_receiver.try_recv() { slots.append(&mut more); } #[allow(clippy::stable_sort_primitive)] slots.sort(); + if !slots.is_empty() { cluster_info.push_epoch_slots(&slots); } diff --git a/core/src/lib.rs b/core/src/lib.rs index 59b987c4dc..f096cead27 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -58,6 +58,7 @@ mod result; pub mod retransmit_stage; pub mod rewards_recorder_service; pub mod rpc; +pub mod rpc_completed_slots_service; pub mod rpc_health; pub mod rpc_pubsub; pub mod rpc_pubsub_service; diff --git a/core/src/optimistically_confirmed_bank_tracker.rs b/core/src/optimistically_confirmed_bank_tracker.rs index 2a422819b2..0cf4262c6a 100644 --- a/core/src/optimistically_confirmed_bank_tracker.rs +++ b/core/src/optimistically_confirmed_bank_tracker.rs @@ -4,7 +4,7 @@ use crate::rpc_subscriptions::RpcSubscriptions; use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; -use solana_client::rpc_response::SlotUpdate; +use solana_client::rpc_response::{SlotTransactionStats, SlotUpdate}; use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::{clock::Slot, timing::timestamp}; use std::{ @@ -140,6 +140,22 @@ impl OptimisticallyConfirmedBankTracker { } BankNotification::Frozen(bank) => { let frozen_slot = bank.slot(); + if let Some(parent) = bank.parent() { + let num_successful_transactions = bank + .transaction_count() + .saturating_sub(parent.transaction_count()); + subscriptions.notify_slot_update(SlotUpdate::Frozen { + slot: frozen_slot, + timestamp: timestamp(), + stats: SlotTransactionStats { + num_transaction_entries: bank.transaction_entries_count(), + num_successful_transactions, + num_failed_transactions: bank.transaction_error_count(), + max_transactions_per_entry: bank.transactions_per_entry_max(), + }, + }); + } + if pending_optimistically_confirmed_banks.remove(&bank.slot()) { let mut w_optimistically_confirmed_bank = optimistically_confirmed_bank.write().unwrap(); @@ -149,10 +165,6 @@ impl OptimisticallyConfirmedBankTracker { } drop(w_optimistically_confirmed_bank); } - subscriptions.notify_slot_update(SlotUpdate::Frozen { - slot: frozen_slot, - timestamp: timestamp(), - }); } BankNotification::Root(bank) => { let root_slot = bank.slot(); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index fc7a349d42..7313067f69 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -18,6 +18,7 @@ use crate::{ rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, }; +use solana_client::rpc_response::SlotUpdate; use solana_ledger::{ block_error::BlockError, blockstore::Blockstore, @@ -331,6 +332,7 @@ impl ReplayStage { &replay_vote_sender, &bank_notification_sender, &rewards_recorder_sender, + &subscriptions, ); replay_active_banks_time.stop(); Self::report_memory(&allocated, "replay_active_banks", start); @@ -997,6 +999,7 @@ impl ReplayStage { transaction_status_sender: Option, replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, + subscriptions: &Arc, ) -> result::Result { let tx_count_before = bank_progress.replay_progress.num_txs; let confirm_result = blockstore_processor::confirm_slot( @@ -1032,7 +1035,15 @@ impl ReplayStage { } else { info!("Slot had too few ticks: {}", slot); } - Self::mark_dead_slot(blockstore, bank_progress, slot, &err, is_serious); + Self::mark_dead_slot( + blockstore, + bank_progress, + slot, + &err, + is_serious, + subscriptions, + ); + err })?; @@ -1045,6 +1056,7 @@ impl ReplayStage { slot: Slot, err: &BlockstoreProcessorError, is_serious: bool, + subscriptions: &Arc, ) { if is_serious { datapoint_error!( @@ -1063,6 +1075,11 @@ impl ReplayStage { blockstore .set_dead_slot(slot) .expect("Failed to mark slot as dead in blockstore"); + subscriptions.notify_slot_update(SlotUpdate::Dead { + slot, + err: format!("error: {:?}", err), + timestamp: timestamp(), + }); } #[allow(clippy::too_many_arguments)] @@ -1313,6 +1330,7 @@ impl ReplayStage { replay_vote_sender: &ReplayVoteSender, bank_notification_sender: &Option, rewards_recorder_sender: &Option, + subscriptions: &Arc, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1360,6 +1378,7 @@ impl ReplayStage { transaction_status_sender.clone(), replay_vote_sender, verify_recyclers, + subscriptions, ); match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, @@ -1400,6 +1419,7 @@ impl ReplayStage { bank.slot(), &BlockstoreProcessorError::InvalidBlock(BlockError::DuplicateBlock), true, + subscriptions, ); warn!( "{} duplicate shreds detected, not freezing bank {}", @@ -2541,7 +2561,8 @@ pub(crate) mod tests { .. } = create_genesis_config(1000); genesis_config.poh_config.hashes_per_tick = Some(2); - let bank0 = Arc::new(Bank::new(&genesis_config)); + let bank_forks = BankForks::new(Bank::new(&genesis_config)); + let bank0 = bank_forks.working_bank(); let mut progress = ProgressMap::default(); let last_blockhash = bank0.last_blockhash(); let mut bank0_progress = progress @@ -2549,6 +2570,9 @@ pub(crate) mod tests { .or_insert_with(|| ForkProgress::new(last_blockhash, None, None, 0, 0)); let shreds = shred_to_insert(&mint_keypair, bank0.clone()); blockstore.insert_shreds(shreds, None, false).unwrap(); + let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let bank_forks = Arc::new(RwLock::new(bank_forks)); + let exit = Arc::new(AtomicBool::new(false)); let res = ReplayStage::replay_blockstore_into_bank( &bank0, &blockstore, @@ -2556,6 +2580,12 @@ pub(crate) mod tests { None, &replay_vote_sender, &&VerifyRecyclers::default(), + &Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + block_commitment_cache, + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + )), ); // Check that the erroring bank was marked as dead in the progress map diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index e59fcfe3c3..3ac3fd4388 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -12,6 +12,7 @@ use crate::{ repair_service::DuplicateSlotsResetSender, repair_service::RepairInfo, result::{Error, Result}, + rpc_completed_slots_service::RpcCompletedSlotsService, rpc_subscriptions::RpcSubscriptions, window_service::{should_retransmit_and_persist, WindowService}, }; @@ -219,7 +220,7 @@ pub type ShredFilter = LruCache<(Slot, u32, bool), Vec>; pub type ShredFilterAndHasher = (ShredFilter, PacketHasher); // Returns None if shred is already received and should skip retransmit. -// Otherwise returns shred's slot. +// Otherwise returns shred's slot and whether the shred is a data shred. fn check_if_already_received( packet: &Packet, shreds_received: &Mutex, @@ -246,40 +247,29 @@ fn check_if_already_received( } } -fn notify_first_shred_received( +// Returns true if this is the first time receiving a shred for `shred_slot`. +fn check_if_first_shred_received( shred_slot: Slot, - rpc_subscriptions: &RpcSubscriptions, - sent_received_slot_notification: &Mutex>, + first_shreds_received: &Mutex>, root_bank: &Bank, -) { - let notify_slot = { - let mut sent_received_slot_notification_locked = - sent_received_slot_notification.lock().unwrap(); - if !sent_received_slot_notification_locked.contains(&shred_slot) - && shred_slot > root_bank.slot() - { - sent_received_slot_notification_locked.insert(shred_slot); - if sent_received_slot_notification_locked.len() > 100 { - let mut slots_before_root = - sent_received_slot_notification_locked.split_off(&(root_bank.slot() + 1)); - // `slots_before_root` now contains all slots <= root - std::mem::swap( - &mut slots_before_root, - &mut sent_received_slot_notification_locked, - ); - } - Some(shred_slot) - } else { - None - } - }; +) -> bool { + if shred_slot <= root_bank.slot() { + return false; + } - if let Some(slot) = notify_slot { - info!("First time receiving a shred from slot: {}", slot); - rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived { - slot, - timestamp: timestamp(), - }); + let mut first_shreds_received_locked = first_shreds_received.lock().unwrap(); + if !first_shreds_received_locked.contains(&shred_slot) { + datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64)); + first_shreds_received_locked.insert(shred_slot); + if first_shreds_received_locked.len() > 100 { + let mut slots_before_root = + first_shreds_received_locked.split_off(&(root_bank.slot() + 1)); + // `slots_before_root` now contains all slots <= root + std::mem::swap(&mut slots_before_root, &mut first_shreds_received_locked); + } + true + } else { + false } } @@ -312,7 +302,7 @@ fn retransmit( last_peer_update: &AtomicU64, shreds_received: &Mutex, max_slots: &MaxSlots, - sent_received_slot_notification: &Mutex>, + first_shreds_received: &Mutex>, rpc_subscriptions: &Option>, ) -> Result<()> { let timer = Duration::new(1, 0); @@ -391,12 +381,12 @@ fn retransmit( max_slot = max_slot.max(shred_slot); if let Some(rpc_subscriptions) = rpc_subscriptions { - notify_first_shred_received( - shred_slot, - rpc_subscriptions, - sent_received_slot_notification, - &root_bank, - ); + if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) { + rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived { + slot: shred_slot, + timestamp: timestamp(), + }); + } } let mut compute_turbine_peers = Measure::start("turbine_start"); @@ -504,7 +494,7 @@ pub fn retransmitter( LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default(), ))); - let sent_received_slot_notification = Arc::new(Mutex::new(BTreeSet::new())); + let first_shreds_received = Arc::new(Mutex::new(BTreeSet::new())); (0..sockets.len()) .map(|s| { let sockets = sockets.clone(); @@ -517,7 +507,7 @@ pub fn retransmitter( let last_peer_update = Arc::new(AtomicU64::new(0)); let shreds_received = shreds_received.clone(); let max_slots = max_slots.clone(); - let sent_received_slot_notification = sent_received_slot_notification.clone(); + let first_shreds_received = first_shreds_received.clone(); let rpc_subscriptions = rpc_subscriptions.clone(); Builder::new() @@ -537,7 +527,7 @@ pub fn retransmitter( &last_peer_update, &shreds_received, &max_slots, - &sent_received_slot_notification, + &first_shreds_received, &rpc_subscriptions, ) { match e { @@ -574,7 +564,7 @@ impl RetransmitStage { repair_socket: Arc, verified_receiver: Receiver>, exit: &Arc, - completed_slots_receiver: CompletedSlotsReceiver, + completed_slots_receivers: [CompletedSlotsReceiver; 2], epoch_schedule: EpochSchedule, cfg: Option>, shred_version: u16, @@ -596,18 +586,23 @@ impl RetransmitStage { cluster_info.clone(), retransmit_receiver, max_slots, - rpc_subscriptions, + rpc_subscriptions.clone(), ); - let leader_schedule_cache_clone = leader_schedule_cache.clone(); + let [rpc_completed_slots_receiver, cluster_completed_slots_receiver] = + completed_slots_receivers; + let rpc_completed_slots_hdl = + RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions); let cluster_slots_service = ClusterSlotsService::new( blockstore.clone(), cluster_slots.clone(), bank_forks.clone(), cluster_info.clone(), - completed_slots_receiver, + cluster_completed_slots_receiver, exit.clone(), ); + + let leader_schedule_cache_clone = leader_schedule_cache.clone(); let repair_info = RepairInfo { bank_forks, epoch_schedule, @@ -643,7 +638,11 @@ impl RetransmitStage { completed_data_sets_sender, ); - let thread_hdls = t_retransmit; + let mut thread_hdls = t_retransmit; + if let Some(thread_hdl) = rpc_completed_slots_hdl { + thread_hdls.push(thread_hdl); + } + Self { thread_hdls, window_service, diff --git a/core/src/rpc_completed_slots_service.rs b/core/src/rpc_completed_slots_service.rs new file mode 100644 index 0000000000..ec5d46ca8e --- /dev/null +++ b/core/src/rpc_completed_slots_service.rs @@ -0,0 +1,33 @@ +use crate::rpc_subscriptions::RpcSubscriptions; +use solana_client::rpc_response::SlotUpdate; +use solana_ledger::blockstore::CompletedSlotsReceiver; +use solana_sdk::timing::timestamp; +use std::{ + sync::Arc, + thread::{Builder, JoinHandle}, +}; + +pub struct RpcCompletedSlotsService; +impl RpcCompletedSlotsService { + pub fn spawn( + completed_slots_receiver: CompletedSlotsReceiver, + rpc_subscriptions: Option>, + ) -> Option> { + let rpc_subscriptions = rpc_subscriptions?; + Some( + Builder::new() + .name("solana-rpc-completed-slots-service".to_string()) + .spawn(move || { + for slots in completed_slots_receiver.iter() { + for slot in slots { + rpc_subscriptions.notify_slot_update(SlotUpdate::Completed { + slot, + timestamp: timestamp(), + }); + } + } + }) + .unwrap(), + ) + } +} diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 4deb5946ab..cefef7a256 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -952,6 +952,11 @@ impl RpcSubscriptions { pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); + self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::CreatedBank { + slot, + parent, + timestamp: timestamp(), + })); } pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec)) { diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 33af8a9661..99f74036d3 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -2,13 +2,9 @@ use crate::sigverify; use crate::sigverify_stage::SigVerifier; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; -use solana_ledger::shred::{OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_SLOT}; +use solana_ledger::shred::Shred; use solana_ledger::sigverify_shreds::verify_shreds_gpu; -use solana_perf::{ - self, - packet::{limited_deserialize, Packets}, - recycler_cache::RecyclerCache, -}; +use solana_perf::{self, packet::Packets, recycler_cache::RecyclerCache}; use solana_runtime::bank_forks::BankForks; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; @@ -38,18 +34,7 @@ impl ShredSigVerifier { fn read_slots(batches: &[Packets]) -> HashSet { batches .iter() - .flat_map(|batch| { - batch.packets.iter().filter_map(|packet| { - let slot_start = OFFSET_OF_SHRED_SLOT; - let slot_end = slot_start + SIZE_OF_SHRED_SLOT; - trace!("slot {} {}", slot_start, slot_end,); - if slot_end <= packet.meta.size { - limited_deserialize(&packet.data[slot_start..slot_end]).ok() - } else { - None - } - }) - }) + .flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet)) .collect() } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index c83614dbd2..4b033b2032 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -107,7 +107,7 @@ impl Tvu { tower: Tower, leader_schedule_cache: &Arc, exit: &Arc, - completed_slots_receiver: CompletedSlotsReceiver, + completed_slots_receivers: [CompletedSlotsReceiver; 2], block_commitment_cache: Arc>, cfg: Option>, transaction_status_sender: Option, @@ -168,7 +168,7 @@ impl Tvu { repair_socket, verified_receiver, &exit, - completed_slots_receiver, + completed_slots_receivers, *bank_forks.read().unwrap().working_bank().epoch_schedule(), cfg, tvu_config.shred_version, @@ -356,7 +356,7 @@ pub mod tests { let BlockstoreSignals { blockstore, ledger_signal_receiver, - completed_slots_receiver, + completed_slots_receivers, .. } = Blockstore::open_with_signal(&blockstore_path, None, true) .expect("Expected to successfully open ledger"); @@ -398,7 +398,7 @@ pub mod tests { tower, &leader_schedule_cache, &exit, - completed_slots_receiver, + completed_slots_receivers, block_commitment_cache, None, None, diff --git a/core/src/validator.rs b/core/src/validator.rs index 1f9197705a..08aa18536a 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -376,7 +376,7 @@ impl Validator { bank_forks, blockstore, ledger_signal_receiver, - completed_slots_receiver, + completed_slots_receivers, leader_schedule_cache, snapshot_hash, TransactionHistoryServices { @@ -694,7 +694,7 @@ impl Validator { tower, &leader_schedule_cache, &exit, - completed_slots_receiver, + completed_slots_receivers, block_commitment_cache, config.enable_partition.clone(), transaction_status_sender.clone(), @@ -1010,7 +1010,7 @@ fn new_banks_from_ledger( BankForks, Arc, Receiver, - CompletedSlotsReceiver, + [CompletedSlotsReceiver; 2], LeaderScheduleCache, Option<(Slot, Hash)>, TransactionHistoryServices, @@ -1041,7 +1041,7 @@ fn new_banks_from_ledger( let BlockstoreSignals { mut blockstore, ledger_signal_receiver, - completed_slots_receiver, + completed_slots_receivers, .. } = Blockstore::open_with_signal( ledger_path, @@ -1165,7 +1165,7 @@ fn new_banks_from_ledger( bank_forks, blockstore, ledger_signal_receiver, - completed_slots_receiver, + completed_slots_receivers, leader_schedule_cache, snapshot_hash, transaction_history_services, diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index ab309dc59e..60040a9dc2 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -7,12 +7,13 @@ use serde_json::{json, Value}; use solana_account_decoder::UiAccount; use solana_client::{ rpc_client::RpcClient, - rpc_response::{Response, RpcSignatureResult}, + rpc_response::{Response, RpcSignatureResult, SlotUpdate}, }; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, test_validator::TestValidator}; use solana_sdk::{ commitment_config::CommitmentConfig, hash::Hash, + pubkey::Pubkey, signature::{Keypair, Signer}, system_transaction, transaction::Transaction, @@ -20,7 +21,7 @@ use solana_sdk::{ use std::{ collections::HashSet, net::UdpSocket, - sync::mpsc::channel, + sync::{mpsc::channel, Arc}, thread::sleep, time::{Duration, Instant}, }; @@ -140,6 +141,70 @@ fn test_rpc_invalid_requests() { assert!(the_value.is_null()); } +#[test] +fn test_rpc_slot_updates() { + solana_logger::setup(); + + let test_validator = TestValidator::with_no_fees(Pubkey::new_unique()); + + // Create the pub sub runtime + let rt = Runtime::new().unwrap(); + let rpc_pubsub_url = test_validator.rpc_pubsub_url(); + let (update_sender, update_receiver) = channel::>(); + + // Subscribe to slot updates + rt.spawn(async move { + let connect = ws::try_connect::(&rpc_pubsub_url).unwrap(); + let client = connect.await.unwrap(); + + tokio_02::spawn(async move { + let mut update_sub = client.slots_updates_subscribe().unwrap(); + loop { + let response = update_sub.next().await.unwrap(); + update_sender.send(response.unwrap()).unwrap(); + } + }); + }); + + let first_update = update_receiver + .recv_timeout(Duration::from_secs(2)) + .unwrap(); + + // Verify that updates are received in order for an upcoming slot + let verify_slot = first_update.slot() + 2; + let mut expected_update_index = 0; + let expected_updates = vec![ + "CreatedBank", + "Completed", + "Frozen", + "OptimisticConfirmation", + "Root", + ]; + + let test_start = Instant::now(); + loop { + assert!(test_start.elapsed() < Duration::from_secs(30)); + let update = update_receiver + .recv_timeout(Duration::from_secs(2)) + .unwrap(); + if update.slot() == verify_slot { + let update_name = match *update { + SlotUpdate::CreatedBank { .. } => "CreatedBank", + SlotUpdate::Completed { .. } => "Completed", + SlotUpdate::Frozen { .. } => "Frozen", + SlotUpdate::OptimisticConfirmation { .. } => "OptimisticConfirmation", + SlotUpdate::Root { .. } => "Root", + _ => continue, + }; + assert_eq!(update_name, expected_updates[expected_update_index]); + expected_update_index += 1; + if expected_update_index == expected_updates.len() { + break; + } + } + } +} + #[test] fn test_rpc_subscriptions() { solana_logger::setup(); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 4ec6321e77..a0d5102bba 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -114,7 +114,7 @@ pub struct CompletedDataSetInfo { pub struct BlockstoreSignals { pub blockstore: Blockstore, pub ledger_signal_receiver: Receiver, - pub completed_slots_receiver: CompletedSlotsReceiver, + pub completed_slots_receivers: [CompletedSlotsReceiver; 2], } // ledger window @@ -378,15 +378,18 @@ impl Blockstore { enforce_ulimit_nofile, )?; let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1); - let (completed_slots_sender, completed_slots_receiver) = + let (completed_slots_sender1, completed_slots_receiver1) = sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL); + let (completed_slots_sender2, completed_slots_receiver2) = + sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL); + blockstore.new_shreds_signals = vec![ledger_signal_sender]; - blockstore.completed_slots_senders = vec![completed_slots_sender]; + blockstore.completed_slots_senders = vec![completed_slots_sender1, completed_slots_sender2]; Ok(BlockstoreSignals { blockstore, ledger_signal_receiver, - completed_slots_receiver, + completed_slots_receivers: [completed_slots_receiver1, completed_slots_receiver2], }) } @@ -4302,7 +4305,7 @@ pub mod tests { let ledger_path = get_tmp_ledger_path!(); let BlockstoreSignals { blockstore: ledger, - completed_slots_receiver: recvr, + completed_slots_receivers: [recvr, _], .. } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap(); let ledger = Arc::new(ledger); @@ -4328,7 +4331,7 @@ pub mod tests { let ledger_path = get_tmp_ledger_path!(); let BlockstoreSignals { blockstore: ledger, - completed_slots_receiver: recvr, + completed_slots_receivers: [recvr, _], .. } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap(); let ledger = Arc::new(ledger); @@ -4372,7 +4375,7 @@ pub mod tests { let ledger_path = get_tmp_ledger_path!(); let BlockstoreSignals { blockstore: ledger, - completed_slots_receiver: recvr, + completed_slots_receivers: [recvr, _], .. } = Blockstore::open_with_signal(&ledger_path, None, true).unwrap(); let ledger = Arc::new(ledger); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index c6bfbba9fd..013dc57a5a 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -489,6 +489,18 @@ impl Shred { } } + // Get slot from a shred packet with partial deserialize + pub fn get_slot_from_packet(p: &Packet) -> Option { + let slot_start = OFFSET_OF_SHRED_SLOT; + let slot_end = slot_start + SIZE_OF_SHRED_SLOT; + + if slot_end > p.meta.size { + return None; + } + + limited_deserialize::(&p.data[slot_start..slot_end]).ok() + } + pub fn reference_tick_from_data(data: &[u8]) -> u8 { let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::() diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a0aa2933c4..8b70bd5056 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -754,6 +754,15 @@ pub struct Bank { /// The number of transactions processed without error transaction_count: AtomicU64, + /// The number of transaction errors in this slot + transaction_error_count: AtomicU64, + + /// The number of transaction entries in this slot + transaction_entries_count: AtomicU64, + + /// The max number of transaction in an entry in this slot + transactions_per_entry_max: AtomicU64, + /// Bank tick height tick_height: AtomicU64, @@ -1020,6 +1029,9 @@ impl Bank { capitalization: AtomicU64::new(parent.capitalization()), inflation: parent.inflation.clone(), transaction_count: AtomicU64::new(parent.transaction_count()), + transaction_error_count: AtomicU64::new(0), + transaction_entries_count: AtomicU64::new(0), + transactions_per_entry_max: AtomicU64::new(0), // we will .clone_with_epoch() this soon after stake data update; so just .clone() for now stakes: RwLock::new(parent.stakes.read().unwrap().clone()), epoch_stakes: parent.epoch_stakes.clone(), @@ -1158,6 +1170,9 @@ impl Bank { parent_slot: fields.parent_slot, hard_forks: Arc::new(RwLock::new(fields.hard_forks)), transaction_count: AtomicU64::new(fields.transaction_count), + transaction_error_count: new(), + transaction_entries_count: new(), + transactions_per_entry_max: new(), tick_height: AtomicU64::new(fields.tick_height), signature_count: AtomicU64::new(fields.signature_count), capitalization: AtomicU64::new(fields.capitalization), @@ -3149,6 +3164,16 @@ impl Bank { inc_new_counter_info!("bank-process_transactions-txs", tx_count as usize); inc_new_counter_info!("bank-process_transactions-sigs", signature_count as usize); + if !txs.is_empty() { + let processed_tx_count = txs.len() as u64; + let failed_tx_count = processed_tx_count.saturating_sub(tx_count); + self.transaction_error_count + .fetch_add(failed_tx_count, Relaxed); + self.transaction_entries_count.fetch_add(1, Relaxed); + self.transactions_per_entry_max + .fetch_max(processed_tx_count, Relaxed); + } + if executed .iter() .any(|(res, _nonce_rollback)| Self::can_commit(res)) @@ -4088,6 +4113,18 @@ impl Bank { self.transaction_count.load(Relaxed) } + pub fn transaction_error_count(&self) -> u64 { + self.transaction_error_count.load(Relaxed) + } + + pub fn transaction_entries_count(&self) -> u64 { + self.transaction_entries_count.load(Relaxed) + } + + pub fn transactions_per_entry_max(&self) -> u64 { + self.transactions_per_entry_max.load(Relaxed) + } + fn increment_transaction_count(&self, tx_count: u64) { self.transaction_count.fetch_add(tx_count, Relaxed); }