diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 725ee901f0..2d6a16bd77 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -1,10 +1,12 @@ use { crate::{ rpc_config::{ - RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, + RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, + RpcTransactionLogsConfig, RpcTransactionLogsFilter, }, rpc_response::{ - Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo, SlotUpdate, + Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, + SlotInfo, SlotUpdate, }, }, log::*, @@ -14,12 +16,13 @@ use { value::Value::{Number, Object}, Map, Value, }, - solana_sdk::signature::Signature, + solana_account_decoder::UiAccount, + solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}, std::{ marker::PhantomData, sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver}, + mpsc::{channel, Receiver, Sender}, Arc, RwLock, }, thread::{sleep, JoinHandle}, @@ -153,16 +156,37 @@ where } } +pub type PubsubLogsClientSubscription = PubsubClientSubscription>; pub type LogsSubscription = ( - PubsubClientSubscription>, + PubsubLogsClientSubscription, Receiver>, ); -pub type SlotsSubscription = (PubsubClientSubscription, Receiver); + +pub type PubsubSlotClientSubscription = PubsubClientSubscription; +pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver); + +pub type PubsubSignatureClientSubscription = + PubsubClientSubscription>; pub type SignatureSubscription = ( - PubsubClientSubscription>, + PubsubSignatureClientSubscription, Receiver>, ); +pub type PubsubProgramClientSubscription = PubsubClientSubscription>; +pub type ProgramSubscription = ( + PubsubProgramClientSubscription, + Receiver>, +); + +pub type PubsubAccountClientSubscription = PubsubClientSubscription>; +pub type AccountSubscription = ( + PubsubAccountClientSubscription, + Receiver>, +); + +pub type PubsubRootClientSubscription = PubsubClientSubscription; +pub type RootSubscription = (PubsubRootClientSubscription, Receiver); + pub struct PubsubClient {} fn connect_with_retry(url: Url) -> Result, tungstenite::Error> { @@ -188,6 +212,47 @@ fn connect_with_retry(url: Url) -> Result, tungstenite::Er } impl PubsubClient { + pub fn account_subscribe( + url: &str, + pubkey: &Pubkey, + config: Option, + ) -> Result { + let url = Url::parse(url)?; + let socket = connect_with_retry(url)?; + let (sender, receiver) = channel(); + + let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":"accountSubscribe", + "params":[ + pubkey.to_string(), + config + ] + }) + .to_string(); + let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?; + + let t_cleanup = std::thread::spawn(move || { + Self::cleanup_with_sender(exit_clone, &socket_clone, sender) + }); + + let result = PubsubClientSubscription { + message_type: PhantomData, + operation: "account", + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; + + Ok((result, receiver)) + } + pub fn logs_subscribe( url: &str, filter: RpcTransactionLogsFilter, @@ -201,38 +266,18 @@ impl PubsubClient { let socket_clone = socket.clone(); let exit = Arc::new(AtomicBool::new(false)); let exit_clone = exit.clone(); + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":"logsSubscribe", + "params":[filter, config] + }) + .to_string(); - let subscription_id = - PubsubClientSubscription::>::send_subscribe( - &socket_clone, - json!({ - "jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":[filter, config] - }) - .to_string(), - )?; + let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?; let t_cleanup = std::thread::spawn(move || { - loop { - if exit_clone.load(Ordering::Relaxed) { - break; - } - - match PubsubClientSubscription::read_message(&socket_clone) { - Ok(message) => match sender.send(message) { - Ok(_) => (), - Err(err) => { - info!("receive error: {:?}", err); - break; - } - }, - Err(err) => { - info!("receive error: {:?}", err); - break; - } - } - } - - info!("websocket - exited receive loop"); + Self::cleanup_with_sender(exit_clone, &socket_clone, sender) }); let result = PubsubClientSubscription { @@ -247,49 +292,71 @@ impl PubsubClient { Ok((result, receiver)) } - pub fn slot_subscribe(url: &str) -> Result { + pub fn program_subscribe( + url: &str, + pubkey: &Pubkey, + config: Option, + ) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; - let (sender, receiver) = channel::(); + let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); let exit = Arc::new(AtomicBool::new(false)); let exit_clone = exit.clone(); - let subscription_id = PubsubClientSubscription::::send_subscribe( - &socket_clone, - json!({ - "jsonrpc":"2.0","id":1,"method":"slotSubscribe","params":[] - }) - .to_string(), - )?; + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":"programSubscribe", + "params":[ + pubkey.to_string(), + config + ] + }) + .to_string(); + let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?; let t_cleanup = std::thread::spawn(move || { - loop { - if exit_clone.load(Ordering::Relaxed) { - break; - } - match PubsubClientSubscription::read_message(&socket_clone) { - Ok(message) => match sender.send(message) { - Ok(_) => (), - Err(err) => { - info!("receive error: {:?}", err); - break; - } - }, - Err(err) => { - info!("receive error: {:?}", err); - break; - } - } - } - - info!("websocket - exited receive loop"); + Self::cleanup_with_sender(exit_clone, &socket_clone, sender) }); let result = PubsubClientSubscription { message_type: PhantomData, - operation: "slot", + operation: "program", + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; + + Ok((result, receiver)) + } + + pub fn root_subscribe(url: &str) -> Result { + let url = Url::parse(url)?; + let socket = connect_with_retry(url)?; + let (sender, receiver) = channel(); + + let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":"rootSubscribe", + }) + .to_string(); + let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?; + + let t_cleanup = std::thread::spawn(move || { + Self::cleanup_with_sender(exit_clone, &socket_clone, sender) + }); + + let result = PubsubClientSubscription { + message_type: PhantomData, + operation: "root", socket, subscription_id, t_cleanup: Some(t_cleanup), @@ -323,35 +390,10 @@ impl PubsubClient { }) .to_string(); let subscription_id = - PubsubClientSubscription::>::send_subscribe( - &socket_clone, - body, - )?; + PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?; let t_cleanup = std::thread::spawn(move || { - loop { - if exit_clone.load(Ordering::Relaxed) { - break; - } - - let message: Result, PubsubClientError> = - PubsubClientSubscription::read_message(&socket_clone); - - if let Ok(msg) = message { - match sender.send(msg.clone()) { - Ok(_) => (), - Err(err) => { - info!("receive error: {:?}", err); - break; - } - } - } else { - info!("receive error: {:?}", message); - break; - } - } - - info!("websocket - exited receive loop"); + Self::cleanup_with_sender(exit_clone, &socket_clone, sender) }); let result = PubsubClientSubscription { @@ -366,6 +408,40 @@ impl PubsubClient { Ok((result, receiver)) } + pub fn slot_subscribe(url: &str) -> Result { + let url = Url::parse(url)?; + let socket = connect_with_retry(url)?; + let (sender, receiver) = channel::(); + + let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":"slotSubscribe", + "params":[] + }) + .to_string(); + let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?; + + let t_cleanup = std::thread::spawn(move || { + Self::cleanup_with_sender(exit_clone, &socket_clone, sender) + }); + + let result = PubsubClientSubscription { + message_type: PhantomData, + operation: "slot", + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; + + Ok((result, receiver)) + } + pub fn slot_updates_subscribe( url: &str, handler: impl Fn(SlotUpdate) + Send + 'static, @@ -374,35 +450,21 @@ impl PubsubClient { let socket = connect_with_retry(url)?; let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); let exit = Arc::new(AtomicBool::new(false)); let exit_clone = exit.clone(); - let subscription_id = PubsubClientSubscription::::send_subscribe( - &socket, - json!({ - "jsonrpc":"2.0","id":1,"method":"slotsUpdatesSubscribe","params":[] - }) - .to_string(), - )?; + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":"slotsUpdatesSubscribe", + "params":[] + }) + .to_string(); + let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?; - let t_cleanup = { - let socket = socket.clone(); - std::thread::spawn(move || { - loop { - if exit_clone.load(Ordering::Relaxed) { - break; - } - match PubsubClientSubscription::read_message(&socket) { - Ok(message) => handler(message), - Err(err) => { - info!("receive error: {:?}", err); - break; - } - } - } - - info!("websocket - exited receive loop"); - }) - }; + let t_cleanup = std::thread::spawn(move || { + Self::cleanup_with_handler(exit_clone, &socket_clone, handler) + }); Ok(PubsubClientSubscription { message_type: PhantomData, @@ -413,6 +475,47 @@ impl PubsubClient { exit, }) } + + fn cleanup_with_sender( + exit: Arc, + socket: &Arc>>, + sender: Sender, + ) where + T: DeserializeOwned + Send + 'static, + { + let handler = move |message| match sender.send(message) { + Ok(_) => (), + Err(err) => { + info!("receive error: {:?}", err); + } + }; + Self::cleanup_with_handler(exit, socket, handler); + } + + fn cleanup_with_handler( + exit: Arc, + socket: &Arc>>, + handler: F, + ) where + T: DeserializeOwned, + F: Fn(T) + Send + 'static, + { + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + match PubsubClientSubscription::read_message(socket) { + Ok(message) => handler(message), + Err(err) => { + info!("receive error: {:?}", err); + break; + } + } + } + + info!("websocket - exited receive loop"); + } } #[cfg(test)] diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index ddbc41ef20..55ea2f1db9 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -279,6 +279,14 @@ pub struct RpcIdentity { pub identity: String, } +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct RpcVote { + pub slots: Vec, + pub hash: String, + pub timestamp: Option, +} + #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct RpcVoteAccountStatus { diff --git a/core/tests/client.rs b/core/tests/client.rs index 99a8c87c83..325124650f 100644 --- a/core/tests/client.rs +++ b/core/tests/client.rs @@ -1,4 +1,11 @@ -use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo}; +use serde_json::{json, Value}; +use serial_test::serial; +use solana_client::{ + pubsub_client::PubsubClient, + rpc_client::RpcClient, + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_response::SlotInfo, +}; use solana_core::test_validator::TestValidator; use solana_rpc::{ optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, @@ -8,18 +15,21 @@ use solana_rpc::{ use solana_runtime::{ bank::Bank, bank_forks::BankForks, - commitment::BlockCommitmentCache, + commitment::{BlockCommitmentCache, CommitmentSlots}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, }; use solana_sdk::{ + clock::Slot, commitment_config::CommitmentConfig, native_token::sol_to_lamports, + pubkey::Pubkey, rpc_port, signature::{Keypair, Signer}, - system_transaction, + system_program, system_transaction, }; use solana_streamer::socket::SocketAddrSpace; use std::{ + collections::HashSet, net::{IpAddr, SocketAddr}, sync::{ atomic::{AtomicBool, Ordering}, @@ -87,6 +97,246 @@ fn test_rpc_client() { } #[test] +#[serial] +fn test_account_subscription() { + let pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rpc_port::DEFAULT_RPC_PUBSUB_PORT, + ); + let exit = Arc::new(AtomicBool::new(false)); + + let GenesisConfigInfo { + genesis_config, + mint_keypair: alice, + .. + } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let bob = Keypair::new(); + + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::default())), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + )); + let (trigger, pubsub_service) = + PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); + std::thread::sleep(Duration::from_millis(400)); + let config = Some(RpcAccountInfoConfig { + commitment: Some(CommitmentConfig::finalized()), + encoding: None, + data_slice: None, + }); + let (mut client, receiver) = PubsubClient::account_subscribe( + &format!("ws://0.0.0.0:{}/", pubsub_addr.port()), + &bob.pubkey(), + config, + ) + .unwrap(); + + // Transfer 100 lamports from alice to bob + let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash); + bank_forks + .write() + .unwrap() + .get(1) + .unwrap() + .process_transaction(&tx) + .unwrap(); + let commitment_slots = CommitmentSlots { + slot: 1, + ..CommitmentSlots::default() + }; + subscriptions.notify_subscribers(commitment_slots); + let commitment_slots = CommitmentSlots { + slot: 2, + root: 1, + highest_confirmed_slot: 1, + highest_confirmed_root: 1, + }; + subscriptions.notify_subscribers(commitment_slots); + + let expected = json!({ + "context": { "slot": 1 }, + "value": { + "owner": system_program::id().to_string(), + "lamports": 100, + "data": "", + "executable": false, + "rentEpoch": 0, + }, + }); + + // Read notification + let mut errors: Vec<(Value, Value)> = Vec::new(); + let response = receiver.recv(); + match response { + Ok(response) => { + let actual = serde_json::to_value(response).unwrap(); + if expected != actual { + errors.push((expected, actual)); + } + } + Err(_) => eprintln!("unexpected websocket receive timeout"), + } + + exit.store(true, Ordering::Relaxed); + trigger.cancel(); + client.shutdown().unwrap(); + pubsub_service.close().unwrap(); + assert_eq!(errors, [].to_vec()); +} + +#[test] +#[serial] +fn test_program_subscription() { + let pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rpc_port::DEFAULT_RPC_PUBSUB_PORT, + ); + let exit = Arc::new(AtomicBool::new(false)); + + let GenesisConfigInfo { + genesis_config, + mint_keypair: alice, + .. + } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let bob = Keypair::new(); + + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::default())), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + )); + let (trigger, pubsub_service) = + PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); + std::thread::sleep(Duration::from_millis(400)); + let config = Some(RpcProgramAccountsConfig { + ..RpcProgramAccountsConfig::default() + }); + + let program_id = Pubkey::new_unique(); + let (mut client, receiver) = PubsubClient::program_subscribe( + &format!("ws://0.0.0.0:{}/", pubsub_addr.port()), + &program_id, + config, + ) + .unwrap(); + + // Create new program account at bob's address + let tx = system_transaction::create_account(&alice, &bob, blockhash, 100, 0, &program_id); + bank_forks + .write() + .unwrap() + .get(1) + .unwrap() + .process_transaction(&tx) + .unwrap(); + let commitment_slots = CommitmentSlots { + slot: 1, + ..CommitmentSlots::default() + }; + subscriptions.notify_subscribers(commitment_slots); + let commitment_slots = CommitmentSlots { + slot: 2, + root: 1, + highest_confirmed_slot: 1, + highest_confirmed_root: 1, + }; + subscriptions.notify_subscribers(commitment_slots); + + // Poll notifications generated by the transfer + let mut notifications = Vec::new(); + let mut pubkeys = HashSet::new(); + loop { + let response = receiver.recv_timeout(Duration::from_millis(100)); + match response { + Ok(response) => { + notifications.push(response.clone()); + pubkeys.insert(response.value.pubkey); + } + Err(_) => { + break; + } + } + } + + // Shutdown + exit.store(true, Ordering::Relaxed); + trigger.cancel(); + client.shutdown().unwrap(); + pubsub_service.close().unwrap(); + + assert_eq!(notifications.len(), 1); + assert!(pubkeys.contains(&bob.pubkey().to_string())); +} + +#[test] +#[serial] +fn test_root_subscription() { + let pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + rpc_port::DEFAULT_RPC_PUBSUB_PORT, + ); + let exit = Arc::new(AtomicBool::new(false)); + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::default())), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + )); + let (trigger, pubsub_service) = + PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); + std::thread::sleep(Duration::from_millis(400)); + let (mut client, receiver) = + PubsubClient::root_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); + + let roots = vec![1, 2, 3]; + subscriptions.notify_roots(roots.clone()); + + // Read notifications + let mut errors: Vec<(Slot, Slot)> = Vec::new(); + for expected in roots { + let response = receiver.recv(); + match response { + Ok(response) => { + if expected != response { + errors.push((expected, response)); + } + } + Err(_) => eprintln!("unexpected websocket receive timeout"), + } + } + + exit.store(true, Ordering::Relaxed); + trigger.cancel(); + client.shutdown().unwrap(); + pubsub_service.close().unwrap(); + assert_eq!(errors, [].to_vec()); +} + +#[test] +#[serial] fn test_slot_subscription() { let pubsub_addr = SocketAddr::new( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), diff --git a/rpc/src/rpc_pubsub.rs b/rpc/src/rpc_pubsub.rs index bdb48f8e04..c8002e1ee7 100644 --- a/rpc/src/rpc_pubsub.rs +++ b/rpc/src/rpc_pubsub.rs @@ -8,7 +8,6 @@ use { ProgramSubscriptionParams, SignatureSubscriptionParams, SubscriptionControl, SubscriptionId, SubscriptionParams, SubscriptionToken, }, - rpc_subscriptions::RpcVote, }, dashmap::DashMap, jsonrpc_core::{Error, ErrorCode, Result}, @@ -21,7 +20,7 @@ use { RpcTransactionLogsConfig, RpcTransactionLogsFilter, }, rpc_response::{ - Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, + Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, }, }, diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index cb9e391c34..3191e5c951 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -18,7 +18,7 @@ use { rpc_filter::RpcFilterType, rpc_response::{ ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount, - RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo, SlotUpdate, + RpcLogsResponse, RpcResponseContext, RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate, }, }, solana_measure::measure::Measure, @@ -29,7 +29,7 @@ use { }, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, - clock::{Slot, UnixTimestamp}, + clock::Slot, pubkey::Pubkey, signature::Signature, timing::timestamp, @@ -69,15 +69,6 @@ fn get_transaction_logs( } logs } - -// A more human-friendly version of Vote, with the bank state signature base58 encoded. -#[derive(Serialize, Deserialize, Debug)] -pub struct RpcVote { - pub slots: Vec, - pub hash: String, - pub timestamp: Option, -} - pub enum NotificationEntry { Slot(SlotInfo), SlotUpdate(SlotUpdate),