Add missing websocket methods to rust RPC PubSub client (backport #21065) (#21073)

* Add missing websocket methods to rust RPC PubSub client (#21065)

- Added accountSubscribe,  programSubscribe, slotSubscribe and rootSubscribe to rust RpcClient
 - Removed duplication on cleanup threads
 - Moved RPCVote from rpc/ to client/rpc_response

(cherry picked from commit a0f9e0e8ee)

# Conflicts:
#	Cargo.lock
#	client-test/Cargo.toml
#	core/tests/client.rs

* Fix conflicts

* Make test result not depend on TestValidator setup

Co-authored-by: Manuel Gil <manugildev@gmail.com>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
mergify[bot]
2021-10-29 21:46:59 +00:00
committed by GitHub
parent a595e06b48
commit 506d39ea82
5 changed files with 486 additions and 135 deletions

View File

@ -1,10 +1,12 @@
use {
crate::{
rpc_config::{
RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter,
RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig,
RpcTransactionLogsConfig, RpcTransactionLogsFilter,
},
rpc_response::{
Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo, SlotUpdate,
Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult,
SlotInfo, SlotUpdate,
},
},
log::*,
@ -14,12 +16,13 @@ use {
value::Value::{Number, Object},
Map, Value,
},
solana_sdk::signature::Signature,
solana_account_decoder::UiAccount,
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
std::{
marker::PhantomData,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver},
mpsc::{channel, Receiver, Sender},
Arc, RwLock,
},
thread::{sleep, JoinHandle},
@ -153,16 +156,37 @@ where
}
}
pub type PubsubLogsClientSubscription = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
pub type LogsSubscription = (
PubsubClientSubscription<RpcResponse<RpcLogsResponse>>,
PubsubLogsClientSubscription,
Receiver<RpcResponse<RpcLogsResponse>>,
);
pub type SlotsSubscription = (PubsubClientSubscription<SlotInfo>, Receiver<SlotInfo>);
pub type PubsubSlotClientSubscription = PubsubClientSubscription<SlotInfo>;
pub type SlotsSubscription = (PubsubSlotClientSubscription, Receiver<SlotInfo>);
pub type PubsubSignatureClientSubscription =
PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
pub type SignatureSubscription = (
PubsubClientSubscription<RpcResponse<RpcSignatureResult>>,
PubsubSignatureClientSubscription,
Receiver<RpcResponse<RpcSignatureResult>>,
);
pub type PubsubProgramClientSubscription = PubsubClientSubscription<RpcResponse<RpcKeyedAccount>>;
pub type ProgramSubscription = (
PubsubProgramClientSubscription,
Receiver<RpcResponse<RpcKeyedAccount>>,
);
pub type PubsubAccountClientSubscription = PubsubClientSubscription<RpcResponse<UiAccount>>;
pub type AccountSubscription = (
PubsubAccountClientSubscription,
Receiver<RpcResponse<UiAccount>>,
);
pub type PubsubRootClientSubscription = PubsubClientSubscription<Slot>;
pub type RootSubscription = (PubsubRootClientSubscription, Receiver<Slot>);
pub struct PubsubClient {}
fn connect_with_retry(url: Url) -> Result<WebSocket<AutoStream>, tungstenite::Error> {
@ -188,6 +212,47 @@ fn connect_with_retry(url: Url) -> Result<WebSocket<AutoStream>, tungstenite::Er
}
impl PubsubClient {
pub fn account_subscribe(
url: &str,
pubkey: &Pubkey,
config: Option<RpcAccountInfoConfig>,
) -> Result<AccountSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":"accountSubscribe",
"params":[
pubkey.to_string(),
config
]
})
.to_string();
let subscription_id = PubsubAccountClientSubscription::send_subscribe(&socket_clone, body)?;
let t_cleanup = std::thread::spawn(move || {
Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
});
let result = PubsubClientSubscription {
message_type: PhantomData,
operation: "account",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}
pub fn logs_subscribe(
url: &str,
filter: RpcTransactionLogsFilter,
@ -201,38 +266,18 @@ impl PubsubClient {
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id =
PubsubClientSubscription::<RpcResponse<RpcLogsResponse>>::send_subscribe(
&socket_clone,
json!({
"jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":[filter, config]
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":"logsSubscribe",
"params":[filter, config]
})
.to_string(),
)?;
.to_string();
let subscription_id = PubsubLogsClientSubscription::send_subscribe(&socket_clone, body)?;
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
match PubsubClientSubscription::read_message(&socket_clone) {
Ok(message) => match sender.send(message) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
},
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
}
info!("websocket - exited receive loop");
Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
});
let result = PubsubClientSubscription {
@ -247,49 +292,71 @@ impl PubsubClient {
Ok((result, receiver))
}
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
pub fn program_subscribe(
url: &str,
pubkey: &Pubkey,
config: Option<RpcProgramAccountsConfig>,
) -> Result<ProgramSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel::<SlotInfo>();
let (sender, receiver) = channel();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id = PubsubClientSubscription::<SlotInfo>::send_subscribe(
&socket_clone,
json!({
"jsonrpc":"2.0","id":1,"method":"slotSubscribe","params":[]
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":"programSubscribe",
"params":[
pubkey.to_string(),
config
]
})
.to_string(),
)?;
.to_string();
let subscription_id = PubsubProgramClientSubscription::send_subscribe(&socket_clone, body)?;
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
match PubsubClientSubscription::read_message(&socket_clone) {
Ok(message) => match sender.send(message) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
},
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
}
info!("websocket - exited receive loop");
Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
});
let result = PubsubClientSubscription {
message_type: PhantomData,
operation: "slot",
operation: "program",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}
pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":"rootSubscribe",
})
.to_string();
let subscription_id = PubsubRootClientSubscription::send_subscribe(&socket_clone, body)?;
let t_cleanup = std::thread::spawn(move || {
Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
});
let result = PubsubClientSubscription {
message_type: PhantomData,
operation: "root",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
@ -323,35 +390,10 @@ impl PubsubClient {
})
.to_string();
let subscription_id =
PubsubClientSubscription::<RpcResponse<RpcSignatureResult>>::send_subscribe(
&socket_clone,
body,
)?;
PubsubSignatureClientSubscription::send_subscribe(&socket_clone, body)?;
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
let message: Result<RpcResponse<RpcSignatureResult>, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message {
match sender.send(msg.clone()) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
} else {
info!("receive error: {:?}", message);
break;
}
}
info!("websocket - exited receive loop");
Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
});
let result = PubsubClientSubscription {
@ -366,6 +408,40 @@ impl PubsubClient {
Ok((result, receiver))
}
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel::<SlotInfo>();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":"slotSubscribe",
"params":[]
})
.to_string();
let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket_clone, body)?;
let t_cleanup = std::thread::spawn(move || {
Self::cleanup_with_sender(exit_clone, &socket_clone, sender)
});
let result = PubsubClientSubscription {
message_type: PhantomData,
operation: "slot",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}
pub fn slot_updates_subscribe(
url: &str,
handler: impl Fn(SlotUpdate) + Send + 'static,
@ -374,35 +450,21 @@ impl PubsubClient {
let socket = connect_with_retry(url)?;
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id = PubsubClientSubscription::<SlotUpdate>::send_subscribe(
&socket,
json!({
"jsonrpc":"2.0","id":1,"method":"slotsUpdatesSubscribe","params":[]
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":"slotsUpdatesSubscribe",
"params":[]
})
.to_string(),
)?;
.to_string();
let subscription_id = PubsubSlotClientSubscription::send_subscribe(&socket, body)?;
let t_cleanup = {
let socket = socket.clone();
std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
match PubsubClientSubscription::read_message(&socket) {
Ok(message) => handler(message),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
}
info!("websocket - exited receive loop");
})
};
let t_cleanup = std::thread::spawn(move || {
Self::cleanup_with_handler(exit_clone, &socket_clone, handler)
});
Ok(PubsubClientSubscription {
message_type: PhantomData,
@ -413,6 +475,47 @@ impl PubsubClient {
exit,
})
}
fn cleanup_with_sender<T>(
exit: Arc<AtomicBool>,
socket: &Arc<RwLock<WebSocket<AutoStream>>>,
sender: Sender<T>,
) where
T: DeserializeOwned + Send + 'static,
{
let handler = move |message| match sender.send(message) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
}
};
Self::cleanup_with_handler(exit, socket, handler);
}
fn cleanup_with_handler<T, F>(
exit: Arc<AtomicBool>,
socket: &Arc<RwLock<WebSocket<AutoStream>>>,
handler: F,
) where
T: DeserializeOwned,
F: Fn(T) + Send + 'static,
{
loop {
if exit.load(Ordering::Relaxed) {
break;
}
match PubsubClientSubscription::read_message(socket) {
Ok(message) => handler(message),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
}
info!("websocket - exited receive loop");
}
}
#[cfg(test)]

View File

@ -279,6 +279,14 @@ pub struct RpcIdentity {
pub identity: String,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RpcVote {
pub slots: Vec<Slot>,
pub hash: String,
pub timestamp: Option<UnixTimestamp>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RpcVoteAccountStatus {

View File

@ -1,4 +1,11 @@
use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
use serde_json::{json, Value};
use serial_test::serial;
use solana_client::{
pubsub_client::PubsubClient,
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::SlotInfo,
};
use solana_core::test_validator::TestValidator;
use solana_rpc::{
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
@ -8,18 +15,21 @@ use solana_rpc::{
use solana_runtime::{
bank::Bank,
bank_forks::BankForks,
commitment::BlockCommitmentCache,
commitment::{BlockCommitmentCache, CommitmentSlots},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
native_token::sol_to_lamports,
pubkey::Pubkey,
rpc_port,
signature::{Keypair, Signer},
system_transaction,
system_program, system_transaction,
};
use solana_streamer::socket::SocketAddrSpace;
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
@ -87,6 +97,246 @@ fn test_rpc_client() {
}
#[test]
#[serial]
fn test_account_subscription() {
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo {
genesis_config,
mint_keypair: alice,
..
} = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bob = Keypair::new();
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
));
let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
let config = Some(RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::finalized()),
encoding: None,
data_slice: None,
});
let (mut client, receiver) = PubsubClient::account_subscribe(
&format!("ws://0.0.0.0:{}/", pubsub_addr.port()),
&bob.pubkey(),
config,
)
.unwrap();
// Transfer 100 lamports from alice to bob
let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash);
bank_forks
.write()
.unwrap()
.get(1)
.unwrap()
.process_transaction(&tx)
.unwrap();
let commitment_slots = CommitmentSlots {
slot: 1,
..CommitmentSlots::default()
};
subscriptions.notify_subscribers(commitment_slots);
let commitment_slots = CommitmentSlots {
slot: 2,
root: 1,
highest_confirmed_slot: 1,
highest_confirmed_root: 1,
};
subscriptions.notify_subscribers(commitment_slots);
let expected = json!({
"context": { "slot": 1 },
"value": {
"owner": system_program::id().to_string(),
"lamports": 100,
"data": "",
"executable": false,
"rentEpoch": 0,
},
});
// Read notification
let mut errors: Vec<(Value, Value)> = Vec::new();
let response = receiver.recv();
match response {
Ok(response) => {
let actual = serde_json::to_value(response).unwrap();
if expected != actual {
errors.push((expected, actual));
}
}
Err(_) => eprintln!("unexpected websocket receive timeout"),
}
exit.store(true, Ordering::Relaxed);
trigger.cancel();
client.shutdown().unwrap();
pubsub_service.close().unwrap();
assert_eq!(errors, [].to_vec());
}
#[test]
#[serial]
fn test_program_subscription() {
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo {
genesis_config,
mint_keypair: alice,
..
} = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bob = Keypair::new();
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
));
let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
let config = Some(RpcProgramAccountsConfig {
..RpcProgramAccountsConfig::default()
});
let program_id = Pubkey::new_unique();
let (mut client, receiver) = PubsubClient::program_subscribe(
&format!("ws://0.0.0.0:{}/", pubsub_addr.port()),
&program_id,
config,
)
.unwrap();
// Create new program account at bob's address
let tx = system_transaction::create_account(&alice, &bob, blockhash, 100, 0, &program_id);
bank_forks
.write()
.unwrap()
.get(1)
.unwrap()
.process_transaction(&tx)
.unwrap();
let commitment_slots = CommitmentSlots {
slot: 1,
..CommitmentSlots::default()
};
subscriptions.notify_subscribers(commitment_slots);
let commitment_slots = CommitmentSlots {
slot: 2,
root: 1,
highest_confirmed_slot: 1,
highest_confirmed_root: 1,
};
subscriptions.notify_subscribers(commitment_slots);
// Poll notifications generated by the transfer
let mut notifications = Vec::new();
let mut pubkeys = HashSet::new();
loop {
let response = receiver.recv_timeout(Duration::from_millis(100));
match response {
Ok(response) => {
notifications.push(response.clone());
pubkeys.insert(response.value.pubkey);
}
Err(_) => {
break;
}
}
}
// Shutdown
exit.store(true, Ordering::Relaxed);
trigger.cancel();
client.shutdown().unwrap();
pubsub_service.close().unwrap();
assert_eq!(notifications.len(), 1);
assert!(pubkeys.contains(&bob.pubkey().to_string()));
}
#[test]
#[serial]
fn test_root_subscription() {
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
));
let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
let (mut client, receiver) =
PubsubClient::root_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
let roots = vec![1, 2, 3];
subscriptions.notify_roots(roots.clone());
// Read notifications
let mut errors: Vec<(Slot, Slot)> = Vec::new();
for expected in roots {
let response = receiver.recv();
match response {
Ok(response) => {
if expected != response {
errors.push((expected, response));
}
}
Err(_) => eprintln!("unexpected websocket receive timeout"),
}
}
exit.store(true, Ordering::Relaxed);
trigger.cancel();
client.shutdown().unwrap();
pubsub_service.close().unwrap();
assert_eq!(errors, [].to_vec());
}
#[test]
#[serial]
fn test_slot_subscription() {
let pubsub_addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),

View File

@ -8,7 +8,6 @@ use {
ProgramSubscriptionParams, SignatureSubscriptionParams, SubscriptionControl,
SubscriptionId, SubscriptionParams, SubscriptionToken,
},
rpc_subscriptions::RpcVote,
},
dashmap::DashMap,
jsonrpc_core::{Error, ErrorCode, Result},
@ -21,7 +20,7 @@ use {
RpcTransactionLogsConfig, RpcTransactionLogsFilter,
},
rpc_response::{
Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult,
Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, RpcVote,
SlotInfo, SlotUpdate,
},
},

View File

@ -18,7 +18,7 @@ use {
rpc_filter::RpcFilterType,
rpc_response::{
ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount,
RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo, SlotUpdate,
RpcLogsResponse, RpcResponseContext, RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
},
},
solana_measure::measure::Measure,
@ -29,7 +29,7 @@ use {
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::{Slot, UnixTimestamp},
clock::Slot,
pubkey::Pubkey,
signature::Signature,
timing::timestamp,
@ -69,15 +69,6 @@ fn get_transaction_logs(
}
logs
}
// A more human-friendly version of Vote, with the bank state signature base58 encoded.
#[derive(Serialize, Deserialize, Debug)]
pub struct RpcVote {
pub slots: Vec<Slot>,
pub hash: String,
pub timestamp: Option<UnixTimestamp>,
}
pub enum NotificationEntry {
Slot(SlotInfo),
SlotUpdate(SlotUpdate),