Move test-validator to own module to reduce core dependencies (#20658)

* Move test-validator to own module to reduce core dependencies

* Fix a few TestValidator paths

* Use solana_test_validator crate for solana_test_validator bin

* Move client int tests to separate crate

Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
sakridge
2021-10-28 18:27:07 -07:00
committed by GitHub
parent e16c060abf
commit a8d78e89d3
26 changed files with 193 additions and 27 deletions

View File

@@ -34,10 +34,8 @@ rayon = "1.5.1"
retain_mut = "0.1.4"
serde = "1.0.130"
serde_derive = "1.0.103"
solana-account-decoder = { path = "../account-decoder", version = "=1.9.0" }
solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.9.0" }
solana-client = { path = "../client", version = "=1.9.0" }
solana-config-program = { path = "../programs/config", version = "=1.9.0" }
solana-entry = { path = "../entry", version = "=1.9.0" }
solana-gossip = { path = "../gossip", version = "=1.9.0" }
solana-ledger = { path = "../ledger", version = "=1.9.0" }
@@ -47,7 +45,6 @@ solana-metrics = { path = "../metrics", version = "=1.9.0" }
solana-net-utils = { path = "../net-utils", version = "=1.9.0" }
solana-perf = { path = "../perf", version = "=1.9.0" }
solana-poh = { path = "../poh", version = "=1.9.0" }
solana-program-test = { path = "../program-test", version = "=1.9.0" }
solana-rpc = { path = "../rpc", version = "=1.9.0" }
solana-replica-lib = { path = "../replica-lib", version = "=1.9.0" }
solana-runtime = { path = "../runtime", version = "=1.9.0" }

View File

@@ -50,7 +50,6 @@ pub mod sigverify_shreds;
pub mod sigverify_stage;
pub mod snapshot_packager_service;
pub mod system_monitor_service;
pub mod test_validator;
pub mod tower_storage;
pub mod tpu;
pub mod tree_diff;

View File

@@ -1,682 +0,0 @@
use {
crate::{
tower_storage::TowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
},
solana_client::rpc_client::RpcClient,
solana_gossip::{
cluster_info::{ClusterInfo, Node},
gossip_service::discover_cluster,
socketaddr,
},
solana_ledger::{blockstore::create_new_ledger, create_new_tmp_ledger},
solana_net_utils::PortRange,
solana_rpc::rpc::JsonRpcConfig,
solana_runtime::{
genesis_utils::create_genesis_config_with_leader_ex,
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, snapshot_config::SnapshotConfig,
},
solana_sdk::{
account::{Account, AccountSharedData},
clock::{Slot, DEFAULT_MS_PER_SLOT},
commitment_config::CommitmentConfig,
epoch_schedule::EpochSchedule,
exit::Exit,
fee_calculator::{FeeCalculator, FeeRateGovernor},
hash::Hash,
instruction::{AccountMeta, Instruction},
message::Message,
native_token::sol_to_lamports,
pubkey::Pubkey,
rent::Rent,
signature::{read_keypair_file, write_keypair_file, Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::{
collections::HashMap,
fs::remove_dir_all,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::{Arc, RwLock},
thread::sleep,
time::Duration,
},
};
#[derive(Clone)]
pub struct ProgramInfo {
pub program_id: Pubkey,
pub loader: Pubkey,
pub program_path: PathBuf,
}
#[derive(Debug)]
pub struct TestValidatorNodeConfig {
gossip_addr: SocketAddr,
port_range: PortRange,
bind_ip_addr: IpAddr,
}
impl Default for TestValidatorNodeConfig {
fn default() -> Self {
const MIN_PORT_RANGE: u16 = 1024;
const MAX_PORT_RANGE: u16 = 65535;
let bind_ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let port_range = (MIN_PORT_RANGE, MAX_PORT_RANGE);
Self {
gossip_addr: socketaddr!("127.0.0.1:0"),
port_range,
bind_ip_addr,
}
}
}
#[derive(Default)]
pub struct TestValidatorGenesis {
fee_rate_governor: FeeRateGovernor,
ledger_path: Option<PathBuf>,
tower_storage: Option<Arc<dyn TowerStorage>>,
pub rent: Rent,
rpc_config: JsonRpcConfig,
rpc_ports: Option<(u16, u16)>, // (JsonRpc, JsonRpcPubSub), None == random ports
warp_slot: Option<Slot>,
no_bpf_jit: bool,
accounts: HashMap<Pubkey, AccountSharedData>,
programs: Vec<ProgramInfo>,
epoch_schedule: Option<EpochSchedule>,
node_config: TestValidatorNodeConfig,
pub validator_exit: Arc<RwLock<Exit>>,
pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
pub max_ledger_shreds: Option<u64>,
}
impl TestValidatorGenesis {
pub fn ledger_path<P: Into<PathBuf>>(&mut self, ledger_path: P) -> &mut Self {
self.ledger_path = Some(ledger_path.into());
self
}
pub fn tower_storage(&mut self, tower_storage: Arc<dyn TowerStorage>) -> &mut Self {
self.tower_storage = Some(tower_storage);
self
}
/// Check if a given TestValidator ledger has already been initialized
pub fn ledger_exists(ledger_path: &Path) -> bool {
ledger_path.join("vote-account-keypair.json").exists()
}
pub fn fee_rate_governor(&mut self, fee_rate_governor: FeeRateGovernor) -> &mut Self {
self.fee_rate_governor = fee_rate_governor;
self
}
pub fn epoch_schedule(&mut self, epoch_schedule: EpochSchedule) -> &mut Self {
self.epoch_schedule = Some(epoch_schedule);
self
}
pub fn rent(&mut self, rent: Rent) -> &mut Self {
self.rent = rent;
self
}
pub fn rpc_config(&mut self, rpc_config: JsonRpcConfig) -> &mut Self {
self.rpc_config = rpc_config;
self
}
pub fn rpc_port(&mut self, rpc_port: u16) -> &mut Self {
self.rpc_ports = Some((rpc_port, rpc_port + 1));
self
}
pub fn faucet_addr(&mut self, faucet_addr: Option<SocketAddr>) -> &mut Self {
self.rpc_config.faucet_addr = faucet_addr;
self
}
pub fn warp_slot(&mut self, warp_slot: Slot) -> &mut Self {
self.warp_slot = Some(warp_slot);
self
}
pub fn bpf_jit(&mut self, bpf_jit: bool) -> &mut Self {
self.no_bpf_jit = !bpf_jit;
self
}
pub fn gossip_host(&mut self, gossip_host: IpAddr) -> &mut Self {
self.node_config.gossip_addr.set_ip(gossip_host);
self
}
pub fn gossip_port(&mut self, gossip_port: u16) -> &mut Self {
self.node_config.gossip_addr.set_port(gossip_port);
self
}
pub fn port_range(&mut self, port_range: PortRange) -> &mut Self {
self.node_config.port_range = port_range;
self
}
pub fn bind_ip_addr(&mut self, bind_ip_addr: IpAddr) -> &mut Self {
self.node_config.bind_ip_addr = bind_ip_addr;
self
}
/// Add an account to the test environment
pub fn add_account(&mut self, address: Pubkey, account: AccountSharedData) -> &mut Self {
self.accounts.insert(address, account);
self
}
pub fn add_accounts<T>(&mut self, accounts: T) -> &mut Self
where
T: IntoIterator<Item = (Pubkey, AccountSharedData)>,
{
for (address, account) in accounts {
self.add_account(address, account);
}
self
}
pub fn clone_accounts<T>(&mut self, addresses: T, rpc_client: &RpcClient) -> &mut Self
where
T: IntoIterator<Item = Pubkey>,
{
for address in addresses {
info!("Fetching {} over RPC...", address);
let account = rpc_client.get_account(&address).unwrap_or_else(|err| {
error!("Failed to fetch {}: {}", address, err);
crate::validator::abort();
});
self.add_account(address, AccountSharedData::from(account));
}
self
}
/// Add an account to the test environment with the account data in the provided `filename`
pub fn add_account_with_file_data(
&mut self,
address: Pubkey,
lamports: u64,
owner: Pubkey,
filename: &str,
) -> &mut Self {
self.add_account(
address,
AccountSharedData::from(Account {
lamports,
data: solana_program_test::read_file(
solana_program_test::find_file(filename).unwrap_or_else(|| {
panic!("Unable to locate {}", filename);
}),
),
owner,
executable: false,
rent_epoch: 0,
}),
)
}
/// Add an account to the test environment with the account data in the provided as a base 64
/// string
pub fn add_account_with_base64_data(
&mut self,
address: Pubkey,
lamports: u64,
owner: Pubkey,
data_base64: &str,
) -> &mut Self {
self.add_account(
address,
AccountSharedData::from(Account {
lamports,
data: base64::decode(data_base64)
.unwrap_or_else(|err| panic!("Failed to base64 decode: {}", err)),
owner,
executable: false,
rent_epoch: 0,
}),
)
}
/// Add a BPF program to the test environment.
///
/// `program_name` will also used to locate the BPF shared object in the current or fixtures
/// directory.
pub fn add_program(&mut self, program_name: &str, program_id: Pubkey) -> &mut Self {
let program_path = solana_program_test::find_file(&format!("{}.so", program_name))
.unwrap_or_else(|| panic!("Unable to locate program {}", program_name));
self.programs.push(ProgramInfo {
program_id,
loader: solana_sdk::bpf_loader::id(),
program_path,
});
self
}
/// Add a list of programs to the test environment.
///pub fn add_programs_with_path<'a>(&'a mut self, programs: &[ProgramInfo]) -> &'a mut Self {
pub fn add_programs_with_path(&mut self, programs: &[ProgramInfo]) -> &mut Self {
for program in programs {
self.programs.push(program.clone());
}
self
}
/// Start a test validator with the address of the mint account that will receive tokens
/// created at genesis.
///
pub fn start_with_mint_address(
&self,
mint_address: Pubkey,
socket_addr_space: SocketAddrSpace,
) -> Result<TestValidator, Box<dyn std::error::Error>> {
TestValidator::start(mint_address, self, socket_addr_space)
}
/// Start a test validator
///
/// Returns a new `TestValidator` as well as the keypair for the mint account that will receive tokens
/// created at genesis.
///
/// This function panics on initialization failure.
pub fn start(&self) -> (TestValidator, Keypair) {
self.start_with_socket_addr_space(SocketAddrSpace::new(/*allow_private_addr=*/ true))
}
/// Start a test validator with the given `SocketAddrSpace`
///
/// Returns a new `TestValidator` as well as the keypair for the mint account that will receive tokens
/// created at genesis.
///
/// This function panics on initialization failure.
pub fn start_with_socket_addr_space(
&self,
socket_addr_space: SocketAddrSpace,
) -> (TestValidator, Keypair) {
let mint_keypair = Keypair::new();
TestValidator::start(mint_keypair.pubkey(), self, socket_addr_space)
.map(|test_validator| (test_validator, mint_keypair))
.expect("Test validator failed to start")
}
}
pub struct TestValidator {
ledger_path: PathBuf,
preserve_ledger: bool,
rpc_pubsub_url: String,
rpc_url: String,
tpu: SocketAddr,
gossip: SocketAddr,
validator: Option<Validator>,
vote_account_address: Pubkey,
}
impl TestValidator {
/// Create and start a `TestValidator` with no transaction fees and minimal rent.
/// Faucet optional.
///
/// This function panics on initialization failure.
pub fn with_no_fees(
mint_address: Pubkey,
faucet_addr: Option<SocketAddr>,
socket_addr_space: SocketAddrSpace,
) -> Self {
TestValidatorGenesis::default()
.fee_rate_governor(FeeRateGovernor::new(0, 0))
.rent(Rent {
lamports_per_byte_year: 1,
exemption_threshold: 1.0,
..Rent::default()
})
.faucet_addr(faucet_addr)
.start_with_mint_address(mint_address, socket_addr_space)
.expect("validator start failed")
}
/// Create and start a `TestValidator` with custom transaction fees and minimal rent.
/// Faucet optional.
///
/// This function panics on initialization failure.
pub fn with_custom_fees(
mint_address: Pubkey,
target_lamports_per_signature: u64,
faucet_addr: Option<SocketAddr>,
socket_addr_space: SocketAddrSpace,
) -> Self {
TestValidatorGenesis::default()
.fee_rate_governor(FeeRateGovernor::new(target_lamports_per_signature, 0))
.rent(Rent {
lamports_per_byte_year: 1,
exemption_threshold: 1.0,
..Rent::default()
})
.faucet_addr(faucet_addr)
.start_with_mint_address(mint_address, socket_addr_space)
.expect("validator start failed")
}
/// Initialize the ledger directory
///
/// If `ledger_path` is `None`, a temporary ledger will be created. Otherwise the ledger will
/// be initialized in the provided directory if it doesn't already exist.
///
/// Returns the path to the ledger directory.
fn initialize_ledger(
mint_address: Pubkey,
config: &TestValidatorGenesis,
) -> Result<PathBuf, Box<dyn std::error::Error>> {
let validator_identity = Keypair::new();
let validator_vote_account = Keypair::new();
let validator_stake_account = Keypair::new();
let validator_identity_lamports = sol_to_lamports(500.);
let validator_stake_lamports = sol_to_lamports(1_000_000.);
let mint_lamports = sol_to_lamports(500_000_000.);
let mut accounts = config.accounts.clone();
for (address, account) in solana_program_test::programs::spl_programs(&config.rent) {
accounts.entry(address).or_insert(account);
}
for program in &config.programs {
let data = solana_program_test::read_file(&program.program_path);
accounts.insert(
program.program_id,
AccountSharedData::from(Account {
lamports: Rent::default().minimum_balance(data.len()).min(1),
data,
owner: program.loader,
executable: true,
rent_epoch: 0,
}),
);
}
let mut genesis_config = create_genesis_config_with_leader_ex(
mint_lamports,
&mint_address,
&validator_identity.pubkey(),
&validator_vote_account.pubkey(),
&validator_stake_account.pubkey(),
validator_stake_lamports,
validator_identity_lamports,
config.fee_rate_governor.clone(),
config.rent,
solana_sdk::genesis_config::ClusterType::Development,
accounts.into_iter().collect(),
);
genesis_config.epoch_schedule = config
.epoch_schedule
.unwrap_or_else(EpochSchedule::without_warmup);
let ledger_path = match &config.ledger_path {
None => create_new_tmp_ledger!(&genesis_config).0,
Some(ledger_path) => {
if TestValidatorGenesis::ledger_exists(ledger_path) {
return Ok(ledger_path.to_path_buf());
}
let _ = create_new_ledger(
ledger_path,
&genesis_config,
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
solana_ledger::blockstore_db::AccessType::PrimaryOnly,
)
.map_err(|err| {
format!(
"Failed to create ledger at {}: {}",
ledger_path.display(),
err
)
})?;
ledger_path.to_path_buf()
}
};
write_keypair_file(
&validator_identity,
ledger_path.join("validator-keypair.json").to_str().unwrap(),
)?;
// `ledger_exists` should fail until the vote account keypair is written
assert!(!TestValidatorGenesis::ledger_exists(&ledger_path));
write_keypair_file(
&validator_vote_account,
ledger_path
.join("vote-account-keypair.json")
.to_str()
.unwrap(),
)?;
Ok(ledger_path)
}
/// Starts a TestValidator at the provided ledger directory
fn start(
mint_address: Pubkey,
config: &TestValidatorGenesis,
socket_addr_space: SocketAddrSpace,
) -> Result<Self, Box<dyn std::error::Error>> {
let preserve_ledger = config.ledger_path.is_some();
let ledger_path = TestValidator::initialize_ledger(mint_address, config)?;
let validator_identity =
read_keypair_file(ledger_path.join("validator-keypair.json").to_str().unwrap())?;
let validator_vote_account = read_keypair_file(
ledger_path
.join("vote-account-keypair.json")
.to_str()
.unwrap(),
)?;
let mut node = Node::new_single_bind(
&validator_identity.pubkey(),
&config.node_config.gossip_addr,
config.node_config.port_range,
config.node_config.bind_ip_addr,
);
if let Some((rpc, rpc_pubsub)) = config.rpc_ports {
node.info.rpc = SocketAddr::new(node.info.gossip.ip(), rpc);
node.info.rpc_pubsub = SocketAddr::new(node.info.gossip.ip(), rpc_pubsub);
}
let vote_account_address = validator_vote_account.pubkey();
let rpc_url = format!("http://{}", node.info.rpc);
let rpc_pubsub_url = format!("ws://{}/", node.info.rpc_pubsub);
let tpu = node.info.tpu;
let gossip = node.info.gossip;
{
let mut authorized_voter_keypairs = config.authorized_voter_keypairs.write().unwrap();
if !authorized_voter_keypairs
.iter()
.any(|x| x.pubkey() == vote_account_address)
{
authorized_voter_keypairs.push(Arc::new(validator_vote_account))
}
}
let mut validator_config = ValidatorConfig {
rpc_addrs: Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
node.info.rpc_pubsub.port(),
),
)),
rpc_config: config.rpc_config.clone(),
accounts_hash_interval_slots: 100,
account_paths: vec![ledger_path.join("accounts")],
poh_verify: false, // Skip PoH verification of ledger on startup for speed
snapshot_config: Some(SnapshotConfig {
full_snapshot_archive_interval_slots: 100,
incremental_snapshot_archive_interval_slots: Slot::MAX,
bank_snapshots_dir: ledger_path.join("snapshot"),
snapshot_archives_dir: ledger_path.to_path_buf(),
..SnapshotConfig::default()
}),
enforce_ulimit_nofile: false,
warp_slot: config.warp_slot,
bpf_jit: !config.no_bpf_jit,
validator_exit: config.validator_exit.clone(),
rocksdb_compaction_interval: Some(100), // Compact every 100 slots
max_ledger_shreds: config.max_ledger_shreds,
no_wait_for_vote_to_start_leader: true,
..ValidatorConfig::default()
};
if let Some(ref tower_storage) = config.tower_storage {
validator_config.tower_storage = tower_storage.clone();
}
let validator = Some(Validator::new(
node,
Arc::new(validator_identity),
&ledger_path,
&vote_account_address,
config.authorized_voter_keypairs.clone(),
vec![],
&validator_config,
true, // should_check_duplicate_instance
config.start_progress.clone(),
socket_addr_space,
));
// Needed to avoid panics in `solana-responder-gossip` in tests that create a number of
// test validators concurrently...
discover_cluster(&gossip, 1, socket_addr_space)
.map_err(|err| format!("TestValidator startup failed: {:?}", err))?;
// This is a hack to delay until the fees are non-zero for test consistency
// (fees from genesis are zero until the first block with a transaction in it is completed
// due to a bug in the Bank)
{
let rpc_client =
RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::processed());
let message = Message::new(
&[Instruction::new_with_bytes(
Pubkey::new_unique(),
&[],
vec![AccountMeta::new(Pubkey::new_unique(), true)],
)],
None,
);
const MAX_TRIES: u64 = 10;
let mut num_tries = 0;
loop {
num_tries += 1;
if num_tries > MAX_TRIES {
break;
}
println!("Waiting for fees to stabilize {:?}...", num_tries);
match rpc_client.get_latest_blockhash() {
Ok(_) => match rpc_client.get_fee_for_message(&message) {
Ok(fee) => {
if fee != 0 {
break;
}
}
Err(err) => {
warn!("get_fee_for_message() failed: {:?}", err);
break;
}
},
Err(err) => {
warn!("get_latest_blockhash() failed: {:?}", err);
break;
}
}
sleep(Duration::from_millis(DEFAULT_MS_PER_SLOT));
}
}
Ok(TestValidator {
ledger_path,
preserve_ledger,
rpc_pubsub_url,
rpc_url,
tpu,
gossip,
validator,
vote_account_address,
})
}
/// Return the validator's TPU address
pub fn tpu(&self) -> &SocketAddr {
&self.tpu
}
/// Return the validator's Gossip address
pub fn gossip(&self) -> &SocketAddr {
&self.gossip
}
/// Return the validator's JSON RPC URL
pub fn rpc_url(&self) -> String {
self.rpc_url.clone()
}
/// Return the validator's JSON RPC PubSub URL
pub fn rpc_pubsub_url(&self) -> String {
self.rpc_pubsub_url.clone()
}
/// Return the validator's vote account address
pub fn vote_account_address(&self) -> Pubkey {
self.vote_account_address
}
/// Return an RpcClient for the validator. As a convenience, also return a recent blockhash and
/// associated fee calculator
#[deprecated(since = "1.9.0", note = "Please use `get_rpc_client` instead")]
pub fn rpc_client(&self) -> (RpcClient, Hash, FeeCalculator) {
let rpc_client =
RpcClient::new_with_commitment(self.rpc_url.clone(), CommitmentConfig::processed());
#[allow(deprecated)]
let (recent_blockhash, fee_calculator) = rpc_client
.get_recent_blockhash()
.expect("get_recent_blockhash");
(rpc_client, recent_blockhash, fee_calculator)
}
/// Return an RpcClient for the validator.
pub fn get_rpc_client(&self) -> RpcClient {
RpcClient::new_with_commitment(self.rpc_url.clone(), CommitmentConfig::processed())
}
pub fn join(mut self) {
if let Some(validator) = self.validator.take() {
validator.join();
}
}
pub fn cluster_info(&self) -> Arc<ClusterInfo> {
self.validator.as_ref().unwrap().cluster_info.clone()
}
}
impl Drop for TestValidator {
fn drop(&mut self) {
if let Some(validator) = self.validator.take() {
validator.close();
}
if !self.preserve_ledger {
remove_dir_all(&self.ledger_path).unwrap_or_else(|err| {
panic!(
"Failed to remove ledger directory {}: {}",
self.ledger_path.display(),
err
)
});
}
}
}

View File

@@ -292,7 +292,7 @@ pub struct Validator {
}
// in the distant future, get rid of ::new()/exit() and use Result properly...
pub(crate) fn abort() -> ! {
pub fn abort() -> ! {
#[cfg(not(test))]
{
// standard error is usually redirected to a log file, cry for help on standard output as

View File

@@ -1,146 +0,0 @@
use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
use solana_core::test_validator::TestValidator;
use solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
rpc_pubsub_service::{PubSubConfig, PubSubService},
rpc_subscriptions::RpcSubscriptions,
};
use solana_runtime::{
bank::Bank,
bank_forks::BankForks,
commitment::BlockCommitmentCache,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use solana_sdk::{
commitment_config::CommitmentConfig,
native_token::sol_to_lamports,
rpc_port,
signature::{Keypair, Signer},
system_transaction,
};
use solana_streamer::socket::SocketAddrSpace;
use std::{
net::{IpAddr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::sleep,
time::{Duration, Instant},
};
use systemstat::Ipv4Addr;
#[test]
fn test_rpc_client() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let bob_pubkey = solana_sdk::pubkey::new_rand();
let client = RpcClient::new(test_validator.rpc_url());
assert_eq!(
client.get_version().unwrap().solana_core,
solana_version::semver!()
);
assert!(client.get_account(&bob_pubkey).is_err());
assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 0);
let original_alice_balance = client.get_balance(&alice.pubkey()).unwrap();
let blockhash = client.get_latest_blockhash().unwrap();
let tx = system_transaction::transfer(&alice, &bob_pubkey, sol_to_lamports(20.0), blockhash);
let signature = client.send_transaction(&tx).unwrap();
let mut confirmed_tx = false;
let now = Instant::now();
while now.elapsed().as_secs() <= 20 {
let response = client
.confirm_transaction_with_commitment(&signature, CommitmentConfig::default())
.unwrap();
if response.value {
confirmed_tx = true;
break;
}
sleep(Duration::from_millis(500));
}
assert!(confirmed_tx);
assert_eq!(
client.get_balance(&bob_pubkey).unwrap(),
sol_to_lamports(20.0)
);
assert_eq!(
client.get_balance(&alice.pubkey()).unwrap(),
original_alice_balance - sol_to_lamports(20.0)
);
}
#[test]
fn test_slot_subscription() {
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
bank_forks,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
optimistically_confirmed_bank,
));
let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
let (mut client, receiver) =
PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
let mut errors: Vec<(SlotInfo, SlotInfo)> = Vec::new();
for i in 0..3 {
subscriptions.notify_slot(i + 1, i, i);
let maybe_actual = receiver.recv_timeout(Duration::from_millis(400));
match maybe_actual {
Ok(actual) => {
let expected = SlotInfo {
slot: i + 1,
parent: i,
root: i,
};
if actual != expected {
errors.push((actual, expected));
}
}
Err(_err) => {
eprintln!("unexpected websocket receive timeout");
break;
}
}
}
exit.store(true, Ordering::Relaxed);
trigger.cancel();
client.shutdown().unwrap();
pubsub_service.close().unwrap();
assert_eq!(errors, [].to_vec());
}

View File

@@ -1,457 +0,0 @@
use bincode::serialize;
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::ws;
use log::*;
use reqwest::{self, header::CONTENT_TYPE};
use serde_json::{json, Value};
use solana_account_decoder::UiAccount;
use solana_client::{
client_error::{ClientErrorKind, Result as ClientResult},
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
rpc_request::RpcError,
rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotUpdate},
tpu_client::{TpuClient, TpuClientConfig},
};
use solana_core::test_validator::TestValidator;
use solana_rpc::rpc_pubsub::gen_client::Client as PubsubClient;
use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_transaction,
transaction::Transaction,
};
use solana_streamer::socket::SocketAddrSpace;
use solana_transaction_status::TransactionStatus;
use std::{
collections::HashSet,
net::UdpSocket,
sync::{mpsc::channel, Arc},
thread::sleep,
time::{Duration, Instant},
};
use tokio::runtime::Runtime;
macro_rules! json_req {
($method: expr, $params: expr) => {{
json!({
"jsonrpc": "2.0",
"id": 1,
"method": $method,
"params": $params,
})
}}
}
fn post_rpc(request: Value, rpc_url: &str) -> Value {
let client = reqwest::blocking::Client::new();
let response = client
.post(rpc_url)
.header(CONTENT_TYPE, "application/json")
.body(request.to_string())
.send()
.unwrap();
serde_json::from_str(&response.text().unwrap()).unwrap()
}
#[test]
fn test_rpc_send_tx() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let rpc_url = test_validator.rpc_url();
let bob_pubkey = solana_sdk::pubkey::new_rand();
let req = json_req!("getRecentBlockhash", json!([]));
let json = post_rpc(req, &rpc_url);
let blockhash: Hash = json["result"]["value"]["blockhash"]
.as_str()
.unwrap()
.parse()
.unwrap();
info!("blockhash: {:?}", blockhash);
let tx = system_transaction::transfer(&alice, &bob_pubkey, 20, blockhash);
let serialized_encoded_tx = bs58::encode(serialize(&tx).unwrap()).into_string();
let req = json_req!("sendTransaction", json!([serialized_encoded_tx]));
let json: Value = post_rpc(req, &rpc_url);
let signature = &json["result"];
let mut confirmed_tx = false;
let request = json_req!("getSignatureStatuses", [[signature]]);
for _ in 0..solana_sdk::clock::DEFAULT_TICKS_PER_SLOT {
let json = post_rpc(request.clone(), &rpc_url);
let result: Option<TransactionStatus> =
serde_json::from_value(json["result"]["value"][0].clone()).unwrap();
if let Some(result) = result.as_ref() {
if result.err.is_none() {
confirmed_tx = true;
break;
}
}
sleep(Duration::from_millis(500));
}
assert!(confirmed_tx);
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::RpcAccountInfoConfig;
let config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: None,
data_slice: None,
};
let req = json_req!(
"getAccountInfo",
json!([bs58::encode(bob_pubkey).into_string(), config])
);
let json: Value = post_rpc(req, &rpc_url);
info!("{:?}", json["result"]["value"]);
}
#[test]
fn test_rpc_invalid_requests() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let rpc_url = test_validator.rpc_url();
let bob_pubkey = solana_sdk::pubkey::new_rand();
// test invalid get_balance request
let req = json_req!("getBalance", json!(["invalid9999"]));
let json = post_rpc(req, &rpc_url);
let the_error = json["error"]["message"].as_str().unwrap();
assert_eq!(the_error, "Invalid param: Invalid");
// test invalid get_account_info request
let req = json_req!("getAccountInfo", json!(["invalid9999"]));
let json = post_rpc(req, &rpc_url);
let the_error = json["error"]["message"].as_str().unwrap();
assert_eq!(the_error, "Invalid param: Invalid");
// test invalid get_account_info request
let req = json_req!("getAccountInfo", json!([bob_pubkey.to_string()]));
let json = post_rpc(req, &rpc_url);
let the_value = &json["result"]["value"];
assert!(the_value.is_null());
}
#[test]
fn test_rpc_slot_updates() {
solana_logger::setup();
let test_validator =
TestValidator::with_no_fees(Pubkey::new_unique(), None, SocketAddrSpace::Unspecified);
// Create the pub sub runtime
let rt = Runtime::new().unwrap();
let rpc_pubsub_url = test_validator.rpc_pubsub_url();
let (update_sender, update_receiver) = channel::<Arc<SlotUpdate>>();
// Subscribe to slot updates
rt.spawn(async move {
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
let client = connect.await.unwrap();
tokio::spawn(async move {
let mut update_sub = client.slots_updates_subscribe().unwrap();
loop {
let response = update_sub.next().await.unwrap();
update_sender.send(response.unwrap()).unwrap();
}
});
});
let first_update = update_receiver
.recv_timeout(Duration::from_secs(2))
.unwrap();
// Verify that updates are received in order for an upcoming slot
let verify_slot = first_update.slot() + 2;
let mut expected_update_index = 0;
let expected_updates = vec![
"CreatedBank",
"Completed",
"Frozen",
"OptimisticConfirmation",
"Root",
];
let test_start = Instant::now();
loop {
assert!(test_start.elapsed() < Duration::from_secs(30));
let update = update_receiver
.recv_timeout(Duration::from_secs(2))
.unwrap();
if update.slot() == verify_slot {
let update_name = match *update {
SlotUpdate::CreatedBank { .. } => "CreatedBank",
SlotUpdate::Completed { .. } => "Completed",
SlotUpdate::Frozen { .. } => "Frozen",
SlotUpdate::OptimisticConfirmation { .. } => "OptimisticConfirmation",
SlotUpdate::Root { .. } => "Root",
_ => continue,
};
assert_eq!(update_name, expected_updates[expected_update_index]);
expected_update_index += 1;
if expected_update_index == expected_updates.len() {
break;
}
}
}
}
#[test]
fn test_rpc_subscriptions() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
transactions_socket.connect(test_validator.tpu()).unwrap();
let rpc_client = RpcClient::new(test_validator.rpc_url());
let recent_blockhash = rpc_client.get_latest_blockhash().unwrap();
// Create transaction signatures to subscribe to
let transactions: Vec<Transaction> = (0..1000)
.map(|_| {
system_transaction::transfer(
&alice,
&solana_sdk::pubkey::new_rand(),
1,
recent_blockhash,
)
})
.collect();
let mut signature_set: HashSet<String> = transactions
.iter()
.map(|tx| tx.signatures[0].to_string())
.collect();
let account_set: HashSet<String> = transactions
.iter()
.map(|tx| tx.message.account_keys[1].to_string())
.collect();
// Track when subscriptions are ready
let (ready_sender, ready_receiver) = channel::<()>();
// Track account notifications are received
let (account_sender, account_receiver) = channel::<RpcResponse<UiAccount>>();
// Track when status notifications are received
let (status_sender, status_receiver) = channel::<(String, RpcResponse<RpcSignatureResult>)>();
// Create the pub sub runtime
let rt = Runtime::new().unwrap();
let rpc_pubsub_url = test_validator.rpc_pubsub_url();
let signature_set_clone = signature_set.clone();
rt.spawn(async move {
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
let client = connect.await.unwrap();
// Subscribe to signature notifications
for sig in signature_set_clone {
let status_sender = status_sender.clone();
let mut sig_sub = client
.signature_subscribe(
sig.clone(),
Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig::confirmed()),
..RpcSignatureSubscribeConfig::default()
}),
)
.unwrap_or_else(|err| panic!("sig sub err: {:#?}", err));
tokio::spawn(async move {
let response = sig_sub.next().await.unwrap();
status_sender
.send((sig.clone(), response.unwrap()))
.unwrap();
});
}
// Subscribe to account notifications
for pubkey in account_set {
let account_sender = account_sender.clone();
let mut client_sub = client
.account_subscribe(
pubkey,
Some(RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::confirmed()),
..RpcAccountInfoConfig::default()
}),
)
.unwrap_or_else(|err| panic!("acct sub err: {:#?}", err));
tokio::spawn(async move {
let response = client_sub.next().await.unwrap();
account_sender.send(response.unwrap()).unwrap();
});
}
// Signal ready after the next slot notification
let mut slot_sub = client
.slot_subscribe()
.unwrap_or_else(|err| panic!("sig sub err: {:#?}", err));
tokio::spawn(async move {
let _response = slot_sub.next().await.unwrap();
ready_sender.send(()).unwrap();
});
});
// Wait for signature subscriptions
ready_receiver.recv_timeout(Duration::from_secs(2)).unwrap();
let rpc_client = RpcClient::new(test_validator.rpc_url());
let mut mint_balance = rpc_client
.get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::processed())
.unwrap()
.value;
assert!(mint_balance >= transactions.len() as u64);
// Send all transactions to tpu socket for processing
transactions.iter().for_each(|tx| {
transactions_socket
.send(&bincode::serialize(&tx).unwrap())
.unwrap();
});
// Track mint balance to know when transactions have completed
let now = Instant::now();
let expected_mint_balance = mint_balance - transactions.len() as u64;
while mint_balance != expected_mint_balance && now.elapsed() < Duration::from_secs(5) {
mint_balance = rpc_client
.get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::processed())
.unwrap()
.value;
sleep(Duration::from_millis(100));
}
// Wait for all signature subscriptions
let deadline = Instant::now() + Duration::from_secs(7);
while !signature_set.is_empty() {
let timeout = deadline.saturating_duration_since(Instant::now());
match status_receiver.recv_timeout(timeout) {
Ok((sig, result)) => {
if let RpcSignatureResult::ProcessedSignature(result) = result.value {
assert!(result.err.is_none());
assert!(signature_set.remove(&sig));
} else {
panic!("Unexpected result");
}
}
Err(_err) => {
panic!(
"recv_timeout, {}/{} signatures remaining",
signature_set.len(),
transactions.len()
);
}
}
}
let deadline = Instant::now() + Duration::from_secs(5);
let mut account_notifications = transactions.len();
while account_notifications > 0 {
let timeout = deadline.saturating_duration_since(Instant::now());
match account_receiver.recv_timeout(timeout) {
Ok(result) => {
assert_eq!(result.value.lamports, 1);
account_notifications -= 1;
}
Err(_err) => {
panic!(
"recv_timeout, {}/{} accounts remaining",
account_notifications,
transactions.len()
);
}
}
}
}
#[test]
fn test_tpu_send_transaction() {
let mint_keypair = Keypair::new();
let mint_pubkey = mint_keypair.pubkey();
let test_validator =
TestValidator::with_no_fees(mint_pubkey, None, SocketAddrSpace::Unspecified);
let rpc_client = Arc::new(RpcClient::new_with_commitment(
test_validator.rpc_url(),
CommitmentConfig::processed(),
));
let tpu_client = TpuClient::new(
rpc_client.clone(),
&test_validator.rpc_pubsub_url(),
TpuClientConfig::default(),
)
.unwrap();
let recent_blockhash = rpc_client.get_latest_blockhash().unwrap();
let tx =
system_transaction::transfer(&mint_keypair, &Pubkey::new_unique(), 42, recent_blockhash);
assert!(tpu_client.send_transaction(&tx));
let timeout = Duration::from_secs(5);
let now = Instant::now();
let signatures = vec![tx.signatures[0]];
loop {
assert!(now.elapsed() < timeout);
let statuses = rpc_client.get_signature_statuses(&signatures).unwrap();
if statuses.value.get(0).is_some() {
return;
}
}
}
#[test]
fn deserialize_rpc_error() -> ClientResult<()> {
solana_logger::setup();
let alice = Keypair::new();
let validator = TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let rpc_client = RpcClient::new(validator.rpc_url());
let bob = Keypair::new();
let lamports = 50;
let blockhash = rpc_client.get_latest_blockhash()?;
let mut tx = system_transaction::transfer(&alice, &bob.pubkey(), lamports, blockhash);
// This will cause an error
tx.signatures.clear();
let err = rpc_client.send_transaction(&tx);
let err = err.unwrap_err();
match err.kind {
ClientErrorKind::RpcError(RpcError::RpcRequestError { .. }) => {
// This is what used to happen
panic!()
}
ClientErrorKind::RpcError(RpcError::RpcResponseError { .. }) => Ok(()),
_ => {
panic!()
}
}
}