diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 5a57de4bbc..e2698444f1 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -40,6 +40,7 @@ use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ cmp::min, net::SocketAddr, + str::FromStr, sync::RwLock, thread::sleep, time::{Duration, Instant}, @@ -405,6 +406,24 @@ impl RpcClient { ) } + pub fn get_slot_leaders(&self, start_slot: Slot, limit: u64) -> ClientResult> { + self.send(RpcRequest::GetSlotLeaders, json!([start_slot, limit])) + .and_then(|slot_leaders: Vec| { + slot_leaders + .iter() + .map(|slot_leader| { + Pubkey::from_str(slot_leader).map_err(|err| { + ClientErrorKind::Custom(format!( + "pubkey deserialization failed: {}", + err + )) + .into() + }) + }) + .collect() + }) + } + pub fn supply(&self) -> RpcResult { self.supply_with_commitment(self.commitment_config) } diff --git a/client/src/rpc_request.rs b/client/src/rpc_request.rs index 5a73e427fb..4b6f0717be 100644 --- a/client/src/rpc_request.rs +++ b/client/src/rpc_request.rs @@ -39,6 +39,7 @@ pub enum RpcRequest { GetSignatureStatuses, GetSlot, GetSlotLeader, + GetSlotLeaders, GetStorageTurn, GetStorageTurnRate, GetSlotsPerSegment, @@ -96,6 +97,7 @@ impl fmt::Display for RpcRequest { RpcRequest::GetSignatureStatuses => "getSignatureStatuses", RpcRequest::GetSlot => "getSlot", RpcRequest::GetSlotLeader => "getSlotLeader", + RpcRequest::GetSlotLeaders => "getSlotLeaders", RpcRequest::GetStorageTurn => "getStorageTurn", RpcRequest::GetStorageTurnRate => "getStorageTurnRate", RpcRequest::GetSlotsPerSegment => "getSlotsPerSegment", @@ -128,6 +130,7 @@ pub const MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT: usize = 1_000; pub const MAX_MULTIPLE_ACCOUNTS: usize = 100; pub const NUM_LARGEST_ACCOUNTS: usize = 20; pub const MAX_GET_PROGRAM_ACCOUNT_FILTERS: usize = 4; +pub const MAX_GET_SLOT_LEADERS: usize = 5000; // Validators that are this number of slots behind are considered delinquent pub const DELINQUENT_VALIDATOR_SLOT_DISTANCE: u64 = 128; diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 64cf8a29d0..63e3c450b8 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -30,13 +30,17 @@ use solana_client::{ TokenAccountsFilter, DELINQUENT_VALIDATOR_SLOT_DISTANCE, MAX_GET_CONFIRMED_BLOCKS_RANGE, MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT, MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS_SLOT_RANGE, MAX_GET_PROGRAM_ACCOUNT_FILTERS, - MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, MAX_MULTIPLE_ACCOUNTS, NUM_LARGEST_ACCOUNTS, + MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, MAX_GET_SLOT_LEADERS, MAX_MULTIPLE_ACCOUNTS, + NUM_LARGEST_ACCOUNTS, }, rpc_response::Response as RpcResponse, rpc_response::*, }; use solana_faucet::faucet::request_airdrop_transaction; -use solana_ledger::{blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path}; +use solana_ledger::{ + blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path, + leader_schedule_cache::LeaderScheduleCache, +}; use solana_metrics::inc_new_counter_info; use solana_perf::packet::PACKET_DATA_SIZE; use solana_runtime::{ @@ -139,6 +143,7 @@ pub struct JsonRpcRequestProcessor { optimistically_confirmed_bank: Arc>, largest_accounts_cache: Arc>, max_slots: Arc, + leader_schedule_cache: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -223,6 +228,7 @@ impl JsonRpcRequestProcessor { optimistically_confirmed_bank: Arc>, largest_accounts_cache: Arc>, max_slots: Arc, + leader_schedule_cache: Arc, ) -> (Self, Receiver) { let (sender, receiver) = channel(); ( @@ -242,6 +248,7 @@ impl JsonRpcRequestProcessor { optimistically_confirmed_bank, largest_accounts_cache, max_slots, + leader_schedule_cache, }, receiver, ) @@ -283,6 +290,7 @@ impl JsonRpcRequestProcessor { })), largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))), max_slots: Arc::new(MaxSlots::default()), + leader_schedule_cache: Arc::new(LeaderScheduleCache::new_from_bank(bank)), } } @@ -1971,29 +1979,14 @@ pub mod rpc_minimal { debug!("get_leader_schedule rpc request received: {:?}", slot); - Ok( - solana_ledger::leader_schedule_utils::leader_schedule(epoch, &bank).map( - |leader_schedule| { - let mut leader_schedule_by_identity = HashMap::new(); - - for (slot_index, identity_pubkey) in - leader_schedule.get_slot_leaders().iter().enumerate() - { - leader_schedule_by_identity - .entry(identity_pubkey) - .or_insert_with(Vec::new) - .push(slot_index); - } - - leader_schedule_by_identity - .into_iter() - .map(|(identity_pubkey, slot_indices)| { - (identity_pubkey.to_string(), slot_indices) - }) - .collect() - }, - ), - ) + Ok(meta + .leader_schedule_cache + .get_epoch_leader_schedule(epoch) + .map(|leader_schedule| { + solana_ledger::leader_schedule_utils::leader_schedule_by_identity( + leader_schedule.get_slot_leaders().iter().enumerate(), + ) + })) } } } @@ -2194,6 +2187,14 @@ pub mod rpc_full { commitment: Option, ) -> Result; + #[rpc(meta, name = "getSlotLeaders")] + fn get_slot_leaders( + &self, + meta: Self::Metadata, + start_slot: Slot, + end_slot: Slot, + ) -> Result>; + #[rpc(meta, name = "minimumLedgerSlot")] fn minimum_ledger_slot(&self, meta: Self::Metadata) -> Result; @@ -2801,6 +2802,56 @@ pub mod rpc_full { Ok(meta.get_slot_leader(commitment)) } + fn get_slot_leaders( + &self, + meta: Self::Metadata, + start_slot: Slot, + limit: u64, + ) -> Result> { + debug!( + "get_slot_leaders rpc request received (start: {} limit: {})", + start_slot, limit + ); + + let limit = limit as usize; + if limit > MAX_GET_SLOT_LEADERS { + return Err(Error::invalid_params(format!( + "Invalid limit; max {}", + MAX_GET_SLOT_LEADERS + ))); + } + + let bank = meta.bank(None); + let (mut epoch, mut slot_index) = + bank.epoch_schedule().get_epoch_and_slot_index(start_slot); + + let mut slot_leaders = Vec::with_capacity(limit); + while slot_leaders.len() < limit { + if let Some(leader_schedule) = + meta.leader_schedule_cache.get_epoch_leader_schedule(epoch) + { + slot_leaders.extend( + leader_schedule + .get_slot_leaders() + .iter() + .skip(slot_index as usize) + .take(limit.saturating_sub(slot_leaders.len())) + .map(|pubkey| pubkey.to_string()), + ); + } else { + return Err(Error::invalid_params(format!( + "Invalid slot range: leader schedule for epoch {} is unavailable", + epoch + ))); + } + + epoch += 1; + slot_index = 0; + } + + Ok(slot_leaders) + } + fn minimum_ledger_slot(&self, meta: Self::Metadata) -> Result { debug!("minimum_ledger_slot rpc request received"); meta.minimum_ledger_slot() @@ -3314,6 +3365,7 @@ pub mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), max_slots, + Arc::new(LeaderScheduleCache::new_from_bank(&bank)), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -3819,6 +3871,62 @@ pub mod tests { assert_eq!(schedule, None); } + #[test] + fn test_rpc_get_slot_leaders() { + let bob_pubkey = solana_sdk::pubkey::new_rand(); + let RpcHandler { io, meta, bank, .. } = start_rpc_handler_with_tx(&bob_pubkey); + + // Test that slot leaders will be returned across epochs + let query_start = 0; + let query_limit = 2 * bank.epoch_schedule().slots_per_epoch; + + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeaders", "params": [{}, {}]}}"#, + query_start, query_limit + ); + let rep = io.handle_request_sync(&req, meta.clone()); + let res: Response = serde_json::from_str(&rep.expect("actual response")) + .expect("actual response deserialization"); + + let slot_leaders: Vec = if let Response::Single(res) = res { + if let Output::Success(res) = res { + serde_json::from_value(res.result).unwrap() + } else { + panic!("Expected success for {} but received: {:?}", req, res); + } + } else { + panic!("Expected single response"); + }; + + assert_eq!(slot_leaders.len(), query_limit as usize); + + // Test that invalid limit returns an error + let query_start = 0; + let query_limit = 5001; + + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeaders", "params": [{}, {}]}}"#, + query_start, query_limit + ); + let rep = io.handle_request_sync(&req, meta.clone()); + let res: Value = serde_json::from_str(&rep.expect("actual response")) + .expect("actual response deserialization"); + assert!(res.get("error").is_some()); + + // Test that invalid epoch returns an error + let query_start = 2 * bank.epoch_schedule().slots_per_epoch; + let query_limit = 10; + + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeaders", "params": [{}, {}]}}"#, + query_start, query_limit + ); + let rep = io.handle_request_sync(&req, meta); + let res: Value = serde_json::from_str(&rep.expect("actual response")) + .expect("actual response deserialization"); + assert!(res.get("error").is_some()); + } + #[test] fn test_rpc_get_account_info() { let bob_pubkey = solana_sdk::pubkey::new_rand(); @@ -4723,6 +4831,7 @@ pub mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), + Arc::new(LeaderScheduleCache::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); @@ -4998,6 +5107,7 @@ pub mod tests { OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), + Arc::new(LeaderScheduleCache::default()), ); SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1); assert_eq!( @@ -6287,6 +6397,7 @@ pub mod tests { optimistically_confirmed_bank.clone(), Arc::new(RwLock::new(LargestAccountsCache::new(30))), Arc::new(MaxSlots::default()), + Arc::new(LeaderScheduleCache::default()), ); let mut io = MetaIoHandler::default(); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 86d0302933..2a3a21059a 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -18,7 +18,7 @@ use jsonrpc_http_server::{ }; use regex::Regex; use solana_client::rpc_cache::LargestAccountsCache; -use solana_ledger::blockstore::Blockstore; +use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}; use solana_metrics::inc_new_counter_info; use solana_runtime::{ bank_forks::{BankForks, SnapshotConfig}, @@ -275,6 +275,7 @@ impl JsonRpcService { send_transaction_retry_ms: u64, send_transaction_leader_forward_count: u64, max_slots: Arc, + leader_schedule_cache: Arc, ) -> Self { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); @@ -354,6 +355,7 @@ impl JsonRpcService { optimistically_confirmed_bank, largest_accounts_cache, max_slots, + leader_schedule_cache, ); let leader_info = @@ -518,6 +520,7 @@ mod tests { 1000, 1, Arc::new(MaxSlots::default()), + Arc::new(LeaderScheduleCache::default()), ); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); diff --git a/core/src/validator.rs b/core/src/validator.rs index cbebfd52d3..f75b5897a8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -542,6 +542,7 @@ impl Validator { config.send_transaction_retry_ms, config.send_transaction_leader_forward_count, max_slots.clone(), + leader_schedule_cache.clone(), )), if config.rpc_config.minimal_api { None diff --git a/docs/src/developing/clients/jsonrpc-api.md b/docs/src/developing/clients/jsonrpc-api.md index 6786d09951..d3f80b0ad9 100644 --- a/docs/src/developing/clients/jsonrpc-api.md +++ b/docs/src/developing/clients/jsonrpc-api.md @@ -52,6 +52,7 @@ gives a convenient interface for the RPC methods. - [getSignatureStatuses](jsonrpc-api.md#getsignaturestatuses) - [getSlot](jsonrpc-api.md#getslot) - [getSlotLeader](jsonrpc-api.md#getslotleader) +- [getSlotLeaders](jsonrpc-api.md#getslotleaders) - [getStakeActivation](jsonrpc-api.md#getstakeactivation) - [getSupply](jsonrpc-api.md#getsupply) - [getTokenAccountBalance](jsonrpc-api.md#gettokenaccountbalance) @@ -2252,6 +2253,53 @@ Result: {"jsonrpc":"2.0","result":"ENvAW7JScgYq6o4zKZwewtkzzJgDzuJAFxYasvmEQdpS","id":1} ``` +### getSlotLeaders + +Returns the slot leaders for a given slot range + +#### Parameters: + +- `` - Start slot, as u64 integer +- `` - Limit, as u64 integer + +#### Results: + +- `>` - Node identity public keys as base-58 encoded strings + +#### Example: + +If the current slot is #99, query the next 10 leaders with the following request: + +Request: +```bash +curl http://localhost:8899 -X POST -H "Content-Type: application/json" -d ' + {"jsonrpc":"2.0","id":1, "method":"getSlotLeaders", "params":[100, 10]} +' +``` + +Result: + +The first leader returned is the leader for slot #100: + +```json +{ + "jsonrpc": "2.0", + "result": [ + "ChorusmmK7i1AxXeiTtQgQZhQNiXYU84ULeaYF1EH15n", + "ChorusmmK7i1AxXeiTtQgQZhQNiXYU84ULeaYF1EH15n", + "ChorusmmK7i1AxXeiTtQgQZhQNiXYU84ULeaYF1EH15n", + "ChorusmmK7i1AxXeiTtQgQZhQNiXYU84ULeaYF1EH15n", + "Awes4Tr6TX8JDzEhCZY2QVNimT6iD1zWHzf1vNyGvpLM", + "Awes4Tr6TX8JDzEhCZY2QVNimT6iD1zWHzf1vNyGvpLM", + "Awes4Tr6TX8JDzEhCZY2QVNimT6iD1zWHzf1vNyGvpLM", + "Awes4Tr6TX8JDzEhCZY2QVNimT6iD1zWHzf1vNyGvpLM", + "DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP", + "DWvDTSh3qfn88UoQTEKRV2JnLt5jtJAVoiCo3ivtMwXP" + ], + "id": 1 +} +``` + ### getStakeActivation Returns epoch activation information for a stake account diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs index 5752750c29..d82d2f9014 100644 --- a/ledger/src/leader_schedule_cache.rs +++ b/ledger/src/leader_schedule_cache.rs @@ -195,6 +195,10 @@ impl LeaderScheduleCache { } } + pub fn get_epoch_leader_schedule(&self, epoch: Epoch) -> Option> { + self.cached_schedules.read().unwrap().0.get(&epoch).cloned() + } + fn get_epoch_schedule_else_compute( &self, epoch: Epoch, @@ -205,8 +209,7 @@ impl LeaderScheduleCache { return Some(fixed_schedule.leader_schedule.clone()); } } - let epoch_schedule = self.cached_schedules.read().unwrap().0.get(&epoch).cloned(); - + let epoch_schedule = self.get_epoch_leader_schedule(epoch); if epoch_schedule.is_some() { epoch_schedule } else if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) { diff --git a/ledger/src/leader_schedule_utils.rs b/ledger/src/leader_schedule_utils.rs index 2789333bc7..47df872428 100644 --- a/ledger/src/leader_schedule_utils.rs +++ b/ledger/src/leader_schedule_utils.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::leader_schedule::LeaderSchedule; use solana_runtime::bank::Bank; use solana_sdk::{ @@ -21,6 +23,27 @@ pub fn leader_schedule(epoch: Epoch, bank: &Bank) -> Option { }) } +/// Map of leader base58 identity pubkeys to the slot indices relative to the first epoch slot +pub type LeaderScheduleByIdentity = HashMap>; + +pub fn leader_schedule_by_identity<'a>( + upcoming_leaders: impl Iterator, +) -> LeaderScheduleByIdentity { + let mut leader_schedule_by_identity = HashMap::new(); + + for (slot_index, identity_pubkey) in upcoming_leaders { + leader_schedule_by_identity + .entry(identity_pubkey) + .or_insert_with(Vec::new) + .push(slot_index); + } + + leader_schedule_by_identity + .into_iter() + .map(|(identity_pubkey, slot_indices)| (identity_pubkey.to_string(), slot_indices)) + .collect() +} + /// Return the leader for the given slot. pub fn slot_leader_at(slot: Slot, bank: &Bank) -> Option { let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot);