* reimplement rpc pubsub with a broadcast queue * update tests for new pubsub implementation * fix: fix review suggestions * chore(rpc): add additional pubsub metrics * integrate max subscriptions check into SubscriptionTracker to reduce locking * separate subscription control from tracker * limit memory usage of items in pubsub broadcast queue, improve error handling * add more pubsub metrics * add final count metrics to pubsub * add metric for total number of subscriptions * fix small review suggestions * remove by_params from SubscriptionTracker and add node_progress_watchers map instead * add subscription tracker tests * add metrics for number of pubsub notifications as a counter * ignore clippy lint in TokenCounter * fix underflow in token counter * reduce queue capacity in pubsub tests * fix(rpc): fix test timeouts * fix race in account subscription test * Add RpcSubscriptions::new_for_tests Co-authored-by: Pavel Strakhov <p.strakhov@iconic.vc> Co-authored-by: Nikita Podoliako <n.podoliako@zubr.io> Co-authored-by: Tyera Eulberg <tyera@solana.com>
		
			
				
	
	
		
			147 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			147 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
 | |
| use solana_core::test_validator::TestValidator;
 | |
| use solana_rpc::{
 | |
|     optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
 | |
|     rpc_pubsub_service::{PubSubConfig, PubSubService},
 | |
|     rpc_subscriptions::RpcSubscriptions,
 | |
| };
 | |
| use solana_runtime::{
 | |
|     bank::Bank,
 | |
|     bank_forks::BankForks,
 | |
|     commitment::BlockCommitmentCache,
 | |
|     genesis_utils::{create_genesis_config, GenesisConfigInfo},
 | |
| };
 | |
| use solana_sdk::{
 | |
|     commitment_config::CommitmentConfig,
 | |
|     native_token::sol_to_lamports,
 | |
|     rpc_port,
 | |
|     signature::{Keypair, Signer},
 | |
|     system_transaction,
 | |
| };
 | |
| use solana_streamer::socket::SocketAddrSpace;
 | |
| use std::{
 | |
|     net::{IpAddr, SocketAddr},
 | |
|     sync::{
 | |
|         atomic::{AtomicBool, Ordering},
 | |
|         Arc, RwLock,
 | |
|     },
 | |
|     thread::sleep,
 | |
|     time::{Duration, Instant},
 | |
| };
 | |
| use systemstat::Ipv4Addr;
 | |
| 
 | |
| #[test]
 | |
| fn test_rpc_client() {
 | |
|     solana_logger::setup();
 | |
| 
 | |
|     let alice = Keypair::new();
 | |
|     let test_validator =
 | |
|         TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
 | |
| 
 | |
|     let bob_pubkey = solana_sdk::pubkey::new_rand();
 | |
| 
 | |
|     let client = RpcClient::new(test_validator.rpc_url());
 | |
| 
 | |
|     assert_eq!(
 | |
|         client.get_version().unwrap().solana_core,
 | |
|         solana_version::semver!()
 | |
|     );
 | |
| 
 | |
|     assert!(client.get_account(&bob_pubkey).is_err());
 | |
| 
 | |
|     assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 0);
 | |
| 
 | |
|     let original_alice_balance = client.get_balance(&alice.pubkey()).unwrap();
 | |
| 
 | |
|     let blockhash = client.get_latest_blockhash().unwrap();
 | |
| 
 | |
|     let tx = system_transaction::transfer(&alice, &bob_pubkey, sol_to_lamports(20.0), blockhash);
 | |
|     let signature = client.send_transaction(&tx).unwrap();
 | |
| 
 | |
|     let mut confirmed_tx = false;
 | |
| 
 | |
|     let now = Instant::now();
 | |
|     while now.elapsed().as_secs() <= 20 {
 | |
|         let response = client
 | |
|             .confirm_transaction_with_commitment(&signature, CommitmentConfig::default())
 | |
|             .unwrap();
 | |
| 
 | |
|         if response.value {
 | |
|             confirmed_tx = true;
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|         sleep(Duration::from_millis(500));
 | |
|     }
 | |
| 
 | |
|     assert!(confirmed_tx);
 | |
| 
 | |
|     assert_eq!(
 | |
|         client.get_balance(&bob_pubkey).unwrap(),
 | |
|         sol_to_lamports(20.0)
 | |
|     );
 | |
|     assert_eq!(
 | |
|         client.get_balance(&alice.pubkey()).unwrap(),
 | |
|         original_alice_balance - sol_to_lamports(20.0)
 | |
|     );
 | |
| }
 | |
| 
 | |
| #[test]
 | |
| fn test_slot_subscription() {
 | |
|     let pubsub_addr = SocketAddr::new(
 | |
|         IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
 | |
|         rpc_port::DEFAULT_RPC_PUBSUB_PORT,
 | |
|     );
 | |
|     let exit = Arc::new(AtomicBool::new(false));
 | |
|     let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
 | |
|     let bank = Bank::new_for_tests(&genesis_config);
 | |
|     let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
 | |
|     let optimistically_confirmed_bank =
 | |
|         OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
 | |
|     let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
 | |
|         &exit,
 | |
|         bank_forks,
 | |
|         Arc::new(RwLock::new(BlockCommitmentCache::default())),
 | |
|         optimistically_confirmed_bank,
 | |
|     ));
 | |
|     let (trigger, pubsub_service) =
 | |
|         PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
 | |
|     std::thread::sleep(Duration::from_millis(400));
 | |
| 
 | |
|     let (mut client, receiver) =
 | |
|         PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
 | |
| 
 | |
|     let mut errors: Vec<(SlotInfo, SlotInfo)> = Vec::new();
 | |
| 
 | |
|     for i in 0..3 {
 | |
|         subscriptions.notify_slot(i + 1, i, i);
 | |
| 
 | |
|         let maybe_actual = receiver.recv_timeout(Duration::from_millis(400));
 | |
| 
 | |
|         match maybe_actual {
 | |
|             Ok(actual) => {
 | |
|                 let expected = SlotInfo {
 | |
|                     slot: i + 1,
 | |
|                     parent: i,
 | |
|                     root: i,
 | |
|                 };
 | |
| 
 | |
|                 if actual != expected {
 | |
|                     errors.push((actual, expected));
 | |
|                 }
 | |
|             }
 | |
|             Err(_err) => {
 | |
|                 eprintln!("unexpected websocket receive timeout");
 | |
|                 break;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     exit.store(true, Ordering::Relaxed);
 | |
|     trigger.cancel();
 | |
|     client.shutdown().unwrap();
 | |
|     pubsub_service.close().unwrap();
 | |
| 
 | |
|     assert_eq!(errors, [].to_vec());
 | |
| }
 |