Long-term ledger storage with BigTable (bp #11222)

Michael Vines
2020-07-17 13:25:28 -07:00
parent 7f0d4f0656
commit 66242eab41
34 changed files with 5484 additions and 34 deletions


@ -25,6 +25,7 @@ members = [
"log-analyzer",
"merkle-tree",
"stake-o-matic",
"storage-bigtable",
"streamer",
"measure",
"metrics",


@ -7,7 +7,7 @@ source multinode-demo/common.sh
rm -rf config/run/init-completed config/ledger config/snapshot-ledger
timeout 15 ./run.sh &
timeout 120 ./run.sh &
pid=$!
attempts=20


@ -59,20 +59,21 @@ solana-perf = { path = "../perf", version = "1.2.20" }
solana-runtime = { path = "../runtime", version = "1.2.20" }
solana-sdk = { path = "../sdk", version = "1.2.20" }
solana-stake-program = { path = "../programs/stake", version = "1.2.20" }
solana-storage-bigtable = { path = "../storage-bigtable", version = "1.2.20" }
solana-streamer = { path = "../streamer", version = "1.2.20" }
solana-sys-tuner = { path = "../sys-tuner", version = "1.2.20" }
solana-transaction-status = { path = "../transaction-status", version = "1.2.20" }
solana-version = { path = "../version", version = "1.2.20" }
solana-vote-program = { path = "../programs/vote", version = "1.2.20" }
solana-vote-signer = { path = "../vote-signer", version = "1.2.20" }
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.2.20" }
spl-token-v1-0 = { package = "spl-token", version = "1.0.6", features = ["skip-no-mangle"] }
tempfile = "3.1.0"
thiserror = "1.0"
tokio = "0.1"
tokio-codec = "0.1"
tokio-fs = "0.1"
tokio-io = "0.1"
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.2.20" }
tokio = { version = "0.2.22", features = ["full"] }
tokio_01 = { version = "0.1", package = "tokio" }
tokio_fs_01 = { version = "0.1", package = "tokio-fs" }
tokio_io_01 = { version = "0.1", package = "tokio-io" }
trees = "0.2.1"
[dev-dependencies]


@ -64,6 +64,7 @@ use std::{
str::FromStr,
sync::{Arc, RwLock},
};
use tokio::runtime;
fn new_response<T>(bank: &Bank, value: T) -> Result<RpcResponse<T>> {
let context = RpcResponseContext { slot: bank.slot() };
@ -78,6 +79,7 @@ pub struct JsonRpcConfig {
pub identity_pubkey: Pubkey,
pub faucet_addr: Option<SocketAddr>,
pub health_check_slot_distance: u64,
pub enable_bigtable_ledger_storage: bool,
}
#[derive(Clone)]
@ -91,6 +93,8 @@ pub struct JsonRpcRequestProcessor {
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
send_transaction_service: Arc<SendTransactionService>,
runtime_handle: runtime::Handle,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
}
impl Metadata for JsonRpcRequestProcessor {}
@ -145,6 +149,7 @@ impl JsonRpcRequestProcessor {
}
}
#[allow(clippy::too_many_arguments)]
pub fn new(
config: JsonRpcConfig,
bank_forks: Arc<RwLock<BankForks>>,
@ -155,6 +160,8 @@ impl JsonRpcRequestProcessor {
cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash,
send_transaction_service: Arc<SendTransactionService>,
runtime: &runtime::Runtime,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
) -> Self {
Self {
config,
@ -166,6 +173,8 @@ impl JsonRpcRequestProcessor {
cluster_info,
genesis_hash,
send_transaction_service,
runtime_handle: runtime.handle().clone(),
bigtable_ledger_storage,
}
}
@ -507,6 +516,7 @@ impl JsonRpcRequestProcessor {
slot: Slot,
encoding: Option<UiTransactionEncoding>,
) -> Result<Option<ConfirmedBlock>> {
let encoding = encoding.unwrap_or(UiTransactionEncoding::Json);
if self.config.enable_rpc_transaction_history
&& slot
<= self
@ -515,7 +525,15 @@ impl JsonRpcRequestProcessor {
.unwrap()
.largest_confirmed_root()
{
let result = self.blockstore.get_confirmed_block(slot, encoding);
let result = self.blockstore.get_confirmed_block(slot, Some(encoding));
if result.is_err() {
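// Block not found in the local blockstore; fall back to long-term BigTable storage, if available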
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime_handle
.block_on(bigtable_ledger_storage.get_confirmed_block(slot, encoding))
.ok());
}
}
self.check_slot_cleaned_up(&result, slot)?;
Ok(result.ok())
} else {
@ -544,9 +562,25 @@ impl JsonRpcRequestProcessor {
MAX_GET_CONFIRMED_BLOCKS_RANGE
)));
}
let lowest_slot = self.blockstore.lowest_slot();
if start_slot < lowest_slot {
// If the starting slot is lower than what's available in blockstore assume the entire
// [start_slot..end_slot] can be fetched from BigTable.
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime_handle
.block_on(
bigtable_ledger_storage
.get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize),
)
.unwrap_or_else(|_| vec![]));
}
}
Ok(self
.blockstore
.rooted_slot_iterator(max(start_slot, self.blockstore.lowest_slot()))
.rooted_slot_iterator(max(start_slot, lowest_slot))
.map_err(|_| Error::internal_error())?
.filter(|&slot| slot <= end_slot)
.collect())
@ -640,6 +674,16 @@ impl JsonRpcRequestProcessor {
err,
}
})
.or_else(|| {
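// Signature not found in the local blockstore; check long-term BigTable storage, if available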
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
self.runtime_handle
.block_on(bigtable_ledger_storage.get_signature_status(&signature))
.map(Some)
.unwrap_or(None)
} else {
None
}
})
} else {
None
};
@ -681,23 +725,39 @@ impl JsonRpcRequestProcessor {
&self,
signature: Signature,
encoding: Option<UiTransactionEncoding>,
) -> Result<Option<ConfirmedTransaction>> {
) -> Option<ConfirmedTransaction> {
let encoding = encoding.unwrap_or(UiTransactionEncoding::Json);
if self.config.enable_rpc_transaction_history {
Ok(self
match self
.blockstore
.get_confirmed_transaction(signature, encoding)
.get_confirmed_transaction(signature, Some(encoding))
.unwrap_or(None)
.filter(|confirmed_transaction| {
confirmed_transaction.slot
{
Some(confirmed_transaction) => {
if confirmed_transaction.slot
<= self
.block_commitment_cache
.read()
.unwrap()
.largest_confirmed_root()
}))
} else {
Ok(None)
.highest_confirmed_slot()
{
return Some(confirmed_transaction);
}
}
None => {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return self
.runtime_handle
.block_on(
bigtable_ledger_storage
.get_confirmed_transaction(&signature, encoding),
)
.unwrap_or(None);
}
}
}
}
None
}
pub fn get_confirmed_signatures_for_address(
@ -707,6 +767,8 @@ impl JsonRpcRequestProcessor {
end_slot: Slot,
) -> Result<Vec<Signature>> {
if self.config.enable_rpc_transaction_history {
// TODO: Add bigtable_ledger_storage support as a part of
// https://github.com/solana-labs/solana/pull/10928
let end_slot = min(
end_slot,
self.block_commitment_cache
@ -724,10 +786,23 @@ impl JsonRpcRequestProcessor {
}
pub fn get_first_available_block(&self) -> Result<Slot> {
Ok(self
let slot = self
.blockstore
.get_first_available_block()
.unwrap_or_default())
.unwrap_or_default();
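// BigTable may hold blocks older than anything remaining in the local blockstore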
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_slot = self
.runtime_handle
.block_on(bigtable_ledger_storage.get_first_available_block())
.unwrap_or(None)
.unwrap_or(slot);
if bigtable_slot < slot {
return Ok(bigtable_slot);
}
}
Ok(slot)
}
pub fn get_confirmed_signatures_for_address2(
@ -2008,7 +2083,7 @@ impl RpcSol for RpcSolImpl {
signature_str
);
let signature = verify_signature(&signature_str)?;
meta.get_confirmed_transaction(signature, encoding)
Ok(meta.get_confirmed_transaction(signature, encoding))
}
fn get_confirmed_signatures_for_address(
@ -2379,6 +2454,8 @@ pub mod tests {
&bank_forks,
&exit,
)),
&runtime::Runtime::new().unwrap(),
None,
);
cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
@ -2430,6 +2507,8 @@ pub mod tests {
&bank_forks,
&exit,
)),
&runtime::Runtime::new().unwrap(),
None,
);
thread::spawn(move || {
let blockhash = bank.confirmed_last_blockhash().0;
@ -3526,6 +3605,8 @@ pub mod tests {
&bank_forks,
&exit,
)),
&runtime::Runtime::new().unwrap(),
None,
);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["37u9WtQpcm6ULa3Vmu7ySnANv"]}"#;
@ -3570,6 +3651,8 @@ pub mod tests {
&bank_forks,
&exit,
)),
&runtime::Runtime::new().unwrap(),
None,
);
let mut bad_transaction =
@ -3764,6 +3847,8 @@ pub mod tests {
&bank_forks,
&exit,
)),
&runtime::Runtime::new().unwrap(),
None,
);
assert_eq!(request_processor.validator_exit(), Ok(false));
assert_eq!(exit.load(Ordering::Relaxed), false);
@ -3796,6 +3881,8 @@ pub mod tests {
&bank_forks,
&exit,
)),
&runtime::Runtime::new().unwrap(),
None,
);
assert_eq!(request_processor.validator_exit(), Ok(true));
assert_eq!(exit.load(Ordering::Relaxed), true);
@ -3887,6 +3974,8 @@ pub mod tests {
&bank_forks,
&exit,
)),
&runtime::Runtime::new().unwrap(),
None,
);
assert_eq!(
request_processor.get_block_commitment(0),


@ -24,7 +24,7 @@ use std::{
sync::{mpsc::channel, Arc, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::prelude::Future;
use tokio::runtime;
pub struct JsonRpcService {
thread_hdl: JoinHandle<()>,
@ -33,6 +33,7 @@ pub struct JsonRpcService {
pub request_processor: JsonRpcRequestProcessor, // Used only by test_rpc_new()...
close_handle: Option<CloseHandle>,
runtime: runtime::Runtime,
}
struct RpcRequestMiddleware {
@ -98,6 +99,9 @@ impl RpcRequestMiddleware {
}
fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
// Stuck on tokio 0.1 until the jsonrpc-http-server crate upgrades to tokio 0.2
use tokio_01::prelude::*;
let stem = path.split_at(1).1; // Drop leading '/' from path
let filename = {
match path {
@ -116,10 +120,10 @@ impl RpcRequestMiddleware {
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(
tokio_fs::file::File::open(filename)
tokio_fs_01::file::File::open(filename)
.and_then(|file| {
let buf: Vec<u8> = Vec::new();
tokio_io::io::read_to_end(file, buf)
tokio_io_01::io::read_to_end(file, buf)
.and_then(|item| Ok(hyper::Response::new(item.1.into())))
.or_else(|_| Ok(RpcRequestMiddleware::internal_server_error()))
})
@ -256,6 +260,27 @@ impl JsonRpcService {
&exit_send_transaction_service,
));
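// Use a dedicated tokio 0.2 runtime for BigTable requests; jsonrpc-http-server itself is still on tokio 0.1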
let mut runtime = runtime::Builder::new()
.threaded_scheduler()
.thread_name("rpc-runtime")
.enable_all()
.build()
.expect("Runtime");
let bigtable_ledger_storage = if config.enable_bigtable_ledger_storage {
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new(false))
.map(|x| {
info!("BigTable ledger storage initialized");
Some(x)
})
.unwrap_or_else(|err| {
error!("Failed to initialize BigTable ledger storage: {:?}", err);
None
})
} else {
None
};
let request_processor = JsonRpcRequestProcessor::new(
config,
bank_forks.clone(),
@ -266,6 +291,8 @@ impl JsonRpcService {
cluster_info,
genesis_hash,
send_transaction_service,
&runtime,
bigtable_ledger_storage,
);
#[cfg(test)]
@ -325,6 +352,7 @@ impl JsonRpcService {
.register_exit(Box::new(move || close_handle_.close()));
Self {
thread_hdl,
runtime,
#[cfg(test)]
request_processor: test_request_processor,
close_handle: Some(close_handle),
@ -338,6 +366,7 @@ impl JsonRpcService {
}
pub fn join(self) -> thread::Result<()> {
self.runtime.shutdown_background();
self.thread_hdl.join()
}
}


@ -36,7 +36,9 @@ use std::{
iter,
sync::{Arc, Mutex, RwLock},
};
use tokio::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor};
// Stuck on tokio 0.1 until the jsonrpc-pubsub crate upgrades to tokio 0.2
use tokio_01::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor};
const RECEIVE_DELAY_MILLIS: u64 = 100;
@ -965,7 +967,7 @@ pub(crate) mod tests {
system_transaction,
};
use std::{fmt::Debug, sync::mpsc::channel, time::Instant};
use tokio::{prelude::FutureExt, runtime::Runtime, timer::Delay};
use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay};
pub(crate) fn robust_poll_or_panic<T: Debug + Send + 'static>(
receiver: futures::sync::mpsc::Receiver<T>,


@ -26,7 +26,7 @@ use std::{
thread::sleep,
time::{Duration, Instant},
};
use tokio::runtime::Runtime;
use tokio_01::runtime::Runtime;
macro_rules! json_req {
($method: expr, $params: expr) => {{
@ -189,7 +189,7 @@ fn test_rpc_subscriptions() {
.and_then(move |client| {
for sig in signature_set {
let status_sender = status_sender.clone();
tokio::spawn(
tokio_01::spawn(
client
.signature_subscribe(sig.clone(), None)
.and_then(move |sig_stream| {
@ -203,7 +203,7 @@ fn test_rpc_subscriptions() {
}),
);
}
tokio::spawn(
tokio_01::spawn(
client
.slot_subscribe()
.and_then(move |slot_stream| {
@ -218,7 +218,7 @@ fn test_rpc_subscriptions() {
);
for pubkey in account_set {
let account_sender = account_sender.clone();
tokio::spawn(
tokio_01::spawn(
client
.account_subscribe(pubkey, None)
.and_then(move |account_stream| {


@ -0,0 +1,106 @@
# Long-term RPC Transaction History
There's a need for RPC to serve at least 6 months of transaction history. The
current history, on the order of days, is insufficient for downstream users.
6 months of transaction data cannot practically be stored in a validator's
rocksdb ledger, so an external data store is necessary. The validator's
rocksdb ledger will continue to serve as the primary data source; queries it
cannot satisfy will fall back to the external data store.
The affected RPC endpoints are:
* [getFirstAvailableBlock](https://docs.solana.com/apps/jsonrpc-api#getfirstavailableblock)
* [getConfirmedBlock](https://docs.solana.com/apps/jsonrpc-api#getconfirmedblock)
* [getConfirmedBlocks](https://docs.solana.com/apps/jsonrpc-api#getconfirmedblocks)
* [getConfirmedSignaturesForAddress](https://docs.solana.com/apps/jsonrpc-api#getconfirmedsignaturesforaddress)
* [getConfirmedTransaction](https://docs.solana.com/apps/jsonrpc-api#getconfirmedtransaction)
* [getSignatureStatuses](https://docs.solana.com/apps/jsonrpc-api#getsignaturestatuses)
Note that [getBlockTime](https://docs.solana.com/apps/jsonrpc-api#getblocktime)
is not supported, since `getBlockTime` can be removed once
https://github.com/solana-labs/solana/issues/10089 is fixed.
Some system design constraints:
* The volume of data to store and search can quickly jump into the terabytes,
and is immutable.
* The system should be as light as possible for SREs. For example an SQL
database cluster that requires an SRE to continually monitor and rebalance
nodes is undesirable.
* Data must be searchable in real time - batched queries that take minutes or
hours to run are unacceptable.
* Easy to replicate the data worldwide to co-locate it with the RPC endpoints
that will utilize it.
* Interfacing with the external data store should be easy and should not
require depending on risky, lightly-used, community-supported code libraries.
Based on these constraints, Google's BigTable product is selected as the data
store.
## Table Schema
A BigTable instance is used to hold all transaction data, broken up into
different tables for quick searching.
New data may be copied into the instance at any time without affecting the existing
data, and all data is immutable. Generally the expectation is that new data
will be uploaded once the current epoch completes, but there is no limitation on
the frequency of data dumps.
Cleanup of old data is automatic: configure the data retention policy of the
instance tables appropriately and expired data simply disappears. Therefore the
order in which data is added becomes important. For example, if data from epoch
N-1 is added after data from epoch N, the older epoch data will outlive the
newer data. However, beyond producing _holes_ in query results, this kind of
unordered deletion has no ill effect. Note that this method of cleanup
effectively allows an unlimited amount of transaction data to be stored,
restricted only by the monetary costs of doing so.
The table layout supports the existing RPC endpoints only. New RPC endpoints
in the future may require additions to the schema and potentially iterating over
all transactions to build up the necessary metadata.
## Accessing BigTable
BigTable has a gRPC endpoint that can be accessed using the
[tonic](https://crates.io/crates/tonic) crate and the raw protobuf API, as
currently no higher-level Rust crate for BigTable exists. Practically this makes
parsing the results of BigTable queries more complicated, but it is not a
significant issue.
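As a rough illustration, a minimal read against the local emulator might look
like the sketch below. The `google::bigtable::v2` module path, endpoint, and
table name are assumptions based on the tonic-generated bindings added in this
commit, not a definitive API:

```rust
use google::bigtable::v2::{bigtable_client::BigtableClient, ReadRowsRequest};

async fn dump_block_rows() -> Result<(), Box<dyn std::error::Error>> {
    // The local emulator requires neither TLS nor an OAuth token
    let mut client = BigtableClient::connect("http://localhost:8086").await?;
    let mut stream = client
        .read_rows(ReadRowsRequest {
            table_name: "projects/emulator/instances/solana-ledger/tables/blocks".to_string(),
            rows_limit: 10,
            ..ReadRowsRequest::default()
        })
        .await?
        .into_inner();

    // Each streamed response carries cell chunks that must be reassembled into rows
    while let Some(response) = stream.message().await? {
        println!("received {} cell chunks", response.chunks.len());
    }
    Ok(())
}
```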
## Data Population
The ongoing population of instance data will occur on an epoch cadence through the
use of a new `solana-ledger-tool` command that will convert rocksdb data for a
given slot range into the instance schema.
The same process will be run once, manually, to backfill the existing ledger
data.
### Block Table: `blocks`
This table contains the compressed block data for a given slot.
The row key is generated by taking the 16-digit, lowercase hexadecimal
representation of the slot, to ensure that the oldest slot with a confirmed
block will always be first when the rows are listed. e.g., the row key for slot
42 would be `000000000000002a`.
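A minimal sketch of that derivation (the helper name is illustrative, not
necessarily the crate's actual API):

```rust
/// Zero-padded, lowercase hex keeps rows lexicographically ordered by slot
fn slot_to_blocks_key(slot: u64) -> String {
    format!("{:016x}", slot)
}

fn main() {
    assert_eq!(slot_to_blocks_key(42), "000000000000002a");
}
```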
The row data is a compressed `StoredConfirmedBlock` struct.
### Account Address Transaction Signature Lookup Table: `tx-by-addr`
This table contains the transactions that affect a given address.
The row key is `<base58
address>/<one's-complement-of-slot-in-hex-zero-prefixed-to-16-digits>`. The row
data is a compressed `TransactionByAddrInfo` struct.

Taking the one's complement of the slot ensures that the newest slot with
transactions that affect an address will always be listed first.
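A sketch under the same illustrative-naming caveat:

```rust
/// `!slot` (one's complement) reverses the sort order so newer slots list first
fn tx_by_addr_key(address_base58: &str, slot: u64) -> String {
    format!("{}/{:016x}", address_base58, !slot)
}

fn main() {
    // !42 == 0xffff_ffff_ffff_ffd5
    assert!(tx_by_addr_key("Vote111111111111111111111111111111111111111", 42)
        .ends_with("ffffffffffffffd5"));
}
```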
Sysvar addresses are not indexed. However, frequently used programs such as
Vote or System are, and will likely have a row for every confirmed slot.
### Transaction Signature Lookup Table: `tx`
This table maps a transaction signature to its confirmed block and its index
within that block.
The row key is the base58-encoded transaction signature.
The row data is a compressed `TransactionInfo` struct.
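One more illustrative sketch; `Signature`'s `Display` implementation renders
base58, so no extra encoding step is needed:

```rust
use solana_sdk::signature::Signature;

/// The `tx` row key is simply the base58 signature, making lookups point reads
fn tx_key(signature: &Signature) -> String {
    signature.to_string()
}
```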


@ -12,22 +12,27 @@ homepage = "https://solana.com/"
bs58 = "0.3.1"
bytecount = "0.6.0"
clap = "2.33.1"
futures = "0.3.5"
futures-util = "0.3.5"
histogram = "*"
log = { version = "0.4.8" }
regex = "1"
serde_json = "1.0.53"
serde_yaml = "0.8.12"
solana-clap-utils = { path = "../clap-utils", version = "1.2.20" }
solana-cli = { path = "../cli", version = "1.2.20" }
solana-ledger = { path = "../ledger", version = "1.2.20" }
solana-logger = { path = "../logger", version = "1.2.20" }
solana-measure = { path = "../measure", version = "1.2.20" }
solana-runtime = { path = "../runtime", version = "1.2.20" }
solana-sdk = { path = "../sdk", version = "1.2.20" }
solana-stake-program = { path = "../programs/stake", version = "1.2.20" }
solana-storage-bigtable = { path = "../storage-bigtable", version = "1.2.20" }
solana-transaction-status = { path = "../transaction-status", version = "1.2.20" }
solana-version = { path = "../version", version = "1.2.20" }
solana-vote-program = { path = "../programs/vote", version = "1.2.20" }
tempfile = "3.1.0"
regex = "1"
tokio = { version = "0.2.22", features = ["full"] }
[dev-dependencies]
assert_cmd = "1.0"

ledger-tool/src/bigtable.rs (new file)

@ -0,0 +1,548 @@
/// The `bigtable` subcommand
use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
use log::*;
use solana_clap_utils::{
input_parsers::pubkey_of,
input_validators::{is_slot, is_valid_pubkey},
};
use solana_cli::display::println_transaction;
use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration};
use tokio::time::delay_for;
// Attempt to upload this many blocks in parallel
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
// Read up to this many blocks from blockstore before blocking on the upload process
const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
async fn upload(
blockstore: Blockstore,
starting_slot: Slot,
ending_slot: Option<Slot>,
allow_missing_metadata: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let mut measure = Measure::start("entire upload");
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
info!("Loading ledger slots...");
let blockstore_slots: Vec<_> = blockstore
.slot_meta_iterator(starting_slot)
.map_err(|err| {
format!(
"Failed to load entries starting from slot {}: {:?}",
starting_slot, err
)
})?
.filter_map(|(slot, _slot_meta)| {
if let Some(ending_slot) = &ending_slot {
if slot > *ending_slot {
return None;
}
}
Some(slot)
})
.collect();
if blockstore_slots.is_empty() {
info!("Ledger has no slots in the specified range");
return Ok(());
}
info!(
"Found {} slots in the range ({}, {})",
blockstore_slots.len(),
blockstore_slots.first().unwrap(),
blockstore_slots.last().unwrap()
);
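// Slots for which blockstore turns out to have no confirmed block are recorded here and skipped during upload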
let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
// Gather the blocks that are already present in bigtable, by slot
let bigtable_slots = {
let mut bigtable_slots = vec![];
let first_blockstore_slot = *blockstore_slots.first().unwrap();
let last_blockstore_slot = *blockstore_slots.last().unwrap();
info!(
"Loading list of bigtable blocks between slots {} and {}...",
first_blockstore_slot, last_blockstore_slot
);
let mut start_slot = *blockstore_slots.first().unwrap();
while start_slot <= last_blockstore_slot {
let mut next_bigtable_slots = loop {
match bigtable.get_confirmed_blocks(start_slot, 1000).await {
Ok(slots) => break slots,
Err(err) => {
error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
// Consider exponential backoff...
delay_for(Duration::from_secs(2)).await;
}
}
};
if next_bigtable_slots.is_empty() {
break;
}
bigtable_slots.append(&mut next_bigtable_slots);
start_slot = bigtable_slots.last().unwrap() + 1;
}
bigtable_slots
.into_iter()
.filter(|slot| *slot <= last_blockstore_slot)
.collect::<Vec<_>>()
};
// The blocks that still need to be uploaded is the difference between what's already in the
// bigtable and what's in blockstore...
let blocks_to_upload = {
let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
let mut blocks_to_upload = blockstore_slots
.difference(&blockstore_slots_with_no_confirmed_block)
.cloned()
.collect::<HashSet<_>>()
.difference(&bigtable_slots)
.cloned()
.collect::<Vec<_>>();
blocks_to_upload.sort();
blocks_to_upload
};
if blocks_to_upload.is_empty() {
info!("No blocks need to be uploaded to bigtable");
return Ok(());
}
info!(
"{} blocks to be uploaded to the bucket in the range ({}, {})",
blocks_to_upload.len(),
blocks_to_upload.first().unwrap(),
blocks_to_upload.last().unwrap()
);
// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
let (_loader_thread, receiver) = {
let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
(
std::thread::spawn(move || {
let mut measure = Measure::start("block loader thread");
for (i, slot) in blocks_to_upload.iter().enumerate() {
let _ = match blockstore.get_confirmed_block(
*slot,
Some(solana_transaction_status::UiTransactionEncoding::Binary),
) {
Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
Err(err) => {
warn!(
"Failed to get load confirmed block from slot {}: {:?}",
slot, err
);
sender.send((*slot, None))
}
};
if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
info!(
"{}% of blocks processed ({}/{})",
i * 100 / blocks_to_upload.len(),
i,
blocks_to_upload.len()
);
}
}
measure.stop();
info!("{} to load {} blocks", measure, blocks_to_upload.len());
}),
receiver,
)
};
let mut failures = 0;
use futures::stream::StreamExt;
let mut stream =
tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
while let Some(blocks) = stream.next().await {
let mut measure_upload = Measure::start("Upload");
let mut num_blocks = blocks.len();
info!("Preparing the next {} blocks for upload", num_blocks);
let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
None => {
blockstore_slots_with_no_confirmed_block.insert(slot);
num_blocks -= 1;
None
}
Some(confirmed_block) => {
if confirmed_block
.transactions
.iter()
.any(|transaction| transaction.meta.is_none())
{
if allow_missing_metadata {
info!("Transaction metadata missing from slot {}", slot);
} else {
panic!("Transaction metadata missing from slot {}", slot);
}
}
Some(bigtable.upload_confirmed_block(slot, confirmed_block))
}
});
for result in futures::future::join_all(uploads).await {
if result.is_err() {
error!("upload_confirmed_block() failed: {:?}", result.err());
failures += 1;
}
}
measure_upload.stop();
info!("{} for {} blocks", measure_upload, num_blocks);
}
measure.stop();
info!("{}", measure);
if failures > 0 {
Err(format!("Incomplete upload, {} operations failed", failures).into())
} else {
Ok(())
}
}
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?;
match bigtable.get_first_available_block().await? {
Some(block) => println!("{}", block),
None => println!("No blocks available"),
}
Ok(())
}
async fn block(slot: Slot) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
let block = bigtable
.get_confirmed_block(slot, UiTransactionEncoding::Binary)
.await?;
println!("Slot: {}", slot);
println!("Parent Slot: {}", block.parent_slot);
println!("Blockhash: {}", block.blockhash);
println!("Previous Blockhash: {}", block.previous_blockhash);
if block.block_time.is_some() {
println!("Block Time: {:?}", block.block_time);
}
if !block.rewards.is_empty() {
println!("Rewards: {:?}", block.rewards);
}
for (index, transaction_with_meta) in block.transactions.iter().enumerate() {
println!("Transaction {}:", index);
println_transaction(
&transaction_with_meta.transaction.decode().unwrap(),
&transaction_with_meta.meta,
" ",
);
}
Ok(())
}
async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
let slots = bigtable.get_confirmed_blocks(starting_slot, limit).await?;
println!("{:?}", slots);
println!("{} blocks found", slots.len());
Ok(())
}
async fn confirm(signature: &Signature, verbose: bool) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
let transaction_status = bigtable.get_signature_status(signature).await?;
if verbose {
match bigtable
.get_confirmed_transaction(signature, UiTransactionEncoding::Binary)
.await
{
Ok(Some(confirmed_transaction)) => {
println!(
"\nTransaction executed in slot {}:",
confirmed_transaction.slot
);
println_transaction(
&confirmed_transaction
.transaction
.transaction
.decode()
.expect("Successful decode"),
&confirmed_transaction.transaction.meta,
" ",
);
}
Ok(None) => println!("Confirmed transaction details not available"),
Err(err) => println!("Unable to get confirmed transaction details: {}", err),
}
println!();
}
match transaction_status.status {
Ok(_) => println!("Confirmed"),
Err(err) => println!("Transaction failed: {}", err),
}
Ok(())
}
pub async fn transaction_history(
address: &Pubkey,
limit: usize,
before: Option<&Signature>,
verbose: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?;
let results = bigtable
.get_confirmed_signatures_for_address(address, before, limit)
.await?;
for (signature, slot, memo, err) in results {
if verbose {
println!(
"{}, slot={}, memo=\"{}\", status={}",
signature,
slot,
memo.unwrap_or_else(|| "".to_string()),
match err {
None => "Confirmed".to_string(),
Some(err) => format!("Failed: {:?}", err),
}
);
} else {
println!("{}", signature);
}
}
Ok(())
}
pub trait BigTableSubCommand {
fn bigtable_subcommand(self) -> Self;
}
impl BigTableSubCommand for App<'_, '_> {
fn bigtable_subcommand(self) -> Self {
self.subcommand(
SubCommand::with_name("bigtable")
.about("Ledger data on a BigTable instance")
.setting(AppSettings::ArgRequiredElseHelp)
.subcommand(
SubCommand::with_name("upload")
.about("Upload the ledger to BigTable")
.arg(
Arg::with_name("starting_slot")
.long("starting-slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.index(1)
.help(
"Start uploading at this slot [default: first available slot]",
),
)
.arg(
Arg::with_name("ending_slot")
.long("ending-slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.index(2)
.help("Stop uploading at this slot [default: last available slot]"),
)
.arg(
Arg::with_name("allow_missing_metadata")
.long("allow-missing-metadata")
.takes_value(false)
.help("Don't panic if transaction metadata is missing"),
),
)
.subcommand(
SubCommand::with_name("first-available-block")
.about("Get the first available block in the storage"),
)
.subcommand(
SubCommand::with_name("blocks")
.about("Get a list of slots with confirmed blocks for the given range")
.arg(
Arg::with_name("starting_slot")
.long("starting-slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.index(1)
.required(true)
.default_value("0")
.help("Start listing at this slot"),
)
.arg(
Arg::with_name("limit")
.long("limit")
.validator(is_slot)
.value_name("LIMIT")
.takes_value(true)
.index(2)
.required(true)
.default_value("1000")
.help("Maximum number of slots to return"),
),
)
.subcommand(
SubCommand::with_name("block")
.about("Get a confirmed block")
.arg(
Arg::with_name("slot")
.long("slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.index(1)
.required(true),
),
)
.subcommand(
SubCommand::with_name("confirm")
.about("Confirm transaction by signature")
.arg(
Arg::with_name("signature")
.long("signature")
.value_name("TRANSACTION_SIGNATURE")
.takes_value(true)
.required(true)
.index(1)
.help("The transaction signature to confirm"),
)
.arg(
Arg::with_name("verbose")
.short("v")
.long("verbose")
.takes_value(false)
.help("Show additional information"),
),
)
.subcommand(
SubCommand::with_name("transaction-history")
.about(
"Show historical transactions affecting the given address, \
ordered based on the slot in which they were confirmed in \
from lowest to highest slot",
)
.arg(
Arg::with_name("address")
.index(1)
.value_name("ADDRESS")
.required(true)
.validator(is_valid_pubkey)
.help("Account address"),
)
.arg(
Arg::with_name("limit")
.long("limit")
.takes_value(true)
.value_name("LIMIT")
.validator(is_slot)
.index(2)
.default_value("1000")
.help("Maximum number of transaction signatures to return"),
)
.arg(
Arg::with_name("before")
.long("before")
.value_name("TRANSACTION_SIGNATURE")
.takes_value(true)
.help("Start with the first signature older than this one"),
)
.arg(
Arg::with_name("verbose")
.short("v")
.long("verbose")
.takes_value(false)
.help("Show additional information"),
),
),
)
}
}
pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let future = match matches.subcommand() {
("upload", Some(arg_matches)) => {
let starting_slot = value_t!(arg_matches, "starting_slot", Slot).unwrap_or(0);
let ending_slot = value_t!(arg_matches, "ending_slot", Slot).ok();
let allow_missing_metadata = arg_matches.is_present("allow_missing_metadata");
let blockstore =
crate::open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary);
runtime.block_on(upload(
blockstore,
starting_slot,
ending_slot,
allow_missing_metadata,
))
}
("first-available-block", Some(_arg_matches)) => runtime.block_on(first_available_block()),
("block", Some(arg_matches)) => {
let slot = value_t_or_exit!(arg_matches, "slot", Slot);
runtime.block_on(block(slot))
}
("blocks", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let limit = value_t_or_exit!(arg_matches, "limit", usize);
runtime.block_on(blocks(starting_slot, limit))
}
("confirm", Some(arg_matches)) => {
let signature = arg_matches
.value_of("signature")
.unwrap()
.parse()
.expect("Invalid signature");
let verbose = arg_matches.is_present("verbose");
runtime.block_on(confirm(&signature, verbose))
}
("transaction-history", Some(arg_matches)) => {
let address = pubkey_of(arg_matches, "address").unwrap();
let limit = value_t_or_exit!(arg_matches, "limit", usize);
let before = arg_matches
.value_of("before")
.map(|signature| signature.parse().expect("Invalid signature"));
let verbose = arg_matches.is_present("verbose");
runtime.block_on(transaction_history(
&address,
limit,
before.as_ref(),
verbose,
))
}
_ => unreachable!(),
};
future.unwrap_or_else(|err| {
eprintln!("{:?}", err);
exit(1);
});
}


@ -2,6 +2,7 @@ use clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg,
ArgMatches, SubCommand,
};
use log::*;
use regex::Regex;
use serde_json::json;
use solana_clap_utils::input_validators::{is_parsable, is_slot};
@ -40,7 +41,8 @@ use std::{
sync::Arc,
};
use log::*;
mod bigtable;
use bigtable::*;
#[derive(PartialEq)]
enum LedgerOutputMethod {
@ -704,6 +706,7 @@ fn main() {
.global(true)
.help("Use DIR for ledger location"),
)
.bigtable_subcommand()
.subcommand(
SubCommand::with_name("print")
.about("Print the ledger")
@ -975,6 +978,7 @@ fn main() {
});
match matches.subcommand() {
("bigtable", Some(arg_matches)) => bigtable_process_command(&ledger_path, arg_matches),
("print", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let allow_dead_slots = arg_matches.is_present("allow_dead_slots");


@ -1652,7 +1652,7 @@ impl Blockstore {
iterator
.map(|transaction| {
let signature = transaction.signatures[0];
let encoded_transaction = EncodedTransaction::encode(transaction, encoding.clone());
let encoded_transaction = EncodedTransaction::encode(transaction, encoding);
TransactionWithStatusMeta {
transaction: encoded_transaction,
meta: self


@ -48,6 +48,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-transaction-history ]]; then
args+=("$1")
shift
elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --skip-poh-verify ]]; then
args+=("$1")
shift


@ -0,0 +1,36 @@
[package]
name = "solana-storage-bigtable"
version = "1.2.28"
description = "Solana Storage BigTable"
authors = ["Solana Maintainers <maintainers@solana.com>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
edition = "2018"
[dependencies]
backoff = {version="0.2.1", features = ["tokio"]}
bincode = "1.2.1"
bzip2 = "0.3.3"
enum-iterator = "0.6.0"
flate2 = "1.0.14"
goauth = "0.7.2"
log = "0.4.8"
prost = "0.6.1"
prost-types = "0.6.1"
serde = "1.0.112"
serde_derive = "1.0.103"
smpl_jwt = "0.5.0"
solana-sdk = { path = "../sdk", version = "1.1.20" }
solana-transaction-status = { path = "../transaction-status", version = "1.1.20" }
thiserror = "1.0"
futures = "0.3.5"
tonic = {version="0.3.0", features = ["tls", "transport"]}
zstd = "0.5.1"
[lib]
crate-type = ["lib"]
name = "solana_storage_bigtable"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]


@ -0,0 +1,22 @@
## BigTable Setup
### Development Environment
The Cloud BigTable emulator can be used during development/test. See
https://cloud.google.com/bigtable/docs/emulator for general setup information.
Process:
1. Run `gcloud beta emulators bigtable start` in the background
2. Run `$(gcloud beta emulators bigtable env-init)` to establish the `BIGTABLE_EMULATOR_HOST` environment variable
3. Run `./init-bigtable.sh` to configure the emulator
4. Develop/test
### Production Environment
Export a standard `GOOGLE_APPLICATION_CREDENTIALS` environment variable pointing to your
service account credentials. The project should contain a BigTable instance
called `solana-ledger` that has been initialized by running the `./init-bigtable.sh` script.
Depending on what operation mode is required, either the
`https://www.googleapis.com/auth/bigtable.data` or
`https://www.googleapis.com/auth/bigtable.data.readonly` OAuth scope will be
requested using the provided credentials.
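A minimal read-only usage sketch of the crate's connection API, as exercised
elsewhere in this commit (`new(true)` requests the read-only scope):

```rust
async fn probe() -> Result<(), Box<dyn std::error::Error>> {
    let storage = solana_storage_bigtable::LedgerStorage::new(true).await?;
    if let Some(slot) = storage.get_first_available_block().await? {
        println!("first available block: {}", slot);
    }
    Ok(())
}
```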


@ -0,0 +1,2 @@
googleapis/
target/

storage-bigtable/build-proto/Cargo.lock (generated)

@ -0,0 +1,340 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "anyhow"
version = "1.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f"
[[package]]
name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "either"
version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3"
[[package]]
name = "fixedbitset"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hashbrown"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34f595585f103464d8d2f6e9864682d74c1601fed5e07d62b1c9058dba8246fb"
dependencies = [
"autocfg",
]
[[package]]
name = "heck"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "indexmap"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b88cd59ee5f71fea89a62248fc8f387d44400cefe05ef548466d61ced9029a7"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "itertools"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f56a2d0bc861f9165be4eb3442afd3c236d8a98afd426f65d92324ae1091a484"
dependencies = [
"either",
]
[[package]]
name = "libc"
version = "0.2.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9"
[[package]]
name = "log"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
dependencies = [
"cfg-if",
]
[[package]]
name = "multimap"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8883adfde9756c1d30b0f519c9b8c502a94b41ac62f696453c37c7fc0a958ce"
[[package]]
name = "petgraph"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
[[package]]
name = "proc-macro2"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04f5f085b5d71e2188cb8271e5da0161ad52c3f227a661a3c135fdf28e258b12"
dependencies = [
"unicode-xid",
]
[[package]]
name = "prost"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce49aefe0a6144a45de32927c77bd2859a5f7677b55f220ae5b744e87389c212"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02b10678c913ecbd69350e8535c3aef91a8676c0773fc1d7b95cdd196d7f2f26"
dependencies = [
"bytes",
"heck",
"itertools",
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "537aa19b95acde10a12fec4301466386f757403de4cd4e5b4fa78fb5ecb18f72"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1834f67c0697c001304b75be76f67add9c89742eda3a085ad8ee0bb38c3417aa"
dependencies = [
"bytes",
"prost",
]
[[package]]
name = "proto"
version = "1.1.20"
dependencies = [
"tonic-build",
]
[[package]]
name = "quote"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom",
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]]
name = "syn"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "tempfile"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
dependencies = [
"cfg-if",
"libc",
"rand",
"redox_syscall",
"remove_dir_all",
"winapi",
]
[[package]]
name = "tonic-build"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d8d21cb568e802d77055ab7fcd43f0992206de5028de95c8d3a41118d32e8e"
dependencies = [
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "unicode-segmentation"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0"
[[package]]
name = "unicode-xid"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "which"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724"
dependencies = [
"libc",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"


@ -0,0 +1,15 @@
[package]
authors = ["Solana Maintainers <maintainers@solana.com>"]
description = "Blockchain, Rebuilt for Scale"
edition = "2018"
homepage = "https://solana.com/"
license = "Apache-2.0"
name = "proto"
publish = false
repository = "https://github.com/solana-labs/solana"
version = "1.1.20"
[workspace]
[dependencies]
tonic-build = "0.2.0"


@ -0,0 +1,2 @@
Helper project to build Rust bindings for BigTable, to avoid requiring all
Solana developers to have `protoc` installed.


@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -ex
cd "$(dirname "$0")"
if [[ ! -d googleapis ]]; then
git clone https://github.com/googleapis/googleapis.git
fi
exec cargo run


@ -0,0 +1,19 @@
fn main() -> Result<(), std::io::Error> {
let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let out_dir = manifest_dir.join("../proto");
let googleapis = manifest_dir.join("googleapis");
println!("Google API directory: {}", googleapis.display());
println!("output directory: {}", out_dir.display());
tonic_build::configure()
.build_client(true)
.build_server(false)
.format(true)
.out_dir(&out_dir)
.compile(
&[googleapis.join("google/bigtable/v2/bigtable.proto")],
&[googleapis],
)
}


@ -0,0 +1,27 @@
#!/usr/bin/env bash
#
# Configures a BigTable instance with the expected tables
#
set -e
instance=solana-ledger
cbt=(
cbt
-instance
"$instance"
)
if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
cbt+=(-project emulator)
fi
for table in blocks tx tx-by-addr; do
(
set -x
"${cbt[@]}" createtable $table
"${cbt[@]}" createfamily $table x
"${cbt[@]}" setgcpolicy $table x maxversions=1
"${cbt[@]}" setgcpolicy $table x maxage=360d
)
done


@ -0,0 +1,632 @@
/// Defines the HTTP configuration for an API service. It contains a list of
/// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method
/// to one or more HTTP REST API methods.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Http {
/// A list of HTTP configuration rules that apply to individual API methods.
///
/// **NOTE:** All service configuration rules follow "last one wins" order.
#[prost(message, repeated, tag = "1")]
pub rules: ::std::vec::Vec<HttpRule>,
/// When set to true, URL path parameters will be fully URI-decoded except in
/// cases of single segment matches in reserved expansion, where "%2F" will be
/// left encoded.
///
/// The default behavior is to not decode RFC 6570 reserved characters in multi
/// segment matches.
#[prost(bool, tag = "2")]
pub fully_decode_reserved_expansion: bool,
}
/// # gRPC Transcoding
///
/// gRPC Transcoding is a feature for mapping between a gRPC method and one or
/// more HTTP REST endpoints. It allows developers to build a single API service
/// that supports both gRPC APIs and REST APIs. Many systems, including [Google
/// APIs](https://github.com/googleapis/googleapis),
/// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC
/// Gateway](https://github.com/grpc-ecosystem/grpc-gateway),
/// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature
/// and use it for large scale production services.
///
/// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies
/// how different portions of the gRPC request message are mapped to the URL
/// path, URL query parameters, and HTTP request body. It also controls how the
/// gRPC response message is mapped to the HTTP response body. `HttpRule` is
/// typically specified as an `google.api.http` annotation on the gRPC method.
///
/// Each mapping specifies a URL path template and an HTTP method. The path
/// template may refer to one or more fields in the gRPC request message, as long
/// as each field is a non-repeated field with a primitive (non-message) type.
/// The path template controls how fields of the request message are mapped to
/// the URL path.
///
/// Example:
///
/// service Messaging {
/// rpc GetMessage(GetMessageRequest) returns (Message) {
/// option (google.api.http) = {
/// get: "/v1/{name=messages/*}"
/// };
/// }
/// }
/// message GetMessageRequest {
/// string name = 1; // Mapped to URL path.
/// }
/// message Message {
/// string text = 1; // The resource content.
/// }
///
/// This enables an HTTP REST to gRPC mapping as below:
///
/// HTTP | gRPC
/// -----|-----
/// `GET /v1/messages/123456` | `GetMessage(name: "messages/123456")`
///
/// Any fields in the request message which are not bound by the path template
/// automatically become HTTP query parameters if there is no HTTP request body.
/// For example:
///
/// service Messaging {
/// rpc GetMessage(GetMessageRequest) returns (Message) {
/// option (google.api.http) = {
/// get:"/v1/messages/{message_id}"
/// };
/// }
/// }
/// message GetMessageRequest {
/// message SubMessage {
/// string subfield = 1;
/// }
/// string message_id = 1; // Mapped to URL path.
/// int64 revision = 2; // Mapped to URL query parameter `revision`.
/// SubMessage sub = 3; // Mapped to URL query parameter `sub.subfield`.
/// }
///
/// This enables a HTTP JSON to RPC mapping as below:
///
/// HTTP | gRPC
/// -----|-----
/// `GET /v1/messages/123456?revision=2&sub.subfield=foo` |
/// `GetMessage(message_id: "123456" revision: 2 sub: SubMessage(subfield:
/// "foo"))`
///
/// Note that fields which are mapped to URL query parameters must have a
/// primitive type or a repeated primitive type or a non-repeated message type.
/// In the case of a repeated type, the parameter can be repeated in the URL
/// as `...?param=A&param=B`. In the case of a message type, each field of the
/// message is mapped to a separate parameter, such as
/// `...?foo.a=A&foo.b=B&foo.c=C`.
///
/// For HTTP methods that allow a request body, the `body` field
/// specifies the mapping. Consider a REST update method on the
/// message resource collection:
///
/// service Messaging {
/// rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
/// option (google.api.http) = {
/// patch: "/v1/messages/{message_id}"
/// body: "message"
/// };
/// }
/// }
/// message UpdateMessageRequest {
/// string message_id = 1; // mapped to the URL
/// Message message = 2; // mapped to the body
/// }
///
/// The following HTTP JSON to RPC mapping is enabled, where the
/// representation of the JSON in the request body is determined by
/// protos JSON encoding:
///
/// HTTP | gRPC
/// -----|-----
/// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
/// "123456" message { text: "Hi!" })`
///
/// The special name `*` can be used in the body mapping to define that
/// every field not bound by the path template should be mapped to the
/// request body. This enables the following alternative definition of
/// the update method:
///
/// service Messaging {
/// rpc UpdateMessage(Message) returns (Message) {
/// option (google.api.http) = {
/// patch: "/v1/messages/{message_id}"
/// body: "*"
/// };
/// }
/// }
/// message Message {
/// string message_id = 1;
/// string text = 2;
/// }
///
///
/// The following HTTP JSON to RPC mapping is enabled:
///
/// HTTP | gRPC
/// -----|-----
/// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
/// "123456" text: "Hi!")`
///
/// Note that when using `*` in the body mapping, it is not possible to
/// have HTTP parameters, as all fields not bound by the path end in
/// the body. This makes this option more rarely used in practice when
/// defining REST APIs. The common usage of `*` is in custom methods
/// which don't use the URL at all for transferring data.
///
/// It is possible to define multiple HTTP methods for one RPC by using
/// the `additional_bindings` option. Example:
///
/// service Messaging {
/// rpc GetMessage(GetMessageRequest) returns (Message) {
/// option (google.api.http) = {
/// get: "/v1/messages/{message_id}"
/// additional_bindings {
/// get: "/v1/users/{user_id}/messages/{message_id}"
/// }
/// };
/// }
/// }
/// message GetMessageRequest {
/// string message_id = 1;
/// string user_id = 2;
/// }
///
/// This enables the following two alternative HTTP JSON to RPC mappings:
///
/// HTTP | gRPC
/// -----|-----
/// `GET /v1/messages/123456` | `GetMessage(message_id: "123456")`
/// `GET /v1/users/me/messages/123456` | `GetMessage(user_id: "me" message_id:
/// "123456")`
///
/// ## Rules for HTTP mapping
///
/// 1. Leaf request fields (recursive expansion nested messages in the request
/// message) are classified into three categories:
/// - Fields referred by the path template. They are passed via the URL path.
/// - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They are passed via the HTTP
/// request body.
/// - All other fields are passed via the URL query parameters, and the
/// parameter name is the field path in the request message. A repeated
/// field can be represented as multiple query parameters under the same
/// name.
/// 2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL query parameter, all fields
/// are passed via URL path and HTTP request body.
/// 3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP request body, all
/// fields are passed via URL path and URL query parameters.
///
/// ### Path template syntax
///
/// Template = "/" Segments [ Verb ] ;
/// Segments = Segment { "/" Segment } ;
/// Segment = "*" | "**" | LITERAL | Variable ;
/// Variable = "{" FieldPath [ "=" Segments ] "}" ;
/// FieldPath = IDENT { "." IDENT } ;
/// Verb = ":" LITERAL ;
///
/// The syntax `*` matches a single URL path segment. The syntax `**` matches
/// zero or more URL path segments, which must be the last part of the URL path
/// except the `Verb`.
///
/// The syntax `Variable` matches part of the URL path as specified by its
/// template. A variable template must not contain other variables. If a variable
/// matches a single path segment, its template may be omitted, e.g. `{var}`
/// is equivalent to `{var=*}`.
///
/// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL`
/// contains any reserved character, such characters should be percent-encoded
/// before the matching.
///
/// If a variable contains exactly one path segment, such as `"{var}"` or
/// `"{var=*}"`, when such a variable is expanded into a URL path on the client
/// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The
/// server side does the reverse decoding. Such variables show up in the
/// [Discovery
/// Document](https://developers.google.com/discovery/v1/reference/apis) as
/// `{var}`.
///
/// If a variable contains multiple path segments, such as `"{var=foo/*}"`
/// or `"{var=**}"`, when such a variable is expanded into a URL path on the
/// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded.
/// The server side does the reverse decoding, except "%2F" and "%2f" are left
/// unchanged. Such variables show up in the
/// [Discovery
/// Document](https://developers.google.com/discovery/v1/reference/apis) as
/// `{+var}`.
///
/// ## Using gRPC API Service Configuration
///
/// gRPC API Service Configuration (service config) is a configuration language
/// for configuring a gRPC service to become a user-facing product. The
/// service config is simply the YAML representation of the `google.api.Service`
/// proto message.
///
/// As an alternative to annotating your proto file, you can configure gRPC
/// transcoding in your service config YAML files. You do this by specifying a
/// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same
/// effect as the proto annotation. This can be particularly useful if you
/// have a proto that is reused in multiple services. Note that any transcoding
/// specified in the service config will override any matching transcoding
/// configuration in the proto.
///
/// Example:
///
/// http:
/// rules:
/// # Selects a gRPC method and applies HttpRule to it.
/// - selector: example.v1.Messaging.GetMessage
/// get: /v1/messages/{message_id}/{sub.subfield}
///
/// ## Special notes
///
/// When gRPC Transcoding is used to map a gRPC method to JSON REST endpoints, the
/// proto to JSON conversion must follow the [proto3
/// specification](https://developers.google.com/protocol-buffers/docs/proto3#json).
///
/// While the single segment variable follows the semantics of
/// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String
/// Expansion, the multi segment variable **does not** follow RFC 6570 Section
/// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion
/// does not expand special characters like `?` and `#`, which would lead
/// to invalid URLs. As a result, gRPC Transcoding uses a custom encoding
/// for multi segment variables.
///
/// The path variables **must not** refer to any repeated or mapped field,
/// because client libraries are not capable of handling such variable expansion.
///
/// The path variables **must not** capture the leading "/" character. The reason
/// is that the most common use case "{var}" does not capture the leading "/"
/// character. For consistency, all path variables must share the same behavior.
///
/// Repeated message fields must not be mapped to URL query parameters, because
/// no client library can support such complicated mapping.
///
/// If an API needs to use a JSON array for request or response body, it can map
/// the request or response body to a repeated field. However, some gRPC
/// Transcoding implementations may not support this feature.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HttpRule {
/// Selects a method to which this rule applies.
///
/// Refer to [selector][google.api.DocumentationRule.selector] for syntax details.
#[prost(string, tag = "1")]
pub selector: std::string::String,
/// The name of the request field whose value is mapped to the HTTP request
/// body, or `*` for mapping all request fields not captured by the path
/// pattern to the HTTP body, or omitted for not having any HTTP request body.
///
/// NOTE: the referred field must be present at the top-level of the request
/// message type.
#[prost(string, tag = "7")]
pub body: std::string::String,
/// Optional. The name of the response field whose value is mapped to the HTTP
/// response body. When omitted, the entire response message will be used
/// as the HTTP response body.
///
/// NOTE: The referred field must be present at the top-level of the response
/// message type.
#[prost(string, tag = "12")]
pub response_body: std::string::String,
/// Additional HTTP bindings for the selector. Nested bindings must
/// not contain an `additional_bindings` field themselves (that is,
/// the nesting may only be one level deep).
#[prost(message, repeated, tag = "11")]
pub additional_bindings: ::std::vec::Vec<HttpRule>,
/// Determines the URL pattern matched by this rule. This pattern can be
/// used with any of the {get|put|post|delete|patch} methods. A custom method
/// can be defined using the 'custom' field.
#[prost(oneof = "http_rule::Pattern", tags = "2, 3, 4, 5, 6, 8")]
pub pattern: ::std::option::Option<http_rule::Pattern>,
}
pub mod http_rule {
/// Determines the URL pattern matched by this rule. This pattern can be
/// used with any of the {get|put|post|delete|patch} methods. A custom method
/// can be defined using the 'custom' field.
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Pattern {
/// Maps to HTTP GET. Used for listing and getting information about
/// resources.
#[prost(string, tag = "2")]
Get(std::string::String),
/// Maps to HTTP PUT. Used for replacing a resource.
#[prost(string, tag = "3")]
Put(std::string::String),
/// Maps to HTTP POST. Used for creating a resource or performing an action.
#[prost(string, tag = "4")]
Post(std::string::String),
/// Maps to HTTP DELETE. Used for deleting a resource.
#[prost(string, tag = "5")]
Delete(std::string::String),
/// Maps to HTTP PATCH. Used for updating a resource.
#[prost(string, tag = "6")]
Patch(std::string::String),
/// The custom pattern is used for specifying an HTTP method that is not
/// included in the `pattern` field, such as HEAD, or "*" to leave the
/// HTTP method unspecified for this rule. The wild-card rule is useful
/// for services that provide content to Web (HTML) clients.
#[prost(message, tag = "8")]
Custom(super::CustomHttpPattern),
}
}
/// A custom pattern is used for defining a custom HTTP verb.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CustomHttpPattern {
/// The name of this custom HTTP verb.
#[prost(string, tag = "1")]
pub kind: std::string::String,
/// The path matched by this custom verb.
#[prost(string, tag = "2")]
pub path: std::string::String,
}
/// An indicator of the behavior of a given field (for example, that a field
/// is required in requests, or given as output but ignored as input).
/// This **does not** change the behavior in protocol buffers itself; it only
/// denotes the behavior and may affect how API tooling handles the field.
///
/// Note: This enum **may** receive new values in the future.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum FieldBehavior {
/// Conventional default for enums. Do not use this.
Unspecified = 0,
/// Specifically denotes a field as optional.
/// While all fields in protocol buffers are optional, this may be specified
/// for emphasis if appropriate.
Optional = 1,
/// Denotes a field as required.
/// This indicates that the field **must** be provided as part of the request,
/// and failure to do so will cause an error (usually `INVALID_ARGUMENT`).
Required = 2,
/// Denotes a field as output only.
/// This indicates that the field is provided in responses, but including the
/// field in a request does nothing (the server *must* ignore it and
/// *must not* throw an error as a result of the field's presence).
OutputOnly = 3,
/// Denotes a field as input only.
/// This indicates that the field is provided in requests, and the
/// corresponding field is not included in output.
InputOnly = 4,
/// Denotes a field as immutable.
/// This indicates that the field may be set once in a request to create a
/// resource, but may not be changed thereafter.
Immutable = 5,
}
/// A simple descriptor of a resource type.
///
/// ResourceDescriptor annotates a resource message (either by means of a
/// protobuf annotation or use in the service config), and associates the
/// resource's schema, the resource type, and the pattern of the resource name.
///
/// Example:
///
/// message Topic {
/// // Indicates this message defines a resource schema.
/// // Declares the resource type in the format of {service}/{kind}.
/// // For Kubernetes resources, the format is {api group}/{kind}.
/// option (google.api.resource) = {
/// type: "pubsub.googleapis.com/Topic"
/// name_descriptor: {
/// pattern: "projects/{project}/topics/{topic}"
/// parent_type: "cloudresourcemanager.googleapis.com/Project"
/// parent_name_extractor: "projects/{project}"
/// }
/// };
/// }
///
/// The ResourceDescriptor Yaml config will look like:
///
/// resources:
/// - type: "pubsub.googleapis.com/Topic"
/// name_descriptor:
/// - pattern: "projects/{project}/topics/{topic}"
/// parent_type: "cloudresourcemanager.googleapis.com/Project"
/// parent_name_extractor: "projects/{project}"
///
/// Sometimes, resources have multiple patterns, typically because they can
/// live under multiple parents.
///
/// Example:
///
/// message LogEntry {
/// option (google.api.resource) = {
/// type: "logging.googleapis.com/LogEntry"
/// name_descriptor: {
/// pattern: "projects/{project}/logs/{log}"
/// parent_type: "cloudresourcemanager.googleapis.com/Project"
/// parent_name_extractor: "projects/{project}"
/// }
/// name_descriptor: {
/// pattern: "folders/{folder}/logs/{log}"
/// parent_type: "cloudresourcemanager.googleapis.com/Folder"
/// parent_name_extractor: "folders/{folder}"
/// }
/// name_descriptor: {
/// pattern: "organizations/{organization}/logs/{log}"
/// parent_type: "cloudresourcemanager.googleapis.com/Organization"
/// parent_name_extractor: "organizations/{organization}"
/// }
/// name_descriptor: {
/// pattern: "billingAccounts/{billing_account}/logs/{log}"
/// parent_type: "billing.googleapis.com/BillingAccount"
/// parent_name_extractor: "billingAccounts/{billing_account}"
/// }
/// };
/// }
///
/// The ResourceDescriptor Yaml config will look like:
///
/// resources:
/// - type: 'logging.googleapis.com/LogEntry'
/// name_descriptor:
/// - pattern: "projects/{project}/logs/{log}"
/// parent_type: "cloudresourcemanager.googleapis.com/Project"
/// parent_name_extractor: "projects/{project}"
/// - pattern: "folders/{folder}/logs/{log}"
/// parent_type: "cloudresourcemanager.googleapis.com/Folder"
/// parent_name_extractor: "folders/{folder}"
/// - pattern: "organizations/{organization}/logs/{log}"
/// parent_type: "cloudresourcemanager.googleapis.com/Organization"
/// parent_name_extractor: "organizations/{organization}"
/// - pattern: "billingAccounts/{billing_account}/logs/{log}"
/// parent_type: "billing.googleapis.com/BillingAccount"
/// parent_name_extractor: "billingAccounts/{billing_account}"
///
/// For flexible resources, the resource name doesn't contain parent names, but
/// the resource itself has parents for policy evaluation.
///
/// Example:
///
/// message Shelf {
/// option (google.api.resource) = {
/// type: "library.googleapis.com/Shelf"
/// name_descriptor: {
/// pattern: "shelves/{shelf}"
/// parent_type: "cloudresourcemanager.googleapis.com/Project"
/// }
/// name_descriptor: {
/// pattern: "shelves/{shelf}"
/// parent_type: "cloudresourcemanager.googleapis.com/Folder"
/// }
/// };
/// }
///
/// The ResourceDescriptor Yaml config will look like:
///
/// resources:
/// - type: 'library.googleapis.com/Shelf'
/// name_descriptor:
/// - pattern: "shelves/{shelf}"
/// parent_type: "cloudresourcemanager.googleapis.com/Project"
/// - pattern: "shelves/{shelf}"
/// parent_type: "cloudresourcemanager.googleapis.com/Folder"
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ResourceDescriptor {
/// The resource type. It must be in the format of
/// {service_name}/{resource_type_kind}. The `resource_type_kind` must be
/// singular and must not include version numbers.
///
/// Example: `storage.googleapis.com/Bucket`
///
/// The value of the resource_type_kind must follow the regular expression
/// /[A-Za-z][a-zA-Z0-9]+/. It should start with an upper case character and
/// should use PascalCase (UpperCamelCase). The maximum number of
/// characters allowed for the `resource_type_kind` is 100.
#[prost(string, tag = "1")]
pub r#type: std::string::String,
/// Optional. The relative resource name pattern associated with this resource
/// type. The DNS prefix of the full resource name shouldn't be specified here.
///
/// The path pattern must follow the syntax, which aligns with HTTP binding
/// syntax:
///
/// Template = Segment { "/" Segment } ;
/// Segment = LITERAL | Variable ;
/// Variable = "{" LITERAL "}" ;
///
/// Examples:
///
/// - "projects/{project}/topics/{topic}"
/// - "projects/{project}/knowledgeBases/{knowledge_base}"
///
/// The components in braces correspond to the IDs for each resource in the
/// hierarchy. It is expected that, if multiple patterns are provided,
/// the same component name (e.g. "project") refers to IDs of the same
/// type of resource.
#[prost(string, repeated, tag = "2")]
pub pattern: ::std::vec::Vec<std::string::String>,
/// Optional. The field on the resource that designates the resource name
/// field. If omitted, this is assumed to be "name".
#[prost(string, tag = "3")]
pub name_field: std::string::String,
/// Optional. The historical or future-looking state of the resource pattern.
///
/// Example:
///
/// // The InspectTemplate message originally only supported resource
/// // names with organization, and project was added later.
/// message InspectTemplate {
/// option (google.api.resource) = {
/// type: "dlp.googleapis.com/InspectTemplate"
/// pattern:
/// "organizations/{organization}/inspectTemplates/{inspect_template}"
/// pattern: "projects/{project}/inspectTemplates/{inspect_template}"
/// history: ORIGINALLY_SINGLE_PATTERN
/// };
/// }
#[prost(enumeration = "resource_descriptor::History", tag = "4")]
pub history: i32,
/// The plural name used in the resource name and permission names, such as
/// 'projects' for the resource name of 'projects/{project}' and the permission
/// name of 'cloudresourcemanager.googleapis.com/projects.get'. It is the same
/// concept as the `plural` field in the k8s CRD spec
/// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/
///
/// Note: The plural form is required even for singleton resources. See
/// https://aip.dev/156
#[prost(string, tag = "5")]
pub plural: std::string::String,
/// The same concept as the `singular` field in the k8s CRD spec
/// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/
/// Such as "project" for the `resourcemanager.googleapis.com/Project` type.
#[prost(string, tag = "6")]
pub singular: std::string::String,
}
pub mod resource_descriptor {
/// A description of the historical or future-looking state of the
/// resource pattern.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum History {
/// The "unset" value.
Unspecified = 0,
/// The resource originally had one pattern and launched as such, and
/// additional patterns were added later.
OriginallySinglePattern = 1,
/// The resource has one pattern, but the API owner expects to add more
/// later. (This is the inverse of ORIGINALLY_SINGLE_PATTERN, and prevents
/// that from being necessary once there are multiple patterns.)
FutureMultiPattern = 2,
}
}
/// Defines a proto annotation that describes a string field that refers to
/// an API resource.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ResourceReference {
/// The resource type that the annotated field references.
///
/// Example:
///
/// message Subscription {
/// string topic = 2 [(google.api.resource_reference) = {
/// type: "pubsub.googleapis.com/Topic"
/// }];
/// }
///
/// Occasionally, a field may reference an arbitrary resource. In this case,
/// APIs use the special value * in their resource reference.
///
/// Example:
///
/// message GetIamPolicyRequest {
/// string resource = 2 [(google.api.resource_reference) = {
/// type: "*"
/// }];
/// }
#[prost(string, tag = "1")]
pub r#type: std::string::String,
/// The resource type of a child collection that the annotated field
/// references. This is useful for annotating the `parent` field that
/// doesn't have a fixed resource type.
///
/// Example:
///
/// message ListLogEntriesRequest {
/// string parent = 1 [(google.api.resource_reference) = {
/// child_type: "logging.googleapis.com/LogEntry"
/// }];
/// }
#[prost(string, tag = "2")]
pub child_type: std::string::String,
}

File diff suppressed because it is too large

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,22 @@
/// The `Status` type defines a logical error model that is suitable for
/// different programming environments, including REST APIs and RPC APIs. It is
/// used by [gRPC](https://github.com/grpc). Each `Status` message contains
/// three pieces of data: error code, error message, and error details.
///
/// You can find out more about this error model and how to work with it in the
/// [API Design Guide](https://cloud.google.com/apis/design/errors).
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Status {
/// The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].
#[prost(int32, tag = "1")]
pub code: i32,
/// A developer-facing error message, which should be in English. Any
/// user-facing error message should be localized and sent in the
/// [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client.
#[prost(string, tag = "2")]
pub message: std::string::String,
/// A list of messages that carry the error details. There is a common set of
/// message types for APIs to use.
#[prost(message, repeated, tag = "3")]
pub details: ::std::vec::Vec<::prost_types::Any>,
}

View File

@ -0,0 +1,118 @@
/// A module for managing a Google API access token
use goauth::{
auth::{JwtClaims, Token},
credentials::Credentials,
};
use log::*;
use smpl_jwt::Jwt;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
{Arc, RwLock},
},
time::Instant,
};
pub use goauth::scopes::Scope;
fn load_credentials() -> Result<Credentials, String> {
// Use standard GOOGLE_APPLICATION_CREDENTIALS environment variable
let credentials_file = std::env::var("GOOGLE_APPLICATION_CREDENTIALS")
.map_err(|_| "GOOGLE_APPLICATION_CREDENTIALS environment variable not found".to_string())?;
Credentials::from_file(&credentials_file).map_err(|err| {
format!(
"Failed to read GCP credentials from {}: {}",
credentials_file, err
)
})
}
#[derive(Clone)]
pub struct AccessToken {
credentials: Credentials,
scope: Scope,
refresh_active: Arc<AtomicBool>,
token: Arc<RwLock<(Token, Instant)>>,
}
impl AccessToken {
pub async fn new(scope: Scope) -> Result<Self, String> {
let credentials = load_credentials()?;
if let Err(err) = credentials.rsa_key() {
Err(format!("Invalid rsa key: {}", err))
} else {
let token = Arc::new(RwLock::new(Self::get_token(&credentials, &scope).await?));
let access_token = Self {
credentials,
scope,
token,
refresh_active: Arc::new(AtomicBool::new(false)),
};
Ok(access_token)
}
}
/// The project that this token grants access to
pub fn project(&self) -> String {
self.credentials.project()
}
async fn get_token(
credentials: &Credentials,
scope: &Scope,
) -> Result<(Token, Instant), String> {
info!("Requesting token for {:?} scope", scope);
let claims = JwtClaims::new(
credentials.iss(),
scope,
credentials.token_uri(),
None,
None,
);
let jwt = Jwt::new(claims, credentials.rsa_key().unwrap(), None);
let token = goauth::get_token(&jwt, credentials)
.await
.map_err(|err| format!("Failed to refresh access token: {}", err))?;
info!("Token expires in {} seconds", token.expires_in());
Ok((token, Instant::now()))
}
/// Call this function regularly to ensure the access token does not expire
pub async fn refresh(&self) {
// Check if it's time to try a token refresh
{
let token_r = self.token.read().unwrap();
if token_r.1.elapsed().as_secs() < token_r.0.expires_in() as u64 / 2 {
return;
}
if self
.refresh_active
.compare_and_swap(false, true, Ordering::Relaxed)
{
// Refresh already pending
return;
}
}
info!("Refreshing token");
let new_token = Self::get_token(&self.credentials, &self.scope).await;
{
let mut token_w = self.token.write().unwrap();
match new_token {
Ok(new_token) => *token_w = new_token,
Err(err) => warn!("{}", err),
}
self.refresh_active.store(false, Ordering::Relaxed);
}
}
/// Return an access token suitable for use in an HTTP authorization header
pub fn get(&self) -> String {
let token_r = self.token.read().unwrap();
format!("{} {}", token_r.0.token_type(), token_r.0.access_token())
}
}
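A minimal usage sketch of the module above (not part of this diff): it assumes GOOGLE_APPLICATION_CREDENTIALS points at a valid service-account file, and the `example` function name is hypothetical.

async fn example() -> Result<(), String> {
    // Request a read-only token; `new` eagerly fetches the first token.
    let access_token = AccessToken::new(Scope::BigTableDataReadOnly).await?;
    println!("GCP project: {}", access_token.project());
    // Cheap to call regularly: a refresh is only attempted once half the
    // token's lifetime has elapsed, and concurrent refreshes are skipped.
    access_token.refresh().await;
    // Ready-made value for an HTTP `authorization` header, e.g. "Bearer ...".
    let _authorization = access_token.get();
    Ok(())
}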

View File

@ -0,0 +1,466 @@
// Primitives for reading/writing BigTable tables
use crate::access_token::{AccessToken, Scope};
use crate::compression::{compress_best, decompress};
use crate::root_ca_certificate;
use log::*;
use thiserror::Error;
use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request};
mod google {
mod rpc {
include!(concat!(env!("CARGO_MANIFEST_DIR"), "/proto/google.rpc.rs"));
}
pub mod bigtable {
pub mod v2 {
include!(concat!(env!("CARGO_MANIFEST_DIR"), "/proto/google.bigtable.v2.rs"));
}
}
}
use google::bigtable::v2::*;
pub type RowKey = String;
pub type CellName = String;
pub type CellValue = Vec<u8>;
pub type RowData = Vec<(CellName, CellValue)>;
#[derive(Debug, Error)]
pub enum Error {
#[error("AccessToken error: {0}")]
AccessTokenError(String),
#[error("Certificate error: {0}")]
CertificateError(String),
#[error("I/O Error: {0}")]
IoError(std::io::Error),
#[error("Transport error: {0}")]
TransportError(tonic::transport::Error),
#[error("Invalid URI {0}: {1}")]
InvalidUri(String, String),
#[error("Row not found")]
RowNotFound,
#[error("Row write failed")]
RowWriteFailed,
#[error("Object not found: {0}")]
ObjectNotFound(String),
#[error("Object is corrupt: {0}")]
ObjectCorrupt(String),
#[error("RPC error: {0}")]
RpcError(tonic::Status),
}
impl std::convert::From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
impl std::convert::From<tonic::transport::Error> for Error {
fn from(err: tonic::transport::Error) -> Self {
Self::TransportError(err)
}
}
impl std::convert::From<tonic::Status> for Error {
fn from(err: tonic::Status) -> Self {
Self::RpcError(err)
}
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone)]
pub struct BigTableConnection {
access_token: Option<AccessToken>,
channel: tonic::transport::Channel,
table_prefix: String,
}
impl BigTableConnection {
/// Establish a connection to the BigTable instance named `instance_name`. If read-only access
/// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
///
/// The GOOGLE_APPLICATION_CREDENTIALS environment variable will be used to determine the
/// project that contains the BigTable instance in addition to access credentials.
///
/// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
///
pub async fn new(instance_name: &str, read_only: bool) -> Result<Self> {
match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => {
info!("Connecting to bigtable emulator at {}", endpoint);
Ok(Self {
access_token: None,
channel: tonic::transport::Channel::from_shared(format!("http://{}", endpoint))
.map_err(|err| Error::InvalidUri(endpoint, err.to_string()))?
.connect_lazy()?,
table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name),
})
}
Err(_) => {
let access_token = AccessToken::new(if read_only {
Scope::BigTableDataReadOnly
} else {
Scope::BigTableData
})
.await
.map_err(Error::AccessTokenError)?;
let table_prefix = format!(
"projects/{}/instances/{}/tables/",
access_token.project(),
instance_name
);
Ok(Self {
access_token: Some(access_token),
channel: tonic::transport::Channel::from_static(
"https://bigtable.googleapis.com",
)
.tls_config(
ClientTlsConfig::new()
.ca_certificate(
root_ca_certificate::load().map_err(Error::CertificateError)?,
)
.domain_name("bigtable.googleapis.com"),
)?
.connect_lazy()?,
table_prefix,
})
}
}
}
/// Create a new BigTable client.
///
/// Clients require `&mut self`, due to `tonic::transport::Channel` limitations; however,
/// creating new clients is cheap, so doing so is a practical workaround for ease of use.
pub fn client(&self) -> BigTable {
let client = if let Some(access_token) = &self.access_token {
let access_token = access_token.clone();
bigtable_client::BigtableClient::with_interceptor(
self.channel.clone(),
move |mut req: Request<()>| {
match MetadataValue::from_str(&access_token.get()) {
Ok(authorization_header) => {
req.metadata_mut()
.insert("authorization", authorization_header);
}
Err(err) => {
warn!("Failed to set authorization header: {}", err);
}
}
Ok(req)
},
)
} else {
bigtable_client::BigtableClient::new(self.channel.clone())
};
BigTable {
access_token: self.access_token.clone(),
client,
table_prefix: self.table_prefix.clone(),
}
}
pub async fn put_bincode_cells_with_retry<T>(
&self,
table: &str,
cells: &[(RowKey, T)],
) -> Result<usize>
where
T: serde::ser::Serialize,
{
use backoff::{future::FutureOperation as _, ExponentialBackoff};
(|| async {
let mut client = self.client();
Ok(client.put_bincode_cells(table, cells).await?)
})
.retry(ExponentialBackoff::default())
.await
}
}
pub struct BigTable {
access_token: Option<AccessToken>,
client: bigtable_client::BigtableClient<tonic::transport::Channel>,
table_prefix: String,
}
impl BigTable {
async fn decode_read_rows_response(
mut rrr: tonic::codec::Streaming<ReadRowsResponse>,
) -> Result<Vec<(RowKey, RowData)>> {
let mut rows: Vec<(RowKey, RowData)> = vec![];
let mut row_key = None;
let mut row_data = vec![];
let mut cell_name = None;
let mut cell_timestamp = 0;
let mut cell_value = vec![];
let mut cell_version_ok = true;
while let Some(res) = rrr.message().await? {
for (i, mut chunk) in res.chunks.into_iter().enumerate() {
// The comments for `read_rows_response::CellChunk` provide essential details for
// understanding how the below decoding works...
trace!("chunk {}: {:?}", i, chunk);
// Starting a new row?
if !chunk.row_key.is_empty() {
row_key = String::from_utf8(chunk.row_key).ok(); // Require UTF-8 for row keys
}
// Starting a new cell?
if let Some(qualifier) = chunk.qualifier {
if let Some(cell_name) = cell_name {
row_data.push((cell_name, cell_value));
cell_value = vec![];
}
cell_name = String::from_utf8(qualifier).ok(); // Require UTF-8 for cell names
cell_timestamp = chunk.timestamp_micros;
cell_version_ok = true;
} else {
// Continuing the existing cell. Check if this is the start of another version of the cell
if chunk.timestamp_micros != 0 {
if chunk.timestamp_micros < cell_timestamp {
cell_version_ok = false; // ignore older versions of the cell
} else {
// newer version of the cell, remove the older cell
cell_version_ok = true;
cell_value = vec![];
cell_timestamp = chunk.timestamp_micros;
}
}
}
if cell_version_ok {
cell_value.append(&mut chunk.value);
}
// End of a row?
if chunk.row_status.is_some() {
if let Some(read_rows_response::cell_chunk::RowStatus::CommitRow(_)) =
chunk.row_status
{
if let Some(cell_name) = cell_name {
row_data.push((cell_name, cell_value));
}
if let Some(row_key) = row_key {
rows.push((row_key, row_data))
}
}
row_key = None;
row_data = vec![];
cell_value = vec![];
cell_name = None;
}
}
}
Ok(rows)
}
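// Note on the decoding above: per the ReadRows streaming contract, a
// non-empty `row_key` begins a new row, a present `qualifier` begins a
// new cell, and a `CommitRow` row status finalizes the row. One cell's
// value may arrive split across several chunks, which is why `cell_value`
// is appended to rather than assigned.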
async fn refresh_access_token(&self) {
if let Some(ref access_token) = self.access_token {
access_token.refresh().await;
}
}
/// Get `table` row keys in lexical order.
///
/// If `start_at` is provided, the row key listing will start with that key.
/// Otherwise the listing will start from the start of the table.
pub async fn get_row_keys(
&mut self,
table_name: &str,
start_at: Option<RowKey>,
rows_limit: i64,
) -> Result<Vec<RowKey>> {
self.refresh_access_token().await;
let response = self
.client
.read_rows(ReadRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
rows_limit,
rows: Some(RowSet {
row_keys: vec![],
row_ranges: if let Some(row_key) = start_at {
vec![RowRange {
start_key: Some(row_range::StartKey::StartKeyClosed(
row_key.into_bytes(),
)),
end_key: None,
}]
} else {
vec![]
},
}),
filter: Some(RowFilter {
filter: Some(row_filter::Filter::Chain(row_filter::Chain {
filters: vec![
RowFilter {
// Return minimal number of cells
filter: Some(row_filter::Filter::CellsPerRowLimitFilter(1)),
},
RowFilter {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
},
RowFilter {
// Strip the cell values
filter: Some(row_filter::Filter::StripValueTransformer(true)),
},
],
})),
}),
..ReadRowsRequest::default()
})
.await?
.into_inner();
let rows = Self::decode_read_rows_response(response).await?;
Ok(rows.into_iter().map(|r| r.0).collect())
}
/// Get the latest data for a single row of `table` at `row_key`.
///
/// All column families are accepted, and only the latest version of each column cell will be
/// returned.
pub async fn get_row_data(&mut self, table_name: &str, row_key: RowKey) -> Result<RowData> {
self.refresh_access_token().await;
let response = self
.client
.read_rows(ReadRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
rows_limit: 1,
rows: Some(RowSet {
row_keys: vec![row_key.into_bytes()],
row_ranges: vec![],
}),
filter: Some(RowFilter {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
..ReadRowsRequest::default()
})
.await?
.into_inner();
let rows = Self::decode_read_rows_response(response).await?;
rows.into_iter()
.next()
.map(|r| r.1)
.ok_or_else(|| Error::RowNotFound)
}
/// Store data for one or more `table` rows in the `family_name` Column family
async fn put_row_data(
&mut self,
table_name: &str,
family_name: &str,
row_data: &[(&RowKey, RowData)],
) -> Result<()> {
self.refresh_access_token().await;
let mut entries = vec![];
for (row_key, row_data) in row_data {
let mutations = row_data
.iter()
.map(|(column_key, column_value)| Mutation {
mutation: Some(mutation::Mutation::SetCell(mutation::SetCell {
family_name: family_name.to_string(),
column_qualifier: column_key.clone().into_bytes(),
timestamp_micros: -1, // server assigned
value: column_value.to_vec(),
})),
})
.collect();
entries.push(mutate_rows_request::Entry {
row_key: (*row_key).clone().into_bytes(),
mutations,
});
}
let mut response = self
.client
.mutate_rows(MutateRowsRequest {
table_name: format!("{}{}", self.table_prefix, table_name),
entries,
..MutateRowsRequest::default()
})
.await?
.into_inner();
while let Some(res) = response.message().await? {
for entry in res.entries {
if let Some(status) = entry.status {
if status.code != 0 {
warn!("put_row_data error {}: {}", status.code, status.message);
return Err(Error::RowWriteFailed);
}
}
}
}
Ok(())
}
pub async fn get_bincode_cell<T>(&mut self, table: &str, key: RowKey) -> Result<T>
where
T: serde::de::DeserializeOwned,
{
let row_data = self.get_row_data(table, key.clone()).await?;
let value = row_data
.into_iter()
.find(|(name, _)| name == "bin")
.ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))?
.1;
let data = decompress(&value)?;
bincode::deserialize(&data).map_err(|err| {
warn!("Failed to deserialize {}/{}: {}", table, key, err);
Error::ObjectCorrupt(format!("{}/{}", table, key))
})
}
pub async fn put_bincode_cells<T>(
&mut self,
table: &str,
cells: &[(RowKey, T)],
) -> Result<usize>
where
T: serde::ser::Serialize,
{
let mut bytes_written = 0;
let mut new_row_data = vec![];
for (row_key, data) in cells {
let data = compress_best(&bincode::serialize(&data).unwrap())?;
bytes_written += data.len();
new_row_data.push((row_key, vec![("bin".to_string(), data)]));
}
self.put_row_data(table, "x", &new_row_data).await?;
Ok(bytes_written)
}
}
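A short read-path sketch tying the pieces above together (not part of this diff): `list_block_keys` is a hypothetical helper, and it assumes either BIGTABLE_EMULATOR_HOST or valid GCP credentials in the environment.

async fn list_block_keys() -> Result<Vec<RowKey>> {
    // Read-only access keeps the requested OAuth2 scope minimal.
    let connection = BigTableConnection::new("solana-ledger", /*read_only=*/ true).await?;
    // Clients are cheap to create; grab a fresh one per call site.
    let mut client = connection.client();
    // First ten row keys of the `blocks` table, in lexical order.
    client.get_row_keys("blocks", None, 10).await
}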

View File

@ -0,0 +1,105 @@
use enum_iterator::IntoEnumIterator;
use std::io::{self, BufReader, Read, Write};
#[derive(Debug, Serialize, Deserialize, IntoEnumIterator)]
pub enum CompressionMethod {
NoCompression,
Bzip2,
Gzip,
Zstd,
}
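// Layout note (with bincode's default configuration): a compressed blob is
// the bincode encoding of the CompressionMethod variant tag (4 bytes)
// followed immediately by the method-specific payload. `decompress` below
// reads the tag first, then hands the remaining bytes to the matching
// decoder via `decompress_reader`.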
fn decompress_reader<'a, R: Read + 'a>(
method: CompressionMethod,
stream: R,
) -> Result<Box<dyn Read + 'a>, io::Error> {
let buf_reader = BufReader::new(stream);
let decompress_reader: Box<dyn Read> = match method {
CompressionMethod::Bzip2 => Box::new(bzip2::bufread::BzDecoder::new(buf_reader)),
CompressionMethod::Gzip => Box::new(flate2::read::GzDecoder::new(buf_reader)),
CompressionMethod::Zstd => Box::new(zstd::stream::read::Decoder::new(buf_reader)?),
CompressionMethod::NoCompression => Box::new(buf_reader),
};
Ok(decompress_reader)
}
pub fn decompress(data: &[u8]) -> Result<Vec<u8>, io::Error> {
let method_size = bincode::serialized_size(&CompressionMethod::NoCompression).unwrap();
if (data.len() as u64) < method_size {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("data len too small: {}", data.len()),
));
}
let method = bincode::deserialize(&data[..method_size as usize]).map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!("method deserialize failed: {}", err),
)
})?;
let mut reader = decompress_reader(method, &data[method_size as usize..])?;
let mut uncompressed_data = vec![];
reader.read_to_end(&mut uncompressed_data)?;
Ok(uncompressed_data)
}
pub fn compress(method: CompressionMethod, data: &[u8]) -> Result<Vec<u8>, io::Error> {
let mut compressed_data = bincode::serialize(&method).unwrap();
compressed_data.extend(
match method {
CompressionMethod::Bzip2 => {
let mut e = bzip2::write::BzEncoder::new(Vec::new(), bzip2::Compression::Best);
e.write_all(data)?;
e.finish()?
}
CompressionMethod::Gzip => {
let mut e =
flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
e.write_all(data)?;
e.finish()?
}
CompressionMethod::Zstd => {
let mut e = zstd::stream::write::Encoder::new(Vec::new(), 0).unwrap();
e.write_all(data)?;
e.finish()?
}
CompressionMethod::NoCompression => data.to_vec(),
}
.into_iter(),
);
Ok(compressed_data)
}
pub fn compress_best(data: &[u8]) -> Result<Vec<u8>, io::Error> {
let mut candidates = vec![];
for method in CompressionMethod::into_enum_iter() {
candidates.push(compress(method, data)?);
}
Ok(candidates
.into_iter()
.min_by(|a, b| a.len().cmp(&b.len()))
.unwrap())
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_compress_uncompress() {
let data = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
assert_eq!(
decompress(&compress_best(&data).expect("compress_best")).expect("decompress"),
data
);
}
#[test]
fn test_compress() {
let data = vec![0; 256];
assert!(compress_best(&data).expect("compress_best").len() < data.len());
}
}

storage-bigtable/src/lib.rs Normal file
View File

@ -0,0 +1,536 @@
use log::*;
use serde::{Deserialize, Serialize};
use solana_sdk::{
clock::{Slot, UnixTimestamp},
pubkey::Pubkey,
signature::Signature,
sysvar::is_sysvar_id,
transaction::{Transaction, TransactionError},
};
use solana_transaction_status::{
ConfirmedBlock, ConfirmedTransaction, EncodedTransaction, Rewards, TransactionStatus,
TransactionWithStatusMeta, UiTransactionEncoding, UiTransactionStatusMeta,
};
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
};
use thiserror::Error;
#[macro_use]
extern crate serde_derive;
mod access_token;
mod bigtable;
mod compression;
mod root_ca_certificate;
#[derive(Debug, Error)]
pub enum Error {
#[error("BigTable: {0}")]
BigTableError(bigtable::Error),
#[error("I/O Error: {0}")]
IoError(std::io::Error),
#[error("Transaction encoded is not supported")]
UnsupportedTransactionEncoding,
#[error("Block not found: {0}")]
BlockNotFound(Slot),
#[error("Signature not found")]
SignatureNotFound,
}
impl std::convert::From<bigtable::Error> for Error {
fn from(err: bigtable::Error) -> Self {
Self::BigTableError(err)
}
}
impl std::convert::From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
pub type Result<T> = std::result::Result<T, Error>;
// Convert a slot to its bucket representation whereby lower slots are always lexically ordered
// before higher slots
fn slot_to_key(slot: Slot) -> String {
format!("{:016x}", slot)
}
// Reverse of `slot_to_key`
fn key_to_slot(key: &str) -> Option<Slot> {
match Slot::from_str_radix(key, 16) {
Ok(slot) => Some(slot),
Err(err) => {
// bucket data is probably corrupt
warn!("Failed to parse object key as a slot: {}: {}", key, err);
None
}
}
}
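// For example, slot 16 maps to "0000000000000010" and slot 255 to
// "00000000000000ff"; the fixed-width, zero-padded hex keeps lexical key
// order identical to numeric slot order, and the mapping round-trips:
// key_to_slot(&slot_to_key(255)) == Some(255).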
// A serialized `StoredConfirmedBlock` is stored in the `block` table
//
// StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids
// some serde JSON directives that cause issues with bincode
//
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlock {
previous_blockhash: String,
blockhash: String,
parent_slot: Slot,
transactions: Vec<StoredConfirmedBlockTransaction>,
rewards: Rewards,
block_time: Option<UnixTimestamp>,
}
impl StoredConfirmedBlock {
fn into_confirmed_block(self, encoding: UiTransactionEncoding) -> ConfirmedBlock {
let StoredConfirmedBlock {
previous_blockhash,
blockhash,
parent_slot,
transactions,
rewards,
block_time,
} = self;
ConfirmedBlock {
previous_blockhash,
blockhash,
parent_slot,
transactions: transactions
.into_iter()
.map(|transaction| transaction.into_transaction_with_status_meta(encoding))
.collect(),
rewards,
block_time,
}
}
}
impl TryFrom<ConfirmedBlock> for StoredConfirmedBlock {
type Error = Error;
fn try_from(confirmed_block: ConfirmedBlock) -> Result<Self> {
let ConfirmedBlock {
previous_blockhash,
blockhash,
parent_slot,
transactions,
rewards,
block_time,
} = confirmed_block;
let mut encoded_transactions = vec![];
for transaction in transactions.into_iter() {
encoded_transactions.push(transaction.try_into()?);
}
Ok(Self {
previous_blockhash,
blockhash,
parent_slot,
transactions: encoded_transactions,
rewards,
block_time,
})
}
}
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransaction {
transaction: Transaction,
meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
}
impl StoredConfirmedBlockTransaction {
fn into_transaction_with_status_meta(
self,
encoding: UiTransactionEncoding,
) -> TransactionWithStatusMeta {
let StoredConfirmedBlockTransaction { transaction, meta } = self;
TransactionWithStatusMeta {
transaction: EncodedTransaction::encode(transaction, encoding),
meta: meta.map(|meta| meta.into()),
}
}
}
impl TryFrom<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
type Error = Error;
fn try_from(value: TransactionWithStatusMeta) -> Result<Self> {
let TransactionWithStatusMeta { transaction, meta } = value;
Ok(Self {
transaction: transaction
.decode()
.ok_or(Error::UnsupportedTransactionEncoding)?,
meta: meta.map(|meta| meta.into()),
})
}
}
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransactionStatusMeta {
err: Option<TransactionError>,
fee: u64,
pre_balances: Vec<u64>,
post_balances: Vec<u64>,
}
impl From<StoredConfirmedBlockTransactionStatusMeta> for UiTransactionStatusMeta {
fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
let StoredConfirmedBlockTransactionStatusMeta {
err,
fee,
pre_balances,
post_balances,
} = value;
let status = match &err {
None => Ok(()),
Some(err) => Err(err.clone()),
};
Self {
err,
status,
fee,
pre_balances,
post_balances,
}
}
}
impl From<UiTransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
fn from(value: UiTransactionStatusMeta) -> Self {
let UiTransactionStatusMeta {
err,
fee,
pre_balances,
post_balances,
..
} = value;
Self {
err,
fee,
pre_balances,
post_balances,
}
}
}
// A serialized `TransactionInfo` is stored in the `tx` table
#[derive(Serialize, Deserialize)]
struct TransactionInfo {
slot: Slot, // The slot that contains the block with this transaction in it
index: u32, // Where the transaction is located in the block
err: Option<TransactionError>, // None if the transaction executed successfully
memo: Option<String>, // Transaction memo
}
impl From<TransactionInfo> for TransactionStatus {
fn from(transaction_info: TransactionInfo) -> Self {
let TransactionInfo { slot, err, .. } = transaction_info;
let status = match &err {
None => Ok(()),
Some(err) => Err(err.clone()),
};
Self {
slot,
confirmations: None,
status,
err,
}
}
}
// A serialized `Vec<TransactionByAddrInfo>` is stored in the `tx-by-addr` table. The row keys are
// the one's complement of the slot so that rows may be listed in reverse order
#[derive(Serialize, Deserialize)]
struct TransactionByAddrInfo {
signature: Signature, // The transaction signature
err: Option<TransactionError>, // None if the transaction executed successfully
index: u32, // Where the transaction is located in the block
memo: Option<String>, // Transaction memo
}
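// For example, slot 100 yields the key suffix "ffffffffffffff9b" and slot 200
// yields "ffffffffffffff37"; listing keys in ascending lexical order therefore
// returns higher (newer) slots first.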
#[derive(Clone)]
pub struct LedgerStorage {
connection: bigtable::BigTableConnection,
}
impl LedgerStorage {
pub async fn new(read_only: bool) -> Result<Self> {
let connection = bigtable::BigTableConnection::new("solana-ledger", read_only).await?;
Ok(Self { connection })
}
/// Return the first available slot that contains a block
pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
let mut bigtable = self.connection.client();
let blocks = bigtable.get_row_keys("blocks", None, 1).await?;
if blocks.is_empty() {
return Ok(None);
}
Ok(key_to_slot(&blocks[0]))
}
/// Fetch the next slots after the provided slot that contain a block
///
/// start_slot: slot to start the search from (inclusive)
/// limit: stop after this many slots have been found.
pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
let mut bigtable = self.connection.client();
let blocks = bigtable
.get_row_keys("blocks", Some(slot_to_key(start_slot)), limit as i64)
.await?;
Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
}
/// Fetch the confirmed block from the desired slot
pub async fn get_confirmed_block(
&self,
slot: Slot,
encoding: UiTransactionEncoding,
) -> Result<ConfirmedBlock> {
let mut bigtable = self.connection.client();
let block = bigtable
.get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
.await?;
Ok(block.into_confirmed_block(encoding))
}
pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
let mut bigtable = self.connection.client();
let transaction_info = bigtable
.get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
.await?;
Ok(transaction_info.into())
}
/// Fetch a confirmed transaction
pub async fn get_confirmed_transaction(
&self,
signature: &Signature,
encoding: UiTransactionEncoding,
) -> Result<Option<ConfirmedTransaction>> {
let mut bigtable = self.connection.client();
// Figure out which block the transaction is located in
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", signature.to_string())
.await?;
// Load the block and return the transaction
let block = bigtable
.get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
.await?;
match block.transactions.into_iter().nth(index as usize) {
None => {
warn!("Transaction info for {} is corrupt", signature);
Ok(None)
}
Some(bucket_block_transaction) => {
if bucket_block_transaction.transaction.signatures[0] != *signature {
warn!(
"Transaction info or confirmed block for {} is corrupt",
signature
);
Ok(None)
} else {
Ok(Some(ConfirmedTransaction {
slot,
transaction: bucket_block_transaction
.into_transaction_with_status_meta(encoding),
}))
}
}
}
}
/// Get confirmed signatures for the provided address, in descending ledger order
///
/// address: address to search for
/// before_signature: start with the first signature older than this one
/// limit: stop after this many signatures.
pub async fn get_confirmed_signatures_for_address(
&self,
address: &Pubkey,
before_signature: Option<&Signature>,
limit: usize,
) -> Result<Vec<(Signature, Slot, Option<String>, Option<TransactionError>)>> {
let mut bigtable = self.connection.client();
let address_prefix = format!("{}/", address);
// Figure out where to start listing from based on `before_signature`
let (first_slot, mut first_transaction_index) = match before_signature {
None => (Slot::MAX, 0),
Some(before_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", before_signature.to_string())
.await?;
(slot, index + 1)
}
};
let mut infos = vec![];
// Return the next `limit` tx-by-addr keys
let tx_by_addr_info_keys = bigtable
.get_row_keys(
"tx-by-addr",
Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))),
limit as i64,
)
.await?;
// Read each tx-by-addr object until `limit` signatures have been found
'outer: for key in tx_by_addr_info_keys {
trace!("key is {}: slot is {}", key, &key[address_prefix.len()..]);
if !key.starts_with(&address_prefix) {
break 'outer;
}
let slot = !key_to_slot(&key[address_prefix.len()..]).ok_or_else(|| {
bigtable::Error::ObjectCorrupt(format!(
"Failed to convert key to slot: tx-by-addr/{}",
key
))
})?;
let tx_by_addr_infos = bigtable
.get_bincode_cell::<Vec<TransactionByAddrInfo>>("tx-by-addr", key)
.await?;
for tx_by_addr_info in tx_by_addr_infos
.into_iter()
.skip(first_transaction_index as usize)
{
infos.push((
tx_by_addr_info.signature,
slot,
tx_by_addr_info.memo,
tx_by_addr_info.err,
));
if infos.len() >= limit {
break 'outer;
}
}
first_transaction_index = 0;
}
Ok(infos)
}
/// Upload a new confirmed block and its associated metadata.
pub async fn upload_confirmed_block(
&self,
slot: Slot,
confirmed_block: ConfirmedBlock,
) -> Result<()> {
let mut bytes_written = 0;
let mut by_addr: HashMap<Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
let mut tx_cells = vec![];
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
let err = transaction_with_meta
.meta
.as_ref()
.and_then(|meta| meta.err.clone());
let index = index as u32;
let transaction = transaction_with_meta
.transaction
.decode()
.expect("transaction decode failed");
let signature = transaction.signatures[0];
for address in transaction.message.account_keys {
if !is_sysvar_id(&address) {
by_addr
.entry(address)
.or_default()
.push(TransactionByAddrInfo {
signature,
err: err.clone(),
index,
memo: None, // TODO
});
}
}
tx_cells.push((
signature.to_string(),
TransactionInfo {
slot,
index,
err,
memo: None, // TODO
},
));
}
let tx_by_addr_cells: Vec<_> = by_addr
.into_iter()
.map(|(address, transaction_info_by_addr)| {
(
format!("{}/{}", address, slot_to_key(!slot)),
transaction_info_by_addr,
)
})
.collect();
if !tx_cells.is_empty() {
bytes_written += self
.connection
.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
.await?;
}
if !tx_by_addr_cells.is_empty() {
bytes_written += self
.connection
.put_bincode_cells_with_retry::<Vec<TransactionByAddrInfo>>(
"tx-by-addr",
&tx_by_addr_cells,
)
.await?;
}
let num_transactions = confirmed_block.transactions.len();
// Store the block itself last, after all other metadata about the block has been
// successfully stored. This prevents partially uploaded blocks from becoming visible to
// `get_confirmed_block()` and `get_confirmed_blocks()`
let blocks_cells = [(slot_to_key(slot), confirmed_block.try_into()?)];
bytes_written += self
.connection
.put_bincode_cells_with_retry::<StoredConfirmedBlock>("blocks", &blocks_cells)
.await?;
info!(
"uploaded block for slot {}: {} transactions, {} bytes",
slot, num_transactions, bytes_written
);
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_slot_to_key() {
assert_eq!(slot_to_key(0), "0000000000000000");
assert_eq!(slot_to_key(!0), "ffffffffffffffff");
}
}
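A read-side usage sketch (not part of this diff): `first_block` is a hypothetical helper, and it assumes Bigtable access is configured as described for `LedgerStorage::new`.

async fn first_block(storage: &LedgerStorage) -> Result<Option<ConfirmedBlock>> {
    match storage.get_first_available_block().await? {
        // Fetch the oldest stored block, JSON-encoding its transactions.
        Some(slot) => Ok(Some(
            storage
                .get_confirmed_block(slot, UiTransactionEncoding::Json)
                .await?,
        )),
        None => Ok(None),
    }
}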

File diff suppressed because it is too large

View File

@ -0,0 +1,20 @@
use std::{fs::File, io::Read};
use tonic::transport::Certificate;
pub fn load() -> Result<Certificate, String> {
// Respect the standard GRPC_DEFAULT_SSL_ROOTS_FILE_PATH environment variable if present,
// otherwise use the built-in root certificate
let pem = match std::env::var("GRPC_DEFAULT_SSL_ROOTS_FILE_PATH").ok() {
Some(cert_file) => File::open(&cert_file)
.and_then(|mut file| {
let mut pem = Vec::new();
file.read_to_end(&mut pem).map(|_| pem)
})
.map_err(|err| format!("Failed to read {}: {}", cert_file, err))?,
None => {
// PEM file from Google Trust Services (https://pki.goog/roots.pem)
include_bytes!("pki-goog-roots.pem").to_vec()
}
};
Ok(Certificate::from_pem(&pem))
}

View File

@ -214,7 +214,7 @@ pub struct TransactionWithStatusMeta {
pub meta: Option<UiTransactionStatusMeta>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum UiTransactionEncoding {
Binary,

View File

@ -620,7 +620,15 @@ pub fn main() {
.takes_value(false)
.help("Enable historical transaction info over JSON RPC, \
including the 'getConfirmedBlock' API. \
This will cause an increase in disk usage and IOPS"),
This will cause an increase in disk usage and IOPS"),
)
.arg(
Arg::with_name("enable_rpc_bigtable_ledger_storage")
.long("enable-rpc-bigtable-ledger-storage")
.requires("enable_rpc_transaction_history")
.takes_value(false)
.help("Fetch historical transaction info from a BigTable instance \
as a fallback to local ledger data"),
)
.arg(
Arg::with_name("health_check_slot_distance")
@ -914,6 +922,8 @@ pub fn main() {
enable_validator_exit: matches.is_present("enable_rpc_exit"),
enable_set_log_filter: matches.is_present("enable_rpc_set_log_filter"),
enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
enable_bigtable_ledger_storage: matches
.is_present("enable_rpc_bigtable_ledger_storage"),
identity_pubkey: identity_keypair.pubkey(),
faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")