Report validator rewards in getConfirmedBlock JSON RPC

This commit is contained in:
Michael Vines
2020-02-04 19:50:24 -07:00
parent 0bbee9456f
commit 72b11081a4
11 changed files with 235 additions and 25 deletions

View File

@ -36,6 +36,7 @@ pub mod repair_service;
pub mod replay_stage;
mod result;
pub mod retransmit_stage;
pub mod rewards_recorder_service;
pub mod rpc;
pub mod rpc_pubsub;
pub mod rpc_pubsub_service;

View File

@ -6,6 +6,7 @@ use crate::{
consensus::{StakeLockout, Tower},
poh_recorder::PohRecorder,
result::Result,
rewards_recorder_service::RewardsRecorderSender,
rpc_subscriptions::RpcSubscriptions,
};
use solana_ledger::{
@ -77,6 +78,7 @@ pub struct ReplayStageConfig {
pub snapshot_package_sender: Option<SnapshotPackageSender>,
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_sender: Option<RewardsRecorderSender>,
}
pub struct ReplayStage {
@ -179,6 +181,7 @@ impl ReplayStage {
snapshot_package_sender,
block_commitment_cache,
transaction_status_sender,
rewards_sender,
} = config;
let (root_bank_sender, root_bank_receiver) = channel();
@ -219,6 +222,7 @@ impl ReplayStage {
&bank_forks,
&leader_schedule_cache,
&subscriptions,
rewards_sender.clone(),
);
datapoint_debug!(
"replay_stage-memory",
@ -395,6 +399,7 @@ impl ReplayStage {
&poh_recorder,
&leader_schedule_cache,
&subscriptions,
rewards_sender.clone(),
);
if let Some(bank) = poh_recorder.lock().unwrap().bank() {
@ -468,6 +473,7 @@ impl ReplayStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>,
rewards_sender: Option<RewardsRecorderSender>,
) {
// all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention
@ -533,6 +539,7 @@ impl ReplayStage {
.unwrap()
.insert(Bank::new_from_parent(&parent, my_pubkey, poh_slot));
Self::record_rewards(&tpu_bank, &rewards_sender);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
} else {
error!("{} No next leader found", my_pubkey);
@ -976,6 +983,7 @@ impl ReplayStage {
forks_lock: &RwLock<BankForks>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>,
rewards_sender: Option<RewardsRecorderSender>,
) {
// Find the next slot that chains to the old slot
let forks = forks_lock.read().unwrap();
@ -1011,10 +1019,10 @@ impl ReplayStage {
forks.root()
);
subscriptions.notify_slot(child_slot, parent_slot, forks.root());
new_banks.insert(
child_slot,
Bank::new_from_parent(&parent_bank, &leader, child_slot),
);
let child_bank = Bank::new_from_parent(&parent_bank, &leader, child_slot);
Self::record_rewards(&child_bank, &rewards_sender);
new_banks.insert(child_slot, child_bank);
}
}
drop(forks);
@ -1025,6 +1033,16 @@ impl ReplayStage {
}
}
fn record_rewards(bank: &Bank, rewards_sender: &Option<RewardsRecorderSender>) {
if let Some(rewards_sender) = rewards_sender {
if let Some(ref rewards) = bank.rewards {
rewards_sender
.send((bank.slot(), rewards.iter().copied().collect()))
.unwrap_or_else(|err| warn!("rewards_sender failed: {:?}", err));
}
}
}
pub fn join(self) -> thread::Result<()> {
self.commitment_service.join()?;
self.t_replay.join().map(|_| ())
@ -1382,6 +1400,7 @@ pub(crate) mod tests {
&bank_forks,
&leader_schedule_cache,
&subscriptions,
None,
);
assert!(bank_forks.read().unwrap().get(1).is_some());
@ -1394,6 +1413,7 @@ pub(crate) mod tests {
&bank_forks,
&leader_schedule_cache,
&subscriptions,
None,
);
assert!(bank_forks.read().unwrap().get(1).is_some());
assert!(bank_forks.read().unwrap().get(2).is_some());

View File

@ -0,0 +1,67 @@
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_client::rpc_response::RpcReward;
use solana_ledger::blockstore::Blockstore;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub type RewardsRecorderReceiver = Receiver<(Slot, Vec<(Pubkey, i64)>)>;
pub type RewardsRecorderSender = Sender<(Slot, Vec<(Pubkey, i64)>)>;
pub struct RewardsRecorderService {
thread_hdl: JoinHandle<()>,
}
impl RewardsRecorderService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
rewards_receiver: RewardsRecorderReceiver,
blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-rewards-writer".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(RecvTimeoutError::Disconnected) =
Self::write_rewards(&rewards_receiver, &blockstore)
{
break;
}
})
.unwrap();
Self { thread_hdl }
}
fn write_rewards(
rewards_receiver: &RewardsRecorderReceiver,
blockstore: &Arc<Blockstore>,
) -> Result<(), RecvTimeoutError> {
let (slot, rewards) = rewards_receiver.recv_timeout(Duration::from_secs(1))?;
let rpc_rewards = rewards
.into_iter()
.map(|(pubkey, lamports)| RpcReward {
pubkey: pubkey.to_string(),
lamports,
})
.collect();
blockstore
.write_rewards(slot, rpc_rewards)
.expect("Expect database write to succeed");
Ok(())
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -9,6 +9,7 @@ use crate::{
poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage,
rewards_recorder_service::RewardsRecorderSender,
rpc_subscriptions::RpcSubscriptions,
shred_fetch_stage::ShredFetchStage,
sigverify_shreds::ShredSigVerifier,
@ -86,6 +87,7 @@ impl Tvu {
cfg: Option<Arc<AtomicBool>>,
shred_version: u16,
transaction_status_sender: Option<TransactionStatusSender>,
rewards_sender: Option<RewardsRecorderSender>,
) -> Self {
let keypair: Arc<Keypair> = cluster_info
.read()
@ -170,6 +172,7 @@ impl Tvu {
snapshot_package_sender,
block_commitment_cache,
transaction_status_sender,
rewards_sender,
};
let (replay_stage, root_bank_receiver) = ReplayStage::new(
@ -312,6 +315,7 @@ pub mod tests {
None,
0,
None,
None,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -8,6 +8,7 @@ use crate::{
gossip_service::{discover_cluster, GossipService},
poh_recorder::PohRecorder,
poh_service::PohService,
rewards_recorder_service::RewardsRecorderService,
rpc::JsonRpcConfig,
rpc_pubsub_service::PubSubService,
rpc_service::JsonRpcService,
@ -122,6 +123,7 @@ pub struct Validator {
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
rpc_service: Option<(JsonRpcService, PubSubService)>,
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
poh_recorder: Arc<Mutex<PohRecorder>>,
@ -268,6 +270,21 @@ impl Validator {
(None, None)
};
let (rewards_sender, rewards_recorder_service) =
if rpc_service.is_some() && !config.transaction_status_service_disabled {
let (rewards_sender, rewards_receiver) = unbounded();
(
Some(rewards_sender),
Some(RewardsRecorderService::new(
rewards_receiver,
blockstore.clone(),
&exit,
)),
)
} else {
(None, None)
};
info!(
"Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}",
bank.epoch(),
@ -388,6 +405,7 @@ impl Validator {
config.enable_partition.clone(),
node.info.shred_version,
transaction_status_sender.clone(),
rewards_sender,
);
if config.dev_sigverify_disabled {
@ -416,6 +434,7 @@ impl Validator {
serve_repair_service,
rpc_service,
transaction_status_service,
rewards_recorder_service,
tpu,
tvu,
poh_service,
@ -473,6 +492,10 @@ impl Validator {
transaction_status_service.join()?;
}
if let Some(rewards_recorder_service) = self.rewards_recorder_service {
rewards_recorder_service.join()?;
}
self.gossip_service.join()?;
self.serve_repair_service.join()?;
self.tpu.join()?;