Add solana logs command

This commit is contained in:
Michael Vines
2020-11-20 13:52:58 -08:00
parent f96c4ec84e
commit 4ef2da0ff0
10 changed files with 785 additions and 105 deletions

View File

@ -1,6 +1,6 @@
use crate::{
rpc_config::RpcSignatureSubscribeConfig,
rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo},
rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter},
rpc_response::{Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo},
};
use log::*;
use serde::de::DeserializeOwned;
@ -23,8 +23,6 @@ use thiserror::Error;
use tungstenite::{client::AutoStream, connect, Message, WebSocket};
use url::{ParseError, Url};
type PubsubSignatureResponse = PubsubClientSubscription<RpcResponse<RpcSignatureResult>>;
#[derive(Debug, Error)]
pub enum PubsubClientError {
#[error("url parse error")]
@ -36,8 +34,8 @@ pub enum PubsubClientError {
#[error("json parse error")]
JsonParseError(#[from] serde_json::error::Error),
#[error("unexpected message format")]
UnexpectedMessageError,
#[error("unexpected message format: {0}")]
UnexpectedMessageError(String),
}
pub struct PubsubClientSubscription<T>
@ -92,8 +90,11 @@ where
return Ok(x);
}
}
Err(PubsubClientError::UnexpectedMessageError)
// TODO: Add proper JSON RPC response/error handling...
Err(PubsubClientError::UnexpectedMessageError(format!(
"{:?}",
json_msg
)))
}
pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> {
@ -117,14 +118,18 @@ where
let message_text = &message.into_text().unwrap();
let json_msg: Map<String, Value> = serde_json::from_str(message_text)?;
if let Some(Object(value_1)) = json_msg.get("params") {
if let Some(value_2) = value_1.get("result") {
let x: T = serde_json::from_value::<T>(value_2.clone()).unwrap();
if let Some(Object(params)) = json_msg.get("params") {
if let Some(result) = params.get("result") {
let x: T = serde_json::from_value::<T>(result.clone()).unwrap();
return Ok(x);
}
}
Err(PubsubClientError::UnexpectedMessageError)
// TODO: Add proper JSON RPC response/error handling...
Err(PubsubClientError::UnexpectedMessageError(format!(
"{:?}",
json_msg
)))
}
pub fn shutdown(&mut self) -> std::thread::Result<()> {
@ -141,15 +146,79 @@ where
}
}
const SLOT_OPERATION: &str = "slot";
const SIGNATURE_OPERATION: &str = "signature";
pub type LogsSubscription = (
PubsubClientSubscription<RpcResponse<RpcLogsResponse>>,
Receiver<RpcResponse<RpcLogsResponse>>,
);
pub type SlotsSubscription = (PubsubClientSubscription<SlotInfo>, Receiver<SlotInfo>);
pub type SignatureSubscription = (
PubsubClientSubscription<RpcResponse<RpcSignatureResult>>,
Receiver<RpcResponse<RpcSignatureResult>>,
);
pub struct PubsubClient {}
impl PubsubClient {
pub fn slot_subscribe(
pub fn logs_subscribe(
url: &str,
) -> Result<(PubsubClientSubscription<SlotInfo>, Receiver<SlotInfo>), PubsubClientError> {
filter: RpcTransactionLogsFilter,
config: RpcTransactionLogsConfig,
) -> Result<LogsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id =
PubsubClientSubscription::<RpcResponse<RpcLogsResponse>>::send_subscribe(
&socket_clone,
json!({
"jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":[filter, config]
})
.to_string(),
)?;
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
match PubsubClientSubscription::read_message(&socket_clone) {
Ok(message) => match sender.send(message) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
},
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
}
info!("websocket - exited receive loop");
});
let result = PubsubClientSubscription {
message_type: PhantomData,
operation: "logs",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<SlotInfo>();
@ -161,41 +230,37 @@ impl PubsubClient {
let subscription_id = PubsubClientSubscription::<SlotInfo>::send_subscribe(
&socket_clone,
json!({
"jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[]
"jsonrpc":"2.0","id":1,"method":"slotSubscribe","params":[]
})
.to_string(),
)
.unwrap();
)?;
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
let message: Result<SlotInfo, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message {
match sender.send(msg) {
match PubsubClientSubscription::read_message(&socket_clone) {
Ok(message) => match sender.send(message) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
},
Err(err) => {
info!("receive error: {:?}", err);
break;
}
} else {
info!("receive error: {:?}", message);
break;
}
}
info!("websocket - exited receive loop");
});
let result: PubsubClientSubscription<SlotInfo> = PubsubClientSubscription {
let result = PubsubClientSubscription {
message_type: PhantomData,
operation: SLOT_OPERATION,
operation: "slot",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
@ -209,16 +274,10 @@ impl PubsubClient {
url: &str,
signature: &Signature,
config: Option<RpcSignatureSubscribeConfig>,
) -> Result<
(
PubsubSignatureResponse,
Receiver<RpcResponse<RpcSignatureResult>>,
),
PubsubClientError,
> {
) -> Result<SignatureSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<RpcResponse<RpcSignatureResult>>();
let (sender, receiver) = channel();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
@ -227,7 +286,7 @@ impl PubsubClient {
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":format!("{}Subscribe", SIGNATURE_OPERATION),
"method":"signatureSubscribe",
"params":[
signature.to_string(),
config
@ -238,8 +297,7 @@ impl PubsubClient {
PubsubClientSubscription::<RpcResponse<RpcSignatureResult>>::send_subscribe(
&socket_clone,
body,
)
.unwrap();
)?;
let t_cleanup = std::thread::spawn(move || {
loop {
@ -267,15 +325,14 @@ impl PubsubClient {
info!("websocket - exited receive loop");
});
let result: PubsubClientSubscription<RpcResponse<RpcSignatureResult>> =
PubsubClientSubscription {
message_type: PhantomData,
operation: SIGNATURE_OPERATION,
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
let result = PubsubClientSubscription {
message_type: PhantomData,
operation: "signature",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}

View File

@ -71,6 +71,20 @@ pub struct RpcProgramAccountsConfig {
pub account_config: RpcAccountInfoConfig,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum RpcTransactionLogsFilter {
All,
AllWithVotes,
Mentions(Vec<String>), // base58-encoded list of addresses
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcTransactionLogsConfig {
pub commitment: Option<CommitmentConfig>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum RpcTokenAccountsFilter {

View File

@ -108,6 +108,14 @@ pub enum RpcSignatureResult {
ReceivedSignature(ReceivedSignatureResult),
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RpcLogsResponse {
pub signature: String, // Signature as base58 string
pub err: Option<TransactionError>,
pub logs: Vec<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ProcessedSignatureResult {