Use protobufs to store confirmed blocks in BigTable (#12526)
* Use protobufs to store confirmed blocks in BigTable * Cleanup * Reorganize proto * Clean up use statements * Split out function for unit testing * s/utils/convert Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
parent
865d01c38d
commit
ce598c5c98
@ -15,5 +15,21 @@ fn main() -> Result<(), std::io::Error> {
|
||||
.compile(
|
||||
&[googleapis.join("google/bigtable/v2/bigtable.proto")],
|
||||
&[googleapis],
|
||||
)?;
|
||||
|
||||
let out_dir = manifest_dir.join("../proto");
|
||||
let proto_files = manifest_dir.join("../src");
|
||||
|
||||
println!("Protobuf directory: {}", proto_files.display());
|
||||
println!("output directory: {}", out_dir.display());
|
||||
|
||||
tonic_build::configure()
|
||||
.build_client(true)
|
||||
.build_server(false)
|
||||
.format(true)
|
||||
.out_dir(&out_dir)
|
||||
.compile(
|
||||
&[proto_files.join("confirmed_block.proto")],
|
||||
&[proto_files],
|
||||
)
|
||||
}
|
||||
|
95
storage-bigtable/proto/solana.bigtable.confirmed_block.rs
Normal file
95
storage-bigtable/proto/solana.bigtable.confirmed_block.rs
Normal file
@ -0,0 +1,95 @@
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ConfirmedBlock {
|
||||
#[prost(string, tag = "1")]
|
||||
pub previous_blockhash: std::string::String,
|
||||
#[prost(string, tag = "2")]
|
||||
pub blockhash: std::string::String,
|
||||
#[prost(uint64, tag = "3")]
|
||||
pub parent_slot: u64,
|
||||
#[prost(message, repeated, tag = "4")]
|
||||
pub transactions: ::std::vec::Vec<ConfirmedTransaction>,
|
||||
#[prost(message, repeated, tag = "5")]
|
||||
pub rewards: ::std::vec::Vec<Reward>,
|
||||
#[prost(message, optional, tag = "6")]
|
||||
pub block_time: ::std::option::Option<UnixTimestamp>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ConfirmedTransaction {
|
||||
#[prost(message, optional, tag = "1")]
|
||||
pub transaction: ::std::option::Option<Transaction>,
|
||||
#[prost(message, optional, tag = "2")]
|
||||
pub meta: ::std::option::Option<TransactionStatusMeta>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct Transaction {
|
||||
#[prost(bytes, repeated, tag = "1")]
|
||||
pub signatures: ::std::vec::Vec<std::vec::Vec<u8>>,
|
||||
#[prost(message, optional, tag = "2")]
|
||||
pub message: ::std::option::Option<Message>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct Message {
|
||||
#[prost(message, optional, tag = "1")]
|
||||
pub header: ::std::option::Option<MessageHeader>,
|
||||
#[prost(bytes, repeated, tag = "2")]
|
||||
pub account_keys: ::std::vec::Vec<std::vec::Vec<u8>>,
|
||||
#[prost(bytes, tag = "3")]
|
||||
pub recent_blockhash: std::vec::Vec<u8>,
|
||||
#[prost(message, repeated, tag = "4")]
|
||||
pub instructions: ::std::vec::Vec<CompiledInstruction>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct MessageHeader {
|
||||
#[prost(uint32, tag = "1")]
|
||||
pub num_required_signatures: u32,
|
||||
#[prost(uint32, tag = "2")]
|
||||
pub num_readonly_signed_accounts: u32,
|
||||
#[prost(uint32, tag = "3")]
|
||||
pub num_readonly_unsigned_accounts: u32,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct TransactionStatusMeta {
|
||||
#[prost(message, optional, tag = "1")]
|
||||
pub err: ::std::option::Option<TransactionError>,
|
||||
#[prost(uint64, tag = "2")]
|
||||
pub fee: u64,
|
||||
#[prost(uint64, repeated, tag = "3")]
|
||||
pub pre_balances: ::std::vec::Vec<u64>,
|
||||
#[prost(uint64, repeated, tag = "4")]
|
||||
pub post_balances: ::std::vec::Vec<u64>,
|
||||
#[prost(message, repeated, tag = "5")]
|
||||
pub inner_instructions: ::std::vec::Vec<InnerInstructions>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct TransactionError {
|
||||
#[prost(bytes, tag = "1")]
|
||||
pub err: std::vec::Vec<u8>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct InnerInstructions {
|
||||
#[prost(uint32, tag = "1")]
|
||||
pub index: u32,
|
||||
#[prost(message, repeated, tag = "2")]
|
||||
pub instructions: ::std::vec::Vec<CompiledInstruction>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct CompiledInstruction {
|
||||
#[prost(uint32, tag = "1")]
|
||||
pub program_id_index: u32,
|
||||
#[prost(bytes, tag = "2")]
|
||||
pub accounts: std::vec::Vec<u8>,
|
||||
#[prost(bytes, tag = "3")]
|
||||
pub data: std::vec::Vec<u8>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct Reward {
|
||||
#[prost(string, tag = "1")]
|
||||
pub pubkey: std::string::String,
|
||||
#[prost(int64, tag = "2")]
|
||||
pub lamports: i64,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct UnixTimestamp {
|
||||
#[prost(int64, tag = "1")]
|
||||
pub timestamp: i64,
|
||||
}
|
@ -1,8 +1,10 @@
|
||||
// Primitives for reading/writing BigTable tables
|
||||
|
||||
use crate::access_token::{AccessToken, Scope};
|
||||
use crate::compression::{compress_best, decompress};
|
||||
use crate::root_ca_certificate;
|
||||
use crate::{
|
||||
access_token::{AccessToken, Scope},
|
||||
compression::{compress_best, decompress},
|
||||
root_ca_certificate,
|
||||
};
|
||||
use log::*;
|
||||
use thiserror::Error;
|
||||
use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request};
|
||||
@ -26,10 +28,14 @@ mod google {
|
||||
use google::bigtable::v2::*;
|
||||
|
||||
pub type RowKey = String;
|
||||
pub type CellName = String;
|
||||
pub type CellValue = Vec<u8>;
|
||||
pub type RowData = Vec<(CellName, CellValue)>;
|
||||
pub type RowDataSlice<'a> = &'a [(CellName, CellValue)];
|
||||
pub type CellName = String;
|
||||
pub type CellValue = Vec<u8>;
|
||||
pub enum CellData<B, P> {
|
||||
Bincode(B),
|
||||
Protobuf(P),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
@ -196,6 +202,23 @@ impl BigTableConnection {
|
||||
.retry(ExponentialBackoff::default())
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn put_protobuf_cells_with_retry<T>(
|
||||
&self,
|
||||
table: &str,
|
||||
cells: &[(RowKey, T)],
|
||||
) -> Result<usize>
|
||||
where
|
||||
T: prost::Message,
|
||||
{
|
||||
use backoff::{future::FutureOperation as _, ExponentialBackoff};
|
||||
(|| async {
|
||||
let mut client = self.client();
|
||||
Ok(client.put_protobuf_cells(table, cells).await?)
|
||||
})
|
||||
.retry(ExponentialBackoff::default())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BigTable {
|
||||
@ -484,7 +507,20 @@ impl BigTable {
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
let row_data = self.get_single_row_data(table, key.clone()).await?;
|
||||
deserialize_cell_data(&row_data, table, key.to_string())
|
||||
deserialize_bincode_cell_data(&row_data, table, key.to_string())
|
||||
}
|
||||
|
||||
pub async fn get_protobuf_or_bincode_cell<B, P>(
|
||||
&mut self,
|
||||
table: &str,
|
||||
key: RowKey,
|
||||
) -> Result<CellData<B, P>>
|
||||
where
|
||||
B: serde::de::DeserializeOwned,
|
||||
P: prost::Message + Default,
|
||||
{
|
||||
let row_data = self.get_single_row_data(table, key.clone()).await?;
|
||||
deserialize_protobuf_or_bincode_cell_data(&row_data, table, key)
|
||||
}
|
||||
|
||||
pub async fn put_bincode_cells<T>(
|
||||
@ -506,9 +542,74 @@ impl BigTable {
|
||||
self.put_row_data(table, "x", &new_row_data).await?;
|
||||
Ok(bytes_written)
|
||||
}
|
||||
|
||||
pub async fn put_protobuf_cells<T>(
|
||||
&mut self,
|
||||
table: &str,
|
||||
cells: &[(RowKey, T)],
|
||||
) -> Result<usize>
|
||||
where
|
||||
T: prost::Message,
|
||||
{
|
||||
let mut bytes_written = 0;
|
||||
let mut new_row_data = vec![];
|
||||
for (row_key, data) in cells {
|
||||
let mut buf = Vec::with_capacity(data.encoded_len());
|
||||
data.encode(&mut buf).unwrap();
|
||||
let data = compress_best(&buf)?;
|
||||
bytes_written += data.len();
|
||||
new_row_data.push((row_key, vec![("proto".to_string(), data)]));
|
||||
}
|
||||
|
||||
self.put_row_data(table, "x", &new_row_data).await?;
|
||||
Ok(bytes_written)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn deserialize_cell_data<T>(
|
||||
pub(crate) fn deserialize_protobuf_or_bincode_cell_data<B, P>(
|
||||
row_data: RowDataSlice,
|
||||
table: &str,
|
||||
key: RowKey,
|
||||
) -> Result<CellData<B, P>>
|
||||
where
|
||||
B: serde::de::DeserializeOwned,
|
||||
P: prost::Message + Default,
|
||||
{
|
||||
match deserialize_protobuf_cell_data(row_data, table, key.to_string()) {
|
||||
Ok(result) => return Ok(CellData::Protobuf(result)),
|
||||
Err(err) => match err {
|
||||
Error::ObjectNotFound(_) => {}
|
||||
_ => return Err(err),
|
||||
},
|
||||
}
|
||||
match deserialize_bincode_cell_data(row_data, table, key) {
|
||||
Ok(result) => Ok(CellData::Bincode(result)),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn deserialize_protobuf_cell_data<T>(
|
||||
row_data: RowDataSlice,
|
||||
table: &str,
|
||||
key: RowKey,
|
||||
) -> Result<T>
|
||||
where
|
||||
T: prost::Message + Default,
|
||||
{
|
||||
let value = &row_data
|
||||
.iter()
|
||||
.find(|(name, _)| name == "proto")
|
||||
.ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))?
|
||||
.1;
|
||||
|
||||
let data = decompress(&value)?;
|
||||
T::decode(&data[..]).map_err(|err| {
|
||||
warn!("Failed to deserialize {}/{}: {}", table, key, err);
|
||||
Error::ObjectCorrupt(format!("{}/{}", table, key))
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn deserialize_bincode_cell_data<T>(
|
||||
row_data: RowDataSlice,
|
||||
table: &str,
|
||||
key: RowKey,
|
||||
@ -528,3 +629,111 @@ where
|
||||
Error::ObjectCorrupt(format!("{}/{}", table, key))
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{convert::generated, StoredConfirmedBlock};
|
||||
use prost::Message;
|
||||
use solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair, system_transaction};
|
||||
use solana_transaction_status::{
|
||||
ConfirmedBlock, TransactionStatusMeta, TransactionWithStatusMeta,
|
||||
};
|
||||
use std::convert::TryInto;
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_protobuf_or_bincode_cell_data() {
|
||||
let from = Keypair::new();
|
||||
let recipient = Pubkey::new_rand();
|
||||
let transaction = system_transaction::transfer(&from, &recipient, 42, Hash::default());
|
||||
let with_meta = TransactionWithStatusMeta {
|
||||
transaction,
|
||||
meta: Some(TransactionStatusMeta {
|
||||
status: Ok(()),
|
||||
fee: 1,
|
||||
pre_balances: vec![43, 0, 1],
|
||||
post_balances: vec![0, 42, 1],
|
||||
inner_instructions: Some(vec![]),
|
||||
}),
|
||||
};
|
||||
let block = ConfirmedBlock {
|
||||
transactions: vec![with_meta],
|
||||
parent_slot: 1,
|
||||
blockhash: Hash::default().to_string(),
|
||||
previous_blockhash: Hash::default().to_string(),
|
||||
rewards: vec![],
|
||||
block_time: Some(1_234_567_890),
|
||||
};
|
||||
let bincode_block = compress_best(
|
||||
&bincode::serialize::<StoredConfirmedBlock>(&block.clone().into()).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let protobuf_block = generated::ConfirmedBlock::from(block.clone());
|
||||
let mut buf = Vec::with_capacity(protobuf_block.encoded_len());
|
||||
protobuf_block.encode(&mut buf).unwrap();
|
||||
let protobuf_block = compress_best(&buf).unwrap();
|
||||
|
||||
let deserialized = deserialize_protobuf_or_bincode_cell_data::<
|
||||
StoredConfirmedBlock,
|
||||
generated::ConfirmedBlock,
|
||||
>(
|
||||
&[("proto".to_string(), protobuf_block.clone())],
|
||||
"",
|
||||
"".to_string(),
|
||||
)
|
||||
.unwrap();
|
||||
if let CellData::Protobuf(protobuf_block) = deserialized {
|
||||
assert_eq!(block, protobuf_block.try_into().unwrap());
|
||||
} else {
|
||||
panic!("deserialization should produce CellData::Protobuf");
|
||||
}
|
||||
|
||||
let deserialized = deserialize_protobuf_or_bincode_cell_data::<
|
||||
StoredConfirmedBlock,
|
||||
generated::ConfirmedBlock,
|
||||
>(
|
||||
&[("bin".to_string(), bincode_block.clone())],
|
||||
"",
|
||||
"".to_string(),
|
||||
)
|
||||
.unwrap();
|
||||
if let CellData::Bincode(bincode_block) = deserialized {
|
||||
let mut block = block;
|
||||
if let Some(meta) = &mut block.transactions[0].meta {
|
||||
meta.inner_instructions = None; // Legacy bincode implementation does not suport inner_instructions
|
||||
}
|
||||
assert_eq!(block, bincode_block.into());
|
||||
} else {
|
||||
panic!("deserialization should produce CellData::Bincode");
|
||||
}
|
||||
|
||||
let result = deserialize_protobuf_or_bincode_cell_data::<
|
||||
StoredConfirmedBlock,
|
||||
generated::ConfirmedBlock,
|
||||
>(&[("proto".to_string(), bincode_block)], "", "".to_string());
|
||||
assert!(result.is_err());
|
||||
|
||||
let result = deserialize_protobuf_or_bincode_cell_data::<
|
||||
StoredConfirmedBlock,
|
||||
generated::ConfirmedBlock,
|
||||
>(
|
||||
&[("proto".to_string(), vec![1, 2, 3, 4])],
|
||||
"",
|
||||
"".to_string(),
|
||||
);
|
||||
assert!(result.is_err());
|
||||
|
||||
let result = deserialize_protobuf_or_bincode_cell_data::<
|
||||
StoredConfirmedBlock,
|
||||
generated::ConfirmedBlock,
|
||||
>(&[("bin".to_string(), protobuf_block)], "", "".to_string());
|
||||
assert!(result.is_err());
|
||||
|
||||
let result = deserialize_protobuf_or_bincode_cell_data::<
|
||||
StoredConfirmedBlock,
|
||||
generated::ConfirmedBlock,
|
||||
>(&[("bin".to_string(), vec![1, 2, 3, 4])], "", "".to_string());
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
67
storage-bigtable/src/confirmed_block.proto
Normal file
67
storage-bigtable/src/confirmed_block.proto
Normal file
@ -0,0 +1,67 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package solana.bigtable.ConfirmedBlock;
|
||||
|
||||
message ConfirmedBlock {
|
||||
string previous_blockhash = 1;
|
||||
string blockhash = 2;
|
||||
uint64 parent_slot = 3;
|
||||
repeated ConfirmedTransaction transactions = 4;
|
||||
repeated Reward rewards = 5;
|
||||
UnixTimestamp block_time = 6;
|
||||
}
|
||||
|
||||
message ConfirmedTransaction {
|
||||
Transaction transaction = 1;
|
||||
TransactionStatusMeta meta = 2;
|
||||
}
|
||||
|
||||
message Transaction {
|
||||
repeated bytes signatures = 1;
|
||||
Message message = 2;
|
||||
}
|
||||
|
||||
message Message {
|
||||
MessageHeader header = 1;
|
||||
repeated bytes account_keys = 2;
|
||||
bytes recent_blockhash = 3;
|
||||
repeated CompiledInstruction instructions = 4;
|
||||
}
|
||||
|
||||
message MessageHeader {
|
||||
uint32 num_required_signatures = 1;
|
||||
uint32 num_readonly_signed_accounts = 2;
|
||||
uint32 num_readonly_unsigned_accounts = 3;
|
||||
}
|
||||
|
||||
message TransactionStatusMeta {
|
||||
TransactionError err = 1;
|
||||
uint64 fee = 2;
|
||||
repeated uint64 pre_balances = 3;
|
||||
repeated uint64 post_balances = 4;
|
||||
repeated InnerInstructions inner_instructions = 5;
|
||||
}
|
||||
|
||||
message TransactionError {
|
||||
bytes err = 1;
|
||||
}
|
||||
|
||||
message InnerInstructions {
|
||||
uint32 index = 1;
|
||||
repeated CompiledInstruction instructions = 2;
|
||||
}
|
||||
|
||||
message CompiledInstruction {
|
||||
uint32 program_id_index = 1;
|
||||
bytes accounts = 2;
|
||||
bytes data = 3;
|
||||
}
|
||||
|
||||
message Reward {
|
||||
string pubkey = 1;
|
||||
int64 lamports = 2;
|
||||
}
|
||||
|
||||
message UnixTimestamp {
|
||||
int64 timestamp = 1;
|
||||
}
|
291
storage-bigtable/src/convert.rs
Normal file
291
storage-bigtable/src/convert.rs
Normal file
@ -0,0 +1,291 @@
|
||||
use solana_sdk::{
|
||||
hash::Hash,
|
||||
instruction::CompiledInstruction,
|
||||
message::{Message, MessageHeader},
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
transaction::Transaction,
|
||||
};
|
||||
use solana_transaction_status::{
|
||||
ConfirmedBlock, InnerInstructions, Reward, TransactionStatusMeta, TransactionWithStatusMeta,
|
||||
};
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
|
||||
pub mod generated {
|
||||
include!(concat!(
|
||||
env!("CARGO_MANIFEST_DIR"),
|
||||
concat!("/proto/solana.bigtable.confirmed_block.rs")
|
||||
));
|
||||
}
|
||||
|
||||
impl From<Reward> for generated::Reward {
|
||||
fn from(reward: Reward) -> Self {
|
||||
Self {
|
||||
pubkey: reward.pubkey,
|
||||
lamports: reward.lamports,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<generated::Reward> for Reward {
|
||||
fn from(reward: generated::Reward) -> Self {
|
||||
Self {
|
||||
pubkey: reward.pubkey,
|
||||
lamports: reward.lamports,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ConfirmedBlock> for generated::ConfirmedBlock {
|
||||
fn from(confirmed_block: ConfirmedBlock) -> Self {
|
||||
let ConfirmedBlock {
|
||||
previous_blockhash,
|
||||
blockhash,
|
||||
parent_slot,
|
||||
transactions,
|
||||
rewards,
|
||||
block_time,
|
||||
} = confirmed_block;
|
||||
|
||||
Self {
|
||||
previous_blockhash,
|
||||
blockhash,
|
||||
parent_slot,
|
||||
transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
|
||||
rewards: rewards.into_iter().map(|r| r.into()).collect(),
|
||||
block_time: block_time.map(|timestamp| generated::UnixTimestamp { timestamp }),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<generated::ConfirmedBlock> for ConfirmedBlock {
|
||||
type Error = bincode::Error;
|
||||
fn try_from(
|
||||
confirmed_block: generated::ConfirmedBlock,
|
||||
) -> std::result::Result<Self, Self::Error> {
|
||||
let generated::ConfirmedBlock {
|
||||
previous_blockhash,
|
||||
blockhash,
|
||||
parent_slot,
|
||||
transactions,
|
||||
rewards,
|
||||
block_time,
|
||||
} = confirmed_block;
|
||||
|
||||
Ok(Self {
|
||||
previous_blockhash,
|
||||
blockhash,
|
||||
parent_slot,
|
||||
transactions: transactions
|
||||
.into_iter()
|
||||
.map(|tx| tx.try_into())
|
||||
.collect::<std::result::Result<Vec<TransactionWithStatusMeta>, Self::Error>>()?,
|
||||
rewards: rewards.into_iter().map(|r| r.into()).collect(),
|
||||
block_time: block_time.map(|generated::UnixTimestamp { timestamp }| timestamp),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TransactionWithStatusMeta> for generated::ConfirmedTransaction {
|
||||
fn from(value: TransactionWithStatusMeta) -> Self {
|
||||
let meta = if let Some(meta) = value.meta {
|
||||
Some(meta.into())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Self {
|
||||
transaction: Some(value.transaction.into()),
|
||||
meta,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<generated::ConfirmedTransaction> for TransactionWithStatusMeta {
|
||||
type Error = bincode::Error;
|
||||
fn try_from(value: generated::ConfirmedTransaction) -> std::result::Result<Self, Self::Error> {
|
||||
let meta = if let Some(meta) = value.meta {
|
||||
Some(meta.try_into()?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(Self {
|
||||
transaction: value.transaction.expect("transaction is required").into(),
|
||||
meta,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Transaction> for generated::Transaction {
|
||||
fn from(value: Transaction) -> Self {
|
||||
Self {
|
||||
signatures: value
|
||||
.signatures
|
||||
.into_iter()
|
||||
.map(|signature| <Signature as AsRef<[u8]>>::as_ref(&signature).into())
|
||||
.collect(),
|
||||
message: Some(value.message.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<generated::Transaction> for Transaction {
|
||||
fn from(value: generated::Transaction) -> Self {
|
||||
Self {
|
||||
signatures: value
|
||||
.signatures
|
||||
.into_iter()
|
||||
.map(|x| Signature::new(&x))
|
||||
.collect(),
|
||||
message: value.message.expect("message is required").into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Message> for generated::Message {
|
||||
fn from(value: Message) -> Self {
|
||||
Self {
|
||||
header: Some(value.header.into()),
|
||||
account_keys: value
|
||||
.account_keys
|
||||
.into_iter()
|
||||
.map(|key| <Pubkey as AsRef<[u8]>>::as_ref(&key).into())
|
||||
.collect(),
|
||||
recent_blockhash: value.recent_blockhash.to_bytes().into(),
|
||||
instructions: value.instructions.into_iter().map(|ix| ix.into()).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<generated::Message> for Message {
|
||||
fn from(value: generated::Message) -> Self {
|
||||
Self {
|
||||
header: value.header.expect("header is required").into(),
|
||||
account_keys: value
|
||||
.account_keys
|
||||
.into_iter()
|
||||
.map(|key| Pubkey::new(&key))
|
||||
.collect(),
|
||||
recent_blockhash: Hash::new(&value.recent_blockhash),
|
||||
instructions: value.instructions.into_iter().map(|ix| ix.into()).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MessageHeader> for generated::MessageHeader {
|
||||
fn from(value: MessageHeader) -> Self {
|
||||
Self {
|
||||
num_required_signatures: value.num_required_signatures as u32,
|
||||
num_readonly_signed_accounts: value.num_readonly_signed_accounts as u32,
|
||||
num_readonly_unsigned_accounts: value.num_readonly_unsigned_accounts as u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<generated::MessageHeader> for MessageHeader {
|
||||
fn from(value: generated::MessageHeader) -> Self {
|
||||
Self {
|
||||
num_required_signatures: value.num_required_signatures as u8,
|
||||
num_readonly_signed_accounts: value.num_readonly_signed_accounts as u8,
|
||||
num_readonly_unsigned_accounts: value.num_readonly_unsigned_accounts as u8,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TransactionStatusMeta> for generated::TransactionStatusMeta {
|
||||
fn from(value: TransactionStatusMeta) -> Self {
|
||||
let TransactionStatusMeta {
|
||||
status,
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
} = value;
|
||||
let err = match status {
|
||||
Ok(()) => None,
|
||||
Err(err) => Some(generated::TransactionError {
|
||||
err: bincode::serialize(&err).expect("transaction error to serialize to bytes"),
|
||||
}),
|
||||
};
|
||||
let inner_instructions = inner_instructions
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|ii| ii.into())
|
||||
.collect();
|
||||
Self {
|
||||
err,
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<generated::TransactionStatusMeta> for TransactionStatusMeta {
|
||||
type Error = bincode::Error;
|
||||
|
||||
fn try_from(value: generated::TransactionStatusMeta) -> std::result::Result<Self, Self::Error> {
|
||||
let generated::TransactionStatusMeta {
|
||||
err,
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
} = value;
|
||||
let status = match &err {
|
||||
None => Ok(()),
|
||||
Some(tx_error) => Err(bincode::deserialize(&tx_error.err)?),
|
||||
};
|
||||
let inner_instructions = Some(
|
||||
inner_instructions
|
||||
.into_iter()
|
||||
.map(|inner| inner.into())
|
||||
.collect(),
|
||||
);
|
||||
Ok(Self {
|
||||
status,
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<InnerInstructions> for generated::InnerInstructions {
|
||||
fn from(value: InnerInstructions) -> Self {
|
||||
Self {
|
||||
index: value.index as u32,
|
||||
instructions: value.instructions.into_iter().map(|i| i.into()).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<generated::InnerInstructions> for InnerInstructions {
|
||||
fn from(value: generated::InnerInstructions) -> Self {
|
||||
Self {
|
||||
index: value.index as u8,
|
||||
instructions: value.instructions.into_iter().map(|i| i.into()).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompiledInstruction> for generated::CompiledInstruction {
|
||||
fn from(value: CompiledInstruction) -> Self {
|
||||
Self {
|
||||
program_id_index: value.program_id_index as u32,
|
||||
accounts: value.accounts,
|
||||
data: value.data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<generated::CompiledInstruction> for CompiledInstruction {
|
||||
fn from(value: generated::CompiledInstruction) -> Self {
|
||||
Self {
|
||||
program_id_index: value.program_id_index as u8,
|
||||
accounts: value.accounts,
|
||||
data: value.data,
|
||||
}
|
||||
}
|
||||
}
|
@ -8,11 +8,10 @@ use solana_sdk::{
|
||||
transaction::{Transaction, TransactionError},
|
||||
};
|
||||
use solana_transaction_status::{
|
||||
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature,
|
||||
InnerInstructions, Rewards, TransactionStatus, TransactionStatusMeta,
|
||||
TransactionWithStatusMeta,
|
||||
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
|
||||
TransactionStatus, TransactionStatusMeta, TransactionWithStatusMeta,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::{collections::HashMap, convert::TryInto};
|
||||
use thiserror::Error;
|
||||
|
||||
#[macro_use]
|
||||
@ -21,8 +20,11 @@ extern crate serde_derive;
|
||||
mod access_token;
|
||||
mod bigtable;
|
||||
mod compression;
|
||||
mod convert;
|
||||
mod root_ca_certificate;
|
||||
|
||||
use convert::generated;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("BigTable: {0}")]
|
||||
@ -162,7 +164,6 @@ struct StoredConfirmedBlockTransactionStatusMeta {
|
||||
fee: u64,
|
||||
pre_balances: Vec<u64>,
|
||||
post_balances: Vec<u64>,
|
||||
inner_instructions: Option<Vec<InnerInstructions>>,
|
||||
}
|
||||
|
||||
impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
|
||||
@ -172,7 +173,6 @@ impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
} = value;
|
||||
let status = match &err {
|
||||
None => Ok(()),
|
||||
@ -183,7 +183,7 @@ impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
inner_instructions: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -195,7 +195,6 @@ impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
..
|
||||
} = value;
|
||||
Self {
|
||||
@ -203,7 +202,6 @@ impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
|
||||
fee,
|
||||
pre_balances,
|
||||
post_balances,
|
||||
inner_instructions,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -279,10 +277,18 @@ impl LedgerStorage {
|
||||
/// Fetch the confirmed block from the desired slot
|
||||
pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
|
||||
let mut bigtable = self.connection.client();
|
||||
let block = bigtable
|
||||
.get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
|
||||
let block_cell_data = bigtable
|
||||
.get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
|
||||
"blocks",
|
||||
slot_to_key(slot),
|
||||
)
|
||||
.await?;
|
||||
Ok(block.into())
|
||||
Ok(match block_cell_data {
|
||||
bigtable::CellData::Bincode(block) => block.into(),
|
||||
bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
|
||||
bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_key(slot)))
|
||||
})?,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
|
||||
@ -405,7 +411,7 @@ impl LedgerStorage {
|
||||
))
|
||||
})?;
|
||||
let mut cell_data: Vec<TransactionByAddrInfo> =
|
||||
bigtable::deserialize_cell_data(&data, "tx-by-addr", row_key)?;
|
||||
bigtable::deserialize_bincode_cell_data(&data, "tx-by-addr", row_key)?;
|
||||
cell_data.reverse();
|
||||
for tx_by_addr_info in cell_data.into_iter() {
|
||||
// Filter out records before `before_transaction_index`
|
||||
@ -511,7 +517,7 @@ impl LedgerStorage {
|
||||
let blocks_cells = [(slot_to_key(slot), confirmed_block.into())];
|
||||
bytes_written += self
|
||||
.connection
|
||||
.put_bincode_cells_with_retry::<StoredConfirmedBlock>("blocks", &blocks_cells)
|
||||
.put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
|
||||
.await?;
|
||||
info!(
|
||||
"uploaded block for slot {}: {} transactions, {} bytes",
|
||||
|
@ -227,7 +227,7 @@ pub struct ConfirmedTransactionStatusWithSignature {
|
||||
pub memo: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Reward {
|
||||
pub pubkey: String,
|
||||
pub lamports: i64,
|
||||
@ -235,7 +235,7 @@ pub struct Reward {
|
||||
|
||||
pub type Rewards = Vec<Reward>;
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ConfirmedBlock {
|
||||
pub previous_blockhash: String,
|
||||
|
Loading…
x
Reference in New Issue
Block a user