Create solana-poh and move remaining rpc modules to solana-rpc (#17698)

* Create solana-poh crate

* Move BigTableUploadService to solana-ledger

* Add solana-rpc to workspace

* Move dependencies to solana-rpc

* Move remaining rpc modules to solana-rpc

* Single use statement solana-poh

* Single use statement solana-rpc
This commit is contained in:
Tyera Eulberg
2021-06-04 09:23:06 -06:00
committed by GitHub
parent f97ce2cd7e
commit 544b3c0d17
41 changed files with 663 additions and 518 deletions

View File

@ -5,18 +5,13 @@ use crate::{
cost_model::{CostModel, ACCOUNT_MAX_COST, BLOCK_MAX_COST},
cost_tracker::CostTracker,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
poh_service::{self, PohService},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
use retain_mut::RetainMut;
use solana_gossip::cluster_info::ClusterInfo;
use solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
entry::hash_transactions, leader_schedule_cache::LeaderScheduleCache,
};
use solana_ledger::{blockstore_processor::TransactionStatusSender, entry::hash_transactions};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
use solana_perf::{
@ -24,6 +19,7 @@ use solana_perf::{
packet::{limited_deserialize, Packet, Packets, PACKETS_PER_BATCH},
perf_libs,
};
use solana_poh::poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder};
use solana_runtime::{
accounts_db::ErrorCounters,
bank::{
@ -41,7 +37,6 @@ use solana_sdk::{
MAX_TRANSACTION_FORWARDING_DELAY_GPU,
},
message::Message,
poh_config::PohConfig,
pubkey::Pubkey,
short_vec::decode_shortu16_len,
signature::Signature,
@ -59,8 +54,7 @@ use std::{
mem::size_of,
net::UdpSocket,
ops::DerefMut,
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::mpsc::Receiver,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex},
thread::{self, Builder, JoinHandle},
time::Duration,
@ -1518,66 +1512,29 @@ fn next_leader_tpu_forwards(
}
}
pub fn create_test_recorder(
bank: &Arc<Bank>,
blockstore: &Arc<Blockstore>,
poh_config: Option<PohConfig>,
) -> (
Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
let exit = Arc::new(AtomicBool::new(false));
let poh_config = Arc::new(poh_config.unwrap_or_default());
let (mut poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
Some((4, 4)),
bank.ticks_per_slot(),
&Pubkey::default(),
blockstore,
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&poh_config,
exit.clone(),
);
poh_recorder.set_bank(&bank);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(
poh_recorder.clone(),
&poh_config,
&exit,
bank.ticks_per_slot(),
poh_service::DEFAULT_PINNED_CPU_CORE,
poh_service::DEFAULT_HASHES_PER_BATCH,
record_receiver,
);
(exit, poh_recorder, poh_service, entry_receiver)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
poh_recorder::Record, poh_recorder::WorkingBank,
transaction_status_service::TransactionStatusService,
};
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_gossip::cluster_info::Node;
use solana_ledger::{
blockstore::entries_to_test_shreds,
blockstore::{entries_to_test_shreds, Blockstore},
entry::{next_entry, Entry, EntrySlice},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_perf::packet::to_packets_chunked;
use solana_poh::{
poh_recorder::{create_test_recorder, Record, WorkingBank, WorkingBankEntry},
poh_service::PohService,
};
use solana_rpc::transaction_status_service::TransactionStatusService;
use solana_sdk::{
hash::Hash,
instruction::InstructionError,
poh_config::PohConfig,
signature::{Keypair, Signer},
system_instruction::SystemError,
system_transaction,
@ -1587,7 +1544,10 @@ mod tests {
use std::{
net::SocketAddr,
path::Path,
sync::atomic::{AtomicBool, Ordering},
sync::{
atomic::{AtomicBool, Ordering},
mpsc::Receiver,
},
thread::sleep,
};

View File

@ -1,94 +0,0 @@
use solana_ledger::blockstore::Blockstore;
use solana_runtime::commitment::BlockCommitmentCache;
use std::{
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime::Runtime;
// Delay uploading the largest confirmed root for this many slots. This is done in an attempt to
// ensure that the `CacheBlockMetaService` has had enough time to add the block time for the root
// before it's uploaded to BigTable.
//
// A more direct connection between CacheBlockMetaService and BigTableUploadService would be
// preferable...
const LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY: usize = 100;
pub struct BigTableUploadService {
thread: JoinHandle<()>,
}
impl BigTableUploadService {
pub fn new(
runtime: Arc<Runtime>,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) -> Self {
info!("Starting BigTable upload service");
let thread = Builder::new()
.name("bigtable-upload".to_string())
.spawn(move || {
Self::run(
runtime,
bigtable_ledger_storage,
blockstore,
block_commitment_cache,
exit,
)
})
.unwrap();
Self { thread }
}
fn run(
runtime: Arc<Runtime>,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) {
let mut start_slot = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let end_slot = block_commitment_cache
.read()
.unwrap()
.highest_confirmed_root()
.saturating_sub(LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY as u64);
if end_slot <= start_slot {
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}
let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
blockstore.clone(),
bigtable_ledger_storage.clone(),
start_slot,
Some(end_slot),
true,
false,
exit.clone(),
));
match result {
Ok(()) => start_slot = end_slot,
Err(err) => {
warn!("bigtable: upload_confirmed_blocks: {}", err);
std::thread::sleep(std::time::Duration::from_secs(2));
}
}
}
}
pub fn join(self) -> thread::Result<()> {
self.thread.join()
}
}

View File

@ -5,10 +5,7 @@ use self::{
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
standard_broadcast_run::StandardBroadcastRun,
};
use crate::{
poh_recorder::WorkingBankEntry,
result::{Error, Result},
};
use crate::result::{Error, Result};
use crossbeam_channel::{
Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
Sender as CrossbeamSender,
@ -22,6 +19,7 @@ use solana_gossip::{
use solana_ledger::{blockstore::Blockstore, shred::Shred};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
use solana_poh::poh_recorder::WorkingBankEntry;
use solana_runtime::bank::Bank;
use solana_sdk::timing::timestamp;
use solana_sdk::{clock::Slot, pubkey::Pubkey};

View File

@ -1,6 +1,6 @@
use crate::poh_recorder::WorkingBankEntry;
use crate::result::Result;
use solana_ledger::{entry::Entry, shred::Shred};
use solana_poh::poh_recorder::WorkingBankEntry;
use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot;
use std::{

View File

@ -1,6 +1,5 @@
use crate::{
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
poh_recorder::PohRecorder,
replay_stage::DUPLICATE_THRESHOLD,
result::{Error, Result},
sigverify,
@ -20,6 +19,7 @@ use solana_gossip::{
use solana_ledger::blockstore::Blockstore;
use solana_metrics::inc_new_counter_debug;
use solana_perf::packet::{self, Packets};
use solana_poh::poh_recorder::PohRecorder;
use solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
rpc_subscriptions::RpcSubscriptions,

View File

@ -1,11 +1,11 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
use crate::banking_stage::HOLD_TRANSACTIONS_SLOT_OFFSET;
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use solana_metrics::{inc_new_counter_debug, inc_new_counter_info};
use solana_perf::packet::PacketsRecycler;
use solana_perf::recycler::Recycler;
use solana_poh::poh_recorder::PohRecorder;
use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT;
use solana_streamer::streamer::{self, PacketReceiver, PacketSender};
use std::net::UdpSocket;

View File

@ -9,7 +9,6 @@
pub mod accounts_hash_verifier;
pub mod banking_stage;
pub mod bigtable_upload_service;
pub mod broadcast_stage;
pub mod cache_block_meta_service;
pub mod cluster_info_vote_listener;
@ -30,8 +29,6 @@ pub mod ledger_cleanup_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_hasher;
pub mod poh_recorder;
pub mod poh_service;
pub mod progress_map;
pub mod repair_response;
pub mod repair_service;
@ -42,11 +39,7 @@ pub mod request_response;
mod result;
pub mod retransmit_stage;
pub mod rewards_recorder_service;
pub mod rpc;
pub mod rpc_health;
pub mod rpc_service;
pub mod sample_performance_service;
pub mod send_transaction_service;
pub mod serve_repair;
pub mod serve_repair_service;
pub mod shred_fetch_stage;
@ -56,7 +49,6 @@ pub mod sigverify_stage;
pub mod snapshot_packager_service;
pub mod test_validator;
pub mod tpu;
pub mod transaction_status_service;
pub mod tree_diff;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;
@ -71,10 +63,6 @@ extern crate log;
#[macro_use]
extern crate serde_derive;
#[cfg(test)]
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate solana_metrics;

File diff suppressed because it is too large Load Diff

View File

@ -1,543 +0,0 @@
//! The `poh_service` module implements a service that records the passing of
//! "ticks", a measure of time in the PoH stream
use crate::poh_recorder::{PohRecorder, Record};
use crossbeam_channel::Receiver;
use solana_ledger::poh::Poh;
use solana_measure::measure::Measure;
use solana_sdk::poh_config::PohConfig;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::{Duration, Instant};
pub struct PohService {
tick_producer: JoinHandle<()>,
}
// Number of hashes to batch together.
// * If this number is too small, PoH hash rate will suffer.
// * The larger this number is from 1, the speed of recording transactions will suffer due to lock
// contention with the PoH hashing within `tick_producer()`.
//
// Can use test_poh_service to calibrate this
pub const DEFAULT_HASHES_PER_BATCH: u64 = 64;
pub const DEFAULT_PINNED_CPU_CORE: usize = 0;
const TARGET_SLOT_ADJUSTMENT_NS: u64 = 50_000_000;
#[derive(Debug)]
struct PohTiming {
num_ticks: u64,
num_hashes: u64,
total_sleep_us: u64,
total_lock_time_ns: u64,
total_hash_time_ns: u64,
total_tick_time_ns: u64,
last_metric: Instant,
total_record_time_us: u64,
}
impl PohTiming {
fn new() -> Self {
Self {
num_ticks: 0,
num_hashes: 0,
total_sleep_us: 0,
total_lock_time_ns: 0,
total_hash_time_ns: 0,
total_tick_time_ns: 0,
last_metric: Instant::now(),
total_record_time_us: 0,
}
}
fn report(&mut self, ticks_per_slot: u64) {
if self.last_metric.elapsed().as_millis() > 1000 {
let elapsed_us = self.last_metric.elapsed().as_micros() as u64;
let us_per_slot = (elapsed_us * ticks_per_slot) / self.num_ticks;
datapoint_info!(
"poh-service",
("ticks", self.num_ticks as i64, i64),
("hashes", self.num_hashes as i64, i64),
("elapsed_us", us_per_slot, i64),
("total_sleep_us", self.total_sleep_us, i64),
("total_tick_time_us", self.total_tick_time_ns / 1000, i64),
("total_lock_time_us", self.total_lock_time_ns / 1000, i64),
("total_hash_time_us", self.total_hash_time_ns / 1000, i64),
("total_record_time_us", self.total_record_time_us, i64),
);
self.total_sleep_us = 0;
self.num_ticks = 0;
self.num_hashes = 0;
self.total_tick_time_ns = 0;
self.total_lock_time_ns = 0;
self.total_hash_time_ns = 0;
self.last_metric = Instant::now();
self.total_record_time_us = 0;
}
}
}
impl PohService {
pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &Arc<PohConfig>,
poh_exit: &Arc<AtomicBool>,
ticks_per_slot: u64,
pinned_cpu_core: usize,
hashes_per_batch: u64,
record_receiver: Receiver<Record>,
) -> Self {
let poh_exit_ = poh_exit.clone();
let poh_config = poh_config.clone();
let tick_producer = Builder::new()
.name("solana-poh-service-tick_producer".to_string())
.spawn(move || {
solana_sys_tuner::request_realtime_poh();
if poh_config.hashes_per_tick.is_none() {
if poh_config.target_tick_count.is_none() {
Self::sleepy_tick_producer(
poh_recorder,
&poh_config,
&poh_exit_,
record_receiver,
);
} else {
Self::short_lived_sleepy_tick_producer(
poh_recorder,
&poh_config,
&poh_exit_,
record_receiver,
);
}
} else {
// PoH service runs in a tight loop, generating hashes as fast as possible.
// Let's dedicate one of the CPU cores to this thread so that it can gain
// from cache performance.
if let Some(cores) = core_affinity::get_core_ids() {
core_affinity::set_for_current(cores[pinned_cpu_core]);
}
Self::tick_producer(
poh_recorder,
&poh_exit_,
ticks_per_slot,
hashes_per_batch,
record_receiver,
Self::target_ns_per_tick(
ticks_per_slot,
poh_config.target_tick_duration.as_nanos() as u64,
),
);
}
poh_exit_.store(true, Ordering::Relaxed);
})
.unwrap();
Self { tick_producer }
}
pub fn target_ns_per_tick(ticks_per_slot: u64, target_tick_duration_ns: u64) -> u64 {
// Account for some extra time outside of PoH generation to account
// for processing time outside PoH.
let adjustment_per_tick = if ticks_per_slot > 0 {
TARGET_SLOT_ADJUSTMENT_NS / ticks_per_slot
} else {
0
};
target_tick_duration_ns.saturating_sub(adjustment_per_tick)
}
fn sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
) {
while !poh_exit.load(Ordering::Relaxed) {
Self::read_record_receiver_and_process(
&poh_recorder,
&record_receiver,
Duration::from_millis(0),
);
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
}
}
pub fn read_record_receiver_and_process(
poh_recorder: &Arc<Mutex<PohRecorder>>,
record_receiver: &Receiver<Record>,
timeout: Duration,
) {
let record = record_receiver.recv_timeout(timeout);
if let Ok(record) = record {
if record
.sender
.send(poh_recorder.lock().unwrap().record(
record.slot,
record.mixin,
record.transactions,
))
.is_err()
{
panic!("Error returning mixin hash");
}
}
}
fn short_lived_sleepy_tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_config: &PohConfig,
poh_exit: &AtomicBool,
record_receiver: Receiver<Record>,
) {
let mut warned = false;
for _ in 0..poh_config.target_tick_count.unwrap() {
Self::read_record_receiver_and_process(
&poh_recorder,
&record_receiver,
Duration::from_millis(0),
);
sleep(poh_config.target_tick_duration);
poh_recorder.lock().unwrap().tick();
if poh_exit.load(Ordering::Relaxed) && !warned {
warned = true;
warn!("exit signal is ignored because PohService is scheduled to exit soon");
}
}
}
// returns true if we need to tick
fn record_or_hash(
next_record: &mut Option<Record>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
timing: &mut PohTiming,
record_receiver: &Receiver<Record>,
hashes_per_batch: u64,
poh: &Arc<Mutex<Poh>>,
target_ns_per_tick: u64,
) -> bool {
match next_record.take() {
Some(mut record) => {
// received message to record
// so, record for as long as we have queued up record requests
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
lock_time.stop();
timing.total_lock_time_ns += lock_time.as_ns();
let mut record_time = Measure::start("record");
loop {
let res = poh_recorder_l.record(
record.slot,
record.mixin,
std::mem::take(&mut record.transactions),
);
let _ = record.sender.send(res); // what do we do on failure here? Ignore for now.
timing.num_hashes += 1; // note: may have also ticked inside record
let new_record_result = record_receiver.try_recv();
match new_record_result {
Ok(new_record) => {
// we already have second request to record, so record again while we still have the mutex
record = new_record;
}
Err(_) => {
break;
}
}
}
record_time.stop();
timing.total_record_time_us += record_time.as_us();
// PohRecorder.record would have ticked if it needed to, so should_tick will be false
}
None => {
// did not receive instructions to record, so hash until we notice we've been asked to record (or we need to tick) and then remember what to record
let mut lock_time = Measure::start("lock");
let mut poh_l = poh.lock().unwrap();
lock_time.stop();
timing.total_lock_time_ns += lock_time.as_ns();
loop {
timing.num_hashes += hashes_per_batch;
let mut hash_time = Measure::start("hash");
let should_tick = poh_l.hash(hashes_per_batch);
let ideal_time = poh_l.target_poh_time(target_ns_per_tick);
hash_time.stop();
timing.total_hash_time_ns += hash_time.as_ns();
if should_tick {
// nothing else can be done. tick required.
return true;
}
// check to see if a record request has been sent
if let Ok(record) = record_receiver.try_recv() {
// remember the record we just received as the next record to occur
*next_record = Some(record);
break;
}
// check to see if we need to wait to catch up to ideal
let wait_start = Instant::now();
if ideal_time <= wait_start {
// no, keep hashing. We still hold the lock.
continue;
}
// busy wait, polling for new records and after dropping poh lock (reset can occur, for example)
drop(poh_l);
while ideal_time > Instant::now() {
// check to see if a record request has been sent
if let Ok(record) = record_receiver.try_recv() {
// remember the record we just received as the next record to occur
*next_record = Some(record);
break;
}
}
timing.total_sleep_us += wait_start.elapsed().as_micros() as u64;
break;
}
}
};
false // should_tick = false for all code that reaches here
}
fn tick_producer(
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_exit: &AtomicBool,
ticks_per_slot: u64,
hashes_per_batch: u64,
record_receiver: Receiver<Record>,
target_ns_per_tick: u64,
) {
let poh = poh_recorder.lock().unwrap().poh.clone();
let mut timing = PohTiming::new();
let mut next_record = None;
loop {
let should_tick = Self::record_or_hash(
&mut next_record,
&poh_recorder,
&mut timing,
&record_receiver,
hashes_per_batch,
&poh,
target_ns_per_tick,
);
if should_tick {
// Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing.
{
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
lock_time.stop();
timing.total_lock_time_ns += lock_time.as_ns();
let mut tick_time = Measure::start("tick");
poh_recorder_l.tick();
tick_time.stop();
timing.total_tick_time_ns += tick_time.as_ns();
}
timing.num_ticks += 1;
timing.report(ticks_per_slot);
if poh_exit.load(Ordering::Relaxed) {
break;
}
}
}
}
pub fn join(self) -> thread::Result<()> {
self.tick_producer.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::poh_recorder::WorkingBank;
use rand::{thread_rng, Rng};
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
use solana_measure::measure::Measure;
use solana_perf::test_tx::test_tx;
use solana_runtime::bank::Bank;
use solana_sdk::clock;
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing;
use std::time::Duration;
#[test]
#[ignore]
fn test_poh_service() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new_no_wallclock_throttle(&genesis_config));
let prev_hash = bank.last_blockhash();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let default_target_tick_duration =
timing::duration_as_us(&PohConfig::default().target_tick_duration);
let target_tick_duration = Duration::from_micros(default_target_tick_duration);
let poh_config = Arc::new(PohConfig {
hashes_per_tick: Some(clock::DEFAULT_HASHES_PER_TICK),
target_tick_duration,
target_tick_count: None,
});
let exit = Arc::new(AtomicBool::new(false));
let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
prev_hash,
bank.slot(),
Some((4, 4)),
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&poh_config,
exit.clone(),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let start = Arc::new(Instant::now());
let working_bank = WorkingBank {
bank: bank.clone(),
start,
min_tick_height: bank.tick_height(),
max_tick_height: std::u64::MAX,
};
let ticks_per_slot = bank.ticks_per_slot();
// specify RUN_TIME to run in a benchmark-like mode
// to calibrate batch size
let run_time = std::env::var("RUN_TIME")
.map(|x| x.parse().unwrap())
.unwrap_or(0);
let is_test_run = run_time == 0;
let entry_producer = {
let poh_recorder = poh_recorder.clone();
let exit = exit.clone();
Builder::new()
.name("solana-poh-service-entry_producer".to_string())
.spawn(move || {
let now = Instant::now();
let mut total_us = 0;
let mut total_times = 0;
let h1 = hash(b"hello world!");
let tx = test_tx();
loop {
// send some data
let mut time = Measure::start("record");
let _ = poh_recorder.lock().unwrap().record(
bank.slot(),
h1,
vec![tx.clone()],
);
time.stop();
total_us += time.as_us();
total_times += 1;
if is_test_run && thread_rng().gen_ratio(1, 4) {
sleep(Duration::from_millis(200));
}
if exit.load(Ordering::Relaxed) {
info!(
"spent:{}ms record: {}ms entries recorded: {}",
now.elapsed().as_millis(),
total_us / 1000,
total_times,
);
break;
}
}
})
.unwrap()
};
let hashes_per_batch = std::env::var("HASHES_PER_BATCH")
.map(|x| x.parse().unwrap())
.unwrap_or(DEFAULT_HASHES_PER_BATCH);
let poh_service = PohService::new(
poh_recorder.clone(),
&poh_config,
&exit,
0,
DEFAULT_PINNED_CPU_CORE,
hashes_per_batch,
record_receiver,
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
// get some events
let mut hashes = 0;
let mut need_tick = true;
let mut need_entry = true;
let mut need_partial = true;
let mut num_ticks = 0;
let time = Instant::now();
while run_time != 0 || need_tick || need_entry || need_partial {
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
if entry.is_tick() {
num_ticks += 1;
assert!(
entry.num_hashes <= poh_config.hashes_per_tick.unwrap(),
"{} <= {}",
entry.num_hashes,
poh_config.hashes_per_tick.unwrap()
);
if entry.num_hashes == poh_config.hashes_per_tick.unwrap() {
need_tick = false;
} else {
need_partial = false;
}
hashes += entry.num_hashes;
assert_eq!(hashes, poh_config.hashes_per_tick.unwrap());
hashes = 0;
} else {
assert!(entry.num_hashes >= 1);
need_entry = false;
hashes += entry.num_hashes;
}
if run_time != 0 {
if time.elapsed().as_millis() > run_time {
break;
}
} else {
assert!(
time.elapsed().as_secs() < 60,
"Test should not run for this long! {}s tick {} entry {} partial {}",
time.elapsed().as_secs(),
need_tick,
need_entry,
need_partial,
);
}
}
info!(
"target_tick_duration: {} ticks_per_slot: {}",
poh_config.target_tick_duration.as_nanos(),
ticks_per_slot
);
let elapsed = time.elapsed();
info!(
"{} ticks in {}ms {}us/tick",
num_ticks,
elapsed.as_millis(),
elapsed.as_micros() / num_ticks
);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
entry_producer.join().unwrap();
}
Blockstore::destroy(&ledger_path).unwrap();
}
}

View File

@ -16,7 +16,6 @@ use crate::{
fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
progress_map::{DuplicateStats, ForkProgress, ProgressMap, PropagatedStats},
repair_service::DuplicateSlotsResetReceiver,
result::Result,
@ -35,6 +34,7 @@ use solana_ledger::{
};
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_info;
use solana_poh::poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS};
use solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
rpc_subscriptions::RpcSubscriptions,
@ -2474,22 +2474,21 @@ impl ReplayStage {
}
#[cfg(test)]
pub(crate) mod tests {
mod tests {
use super::*;
use crate::{
consensus::test::{initialize_state, VoteSimulator},
consensus::Tower,
progress_map::ValidatorStakeInfo,
replay_stage::ReplayStage,
transaction_status_service::TransactionStatusService,
};
use crossbeam_channel::unbounded;
use solana_gossip::{cluster_info::Node, crds::Cursor};
use solana_ledger::{
blockstore::make_slot_entries,
blockstore::{entries_to_test_shreds, BlockstoreError},
blockstore_processor, create_new_tmp_ledger,
entry::{self, next_entry, Entry},
create_new_tmp_ledger,
entry::{self, Entry},
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
get_tmp_ledger_path,
shred::{
@ -2497,7 +2496,10 @@ pub(crate) mod tests {
SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD,
},
};
use solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank;
use solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc::create_test_transactions_and_populate_blockstore,
};
use solana_runtime::{
accounts_background_service::AbsRequestSender,
commitment::BlockCommitment,
@ -2510,7 +2512,7 @@ pub(crate) mod tests {
instruction::InstructionError,
packet::PACKET_DATA_SIZE,
poh_config::PohConfig,
signature::{Keypair, Signature, Signer},
signature::{Keypair, Signer},
system_transaction,
transaction::TransactionError,
};
@ -3301,68 +3303,6 @@ pub(crate) mod tests {
);
}
pub fn create_test_transactions_and_populate_blockstore(
keypairs: Vec<&Keypair>,
previous_slot: Slot,
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
) -> Vec<Signature> {
let mint_keypair = keypairs[0];
let keypair1 = keypairs[1];
let keypair2 = keypairs[2];
let keypair3 = keypairs[3];
let slot = bank.slot();
let blockhash = bank.confirmed_last_blockhash().0;
// Generate transactions for processing
// Successful transaction
let success_tx =
system_transaction::transfer(&mint_keypair, &keypair1.pubkey(), 2, blockhash);
let success_signature = success_tx.signatures[0];
let entry_1 = next_entry(&blockhash, 1, vec![success_tx]);
// Failed transaction, InstructionError
let ix_error_tx =
system_transaction::transfer(&keypair2, &keypair3.pubkey(), 10, blockhash);
let ix_error_signature = ix_error_tx.signatures[0];
let entry_2 = next_entry(&entry_1.hash, 1, vec![ix_error_tx]);
// Failed transaction
let fail_tx =
system_transaction::transfer(&mint_keypair, &keypair2.pubkey(), 2, Hash::default());
let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx]);
let mut entries = vec![entry_1, entry_2, entry_3];
let shreds = entries_to_test_shreds(entries.clone(), slot, previous_slot, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore.set_roots(&[slot]).unwrap();
let (transaction_status_sender, transaction_status_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let transaction_status_service = TransactionStatusService::new(
transaction_status_receiver,
max_complete_transaction_status_slot,
blockstore,
&Arc::new(AtomicBool::new(false)),
);
// Check that process_entries successfully writes can_commit transactions statuses, and
// that they are matched properly by get_rooted_block
let _result = blockstore_processor::process_entries(
&bank,
&mut entries,
true,
Some(&TransactionStatusSender {
sender: transaction_status_sender,
enable_cpi_and_log_storage: false,
}),
Some(&replay_vote_sender),
);
transaction_status_service.join().unwrap();
vec![success_signature, ix_error_signature]
}
#[test]
fn test_write_persist_transaction_status() {
let GenesisConfigInfo {

File diff suppressed because it is too large Load Diff

View File

@ -1,124 +0,0 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::HashSet,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
},
};
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum RpcHealthStatus {
Ok,
Behind { num_slots: Slot }, // Validator is behind its trusted validators
Unknown,
}
pub struct RpcHealth {
cluster_info: Arc<ClusterInfo>,
trusted_validators: Option<HashSet<Pubkey>>,
health_check_slot_distance: u64,
override_health_check: Arc<AtomicBool>,
#[cfg(test)]
stub_health_status: std::sync::RwLock<Option<RpcHealthStatus>>,
}
impl RpcHealth {
pub fn new(
cluster_info: Arc<ClusterInfo>,
trusted_validators: Option<HashSet<Pubkey>>,
health_check_slot_distance: u64,
override_health_check: Arc<AtomicBool>,
) -> Self {
Self {
cluster_info,
trusted_validators,
health_check_slot_distance,
override_health_check,
#[cfg(test)]
stub_health_status: std::sync::RwLock::new(None),
}
}
pub fn check(&self) -> RpcHealthStatus {
#[cfg(test)]
{
if let Some(stub_health_status) = *self.stub_health_status.read().unwrap() {
return stub_health_status;
}
}
if self.override_health_check.load(Ordering::Relaxed) {
RpcHealthStatus::Ok
} else if let Some(trusted_validators) = &self.trusted_validators {
match (
self.cluster_info
.get_accounts_hash_for_node(&self.cluster_info.id(), |hashes| {
hashes
.iter()
.max_by(|a, b| a.0.cmp(&b.0))
.map(|slot_hash| slot_hash.0)
})
.flatten(),
trusted_validators
.iter()
.filter_map(|trusted_validator| {
self.cluster_info
.get_accounts_hash_for_node(&trusted_validator, |hashes| {
hashes
.iter()
.max_by(|a, b| a.0.cmp(&b.0))
.map(|slot_hash| slot_hash.0)
})
.flatten()
})
.max(),
) {
(
Some(latest_account_hash_slot),
Some(latest_trusted_validator_account_hash_slot),
) => {
// The validator is considered healthy if its latest account hash slot is within
// `health_check_slot_distance` of the latest trusted validator's account hash slot
if latest_account_hash_slot
> latest_trusted_validator_account_hash_slot
.saturating_sub(self.health_check_slot_distance)
{
RpcHealthStatus::Ok
} else {
let num_slots = latest_trusted_validator_account_hash_slot
.saturating_sub(latest_account_hash_slot);
warn!(
"health check: behind by {} slots: me={}, latest trusted_validator={}",
num_slots,
latest_account_hash_slot,
latest_trusted_validator_account_hash_slot
);
RpcHealthStatus::Behind { num_slots }
}
}
_ => RpcHealthStatus::Unknown,
}
} else {
// No trusted validator point of reference available, so this validator is healthy
// because it's running
RpcHealthStatus::Ok
}
}
#[cfg(test)]
pub(crate) fn stub() -> Arc<Self> {
Arc::new(Self::new(
Arc::new(ClusterInfo::default()),
None,
42,
Arc::new(AtomicBool::new(false)),
))
}
#[cfg(test)]
pub(crate) fn stub_set_health_status(&self, stub_health_status: Option<RpcHealthStatus>) {
*self.stub_health_status.write().unwrap() = stub_health_status;
}
}

View File

@ -1,785 +0,0 @@
//! The `rpc_service` module implements the Solana JSON RPC service.
use crate::{
bigtable_upload_service::BigTableUploadService,
poh_recorder::PohRecorder,
rpc::{rpc_deprecated_v1_7::*, rpc_full::*, rpc_minimal::*, rpc_obsolete_v1_7::*, *},
rpc_health::*,
send_transaction_service::{LeaderInfo, SendTransactionService},
};
use jsonrpc_core::{futures::prelude::*, MetaIoHandler};
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
RequestMiddlewareAction, ServerBuilder,
};
use regex::Regex;
use solana_client::rpc_cache::LargestAccountsCache;
use solana_gossip::cluster_info::ClusterInfo;
use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache};
use solana_metrics::inc_new_counter_info;
use solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
};
use solana_runtime::{
bank_forks::{BankForks, SnapshotConfig},
commitment::BlockCommitmentCache,
snapshot_utils,
};
use solana_sdk::{
exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
native_token::lamports_to_sol, pubkey::Pubkey,
};
use std::{
collections::HashSet,
net::SocketAddr,
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{mpsc::channel, Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;
use tokio_util::codec::{BytesCodec, FramedRead};
const LARGEST_ACCOUNTS_CACHE_DURATION: u64 = 60 * 60 * 2;
pub struct JsonRpcService {
thread_hdl: JoinHandle<()>,
#[cfg(test)]
pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()...
close_handle: Option<CloseHandle>,
}
struct RpcRequestMiddleware {
ledger_path: PathBuf,
snapshot_archive_path_regex: Regex,
snapshot_config: Option<SnapshotConfig>,
bank_forks: Arc<RwLock<BankForks>>,
health: Arc<RpcHealth>,
}
impl RpcRequestMiddleware {
pub fn new(
ledger_path: PathBuf,
snapshot_config: Option<SnapshotConfig>,
bank_forks: Arc<RwLock<BankForks>>,
health: Arc<RpcHealth>,
) -> Self {
Self {
ledger_path,
snapshot_archive_path_regex: Regex::new(
r"^/snapshot-\d+-[[:alnum:]]+\.(tar|tar\.bz2|tar\.zst|tar\.gz)$",
)
.unwrap(),
snapshot_config,
bank_forks,
health,
}
}
fn redirect(location: &str) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(hyper::StatusCode::SEE_OTHER)
.header(hyper::header::LOCATION, location)
.body(hyper::Body::from(String::from(location)))
.unwrap()
}
fn not_found() -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(hyper::StatusCode::NOT_FOUND)
.body(hyper::Body::empty())
.unwrap()
}
#[allow(dead_code)]
fn internal_server_error() -> hyper::Response<hyper::Body> {
hyper::Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
.body(hyper::Body::empty())
.unwrap()
}
fn is_file_get_path(&self, path: &str) -> bool {
match path {
DEFAULT_GENESIS_DOWNLOAD_PATH => true,
_ => {
if self.snapshot_config.is_some() {
self.snapshot_archive_path_regex.is_match(path)
} else {
false
}
}
}
}
#[cfg(unix)]
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio_02::fs::File> {
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
use tokio_02::fs::os::unix::OpenOptionsExt;
tokio_02::fs::OpenOptions::new()
.read(true)
.write(false)
.create(false)
.custom_flags(libc::O_NOFOLLOW)
.open(path)
.await
}
#[cfg(not(unix))]
async fn open_no_follow(path: impl AsRef<Path>) -> std::io::Result<tokio_02::fs::File> {
// TODO: Is there any way to achieve the same on Windows?
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
tokio_02::fs::File::open(path).await
}
fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
let stem = path.split_at(1).1; // Drop leading '/' from path
let filename = {
match path {
DEFAULT_GENESIS_DOWNLOAD_PATH => {
inc_new_counter_info!("rpc-get_genesis", 1);
self.ledger_path.join(stem)
}
_ => {
inc_new_counter_info!("rpc-get_snapshot", 1);
self.snapshot_config
.as_ref()
.unwrap()
.snapshot_package_output_path
.join(stem)
}
}
};
let file_length = std::fs::metadata(&filename)
.map(|m| m.len())
.unwrap_or(0)
.to_string();
info!("get {} -> {:?} ({} bytes)", path, filename, file_length);
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::pin(async {
match Self::open_no_follow(filename).await {
Err(_) => Ok(Self::internal_server_error()),
Ok(file) => {
let stream =
FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
let body = hyper::Body::wrap_stream(stream);
Ok(hyper::Response::builder()
.header(hyper::header::CONTENT_LENGTH, file_length)
.body(body)
.unwrap())
}
}
}),
}
}
fn health_check(&self) -> &'static str {
let response = match self.health.check() {
RpcHealthStatus::Ok => "ok",
RpcHealthStatus::Behind { .. } => "behind",
RpcHealthStatus::Unknown => "unknown",
};
info!("health check: {}", response);
response
}
}
impl RequestMiddleware for RpcRequestMiddleware {
fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
trace!("request uri: {}", request.uri());
if let Some(ref snapshot_config) = self.snapshot_config {
if request.uri().path() == "/snapshot.tar.bz2" {
// Convenience redirect to the latest snapshot
return if let Some((snapshot_archive, _)) =
snapshot_utils::get_highest_snapshot_archive_path(
&snapshot_config.snapshot_package_output_path,
) {
RpcRequestMiddleware::redirect(&format!(
"/{}",
snapshot_archive
.file_name()
.unwrap_or_else(|| std::ffi::OsStr::new(""))
.to_str()
.unwrap_or(&"")
))
} else {
RpcRequestMiddleware::not_found()
}
.into();
}
}
if let Some(result) = process_rest(&self.bank_forks, request.uri().path()) {
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.body(hyper::Body::from(result))
.unwrap()
.into()
} else if self.is_file_get_path(request.uri().path()) {
self.process_file_get(request.uri().path())
} else if request.uri().path() == "/health" {
hyper::Response::builder()
.status(hyper::StatusCode::OK)
.body(hyper::Body::from(self.health_check()))
.unwrap()
.into()
} else {
request.into()
}
}
}
fn process_rest(bank_forks: &Arc<RwLock<BankForks>>, path: &str) -> Option<String> {
match path {
"/v0/circulating-supply" => {
let r_bank_forks = bank_forks.read().unwrap();
let bank = r_bank_forks.root_bank();
let total_supply = bank.capitalization();
let non_circulating_supply =
solana_runtime::non_circulating_supply::calculate_non_circulating_supply(&bank)
.lamports;
Some(format!(
"{}",
lamports_to_sol(total_supply - non_circulating_supply)
))
}
"/v0/total-supply" => {
let r_bank_forks = bank_forks.read().unwrap();
let bank = r_bank_forks.root_bank();
let total_supply = bank.capitalization();
Some(format!("{}", lamports_to_sol(total_supply)))
}
_ => None,
}
}
impl JsonRpcService {
#[allow(clippy::too_many_arguments)]
pub fn new(
rpc_addr: SocketAddr,
config: JsonRpcConfig,
snapshot_config: Option<SnapshotConfig>,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Option<Arc<Mutex<PohRecorder>>>,
genesis_hash: Hash,
ledger_path: &Path,
validator_exit: Arc<RwLock<Exit>>,
trusted_validators: Option<HashSet<Pubkey>>,
override_health_check: Arc<AtomicBool>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
send_transaction_retry_ms: u64,
send_transaction_leader_forward_count: u64,
max_slots: Arc<MaxSlots>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
current_transaction_status_slot: Arc<AtomicU64>,
) -> Self {
info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config);
let rpc_threads = 1.max(config.rpc_threads);
let health = Arc::new(RpcHealth::new(
cluster_info.clone(),
trusted_validators,
config.health_check_slot_distance,
override_health_check,
));
let largest_accounts_cache = Arc::new(RwLock::new(LargestAccountsCache::new(
LARGEST_ACCOUNTS_CACHE_DURATION,
)));
let tpu_address = cluster_info.my_contact_info().tpu;
let runtime = Arc::new(
runtime::Builder::new_multi_thread()
.thread_name("rpc-runtime")
.enable_all()
.build()
.expect("Runtime"),
);
let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload {
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new(
!config.enable_bigtable_ledger_upload,
config.rpc_bigtable_timeout,
))
.map(|bigtable_ledger_storage| {
info!("BigTable ledger storage initialized");
let bigtable_ledger_upload_service = if config.enable_bigtable_ledger_upload
{
Some(Arc::new(BigTableUploadService::new(
runtime.clone(),
bigtable_ledger_storage.clone(),
blockstore.clone(),
block_commitment_cache.clone(),
exit_bigtable_ledger_upload_service.clone(),
)))
} else {
None
};
(
Some(bigtable_ledger_storage),
bigtable_ledger_upload_service,
)
})
.unwrap_or_else(|err| {
error!("Failed to initialize BigTable ledger storage: {:?}", err);
(None, None)
})
} else {
(None, None)
};
let minimal_api = config.minimal_api;
let obsolete_v1_7_api = config.obsolete_v1_7_api;
let (request_processor, receiver) = JsonRpcRequestProcessor::new(
config,
snapshot_config.clone(),
bank_forks.clone(),
block_commitment_cache,
blockstore,
validator_exit.clone(),
health.clone(),
cluster_info.clone(),
genesis_hash,
runtime,
bigtable_ledger_storage,
optimistically_confirmed_bank,
largest_accounts_cache,
max_slots,
leader_schedule_cache,
current_transaction_status_slot,
);
let leader_info =
poh_recorder.map(|recorder| LeaderInfo::new(cluster_info.clone(), recorder));
let _send_transaction_service = Arc::new(SendTransactionService::new(
tpu_address,
&bank_forks,
leader_info,
receiver,
send_transaction_retry_ms,
send_transaction_leader_forward_count,
));
#[cfg(test)]
let test_request_processor = request_processor.clone();
let ledger_path = ledger_path.to_path_buf();
// sadly, some parts of our current rpc implemention block the jsonrpc's
// _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU,
// causing no further processing of incoming requests and ultimatily innocent clients timing-out.
// So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1,
// so that we avoid the single-threaded event loops from being created automatically by
// jsonrpc for threads when .threads(N > 1) is given.
let event_loop = {
// Stuck on tokio 0.2 until the jsonrpc crates upgrade
tokio_02::runtime::Builder::new()
.core_threads(rpc_threads)
.threaded_scheduler()
.enable_all()
.thread_name("sol-rpc-el")
.build()
.unwrap()
};
let (close_handle_sender, close_handle_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-jsonrpc".to_string())
.spawn(move || {
let mut io = MetaIoHandler::default();
io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
if !minimal_api {
io.extend_with(rpc_full::FullImpl.to_delegate());
io.extend_with(rpc_deprecated_v1_7::DeprecatedV1_7Impl.to_delegate());
}
if obsolete_v1_7_api {
io.extend_with(rpc_obsolete_v1_7::ObsoleteV1_7Impl.to_delegate());
}
let request_middleware = RpcRequestMiddleware::new(
ledger_path,
snapshot_config,
bank_forks.clone(),
health.clone(),
);
let server = ServerBuilder::with_meta_extractor(
io,
move |_req: &hyper::Request<hyper::Body>| request_processor.clone(),
)
.event_loop_executor(event_loop.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.cors_max_age(86400)
.request_middleware(request_middleware)
.max_request_body_size(MAX_REQUEST_PAYLOAD_SIZE)
.start_http(&rpc_addr);
if let Err(e) = server {
warn!(
"JSON RPC service unavailable error: {:?}. \n\
Also, check that port {} is not already in use by another application",
e,
rpc_addr.port()
);
return;
}
let server = server.unwrap();
close_handle_sender.send(server.close_handle()).unwrap();
server.wait();
exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
})
.unwrap();
let close_handle = close_handle_receiver.recv().unwrap();
let close_handle_ = close_handle.clone();
validator_exit
.write()
.unwrap()
.register_exit(Box::new(move || close_handle_.close()));
Self {
thread_hdl,
#[cfg(test)]
request_processor: test_request_processor,
close_handle: Some(close_handle),
}
}
pub fn exit(&mut self) {
if let Some(c) = self.close_handle.take() {
c.close()
}
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc::create_validator_exit;
use solana_gossip::crds_value::{CrdsData, CrdsValue, SnapshotHash};
use solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
};
use solana_runtime::{
bank::Bank, bank_forks::ArchiveFormat, snapshot_utils::SnapshotVersion,
snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN,
};
use solana_sdk::{
genesis_config::{ClusterType, DEFAULT_GENESIS_ARCHIVE},
signature::Signer,
};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr};
#[test]
fn test_rpc_new() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);
let bank = Bank::new(&genesis_config);
let cluster_info = Arc::new(ClusterInfo::default());
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let rpc_addr = SocketAddr::new(
ip_addr,
solana_net_utils::find_available_port_in_range(ip_addr, (10000, 65535)).unwrap(),
);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut rpc_service = JsonRpcService::new(
rpc_addr,
JsonRpcConfig::default(),
None,
bank_forks,
block_commitment_cache,
blockstore,
cluster_info,
None,
Hash::default(),
&PathBuf::from("farf"),
validator_exit,
None,
Arc::new(AtomicBool::new(false)),
optimistically_confirmed_bank,
1000,
1,
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
Arc::new(AtomicU64::default()),
);
let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
assert_eq!(
10_000,
rpc_service
.request_processor
.get_balance(&mint_keypair.pubkey(), None)
.value
);
rpc_service.exit();
rpc_service.join().unwrap();
}
fn create_bank_forks() -> Arc<RwLock<BankForks>> {
let GenesisConfigInfo {
mut genesis_config, ..
} = create_genesis_config(10_000);
genesis_config.cluster_type = ClusterType::MainnetBeta;
let bank = Bank::new(&genesis_config);
Arc::new(RwLock::new(BankForks::new(bank)))
}
#[test]
fn test_process_rest_api() {
let bank_forks = create_bank_forks();
assert_eq!(None, process_rest(&bank_forks, "not-a-supported-rest-api"));
assert_eq!(
process_rest(&bank_forks, "/v0/circulating-supply"),
process_rest(&bank_forks, "/v0/total-supply")
);
}
#[test]
fn test_is_file_get_path() {
let bank_forks = create_bank_forks();
let rrm = RpcRequestMiddleware::new(
PathBuf::from("/"),
None,
bank_forks.clone(),
RpcHealth::stub(),
);
let rrm_with_snapshot_config = RpcRequestMiddleware::new(
PathBuf::from("/"),
Some(SnapshotConfig {
snapshot_interval_slots: 0,
snapshot_package_output_path: PathBuf::from("/"),
snapshot_path: PathBuf::from("/"),
archive_format: ArchiveFormat::TarBzip2,
snapshot_version: SnapshotVersion::default(),
maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN,
}),
bank_forks,
RpcHealth::stub(),
);
assert!(rrm.is_file_get_path(DEFAULT_GENESIS_DOWNLOAD_PATH));
assert!(!rrm.is_file_get_path(DEFAULT_GENESIS_ARCHIVE));
assert!(!rrm.is_file_get_path("/snapshot.tar.bz2")); // This is a redirect
assert!(!rrm.is_file_get_path(
"/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
));
assert!(rrm_with_snapshot_config.is_file_get_path(
"/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
));
assert!(rrm_with_snapshot_config.is_file_get_path(
"/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.zst"
));
assert!(rrm_with_snapshot_config
.is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.gz"));
assert!(rrm_with_snapshot_config
.is_file_get_path("/snapshot-100-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar"));
assert!(!rrm_with_snapshot_config.is_file_get_path(
"/snapshot-notaslotnumber-AvFf9oS8A8U78HdjT9YG2sTTThLHJZmhaMn2g8vkWYnr.tar.bz2"
));
assert!(!rrm_with_snapshot_config.is_file_get_path("../../../test/snapshot-123-xxx.tar"));
assert!(!rrm.is_file_get_path("/"));
assert!(!rrm.is_file_get_path(".."));
assert!(!rrm.is_file_get_path("🎣"));
}
#[test]
fn test_process_file_get() {
let mut runtime = tokio_02::runtime::Runtime::new().unwrap();
let ledger_path = get_tmp_ledger_path!();
std::fs::create_dir(&ledger_path).unwrap();
let genesis_path = ledger_path.join(DEFAULT_GENESIS_ARCHIVE);
let rrm = RpcRequestMiddleware::new(
ledger_path.clone(),
None,
create_bank_forks(),
RpcHealth::stub(),
);
// File does not exist => request should fail.
let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
if let RequestMiddlewareAction::Respond { response, .. } = action {
let response = runtime.block_on(response);
let response = response.unwrap();
assert_ne!(response.status(), 200);
} else {
panic!("Unexpected RequestMiddlewareAction variant");
}
{
let mut file = std::fs::File::create(&genesis_path).unwrap();
file.write_all(b"should be ok").unwrap();
}
// Normal file exist => request should succeed.
let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
if let RequestMiddlewareAction::Respond { response, .. } = action {
let response = runtime.block_on(response);
let response = response.unwrap();
assert_eq!(response.status(), 200);
} else {
panic!("Unexpected RequestMiddlewareAction variant");
}
#[cfg(unix)]
{
std::fs::remove_file(&genesis_path).unwrap();
{
let mut file = std::fs::File::create(ledger_path.join("wrong")).unwrap();
file.write_all(b"wrong file").unwrap();
}
symlink::symlink_file("wrong", &genesis_path).unwrap();
// File is a symbolic link => request should fail.
let action = rrm.process_file_get(DEFAULT_GENESIS_DOWNLOAD_PATH);
if let RequestMiddlewareAction::Respond { response, .. } = action {
let response = runtime.block_on(response);
let response = response.unwrap();
assert_ne!(response.status(), 200);
} else {
panic!("Unexpected RequestMiddlewareAction variant");
}
}
}
#[test]
fn test_health_check_with_no_trusted_validators() {
let rm = RpcRequestMiddleware::new(
PathBuf::from("/"),
None,
create_bank_forks(),
RpcHealth::stub(),
);
assert_eq!(rm.health_check(), "ok");
}
#[test]
fn test_health_check_with_trusted_validators() {
let cluster_info = Arc::new(ClusterInfo::default());
let health_check_slot_distance = 123;
let override_health_check = Arc::new(AtomicBool::new(false));
let trusted_validators = vec![
solana_sdk::pubkey::new_rand(),
solana_sdk::pubkey::new_rand(),
solana_sdk::pubkey::new_rand(),
];
let health = Arc::new(RpcHealth::new(
cluster_info.clone(),
Some(trusted_validators.clone().into_iter().collect()),
health_check_slot_distance,
override_health_check.clone(),
));
let rm = RpcRequestMiddleware::new(PathBuf::from("/"), None, create_bank_forks(), health);
// No account hashes for this node or any trusted validators
assert_eq!(rm.health_check(), "unknown");
// No account hashes for any trusted validators
cluster_info.push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]);
cluster_info.flush_push_queue();
assert_eq!(rm.health_check(), "unknown");
// Override health check
override_health_check.store(true, Ordering::Relaxed);
assert_eq!(rm.health_check(), "ok");
override_health_check.store(false, Ordering::Relaxed);
// This node is ahead of the trusted validators
cluster_info
.gossip
.write()
.unwrap()
.crds
.insert(
CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
trusted_validators[0],
vec![
(1, Hash::default()),
(1001, Hash::default()),
(2, Hash::default()),
],
))),
1,
)
.unwrap();
assert_eq!(rm.health_check(), "ok");
// Node is slightly behind the trusted validators
cluster_info
.gossip
.write()
.unwrap()
.crds
.insert(
CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
trusted_validators[1],
vec![(1000 + health_check_slot_distance - 1, Hash::default())],
))),
1,
)
.unwrap();
assert_eq!(rm.health_check(), "ok");
// Node is far behind the trusted validators
cluster_info
.gossip
.write()
.unwrap()
.crds
.insert(
CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
trusted_validators[2],
vec![(1000 + health_check_slot_distance, Hash::default())],
))),
1,
)
.unwrap();
assert_eq!(rm.health_check(), "behind");
}
}

View File

@ -1,888 +0,0 @@
// TODO: Merge this implementation with the one at `banks-server/src/send_transaction_service.rs`
use crate::poh_recorder::PohRecorder;
use log::*;
use solana_gossip::cluster_info::ClusterInfo;
use solana_metrics::{datapoint_warn, inc_new_counter_info};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
hash::Hash,
nonce_account,
pubkey::Pubkey,
signature::Signature,
};
use std::sync::Mutex;
use std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
mpsc::{Receiver, RecvTimeoutError},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
/// Maximum size of the transaction queue
const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day
pub struct SendTransactionService {
thread: JoinHandle<()>,
}
pub struct TransactionInfo {
pub signature: Signature,
pub wire_transaction: Vec<u8>,
pub last_valid_slot: Slot,
pub durable_nonce_info: Option<(Pubkey, Hash)>,
}
impl TransactionInfo {
pub fn new(
signature: Signature,
wire_transaction: Vec<u8>,
last_valid_slot: Slot,
durable_nonce_info: Option<(Pubkey, Hash)>,
) -> Self {
Self {
signature,
wire_transaction,
last_valid_slot,
durable_nonce_info,
}
}
}
pub struct LeaderInfo {
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
recent_peers: HashMap<Pubkey, SocketAddr>,
}
impl LeaderInfo {
pub fn new(cluster_info: Arc<ClusterInfo>, poh_recorder: Arc<Mutex<PohRecorder>>) -> Self {
Self {
cluster_info,
poh_recorder,
recent_peers: HashMap::new(),
}
}
pub fn refresh_recent_peers(&mut self) {
self.recent_peers = self
.cluster_info
.tpu_peers()
.into_iter()
.map(|ci| (ci.id, ci.tpu))
.collect();
}
pub fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.lock().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
.collect();
drop(recorder);
let mut unique_leaders = vec![];
for leader in leaders.iter() {
if let Some(addr) = self.recent_peers.get(leader) {
if !unique_leaders.contains(&addr) {
unique_leaders.push(addr);
}
}
}
unique_leaders
}
}
#[derive(Default, Debug, PartialEq)]
struct ProcessTransactionsResult {
rooted: u64,
expired: u64,
retried: u64,
failed: u64,
retained: u64,
}
impl SendTransactionService {
pub fn new(
tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<LeaderInfo>,
receiver: Receiver<TransactionInfo>,
retry_rate_ms: u64,
leader_forward_count: u64,
) -> Self {
let thread = Self::retry_thread(
tpu_address,
receiver,
bank_forks.clone(),
leader_info,
retry_rate_ms,
leader_forward_count,
);
Self { thread }
}
fn retry_thread(
tpu_address: SocketAddr,
receiver: Receiver<TransactionInfo>,
bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<LeaderInfo>,
retry_rate_ms: u64,
leader_forward_count: u64,
) -> JoinHandle<()> {
let mut last_status_check = Instant::now();
let mut last_leader_refresh = Instant::now();
let mut transactions = HashMap::new();
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
Builder::new()
.name("send-tx-sv2".to_string())
.spawn(move || loop {
match receiver.recv_timeout(Duration::from_millis(1000.min(retry_rate_ms))) {
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => {
let addresses = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
let addresses = addresses
.map(|address_list| {
if address_list.is_empty() {
vec![&tpu_address]
} else {
address_list
}
})
.unwrap_or_else(|| vec![&tpu_address]);
for address in addresses {
Self::send_transaction(
&send_socket,
address,
&transaction_info.wire_transaction,
);
}
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
transactions.insert(transaction_info.signature, transaction_info);
} else {
datapoint_warn!("send_transaction_service-queue-overflow");
}
}
}
if last_status_check.elapsed().as_millis() as u64 >= retry_rate_ms {
if !transactions.is_empty() {
datapoint_info!(
"send_transaction_service-queue-size",
("len", transactions.len(), i64)
);
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(
bank_forks.root_bank().clone(),
bank_forks.working_bank().clone(),
)
};
let _result = Self::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&leader_info,
leader_forward_count,
);
}
last_status_check = Instant::now();
if last_leader_refresh.elapsed().as_millis() > 1000 {
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
last_leader_refresh = Instant::now();
}
}
})
.unwrap()
}
fn process_transactions(
working_bank: &Arc<Bank>,
root_bank: &Arc<Bank>,
send_socket: &UdpSocket,
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: &Option<LeaderInfo>,
leader_forward_count: u64,
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();
transactions.retain(|signature, transaction_info| {
if transaction_info.durable_nonce_info.is_some() {
inc_new_counter_info!("send_transaction_service-nonced", 1);
}
if root_bank.has_signature(signature) {
info!("Transaction is rooted: {}", signature);
result.rooted += 1;
inc_new_counter_info!("send_transaction_service-rooted", 1);
return false;
}
if let Some((nonce_pubkey, durable_nonce)) = transaction_info.durable_nonce_info {
let nonce_account = working_bank.get_account(&nonce_pubkey).unwrap_or_default();
if !nonce_account::verify_nonce_account(&nonce_account, &durable_nonce)
&& working_bank.get_signature_status_slot(signature).is_none()
{
info!("Dropping expired durable-nonce transaction: {}", signature);
result.expired += 1;
inc_new_counter_info!("send_transaction_service-expired", 1);
return false;
}
}
if transaction_info.last_valid_slot < root_bank.slot() {
info!("Dropping expired transaction: {}", signature);
result.expired += 1;
inc_new_counter_info!("send_transaction_service-expired", 1);
return false;
}
match working_bank.get_signature_status_slot(signature) {
None => {
// Transaction is unknown to the working bank, it might have been
// dropped or landed in another fork. Re-send it
info!("Retrying transaction: {}", signature);
result.retried += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
let addresses = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
let addresses = addresses
.map(|address_list| {
if address_list.is_empty() {
vec![tpu_address]
} else {
address_list
}
})
.unwrap_or_else(|| vec![&tpu_address]);
for address in addresses {
Self::send_transaction(
&send_socket,
address,
&transaction_info.wire_transaction,
);
}
true
}
Some((_slot, status)) => {
if status.is_err() {
info!("Dropping failed transaction: {}", signature);
result.failed += 1;
inc_new_counter_info!("send_transaction_service-failed", 1);
false
} else {
result.retained += 1;
true
}
}
}
});
result
}
fn send_transaction(
send_socket: &UdpSocket,
tpu_address: &SocketAddr,
wire_transaction: &[u8],
) {
if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) {
warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
}
}
pub fn join(self) -> thread::Result<()> {
self.thread.join()
}
}
#[cfg(test)]
mod test {
use super::*;
use solana_gossip::contact_info::ContactInfo;
use solana_ledger::{
blockstore::Blockstore, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache,
};
use solana_runtime::genesis_utils::{
create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
};
use solana_sdk::{
account::AccountSharedData,
fee_calculator::FeeCalculator,
genesis_config::create_genesis_config,
nonce,
poh_config::PohConfig,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_program, system_transaction,
timing::timestamp,
};
use std::sync::{atomic::AtomicBool, mpsc::channel};
#[test]
fn service_exit() {
let tpu_address = "127.0.0.1:0".parse().unwrap();
let bank = Bank::default();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let (sender, receiver) = channel();
let send_tranaction_service =
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
drop(sender);
send_tranaction_service.join().unwrap();
}
#[test]
fn process_transactions() {
solana_logger::setup();
let (genesis_config, mint_keypair) = create_genesis_config(4);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let tpu_address = "127.0.0.1:0".parse().unwrap();
let leader_forward_count = 1;
let root_bank = Arc::new(Bank::new_from_parent(
&bank_forks.read().unwrap().working_bank(),
&Pubkey::default(),
1,
));
let rooted_signature = root_bank
.transfer(1, &mint_keypair, &mint_keypair.pubkey())
.unwrap();
let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2));
let non_rooted_signature = working_bank
.transfer(2, &mint_keypair, &mint_keypair.pubkey())
.unwrap();
let failed_signature = {
let blockhash = working_bank.last_blockhash();
let transaction =
system_transaction::transfer(&mint_keypair, &Pubkey::default(), 1, blockhash);
let signature = transaction.signatures[0];
working_bank.process_transaction(&transaction).unwrap_err();
signature
};
let mut transactions = HashMap::new();
info!("Expired transactions are dropped...");
transactions.insert(
Signature::default(),
TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1, None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
expired: 1,
..ProcessTransactionsResult::default()
}
);
info!("Rooted transactions are dropped...");
transactions.insert(
rooted_signature,
TransactionInfo::new(rooted_signature, vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
rooted: 1,
..ProcessTransactionsResult::default()
}
);
info!("Failed transactions are dropped...");
transactions.insert(
failed_signature,
TransactionInfo::new(failed_signature, vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
failed: 1,
..ProcessTransactionsResult::default()
}
);
info!("Non-rooted transactions are kept...");
transactions.insert(
non_rooted_signature,
TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retained: 1,
..ProcessTransactionsResult::default()
}
);
transactions.clear();
info!("Unknown transactions are retried...");
transactions.insert(
Signature::default(),
TransactionInfo::new(Signature::default(), vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retried: 1,
..ProcessTransactionsResult::default()
}
);
}
#[test]
fn test_retry_durable_nonce_transactions() {
solana_logger::setup();
let (genesis_config, mint_keypair) = create_genesis_config(4);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let tpu_address = "127.0.0.1:0".parse().unwrap();
let leader_forward_count = 1;
let root_bank = Arc::new(Bank::new_from_parent(
&bank_forks.read().unwrap().working_bank(),
&Pubkey::default(),
1,
));
let rooted_signature = root_bank
.transfer(1, &mint_keypair, &mint_keypair.pubkey())
.unwrap();
let nonce_address = Pubkey::new_unique();
let durable_nonce = Hash::new_unique();
let nonce_state =
nonce::state::Versions::new_current(nonce::State::Initialized(nonce::state::Data {
authority: Pubkey::default(),
blockhash: durable_nonce,
fee_calculator: FeeCalculator::new(42),
}));
let nonce_account =
AccountSharedData::new_data(43, &nonce_state, &system_program::id()).unwrap();
root_bank.store_account(&nonce_address, &nonce_account);
let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2));
let non_rooted_signature = working_bank
.transfer(2, &mint_keypair, &mint_keypair.pubkey())
.unwrap();
let last_valid_slot = working_bank.slot() + 300;
let failed_signature = {
let blockhash = working_bank.last_blockhash();
let transaction =
system_transaction::transfer(&mint_keypair, &Pubkey::default(), 1, blockhash);
let signature = transaction.signatures[0];
working_bank.process_transaction(&transaction).unwrap_err();
signature
};
let mut transactions = HashMap::new();
info!("Rooted durable-nonce transactions are dropped...");
transactions.insert(
rooted_signature,
TransactionInfo::new(
rooted_signature,
vec![],
last_valid_slot,
Some((nonce_address, durable_nonce)),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
rooted: 1,
..ProcessTransactionsResult::default()
}
);
// Nonce expired case
transactions.insert(
rooted_signature,
TransactionInfo::new(
rooted_signature,
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
rooted: 1,
..ProcessTransactionsResult::default()
}
);
// Expired durable-nonce transactions are dropped; nonce has advanced...
info!("Expired durable-nonce transactions are dropped...");
transactions.insert(
Signature::default(),
TransactionInfo::new(
Signature::default(),
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
expired: 1,
..ProcessTransactionsResult::default()
}
);
// ... or last_valid_slot timeout has passed
transactions.insert(
Signature::default(),
TransactionInfo::new(
Signature::default(),
vec![],
root_bank.slot() - 1,
Some((nonce_address, durable_nonce)),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
expired: 1,
..ProcessTransactionsResult::default()
}
);
info!("Failed durable-nonce transactions are dropped...");
transactions.insert(
failed_signature,
TransactionInfo::new(
failed_signature,
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())), // runtime should advance nonce on failed transactions
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
failed: 1,
..ProcessTransactionsResult::default()
}
);
info!("Non-rooted durable-nonce transactions are kept...");
transactions.insert(
non_rooted_signature,
TransactionInfo::new(
non_rooted_signature,
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())), // runtime advances nonce when transaction lands
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retained: 1,
..ProcessTransactionsResult::default()
}
);
transactions.clear();
info!("Unknown durable-nonce transactions are retried until nonce advances...");
transactions.insert(
Signature::default(),
TransactionInfo::new(
Signature::default(),
vec![],
last_valid_slot,
Some((nonce_address, durable_nonce)),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retried: 1,
..ProcessTransactionsResult::default()
}
);
// Advance nonce
let new_durable_nonce = Hash::new_unique();
let new_nonce_state =
nonce::state::Versions::new_current(nonce::State::Initialized(nonce::state::Data {
authority: Pubkey::default(),
blockhash: new_durable_nonce,
fee_calculator: FeeCalculator::new(42),
}));
let nonce_account =
AccountSharedData::new_data(43, &new_nonce_state, &system_program::id()).unwrap();
working_bank.store_account(&nonce_address, &nonce_account);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
leader_forward_count,
);
assert_eq!(transactions.len(), 0);
assert_eq!(
result,
ProcessTransactionsResult {
expired: 1,
..ProcessTransactionsResult::default()
}
);
}
#[test]
fn test_get_leader_tpus() {
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&ledger_path).unwrap();
let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs2 = ValidatorVoteKeypairs::new_rand();
let validator_keypairs = vec![
&validator_vote_keypairs0,
&validator_vote_keypairs1,
&validator_vote_keypairs2,
];
let GenesisConfigInfo {
genesis_config,
mint_keypair: _,
voting_keypair: _,
} = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![10_000; 3],
);
let bank = Arc::new(Bank::new(&genesis_config));
let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
bank.last_blockhash(),
0,
Some((2, 2)),
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
Arc::new(AtomicBool::default()),
);
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
));
let validator0_socket = SocketAddr::from(([127, 0, 0, 1], 1111));
let validator1_socket = SocketAddr::from(([127, 0, 0, 1], 2222));
let validator2_socket = SocketAddr::from(([127, 0, 0, 1], 3333));
let recent_peers: HashMap<_, _> = vec![
(
validator_vote_keypairs0.node_keypair.pubkey(),
validator0_socket,
),
(
validator_vote_keypairs1.node_keypair.pubkey(),
validator1_socket,
),
(
validator_vote_keypairs2.node_keypair.pubkey(),
validator2_socket,
),
]
.iter()
.cloned()
.collect();
let leader_info = LeaderInfo {
cluster_info,
poh_recorder: Arc::new(Mutex::new(poh_recorder)),
recent_peers: recent_peers.clone(),
};
let slot = bank.slot();
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_leader_tpus(1),
vec![recent_peers.get(&first_leader).unwrap()]
);
let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
slot + NUM_CONSECUTIVE_LEADER_SLOTS,
&bank,
)
.unwrap();
let mut expected_leader_sockets = vec![
recent_peers.get(&first_leader).unwrap(),
recent_peers.get(&second_leader).unwrap(),
];
expected_leader_sockets.dedup();
assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets);
let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS),
&bank,
)
.unwrap();
let mut expected_leader_sockets = vec![
recent_peers.get(&first_leader).unwrap(),
recent_peers.get(&second_leader).unwrap(),
recent_peers.get(&third_leader).unwrap(),
];
expected_leader_sockets.dedup();
assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets);
for x in 4..8 {
assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len());
}
}
Blockstore::destroy(&ledger_path).unwrap();
}
}

View File

@ -1,12 +1,10 @@
use {
crate::{
rpc::JsonRpcConfig,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
},
crate::validator::{Validator, ValidatorConfig, ValidatorStartProgress},
solana_client::rpc_client::RpcClient,
solana_gossip::{cluster_info::Node, gossip_service::discover_cluster, socketaddr},
solana_ledger::{blockstore::create_new_ledger, create_new_tmp_ledger},
solana_net_utils::PortRange,
solana_rpc::rpc::JsonRpcConfig,
solana_runtime::{
bank_forks::{ArchiveFormat, SnapshotConfig, SnapshotVersion},
genesis_utils::create_genesis_config_with_leader_ex,

View File

@ -9,13 +9,13 @@ use crate::{
VerifiedVoteSender, VoteTracker,
},
fetch_stage::FetchStage,
poh_recorder::{PohRecorder, WorkingBankEntry},
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
};
use crossbeam_channel::unbounded;
use solana_gossip::cluster_info::ClusterInfo;
use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender};
use solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry};
use solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotificationSender,
rpc_subscriptions::RpcSubscriptions,

View File

@ -1,174 +0,0 @@
use crossbeam_channel::{Receiver, RecvTimeoutError};
use itertools::izip;
use solana_ledger::{
blockstore::Blockstore,
blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage},
};
use solana_runtime::bank::{
Bank, InnerInstructionsList, NonceRollbackInfo, TransactionLogMessages,
};
use solana_transaction_status::{InnerInstructions, Reward, TransactionStatusMeta};
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub struct TransactionStatusService {
thread_hdl: JoinHandle<()>,
}
impl TransactionStatusService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
write_transaction_status_receiver: Receiver<TransactionStatusMessage>,
max_complete_transaction_status_slot: Arc<AtomicU64>,
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-transaction-status-writer".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch(
&write_transaction_status_receiver,
&max_complete_transaction_status_slot,
&blockstore,
) {
break;
}
})
.unwrap();
Self { thread_hdl }
}
fn write_transaction_status_batch(
write_transaction_status_receiver: &Receiver<TransactionStatusMessage>,
max_complete_transaction_status_slot: &Arc<AtomicU64>,
blockstore: &Arc<Blockstore>,
) -> Result<(), RecvTimeoutError> {
match write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))? {
TransactionStatusMessage::Batch(TransactionStatusBatch {
bank,
transactions,
statuses,
balances,
token_balances,
inner_instructions,
transaction_logs,
rent_debits,
}) => {
let slot = bank.slot();
let inner_instructions_iter: Box<
dyn Iterator<Item = Option<InnerInstructionsList>>,
> = if let Some(inner_instructions) = inner_instructions {
Box::new(inner_instructions.into_iter())
} else {
Box::new(std::iter::repeat_with(|| None))
};
let transaction_logs_iter: Box<dyn Iterator<Item = TransactionLogMessages>> =
if let Some(transaction_logs) = transaction_logs {
Box::new(transaction_logs.into_iter())
} else {
Box::new(std::iter::repeat_with(Vec::new))
};
for (
transaction,
(status, nonce_rollback),
pre_balances,
post_balances,
pre_token_balances,
post_token_balances,
inner_instructions,
log_messages,
rent_debits,
) in izip!(
&transactions,
statuses,
balances.pre_balances,
balances.post_balances,
token_balances.pre_token_balances,
token_balances.post_token_balances,
inner_instructions_iter,
transaction_logs_iter,
rent_debits.into_iter(),
) {
if Bank::can_commit(&status) && !transaction.signatures.is_empty() {
let fee_calculator = nonce_rollback
.map(|nonce_rollback| nonce_rollback.fee_calculator())
.unwrap_or_else(|| {
bank.get_fee_calculator(&transaction.message().recent_blockhash)
})
.expect("FeeCalculator must exist");
let fee = fee_calculator.calculate_fee(transaction.message());
let (writable_keys, readonly_keys) = transaction
.message
.get_account_keys_by_lock_type(bank.demote_sysvar_write_locks());
let inner_instructions = inner_instructions.map(|inner_instructions| {
inner_instructions
.into_iter()
.enumerate()
.map(|(index, instructions)| InnerInstructions {
index: index as u8,
instructions,
})
.filter(|i| !i.instructions.is_empty())
.collect()
});
let log_messages = Some(log_messages);
let pre_token_balances = Some(pre_token_balances);
let post_token_balances = Some(post_token_balances);
let rewards = Some(
rent_debits
.0
.into_iter()
.map(|(pubkey, reward_info)| Reward {
pubkey: pubkey.to_string(),
lamports: reward_info.lamports,
post_balance: reward_info.post_balance,
reward_type: Some(reward_info.reward_type),
})
.collect(),
);
blockstore
.write_transaction_status(
slot,
transaction.signatures[0],
writable_keys,
readonly_keys,
TransactionStatusMeta {
status,
fee,
pre_balances,
post_balances,
inner_instructions,
log_messages,
pre_token_balances,
post_token_balances,
rewards,
},
)
.expect("Expect database write to succeed");
}
}
}
TransactionStatusMessage::Freeze(slot) => {
max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst);
}
}
Ok(())
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -13,7 +13,6 @@ use crate::{
completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower,
ledger_cleanup_service::LedgerCleanupService,
poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage,
rewards_recorder_service::RewardsRecorderSender,
@ -29,6 +28,7 @@ use solana_ledger::{
blockstore_processor::TransactionStatusSender,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_poh::poh_recorder::PohRecorder;
use solana_rpc::{
max_slots::MaxSlots, optimistically_confirmed_bank_tracker::BankNotificationSender,
rpc_subscriptions::RpcSubscriptions,
@ -341,7 +341,6 @@ impl Tvu {
#[cfg(test)]
pub mod tests {
use super::*;
use crate::banking_stage::create_test_recorder;
use serial_test::serial;
use solana_gossip::cluster_info::{ClusterInfo, Node};
use solana_ledger::{
@ -349,6 +348,7 @@ pub mod tests {
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use solana_poh::poh_recorder::create_test_recorder;
use solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank;
use solana_runtime::bank::Bank;
use std::sync::atomic::Ordering;

View File

@ -6,18 +6,13 @@ use crate::{
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::{self, PohService},
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
rpc::JsonRpcConfig,
rpc_service::JsonRpcService,
sample_performance_service::SamplePerformanceService,
serve_repair::ServeRepair,
serve_repair_service::ServeRepairService,
sigverify,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService},
tpu::{Tpu, DEFAULT_TPU_COALESCE_MS},
transaction_status_service::TransactionStatusService,
tvu::{Sockets, Tvu, TvuConfig},
};
use crossbeam_channel::{bounded, unbounded};
@ -41,13 +36,20 @@ use solana_ledger::{
};
use solana_measure::measure::Measure;
use solana_metrics::datapoint_info;
use solana_poh::{
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::{self, PohService},
};
use solana_rpc::{
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
},
rpc::JsonRpcConfig,
rpc_pubsub_service::{PubSubConfig, PubSubService},
rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions,
transaction_status_service::TransactionStatusService,
};
use solana_runtime::{
accounts_index::AccountSecondaryIndexes,
@ -68,7 +70,6 @@ use solana_sdk::{
timing::timestamp,
};
use solana_vote_program::vote_state::VoteState;
use std::time::Instant;
use std::{
collections::HashSet,
net::SocketAddr,
@ -78,7 +79,7 @@ use std::{
sync::mpsc::Receiver,
sync::{Arc, Mutex, RwLock},
thread::{sleep, Builder},
time::Duration,
time::{Duration, Instant},
};
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;