Convert Blockstore Rewards cf to protobuf (#12860)

* Add Blockstore protobuf cf type

* Add Rewards message to proto and make generated pub

* Convert Rewards cf to ProtobufColumn

* Add bench

* Adjust tags

* Move solana proto definitions and conversion methods to new crate
This commit is contained in:
Tyera Eulberg
2020-10-15 18:04:10 -06:00
committed by GitHub
parent b510474dcb
commit 359707c85e
20 changed files with 734 additions and 32 deletions

View File

@@ -24,6 +24,7 @@ lazy_static = "1.4.0"
libc = "0.2.72"
log = { version = "0.4.8" }
num_cpus = "1.13.0"
prost = "0.6.1"
rand = "0.7.0"
rand_chacha = "0.2.2"
rayon = "1.4.1"
@@ -43,6 +44,7 @@ solana-runtime = { path = "../runtime", version = "1.5.0" }
solana-sdk = { path = "../sdk", version = "1.5.0" }
solana-stake-program = { path = "../programs/stake", version = "1.5.0" }
solana-storage-bigtable = { path = "../storage-bigtable", version = "1.5.0" }
solana-storage-proto = { path = "../storage-proto", version = "1.5.0" }
solana-vote-program = { path = "../programs/vote", version = "1.5.0" }
tempfile = "3.1.0"
thiserror = "1.0"

115
ledger/benches/protobuf.rs Normal file
View File

@@ -0,0 +1,115 @@
#![feature(test)]
extern crate test;
use bincode::{deserialize, serialize};
use solana_ledger::{
blockstore::Blockstore,
blockstore_db::{columns as cf, LedgerColumn},
get_tmp_ledger_path,
};
use solana_runtime::bank::RewardType;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use solana_transaction_status::{Reward, Rewards};
use std::path::Path;
use test::Bencher;
fn create_rewards() -> Rewards {
(0..100)
.map(|i| Reward {
pubkey: Pubkey::new_rand().to_string(),
lamports: 42 + i,
post_balance: std::u64::MAX,
reward_type: Some(RewardType::Fee),
})
.collect()
}
fn write_bincode_rewards(rewards_cf: &LedgerColumn<cf::Rewards>, slot: Slot, rewards: Rewards) {
let data = serialize(&rewards).unwrap();
rewards_cf.put_bytes(slot, &data).unwrap();
}
fn write_protobuf_rewards(rewards_cf: &LedgerColumn<cf::Rewards>, slot: Slot, rewards: Rewards) {
let rewards = rewards.into();
rewards_cf.put_protobuf(slot, &rewards).unwrap();
}
fn read_bincode_rewards(rewards_cf: &LedgerColumn<cf::Rewards>, slot: Slot) -> Option<Rewards> {
rewards_cf
.get_bytes(slot)
.unwrap()
.map(|data| deserialize::<Rewards>(&data).unwrap())
}
fn read_protobuf_rewards(rewards_cf: &LedgerColumn<cf::Rewards>, slot: Slot) -> Option<Rewards> {
rewards_cf.get_protobuf(slot).unwrap().map(|r| r.into())
}
fn bench_write_rewards<F>(bench: &mut Bencher, ledger_path: &Path, write_method: F)
where
F: Fn(&LedgerColumn<cf::Rewards>, Slot, Rewards),
{
let blockstore =
Blockstore::open(ledger_path).expect("Expected to be able to open database ledger");
let rewards = create_rewards();
let mut slot = 0;
let rewards_cf = blockstore.db().column::<cf::Rewards>();
bench.iter(move || {
write_method(&rewards_cf, slot, rewards.clone());
slot += 1;
});
Blockstore::destroy(ledger_path).expect("Expected successful database destruction");
}
fn bench_read_rewards<F, G>(
bench: &mut Bencher,
ledger_path: &Path,
write_method: F,
read_method: G,
) where
F: Fn(&LedgerColumn<cf::Rewards>, Slot, Rewards),
G: Fn(&LedgerColumn<cf::Rewards>, Slot) -> Option<Rewards>,
{
let blockstore =
Blockstore::open(ledger_path).expect("Expected to be able to open database ledger");
let rewards = create_rewards();
let slot = 1;
let rewards_cf = blockstore.db().column::<cf::Rewards>();
write_method(&rewards_cf, slot, rewards);
bench.iter(move || read_method(&rewards_cf, slot));
Blockstore::destroy(ledger_path).expect("Expected successful database destruction");
}
#[bench]
fn bench_serialize_write_bincode(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
bench_write_rewards(bencher, &ledger_path, write_bincode_rewards);
}
#[bench]
fn bench_serialize_write_protobuf(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
bench_write_rewards(bencher, &ledger_path, write_protobuf_rewards);
}
#[bench]
fn bench_read_bincode(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
bench_read_rewards(
bencher,
&ledger_path,
write_bincode_rewards,
read_bincode_rewards,
);
}
#[bench]
fn bench_read_protobuf(bencher: &mut Bencher) {
let ledger_path = get_tmp_ledger_path!();
bench_read_rewards(
bencher,
&ledger_path,
write_protobuf_rewards,
read_protobuf_rewards,
);
}

View File

@@ -37,6 +37,7 @@ use solana_sdk::{
timing::timestamp,
transaction::Transaction,
};
use solana_storage_proto::StoredExtendedRewards;
use solana_transaction_status::{
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
TransactionStatusMeta, TransactionWithStatusMeta,
@@ -1694,7 +1695,11 @@ impl Blockstore {
let blockhash = get_last_hash(slot_entries.iter())
.unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot));
let rewards = self.rewards_cf.get(slot)?.unwrap_or_else(Vec::new);
let rewards = self
.rewards_cf
.get_protobuf_or_bincode::<StoredExtendedRewards>(slot)?
.unwrap_or_default()
.into();
let block_time = self.blocktime_cf.get(slot)?;
let block = ConfirmedBlock {
@@ -2256,11 +2261,14 @@ impl Blockstore {
}
pub fn read_rewards(&self, index: Slot) -> Result<Option<Rewards>> {
self.rewards_cf.get(index)
self.rewards_cf
.get_protobuf_or_bincode::<Rewards>(index)
.map(|result| result.map(|option| option.into()))
}
pub fn write_rewards(&self, index: Slot, rewards: Rewards) -> Result<()> {
self.rewards_cf.put(index, &rewards)
let rewards = rewards.into();
self.rewards_cf.put_protobuf(index, &rewards)
}
fn get_block_timestamps(&self, slot: Slot) -> Result<Vec<(Pubkey, (Slot, UnixTimestamp))>> {
@@ -3446,7 +3454,7 @@ pub mod tests {
use bincode::serialize;
use itertools::Itertools;
use rand::{seq::SliceRandom, thread_rng};
use solana_runtime::bank::Bank;
use solana_runtime::bank::{Bank, RewardType};
use solana_sdk::{
hash::{self, hash, Hash},
instruction::CompiledInstruction,
@@ -3456,7 +3464,8 @@ pub mod tests {
signature::Signature,
transaction::TransactionError,
};
use solana_transaction_status::InnerInstructions;
use solana_storage_proto::convert::generated;
use solana_transaction_status::{InnerInstructions, Reward, Rewards};
use solana_vote_program::{vote_instruction, vote_state::Vote};
use std::{iter::FromIterator, time::Duration};
@@ -7239,4 +7248,44 @@ pub mod tests {
);
assert_eq!(completed_data_indexes, vec![0, 1, 3]);
}
#[test]
fn test_rewards_protobuf_backward_compatability() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let rewards: Rewards = (0..100)
.map(|i| Reward {
pubkey: Pubkey::new_rand().to_string(),
lamports: 42 + i,
post_balance: std::u64::MAX,
reward_type: Some(RewardType::Fee),
})
.collect();
let protobuf_rewards: generated::Rewards = rewards.into();
let deprecated_rewards: StoredExtendedRewards = protobuf_rewards.clone().into();
for slot in 0..2 {
let data = serialize(&deprecated_rewards).unwrap();
blockstore.rewards_cf.put_bytes(slot, &data).unwrap();
}
for slot in 2..4 {
blockstore
.rewards_cf
.put_protobuf(slot, &protobuf_rewards)
.unwrap();
}
for slot in 0..4 {
assert_eq!(
blockstore
.rewards_cf
.get_protobuf_or_bincode::<StoredExtendedRewards>(slot)
.unwrap()
.unwrap(),
protobuf_rewards
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
}

View File

@@ -2,6 +2,7 @@ use crate::blockstore_meta;
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder};
use log::*;
use prost::Message;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
@@ -15,7 +16,8 @@ use solana_sdk::{
pubkey::Pubkey,
signature::Signature,
};
use solana_transaction_status::{Rewards, TransactionStatusMeta};
use solana_storage_proto::convert::generated;
use solana_transaction_status::TransactionStatusMeta;
use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc};
use thiserror::Error;
@@ -71,6 +73,8 @@ pub enum BlockstoreError {
TransactionStatusSlotMismatch,
EmptyEpochStakes,
NoVoteTimestampsInRange,
ProtobufEncodeError(#[from] prost::EncodeError),
ProtobufDecodeError(#[from] prost::DecodeError),
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;
@@ -421,6 +425,10 @@ impl TypedColumn for columns::TransactionStatusIndex {
type Type = blockstore_meta::TransactionStatusIndexMeta;
}
pub trait ProtobufColumn: Column {
type Type: prost::Message + Default;
}
pub trait SlotColumn<Index = u64> {}
impl<T: SlotColumn> Column for T {
@@ -543,8 +551,8 @@ impl SlotColumn for columns::Rewards {}
impl ColumnName for columns::Rewards {
const NAME: &'static str = REWARDS_CF;
}
impl TypedColumn for columns::Rewards {
type Type = Rewards;
impl ProtobufColumn for columns::Rewards {
type Type = generated::Rewards;
}
impl SlotColumn for columns::Blocktime {}
@@ -921,6 +929,40 @@ where
}
}
impl<C> LedgerColumn<C>
where
C: ProtobufColumn + ColumnName,
{
pub fn get_protobuf_or_bincode<T: DeserializeOwned + Into<C::Type>>(
&self,
key: C::Index,
) -> Result<Option<C::Type>> {
if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
let value = match C::Type::decode(&serialized_value[..]) {
Ok(value) => value,
Err(_) => deserialize::<T>(&serialized_value)?.into(),
};
Ok(Some(value))
} else {
Ok(None)
}
}
pub fn get_protobuf(&self, key: C::Index) -> Result<Option<C::Type>> {
if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
Ok(Some(C::Type::decode(&serialized_value[..])?))
} else {
Ok(None)
}
}
pub fn put_protobuf(&self, key: C::Index, value: &C::Type) -> Result<()> {
let mut buf = Vec::with_capacity(value.encoded_len());
value.encode(&mut buf)?;
self.backend.put_cf(self.handle(), &C::key(key), &buf)
}
}
impl<'a> WriteBatch<'a> {
pub fn put_bytes<C: Column + ColumnName>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
self.write_batch