Revert signature-notification format change (bp #12032) (#12038)

* Revert signature-notification format change (#12032)

* Use untagged RpcSignatureResult enum to avoid breaking downstream consumers of current signature subscriptions

* Clean up client duplication

* Clippy

(cherry picked from commit 39246f9dd7)

# Conflicts:
#	core/src/rpc_pubsub.rs

* Fix conflicts

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
mergify[bot]
2020-09-04 01:34:49 +00:00
committed by GitHub
parent 962aed0961
commit aa3bdd3730
8 changed files with 56 additions and 96 deletions

View File

@ -10,9 +10,10 @@ use solana_clap_utils::{
commitment::commitment_arg, input_parsers::*, input_validators::*, keypair::signer_from_path, commitment::commitment_arg, input_parsers::*, input_validators::*, keypair::signer_from_path,
}; };
use solana_client::{ use solana_client::{
pubsub_client::{PubsubClient, SlotInfoMessage}, pubsub_client::PubsubClient,
rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient}, rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient},
rpc_config::{RpcLargestAccountsConfig, RpcLargestAccountsFilter}, rpc_config::{RpcLargestAccountsConfig, RpcLargestAccountsFilter},
rpc_response::SlotInfo,
}; };
use solana_remote_wallet::remote_wallet::RemoteWalletManager; use solana_remote_wallet::remote_wallet::RemoteWalletManager;
use solana_sdk::{ use solana_sdk::{
@ -1033,7 +1034,7 @@ pub fn process_live_slots(url: &str) -> ProcessResult {
})?; })?;
*/ */
let mut current: Option<SlotInfoMessage> = None; let mut current: Option<SlotInfo> = None;
let mut message = "".to_string(); let mut message = "".to_string();
let slot_progress = new_spinner_progress_bar(); let slot_progress = new_spinner_progress_bar();

View File

@ -1,13 +1,12 @@
use crate::rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo};
use log::*; use log::*;
use serde::{de::DeserializeOwned, Deserialize}; use serde::de::DeserializeOwned;
use serde_json::{ use serde_json::{
json, json,
value::Value::{Number, Object}, value::Value::{Number, Object},
Map, Value, Map, Value,
}; };
use solana_sdk::{ use solana_sdk::signature::Signature;
commitment_config::CommitmentConfig, signature::Signature, transaction::TransactionError,
};
use std::{ use std::{
marker::PhantomData, marker::PhantomData,
sync::{ sync::{
@ -21,7 +20,7 @@ use thiserror::Error;
use tungstenite::{client::AutoStream, connect, Message, WebSocket}; use tungstenite::{client::AutoStream, connect, Message, WebSocket};
use url::{ParseError, Url}; use url::{ParseError, Url};
type PubsubSignatureResponse = PubsubClientSubscription<RpcResponse<SignatureResult>>; type PubsubSignatureResponse = PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum PubsubClientError { pub enum PubsubClientError {
@ -38,45 +37,6 @@ pub enum PubsubClientError {
UnexpectedMessageError, UnexpectedMessageError,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase", tag = "type", content = "result")]
pub enum SignatureResult {
ProcessedSignatureResult(ProcessedSignatureResult),
ReceivedSignature,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct RpcResponseContext {
pub slot: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ProcessedSignatureResult {
pub err: Option<TransactionError>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RpcResponse<T> {
pub context: RpcResponseContext,
pub value: T,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct SlotInfoMessage {
pub parent: u64,
pub root: u64,
pub slot: u64,
}
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcSignatureSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub enable_received_notification: Option<bool>,
}
pub struct PubsubClientSubscription<T> pub struct PubsubClientSubscription<T>
where where
T: DeserializeOwned, T: DeserializeOwned,
@ -186,22 +146,16 @@ pub struct PubsubClient {}
impl PubsubClient { impl PubsubClient {
pub fn slot_subscribe( pub fn slot_subscribe(
url: &str, url: &str,
) -> Result< ) -> Result<(PubsubClientSubscription<SlotInfo>, Receiver<SlotInfo>), PubsubClientError> {
(
PubsubClientSubscription<SlotInfoMessage>,
Receiver<SlotInfoMessage>,
),
PubsubClientError,
> {
let url = Url::parse(url)?; let url = Url::parse(url)?;
let (socket, _response) = connect(url)?; let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<SlotInfoMessage>(); let (sender, receiver) = channel::<SlotInfo>();
let socket = Arc::new(RwLock::new(socket)); let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone(); let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone(); let exit_clone = exit.clone();
let subscription_id = PubsubClientSubscription::<SlotInfoMessage>::send_subscribe( let subscription_id = PubsubClientSubscription::<SlotInfo>::send_subscribe(
&socket_clone, &socket_clone,
json!({ json!({
"jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[] "jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[]
@ -216,11 +170,11 @@ impl PubsubClient {
break; break;
} }
let message: Result<SlotInfoMessage, PubsubClientError> = let message: Result<SlotInfo, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone); PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message { if let Ok(msg) = message {
match sender.send(msg.clone()) { match sender.send(msg) {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
info!("receive error: {:?}", err); info!("receive error: {:?}", err);
@ -236,7 +190,7 @@ impl PubsubClient {
info!("websocket - exited receive loop"); info!("websocket - exited receive loop");
}); });
let result: PubsubClientSubscription<SlotInfoMessage> = PubsubClientSubscription { let result: PubsubClientSubscription<SlotInfo> = PubsubClientSubscription {
message_type: PhantomData, message_type: PhantomData,
operation: SLOT_OPERATION, operation: SLOT_OPERATION,
socket, socket,
@ -254,13 +208,13 @@ impl PubsubClient {
) -> Result< ) -> Result<
( (
PubsubSignatureResponse, PubsubSignatureResponse,
Receiver<RpcResponse<SignatureResult>>, Receiver<RpcResponse<RpcSignatureResult>>,
), ),
PubsubClientError, PubsubClientError,
> { > {
let url = Url::parse(url)?; let url = Url::parse(url)?;
let (socket, _response) = connect(url)?; let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<RpcResponse<SignatureResult>>(); let (sender, receiver) = channel::<RpcResponse<RpcSignatureResult>>();
let socket = Arc::new(RwLock::new(socket)); let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone(); let socket_clone = socket.clone();
@ -277,7 +231,7 @@ impl PubsubClient {
}) })
.to_string(); .to_string();
let subscription_id = let subscription_id =
PubsubClientSubscription::<RpcResponse<SignatureResult>>::send_subscribe( PubsubClientSubscription::<RpcResponse<RpcSignatureResult>>::send_subscribe(
&socket_clone, &socket_clone,
body, body,
) )
@ -289,7 +243,7 @@ impl PubsubClient {
break; break;
} }
let message: Result<RpcResponse<SignatureResult>, PubsubClientError> = let message: Result<RpcResponse<RpcSignatureResult>, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone); PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message { if let Ok(msg) = message {
@ -309,7 +263,7 @@ impl PubsubClient {
info!("websocket - exited receive loop"); info!("websocket - exited receive loop");
}); });
let result: PubsubClientSubscription<RpcResponse<SignatureResult>> = let result: PubsubClientSubscription<RpcResponse<RpcSignatureResult>> =
PubsubClientSubscription { PubsubClientSubscription {
message_type: PhantomData, message_type: PhantomData,
operation: SIGNATURE_OPERATION, operation: SIGNATURE_OPERATION,

View File

@ -94,11 +94,18 @@ pub struct RpcKeyedAccount {
pub account: UiAccount, pub account: UiAccount,
} }
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)]
pub struct SlotInfo {
pub slot: Slot,
pub parent: Slot,
pub root: Slot,
}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase", tag = "type", content = "result")] #[serde(rename_all = "camelCase", untagged)]
pub enum RpcSignatureResult { pub enum RpcSignatureResult {
ProcessedSignatureResult(ProcessedSignatureResult), ProcessedSignature(ProcessedSignatureResult),
ReceivedSignature, ReceivedSignature(ReceivedSignatureResult),
} }
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
@ -107,6 +114,12 @@ pub struct ProcessedSignatureResult {
pub err: Option<TransactionError>, pub err: Option<TransactionError>,
} }
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub enum ReceivedSignatureResult {
ReceivedSignature,
}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcContactInfo { pub struct RpcContactInfo {
/// Pubkey of the node as a base-58 string /// Pubkey of the node as a base-58 string

View File

@ -1,13 +1,13 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request //! The `pubsub` module implements a threaded subscription service on client RPC request
use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote, SlotInfo}; use crate::rpc_subscriptions::{RpcSubscriptions, RpcVote};
use jsonrpc_core::{Error, ErrorCode, Result}; use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc; use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
use solana_account_decoder::UiAccount; use solana_account_decoder::UiAccount;
use solana_client::{ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult}, rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult, SlotInfo},
}; };
#[cfg(test)] #[cfg(test)]
use solana_runtime::bank_forks::BankForks; use solana_runtime::bank_forks::BankForks;
@ -362,7 +362,7 @@ mod tests {
use serial_test_derive::serial; use serial_test_derive::serial;
use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding}; use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding};
use solana_budget_program::{self, budget_instruction}; use solana_budget_program::{self, budget_instruction};
use solana_client::rpc_response::ProcessedSignatureResult; use solana_client::rpc_response::{ProcessedSignatureResult, ReceivedSignatureResult};
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank::Bank,
bank_forks::BankForks, bank_forks::BankForks,
@ -444,7 +444,7 @@ mod tests {
// Test signature confirmation notification // Test signature confirmation notification
let (response, _) = robust_poll_or_panic(receiver); let (response, _) = robust_poll_or_panic(receiver);
let expected_res = let expected_res =
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None }); RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "signatureNotification", "method": "signatureNotification",
@ -476,7 +476,8 @@ mod tests {
.notify_signatures_received((received_slot, vec![tx.signatures[0]])); .notify_signatures_received((received_slot, vec![tx.signatures[0]]));
// Test signature confirmation notification // Test signature confirmation notification
let (response, _) = robust_poll_or_panic(receiver); let (response, _) = robust_poll_or_panic(receiver);
let expected_res = RpcSignatureResult::ReceivedSignature; let expected_res =
RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "signatureNotification", "method": "signatureNotification",

View File

@ -13,7 +13,8 @@ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
rpc_filter::RpcFilterType, rpc_filter::RpcFilterType,
rpc_response::{ rpc_response::{
ProcessedSignatureResult, Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult, ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount,
RpcResponseContext, RpcSignatureResult, SlotInfo,
}, },
}; };
use solana_runtime::{ use solana_runtime::{
@ -47,13 +48,6 @@ use tokio_01::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor};
const RECEIVE_DELAY_MILLIS: u64 = 100; const RECEIVE_DELAY_MILLIS: u64 = 100;
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct SlotInfo {
pub slot: Slot,
pub parent: Slot,
pub root: Slot,
}
// A more human-friendly version of Vote, with the bank state signature base58 encoded. // A more human-friendly version of Vote, with the bank state signature base58 encoded.
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct RpcVote { pub struct RpcVote {
@ -291,9 +285,7 @@ fn filter_signature_result(
) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) { ) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) {
( (
Box::new(result.into_iter().map(|result| { Box::new(result.into_iter().map(|result| {
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() })
err: result.err(),
})
})), })),
last_notified_slot, last_notified_slot,
) )
@ -986,7 +978,9 @@ impl RpcSubscriptions {
context: RpcResponseContext { context: RpcResponseContext {
slot: *received_slot, slot: *received_slot,
}, },
value: RpcSignatureResult::ReceivedSignature, value: RpcSignatureResult::ReceivedSignature(
ReceivedSignatureResult::ReceivedSignature,
),
}, },
&sink, &sink,
); );
@ -1371,8 +1365,9 @@ pub(crate) mod tests {
.notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]])); .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
subscriptions.notify_subscribers(commitment_slots); subscriptions.notify_subscribers(commitment_slots);
let expected_res = let expected_res =
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None }); RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
let received_expected_res = RpcSignatureResult::ReceivedSignature; let received_expected_res =
RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);
struct Notification { struct Notification {
slot: Slot, slot: Slot,
id: u64, id: u64,

View File

@ -1,7 +1,4 @@
use solana_client::{ use solana_client::{pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::SlotInfo};
pubsub_client::{PubsubClient, SlotInfoMessage},
rpc_client::RpcClient,
};
use solana_core::{ use solana_core::{
rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions, rpc_pubsub_service::PubSubService, rpc_subscriptions::RpcSubscriptions,
validator::TestValidator, validator::TestValidator,
@ -105,7 +102,7 @@ fn test_slot_subscription() {
let (mut client, receiver) = let (mut client, receiver) =
PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
let mut errors: Vec<(SlotInfoMessage, SlotInfoMessage)> = Vec::new(); let mut errors: Vec<(SlotInfo, SlotInfo)> = Vec::new();
for i in 0..3 { for i in 0..3 {
subscriptions.notify_slot(i + 1, i, i); subscriptions.notify_slot(i + 1, i, i);
@ -114,7 +111,7 @@ fn test_slot_subscription() {
match maybe_actual { match maybe_actual {
Ok(actual) => { Ok(actual) => {
let expected = SlotInfoMessage { let expected = SlotInfo {
slot: i + 1, slot: i + 1,
parent: i, parent: i,
root: i, root: i,

View File

@ -285,7 +285,7 @@ fn test_rpc_subscriptions() {
let timeout = deadline.saturating_duration_since(Instant::now()); let timeout = deadline.saturating_duration_since(Instant::now());
match status_receiver.recv_timeout(timeout) { match status_receiver.recv_timeout(timeout) {
Ok((sig, result)) => { Ok((sig, result)) => {
if let RpcSignatureResult::ProcessedSignatureResult(result) = result.value { if let RpcSignatureResult::ProcessedSignature(result) = result.value {
assert!(result.err.is_none()); assert!(result.err.is_none());
assert!(signature_set.remove(&sig)); assert!(signature_set.remove(&sig));
} else { } else {

View File

@ -3,8 +3,7 @@ use gag::BufferRedirect;
use log::*; use log::*;
use serial_test_derive::serial; use serial_test_derive::serial;
use solana_client::{ use solana_client::{
pubsub_client::{PubsubClient, SignatureResult}, pubsub_client::PubsubClient, rpc_client::RpcClient, rpc_response::RpcSignatureResult,
rpc_client::RpcClient,
thin_client::create_client, thin_client::create_client,
}; };
use solana_core::{ use solana_core::{
@ -179,11 +178,11 @@ fn test_local_cluster_signature_subscribe() {
let mut should_break = false; let mut should_break = false;
for response in responses { for response in responses {
match response.value { match response.value {
SignatureResult::ProcessedSignatureResult(_) => { RpcSignatureResult::ProcessedSignature(_) => {
should_break = true; should_break = true;
break; break;
} }
SignatureResult::ReceivedSignature => { RpcSignatureResult::ReceivedSignature(_) => {
got_received_notification = true; got_received_notification = true;
} }
} }